当前位置:   article > 正文

FlinkCDC读取MySQL并写入Kafka案例(com.alibaba.ververica)

com.alibaba.ververica

场景应用:将MySQL的变化数据转为实时流输出到Kafka中。

注意版本问题,版本不同可能会出现异常,以下版本测试没问题:

flink1.12.7

flink-connector-mysql-cdc 1.3.0(com.alibaba.ververica) (测试时使用1.2.0版本时会出现空指针错误)

1. MySQL的配置

 在/etc/my.cnf文件中,【mysqld】下面添加以下配置:

binlog-do-db  是指定要监控的数据库,如果是多个数据库,每个数据库需要单独一行设置。

 修改完成后,需要重启数据库,并检查binlog有没有生成。

补充几个其他的配置:

  1. 1、修改配置
  2. [mysqld]
  3. # 前面还有其他配置
  4. # 添加的部分
  5. server-id = 12345
  6. log-bin = mysql-bin
  7. # 必须为ROW
  8. binlog_format = ROW
  9. # 必须为FULL,MySQL-5.7后才有该参数
  10. binlog_row_image = FULL
  11. expire_logs_days = 15
  12. 2、验证
  13. SHOW VARIABLES LIKE '%binlog%';
  14. 3、设置权限
  15. -- 设置拥有同步权限的用户
  16. CREATE USER 'flinkuser' IDENTIFIED BY 'flinkpassword';
  17. -- 赋予同步相关权限
  18. GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flinkuser';
  19. 创建用户并赋予权限成功后,使用该用户登录MySQL,可以使用以下命令查看主从同步相关信息
  20. SHOW MASTER STATUS
  21. SHOW SLAVE STATUS
  22. SHOW BINARY LOGS

2. FlinkCDC的开发

从这里开始建立flink工程项目,以下项目flink版本为1.12.7,scala版本用的2.12。

大概的思考步骤如下:

1) 获取执行环境

2)开启检查点ck (重点)

3)通过flinkcdc构建sourceFunction,并读取数据 (重点)

4)在执行环境中添加3)中构建的source

5)配置kafka生产者环境(重点)

6)在执行环境中增加5)中的Sink

7)启动任务

项目结构(gmall-realtime)如下:

2.1 Pom文件配置

