1 Star 0 Fork 10

weiyxiong / flink-jobs-launcher

forked from tenmg / flink-jobs-launcher
暂停
 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
贡献代码
同步代码
取消
提示: 由于 Git 不支持空文件夾,创建文件夹后会生成空的 .keep 文件
Loading...
README

flink-jobs-launcher

介绍

flink-jobs-launcher是flink-jobs应用程序启动器类库,可用于启动flink-jobs或普通flink作业,通过flink-jobs-launcher可将flink快速集成到现有基于Java实现的系统中,还可以通过XML格式的配置文件玩转Flink SQL。

起步

添加依赖(以Maven项目为例):


<!-- https://mvnrepository.com/artifact/cn.tenmg/flink-jobs-launcher -->
<dependency>
    <groupId>cn.tenmg</groupId>
    <artifactId>flink-jobs-launcher</artifactId>
    <version>${flink-jobs-launcher.version}</version>
</dependency>

调用XMLConfigLoader的load方法加载XML配置文件并提交给启动器执行:

FlinkJobs flinkJobs = XMLConfigLoader.getInstance().load(AppTest.class.getClassLoader().getResourceAsStream("flink-jobs.xml"));
CommandLineFlinkJobsLauncher flinkJobsLauncher = new CommandLineFlinkJobsLauncher();
flinkJobsLauncher.setFlinkHome("D:\\Programs\\flink-1.8.3");
flinkJobsLauncher.setAction(Action.RUN);
FlinkJobsApplicationInfo appInfo = flinkJobsLauncher.launch(flinkJobs);

FlinkJobs flinkJobs = XMLConfigLoader.getInstance()
		.load("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\r\n" + 
			"<flink-jobs xmlns=\"http://www.10mg.cn/schema/flink-jobs\"\r\n" + 
			"	xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\"\r\n" + 
			"	xsi:schemaLocation=\"http://www.10mg.cn/schema/flink-jobs http://www.10mg.cn/schema/flink-jobs.xsd\"\r\n" + 
			"	jar=\"/opt/flink-jobs/flink-jobs-quickstart-1.0.0.jar\" serviceName=\"HelloWorldService\">\r\n" + 
        		"</flink-jobs>");
CommandLineFlinkJobsLauncher flinkJobsLauncher = new CommandLineFlinkJobsLauncher();
flinkJobsLauncher.setFlinkHome("/opt/flink-1.13.1");
flinkJobsLauncher.setAction(Action.RUN);
FlinkJobsApplicationInfo appInfo = flinkJobsLauncher.launch(flinkJobs);

快速入门

详见https://gitee.com/tenmg/flink-jobs-launcher-quickstart

任务配置

<flink-jobs>

flink-jobs是flink-jobs任务XML配置文件的根节点,需注意必须配置正确的命名空间,通常结构如下:

<flink-jobs xmlns="http://www.10mg.cn/schema/flink-jobs"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.10mg.cn/schema/flink-jobs http://www.10mg.cn/schema/flink-jobs.xsd">
</flink-jobs>

相关属性及说明:

属性 类型 必需 说明
jar String 运行的JAR包。可通过配置文件的flink.jobs.default.jar配置指定默认运行的JAR包。
class String 运行的主类。可通过配置文件的flink.jobs.default.class配置指定默认运行的主类。
serviceName String 运行的服务名称。该名称由用户定义并实现根据服务名称获取服务的方法,flink-jobs则在运行时调用并确定运行的实际服务。在运行SQL任务时,通常通过flink-jobs内的其他标签(如<execute-sql>)指定操作,而无需指定serviceName。
runtimeMode String 运行模式。可选值:"BATCH"/"STREAMING"/"AUTOMATIC",相关含义详见Flink官方文档。

<options>

运行选项配置,用于指定flink程序的运行选项。

属性 类型 必需 说明
keyPrefix String 运行选项的默认前缀,默认为“--”。
<option>

特定运行选项配置。XSD文件提供了常用的选项key值枚举,能够在IDE环境下自动提示。但并不代表仅支持这些选项,其他任何flink支持的选项也是可以的,详见Flink官方文档,或者通过运行flink -h获取帮助。

输入图片说明

属性 类型 必需 说明
key String 选项键。如果键值以“-”开头,则不会添加默认前缀,否则会自动添加默认前缀keyPrefix。
value String 选项的值。直接通过option内以文本形式提供即可,如<option>value</option><option><![CDATA[value]]></option>

<params>

参数查找表配置。通常可用于SQL中,也可以在flink-jobs应用程序自定义的服务中通过arguments参数获取。

<param>

特定参数配置。

