当前位置:   article > 正文

Flink向Doris表写入数据(Sink)_flink sink doris

flink sink doris

业务场景

最近在工作中遇到了Flink处理kafka中的数据,最后写入Doris存储的场景。

Apache Doris 是一款基于 MPP 架构的高性能、实时的分析型数据库,以高效、简单、统一的特点被人们所熟知,仅需亚秒级响应时间即可返回海量数据下的查询结果,不仅可以支持高并发的点查询场景,也能支持高吞吐的复杂分析场景。基于此,Apache Doris 能够较好的满足报表分析、即席查询、统一数仓构建、数据湖联邦查询加速等使用场景,用户可以在此之上构建大屏看板、用户行为分析、AB 实验平台、日志检索分析、用户画像分析、订单分析等应用。

以上介绍来自于Doris的官网,关于Doris的内容不做过多的介绍,可以参考官档 快速体验 Apache Doris - Apache Doris

依赖

本人是通过datastream的方式向Doris写入数据,并非通过FlinkSQL的方式,不过不管使用哪种方式,都需要以下的依赖

  1. <dependency>
  2. <groupId>org.apache.doris</groupId>
  3. <artifactId>flink-doris-connector-1.16</artifactId>
  4. <version>1.6.0</version>
  5. </dependency>

代码实现

在doris的官档中,数据通过datastream写入doris,支持两种不同的序列化方法,一种是String 数据流 (SimpleStringSerializer),另一种是RowData 数据流 (RowDataSerializer)。本人使用的是后者,因为后者可以很好的兼容json类型的数据。

现将主类的业务代码提供如下,重点处通过注释做了说明

  1. package cn.gwm.dp.main;
  2. import cn.gwm.dp.consts.Constant;
  3. import cn.gwm.dp.entity.TaskDetail;
  4. import cn.gwm.dp.entity.TaskSum;
  5. import cn.gwm.dp.entity.YRcanbus;
  6. import cn.gwm.dp.functions.KafkaFunction;
  7. import cn.gwm.dp.functions.KeyedFunction;
  8. import cn.gwm.dp.functions.RowDataFunction;
  9. import cn.gwm.dp.functions.TosFunction;
  10. import org.apache.doris.flink.cfg.DorisExecutionOptions;
  11. import org.apache.doris.flink.cfg.DorisOptions;
  12. import org.apache.doris.flink.cfg.DorisReadOptions;
  13. import org.apache.doris.flink.sink.DorisSink;
  14. import org.apache.doris.flink.sink.writer.serializer.RowDataSerializer;
  15. import org.apache.flink.api.common.eventtime.WatermarkStrategy;
  16. import org.apache.flink.api.common.restartstrategy.RestartStrategies;
  17. import org.apache.flink.api.common.serialization.SimpleStringSchema;
  18. import org.apache.flink.api.common.time.Time;
  19. import org.apache.flink.api.java.tuple.Tuple2;
  20. import org.apache.flink.api.java.utils.ParameterTool;
  21. import org.apache.flink.connector.kafka.source.KafkaSource;
  22. import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
  23. import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
  24. import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
  25. import org.apache.flink.streaming.api.CheckpointingMode;
  26. import org.apache.flink.streaming.api.datastream.*;
  27. import org.apache.flink.streaming.api.environment.CheckpointConfig;
  28. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  29. import org.apache.flink.streaming.api.functions.ProcessFunction;
  30. import org.apache.flink.table.api.DataTypes;
  31. import org.apache.flink.table.data.RowData;
  32. import org.apache.flink.table.types.DataType;
  33. import org.apache.flink.util.Collector;
  34. import org.apache.flink.util.OutputTag;
  35. import org.apache.kafka.clients.CommonClientConfigs;
  36. import org.apache.kafka.common.config.SaslConfigs;
  37. import java.util.Properties;
  38. /**
  39. * @Author: Spring
  40. * @Description:
  41. * @Date: Created on 10:27 2024/5/13
  42. */
  43. public class NewbieTaskApp {
  44. public static void main(String[] args) throws Exception {
  45. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  46. // 业务处理代码省略。。。
  47. // 获得Doris Sink
  48. DorisSink<RowData> taskDetailSink = getTaskDetailSink(parameterTool);
  49. // 数据分流写入到doris
  50. SideOutputDataStream<RowData> taskDetailData = eachRowData.getSideOutput(taskDetailOutputTag);
  51. taskDetailData.sinkTo(taskDetailSink).name("taskDetailSink");
  52. env.execute("newbie_task_online");
  53. }
  54. private static DorisSink<RowData> getTaskDetailSink(ParameterTool parameterTool) {
  55. // doris的连接地址
  56. String fenodes = parameterTool.get(Constant.FENODES);
  57. // doris的表
  58. String dorisDetailTable = parameterTool.get(Constant.DORIS_TASK_DETAIL_TABLE);
  59. // doris连接用户
  60. String dorisUser = parameterTool.get(Constant.DORIS_USER);
  61. // doris连接密码
  62. String dorisPwd = parameterTool.get(Constant.DORIS_PWD);
  63. DorisSink.Builder<RowData> builder = DorisSink.builder();
  64. DorisOptions.Builder optionsBuilder = DorisOptions.builder();
  65. optionsBuilder.setFenodes(fenodes)
  66. .setTableIdentifier(dorisDetailTable)
  67. .setUsername(dorisUser)
  68. .setPassword(dorisPwd);
  69. Properties properties = new Properties();
  70. // 指定处理json类型数据
  71. properties.setProperty("format", "json");
  72. properties.setProperty("read_json_by_line", "true");
  73. DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
  74. // 这里设定的prefix,每个Flink应用都不能相同
  75. executionBuilder.setLabelPrefix("label-task-detail-test2")
  76. .setDeletable(false)
  77. .setStreamLoadProp(properties);
  78. // 指定要落入的doris表的字段
  79. String[] fields = {"day", "uin", "platform", "task_type", "trip_id", "vin", "task_time",
  80. "task_distance", "tja_ica_mod_disp", "noh_sts", "road_class", "create_time"};
  81. // 指定doris表字段类型所对应的Flink的类型
  82. DataType[] types = {DataTypes.DATE(),
  83. DataTypes.CHAR(40),
  84. DataTypes.TINYINT(),
  85. DataTypes.TINYINT(),
  86. DataTypes.INT(),
  87. DataTypes.VARCHAR(64),
  88. DataTypes.INT(),
  89. DataTypes.FLOAT(),
  90. DataTypes.TINYINT(),
  91. DataTypes.TINYINT(),
  92. DataTypes.TINYINT(),
  93. DataTypes.TIMESTAMP()};
  94. // 构建sink
  95. builder.setDorisReadOptions(DorisReadOptions.builder().build())
  96. .setDorisExecutionOptions(executionBuilder.build())
  97. .setSerializer(RowDataSerializer.builder()
  98. .setFieldNames(fields)
  99. .setType("json")
  100. .setFieldType(types).build())
  101. .setDorisOptions(optionsBuilder.build());
  102. return builder.build();
  103. }
  104. }