由于这是我的一个子项目,所以实际使用的时候自己修改。

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5. <parent>
  6. <artifactId>gmall-flink-2021</artifactId>
  7. <groupId>com.king</groupId>
  8. <version>1.0-SNAPSHOT</version>
  9. </parent>
  10. <modelVersion>4.0.0</modelVersion>
  11. <artifactId>gmall-flink-cdc</artifactId>
  12. <version>1.0</version>
  13. <properties>
  14. <java.version>1.8</java.version>
  15. <maven.compiler.source>${java.version}</maven.compiler.source>
  16. <maven.compiler.target>${java.version}</maven.compiler.target>
  17. <flink.version>1.12.7</flink.version>
  18. <scala.version>2.12</scala.version>
  19. <hadoop.version>3.1.3</hadoop.version>
  20. </properties>
  21. <dependencies>
  22. <dependency>
  23. <groupId>org.apache.flink</groupId>
  24. <artifactId>flink-java</artifactId>
  25. <version>${flink.version}</version>
  26. </dependency>
  27. <dependency>
  28. <groupId>org.apache.flink</groupId>
  29. <artifactId>flink-streaming-java_${scala.version}</artifactId>
  30. <version>${flink.version}</version>
  31. </dependency>
  32. <dependency>
  33. <groupId>org.apache.flink</groupId>
  34. <artifactId>flink-connector-kafka_${scala.version}</artifactId>
  35. <version>${flink.version}</version>
  36. </dependency>
  37. <dependency>
  38. <groupId>org.apache.flink</groupId>
  39. <artifactId>flink-clients_${scala.version}</artifactId>
  40. <version>${flink.version}</version>
  41. </dependency>
  42. <dependency>
  43. <groupId>org.apache.flink</groupId>
  44. <artifactId>flink-cep_${scala.version}</artifactId>
  45. <version>${flink.version}</version>
  46. </dependency>
  47. <dependency>
  48. <groupId>org.apache.flink</groupId>
  49. <artifactId>flink-json</artifactId>
  50. <version>${flink.version}</version>
  51. </dependency>
  52. <dependency>
  53. <groupId>com.alibaba</groupId>
  54. <artifactId>fastjson</artifactId>
  55. <version>1.2.68</version>
  56. </dependency>
  57. <!--如果保存检查点到 hdfs 上,需要引入此依赖-->
  58. <dependency>
  59. <groupId>org.apache.hadoop</groupId>
  60. <artifactId>hadoop-client</artifactId>
  61. <version>${hadoop.version}</version>
  62. </dependency>
  63. <dependency>
  64. <groupId>mysql</groupId>
  65. <artifactId>mysql-connector-java</artifactId>
  66. <version>8.0.16</version>
  67. </dependency>
  68. <dependency>
  69. <groupId>org.apache.kafka</groupId>
  70. <artifactId>kafka-clients</artifactId>
  71. <version>2.7.0</version>
  72. </dependency>
  73. <dependency>
  74. <groupId>com.alibaba.ververica</groupId>
  75. <artifactId>flink-connector-mysql-cdc</artifactId>
  76. <version>1.3.0</version>
  77. </dependency>
  78. <!-- https://mvnrepository.com/artifact/com.ververica/flink-connector-mysql-cdc -->
  79. <!-- <dependency> 该包仅支持flink1.13版本及以上-->
  80. <!-- <groupId>com.ververica</groupId>-->
  81. <!-- <artifactId>flink-connector-mysql-cdc</artifactId>-->
  82. <!-- <version>2.1.1</version>-->
  83. <!-- </dependency>-->
  84. <dependency>
  85. <groupId>org.projectlombok</groupId>
  86. <artifactId>lombok</artifactId>
  87. <version>1.18.20</version>
  88. </dependency>
  89. <!--Flink 默认使用的是 slf4j 记录日志,相当于一个日志的接口,我们这里使用 log4j 作为
  90. 具体的日志实现-->
  91. <dependency>
  92. <groupId>org.slf4j</groupId>
  93. <artifactId>slf4j-api</artifactId>
  94. <version>1.7.32</version>
  95. </dependency>
  96. <dependency>
  97. <groupId>org.slf4j</groupId>
  98. <artifactId>slf4j-log4j12</artifactId>
  99. <version>1.7.32</version>
  100. </dependency>
  101. <dependency>
  102. <groupId>org.apache.logging.log4j</groupId>
  103. <artifactId>log4j-to-slf4j</artifactId>
  104. <version>2.17.1</version>
  105. </dependency>
  106. </dependencies>
  107. <build>
  108. <!-- <sourceDirectory>${project.basedir}/src/main/scala</sourceDirectory>-->
  109. <!-- <resources>-->
  110. <!-- <resource>-->
  111. <!-- <directory>${project.basedir}/src/main/resources</directory>-->
  112. <!-- </resource>-->
  113. <!-- </resources>-->
  114. <plugins>
  115. <plugin>
  116. <groupId>org.apache.maven.plugins</groupId>
  117. <artifactId>maven-assembly-plugin</artifactId>
  118. <version>3.0.0</version>
  119. <configuration>
  120. <descriptorRefs>
  121. <descriptorRef>jar-with-dependencies</descriptorRef>
  122. </descriptorRefs>
  123. </configuration>
  124. <executions>
  125. <execution>
  126. <id>make-assembly</id>
  127. <phase>package</phase>
  128. <goals>
  129. <goal>single</goal>
  130. </goals>
  131. </execution>
  132. </executions>
  133. </plugin>
  134. </plugins>
  135. </build>
  136. </project>
pom.xml

