We use Flink's datagen connector to generate mock data and write it to Kafka, simulating a continuous stream; a second Flink job then consumes the Kafka topic and writes the records into an Iceberg table on the data lake.
The pipeline is similar to the kafka-flinkSQL-hudi test in the previous article; see that post if you are interested.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.util.Date;

/**
 * @author hyl
 * @date 2022/5/6
 * @Description Generate mock data with the datagen connector and forward it to Kafka.
 */
public class CreateDataAll {
    public static void main(String[] args) {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // datagen source: randomly generated rows
        String sourceDDL = "create table datagen_source (\n" +
                "  id int,\n" +
                "  data string,\n" +
                "  price double,\n" +
                "  ts as localtimestamp,\n" +
                "  watermark for ts as ts\n" +
                ") with (\n" +
                "  'connector' = 'datagen',\n" +
                "  'rows-per-second'='" + args[0] + "',\n" +      // rows generated per second (program argument)
                "  'fields.id.kind'='sequence',\n" +
                "  'fields.id.start'='" + args[1] + "',\n" +      // first value of the id sequence
                "  'fields.id.end'='" + args[2] + "',\n" +        // last value of the id sequence
                "  'fields.data.length'='50'\n" +
                ")";
        System.out.println(sourceDDL);
        tableEnv.executeSql(sourceDDL);

        // kafka sink
        String sinkDDL = "create table kafka_sink (\n" +
                "  id int,\n" +
                "  data string,\n" +
                "  price double,\n" +
                "  ts timestamp\n" +
                ") with (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = '" + args[3] + "',\n" +              // target topic (program argument)
                "  'properties.bootstrap.servers' = '192.168.100.14:9092',\n" +
                "  'properties.group.id' = 'flinkToKafka3',\n" +
                "  'scan.startup.mode' = 'earliest-offset',\n" +
                "  'format' = 'json',\n" +
                "  'json.fail-on-missing-field' = 'false',\n" +
                "  'json.ignore-parse-errors' = 'true'\n" +
                ")";
        System.out.println(sinkDDL);
        tableEnv.executeSql(sinkDDL);

        // insert into kafka: forward the rows produced by the datagen source to Kafka to simulate a live stream
        String insertKafka = "insert into kafka_sink\n" +
                "select * from datagen_source";
        System.out.println(insertKafka);
        tableEnv.executeSql(insertKafka);

        // print the submission time
        final Date dateStart = new Date();
        System.out.println("Start time: " + dateStart + "*********************************************");
    }
}
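Before wiring up the Iceberg side, it helps to confirm that the generator really is pushing JSON records into the topic. The small job below reads the topic back through a print sink so the records show up in the TaskManager stdout (or the IDE console). It is only a minimal sketch: it reuses the broker address from above and takes the topic name as its first argument; the class name VerifyKafkaTopic and the group id kafkaCheck are illustrative and not part of the original project.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class VerifyKafkaTopic {
    public static void main(String[] args) {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // same schema and connector options as kafka_sink above; args[0] is the topic name
        tableEnv.executeSql("create table kafka_check (\n" +
                "  id int,\n" +
                "  data string,\n" +
                "  price double,\n" +
                "  ts timestamp\n" +
                ") with (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = '" + args[0] + "',\n" +
                "  'properties.bootstrap.servers' = '192.168.100.14:9092',\n" +
                "  'properties.group.id' = 'kafkaCheck',\n" +
                "  'scan.startup.mode' = 'earliest-offset',\n" +
                "  'format' = 'json'\n" +
                ")");

        // print sink: every consumed record is written to stdout
        tableEnv.executeSql("create table print_sink (\n" +
                "  id int,\n" +
                "  data string,\n" +
                "  price double,\n" +
                "  ts timestamp\n" +
                ") with (\n" +
                "  'connector' = 'print'\n" +
                ")");

        tableEnv.executeSql("insert into print_sink select * from kafka_check");
    }
}

Running kafka-console-consumer on the broker host serves the same purpose if you prefer to check from the command line.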
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @author hyl
 * @date 2022/4/28
 * @Description Consume the Kafka topic with Flink and write the records into Iceberg in real time.
 */
public class KafkaToIceberg {
    public static void main(String[] args) {
        System.setProperty("HADOOP_USER_NAME", "root");
        System.setProperty("user.name", "root");

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        // checkpoint configuration: Iceberg commits new data files on every successful checkpoint
        env.enableCheckpointing(6000);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.getCheckpointConfig().enableUnalignedCheckpoints();
        env.getCheckpointConfig().setCheckpointStorage(
                "hdfs://192.168.100.11:9000/tmp/hudi/testFlinkHudi/checkpoint/dir");

        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // kafka source
        String kafkaSourceDDL = "create table kafka_source (\n" +
                "  id int,\n" +
                "  data string,\n" +
                "  price double,\n" +
                "  ts timestamp\n" +
                ") with (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = '" + args[0] + "',\n" +              // topic to consume (program argument)
                "  'properties.bootstrap.servers' = '192.168.100.14:9092',\n" +
                "  'properties.group.id' = 'flinkToKafka3',\n" +
                "  'scan.startup.mode' = 'earliest-offset',\n" +
                "  'format' = 'json',\n" +
                "  'json.fail-on-missing-field' = 'false',\n" +
                "  'json.ignore-parse-errors' = 'true'\n" +
                ")";
        tableEnv.executeSql(kafkaSourceDDL);
        System.out.println(kafkaSourceDDL);

        // create a hadoop catalog through the Table API
        tableEnv.executeSql("CREATE CATALOG hadoop_catalog WITH (\n" +
                "  'type'='iceberg',\n" +
                "  'catalog-type'='hadoop',\n" +
                "  'warehouse'='hdfs://192.168.100.11:9000/tmp/iceberg/flink_iceberg/FlinkClientTable/',\n" +
                "  'property-version'='2', \n" +
                "  'format-version'='2'\n" +
                ")");

        // make sure the target database exists before creating the table
        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS hadoop_catalog.iceberg_hadoop_db");

        // create the target Iceberg table; the table name is passed as a program argument
        String createIcebergTable = "create table if not exists hadoop_catalog.iceberg_hadoop_db." + args[1] + " (\n" +
                "  id int,\n" +
                "  data string,\n" +
                "  price double,\n" +
                "  ts timestamp(3),\n" +
                "  part int,\n" +
                "  event_time string,\n" +
                "  event_date string, \n" +
                "  PRIMARY KEY (`id`) NOT ENFORCED \n" +
                ") partitioned by ( part ) \n" +
                "WITH (\n" +
                "  'property-version'='2', \n" +
                "  'format-version'='2' \n" +
                ")";
        System.out.println(createIcebergTable);
        tableEnv.executeSql(createIcebergTable);

        // stream the Kafka records into the Iceberg table
        String insertDML = "insert into hadoop_catalog.iceberg_hadoop_db." + args[1] + " \n" +
                "select\n" +
                "  id,\n" +
                "  data,\n" +
                "  price,\n" +
                "  ts,\n" +
                "  id/100000 as part,\n" +
                "  date_format(ts, 'yyyy-MM-dd HH:mm:ss') as event_time,\n" +
                "  date_format(ts, 'yyyy-MM-dd') as event_date\n" +
                "from kafka_source";
        tableEnv.executeSql(insertDML);
    }
}
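Once the streaming job has completed a few checkpoints, the Iceberg table can be queried in batch mode to confirm that data is landing. The sketch below registers the same Hadoop catalog and counts the rows of the target table; it assumes the warehouse path and database name used above, and the class name QueryIcebergTable and its single table-name argument are illustrative rather than part of the original project.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class QueryIcebergTable {
    public static void main(String[] args) {
        System.setProperty("HADOOP_USER_NAME", "root");

        // batch-mode table environment: the query reads the snapshot that is current at submission time
        final TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());

        // same hadoop catalog definition as in KafkaToIceberg
        tableEnv.executeSql("CREATE CATALOG hadoop_catalog WITH (\n" +
                "  'type'='iceberg',\n" +
                "  'catalog-type'='hadoop',\n" +
                "  'warehouse'='hdfs://192.168.100.11:9000/tmp/iceberg/flink_iceberg/FlinkClientTable/'\n" +
                ")");

        // args[0] is the table name that KafkaToIceberg was started with
        tableEnv.executeSql(
                "select count(*) as cnt from hadoop_catalog.iceberg_hadoop_db." + args[0])
                .print();
    }
}

Because Iceberg commits on Flink checkpoints, the row count only grows after each successful checkpoint (every 6 seconds with the configuration above).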
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.example</groupId> <artifactId>FlinkToIceberg</artifactId> <version>1.0-SNAPSHOT</version> <properties> <!-- project compiler --> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <!-- maven compiler--> <scala.maven.plugin.version>3.2.2</scala.maven.plugin.version> <maven.compiler.plugin.version>3.8.1</maven.compiler.plugin.version> <maven.assembly.plugin.version>3.1.1</maven.assembly.plugin.version> <!-- sdk --> <java.version>1.8</java.version> <!-- engine--> <hadoop.version>2.9.2</hadoop.version> <flink.version>1.13.6</flink.version> <iceberg.version>0.13.1</iceberg.version> <!-- <flink.version>1.12.2</flink.version>--> <!-- <hoodie.version>0.9.0</hoodie.version>--> <hive.version>2.3.9</hive.version> <!-- <scope.type>provided</scope.type>--> <scope.type>compile</scope.type> </properties> <dependencies> <!-- flink Dependency 这个是本地可以查看web执行界面--> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-runtime-web_2.11</artifactId> <version>${flink.version}</version> <scope>${scope.type}</scope> </dependency> <!--flink-core核心包--> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-core</artifactId> <version>${flink.version}</version> <scope>${scope.type}</scope> </dependency> <!--实现自定义connector接口--> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>${flink.version}</version> <scope>${scope.type}</scope> </dependency> <!--导入flink-table的java版本api--> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-java-bridge_2.11</artifactId> <version>${flink.version}</version> <scope>${scope.type}</scope> </dependency> <!--运行在本地的idea,需要加入以下两个模块--> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_2.11</artifactId> <version>${flink.version}</version> <scope>${scope.type}</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.11</artifactId> <version>${flink.version}</version> <scope>${scope.type}</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-blink_2.11</artifactId> <version>${flink.version}</version> <scope>${scope.type}</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.11</artifactId> <version>${flink.version}</version> <scope>${scope.type}</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-json</artifactId> <version>${flink.version}</version> <scope>${scope.type}</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_2.11</artifactId> <version>1.13.6</version> </dependency> <!-- hadoop Dependency,之前没有写这个依赖怎么存储到Hadoop上面--> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>${hadoop.version}</version> <scope>${scope.type}</scope> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>${hadoop.version}</version> <scope>${scope.type}</scope> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> 
<artifactId>hadoop-client</artifactId> <version>${hadoop.version}</version> <scope>${scope.type}</scope> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-flink-runtime-1.13 --> <dependency> <groupId>org.apache.iceberg</groupId> <artifactId>iceberg-flink-runtime-1.13</artifactId> <version>0.13.1</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-api --> <dependency> <groupId>org.apache.iceberg</groupId> <artifactId>iceberg-api</artifactId> <version>0.13.1</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-data --> <dependency> <groupId>org.apache.iceberg</groupId> <artifactId>iceberg-data</artifactId> <version>0.13.1</version> </dependency> <dependency> <groupId>org.apache.iceberg</groupId> <artifactId>iceberg-flink</artifactId> <version>0.13.1</version> </dependency> </dependencies> <build> <plugins> <!--瘦子打包,只打包代码--> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-jar-plugin</artifactId> <configuration> <archive> <manifest> <addClasspath>true</addClasspath> <classpathPrefix>libs/</classpathPrefix> <mainClass> KafkaToIceberg </mainClass> </manifest> </archive> </configuration> </plugin> </plugins> </build> </project>
Thanks for reading. If you are interested in the flink-hudi test case, see my previous article.