赞
踩
flink cdc底层就是通过监控mysql的binlog日志,实时捕获到一个表或多个表的变更;所以必须开启mysql的binlog日志。
mysql配置文件默认位于/etc/目录下,直接用过以下命令开启
sudo vim /etc/my.cnf
- ##启动binlog,该参数的值会作为binlog的文件名
- log-bin=mysql-bimysql
- ##binlog类型
- binlog_format=row
- ##启用binlog的数据库,需根据实际情况作出修改,一个库占一行
- binlog-do-db=库名
- binlog-do-db=库名
systemctl restart mysqld
- <dependency>
- <groupId>com.ververica</groupId>
- <artifactId>flink-connector-mysql-cdc</artifactId>
- <version>2.3.0</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-kafka</artifactId>
- <version>1.17.0</version>
- </dependency>
如果cdc版本用了2.4.0出现异常的可以退回2.3.0
- public class TestApp {
-
- public static void main(String[] args) {
-
- //1.获取执行环境
- Configuration conf = new Configuration();
- //设置web端口
- conf.setInteger("rest.port",10000);
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
-
- //2.通过flink cdc 读取mysql中的维度数据并创建流
- MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
- .hostname("主机号")
- .port(3306)
- .username("用户名")
- .password("密码")
- //设置mysql数据库
- .databaseList("数据库名")
- //设置mysql表(多个用,分隔)
- .tableList("表1,表2")
- //设置cdc启动方式
- .startupOptions(StartupOptions.initial())
- //设置反序列化器
- .deserializer(new JsonDebeziumDeserializationSchema())
- .build();
- DataStreamSource<String> streamSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "cdc-source");
-
- //3.数据同步到kafka
- KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
- //指定Kafka的连接地址
- .setBootstrapServers("主机:端口号")
- //指定序列化器
- .setRecordSerializer(
- KafkaRecordSerializationSchema.<String>builder()
- .setTopic("Topic")
- .setValueSerializationSchema(new SimpleStringSchema())
- .build()
- )
- //写入kafka的一致性级别
- .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
- //如果是精确一次,必须设置事务的前缀
- .setTransactionalIdPrefix("zhike-")
- //如果是精确一次必须设置事务超时时间
- .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "300000")
- .build();
-
- streamSource.sinkTo(kafkaSink);
- //4.执行任务
- try {
- env.execute("ods_cdc");
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
-
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。