注意一点:如果使用java开发,可以直接编译成功。但是我这里全部使用scala开发,所以需要在pom文件配置额外的插件,否则打包scala项目会不成功。

  1. <plugins>
  2. <plugin>
  3. <!-- !!必须有这个插件,才可以编译scala代码找到主类,版本我是网上搞来的 -->
  4. <groupId>net.alchim31.maven</groupId>
  5. <artifactId>scala-maven-plugin</artifactId>
  6. <version>3.2.2</version>
  7. <executions>
  8. <execution>
  9. <id>compile-scala</id>
  10. <phase>compile</phase>
  11. <goals>
  12. <goal>add-source</goal>
  13. <goal>compile</goal>
  14. </goals>
  15. </execution>
  16. <execution>
  17. <id>test-compile-scala</id>
  18. <phase>test-compile</phase>
  19. <goals>
  20. <goal>add-source</goal>
  21. <goal>testCompile</goal>
  22. </goals>
  23. </execution>
  24. </executions>
  25. </plugin>
  26. </plugins>
plugins
2.2 读取MySQL

Flinkcdc.scala中:

通过引入的flink-connector-mysql-cdc已经提供了读取MySQL的工具类。

  1. val sourceFunction = MySQLSource.builder[String]()
  2. .hostname("hadoop200")
  3. .port(3306)
  4. .username("root")
  5. .password("root")
  6. .databaseList("gmall-210325-flink")
        //如果不添加该参数,则消费指定数据库中所有表的数据        //如果添加,则需要按照 数据库名.表名 的格式指定,多个表使用逗号隔开//        .tableList("gmall-210325-flink.base_trademark")        .deserializer(new CustomerDeseriallization())
new CustomerDeseriallization() 是自定义的读取的MySQL的数据输出格式,如果不指定,系统也有个new StringDebeziumDeserializationSchema()可以使用。
2.3 自定义从MySQL读取的数据的输出格式
CustomerDeseriallization类
  1. package com.king.app.function
  2. import com.alibaba.fastjson.JSONObject
  3. import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema
  4. import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
  5. import org.apache.flink.util.Collector
  6. import org.apache.kafka.connect.data.{Schema, Struct}
  7. import org.apache.kafka.connect.source.SourceRecord
  8. /**
  9. * @Author: KingWang
  10. * @Date: 2021/12/29
  11. * @Desc:
  12. **/
  13. class CustomerDeseriallization extends DebeziumDeserializationSchema[String]{
  14. /**
  15. * 封装的数据:
  16. * {
  17. * "database":"",
  18. * "tableName":"",
  19. * "type":"c r u d",
  20. * "before":"",
  21. * "after":"",
  22. * "ts": ""
  23. *
  24. * }
  25. *
  26. * @param sourceRecord
  27. * @param collector
  28. */
  29. override def deserialize(sourceRecord: SourceRecord, collector: Collector[String]): Unit = {
  30. //1. 创建json对象用于保存最终数据
  31. val result = new JSONObject()
  32. val value:Struct = sourceRecord.value().asInstanceOf[Struct]
  33. //2. 获取库名&表名
  34. val source:Struct = value.getStruct("source")
  35. val database = source.getString("db")
  36. val table = source.getString("table")
  37. //3. 获取before
  38. val before = value.getStruct("before")
  39. val beforeObj = if(before != null) getJSONObjectBySchema(before.schema(),before) else new JSONObject()
  40. //4. 获取after
  41. val after = value.getStruct("after")
  42. val afterObj = if(after != null) getJSONObjectBySchema(after.schema(),after) else new JSONObject()
  43. //5. 获取操作类型
  44. val op:String = value.getString("op")
  45. //6. 获取操作时间
  46. val ts = source.getInt64("ts_ms")
  47. // val ts = value.getInt64("ts_ms")
  48. //7. 拼接结果
  49. result.put("database", database)
  50. result.put("table", table)
  51. result.put("type", op)
  52. result.put("before", beforeObj)
  53. result.put("after", afterObj)
  54. result.put("ts", ts)
  55. collector.collect(result.toJSONString)
  56. }
  57. override def getProducedType: TypeInformation[String] = {
  58. BasicTypeInfo.STRING_TYPE_INFO
  59. }
  60. //从Schema中获取字段和值
  61. def getJSONObjectBySchema(schema:Schema,struct:Struct):JSONObject = {
  62. val fields = schema.fields()
  63. var jsonBean = new JSONObject()
  64. val iter = fields.iterator()
  65. while(iter.hasNext){
  66. val field = iter.next()
  67. val key = field.name()
  68. val value = struct.get(field)
  69. jsonBean.put(key,value)
  70. }
  71. jsonBean
  72. }
  73. }