上述代码中,数据的业务处理逻辑我省略了,各位换成自己的。

在写入doris之前,我是通过分流的方式拿到要写入的数据,且数据的类型是RowData。各位的数据可根据自己的业务逻辑准备,如果你要写入doris的数据和我一样,也是json类型的,那么一定要使用RowData类型。

关于RowData类型的构建,可以简单的参考下面的代码

  1. // TaskDetail 是一个自定义的实体类
  2. TaskDetail taskDetail = tp.f0;
  3. GenericRowData taskDetailRowData = new GenericRowData(10);
  4. taskDetailRowData.setField(0, Integer.parseInt(String.valueOf(taskDetail.getDay().toEpochDay())));
  5. taskDetailRowData.setField(1, StringData.fromString(taskDetail.getUin()));
  6. taskDetailRowData.setField(2, (byte) taskDetail.getPlatform());
  7. taskDetailRowData.setField(3, (byte) taskDetail.getTaskType());
  8. taskDetailRowData.setField(4, taskDetail.getTripId());
  9. taskDetailRowData.setField(5, StringData.fromString(taskDetail.getVin()));
  10. taskDetailRowData.setField(6, taskDetail.getTaskTime());
  11. taskDetailRowData.setField(7, taskDetail.getTaskDistance());
  12. taskDetailRowData.setField(8, (byte) taskDetail.getTjaIcaModDisp());
  13. taskDetailRowData.setField(9, (byte) taskDetail.getNohSts());

在获得doris sink的getTaskDetailSink方法中,需要注意以下几个地方:

  1. fields 和 types 的指定,一定要和具体的doris表关联起来
  2. doris中的数据类型要和Flink中的数据类型关联正确
  3. setLabelPrefix 方法指定的标签,每个Flink应用都不一样。如果你的程序启动后,没有挂,且已经将部分数据写入到了doris中,那么在程序关停的时候,一定要设置savepoint路径,不能直接暴力的cancel,并且下次启动时从savepoint启动,否则程序会报错。如果你cancel了,那么下次启动程序前,必须要修改这个标签的值为不同的,才能启动成功。

关于doris中的类型和Flink类型之间的对应关系,可以参考官档Flink Doris Connector - Apache Doris

Doris官方提供的doris sink连接器,默认是已经开启了两阶段提交,不需要我们额外的过多设置。

以上就是Flink将数据最终写入Doris的大概主体代码,目前本人程序运行正常,各位可自行参考修改,有任何问题也欢迎评论区留言讨论。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/爱喝兽奶帝天荒/article/detail/765693
推荐阅读
相关标签
  

闽ICP备14008679号