属性 类型 必需 说明
name String 参数名。
value String 参数值。

<bsh>

运行基于Beanshell的java代码的配置。

属性 类型 必需 说明
saveAs String 操作结果另存为一个新的变量的名称。变量的值是基于Beanshell的java代码的返回值(通过return xxx;表示)。
<var>

基于Beanshell的java代码使用的变量声明配置。

属性 类型 必需 说明
name String Beanshell中使用的变量名称
value String 变量对应的值的名称。默认与name相同。flink-jobs会从参数查找表中查找名称为value值的参数值,如果指定参数存在且不是null,则该值作为该参数的值;否则,使用value值作为该变量的值。
<java>

java代码。采用文本表示,如:<java>java code</java><option><![CDATA[java code]]></option>。注意:使用泛型时,不能使用尖括号声明泛型。例如,使用Map不能使用“Map<String , String> map = new HashMap<String , String>();”,但可以使用“Map map = new HashMap();”。

<execute-sql>

运行基于DSL的SQL代码配置。

属性 类型 必需 说明
saveAs String 操作结果另存为一个新的变量的名称。变量的值是flink的tableEnv.executeSql(statement);的返回值。
dataSource String 使用的数据源名称。这里的数据源是在flink-jobs应用程序的配置文件中配置,并非在flink-jobs-launcher应用程序的配置文件中配置。详见flink-jobs数据源配置
catalog String 执行SQL使用的Flink SQL的catalog名称。
script String 基于DSL的SQL脚本。采用文本表示,如:<execute-sql>SQL code</execute-sql><execute-sql><![CDATA[SQL code]]></execute-sql>。由于Flink SQL不支持DELETE、UPDATE语句,因此如果配置的SQL脚本是DELETE或者UPDATE语句,该语句将在程序main函数中采用JDBC执行。

<sql-query>

运行基于DSL的SQL查询代码配置。

属性 类型 必需 说明
saveAs String 查询结果另存为临时表的表名及操作结果另存为一个新的变量的名称。变量的值是flink的tableEnv.executeSql(statement);的返回值。
catalog String 执行SQL使用的Flink SQL的catalog名称。
script String 基于DSL的SQL脚本。采用文本表示,如:<sql-query>SQL code</sql-query><sql-query><![CDATA[SQL code]]></sql-query>

<jdbc>

运行基于DSL的JDBC SQL代码配置。目标JDBC SQL代码是在flink-jobs应用程序的main函数中运行的。

属性 类型 必需 说明
saveAs String 执行结果另存为一个新的变量的名称。变量的值是执行JDBC指定方法的返回值。
dataSource String 使用的数据源名称。这里的数据源是在flink-jobs应用程序的配置文件中配置,并非在flink-jobs-launcher应用程序的配置文件中配置。详见flink-jobs数据源配置
method String 调用的JDBC方法。默认是"executeLargeUpdate"。
script String 基于DSL的SQL脚本。

XML配置示例

为了更好的理解flink-jobs的XML配置文件,以下提供几种常见场景的XML配置文件示例:

运行普通flink程序

<?xml version="1.0" encoding="UTF-8"?>
<flink-jobs xmlns="http://www.10mg.cn/schema/flink-jobs"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.10mg.cn/schema/flink-jobs http://www.10mg.cn/schema/flink-jobs.xsd"
	jar="D:\Programs\flink-1.8.3\examples\batch\WordCount.jar">
</flink-jobs>

运行自定义服务

以下为一个自定义服务任务XML配置文件:

<?xml version="1.0" encoding="UTF-8"?>
<flink-jobs xmlns="http://www.10mg.cn/schema/flink-jobs"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.10mg.cn/schema/flink-jobs http://www.10mg.cn/schema/flink-jobs.xsd"
	jar="/yourPath/yourJar.jar" serviceName="yourServiceName">
</flink-jobs>

运行批处理SQL

以下为一个简单订单量统计SQL批处理任务XML配置文件:

