赞
踩
Flink JDBC 连接器允许使用 JDBC 驱动程序从任何关系数据库读取数据并将数据写入其中。本文档介绍如何设置 JDBC 连接器以针对关系数据库运行 SQL 查询。
Flink-MySQL-CDC连接器允许实时同步MySQL数据 。
如果在 DDL 上定义了主键,则 JDBC sink 以 upsert 模式与外部系统交换 UPDATE/DELETE 消息,否则,它以 append 模式运行,不支持消费 UPDATE/DELETE 消息。
Apache Iceberg是一种表格式(table format)。我们可以简单理解为它是基于计算层(flink、spark)和存储层(orc、parquet)的一个中间层,我们可以把它定义成一种“数据组织格式”,Iceberg将其称之为“表格式”也是表达类似的含义。
它与底层的存储格式(比如ORC、Parquet之类的列式存储格式)最大的区别是,它并不定义数据存储方式,而是定义了数据、元数据的组织方式,向上提供统一的“表”的语义。它构建在数据存储格式之上,其底层的数据存储仍然使用Parquet、ORC等进行存储。在hive建立一个iceberg格式的表。用flink或者spark写入iceberg,然后再通过其他方式来读取这个表,比如spark、flink、presto等。
Iceberg 优势
增量读取处理能力:Iceberg支持通过流式方式读取增量数据,支持Structured Streaming以及Flink table Source;
支持事务(ACID),上游数据写入即可见,不影响当前数据处理任务,简化ETL;提供upsert和merge into能力,可以极大地缩小数据入库延迟;
可扩展的元数据,快照隔离以及对于文件列表的所有修改都是原子操作;
同时支持流批处理、支持多种存储格式和灵活的文件组织:提供了基于流式的增量计算模型和基于批处理的全量表计算模型。批处理和流任务可以使用相同的存储模型,数据不再孤立;Iceberg支持隐藏分区和分区进化,方便业务进行数据分区策略更新。支持Parquet、Avro以及ORC等存储格式。
支持多种计算引擎,优秀的内核抽象使之不绑定特定的计算引擎,目前Iceberg支持的计算引擎有Spark、Flink、Presto以及Hive。
- <properties>
- <maven.compiler.source>8</maven.compiler.source>
- <maven.compiler.target>8</maven.compiler.target>
- <flink.version>1.14.4</flink.version>
- <scala.binary.version>2.12</scala.binary.version>
- <hadoop.version>3.1.2</hadoop.version>
- <hive.version>3.1.2</hive.version>
- </properties>
- <!-- https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-flink-runtime -->
- <dependency>
- <groupId>org.apache.iceberg</groupId>
- <artifactId>iceberg-flink-runtime</artifactId>
- <version>0.12.1</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
- <version>${flink.version}</version>
- </dependency>
-
-
- <dependency>
- <groupId>mysql</groupId>
- <artifactId>mysql-connector-java</artifactId>
- <version>8.0.21</version>
- </dependency>
- /**
- * MySQL数据导入iceberg
- */
- public class JDBC2IcebergTable {
- public static void main(String[] args) throws Exception {
-
- // create environments of both APIs
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
-
- tableEnv.executeSql("CREATE TABLE IF NOT EXISTS EventTable (\n" +
- "`user` STRING,\n" +
- "url STRING,\n" +
- "`timestamp` BIGINT\n" +
- ") WITH (\n" +
- "'connector' = 'jdbc',\n" +
- "'url' = 'jdbc:mysql://127.0.0.1:3306/flink',\n" +
- "'table-name' = 'events',\n" +
- "'username'='root',\n" +
- "'password'='00000'\n" +
- ")");
-
- Table eventTable = tableEnv.from("EventTable");
-
- // Table aliceTable = tableEnv.sqlQuery("select * from EventTable ");
-
- //创建CATALOG
- tableEnv.executeSql("CREATE CATALOG hadoop_catalog WITH (\n" +
- " 'type'='iceberg',\n" +
- " 'catalog-type'='hadoop',\n" +
- " 'warehouse'='file:///tmp/warehouse/iceberg',\n" +
- " 'property-version'='1'\n" +
- ")");
-
- tableEnv.executeSql("USE CATALOG hadoop_catalog");
-
- //创建表
- tableEnv.executeSql("CREATE TABLE IF NOT EXISTS eve (\n" +
- " `user` STRING,\n" +
- " url STRING,\n" +
- " `timestamp` BIGINT \n" +
- ")");
-
- Configuration configuration = new Configuration();
- TableSchema schema = eventTable.getSchema();
- DataStream<Row> input = tableEnv.toDataStream(eventTable);
- // input.print();
-
- TableLoader tableLoader = TableLoader.fromHadoopTable("file:///tmp/warehouse/iceberg/default/eve", configuration);
- DataStreamSink<RowData> dataDataStreamSink = FlinkSink.forRow(input, schema)
- .tableLoader(tableLoader)
- .build();
-
- //读数据
-
- DataStream<RowData> batch = FlinkSource.forRowData()
- .env(env)
- .tableLoader(tableLoader)
- .streaming(false)
- .build();
-
- batch.map(x -> x.getString(0)).print();
- //batch.print();
-
- env.execute("Test Iceberg Batch Read");
-
-
- }
- }
创建脚本,warehouse的路径,它会自动创建HDFS路径里面 ns是命名空间,但namenode的使用ip:port代替。
- //创建CATALOG
- tableEnv.executeSql("CREATE CATALOG hadoop_catalog WITH (\n" +
- " 'type'='iceberg',\n" +
- " 'catalog-type'='hadoop',\n" +
- " 'warehouse'='file:///tmp/warehouse/iceberg',\n" +
- " 'property-version'='1'\n" +
- ")");
在刚才构建的Catalog下面创建数据表
- tableEnv.executeSql("USE CATALOG hadoop_catalog");
-
- //创建表
- //创建表
- tableEnv.executeSql("CREATE TABLE IF NOT EXISTS eve (\n" +
- " `user` STRING,\n" +
- " url STRING,\n" +
- " `timestamp` BIGINT \n" +
- ")");
新创建的表数据和元数据。
- DataStream<RowData> batch = FlinkSource.forRowData()
- .env(env)
- .tableLoader(tableLoader)
- .streaming(false)
- .build();
- batch.print();
Idea的输出
Flink SQL和Table API两种方式都可以。
- public class MysqlCDC2Iceberg {
-
- public static void main(String[] args) throws Exception {
-
- // create environments of both APIs
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.getCheckpointConfig().setCheckpointInterval(1000);
- StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
-
- // tableEnv.executeSql("CREATE TABLE order_info (\n" +
- // " `id` BIGINT NOT NULL,\n" +
- // " consignee STRING,\n" +
- // " order_status STRING,\n" +
- // " order_comment STRING,\n" +
- // " payment_way STRING,\n" +
- // " PRIMARY KEY (`id`) NOT ENFORCED\n" +
- // " ) WITH (\n" +
- // " 'connector' = 'mysql-cdc',\n" +
- // " 'hostname' = '127.0.0.1',\n" +
- // " 'port' = '3306',\n" +
- // " 'username' = 'root',\n" +
- // " 'password' = 'xxxx',\n" +
- // " 'database-name' = 'gmall2021',\n" +
- // " 'table-name' = 'order_info'\n" +
- // " )");
-
- //连接器为Mysql CDC
-
- TableDescriptor tableDesc = TableDescriptor.forConnector("mysql-cdc")
- .option("hostname", "127.0.0.1")
- .option("port", "3306")
- .option("username", "root")
- .option("password", "xxxx")
- .option("database-name", "gmall2021")
- .option("table-name", "order_info")
- .schema(
- Schema.newBuilder()
- .column("id", "BIGINT NOT NULL")
- .column("consignee", DataTypes.STRING())
- .column("order_status", DataTypes.STRING())
- .column("order_comment", DataTypes.STRING())
- .column("payment_way", DataTypes.STRING())
- .primaryKey("id")
- .build())
- .build();
-
- tableEnv.createTemporaryTable("order_info", tableDesc);
-
- Table user_source = tableEnv.from("order_info");
- DataStream<Row> input = tableEnv.toChangelogStream(user_source);
- // input.print();
-
- //创建CATALOG
- tableEnv.executeSql("CREATE CATALOG hadoop_catalog WITH (\n" +
- " 'type'='iceberg',\n" +
- " 'catalog-type'='hadoop',\n" +
- " 'warehouse'='file:///tmp/warehouse/iceberg',\n" +
- " 'property-version'='1'\n" +
- ")");
-
- tableEnv.executeSql("USE CATALOG hadoop_catalog");
-
- //创建表
- tableEnv.executeSql("CREATE TABLE IF NOT EXISTS all_order_sink (" +
- " `id` BIGINT NOT NULL,\n" +
- " consignee STRING,\n" +
- " order_status STRING,\n" +
- " order_comment STRING,\n" +
- " payment_way STRING,\n" +
- " PRIMARY KEY (`id`) NOT ENFORCED )\n");
-
- Configuration configuration = new Configuration();
- TableSchema schema = user_source.getSchema();
- TableLoader tableLoader = TableLoader.fromHadoopTable("file:///tmp/warehouse/iceberg/default/all_order_sink", configuration);
- DataStreamSink<RowData> dataDataStreamSink = FlinkSink.forRow(input, schema)
- .tableLoader(tableLoader)
- .overwrite(true)
- .build();
-
- env.execute();
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。