1 Star 11 Fork 8

tenmg / flink-jobs-quickstart

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
README.md 5.76 KB
一键复制 编辑 原始数据 按行查看 历史
tenmg 提交于 2023-09-19 03:14 . update README.md.

flink-jobs-quickstart

介绍

flink-jobs-quickstart是一个简单的网页应用,用于带领开发者快速入门flink-jobs,它演示了如何将flink-jobs集成到现有基于Java应用中(例如JavaWeb等)。该网站只有一个主页面,可以启动和停止一个flink-jobs应用程序或普通flink应用程序。程序被启动后,页面还可以显示一些简单的监控信息。

安装教程

  1. 需先准备Flink、Tomcat、MySQL、StarRocks等环境,Flink需要配置state.savepoints.dir以便可以通过flink-jobs-clients安全停止任务。

  2. 下载flink-jobs-quickstart并根据实际运行环境修改配置文件main/resources/flink-jobs.propertiesmain/resources/flink-jobs-clients.properties后运行mvn clean package -Dmaven.test.skip=true打包。

  3. 将依赖的JAR包(如mysql-connector-java、flink-connector-starrocks等)上传到Flink的lib目录下,重启Flink。

  4. 将WAR包上传到Tomcat的webapps目录下,并启动Tomcat。

  5. 浏览器打开主页http://${youraddress}:8080/flink-jobs-quickstart/,可以看到文本框中默认的flink-jobs配置内容。

主页.png

  1. 根据自己的环境,修改jar后点击“启动”按钮,测试是否可以正常提交Flink的官方范例。

  2. Flink官方范例跑通后停止测试任务(也可以直接跳过官方范例测试),在两个数据库中分别创建test数据库、test_table表,并在MySQL插入一些数据。

/* MySQL */
/* Create database */
CREATE DATABASE test;
/* Create table */
USE test;
CREATE TABLE test_table (
  ID int NOT NULL,
  NAME varchar(255),
  CREATE_TIME datetime,
  PRIMARY KEY (ID)
);
/* Insert records */
INSERT INTO test_table(ID, NAME, CREATE_TIME) VALUES (1, 'Flink Jobs', '2022-09-20 15:13:24');
INSERT INTO test_table(ID, NAME, CREATE_TIME) VALUES (2, 'Flink CDC', '2022-09-20 15:24:01');
INSERT INTO test_table(ID, NAME, CREATE_TIME) VALUES (3, 'MySQL', '2022-09-20 17:12:16');
INSERT INTO test_table(ID, NAME, CREATE_TIME) VALUES (4, 'StarRocks', '2022-09-20 17:12:55');
/* StarRocks */
/* Create database */
CREATE DATABASE test;
/* Create table */
USE test;
CREATE TABLE test_table (
  ID int(11) NOT NULL,
  NAME varchar(765),
  CREATE_TIME datetime,
  EVENT_TIMESTAMP datetime COMMENT "事件时间",
  ETL_TIMESTAMP datetime COMMENT "清洗时间"
)
PRIMARY KEY(ID)
DISTRIBUTED BY HASH(ID) BUCKETS 3;
  1. 将任务配置改为如下内容并启动任务:
<?xml version="1.0" encoding="UTF-8"?>
<flink-jobs xmlns="http://www.10mg.cn/schema/flink-jobs"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.10mg.cn/schema/flink-jobs http://www.10mg.cn/schema/flink-jobs.xsd">
	<!-- configuration的内容支持Flink SQL中WITH参数的语法,使用“,”来分隔两个配置;也支持properties语法(使用换行分隔两个配置,暂不支持配置的值带换行符) -->
	<!-- configuration的内容可以在使用客户端提交任务之前根据规则统一指定,这样就不需要每个任务都配置了 -->
	<configuration><![CDATA[pipeline.name=test]]></configuration>
	<!-- 配置文件中并没有配置名称为test的数据源,因此会根据自动数据源的配置自动生成数据源。-->
	<!-- 根据自动数据源的配置这个数据源实际上指向StarRocks的test数据库 -->
	<data-sync from="source" to="test" table="test_table">
		<!-- server-id 可以使用客户端提交任务之前统一指定的参数,然后使用 server-id=#serverId或 server-id=:serverId来引用 -->
		<from-config><![CDATA[server-id=5500]]></from-config>
	</data-sync>
</flink-jobs>
  1. 任务正常运行后,对MySQL中的数据进行增、删、改,观察StarRocks中的数据,将看到实时同步数据已经生效。

  2. StarRocks的主键模型还存在问题,不适合数据量较大的表的数据同步,可以通过使用flink-cdc-log-connectors并在更新模型的表中加入OP列来解决数据删除问题。

使用说明

  1. 打开主页后,可修改配置内容后点击“启动”按钮启动flink-jobs任务,也可以直接点击“启动”按钮启动flink的WordCount样例程序。关于flink-jobs的详细配置详见https://gitee.com/tenmg/flink-jobs

启动任务.png

  1. 任务启动后,网页便开始计时,并不断请求后台监控任务运行的情况,主页上会显示相关信息。

监控任务.png

  1. 任务启动后,可以随时关闭任务,关闭任务后会返回保存点,将返回的保存点存储起来,再下次提交任务时作为启动选项提交给flink-jobs可实现从保存点重新恢复或重启任务的效果。

停止任务.png

玩转flink

将flink-jobs任务配置内容保存在数据库中,通过flink-jobs-clients启动任务后,将返回的jobsId保存到数据库的任务运行日志中,再使用jobsId通过flink-jobs-clients(或者flink-clients、Flink REST)提供的接口监控任务可快速将Flink彻底集成到现有系统中,实现统一平台管理。

玩转flink

相关链接

flink-jobs开源地址:https://gitee.com/tenmg/flink-jobs

flink-cdc-log-connectors开源地址:https://gitee.com/tenmg/flink-cdc-log-connectors

DSL开源地址:https://gitee.com/tenmg/dsl

Flink官网:https://flink.apache.org

Debezuim官网:https://debezium.io

Java
1
https://gitee.com/tenmg/flink-jobs-quickstart.git
git@gitee.com:tenmg/flink-jobs-quickstart.git
tenmg
flink-jobs-quickstart
flink-jobs-quickstart
master

搜索帮助