<?xml version="1.0" encoding="UTF-8"?>
<flink-jobs xmlns="http://www.10mg.cn/schema/flink-jobs"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.10mg.cn/schema/flink-jobs http://www.10mg.cn/schema/flink-jobs.xsd"
	jar="/yourPath/yourJar.jar">
	<!--任务运行参数,一些公共参数也可在调用Java API之前指定,例如系统时间等 -->
	<params>
		<param name="beginDate">2021-01-01</param>
		<param name="endDate">2021-07-01</param>
	</params>

	<!-- 使用名为hivedb的数据源创建名为hive的catalog -->
	<execute-sql dataSource="hivedb">
		<![CDATA[
			create catalog hive
		]]>
	</execute-sql>
	<!--加载hive模块 -->
	<execute-sql>
		<![CDATA[
			load module hive
		]]>
	</execute-sql>
	<!--使用hive,core模块 -->
	<execute-sql>
		<![CDATA[
			use modules hive,core
		]]>
	</execute-sql>
	<!-- 使用名为pgdb的数据源创建表order_stats_daily(如果源表名和建表语句指定的表名不一致,可以通过 WITH ('table-name' 
		= 'actrual_table_name') 来指定) -->
	<execute-sql dataSource="pgdb">
		<![CDATA[
			CREATE TABLE order_stats_daily (
			  stats_date DATE,
			  `count` BIGINT,
			  PRIMARY KEY (stats_date) NOT ENFORCED
			) WITH ('sink.buffer-flush.max-rows' = '0')
		]]>
	</execute-sql>
	<!-- 使用hive catalog查询,并将结果存为临时表tmp,tmp放在默认的default_catalog中 -->
	<sql-query saveAs="tmp" catalog="hive">
		<![CDATA[
			select cast(to_date(o.business_date) as date) stats_date, count(*) `count` from odc_order_info_par o where o.business_date >= :beginDate and o.business_date < :endDate group by cast(to_date(o.business_date) as date)
		]]>
	</sql-query>
	<!-- 删除原有数据order_stats_daily(FLINK SQL不支持DELETE,此处执行的是JDBC)-->
	<execute-sql dataSource="pgdb">
		<![CDATA[
			delete from order_stats_daily where stats_date >= :beginDate and stats_date < :endDate
		]]>
	</execute-sql>
	<!-- 数据插入。实际上Flink最终将执行Upsert语法 -->
	<execute-sql>
		<![CDATA[
			INSERT INTO order_stats_daily(stats_date,`count`) SELECT stats_date, `count` FROM tmp
		]]>
	</execute-sql>
</flink-jobs>

运行流处理SQL

以下为通过Debezium实现异构数据库同步任务XML配置文件:

