赞
踩
CDC是Change Data Capture(变更数据获取)的简称。核心思想是,监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。
CDC主要分为基于查询和基于Binlog两种方式,我们主要了解一下这两种之间的区别:
基于查询的CDC | 基于Binlog的CDC | |
开源产品 | Sqoop、Kafka JDBC Source | Canal、Maxwell、Debezium |
执行模式 | Batch | Streaming |
是否可以捕获所有数据变化 | 否 | 是 |
延迟性 | 高延迟 | 低延迟 |
是否增加数据库压力 | 是 | 否 |
Flink社区开发了 flink-cdc-connectors 组件,这是一个可以直接从 MySQL、PostgreSQL 等数据库直接读取全量数据和增量变更数据的 source 组件。目前也已开源,
开源地址:GitHub - ververica/flink-cdc-connectors: CDC Connectors for Apache Flink®
- vim /etc/my.cnf
-
- [mysqld]
- # 数据库id
- server-id = 1
-
- # 启动 binlog
- log-bin=mysql-bin
-
- # binlog 类型
- binlog_format=row
-
- # 启动binlog的数据库
- binlog-do-db=tms01
这里是利用MySQL的主从复制原理,将自身伪装成MySQL的一个从节点,从主节点中获取得到数据的增删改查的信息,做到对MySQL数据的增量同步。
写Flink代码,其中引入了CDC的依赖,然后打成jar包提交到服务器的Flink上运行。
- <?xml version="1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <groupId>com.admin</groupId>
- <artifactId>tms-realtime</artifactId>
- <version>1.0-SNAPSHOT</version>
-
-
- <properties>
- <maven.compiler.source>8</maven.compiler.source>
- <maven.compiler.target>8</maven.compiler.target>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <java.version>1.8</java.version>
- <flink.version>1.16.1</flink.version>
- <scala.version>2.12</scala.version>
- <hadoop.version>3.3.4</hadoop.version>
- <flink-cdc.version>2.3.0</flink-cdc.version>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-java</artifactId>
- <version>${flink.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java</artifactId>
- <version>${flink.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-kafka</artifactId>
- <version>${flink.version}</version>
- </dependency>
-
- <dependency>
- <groupId>com.alibaba</groupId>
- <artifactId>fastjson</artifactId>
- <version>1.2.68</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- <version>3.3.4</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-clients</artifactId>
- <version>${flink.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-table-api-java-bridge</artifactId>
- <version>${flink.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-jdbc</artifactId>
- <version>${flink.version}</version>
- </dependency>
-
- <dependency>
- <groupId>com.ververica</groupId>
- <artifactId>flink-connector-mysql-cdc</artifactId>
- <version>${flink-cdc.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-statebackend-rocksdb</artifactId>
- <version>${flink.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-table-planner_2.12</artifactId>
- <version>${flink.version}</version>
- </dependency>
-
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-assembly-plugin</artifactId>
- <version>3.0.0</version>
- <configuration>
- <descriptorRefs>
- <descriptorRef>jar-with-dependencies</descriptorRef>
- </descriptorRefs>
- </configuration>
- <executions>
- <execution>
- <id>make-assembly</id>
- <phase>package</phase>
- <goals>
- <goal>single</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
-
- </project>
- package com.admin.flink;
-
- import com.ververica.cdc.connectors.mysql.source.MySqlSource;
- import com.ververica.cdc.connectors.mysql.table.StartupOptions;
- import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
- import org.apache.flink.api.common.eventtime.WatermarkStrategy;
- import org.apache.flink.api.common.restartstrategy.RestartStrategies;
- import org.apache.flink.api.common.time.Time;
- import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
- import org.apache.flink.streaming.api.CheckpointingMode;
- import org.apache.flink.streaming.api.datastream.DataStreamSource;
- import org.apache.flink.streaming.api.environment.CheckpointConfig;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
- public class FlinkCDC_Stream {
-
- public static void main(String[] args) throws Exception {
- // TODO 1. 准备流处理环境
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
-
-
- // TODO 2. 开启检查点 Flink-CDC将读取binlog的位置信息以状态的方式保存在CK,如果想要做到断点续传,
- // 需要从Checkpoint或者Savepoint启动程序
- // 2.1 开启Checkpoint,每隔5秒钟做一次CK ,并指定CK的一致性语义
- env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
- // 2.2 设置超时时间为 1 分钟
- env.getCheckpointConfig().setCheckpointTimeout(60 * 1000L);
- // 2.3 设置两次重启的最小时间间隔
- env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
- // 2.4 设置任务关闭的时候保留最后一次 CK 数据
- env.getCheckpointConfig().enableExternalizedCheckpoints(
- CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
- // 2.5 指定从 CK 自动重启策略
- env.setRestartStrategy(RestartStrategies.failureRateRestart(
- 3, Time.days(1L), Time.minutes(1L)
- ));
- // 2.6 设置状态后端
- env.setStateBackend(new HashMapStateBackend());
- env.getCheckpointConfig().setCheckpointStorage(
- "hdfs://mycluster/flinkCDC"
- );
-
- // 2.7 设置访问HDFS的用户名
- System.setProperty("HADOOP_USER_NAME", "admin");
-
- // TODO 3. 创建 Flink-MySQL-CDC 的 Source
- // initial:Performs an initial snapshot on the monitored database tables upon first startup, and continue to read the latest binlog.
- // earliest:Never to perform snapshot on the monitored database tables upon first startup, just read from the beginning of the binlog. This should be used with care, as it is only valid when the binlog is guaranteed to contain the entire history of the database.
- // latest:Never to perform snapshot on the monitored database tables upon first startup, just read from the end of the binlog which means only have the changes since the connector was started.
- // specificOffset:Never to perform snapshot on the monitored database tables upon first startup, and directly read binlog from the specified offset.
- // timestamp:Never to perform snapshot on the monitored database tables upon first startup, and directly read binlog from the specified timestamp.The consumer will traverse the binlog from the beginning and ignore change events whose timestamp is smaller than the specified timestamp.
-
- MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
- .hostname("hadoop102")
- .port(3306)
- .databaseList("tms01") // set captured database
- .tableList("tms01.user_info") // set captured table
- .username("root")
- .password("123456")
- .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
- .startupOptions(StartupOptions.initial())
- .build();
-
- // TODO 4.使用CDC Source从MySQL读取数据
- DataStreamSource<String> mysqlDS =
- env.fromSource(
- mySqlSource,
- WatermarkStrategy.noWatermarks(),
- "MysqlSource");
-
- // TODO 5.打印输出
- mysqlDS.print();
-
- // TODO 6.执行任务
- env.execute();
- }
-
- }
因为代码中Flink开启了检查点的功能,并将检查点保存在HDFS,所以需要将Hadoop的相关配置文件放入resource文件夹,用于连接配置HDFS集群。
1)打包并上传至Linux
2)启动HDFS集群
[admin@hadoop102 flink-local]$ start-dfs.sh
3)启动Flink集群
[admin@hadoop102 flink-local]$ bin/start-cluster.sh
4)启动程序
[admin@hadoop102 flink-local]$ bin/flink run -m hadoop102:8081 -c com.admin.flink.FlinkCDC_Stream ./tms-flink-cdc.jar
5)观察taskManager日志,会从头读取表数据
6)给当前的Flink程序创建Savepoint
[admin@hadoop102 flink-local]$ bin/flink savepoint JobId hdfs://hadoop102:8020/flinkCDC/save
在WebUI中cancelJob
在MySQL的tms01.user_info表中添加、修改或者删除数据
从Savepoint重启程序
[admin@hadoop102 flink-standalone]$ bin/flink run -s hdfs://hadoop102:8020/flink/save/... -c com.admin.flink.FlinkCDC_Stream ./gmall-flink-cdc.jar
观察taskManager日志,会从检查点读取表数据
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-table-planner_2.12</artifactId>
- <version>1.16.1</version>
- </dependency>
- package com.admin.flink;
-
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-
- public class FlinkCDC_SQL {
-
- public static void main(String[] args) throws Exception {
- // TODO 1. 准备环境
- // 1.1 流处理环境
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- // 1.2 表执行环境
- StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
-
- // TODO 2. 创建动态表
- tableEnv.executeSql("CREATE TABLE user_info (\n" +
- "id INT,\n" +
- "login_name STRING,\n" +
- "nick_name STRING,\n" +
- "primary key(id) not enforced\n" +
- ") WITH (" +
- "'connector' = 'mysql-cdc'," +
- "'hostname' = 'hadoop102'," +
- "'port' = '3306'," +
- "'username' = 'root'," +
- "'password' = '123456'," +
- "'database-name' = 'tms01'," +
- "'table-name' = 'user_info'" +
- ")");
-
- tableEnv.executeSql("select * from user_info").print();
-
- // TODO 3. 执行任务
- env.execute();
- }
-
- }
直接运行查看控制台输出。
(1)创建tms-realtime模块
(2)创建如下包结构
目录 | 作用 |
app | Flink任务应用程序 |
util | 工具类 |
(3)修改配置文件
在pom.xml的<project></project>添加如下配置。
- <properties>
- <maven.compiler.source>8</maven.compiler.source>
- <maven.compiler.target>8</maven.compiler.target>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <java.version>1.8</java.version>
- <flink.version>1.16.1</flink.version>
- <scala.version>2.12</scala.version>
- <hadoop.version>3.3.4</hadoop.version>
- <flink-cdc.version>2.3.0</flink-cdc.version>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-java</artifactId>
- <version>${flink.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java</artifactId>
- <version>${flink.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-kafka</artifactId>
- <version>${flink.version}</version>
- </dependency>
-
- <dependency>
- <groupId>com.alibaba</groupId>
- <artifactId>fastjson</artifactId>
- <version>1.2.68</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- <version>3.3.4</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-clients</artifactId>
- <version>${flink.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-table-api-java-bridge</artifactId>
- <version>${flink.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-jdbc</artifactId>
- <version>${flink.version}</version>
- </dependency>
-
- <dependency>
- <groupId>com.ververica</groupId>
- <artifactId>flink-connector-mysql-cdc</artifactId>
- <version>${flink-cdc.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-statebackend-rocksdb</artifactId>
- <version>${flink.version}</version>
- </dependency>
-
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-assembly-plugin</artifactId>
- <version>3.0.0</version>
- <configuration>
- <descriptorRefs>
- <descriptorRef>jar-with-dependencies</descriptorRef>
- </descriptorRefs>
- </configuration>
- <executions>
- <execution>
- <id>make-assembly</id>
- <phase>package</phase>
- <goals>
- <goal>single</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
(1)流处理环境准备
撰写Flink程序首先要初始化流处理环境,配套的实时项目会有很多Flink程序,为了避免重复书写大量代码,将环境准备抽取为工具类CreateEnvUtil 中的方法getStreamEnv(String[] args)。思路如下。
① 初始化流处理环境
② 获取命令行参数
生产环境下Flink Job通常采用Flink on Yarn模式部署,所有程序会打包部署到服务器,然后提交给Yarn。Flink Job的配置信息可以通过flink-conf.yaml文件指定、可以在命令行提交Job时指定、可以在代码中指定,三者的优先级依次升高。如果所有配置项都在代码中指定,要更改就必须修改代码,重新打包部署,然后提交Job。而一个Flink Job涉及的配置信息多达十几甚至几十个,当业务场景发生变化如数据洪峰到来时可能需要频繁修改,无疑会极大增加工作量。为了提升效率,配置项通常不再代码中写死,而是通过命令行与配置文件flink-conf.yaml指定。通常在flink-conf.yaml中配置默认值,如果Job的配置与默认值不同,在提交Job时显式指定以覆盖默认值。当然,用到的所有配置项全部在提交Job时指定也是可以的。
配置项可以分为两类:
a)可以通过官方配置项指定。Flink官方提供可以通过-Dkey=value在提交Job时指定,此类无须赘述
b)只能通过args指定。某些配置官方没有提供对应的配置项,只能通过main方法的参数,字符串数组args传递,然后用ParameterTool解析。ParameterTool会遍历args数组的元素,将--或-开头的字符串识别为配置项的key,其后紧邻的非key元素识别为对应的value,然后将所有键值对封装到HashMap中,保存在ParameterTool类对象Map类型的属性data中,然后将该对象返回。因此,通过b方式传递配置信息时需要注意:
i)配置项的key必须以--或-开头,
ii)与key对应的value必须与key紧邻,位于其后。
③ 对流处理环境进行配置
对于②中的a)类配置项,代码中无须改动,对于b)类配置,将命令行传入的参数传递给流处理对象。
(2)Mysql数据源准备
① 命令行传参
Flink-CDC可以监控Mysql的binlog,将采集到的数据封装为MySqlSource类型的对象。该类型对象初始化时需要指定Mysql的IP、端口、用户名、密码、数据库名称、原始表等,这些参数只能通过args传递。
② 启动模式
离线数仓需要对历史业务数据做分析处理,因此StartupMode应为initial(),即首次启动Job时执行一次全表扫描。
③ 原始表列表
增量表分为两类:与事实相关的表、与维度相关的表,这两类表的配置相同,但考虑到实时数仓测试环境下可能要对这两类表做不同的配置,我们将它们的名称封装到不同的String数组中。
④ 数据序列化
通过Flink-CDC提供的JsonDebeziumDeserializationSchema类对象做序列化,采集到的数据会被封装为JSON字符串。特殊地,decimal类型数据默认会被序列化为base-64编码的字符串,如'10.2345'会输出为'D3J5',显然与我们的需求不符,需要将默认的序列化格式更换为NUMERIC。
⑤ serverId
Flink-CDC监控binlog是基于主从复制实现的,Flink的每个并行度会被伪装为一台从机,每台从机要有唯一的serverId。
(3)Kafka工具类
对数据处理后需要将数据写出到Kafka,编写KafkaUtil工具类及Kafka生产者初始化方法。
① 命令行传参
Kafka主题、URL等配置信息通过args传递。
② 初始化FlinkKafkaProducer对象
语义设置为精准一次,Flink对接Kafka的精准一次是基于Kafka事务通过两阶段提交实现的,未提交的数据仍然会进入Kafka,只是被标记为未提交状态。Kafka的隔离机制决定了是否可以读取这部分数据,该配置项为isolation.level,取值如下。
read_committed:读已提交,下游只能读取状态为已提交的事务数据和所有非事务数据。
read_uncommitted:读未提交,下游可以读取所有状态的事务数据和非事务数据。该值为默认值。
如果要真正保证精准一次,我们需要将isolation.level修改为read_comitted,但写入Kafka的数据只有在检查点完成时才会被提交,因而数据延迟会在原先的基础上大大增加。相邻两次检查点完成的时间间隔越大,延迟越高。因此,企业通常不愿承担这样的代价,仍使用默认的隔离级别,通过下游去重保证统计结果的准确。但是,对于某些无法去重或严格要求精准一次的场景只能接受延迟的增加。生产环境应根据实际情况在延迟和精准一次之间作取舍。
(4)时间戳处理工具类
提供两个方法,分别提供将yyyy-MM-dd HH:mm:ss的格式化时间字符串转换为毫秒时时间戳的功能和将后者转换为前者的功能。用途下文说明。
(5)主程序
① 初始化流处理环境
调用工具类CreateEnvUtil的对应方法即可。
② 全局并行度设置
Flink应用的并行度可以在4处指定:算子并行度、全局并行度、Job提交时、flink-conf.yaml配置文件中,优先级依次降低。配置文件中的并行度通常作为默认值存在,即其它三处均不指定时此处生效。对于广播算子等并行度不会更改的算子设置算子并行度,生产环境一般通常不会在代码中设置全局并行度,而是通过命令行调整。
Kafka分区数为4,Flink并行度应与之相同,全局并行度设置为4,调试程序时可以在代码中指定,打包部署之前应删除对应语句。
③ 定义source算子
a)指定读取类型(dim/dwd),监控不同的表。维度相关的表和事实相关的表各自生成数据源。
b)指定算子并行度,当source算子并行度大于1时,同一主键的数据可能会进入不同的并行度,可能因此导致数据乱序。如JSON格式为{"id":1, "name":"小红"}的数据先被修改为{"id":1, "name":"小花"},又被修改为{"id":1, "name":"小黄"},若两次修改进入了不同并行度,下游可能先接收到第二次修改,那么最终保留的name为“小花”而非“小黄”,从而导致统计结果出错。因此source算子并行度设置为1,保证数据严格有序。
c)ETL及数据结构转换。舍弃两种数据:i)格式不完整的JSON数据;ii)操作类型为删除的数据。通过filter算子实现。要注意,我们设置了全局并行度为4,若不显式声明则filter算子并行度为4而source并行度为1,会发生rebalance,相同主键数据可能进入不同并行度从而导致数据乱序使得计算结果出错,因此,显式声明filter算子并行度为1。
常用的时间戳字段名均为ts,为避免不必要的麻烦,将时间戳字段名由ts_ms替换为ts。此外,Flink-CDC采集到的数据中,与before、after同级的时间戳ts_ms对应的时间为数据进入Flink-CDC Job的时间,生产环境下分为两种情况:如果是snapshot采集到的历史数据该时间戳为采集时间,Flume的HDFSSink会按照它将数据写入HDFS对应日期的目录(下文详述);如果是实时监控binlog采集到的数据,该时间戳与业务数据变更时间相差无几,可以直接将其作为业务数据变更时间,同样会被作为HDFS分区的时间戳,这两种情况改时间戳都不需要更改。但是,教学环境下我们需要模拟生成指定日期的数据,指定的日期可能与ts_ms的日期不同,此时就需要将其yyyy-MM-dd部分替换为指定的日期。使用上文提到的时间戳工具类中的对应方法完成此操作。
d)按照主键分区,相同主键数据进入相同分区,严格有序。
e)写入Kafka指定主题
- {
- // 变更前的数据(一行)
- "before":null,
- // 变更后的数据(一行)
- "after":{
- "id":232232,
- "shift_id":1794,
- "line_id":833,
- "start_org_id":25,
- "start_org_name":"重庆市市辖区转运中心",
- "end_org_id":26,
- "end_org_name":"四川省成都市转运中心",
- "status":"67004",
- "order_num":0,
- "driver1_emp_id":1620,
- "driver1_name":"袁永",
- "driver2_emp_id":1621,
- "driver2_name":"邬晶",
- "truck_id":897,
- "truck_no":"渝A6N61K",
- "actual_start_time":1659808843000,
- "actual_end_time":1659824732000,
- "actual_distance":314,
- "create_time":"2022-08-06T16:50:32Z",
- "update_time":"2022-08-06T22:25:32Z",
- "is_deleted":"0"
- },
- // 元数据信息
- "source":{
- "version":"1.5.4.Final",
- "connector":"mysql",
- "name":"mysql_binlog_source",
- "ts_ms":0,
- "snapshot":"false",
- "db":"tms",
- "sequence":null,
- "table":"transport_task",
- "server_id":0,
- "gtid":null,
- "file":"",
- "pos":0,
- "row":0,
- "thread":null,
- "query":null
- },
- // 对当前行数据进行的操作
- // "c":create;新增
- // "r":read;读取,只有首日新增才会是r
- // "u":update;修改
- // "d":delete;删除
- "op":"r",
- "ts_ms":1672128012567,
- "transaction":null
- }
① 导入配置文件
本项目对接HDFS高可用集群,为了解析集群名称,需要将core-site.xml和hdfs-site.xml文件置于项目resources目录下。
② 主程序如下。
- package com.admin.tms.realtime.util;
-
- import com.esotericsoftware.minlog.Log;
- import com.ververica.cdc.connectors.mysql.source.MySqlSource;
- import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder;
- import com.ververica.cdc.connectors.mysql.table.StartupOptions;
- import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
- import org.apache.flink.api.java.utils.ParameterTool;
- import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
- import org.apache.flink.streaming.api.CheckpointingMode;
- import org.apache.flink.streaming.api.environment.CheckpointConfig;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.kafka.connect.json.DecimalFormat;
- import org.apache.kafka.connect.json.JsonConverterConfig;
-
- import java.util.HashMap;
-
- public class CreateEnvUtil {
-
- /**
- * 初始化流处理环境
- *
- * @param args 命令行参数数组
- * @return 流处理环境
- */
- public static StreamExecutionEnvironment getStreamEnv(String[] args) {
- // TODO 1. 初始化流处理环境
- StreamExecutionEnvironment env = StreamExecutionEnvironment
- .getExecutionEnvironment();
-
-
- return env;
- }
-
- /**
- * 生成 Flink-CDC 的 MysqlSource 对象
- * @param option 选项,dim|dwd,对应不同的原始表列表
- * @param serverId MySQL 从机的 serverId
- * @param args 命令行参数数组
- * @return MySqlSource 对象
- */
- public static MySqlSource<String> getJSONSchemaMysqlSource(String option, String serverId, String[] args) {
- ParameterTool parameterTool = ParameterTool.fromArgs(args);
- String mysqlHostname = parameterTool.get("mysql-hostname", "hadoop102");
- int mysqlPort = Integer.parseInt(parameterTool.get("mysql-port", "3306"));
- String mysqlUsername = parameterTool.get("mysql-username", "root");
- String mysqlPasswd = parameterTool.get("mysql-passwd", "000000");
- serverId = parameterTool.get("server-id", serverId);
- option = parameterTool.get("start-up-options", option);
-
- // 将 Decimal 类型数据的解析格式由 BASE64 更改为 NUMERIC,否则解析报错
- // 创建配置信息 Map 集合,将 Decimal 数据类型的解析格式配置 k-v 置于其中
- HashMap config = new HashMap<>();
- config.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, DecimalFormat.NUMERIC.name());
- // 将前述 Map 集合中的配置信息传递给 JSON 解析 Schema,该 Schema 将用于 MysqlSource 的初始化
- JsonDebeziumDeserializationSchema jsonDebeziumDeserializationSchema =
- new JsonDebeziumDeserializationSchema(false, config);
-
- // 创建 MysqlSourceBuilder 对象
- MySqlSourceBuilder<String> builder = MySqlSource.<String>builder()
- .hostname(mysqlHostname)
- .port(mysqlPort)
- .username(mysqlUsername)
- .password(mysqlPasswd)
- .deserializer(jsonDebeziumDeserializationSchema);
-
- // 根据方法的 option 参数做不同的初始化操作,返回不同的 MysqlSource 对象
- switch (option) {
- case "dim":
- String[] dimTables = new String[]{"tms.user_info",
- "tms.user_address"};
- return builder
- .databaseList("tms")
- .tableList(dimTables)
- .startupOptions(StartupOptions.initial())
- .serverId(serverId)
- .build();
- case "dwd":
- String[] dwdTables = new String[]{"tms.order_info",
- "tms.order_cargo",
- "tms.transport_task",
- "tms.order_org_bound"};
- return builder
- .databaseList("tms")
- .tableList(dwdTables)
- .startupOptions(StartupOptions.initial())
- .serverId(serverId)
- .build();
- }
- Log.error("不支持的操作类型!");
- return null;
- }
- }
- package com.admin.tms.realtime.util;
-
- import org.apache.flink.api.java.utils.ParameterTool;
- import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
- import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
- import org.apache.kafka.clients.producer.ProducerConfig;
- import org.apache.kafka.clients.producer.ProducerRecord;
-
- import javax.annotation.Nullable;
- import java.util.Properties;
-
- import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic.EXACTLY_ONCE;
-
- public class KafkaUtil {
-
- private static final String DEFAULT_TOPIC = "default_topic";
-
- /**
- * 指定 topic 获取 FlinkKafkaProducer 实例
- *
- * @param topic 主题
- * @param args 命令行参数数组
- * @return FlinkKafkaProducer 实例
- */
- public static FlinkKafkaProducer<String> getKafkaProducer(String topic, String[] args) {
- // 创建配置对象
- Properties producerProp = new Properties();
- // 将命令行参数对象封装为 ParameterTool 类对象
- ParameterTool parameterTool = ParameterTool.fromArgs(args);
-
- // 提取命令行传入的 key 为 topic 的配置信息,并将默认值指定为方法参数 topic
- // 当命令行没有指定 topic 时,会采用默认值
- topic = parameterTool.get("topic", topic);
- // 如果命令行没有指定主题名称且默认值为 null 则抛出异常
- if (topic == null) {
- throw new IllegalArgumentException("主题名不可为空:命令行传参为空且没有默认值!");
- }
-
- // 获取命令行传入的 key 为 bootstrap-servers 的配置信息,并指定默认值
- String bootstrapServers = parameterTool.get(
- "bootstrap-severs", "hadoop102:9092, hadoop103:9092, hadoop104:9092");
- // 获取命令行传入的 key 为 transaction-timeout 的配置信息,并指定默认值
- String transactionTimeout = parameterTool.get(
- "transaction-timeout", 15 * 60 * 1000 + "");
- // 设置 Kafka 连接的 URL
- producerProp.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
- // 设置 Kafka 事务超时时间
- producerProp.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeout);
-
- // 内部类中使用但未声明的局部变量必须在内部类代码段之前明确分配
- String finalTopic = topic;
- return new FlinkKafkaProducer<String>(
- DEFAULT_TOPIC,
- new KafkaSerializationSchema<String>() {
- @Override
- public ProducerRecord<byte[], byte[]> serialize(String jsonStr, @Nullable Long timestamp) {
- return new ProducerRecord<byte[], byte[]>(finalTopic, jsonStr.getBytes());
- }
- },
- producerProp,
- EXACTLY_ONCE);
- }
- }
- package com.admin.tms.realtime.util;
-
- import java.time.LocalDateTime;
- import java.time.ZoneId;
- import java.time.ZoneOffset;
- import java.time.format.DateTimeFormatter;
- import java.util.Date;
-
- public class DateFormatUtil {
- // yyyy-MM-dd HH:mm:ss 日期格式化对象
- private static final DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
-
- /**
- * 将 yyyy-MM-dd HH:mm:ss 格式化日期字符串转换为毫秒时间戳
- * @param dtStr yyyy-MM-dd HH:mm:ss 格式化日期字符串
- * @return 毫秒时间戳
- */
- public static Long toTs(String dtStr) {
- LocalDateTime localDateTime = LocalDateTime.parse(dtStr, dtf);
- return localDateTime.toInstant(ZoneOffset.of("+8")).toEpochMilli();
- }
-
- /**
- * 将毫秒时间戳转换为 yyyy-MM-dd HH:mm:ss 格式化日期字符串
- * @param ts 毫秒时间戳
- * @return yyyy-MM-dd HH:mm:ss 格式化日期字符串
- */
- public static String toYmdHms(Long ts) {
- Date dt = new Date(ts);
- LocalDateTime localDateTime = LocalDateTime.ofInstant(dt.toInstant(), ZoneId.systemDefault());
- return dtf.format(localDateTime);
- }
- }
- package com.admin.tms.realtime.app.ods;
-
- import com.alibaba.fastjson.JSON;
- import com.alibaba.fastjson.JSONException;
- import com.alibaba.fastjson.JSONObject;
- import com.admin.tms.realtime.util.CreateEnvUtil;
- import com.admin.tms.realtime.util.DateFormatUtil;
- import com.admin.tms.realtime.util.KafkaUtil;
- import com.esotericsoftware.minlog.Log;
- import com.ververica.cdc.connectors.mysql.source.MySqlSource;
- import org.apache.flink.api.common.eventtime.WatermarkStrategy;
- import org.apache.flink.api.common.functions.FlatMapFunction;
- import org.apache.flink.api.java.functions.KeySelector;
- import org.apache.flink.api.java.utils.ParameterTool;
- import org.apache.flink.streaming.api.datastream.DataStreamSource;
- import org.apache.flink.streaming.api.datastream.KeyedStream;
- import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
- import org.apache.flink.util.Collector;
-
- public class OdsApp {
- public static void main(String[] args) throws Exception {
-
- // TODO 1. 初始化流处理环境
- StreamExecutionEnvironment env = CreateEnvUtil.getStreamEnv(args);
-
- // 并行度设置,部署时应注释,通过 args 指定全局并行度
- env.setParallelism(4);
-
- // TODO 2. 处理维度数据
- String dimOption = "dim";
- String dimServerId = "6020";
- String dimSourceName = "ods_dim_source";
- sinkToKafka(dimOption, dimServerId, dimSourceName, env, args);
-
- // TODO 3. 处理事实数据
- String dwdOption = "dwd";
- String dwdServerId = "6030";
- String dwdSourceName = "ods_dwd_source";
- sinkToKafka(dwdOption, dwdServerId, dwdSourceName, env, args);
-
- env.execute();
- }
-
- public static void sinkToKafka(
- String option, String serverId, String sourceName, StreamExecutionEnvironment env, String[] args) {
- // 1. 读取数据
- MySqlSource<String> mysqlSource = CreateEnvUtil.getJSONSchemaMysqlSource(option, serverId, args);
- DataStreamSource<String> source = env
- .fromSource(mysqlSource, WatermarkStrategy.noWatermarks(), sourceName)
- .setParallelism(1);
-
- // 2. ETL
- // 获取统计日期
- ParameterTool parameterTool = ParameterTool.fromArgs(args);
- String mockDate = parameterTool.get("mock_date");
- SingleOutputStreamOperator<String> flatMappedStream =
- source.flatMap(new FlatMapFunction<String, String>() {
- @Override
- public void flatMap(String jsonStr, Collector<String> out) throws Exception {
- try {
- JSONObject jsonObj = JSON.parseObject(jsonStr);
- if (jsonObj.getJSONObject("after") != null
- && !jsonObj.getString("op").equals("d")) {
- Long ts = jsonObj.getLong("ts_ms");
- if (mockDate != null) {
- String curDate = DateFormatUtil.toYmdHms(ts);
- String fixedDate = mockDate + curDate.substring(10);
- Long fixedTs = DateFormatUtil.toTs(fixedDate);
- jsonObj.put("ts", fixedTs);
- } else {
- jsonObj.put("ts", ts);
- }
- jsonObj.remove("ts_ms");
- out.collect(jsonObj.toJSONString());
- }
- } catch (JSONException jsonException) {
- jsonException.printStackTrace();
- Log.error("从Flink-CDC读取的数据解析异常" + jsonException.getMessage());
- }
- }
- }).setParallelism(1);
-
- // 3. 按照主键分组,避免数据倾斜
- KeyedStream<String, String> keyedStream = flatMappedStream.keyBy(
- new KeySelector<String, String>() {
- @Override
- public String getKey(String jsonStr) {
- JSONObject jsonObj = JSON.parseObject(jsonStr);
- return jsonObj.getJSONObject("after").getString("id");
- }
- }
- );
-
- // 4. 写入 Kafka 对应主题
- String topic = "tms_ods";
- FlinkKafkaProducer<String> kafkaProducer = KafkaUtil.getKafkaProducer(topic, args);
- keyedStream
- .addSink(kafkaProducer);
- }
- }
注意:这里使用的是yarn模式提交Flink代码,使用的是yarn的容器来启动Flink,所以并不需要启动Flink的集群,所以也并不需要多台机器部署Flink集群。
(1)上传安装包
将Flink-1.16.1安装包上传至hadoop102的/opt/software目录下。
- [admin@hadoop102 software]$ ls | grep flink-1.16.1
-
- flink-1.16.1-bin-scala_2.12.tgz
(2)解压
- [admin@hadoop102 software]$ tar -zxvf flink-1.16.1-bin-scala_2.12.tgz -C ../module
- [admin@hadoop102 software]$ cd ../module/
- [admin@hadoop102 module]$ ls | grep flink-1.16.1
- flink-1.16.1
(3)配置环境变量
[admin@hadoop103 module]$ sudo vim /etc/profile.d/my_env.sh
在文件末尾添加以下内容。
- # FLINK_HOME
- export FLINK_HOME=/opt/module/flink-1.16.1
- export PATH=$PATH:$FLINK_HOME/bin
保存退出,刷新环境变量。
[admin@hadoop102 module]$ source /etc/profile.d/my_env.sh
Flink的Yarn per Job模式下一共有三类日志,如下。
① Flink客户端日志,默认会保存在Flink家目录lib下的flink-${username}-client-${hostname}.log文件中,username为执行提交命令所用的Linux系统用户名,hostname为Flink客户端所在节点主机名。部分客户端日志会被同时打印在控制台。
② JobManager容器日志,需要配置Yarn日志聚集(Hadoop基础课已有介绍,不再赘述)。如果开启了历史服务器可以在Yarn界面查看日志,否则须在对应的本地路径查看。
启动历史服务器
[admin@hadoop102 log]$ mapred --daemon start historyserver
点击Logs。
红框选中的即为JM日志。
/opt/module/flink-1.16.1/lib目录下的log4j-slf4j-impl-2.17.1.jar文件会与Hadoop家目录share/hadoop/common/lib目录下的slf4j-log4j12-1.7.25.jar文件发生冲突,日志无法正常打印。修改Flink家目录lib下对应文件的后缀即可,如下。
- [admin@hadoop102 log]$ cd /opt/module/flink-1.16.1/lib/
- [admin@hadoop102 lib]$ mv log4j-slf4j-impl-2.17.1.jar log4j-slf4j-impl-2.17.1.jar.bak
提交Job时控制台会抛出如下异常。
java.lang.IllegalStateException: Trying to access closed classloader. Please check if you store classloaders directly or indirectly in static fields. If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately, you can disable this check with the configuration 'classloader.check-leaked-classloader'.
原因是Hadoop3.x版本会启动异步线程来执行一些终止方法。这些方法在任务执行之后运行,而此时类加载器已经被释放,就抛出以上异常。这个异常并不会影响任务的运行,只会在控制台打印堆栈信息。详见Flink Jira:[FLINK-19916] Hadoop3 ShutdownHookManager visit closed ClassLoader - ASF JIRA
我们可以通过在flink-conf.yaml文件中增加以下配置项禁用类加载器检查来避免报错。
- [admin@hadoop103 lib]$ cd /opt/module/flink-1.16.1/conf/
- [admin@hadoop103 conf]$ vim flink-conf.yaml
在文件中添加如下内容,而后保存退出即可。
classloader.check-leaked-classloader: false
(1)将pom文件中服务器已有依赖的scope修改为provided,修改后的pom文件如下
- <properties>
- <maven.compiler.source>8</maven.compiler.source>
- <maven.compiler.target>8</maven.compiler.target>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <java.version>1.8</java.version>
- <flink.version>1.16.1</flink.version>
- <scala.version>2.12</scala.version>
- <hadoop.version>3.1.3</hadoop.version>
- <flink-cdc.version>2.3.0</flink-cdc.version>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-java</artifactId>
- <version>${flink.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java</artifactId>
- <version>${flink.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-kafka</artifactId>
- <version>${flink.version}</version>
- </dependency>
-
- <dependency>
- <groupId>com.alibaba</groupId>
- <artifactId>fastjson</artifactId>
- <version>1.2.68</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- <version>3.1.3</version>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-clients</artifactId>
- <version>${flink.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-table-api-java-bridge</artifactId>
- <version>${flink.version}</version>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- <version>1.7.25</version>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- <version>1.7.25</version>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-to-slf4j</artifactId>
- <version>2.14.0</version>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-jdbc</artifactId>
- <version>${flink.version}</version>
- </dependency>
-
- <dependency>
- <groupId>com.ververica</groupId>
- <artifactId>flink-connector-mysql-cdc</artifactId>
- <version>${flink-cdc.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-statebackend-rocksdb</artifactId>
- <version>${flink.version}</version>
- </dependency>
-
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-assembly-plugin</artifactId>
- <version>3.0.0</version>
- <configuration>
- <descriptorRefs>
- <descriptorRef>jar-with-dependencies</descriptorRef>
- </descriptorRefs>
- </configuration>
- <executions>
- <execution>
- <id>make-assembly</id>
- <phase>package</phase>
- <goals>
- <goal>single</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
(2)注释检查点相关配置及并行度设置,如下
- // 3.1 启用检查点
- // env.enableCheckpointing(60 * 1000L, CheckpointingMode.EXACTLY_ONCE);
- // 3.2 设置相邻两次检查点最小间隔
- // env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30 * 1000L);
- // 3.3 设置取消 Job 时检查点的清理模式
- // env.getCheckpointConfig().setExternalizedCheckpointCleanup(
- // CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
- // );
- // 3.4 设置状态后端类型
- // env.setStateBackend(new HashMapStateBackend());
- // 3.5 设置检查点存储路径
- // env.getCheckpointConfig().setCheckpointStorage("hdfs://mycluster/tms/ck/test");
-
-
- // 并行度设置,部署时应注释,通过 args 指定全局并行度
- // env.setParallelism(4);
(3)打包上传带有依赖的jar包
- [admin@hadoop102 job]$ cd /opt/module/flink-1.16.1/
- [admin@hadoop102 flink-1.16.1]$ mkdir job
- [admin@hadoop102 flink-1.16.1]$ cd job/
- [admin@hadoop102 job]$ ll
- 总用量 96984
-
- -rw-r--r-- 1 admin admin 99309838 12月 28 15:51 tms-realtime-1.0-SNAPSHOT-jar-with-dependencies.jar
命令如下。
- flink run \
- # 指定 Job 提交模式为 yarn-per-job
- -t yarn-per-job \
- # 并行度为4
- -p 4 \
- # 以守护进程的方式提交 Job,提交过程不会阻塞页面
- -d \
- # 指定 Job 名称
- -Dyarn.application.name=OdsApp \
- # 指定 Job 所处的 Yarn 队列
- -Dyarn.application.queue=default \
- # 指定 JobManager 所占用的内存大小
- -Djobmanager.memory.process.size=1g \
- # 指定 TaskManager 所占用的内存大小
- -Dtaskmanager.memory.process.size=1536mb \
- # 指定每个 TaskManager 的 Slot 数量
- -Dtaskmanager.numberOfTaskSlots=2 \
- # 指定管理内存大小,如果不适用 RocksDB 状态后端且没有排序等复杂操作可以设置为0
- -Dtaskmanager.memory.managed.size=0 \
- # 相邻两次检查点触发的时间间隔
- -Dexecution.checkpointing.interval='30 s' \
- # 语义(精准一次、至少一次),此处为精准一次,当上游多个并行度进入下游一个并行度时,若某个并行度的 barrier 先到达则会阻塞该并行度的后续处理,直至其余并行度 barrier 到达,完成检查点同步后方可继续处理。如果不阻塞则语义为至少一次
- -Dexecution.checkpointing.mode=EXACTLY_ONCE \
- # 单次检查点完成的超时时间(最长时间)
- -Dexecution.checkpointing.timeout='1 min' \
- # 相邻检查点的最小时间间隔(从上一个检查点结束到下一个检查点触发的最小时间间隔
- -Dexecution.checkpointing.min-pause='20 s' \
- # 取消 Job 时检查点的保存策略,此处表示取消 Job 时保留检查点
- -Dexecution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION \
- # Job 失败时的重启策略,此处为失败率重启,即声明指定时间间隔内重启最大次数的策略
- -Drestart-strategy=failure-rate \
- # 相邻两次 Job 重启的最小时间间隔
- -Drestart-strategy.failure-rate.delay='3 min' \
- # 重启次数重置的时间间隔
- -Drestart-strategy.failure-rate.failure-rate-interval='1 d' \
- # 声明指定时间内的最大重启次数
- -Drestart-strategy.failure-rate.max-failures-per-interval=10 \
- # 状态后端类型,可选项:hashmap,rockdb,分别对应全部基于内存哈希表的状态后端和基于内嵌 RocksDB 数据库的状态后端
- -Dstate.backend=hashmap \
- # 检查点存储介质,可选项:jobmanager、filesystem,分别对应 JobManager 堆内存和文件系统
- -Dstate.checkpoint-storage=filesystem \
- # 指定检查点在文件系统的存储路径,要求上一配置项取值为 filesystem
- -Dstate.checkpoints.dir=hdfs://mycluster/tms/ck/ods_app \
- # 指定 Job 中数据的编码方式为 UTF-8,如果不指定,则中文乱码
- -Denv.java.opts="-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8" \
- # 指定主类名
- -c com.admin.tms.realtime.app.ods.OdsApp \
- # 指定 Jar 包路径
- /opt/module/flink-1.16.1/job/tms-realtime-1.0-SNAPSHOT-jar-with-dependencies.jar \
- # 指定访问 Hadoop 的用户名为其超级用户 admin,与代码中指定的默认值相同,此处可以省略
- --HADOOP_USER_NAME admin \
- # 指定模拟数据的日期,生产环境不需要
- --mock_date 2023-01-10
第二次及之后启动Job应从上次记录的位置继续读取数据,因此需要有检查点或保存点,二者的区别在于前者是Flink自动生成的,后者是手动触发的。生产环境下Flink Job应常驻,仅在需要新增功能或调整配置信息时才需要重启。Job的重启应由两个步骤组成,如下。
① 手动触发保存点
② 修改完成后从手动触发的保存点重启
此处对步骤①进行说明,保存点手动触发命令如下。
- flink stop \
- # 手动触发的保存点存储路径
- --savepointPath hdfs://mycluster/tms/sp/ods_app \
- # Flink JobId
- a22d74cae0db7db9c206ed8cb0d1ecaa \
- # Flink Job在 Yarn 的 application ID
- -yid application_1672211800662_0002
JobId可以通过Flink WebUI查看,如下。
Flink Job在Yarn的application ID可以通过其WebUI页面查看,如下。
重启需要指定到_metadata目录的上级路径。当Job故障达到最大重启次数最终失败时没有最新状态的保存点,此时应从检查点重启。
① 从检查点重启
如果不需要调整配置,则与首次提交命令唯一的区别在于需要指定历史检查点路径,如下。
- flink run -t yarn-per-job -p 4 -d \
- # 指定检查点存储路径,需要指定到 _metadata 目录的上级路径
- -s hdfs://mycluster/tms/ck/ods_app/1cad146250d7a4f564efe0307bce479a/chk-221 \
- -Dyarn.application.name=OdsApp \
- -Dyarn.application.queue=default \
- -Djobmanager.memory.process.size=1g \
- -Dtaskmanager.memory.process.size=1536mb \
- -Dtaskmanager.numberOfTaskSlots=2 \
- -Dtaskmanager.memory.managed.size=0 \
- -Dexecution.checkpointing.interval='30 s' \
- -Dexecution.checkpointing.mode=EXACTLY_ONCE \
- -Dexecution.checkpointing.timeout='1 min' \
- -Dexecution.checkpointing.min-pause='20 s' \
- -Dexecution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION \
- -Drestart-strategy=failure-rate \
- -Drestart-strategy.failure-rate.delay='3 min' \
- -Drestart-strategy.failure-rate.failure-rate-interval='1 d' \
- -Drestart-strategy.failure-rate.max-failures-per-interval=10 \
- -Dstate.backend=hashmap \
- -Dstate.checkpoint-storage=filesystem \
- -Dstate.checkpoints.dir=hdfs://mycluster/tms/ck/ods_app \
- -Denv.java.opts="-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8" \
- -c com.admin.tms.realtime.app.ods.OdsApp \
- /opt/module/flink-1.16.1/job/tms-realtime-1.0-SNAPSHOT-jar-with-dependencies.jar \
- --HADOOP_USER_NAME admin \
- --mock_date 2023-01-10
② 从保存点重启
如果不需要调整配置,将①中的检查点路径更换为保存点路径即可,如下。
- flink run -t yarn-per-job -p 4 -d \
- # 指定保存点存储路径,需要指定到 _metadata 目录的上级路径
- -s hdfs://mycluster/tms/sp/ods_app/savepoint-a22d74-42e58a5e8420 \
- -Dyarn.application.name=OdsApp \
- -Dyarn.application.queue=default \
- -Djobmanager.memory.process.size=1g \
- -Dtaskmanager.memory.process.size=1536mb \
- -Dtaskmanager.numberOfTaskSlots=2 \
- -Dtaskmanager.memory.managed.size=0 \
- -Dexecution.checkpointing.interval='30 s' \
- -Dexecution.checkpointing.mode=EXACTLY_ONCE \
- -Dexecution.checkpointing.timeout='1 min' \
- -Dexecution.checkpointing.min-pause='20 s' \
- -Dexecution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION \
- -Drestart-strategy=failure-rate \
- -Drestart-strategy.failure-rate.delay='3 min' \
- -Drestart-strategy.failure-rate.failure-rate-interval='1 d' \
- -Drestart-strategy.failure-rate.max-failures-per-interval=10 \
- -Dstate.backend=hashmap \
- -Dstate.checkpoint-storage=filesystem \
- -Dstate.checkpoints.dir=hdfs://mycluster/tms/ck/ods_app \
- -Denv.java.opts="-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8" \
- -c com.admin.tms.realtime.app.ods.OdsApp \
- /opt/module/flink-1.16.1/job/tms-realtime-1.0-SNAPSHOT-jar-with-dependencies.jar \
- --HADOOP_USER_NAME admin \
- --mock_date 2023-01-10
(7)测试
启动Kafka命令行消费者,命令如下
[admin@hadoop103 ~]$ kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_ods
消费到的部分数据如下
{"op":"r","after":{"end_org_name":"湖北省武汉市转运中心","truck_id":454,"create_time":"2023-01-09T16:50:42Z","driver1_emp_id":1103,"start_org_id":259,"line_id":453,"start_org_name":"湖北省武汉市汉南区转运站","driver1_name":"俞珊莎","update_time":"2023-01-09T19:10:57Z","is_deleted":"0","shift_id":908,"actual_distance":69.00,"actual_end_time":1673291457000,"id":100312,"order_num":0,"truck_no":"鄂B2CMQ0","actual_start_time":1673287247000,"end_org_id":19,"status":"67004"},"source":{"server_id":0,"version":"1.5.4.Final","file":"","connector":"mysql","pos":0,"name":"mysql_binlog_source","row":0,"ts_ms":0,"snapshot":"false","db":"tms","table":"transport_task"},"ts":1672214547625}
- [admin@hadoop102 bin]$ cd
- [admin@hadoop102 ~]$ cd bin
- [admin@hadoop102 bin]$ vim flink-cdc.sh
在脚本中写入以下内容,而后保存退出。
- #!/bin/bash
-
- # 将昨日格式化为 yyyy-MM-dd 字符串,作为 mock_date 的默认值
- yesterday=`date -d "-1 day" +%F`
-
- # 校验参数,不合法警告并退出
- if [ $1 = "initial" ]
- then
- # 首次启动 Job 时判断是否有第二个参数,若没有则使用昨天日期,否则将其传递给 mock_date
- if [ q$2 = q ]
- then
- mock_date=$yesterday
- else
- mock_date=$2
- fi
- elif [ $1 = "start" ]
- then
- # 从检查点或保存点启动需要传入存储路径,若没有则退出
- if [ q$2 = q ]
- then
- echo "请传入检查点或保存点存储路径"
- exit
- elif [ q$3 = q ]
- then
- mock_date=$yesterday
- else
- mock_date=$3
- fi
- elif [ $1 = "stop" ]
- then
- # 停止 Job 需要传入 JobID 和 application ID,第二个参数为 JobID,没有则退出
- if [ q$2 = q ]
- then
- echo "请传入Flink-CDC JobID"
- exit
- # 若没有第三个参数则退出
- elif [ q$3 = q ]
- then
- echo "请传入Flink-CDC 在 Yarn 的 application ID"
- exit
- fi
- fi
-
- case $1 in
- "initial")
- flink run -t yarn-per-job -p 4 -d \
- -Dyarn.application.name=OdsApp \
- -Dyarn.application.queue=default \
- -Djobmanager.memory.process.size=1g \
- -Dtaskmanager.memory.process.size=1536mb \
- -Dtaskmanager.numberOfTaskSlots=2 \
- -Dtaskmanager.memory.managed.size=0 \
- -Dexecution.checkpointing.interval='30 s' \
- -Dexecution.checkpointing.mode=EXACTLY_ONCE \
- -Dexecution.checkpointing.timeout='1 min' \
- -Dexecution.checkpointing.min-pause='20 s' \
- -Dexecution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION \
- -Drestart-strategy=failure-rate \
- -Drestart-strategy.failure-rate.delay='3 min' \
- -Drestart-strategy.failure-rate.failure-rate-interval='1 d' \
- -Drestart-strategy.failure-rate.max-failures-per-interval=10 \
- -Dstate.backend=hashmap \
- -Dstate.checkpoint-storage=filesystem \
- -Dstate.checkpoints.dir=hdfs://mycluster/tms/ck/ods_app \
- -Denv.java.opts="-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8" \
- -c com.admin.tms.realtime.app.ods.OdsApp \
- /opt/module/flink-1.16.1/job/tms-realtime-1.0-SNAPSHOT-jar-with-dependencies.jar \
- --HADOOP_USER_NAME admin \
- --mock_date $mock_date
- ;;
- "start")
- flink run -t yarn-per-job -p 4 -d \
- -s $2 \
- -Dyarn.application.name=OdsApp \
- -Dyarn.application.queue=default \
- -Djobmanager.memory.process.size=1g \
- -Dtaskmanager.memory.process.size=1536mb \
- -Dtaskmanager.numberOfTaskSlots=2 \
- -Dtaskmanager.memory.managed.size=0 \
- -Dexecution.checkpointing.interval='30 s' \
- -Dexecution.checkpointing.mode=EXACTLY_ONCE \
- -Dexecution.checkpointing.timeout='1 min' \
- -Dexecution.checkpointing.min-pause='20 s' \
- -Dexecution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION \
- -Drestart-strategy=failure-rate \
- -Drestart-strategy.failure-rate.delay='3 min' \
- -Drestart-strategy.failure-rate.failure-rate-interval='1 d' \
- -Drestart-strategy.failure-rate.max-failures-per-interval=10 \
- -Dstate.backend=hashmap \
- -Dstate.checkpoint-storage=filesystem \
- -Dstate.checkpoints.dir=hdfs://mycluster/tms/ck/ods_app \
- -Denv.java.opts="-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8" \
- -c com.admin.tms.realtime.app.ods.OdsApp \
- /opt/module/flink-1.16.1/job/tms-realtime-1.0-SNAPSHOT-jar-with-dependencies.jar \
- --HADOOP_USER_NAME admin \
- --mock_date $mock_date
- ;;
- "stop")
- flink stop \
- --savepointPath hdfs://mycluster/tms/sp/ods_app \
- $2 \
- -yid $3
- ;;
- "*")
- echo "参数不合法,第一个参数必须为 initial | start | stop"
- ;;
- esac
脚本用法如下。
① 初次启动
- [admin@hadoop102 bin]$ flink-cdc.sh initial \
- # 日期可省略
- 2023-01-10
部分日志如下
$ echo "stop" | ./bin/yarn-session.sh -id application_1672211800662_0011
If this should not be possible, then you can also kill Flink via YARN's web interface or via:
$ yarn application -kill application_1672211800662_0011
Note that killing Flink might not clean up all job artifacts and temporary files.
2022-12-28 19:43:23,873 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface hadoop103:42536 of application 'application_1672211800662_0011'.
Job has been submitted with JobID be0b720d9982d10a4fdca66b60637aff
② 停止
部分参数需要用户依据实际情况修改,下文同理。
[admin@hadoop102 bin]$ flink-cdc.sh stop be0b720d9982d10a4fdca66b60637aff application_1672211800662_0011
部分日志如下
Suspending job "434405fe813ea29eb152c0d93b6b1792" with a savepoint.
2022-12-28 19:47:30,341 INFO org.apache.hadoop.yarn.client.RMProxy [] - Connecting to ResourceManager at hadoop103/192.168.10.103:8032
2022-12-28 19:47:30,636 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2022-12-28 19:47:30,723 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface hadoop102:43407 of application 'application_1672211800662_0012'.
Savepoint completed. Path: hdfs://mycluster/tms/sp/ods_app/savepoint-434405-ca9094d9fa45
③ 从检查点或保存点重启
- [admin@hadoop102 bin]$ flink-cdc.sh start hdfs://mycluster/tms/sp/ods_app/savepoint-434405-ca9094d9fa45 \
- # 日期可省略
- 2023-01-10
部分日志如下
$ echo "stop" | ./bin/yarn-session.sh -id application_1672211800662_0013
If this should not be possible, then you can also kill Flink via YARN's web interface or via:
$ yarn application -kill application_1672211800662_0013
Note that killing Flink might not clean up all job artifacts and temporary files.
2022-12-28 19:48:15,676 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface hadoop102:38301 of application 'application_1672211800662_0013'.
Job has been submitted with JobID a39e9b72a89c75c27216a149c6bae0ca
对增量同步工具Flink-CDC的介绍和使用。还有保存点和检查点在实际生产环境上的使用。
有帮助的话请点个赞吧!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。