当前位置:   article > 正文

flink cdc数据同步,DataStream方式和SQL方式的简单使用_flink cdc datastream

flink cdc datastream

目录

一、flink cdc介绍

1、什么是flink cdc

2、flink cdc能用来做什么

3、flink cdc的优点

二、flink cdc基础使用

1、使用flink cdc读取txt文本数据

2、DataStream的使用方式

3、SQL的方式

总结


一、flink cdc介绍

1、什么是flink cdc

flink cdc是一个由阿里研发的,一个可以直接从MySQL、PostgreSQL等数据库直接读取全量数据和增量变更数据的source组件。

2、flink cdc能用来做什么

flink cdc能感知数据库的所有修改、新增、删除操作,并以流的形式,进行实时的触发和反馈。如:你想监听一个表的数据是否有变动,并且需要把变动的数据读取出来,插入到另外的表里,或者对该数据进行其他处理。在我们传统的开发里,如果不使用cdc技术,是不是就只能通过定时任务去定时的获取数据?或者在执行数据修改操作时调用指定的接口来进行数据上报?并且还要拿新数据和旧数据进行比较,才能得到自己想要的结果?flink cdc就是解决这种问题的,它是cdc里面的佼佼者,它能在数据表被修改时,进行实时的反馈。

3、flink cdc的优点

① 低延迟:毫秒级的延迟

② 高吞吐:每秒能处理数百万个事件

③ 高可用及结果的准确性、良好的容错性,动态扩展、全天候24小时运行

二、flink cdc基础使用

1、使用flink cdc读取txt文本数据

① 项目目录

② 需要用到的flink依赖(有些可以不用的,看实际需要使用哪些功能):

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-java</artifactId>
  4. <version>1.13.0</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.apache.flink</groupId>
  8. <artifactId>flink-clients_2.12</artifactId>
  9. <version>1.13.0</version>
  10. </dependency>
  11. <dependency>
  12. <groupId>org.apache.flink</groupId>
  13. <artifactId>flink-streaming-java_2.12</artifactId>
  14. <version>1.13.0</version>
  15. </dependency>
  16. <dependency>
  17. <groupId>org.apache.hadoop</groupId>
  18. <artifactId>hadoop-client</artifactId>
  19. <version>3.1.3</version>
  20. </dependency>
  21. <dependency>
  22. <groupId>org.apache.flink</groupId>
  23. <artifactId>flink-table-planner-blink_2.12</artifactId>
  24. <version>1.13.0</version>
  25. </dependency>
  26. <dependency>
  27. <groupId>com.ververica</groupId>
  28. <artifactId>flink-connector-mysql-cdc</artifactId>
  29. <version>2.0.0</version>
  30. </dependency>
  31. <dependency>
  32. <groupId>org.apache.flink</groupId>
  33. <artifactId>flink-sql-parser</artifactId>
  34. <version>1.13.0</version>
  35. </dependency>

③ 具体代码(TestFlinkController)

  1. package com.bug.controller;
  2. import com.bug.util.flink.TextFlatUtil;
  3. import org.apache.flink.api.java.DataSet;
  4. import org.apache.flink.api.java.ExecutionEnvironment;
  5. import org.apache.flink.api.java.tuple.Tuple3;
  6. import org.springframework.web.bind.annotation.*;
  7. /**
  8. * 1、flink读取本地txt文件数据
  9. */
  10. public class TestFlinkController {
  11. /**
  12. * 1、flink读取本地txt文件数据
  13. * @param args args
  14. */
  15. public static void main(String[] args) throws Exception {
  16. String path = "D:\\javaprojects\\my_springboot1\\my_springboot1\\src\\main\\resources\\flinkText\\flinkTest.txt";
  17. //创建执行环境
  18. ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
  19. //读取txt文件数据
  20. DataSet<String> dataSet = environment.readTextFile(path);
  21. //处理读取的数据
  22. DataSet<Tuple3<String, String, String>> out = dataSet.flatMap(new TextFlatUtil());
  23. //输出
  24. out.print();
  25. }
  26. }

 TextFlatUtil代码:

  1. package com.bug.util.flink;
  2. import org.apache.flink.api.common.functions.FlatMapFunction;
  3. import org.apache.flink.api.java.tuple.Tuple3;
  4. import org.apache.flink.util.Collector;
  5. /**
  6. * 1、flink读取本地txt文件数据
  7. */
  8. public class TextFlatUtil implements FlatMapFunction<String, Tuple3<String, String, String>> {
  9. @Override
  10. public void flatMap(String value, Collector<Tuple3<String, String, String>> collector) {
  11. for(String word : value.split("\n")){
  12. String[] res = word.split("\t");
  13. collector.collect(new Tuple3<>(res[0],res[1],res[2]));
  14. }
  15. }
  16. }

flinkTest.txt文件值: 

  1. 801165935581855745 小明1 年龄1
  2. 801165936156475393 小明3 年龄3
  3. 801165936567517185 小明5 年龄5
  4. 801165936991141889 小明7 年龄7
  5. 801165937460903937 小明9 年龄9