CustomerDeseriallization
2.4 写入到Kafka
  1. package com.king.util
  2. import org.apache.flink.api.common.serialization.{SerializationSchema, SimpleStringSchema}
  3. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
  4. /**
  5. * @Author: KingWang
  6. * @Date: 2022/1/1
  7. * @Desc:
  8. **/
  9. object MyKafkaUtil {
  10. val broker_list = "hadoop200:9092,hadoop201:9092,hadoop202:9092"
  11. def getKafkaProducer(topic:String):FlinkKafkaProducer[String] =
  12. new FlinkKafkaProducer[String](broker_list,topic,new SimpleStringSchema())
  13. }
MyKafkaUtil

FlinkCDC.scala的完整代码如下:

  1. package com.king.app.ods
  2. import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource
  3. import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions
  4. import com.king.app.function.CustomerDeseriallization
  5. import com.king.util.MyKafkaUtil
  6. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
  7. /**
  8. * @Author: KingWang
  9. * @Date: 2021/12/26
  10. * @Desc:
  11. **/
  12. object FlinkCDC {
  13. def main(args: Array[String]): Unit = {
  14. //1. 获取执行环境
  15. val env = StreamExecutionEnvironment.getExecutionEnvironment
  16. env.setParallelism(1)
  17. //1.1 开启ck并指定状态后端fs
  18. // env.setStateBackend(new FsStateBackend("hdfs://hadoop200:8020/gmall-flink-210325/ck"))
  19. // .enableCheckpointing(10000L) //头尾间隔:每10秒触发一次ck
  20. // env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) //
  21. // env.getCheckpointConfig.setCheckpointTimeout(10000L)
  22. // env.getCheckpointConfig.setMaxConcurrentCheckpoints(2)
  23. // env.getCheckpointConfig.setMinPauseBetweenCheckpoints(3000l) //尾和头间隔时间3
  24. // env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000L));
  25. //2. 通过flinkCDC构建SourceFunction并读取数据
  26. val sourceFunction = MySQLSource.builder[String]()
  27. .hostname("hadoop200")
  28. .port(3306)
  29. .username("root")
  30. .password("root")
  31. .databaseList("gmall-210325-flink")
  32. //如果不添加该参数,则消费指定数据库中所有表的数据
  33. //如果添加,则需要按照 数据库名.表名 的格式指定,多个表使用逗号隔开
  34. // .tableList("gmall-210325-flink.base_trademark")
  35. .deserializer(new CustomerDeseriallization())
  36. //监控的方式:
  37. // 1. initial 初始化全表拷贝,然后再比较
  38. // 2. earliest 不做初始化,只从当前的
  39. // 3. latest 指定最新的
  40. // 4. specificOffset 指定offset
  41. // 3. timestamp 比指定的时间大的
  42. .startupOptions(StartupOptions.latest())
  43. .build()
  44. val dataStream = env.addSource(sourceFunction)
  45. //3. sink, 写入kafka
  46. dataStream.print()
  47. val sinkTopic = "ods_base_db"
  48. dataStream.addSink(MyKafkaUtil.getKafkaProducer(sinkTopic))
  49. //4. 启动任务
  50. env.execute("flinkCDC")
  51. }
  52. }
FlinkCDC.scala

3. 测试项目

准备好kafka,mysql,可以在本地测试。

启动kafka消费者,topic是ods_base_db

在idea中启动flinkcdc程序。

打开mysql编辑器,表base_trademark中原始记录有12条如下:

 现在手工增加一条记录,编号为13  wang

 查看idea控制台显示添加消息如下:

 同时在Kafka消费者也看到一条记录如下,字段type为操作类型,c表示创建

再次在MySQL中做修改和删除操作,可以看到控制多了两条记录,操作类型分别为u和d,表示修改和删除操作。

到此flinkcdc的操作基本完成。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Li_阴宅/article/detail/836347
推荐阅读
相关标签
  

闽ICP备14008679号