<?xml version="1.0" encoding="UTF-8"?>
<flink-jobs xmlns="http://www.10mg.cn/schema/flink-jobs"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.10mg.cn/schema/flink-jobs http://www.10mg.cn/schema/flink-jobs.xsd">
	<!-- Flink内创建SOURCE数据库 -->
	<!-- <execute-sql>
		<![CDATA[
		CREATE DATABASE SOURCE
		]]>
	</execute-sql> -->
	<!-- 使用SOURCE数据库执行Flink SQL -->
	<!-- <execute-sql>
		<![CDATA[
		USE SOURCE
		]]>
	</execute-sql> -->
	<!-- 上述两步操作是非必须的,只是为了Flink自动生成的作业名称更容易识别 -->
	<!-- 定义名为kafka数据源的订单明细表 -->
	<execute-sql dataSource="kafka">
		<![CDATA[
		CREATE TABLE KAFKA_ORDER_DETAIL (
		  DETAIL_ID STRING,
		  ORDER_ID STRING,
		  ITEM_ID STRING,
		  ITEM_CODE STRING,
		  ITEM_NAME STRING,
		  ITEM_TYPE STRING,
		  ITEM_SPEC STRING,
		  ITEM_UNIT STRING,
		  ITEM_PRICE DECIMAL(12, 2),
		  ITEM_QUANTITY DECIMAL(12, 2),
		  SALE_PRICE DECIMAL(12, 2),
		  SALE_AMOUNT DECIMAL(12, 2),
		  SALE_DISCOUNT DECIMAL(12, 2),
		  SALE_MODE STRING,
		  CURRENCY STRING,
		  SUPPLY_TYPE STRING,
		  SUPPLY_CODE STRING,
		  REMARKS STRING,
		  CREATE_BY STRING,
		  CREATE_TIME BIGINT,
		  UPDATE_BY STRING,
		  UPDATE_TIME BIGINT,
		  OIL_GUN STRING,
		  EVENT_TIME TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
		  PRIMARY KEY (DETAIL_ID) NOT ENFORCED
		) WITH ('topic' = 'kaorder1.kaorder.order_detail', 'properties.group.id' = 'flink-jobs_source_order_detail')
		]]>
	</execute-sql>
	<!-- 定义名为source数据源的订单明细表 -->
	<execute-sql dataSource="source">
		<![CDATA[
		CREATE TABLE ORDER_DETAIL (
		  DETAIL_ID STRING,
		  ORDER_ID STRING,
		  ITEM_ID STRING,
		  ITEM_CODE STRING,
		  ITEM_NAME STRING,
		  ITEM_TYPE STRING,
		  ITEM_SPEC STRING,
		  ITEM_UNIT STRING,
		  ITEM_PRICE DECIMAL(12, 2),
		  ITEM_QUANTITY DECIMAL(12, 2),
		  SALE_PRICE DECIMAL(12, 2),
		  SALE_AMOUNT DECIMAL(12, 2),
		  SALE_DISCOUNT DECIMAL(12, 2),
		  SALE_MODE STRING,
		  CURRENCY STRING,
		  SUPPLY_TYPE STRING,
		  SUPPLY_CODE STRING,
		  REMARKS STRING,
		  CREATE_BY STRING,
		  CREATE_TIME TIMESTAMP(3),
		  UPDATE_BY STRING,
		  UPDATE_TIME TIMESTAMP(3),
		  OIL_GUN STRING,
		  EVENT_TIME TIMESTAMP(3),
		  PRIMARY KEY (DETAIL_ID) NOT ENFORCED
		)
		]]>
	</execute-sql>
	<!-- 将kafka订单明细数据插入到source数据库订单明细表中 -->
	<execute-sql>
		<![CDATA[
		INSERT INTO ORDER_DETAIL(
		  DETAIL_ID,
		  ORDER_ID,
		  ITEM_ID,
		  ITEM_CODE,
		  ITEM_NAME,
		  ITEM_TYPE,
		  ITEM_SPEC,
		  ITEM_UNIT,
		  ITEM_PRICE,
		  ITEM_QUANTITY,
		  SALE_PRICE,
		  SALE_AMOUNT,
		  SALE_DISCOUNT,
		  SALE_MODE,
		  CURRENCY,
		  SUPPLY_TYPE,
		  SUPPLY_CODE,
		  REMARKS,
		  CREATE_BY,
		  CREATE_TIME,
		  UPDATE_BY,
		  UPDATE_TIME,
		  OIL_GUN,
		  EVENT_TIME
		)
		SELECT
		  DETAIL_ID,
		  ORDER_ID,
		  ITEM_ID,
		  ITEM_CODE,
		  ITEM_NAME,
		  ITEM_TYPE,
		  ITEM_SPEC,
		  ITEM_UNIT,
		  ITEM_PRICE,
		  ITEM_QUANTITY,
		  SALE_PRICE,
		  SALE_AMOUNT,
		  SALE_DISCOUNT,
		  SALE_MODE,
		  CURRENCY,
		  SUPPLY_TYPE,
		  SUPPLY_CODE,
		  REMARKS,
		  CREATE_BY,
		  TO_TIMESTAMP(FROM_UNIXTIME(CREATE_TIME/1000, 'yyyy-MM-dd HH:mm:ss')) CREATE_TIME,
		  UPDATE_BY,
		  TO_TIMESTAMP(FROM_UNIXTIME(CREATE_TIME/1000, 'yyyy-MM-dd HH:mm:ss')) UPDATE_TIME,
		  OIL_GUN,
		  EVENT_TIME
		FROM KAFKA_ORDER_DETAIL
		]]>
	</execute-sql>
</flink-jobs>

配置文件

默认的配置文件为flink-jobs-launcher.properties(注意:需在classpath下),可通过flink-jobs-launcher-context-loader.properties配置文件的config.location修改配置文件路径和名称。

属性 类型 必需 说明
flink.jobs.default.jar String 启动时默认向Flink提交的JAR包。即当任务配置中没有指定jar时,会采用此配置。
flink.jobs.default.class String 启动时默认向Flink提交运行的主类。即当任务配置中没有指定class时,会采用此配置。

发布计划

计划将在1.1.2中发布以下功能:

标签 功能 说明
<data-sync> 数据同步 实现基于Debezuim的数据同步,以便简化通过<execute-sql>实现的数据同步功能。

参与贡献

  1. Fork 本仓库
  2. 新建 Feat_xxx 分支
  3. 提交代码
  4. 新建 Pull Request

相关链接

flink-jobs开源地址:https://gitee.com/tenmg/flink-jobs

DSL开源地址:https://gitee.com/tenmg/dsl

Flink官网:https://flink.apache.org

Debezuim官网:https://debezium.io

空文件

简介

flink-jobs应用程序启动器类库,玩转flink 展开 收起
Java
取消

发行版

暂无发行版

贡献者

全部

近期动态

加载更多
不能加载更多了
Java
1
https://gitee.com/weiyxiong_admin/flink-jobs-launcher.git
git@gitee.com:weiyxiong_admin/flink-jobs-launcher.git
weiyxiong_admin
flink-jobs-launcher
flink-jobs-launcher
master

搜索帮助