当前位置:   article > 正文

Flink 通过批量和CDC两种方式读取MySQL数据入Iceberg_flinkcdc读取mysql

flinkcdc读取mysql

简介

Flink JDBC 连接器允许使用 JDBC 驱动程序从任何关系数据库读取数据并将数据写入其中。本文档介绍如何设置 JDBC 连接器以针对关系数据库运行 SQL 查询。

Flink 读写MySQL 可以参考:Flink 读写MySQL数据(DataStream和Table API)_wank1259162的博客-CSDN博客Flink提供了基于JDBC的方式,可以将读取到的数据写入到MySQL中;本文通过两种方式将数据下入到MySQL数据库,其他的基于JDBC的数据库类似,另外,Table API方式的Catalog指定为Hive Catalog方式,持久化DDL操作。Maven依赖,包含了Hive Catalog的相关依赖 DataStream方式读写MySQL数据Table API的方式读写MySQL,其中Flink的Catalog使用Hive Catalog的方式MySQL中的数据..........https://blog.csdn.net/wank1259162/article/details/125442030?spm=1001.2014.3001.5502

Flink-MySQL-CDC连接器允许实时同步MySQL数据 

如果在 DDL 上定义了主键,则 JDBC sink 以 upsert 模式与外部系统交换 UPDATE/DELETE 消息,否则,它以 append 模式运行,不支持消费 UPDATE/DELETE 消息。

Apache Iceberg是一种表格式(table format)。我们可以简单理解为它是基于计算层(flink、spark)和存储层(orc、parquet)的一个中间层,我们可以把它定义成一种“数据组织格式”,Iceberg将其称之为“表格式”也是表达类似的含义。

它与底层的存储格式(比如ORC、Parquet之类的列式存储格式)最大的区别是,它并不定义数据存储方式,而是定义了数据、元数据的组织方式,向上提供统一的“表”的语义。它构建在数据存储格式之上,其底层的数据存储仍然使用Parquet、ORC等进行存储。在hive建立一个iceberg格式的表。用flink或者spark写入iceberg,然后再通过其他方式来读取这个表,比如spark、flink、presto等。
 

Iceberg 优势
增量读取处理能力:Iceberg支持通过流式方式读取增量数据,支持Structured Streaming以及Flink table Source;
支持事务(ACID),上游数据写入即可见,不影响当前数据处理任务,简化ETL;提供upsert和merge into能力,可以极大地缩小数据入库延迟;
可扩展的元数据,快照隔离以及对于文件列表的所有修改都是原子操作;
同时支持流批处理、支持多种存储格式和灵活的文件组织:提供了基于流式的增量计算模型和基于批处理的全量表计算模型。批处理和流任务可以使用相同的存储模型,数据不再孤立;Iceberg支持隐藏分区和分区进化,方便业务进行数据分区策略更新。支持Parquet、Avro以及ORC等存储格式。
支持多种计算引擎,优秀的内核抽象使之不绑定特定的计算引擎,目前Iceberg支持的计算引擎有Spark、Flink、Presto以及Hive。

 

代码依赖

  1. <properties>
  2. <maven.compiler.source>8</maven.compiler.source>
  3. <maven.compiler.target>8</maven.compiler.target>
  4. <flink.version>1.14.4</flink.version>
  5. <scala.binary.version>2.12</scala.binary.version>
  6. <hadoop.version>3.1.2</hadoop.version>
  7. <hive.version>3.1.2</hive.version>
  8. </properties>
  9. <!-- https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-flink-runtime -->
  10. <dependency>
  11. <groupId>org.apache.iceberg</groupId>
  12. <artifactId>iceberg-flink-runtime</artifactId>
  13. <version>0.12.1</version>
  14. </dependency>
  15. <dependency>
  16. <groupId>org.apache.flink</groupId>
  17. <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
  18. <version>${flink.version}</version>
  19. </dependency>
  20. <dependency>
  21. <groupId>mysql</groupId>
  22. <artifactId>mysql-connector-java</artifactId>
  23. <version>8.0.21</version>
  24. </dependency>

测试代码

批量方式

  1. /**
  2. * MySQL数据导入iceberg
  3. */
  4. public class JDBC2IcebergTable {
  5. public static void main(String[] args) throws Exception {
  6. // create environments of both APIs
  7. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  8. StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
  9. tableEnv.executeSql("CREATE TABLE IF NOT EXISTS EventTable (\n" +
  10. "`user` STRING,\n" +
  11. "url STRING,\n" +
  12. "`timestamp` BIGINT\n" +
  13. ") WITH (\n" +
  14. "'connector' = 'jdbc',\n" +
  15. "'url' = 'jdbc:mysql://127.0.0.1:3306/flink',\n" +
  16. "'table-name' = 'events',\n" +
  17. "'username'='root',\n" +
  18. "'password'='00000'\n" +
  19. ")");
  20. Table eventTable = tableEnv.from("EventTable");
  21. // Table aliceTable = tableEnv.sqlQuery("select * from EventTable ");
  22. //创建CATALOG
  23. tableEnv.executeSql("CREATE CATALOG hadoop_catalog WITH (\n" +
  24. " 'type'='iceberg',\n" +
  25. " 'catalog-type'='hadoop',\n" +
  26. " 'warehouse'='file:///tmp/warehouse/iceberg',\n" +
  27. " 'property-version'='1'\n" +
  28. ")");
  29. tableEnv.executeSql("USE CATALOG hadoop_catalog");
  30. //创建表
  31. tableEnv.executeSql("CREATE TABLE IF NOT EXISTS eve (\n" +
  32. " `user` STRING,\n" +
  33. " url STRING,\n" +
  34. " `timestamp` BIGINT \n" +
  35. ")");
  36. Configuration configuration = new Configuration();
  37. TableSchema schema = eventTable.getSchema();
  38. DataStream<Row> input = tableEnv.toDataStream(eventTable);
  39. // input.print();
  40. TableLoader tableLoader = TableLoader.fromHadoopTable("file:///tmp/warehouse/iceberg/default/eve", configuration);
  41. DataStreamSink<RowData> dataDataStreamSink = FlinkSink.forRow(input, schema)
  42. .tableLoader(tableLoader)
  43. .build();
  44. //读数据
  45. DataStream<RowData> batch = FlinkSource.forRowData()
  46. .env(env)
  47. .tableLoader(tableLoader)
  48. .streaming(false)
  49. .build();
  50. batch.map(x -> x.getString(0)).print();
  51. //batch.print();
  52. env.execute("Test Iceberg Batch Read");
  53. }
  54. }

