当前位置:   article > 正文

Flink写Doris又有哪些坑呢_flink doris

flink doris

上一篇写了Spark如何通过structured streaming消费kafka的数据实时写Doris表,表面上看挺简单的一个逻辑,但是却经历了一波三折,好在,最后通过分析源码搞定了。

那么当我后续把spark换成flink之后,会不会要顺利一点呢?

事实证明并没有。

无论是官方文档给的样例,还是GitHub提供的demo,都没有办法直接解决当前我的问题,也还是只能不断试错,官网和源码满地找的方式,踉踉跄跄地把问题给解决了。

那么这篇文章,咱就来看看把spark换成flink之后,还是基于相同的业务场景,又会遇到哪些意想不到的坑呢?

0. 先看兼容性

Flink作为当下炙手可热的计算引擎之一,Doris自然会提供对其的兼容:

同样,官方文档也给出了对Flink不同版本的兼容情况:

而我当前软件工程里的Flink版本为1.15.2、Java版本也为1.8、对应的Doris版本为1.2.3,于是正好对应Doris的1.2.0版本的connector。

1.环境准备

官方文档描述了一堆Flink写Doris前的准备工作,但是读完之后,在我看来,大部分其实是没有必要的(原因跟上一篇通过spark写Doris一样)。

因为我是通过idea开发的代码,对于开发环境来说,理论上我只需要额外引入对应的flink maven依赖,doris connector maven依赖就够了。

因为flink的环境我已经具备现成的了,那么暂时就只需要引入connector依赖就可以:

而至于集群运行环境,我有yarn,理论上也就够了,其他一切花里胡哨的配置全都不需要(当然,你最好要清楚为啥)。

2. SQL API第一次试错

从官方文档描述来看,Doris即支持Flink的Table API(SQL API),也支持其普通的DataStream API,这个就有点类似spark的Dataset(DataFrame)和RDD。

由于SQL API的语法更加简洁和便利,于是,我决定先用这个API来试试,代码如下:

packagecom.anryg.doris importjava.time.Duration importorg.apache.flink.streaming.api.CheckpointingMode importorg.apache.flink.streaming.api.scala.StreamExecutionEnvironment importorg.apache.flink.table.api.bridge.scala.StreamTableEnvironment /** *@DESC:通过SQLAPI读取kafka数据,写入到Doris *@Auther:Anryg *@Date:2023/8/311:29 */ objectFlinkSQLFromKafka2Doris{ defmain(args:Array[String]):Unit={ valenv=StreamExecutionEnvironment.getExecutionEnvironment env.getCheckpointConfig.setCheckpointStorage("hdfs://192.168.211.106:8020/tmp/checkpoint/FlinkSQLFromKafka2Doris") env.getCheckpointConfig.setAlignedCheckpointTimeout(Duration.ofMinutes(1L)) env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) //env.setRuntimeMode(RuntimeExecutionMode.BATCH) valtableEnv=StreamTableEnvironment.create(env) /**第一步:读取kafka数据源*/ tableEnv.executeSql( """ Createtabledns_logs_from_kafka( |client_ipSTRING, |domainSTRING, |`time`STRING, |target_ipSTRING, |rcodeSTRING, |query_typeSTRING, |authority_recordSTRING, |add_msgSTRING, |dns_ipSTRING |) |with( |'connector'='kafka', |'topic'='test', |'properties.bootstrap.servers'='192.168.211.107:6667', |'properties.group.id'='FlinkSQLFromKafka2Doris', |'scan.startup.mode'='latest-offset', |'value.format'='csv',//确定数据源为文本格式 |'value.csv.field-delimiter'='|'//确定文本数据源的分隔符 |) """.stripMargin) //tableEnv.executeSql("select*fromdns_logs_from_kafka").print() /**第二步:创建Doris映射表*/ tableEnv.executeSql( """ |CREATETABLEdns_logs_from_flink02( |`client_ip`STRING |) |WITH( |'connector'='doris', |'fenodes'='192.168.221.173:8030', |'table.identifier'='example_db.dns_logs_from_flink02', |'username'='root', |'password'='', |'sink.properties.format'='json', |'sink.properties.read_json_by_line'='true', |'sink.label-prefix'='doris_label' |) """.stripMargin) /**第三步:数据写入到Doris表中*/ tableEnv.executeSql( """ |INSERTINTOdns_logs_from_flink02 |select |client_ip |from |dns_logs_from_kafka """.stripMargin) } }

首先,我这肯定是严格按照官方文档的要求写的,而且还变着花样,改了多处设置,甚至为了尽可能快速解决问题,我都委屈到把要写入到Doris表的数据,减少到了一个字段。

先后尝试了至少10次以上,日志debug的模式也打开了(需要额外配置log4j2.xml文件),甚至在读取完kafka之后,我还专门把读取结果打印出来,确保了读取kafka这一步没有问题。

但是,即便这样,我还是没法定位出问题,这个奇葩的原因在于,即便我打开了日志的debug设置,程序依然没有任何报错提示,但数据就是写不进去

再次查看官方文档,我看它提到了需要在FE上做这么一个设置,心想,莫非是因为这一步没做吗?

于是赶紧把我的3台FE都增加了这个配置,然后重启集群。

但,然并卵,依然写不进数据,也依然没有任何报错提示。

那我就没有办法了,现在就是想找出问题,都不知道从哪找起,就是这么气人(当然,也可能是因为我水平不行)。

既然这样,那咱就换一棵歪脖子树试试(不想折腾太久了)。

3.DataStream第二次试错

除了SQL API,官网还给出了DataStream API的样例,既然这样,那就试一下吧。

我根据这个官网样例,写出来这样的代码:

packagecom.anryg.doris importjava.util.Properties importcom.alibaba.fastjson.JSONObject importorg.apache.flink.api.common.RuntimeExecutionMode importorg.apache.flink.api.common.serialization.SimpleStringSchema importorg.apache.flink.connector.kafka.source.KafkaSource importorg.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer importorg.apache.flink.streaming.api.CheckpointingMode importorg.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup importorg.apache.flink.streaming.api.scala.StreamExecutionEnvironment importorg.apache.doris.flink.cfg.DorisOptions importorg.apache.doris.flink.sink.DorisSink importorg.apache.doris.flink.cfg.DorisExecutionOptions importorg.apache.doris.flink.cfg.DorisReadOptions importorg.apache.doris.flink.sink.writer.{DorisRecordSerializer,SimpleStringSerializer} importorg.apache.flink.api.common.eventtime.WatermarkStrategy /** *@DESC:通过DataStreamAPI读取kafka数据,写入到Doris *@Auther:Anryg *@Date:2023/8/317:05 */ objectFlinkDSFromKafka2Doris{ defmain(args:Array[String]):Unit={ //获取流任务的环境变量 valenv=StreamExecutionEnvironment.getExecutionEnvironment .enableCheckpointing(10000,CheckpointingMode.EXACTLY_ONCE)//打开checkpoint功能 env.getCheckpointConfig.setCheckpointStorage("hdfs://192.168.211.106:8020/tmp/flink_checkpoint/FlinkDSFromKafka2Doris")//设置checkpoint的hdfs目录 env.getCheckpointConfig.setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)//设置checkpoint记录的保留策略 valkafkaSource=KafkaSource.builder()//获取kafka数据源 .setBootstrapServers("192.168.211.107:6667") .setTopics("test") .setGroupId("FlinkDSFromKafka2Doris") .setStartingOffsets(OffsetsInitializer.latest()) .setValueOnlyDeserializer(newSimpleStringSchema()) .build() importorg.apache.flink.streaming.api.scala._//引入隐式转换函数,需要将从kafka读出来的数据做隐式转换 /**配置Doris相关参数*/ valdorisBuilder=DorisOptions.builder dorisBuilder.setFenodes("192.168.221.173:8030") .setTableIdentifier("example_db.dns_logs_from_flink01") .setUsername("root") .setPassword("") /**确定数据写入到Doris的方式,即streamload方式*/ valexecutionBuilder=DorisExecutionOptions.builder executionBuilder.setLabelPrefix("flink-doris-label03")//streamloadlabelprefix /**确定Doris的Sink数据方式*/ valbuilder=DorisSink.builder[String]//注意这个数据类型String需要加上 builder.setDorisReadOptions(DorisReadOptions.builder.build)//确定数据读取策略 .setDorisExecutionOptions(executionBuilder.build)//确定数据执行策略 .setSerializer(newSimpleStringSerializer())//确定数据数据序列化(写入)类型 .setDorisOptions(dorisBuilder.build)//添加Doris配置 /**读取数据源生成DataStream对象*/ valkafkaDS=env.fromSource[String](kafkaSource,WatermarkStrategy.noWatermarks(),"kafka-data") /**对读取的数据进行处理*/ valtargetDS=kafkaDS.map(line=>{ valarray=line.split("\\|") array }).filter(_.length==9) .map(array=>{ valjson=newJSONObject()/**将数据封装为json对象*/ json.put("client_ip",array(0)) json.put("domain",array(1)) json.put("time",array(2)) json.put("target_ip",array(3)) json.put("rcode",array(4)) json.put("query_type",array(5)) json.put("authority_record",array(6)) json.put("add_msg",array(7)) json.put("dns_ip",array(8)) json.toJSONString/**转换为jsonstring*/ }) targetDS.sinkTo(builder.build()) env.execute("FlinkDSFromKafka2Doris") } }

而且吸取了上次用spark写Doris的惨痛经历,我把数据格式都转成json string了。

按理说,应该没什么毛病才对,可是一运行呢,又报错了:

不过好消息是,它终于给我错误提示了,对比上面用SQL api时,没有任何报错的一脸懵逼要强多了。

通过给的这个错误连接,我顺利打开了其中的内容,是这样的:

告诉我说,现在这个数据处理格式(json string)还是不符合要求,这下又有点懵逼了。

心想,这个通过计算引擎写Doris的数据格式,还真有点让人捉摸不透,上一篇spark写Doris时,是必须要把数据转为json string格式的,但到了Flink这里,又不行了

那咋办呢?又只能看源码了。

4. 查看源码,定位问题

既然要决定看源码,从哪开始看呢?

那就是找到它的数据写入逻辑,或者写入前配置,因为Flink写Doris的方式是Steam Load,那么就找到这个对应的类,而这个相关的类在这里:

怎么说呢,又是一个几乎0注释的源码。

通读这个类之后呢,发现了一个关键函数:

这里面清清楚楚的定义了数据的格式为json string类型,而且从函数命名来看,说它是一个默认的设置方式(函数名为defaults嘛)。

但是,奇葩的是,官网给的这个获取该执行对象的代码,却并没有读到这些所谓的默认配置

看到这里,我似乎一下子就找到了原因。

5.解决问题

原因既然找到了,是因为源码那个数据格式的设置没有生效。

那么咱就只能手动把这个设置给加上,怎么加呢,这样:

加完之后,整个代码逻辑就是这样的:

packagecom.anryg.doris importjava.util.Properties importcom.alibaba.fastjson.JSONObject importorg.apache.flink.api.common.serialization.SimpleStringSchema importorg.apache.flink.connector.kafka.source.KafkaSource importorg.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer importorg.apache.flink.streaming.api.CheckpointingMode importorg.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup importorg.apache.flink.streaming.api.scala.StreamExecutionEnvironment importorg.apache.doris.flink.cfg.DorisOptions importorg.apache.doris.flink.sink.DorisSink importorg.apache.doris.flink.cfg.DorisExecutionOptions importorg.apache.doris.flink.cfg.DorisReadOptions importorg.apache.doris.flink.sink.writer.{DorisRecordSerializer,SimpleStringSerializer} importorg.apache.flink.api.common.eventtime.WatermarkStrategy /** *@DESC:通过DataStreamAPI读取kafka数据,写入到Doris *@Auther:Anryg *@Date:2023/8/317:05 */ objectFlinkDSFromKafka2Doris{ defmain(args:Array[String]):Unit={ //获取流任务的环境变量 valenv=StreamExecutionEnvironment.getExecutionEnvironment .enableCheckpointing(10000,CheckpointingMode.EXACTLY_ONCE)//打开checkpoint功能 env.getCheckpointConfig.setCheckpointStorage("hdfs://192.168.211.106:8020/tmp/flink_checkpoint/FlinkDSFromKafka2Doris")//设置checkpoint的hdfs目录 env.getCheckpointConfig.setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)//设置checkpoint记录的保留策略 valkafkaSource=KafkaSource.builder()//获取kafka数据源 .setBootstrapServers("192.168.211.107:6667") .setTopics("test") .setGroupId("FlinkDSFromKafka2Doris") .setStartingOffsets(OffsetsInitializer.latest()) .setValueOnlyDeserializer(newSimpleStringSchema()) .build() importorg.apache.flink.streaming.api.scala._//引入隐式转换函数,需要将从kafka读出来的数据做隐式转换 /**配置Doris相关参数*/ valdorisBuilder=DorisOptions.builder dorisBuilder.setFenodes("192.168.221.173:8030") .setTableIdentifier("example_db.dns_logs_from_flink01") .setUsername("root") .setPassword("") /**确定数据写入到Doris的方式,即streamload方式*/ valexecutionBuilder=DorisExecutionOptions.builder executionBuilder.setLabelPrefix("flink-doris-label03")//streamloadlabelprefix /**添加额外的数据格式设置,否则写不进去*/ valproperties=newProperties properties.setProperty("format","json") properties.setProperty("read_json_by_line","true") executionBuilder.setStreamLoadProp(properties) /**确定Doris的Sink数据方式*/ valbuilder=DorisSink.builder[String]//注意这个数据类型String需要加上 builder.setDorisReadOptions(DorisReadOptions.builder.build)//确定数据读取策略 .setDorisExecutionOptions(executionBuilder.build)//确定数据执行策略 .setSerializer(newSimpleStringSerializer())//确定数据数据序列化(写入)类型 .setDorisOptions(dorisBuilder.build)//添加Doris配置 /**读取数据源生成DataStream对象*/ valkafkaDS=env.fromSource[String](kafkaSource,WatermarkStrategy.noWatermarks(),"kafka-data") /**对读取的数据进行处理*/ valtargetDS=kafkaDS.map(line=>{ valarray=line.split("\\|") array }).filter(_.length==9) .map(array=>{ valjson=newJSONObject()/**将数据封装为json对象*/ json.put("client_ip",array(0)) json.put("domain",array(1)) json.put("time",array(2)) json.put("target_ip",array(3)) json.put("rcode",array(4)) json.put("query_type",array(5)) json.put("authority_record",array(6)) json.put("add_msg",array(7)) json.put("dns_ip",array(8)) json.toJSONString/**转换为jsonstring*/ }) targetDS.sinkTo(builder.build()) env.execute("FlinkDSFromKafka2Doris") } }

官网的例子给的是batch模式,我这里是流模式

再执行,之前一直显示数据量为0的表记录,一下子就蹭蹭往上涨了。

最后

跟上次用spark写Doris一样,这次用Flink写Doris,也一样不顺利,磕磕碰碰一路,好在汲取了上次的教训,最终解决问题的效率提高了不少。

这里我想表达一点的就是,官方文档的详尽、严谨程度,源码注释的详细程度,在很大程度上决定了使用者对一项新技术的学习成本

其次就是,只要对出现的问题,框架能够抛出足够详细(一般详细也成)的错误提示,使用者通过对逻辑关系的梳理,也能找到问题的原因并解决。

但是,就怕那种虽然有问题,但是偏偏还不告诉你问题原因的(不报错,不提示),这就让人很惆怅。

最后,希望这篇文章对你有用。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/504478
推荐阅读
相关标签
  

闽ICP备14008679号