④ 输出效果

2、DataStream的使用方式

① 数据库修改配置my.cnf文件:binlog_format=row

 ② 直接上代码

  1. package com.bug.flinkcdc;
  2. import com.ververica.cdc.connectors.mysql.MySqlSource;
  3. import com.ververica.cdc.connectors.mysql.table.StartupOptions;
  4. import com.ververica.cdc.debezium.DebeziumSourceFunction;
  5. import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
  6. import org.apache.flink.runtime.state.filesystem.FsStateBackend;
  7. import org.apache.flink.streaming.api.CheckpointingMode;
  8. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  9. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  10. /**
  11. * 2、DataStream的方式
  12. */
  13. public class TestFlinkStream {
  14. /**
  15. * 2、DataStream的方式
  16. */
  17. public static void main(String[] args) throws Exception {
  18. //创建执行环境
  19. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  20. env.setParallelism(1);//线程数
  21. //开启ck
  22. // env.enableCheckpointing(60*1000);//60秒启动一次checkpoint
  23. // env.getCheckpointConfig().setCheckpointTimeout(30*1000);//设置超时时间,默认是10min
  24. // env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);//Checkpoint级别,EXACTLY_ONCE精准一次,AT_LEAST_ONCE最多一次
  25. // env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);//设置两次checkpoint的最小时间间隔
  26. // env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);//允许的最大checkpoint并行度
  27. // env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/cdc-test/ck"));//设置checkpoint的地址
  28. //构建sourceFunction环境,正式开发可以把一些配置提取出来写成公共配置即可
  29. DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder()
  30. .hostname("***.***.***.***")//ip地址
  31. .port(***)//端口号
  32. .username("***")//用户名
  33. .password("***")//密码
  34. .databaseList("xiaobug")//数据库名称
  35. .tableList("xiaobug.test_flink")//表名称
  36. .deserializer(new StringDebeziumDeserializationSchema())//反序列化
  37. .startupOptions(StartupOptions.initial())//同步方式,initial全量和增量,latest增量
  38. .build();
  39. DataStreamSource<String> dataStreamSource = env.addSource(sourceFunction);
  40. //数据输出
  41. dataStreamSource.print();
  42. //启动
  43. env.execute();
  44. }
  45. }

③ 效果

 3、SQL的方式

① 数据库修改配置my.cnf文件:binlog_format=row

 

 ② 代码

  1. package com.bug.flinkcdc;
  2. import org.apache.flink.api.java.tuple.Tuple2;
  3. import org.apache.flink.streaming.api.datastream.DataStream;
  4. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  5. import org.apache.flink.table.api.Table;
  6. import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
  7. import org.apache.flink.types.Row;
  8. /**
  9. * 3、SQL的方式
  10. */
  11. public class TestFlinkSQL {
  12. /**
  13. * 3、SQL的方式
  14. */
  15. public static void main(String[] args) throws Exception {
  16. //创建执行环境
  17. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  18. env.setParallelism(1);//线程数
  19. StreamTableEnvironment tev = StreamTableEnvironment.create(env);
  20. //正式开发时可以把这些语句做成单独的sql文件,更方便管理和维护,with的配置也可以做成公共的,然后读取即可
  21. tev.executeSql("CREATE TABLE test_flink (" +
  22. " userid String primary key," +
  23. " username String," +
  24. " userAge String," +
  25. " userCardid String" +
  26. " ) with ( " +
  27. " 'connector' = 'mysql-cdc'," + //别名
  28. " 'hostname' = '***.***.***.***'," + //数据库ip地址
  29. " 'port' = '***'," + //端口号
  30. " 'username' = '***'," + //用户名
  31. " 'password' = '***'," + //密码
  32. " 'database-name' = 'xiaobug'," + //数据库名称
  33. " 'table-name' = 'test_flink' " + //表名称
  34. ")");
  35. //查询数据sql,也可以写在单独的文件里,然后引用即可,复杂的连表查询也是可以的,但需要其他表也进行加载
  36. Table table = tev.sqlQuery("select * from test_flink");
  37. //输出,正式开发可以用sql语句的insert into进行插入,直接实现表到表的同步
  38. DataStream<Tuple2<Boolean, Row>> dataStream = tev.toRetractStream(table,Row.class);
  39. dataStream.print();
  40. //启动
  41. env.execute("FlinkSQLCDC");
  42. }
  43. }

 ③ 效果

 

总结

搞定啦,就是这么简单!flinkcdc的进阶:怎样确保数据的一致性、可靠性、不重复、不丢失,后面有时间再写啦。

测试的时候还碰到了一个jar包版本的问题,sql的方式一定要使用1.13.0以上的版本,不然会报错!

还有flink-sql-parser的这个包也一定要添加,不然会出现下面这个提示:

org.apache.calcite.tools.FrameworkConfig.getTraitDefs()Lorg/apache/flink/calcite/shaded/com/google/common/collect/ImmutableList;

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Gausst松鼠会/article/detail/473013
推荐阅读
相关标签
  

闽ICP备14008679号