代码说明

1、hadoop catalog创建

创建脚本,warehouse的路径,它会自动创建HDFS路径里面 ns是命名空间,但namenode的使用ip:port代替。

  1. //创建CATALOG
  2. tableEnv.executeSql("CREATE CATALOG hadoop_catalog WITH (\n" +
  3. " 'type'='iceberg',\n" +
  4. " 'catalog-type'='hadoop',\n" +
  5. " 'warehouse'='file:///tmp/warehouse/iceberg',\n" +
  6. " 'property-version'='1'\n" +
  7. ")");

2.建表

在刚才构建的Catalog下面创建数据表

  1. tableEnv.executeSql("USE CATALOG hadoop_catalog");
  2. //创建表
  3. //创建表
  4. tableEnv.executeSql("CREATE TABLE IF NOT EXISTS eve (\n" +
  5. " `user` STRING,\n" +
  6. " url STRING,\n" +
  7. " `timestamp` BIGINT \n" +
  8. ")");

3.查看目录 

新创建的表数据和元数据。

 4.读数据

  1. DataStream<RowData> batch = FlinkSource.forRowData()
  2. .env(env)
  3. .tableLoader(tableLoader)
  4. .streaming(false)
  5. .build();
  6. batch.print();

Idea的输出

MySQL CDC 方式实时写入Iceberg

Flink SQL和Table API两种方式都可以。

  1. public class MysqlCDC2Iceberg {
  2. public static void main(String[] args) throws Exception {
  3. // create environments of both APIs
  4. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  5. env.getCheckpointConfig().setCheckpointInterval(1000);
  6. StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
  7. // tableEnv.executeSql("CREATE TABLE order_info (\n" +
  8. // " `id` BIGINT NOT NULL,\n" +
  9. // " consignee STRING,\n" +
  10. // " order_status STRING,\n" +
  11. // " order_comment STRING,\n" +
  12. // " payment_way STRING,\n" +
  13. // " PRIMARY KEY (`id`) NOT ENFORCED\n" +
  14. // " ) WITH (\n" +
  15. // " 'connector' = 'mysql-cdc',\n" +
  16. // " 'hostname' = '127.0.0.1',\n" +
  17. // " 'port' = '3306',\n" +
  18. // " 'username' = 'root',\n" +
  19. // " 'password' = 'xxxx',\n" +
  20. // " 'database-name' = 'gmall2021',\n" +
  21. // " 'table-name' = 'order_info'\n" +
  22. // " )");
  23. //连接器为Mysql CDC
  24. TableDescriptor tableDesc = TableDescriptor.forConnector("mysql-cdc")
  25. .option("hostname", "127.0.0.1")
  26. .option("port", "3306")
  27. .option("username", "root")
  28. .option("password", "xxxx")
  29. .option("database-name", "gmall2021")
  30. .option("table-name", "order_info")
  31. .schema(
  32. Schema.newBuilder()
  33. .column("id", "BIGINT NOT NULL")
  34. .column("consignee", DataTypes.STRING())
  35. .column("order_status", DataTypes.STRING())
  36. .column("order_comment", DataTypes.STRING())
  37. .column("payment_way", DataTypes.STRING())
  38. .primaryKey("id")
  39. .build())
  40. .build();
  41. tableEnv.createTemporaryTable("order_info", tableDesc);
  42. Table user_source = tableEnv.from("order_info");
  43. DataStream<Row> input = tableEnv.toChangelogStream(user_source);
  44. // input.print();
  45. //创建CATALOG
  46. tableEnv.executeSql("CREATE CATALOG hadoop_catalog WITH (\n" +
  47. " 'type'='iceberg',\n" +
  48. " 'catalog-type'='hadoop',\n" +
  49. " 'warehouse'='file:///tmp/warehouse/iceberg',\n" +
  50. " 'property-version'='1'\n" +
  51. ")");
  52. tableEnv.executeSql("USE CATALOG hadoop_catalog");
  53. //创建表
  54. tableEnv.executeSql("CREATE TABLE IF NOT EXISTS all_order_sink (" +
  55. " `id` BIGINT NOT NULL,\n" +
  56. " consignee STRING,\n" +
  57. " order_status STRING,\n" +
  58. " order_comment STRING,\n" +
  59. " payment_way STRING,\n" +
  60. " PRIMARY KEY (`id`) NOT ENFORCED )\n");
  61. Configuration configuration = new Configuration();
  62. TableSchema schema = user_source.getSchema();
  63. TableLoader tableLoader = TableLoader.fromHadoopTable("file:///tmp/warehouse/iceberg/default/all_order_sink", configuration);
  64. DataStreamSink<RowData> dataDataStreamSink = FlinkSink.forRow(input, schema)
  65. .tableLoader(tableLoader)
  66. .overwrite(true)
  67. .build();
  68. env.execute();
  69. }
  70. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/煮酒与君饮/article/detail/1005193
推荐阅读
相关标签
  

闽ICP备14008679号