Flink currently supports reading from and writing to Iceberg tables in real time with both the DataStream API and the SQL API; the SQL API is the recommended approach.
Flink version | Iceberg version | Notes |
---|---|---|
Flink 1.11.x | Iceberg 0.11.1 | |
Flink 1.12.x ~ Flink 1.13.x | Iceberg 0.12.1 | SQL API has bugs |
Flink 1.14.x | Iceberg 0.12.1 | SQL API has bugs |
This walkthrough integrates Flink with Iceberg 0.12.1; the pom below pins Flink to 1.13.5, since Iceberg 0.12.1 is not compatible with Flink 1.14.x.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flinkiceberg1</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <!-- Flink 1.12.x - 1.13.x is compatible with Iceberg 0.12.1; Flink 1.14 is not -->
        <flink.version>1.13.5</flink.version>
        <!--<flink.version>1.12.1</flink.version>-->
        <!--<flink.version>1.14.2</flink.version>-->
        <!-- Flink 1.11.x pairs with Iceberg 0.11.1 -->
        <!--<flink.version>1.11.6</flink.version>-->
        <hadoop.version>3.1.1</hadoop.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>ververica-connector-iceberg</artifactId>
            <version>1.13-vvr-4.0.7</version>
            <exclusions>
                <exclusion>
                    <groupId>com.google.guava</groupId>
                    <artifactId>guava-parent</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- Iceberg runtime required for Flink to operate on Iceberg tables -->
        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-flink-runtime</artifactId>
            <version>0.12.1</version>
            <!--<version>0.11.1</version>-->
        </dependency>

        <!-- Core dependencies for developing Flink jobs in Java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Flink Kafka connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Required for reading files from HDFS -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>guava</artifactId>
                    <groupId>com.google.guava</groupId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- Flink SQL & Table -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime-blink_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>

        <!-- log4j and slf4j; comment these out to silence console logging -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-nop</artifactId>
            <version>1.7.25</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.5</version>
        </dependency>
    </dependencies>
</project>
-- 1. Create the catalog
CREATE CATALOG hadoop_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hadoop',
  'warehouse'='hdfs://leidi01:8020/iceberg_catalog',
  'property-version'='1'
);

-- 2. Create the database
create database hadoop_catalog.flink_iceberg;

-- 3. Create the sink table
CREATE TABLE hadoop_catalog.flink_iceberg.icebergdemo1 (
  id STRING,
  data STRING
);
import com.google.common.collect.ImmutableMap;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.types.Types;

import java.util.Map;

public class FlinkIcebergDemo1 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 1. Checkpointing is mandatory: Flink only commits data to Iceberg after a checkpoint completes.
        env.enableCheckpointing(5000);

        // 2. Read data from the Kafka topic.
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("192.168.6.102:6667")
                .setTopics("json")
                .setGroupId("my-group-id")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStreamSource<String> kafkaSource =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

        // 3. Wrap each record as a RowData object so it can be written to the Iceberg table.
        SingleOutputStreamOperator<RowData> dataStream = kafkaSource.map(new MapFunction<String, RowData>() {
            @Override
            public RowData map(String s) throws Exception {
                System.out.println("s = " + s);
                String[] split = s.split(",");
                GenericRowData row = new GenericRowData(4);
                row.setField(0, Integer.valueOf(split[0]));
                row.setField(1, StringData.fromString(split[1]));
                row.setField(2, Integer.valueOf(split[2]));
                row.setField(3, StringData.fromString(split[3]));
                return row;
            }
        });

        // 4. Create the Hadoop configuration, catalog and table schema so the writer can locate the table.
        Configuration hadoopConf = new Configuration();
        Catalog catalog = new HadoopCatalog(hadoopConf, "hdfs://leidi01:8020/flinkiceberg/");

        // Iceberg database and table name
        TableIdentifier name = TableIdentifier.of("icebergdb", "flink_iceberg_tbl");

        // Iceberg table schema
        Schema schema = new Schema(
                Types.NestedField.required(1, "id", Types.IntegerType.get()),
                Types.NestedField.required(2, "name", Types.StringType.get()),
                Types.NestedField.required(3, "age", Types.IntegerType.get()),
                Types.NestedField.required(4, "loc", Types.StringType.get()));

        // "loc" is the partition column; use PartitionSpec.unpartitioned() for an unpartitioned table.
        // PartitionSpec spec = PartitionSpec.unpartitioned();
        PartitionSpec spec = PartitionSpec.builderFor(schema).identity("loc").build();

        // Store the Iceberg table data as Parquet files.
        Map<String, String> props =
                ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, FileFormat.PARQUET.name());

        // Create the table through the catalog if it does not exist, otherwise load it.
        Table table = null;
        if (!catalog.tableExists(name)) {
            table = catalog.createTable(name, schema, spec, props);
        } else {
            table = catalog.loadTable(name);
        }

        TableLoader tableLoader = TableLoader.fromHadoopTable(
                "hdfs://leidi01:8020/flinkiceberg/icebergdb/flink_iceberg_tbl", hadoopConf);

        // 5. Write the stream to Iceberg via the DataStream API.
        FlinkSink.forRowData(dataStream)
                // .table() is optional; the path behind tableLoader is enough to resolve the table.
                .table(table)
                .tableLoader(tableLoader)
                // false (default) appends data; true overwrites the existing data.
                .overwrite(false)
                .build();

        env.execute("DataStream Api Write Data To Iceberg");
    }
}
(1) Checkpointing must be enabled. Flink only commits data to Iceberg after a checkpoint succeeds; without it the data is never committed and cannot be queried later (for example from Hive). A minimal checkpoint configuration sketch follows this list.
(2) Records read from Kafka must be wrapped as RowData (or Row) objects before they can be written to the Iceberg table. Writes append by default; specifying overwrite replaces the existing data.
(3) The corresponding catalog and table schema must be created before writing to the Iceberg table; specifying only a path at write time fails because the Iceberg table cannot be found.
(4) Writing to Iceberg with the DataStream API is not recommended; prefer the SQL API.
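As a companion to note (1), here is a minimal sketch of the checkpoint settings commonly used when writing to Iceberg from Flink. The interval, pause and timeout values are illustrative assumptions, not taken from the original job:

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Iceberg data is only committed when a checkpoint succeeds, so checkpointing must be enabled.
        env.enableCheckpointing(5000);                              // checkpoint every 5 s (illustrative)

        CheckpointConfig conf = env.getCheckpointConfig();
        conf.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);  // Flink's default mode
        conf.setMinPauseBetweenCheckpoints(500);                    // avoid back-to-back checkpoints
        conf.setCheckpointTimeout(60000);                           // give up on a checkpoint after 60 s
    }
}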
bin/kafka-console-producer.sh --topic json --broker-list leidi01:6667
bin/kafka-console-consumer.sh --bootstrap-server leidi01:6667 --topic json --from-beginning
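The map function in the write job splits each message on commas into four fields (id, name, age, loc), so the test messages typed into the console producer are assumed to look like the following (values chosen to match the examples later in the post):

1,zs,18,beijing
2,ls,19,shanghai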
The table's data directory on HDFS contains two partitions. Create a Hadoop catalog in Flink SQL to query the results:
-- 1. Create the Hadoop Catalog
CREATE CATALOG flinkiceberg WITH (
'type'='iceberg',
'catalog-type'='hadoop',
'warehouse'='hdfs://leidi01:8020/flinkiceberg/',
'property-version'='1'
);
-- 2. Query the table
use catalog flinkiceberg;
use icebergdb;
select * from flink_iceberg_tbl;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.FlinkSource;

public class FlinkIcebergRead {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 1. Configure the TableLoader.
        Configuration hadoopConf = new Configuration();
        TableLoader tableLoader = TableLoader.fromHadoopTable(
                "hdfs://leidi01:8020/flinkiceberg/icebergdb/flink_iceberg_tbl", hadoopConf);

        // 2. Read data from Iceberg (full batch here; streaming reads are also possible).
        DataStream<RowData> batchData = FlinkSource.forRowData().env(env)
                .tableLoader(tableLoader)
                // defaults to false (one-off batch read); set to true for streaming reads
                .streaming(false)
                .build();

        batchData.map(new MapFunction<RowData, String>() {
            @Override
            public String map(RowData rowData) throws Exception {
                int id = rowData.getInt(0);
                String name = rowData.getString(1).toString();
                int age = rowData.getInt(2);
                String loc = rowData.getString(3).toString();
                return id + "," + name + "," + age + "," + loc;
            }
        }).print();

        env.execute("DataStream Api Read Data From Iceberg");
    }
}
Note: call streaming(true) on the FlinkSource builder to read the Iceberg table as a continuous stream instead of a one-off batch.
Code implementation:
DataStream<RowData> batchData = FlinkSource.forRowData().env(env)
.tableLoader(tableLoader)
    // defaults to false (one-off batch read); set to true for streaming reads
.streaming(true)
.build();
insert into flink_iceberg_tbl values (5,'s1',30,'guangzhou'),(6,'s2',31,'tianjin');
(1) Check the snapshot ID
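The snapshot ID can be read from the table's metadata JSON file on HDFS. Alternatively, the sketch below (not part of the original post; it reuses the HadoopCatalog, warehouse path and table name from the write job) prints the snapshot IDs through the Iceberg API:

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;

public class PrintSnapshots {
    public static void main(String[] args) {
        Configuration hadoopConf = new Configuration();
        HadoopCatalog catalog = new HadoopCatalog(hadoopConf, "hdfs://leidi01:8020/flinkiceberg/");
        Table table = catalog.loadTable(TableIdentifier.of("icebergdb", "flink_iceberg_tbl"));

        // Every commit produces a snapshot; any of these IDs can be passed to startSnapshotId(...).
        for (Snapshot snapshot : table.snapshots()) {
            System.out.println(snapshot.snapshotId() + " committed at " + snapshot.timestampMillis());
        }
        System.out.println("current snapshot: " + table.currentSnapshot().snapshotId());
    }
}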
(2) Code implementation
//2. Read data from Iceberg incrementally, starting from a snapshot
DataStream<RowData> batchData = FlinkSource.forRowData().env(env)
.tableLoader(tableLoader)
    // incrementally read data in real time from a given snapshot; get the snapshot ID from the table metadata
.startSnapshotId(1738199999360637062L)
    // defaults to false (one-off batch read); set to true for streaming reads
.streaming(true)
.build();
(3) Run result
(1) Small data files before compaction
(2) Code implementation
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.RewriteDataFilesActionResult;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.actions.Actions;
import org.apache.iceberg.hadoop.HadoopCatalog;

public class RewriteDataFiles {
    public static void main(String[] args) {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 1. Hadoop configuration.
        Configuration hadoopConf = new Configuration();

        // 2. Create the Hadoop catalog so the table can be located under the warehouse path.
        Catalog catalog = new HadoopCatalog(hadoopConf, "hdfs://leidi01:8020/flinkiceberg/");

        // 3. Specify the Iceberg database and table name and load the table.
        TableIdentifier name = TableIdentifier.of("icebergdb", "flink_iceberg_tbl");
        Table table = catalog.loadTable(name);

        // 4. Compact the small data files.
        RewriteDataFilesActionResult result = Actions.forTable(table)
                .rewriteDataFiles()
                // Target file size defaults to 512 MB; it can be set explicitly, same as in Spark.
                .targetSizeInBytes(536870912L)
                .execute();
    }
}
(3) Run result
(1) Code implementation
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class SQLAPIWriteIceberg {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);
        env.enableCheckpointing(1000);

        // 1. Create the catalog.
        tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
                "'type'='iceberg'," +
                "'catalog-type'='hadoop'," +
                "'warehouse'='hdfs://leidi01:8020/flinkiceberg')");

        // 2. Use this catalog.
        tblEnv.useCatalog("hadoop_iceberg");

        // 3. Create the database.
        tblEnv.executeSql("create database iceberg_db");

        // 4. Use the database.
        tblEnv.useDatabase("iceberg_db");

        // 5. Create the Iceberg table flink_iceberg_tbl2.
        tblEnv.executeSql("create table hadoop_iceberg.iceberg_db.flink_iceberg_tbl2(id int,name string,age int,loc string) partitioned by (loc)");

        // 6. Insert data into flink_iceberg_tbl2.
        tblEnv.executeSql("insert into hadoop_iceberg.iceberg_db.flink_iceberg_tbl2 values (1,'zs',18,'beijing'),(2,'ls',19,'shanghai'),(3,'ww',20,'guangzhou')");
    }
}
(2) Run result
(3) Query the data
-- 1. Create the catalog
CREATE CATALOG flinkiceberg WITH (
  'type'='iceberg',
  'catalog-type'='hadoop',
  'warehouse'='hdfs://leidi01:8020/flinkiceberg/',
  'property-version'='1'
);
-- 2. Query the data
use catalog flinkiceberg;
use iceberg_db;
select * from flink_iceberg_tbl2;
(1) Code logic
(2) Code implementation
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class SQLAPIReadIceberg {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);
        env.enableCheckpointing(1000);

        // 1. Create the catalog.
        tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
                "'type'='iceberg'," +
                "'catalog-type'='hadoop'," +
                "'warehouse'='hdfs://leidi01:8020/flinkiceberg')");

        // 2. Read the table data as a batch.
        TableResult tableResult = tblEnv.executeSql("select * from hadoop_iceberg.iceberg_db.flink_iceberg_tbl2");
        tableResult.print();
    }
}
(1) Code logic
(2) Code implementation
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class SQLStreamReadIceberg {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);
        env.enableCheckpointing(1000);

        Configuration configuration = tblEnv.getConfig().getConfiguration();
        // Enable the OPTIONS hint syntax in SQL.
        configuration.setBoolean("table.dynamic-table-options.enabled", true);

        // 1. Create the catalog.
        tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
                "'type'='iceberg'," +
                "'catalog-type'='hadoop'," +
                "'warehouse'='hdfs://leidi01:8020/flinkiceberg')");

        // 2. Read all data from the table's current snapshot, then keep reading incrementally.
        //    streaming=true enables real-time reads; monitor-interval is the polling interval for new data (default 1s).
        TableResult tableResult = tblEnv.executeSql("select * from hadoop_iceberg.iceberg_db.flink_iceberg_tbl2 /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/");
        tableResult.print();
    }
}
(3) Test and verify
insert into flink_iceberg_tbl2 values (5,'s1',30,'guangzhou'),(6,'s2',31,'tianjin');
(1) Code logic
(2) Code implementation
insert into flink_iceberg_tbl2 values (7,'s11',30,'beijing'),(8,'s22',31,'beijing');
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class SQLSnapshotReadIceberg {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);
        env.enableCheckpointing(1000);

        Configuration configuration = tblEnv.getConfig().getConfiguration();
        // Enable the OPTIONS hint syntax in SQL.
        configuration.setBoolean("table.dynamic-table-options.enabled", true);

        // 1. Create the catalog.
        tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
                "'type'='iceberg'," +
                "'catalog-type'='hadoop'," +
                "'warehouse'='hdfs://leidi01:8020/flinkiceberg')");

        // 2. Keep reading in real time starting from the specified Iceberg snapshot.
        //    start-snapshot-id: the snapshot ID, taken from the table metadata.
        TableResult tableResult2 = tblEnv.executeSql("SELECT * FROM hadoop_iceberg.iceberg_db.flink_iceberg_tbl2 /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'start-snapshot-id'='8334669420406375204')*/");
        tableResult2.print();
    }
}
(3) Run result
HADOOP_HOME and hadoop.home.dir are unset.
winutils: since Hadoop is written mainly for Linux, **winutils.exe emulates a Linux-style directory environment on Windows**. It is required whenever Hadoop runs on Windows or a Windows machine calls out to a remote Hadoop cluster. winutils is a set of Windows binaries, built on a Windows VM for each Hadoop version, used to run and test Hadoop-related applications on Windows.
(1) Download the winutils build that matches the Hadoop version of your cluster:
https://github.com/steveloughran/winutils
(2) Set the %HADOOP_HOME% environment variable to the directory one level above the bin directory that contains WINUTILS.EXE.
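If changing the Windows environment variables is inconvenient during local debugging, a commonly used alternative (an assumption, not from the original post) is to point hadoop.home.dir at the winutils installation at the very start of main():

public class WinutilsWorkaround {
    public static void main(String[] args) {
        // Hypothetical local path; the directory must contain bin\winutils.exe.
        System.setProperty("hadoop.home.dir", "D:\\winutils\\hadoop-3.1.1");
        // ... then build and run the Flink/Iceberg job as usual.
    }
}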
com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;)V
Cause: a guava version conflict.
① Step 1: in the pom view, click Dependency Analyzer.
② Step 2: examine the Dependency Analyzer panel:
Ⅰ. shows the conflicting jars;
Ⅱ. shows all dependencies as a list;
Ⅲ. shows all dependencies as a tree.
③ Step 3: resolve the jars in the Conflicts list one by one, taking guava as an example: click guava and look at the entries highlighted in red on the right, which mark the conflict. Here the resolved guava version is 24.0, while two dependencies pull in guava 27.0.0.1 and 16.0.1 respectively.
④ Exclude the lower-version dependencies: select the entries shown in red, right-click, and choose Exclude. After these steps the result looks as follows.
⑤ Reload the Maven dependency configuration.
---
Even after resolving the guava conflict above, the error persisted; downgrading Hadoop from 3.2.2 to 3.1.1 made it go away.
Note that hive-3.1.2 depends on Hadoop 3.1.0 [3]; it is generally not advisable for the runtime Hadoop version to be newer than the one Hive depends on.
Ⅰ. Option 1: relocate (shade) guava inside hive-exec, which requires repackaging hive-exec manually.
Ⅱ. Option 2: downgrade the Hadoop version. This does not necessarily mean downgrading the cluster, only the Hadoop version used by Flink and Hive on the client side; using a slightly older Hadoop client against a newer Hadoop server is generally tolerated across minor versions.
<hadoop.version>3.2.2</hadoop.version>
<!-- downgrade the Hadoop version from 3.2.2 to 3.1.1 -->
<hadoop.version>3.1.1</hadoop.version>
ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console. Set system property 'org.apache.logging.log4j.simplelog.StatusLogger.level' to TRACE to show Log4j2 internal initialization logging.
Solution: add a log4j2.xml configuration file (for example under src/main/resources):
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN">
    <Properties>
        <Property name="log_level" value="info" />
        <Property name="log_dir" value="log" />
        <Property name="log_pattern" value="[%d{yyyy-MM-dd HH:mm:ss.SSS}] [%p] - [%t] %logger - %m%n" />
        <Property name="file_name" value="test" />
        <Property name="every_file_size" value="100 MB" />
    </Properties>
    <Appenders>
        <Console name="Console" target="SYSTEM_OUT">
            <PatternLayout pattern="${log_pattern}" />
        </Console>
        <RollingFile name="RollingFile" fileName="${log_dir}/${file_name}.log"
                     filePattern="${log_dir}/$${date:yyyy-MM}/${file_name}-%d{yyyy-MM-dd}-%i.log">
            <ThresholdFilter level="DEBUG" onMatch="ACCEPT" onMismatch="DENY" />
            <PatternLayout pattern="${log_pattern}" />
            <Policies>
                <SizeBasedTriggeringPolicy size="${every_file_size}" />
                <TimeBasedTriggeringPolicy modulate="true" interval="1" />
            </Policies>
            <DefaultRolloverStrategy max="20" />
        </RollingFile>
        <RollingFile name="RollingFileErr" fileName="${log_dir}/${file_name}-warnerr.log"
                     filePattern="${log_dir}/$${date:yyyy-MM}/${file_name}-%d{yyyy-MM-dd}-warnerr-%i.log">
            <ThresholdFilter level="WARN" onMatch="ACCEPT" onMismatch="DENY" />
            <PatternLayout pattern="${log_pattern}" />
            <Policies>
                <SizeBasedTriggeringPolicy size="${every_file_size}" />
                <TimeBasedTriggeringPolicy modulate="true" interval="1" />
            </Policies>
        </RollingFile>
    </Appenders>
    <Loggers>
        <Root level="${log_level}">
            <AppenderRef ref="Console" />
            <AppenderRef ref="RollingFile" />
            <AppenderRef ref="RollingFileErr" />
        </Root>
    </Loggers>
</Configuration>
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.calcite.sql.parser.SqlParser.config()Lorg/apache/calcite/sql/parser/SqlParser$Config;
Cause: a dependency conflict.
Solution: switch all Scala-suffixed dependencies to 2.12, and replace flink-table-api-java-bridge with flink-table-api-scala-bridge_2.12.