当前位置:   article > 正文

Flink CDC 实时同步MySQL数据到Kafka_flinkcdc 读取mysql数据到kafka

flinkcdc 读取mysql数据到kafka

1.前置要求

flink cdc底层就是通过监控mysql的binlog日志,实时捕获到一个表或多个表的变更;所以必须开启mysql的binlog日志。

1.1 打开mysql配置文件

mysql配置文件默认位于/etc/目录下,直接用过以下命令开启

sudo vim /etc/my.cnf

1.2 修改配置文件

  1. ##启动binlog,该参数的值会作为binlog的文件名
  2. log-bin=mysql-bimysql
  3. ##binlog类型
  4. binlog_format=row
  5. ##启用binlog的数据库,需根据实际情况作出修改,一个库占一行
  6. binlog-do-db=库名
  7. binlog-do-db=库名

1.3 重启mysql服务

systemctl restart mysqld

2.环境依赖

  1. <dependency>
  2. <groupId>com.ververica</groupId>
  3. <artifactId>flink-connector-mysql-cdc</artifactId>
  4. <version>2.3.0</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.apache.flink</groupId>
  8. <artifactId>flink-connector-kafka</artifactId>
  9. <version>1.17.0</version>
  10. </dependency>

如果cdc版本用了2.4.0出现异常的可以退回2.3.0

3.代码实现

  1. public class TestApp {
  2. public static void main(String[] args) {
  3. //1.获取执行环境
  4. Configuration conf = new Configuration();
  5. //设置web端口
  6. conf.setInteger("rest.port",10000);
  7. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
  8. //2.通过flink cdc 读取mysql中的维度数据并创建流
  9. MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
  10. .hostname("主机号")
  11. .port(3306)
  12. .username("用户名")
  13. .password("密码")
  14. //设置mysql数据库
  15. .databaseList("数据库名")
  16. //设置mysql表(多个用,分隔)
  17. .tableList("表1,表2")
  18. //设置cdc启动方式
  19. .startupOptions(StartupOptions.initial())
  20. //设置反序列化器
  21. .deserializer(new JsonDebeziumDeserializationSchema())
  22. .build();
  23. DataStreamSource<String> streamSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "cdc-source");
  24. //3.数据同步到kafka
  25. KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
  26. //指定Kafka的连接地址
  27. .setBootstrapServers("主机:端口号")
  28. //指定序列化器
  29. .setRecordSerializer(
  30. KafkaRecordSerializationSchema.<String>builder()
  31. .setTopic("Topic")
  32. .setValueSerializationSchema(new SimpleStringSchema())
  33. .build()
  34. )
  35. //写入kafka的一致性级别
  36. .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
  37. //如果是精确一次,必须设置事务的前缀
  38. .setTransactionalIdPrefix("zhike-")
  39. //如果是精确一次必须设置事务超时时间
  40. .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "300000")
  41. .build();
  42. streamSource.sinkTo(kafkaSink);
  43. //4.执行任务
  44. try {
  45. env.execute("ods_cdc");
  46. } catch (Exception e) {
  47. throw new RuntimeException(e);
  48. }
  49. }
  50. }

本文内容由网友自发贡献,转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/594790
推荐阅读
相关标签
  

闽ICP备14008679号