flink-jobs-quickstart是一个简单的网页应用,用于带领开发者快速入门flink-jobs,它演示了如何将flink-jobs集成到现有基于Java应用中(例如JavaWeb等)。该网站只有一个主页面,可以启动和停止一个flink-jobs应用程序或普通flink应用程序。程序被启动后,页面还可以显示一些简单的监控信息。
需先准备Flink、Tomcat、MySQL、StarRocks等环境,Flink需要配置state.savepoints.dir
以便可以通过flink-jobs-clients安全停止任务。
下载flink-jobs-quickstart并根据实际运行环境修改配置文件main/resources/flink-jobs.properties
和main/resources/flink-jobs-clients.properties
后运行mvn clean package -Dmaven.test.skip=true
打包。
将依赖的JAR包(如mysql-connector-java、flink-connector-starrocks等)上传到Flink的lib目录下,重启Flink。
将WAR包上传到Tomcat的webapps目录下,并启动Tomcat。
浏览器打开主页http://${youraddress}:8080/flink-jobs-quickstart/,可以看到文本框中默认的flink-jobs配置内容。
根据自己的环境,修改jar后点击“启动”按钮,测试是否可以正常提交Flink的官方范例。
Flink官方范例跑通后停止测试任务(也可以直接跳过官方范例测试),在两个数据库中分别创建test数据库、test_table表,并在MySQL插入一些数据。
/* MySQL */
/* Create database */
CREATE DATABASE test;
/* Create table */
USE test;
CREATE TABLE test_table (
ID int NOT NULL,
NAME varchar(255),
CREATE_TIME datetime,
PRIMARY KEY (ID)
);
/* Insert records */
INSERT INTO test_table(ID, NAME, CREATE_TIME) VALUES (1, 'Flink Jobs', '2022-09-20 15:13:24');
INSERT INTO test_table(ID, NAME, CREATE_TIME) VALUES (2, 'Flink CDC', '2022-09-20 15:24:01');
INSERT INTO test_table(ID, NAME, CREATE_TIME) VALUES (3, 'MySQL', '2022-09-20 17:12:16');
INSERT INTO test_table(ID, NAME, CREATE_TIME) VALUES (4, 'StarRocks', '2022-09-20 17:12:55');
/* StarRocks */
/* Create database */
CREATE DATABASE test;
/* Create table */
USE test;
CREATE TABLE test_table (
ID int(11) NOT NULL,
NAME varchar(765),
CREATE_TIME datetime,
EVENT_TIMESTAMP datetime COMMENT "事件时间",
ETL_TIMESTAMP datetime COMMENT "清洗时间"
)
PRIMARY KEY(ID)
DISTRIBUTED BY HASH(ID) BUCKETS 3;
<?xml version="1.0" encoding="UTF-8"?>
<flink-jobs xmlns="http://www.10mg.cn/schema/flink-jobs"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.10mg.cn/schema/flink-jobs http://www.10mg.cn/schema/flink-jobs.xsd">
<!-- configuration的内容支持Flink SQL中WITH参数的语法,使用“,”来分隔两个配置;也支持properties语法(使用换行分隔两个配置,暂不支持配置的值带换行符) -->
<!-- configuration的内容可以在使用客户端提交任务之前根据规则统一指定,这样就不需要每个任务都配置了 -->
<configuration><![CDATA[pipeline.name=test]]></configuration>
<!-- 配置文件中并没有配置名称为test的数据源,因此会根据自动数据源的配置自动生成数据源。-->
<!-- 根据自动数据源的配置这个数据源实际上指向StarRocks的test数据库 -->
<data-sync from="source" to="test" table="test_table">
<!-- server-id 可以使用客户端提交任务之前统一指定的参数,然后使用 server-id=#serverId或 server-id=:serverId来引用 -->
<from-config><![CDATA[server-id=5500]]></from-config>
</data-sync>
</flink-jobs>
任务正常运行后,对MySQL中的数据进行增、删、改,观察StarRocks中的数据,将看到实时同步数据已经生效。
StarRocks的主键模型还存在问题,不适合数据量较大的表的数据同步,可以通过使用flink-cdc-log-connectors并在更新模型的表中加入OP
列来解决数据删除问题。
将flink-jobs任务配置内容保存在数据库中,通过flink-jobs-clients启动任务后,将返回的jobsId
保存到数据库的任务运行日志中,再使用jobsId
通过flink-jobs-clients(或者flink-clients、Flink REST)提供的接口监控任务可快速将Flink彻底集成到现有系统中,实现统一平台管理。
flink-jobs开源地址:https://gitee.com/tenmg/flink-jobs
flink-cdc-log-connectors开源地址:https://gitee.com/tenmg/flink-cdc-log-connectors
DSL开源地址:https://gitee.com/tenmg/dsl
Flink官网:https://flink.apache.org
Debezuim官网:https://debezium.io
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。