当前位置:   article > 正文

Flink实时数仓_DWD层数据准备_flink数仓分层 ods dwd

flink数仓分层 ods dwd

第1章 需求分析及实现思路

1.1 分层需求分析

        在之前介绍实时数仓概念时讨论过,建设实时数仓的目的,主要是增加数据计算的复用性。每次新增加统计需求时,不至于从原始数据进行计算,而是从半成品继续加工而成。
我们这里从 kafka 的 ods 层读取用户行为日志以及业务数据,并进行简单处理,写回到 kafka 作为 dwd 层。

 1.2 每层的职能 

分层数据描述生成计算工具存储媒介
ODS原始数据,日志和业务数据  日志服务器maxwellkafka
DWD根据数据对象为单位进行分流,比如订单、页面访问等等FLINKkafka
DIM维度数据FLINKHBase/Redis
DWM对于部分数据对象进行进一步加工,比如独立访问、跳出行为依旧是明细数据FLINKkafka
DWS根据某个维度主题将多个事实数据轻度聚合,形成主题宽表FLINKClickhouse
ADS把 Clickhouse 中的数据根据可视化需要进行筛选聚合Clickhouse可视化展示

3 . DWD 层数据准备实现思路

➢  功能 1:环境搭建
➢  功能 2:计算用户行为日志 DWD 层
➢  功能 3:计算业务数据 DWD 层

一、用户行为日志 DWD 层

        我们前面采集的日志数据已经保存到 Kafka 中,作为日志数据的 ODS 层,从 kafka 的ODS 层读取的日志数据分为 3 类, 页面日志、启动日志和曝光日志。这三类数据虽然都是用户行为数据,但是有着完全不一样的数据结构,所以要拆分处理。利用侧输出流实现数据拆分,将拆分后的不同的日志写回 Kafka 不同主题中,作为日志 DWD 层。

  • 页面日志输出到主流,
  • 启动日志输出到启动侧输出流,
  • 曝光日志输出到曝光侧输出流

2 识别新老访客

保存每个 mid 的首次访问日期,每条进入该算子的访问记录,都会把 mid 对应的首次访问时间读取出来,跟当前日期进行比较,只有首次访问时间不为空,且首次访问时间早于当日的,则认为该访客是老访客,否则是新访客。同时如果是新访客且没有访问记录的话,会写入首次访问时间。

  1. /**
  2. * Desc: 准备用户行为日志的DWD层
  3. */
  4. public class BaseLogApp {
  5. private static final String TOPIC_START = "dwd_start_log";
  6. private static final String TOPIC_DISPLAY = "dwd_display_log";
  7. private static final String TOPIC_PAGE = "dwd_page_log";
  8. public static void main(String[] args) throws Exception {
  9. //TODO 1.准备环境
  10. //1.1 创建Flink流执行环境
  11. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  12. //1.2设置并行度
  13. env.setParallelism(1);
  14. //1.3设置Checkpoint
  15. //每5000ms开始一次checkpoint,模式是EXACTLY_ONCE(默认)
  16. //env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
  17. //env.getCheckpointConfig().setCheckpointTimeout(60000);
  18. //env.setStateBackend(new FsStateBackend("hdfs://hadoop202:8020/gmall/checkpoint/baselogApp"));
  19. //System.setProperty("HADOOP_USER_NAME","atguigu");
  20. //TODO 2.从Kafka中读取数据
  21. String topic = "ods_base_log";
  22. String groupId = "base_log_app_group";
  23. //2.1 调用Kafka工具类,获取FlinkKafkaConsumer
  24. FlinkKafkaConsumer<String> kafkaSource = MyKafkaUtil.getKafkaSource(topic, groupId);
  25. DataStreamSource<String> kafkaDS = env.addSource(kafkaSource);
  26. //TODO 3.对读取到的数据格式进行转换 String->json
  27. SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.map(
  28. new MapFunction<String, JSONObject>() {
  29. @Override
  30. public JSONObject map(String value) throws Exception {
  31. JSONObject jsonObject = JSON.parseObject(value);
  32. return jsonObject;
  33. }
  34. }
  35. );
  36. //jsonObjDS.print("json>>>>>>>>");
  37. /*
  38. TODO 4.识别新老访客 前端也会对新老状态进行记录,有可能会不准,咱们这里是再次做一个确认
  39. 保存mid某天方法情况(将首次访问日期作为状态保存起来),等后面该设备在有日志过来的时候,从状态中获取日期
  40. 和日志产生日志进行对比。如果状态不为空,并且状态日期和当前日期不相等,说明是老访客,如果is_new标记是1,那么对其状态进行修复
  41. */
  42. //4.1 根据mid对日志进行分组
  43. KeyedStream<JSONObject, String> midKeyedDS = jsonObjDS.keyBy(
  44. data -> data.getJSONObject("common").getString("mid")
  45. );
  46. //4.2 新老方法状态修复 状态分为算子状态和键控状态,我们这里要记录某一个设备的访问,使用键控状态比较合适
  47. SingleOutputStreamOperator<JSONObject> jsonDSWithFlag = midKeyedDS.map(
  48. new RichMapFunction<JSONObject, JSONObject>() {
  49. //定义该mid访问状态
  50. private ValueState<String> firstVisitDateState;
  51. //定义日期格式化对象
  52. private SimpleDateFormat sdf;
  53. @Override
  54. public void open(Configuration parameters) throws Exception {
  55. //对状态以及日期格式进行初始化
  56. firstVisitDateState = getRuntimeContext().getState(
  57. new ValueStateDescriptor<String>("newMidDateState", String.class)
  58. );
  59. sdf = new SimpleDateFormat("yyyyMMdd");
  60. }
  61. @Override
  62. public JSONObject map(JSONObject jsonObj) throws Exception {
  63. //获取当前日志标记状态
  64. String isNew = jsonObj.getJSONObject("common").getString("is_new");
  65. //获取当前日志访问时间戳
  66. Long ts = jsonObj.getLong("ts");
  67. if ("1".equals(isNew)) {
  68. //获取当前mid对象的状态
  69. String stateDate = firstVisitDateState.value();
  70. //对当前条日志的日期格式进行抓换
  71. String curDate = sdf.format(new Date(ts));
  72. //如果状态不为空,并且状态日期和当前日期不相等,说明是老访客
  73. if (stateDate != null && stateDate.length() != 0) {
  74. //判断是否为同一天数据
  75. if (!stateDate.equals(curDate)) {
  76. isNew = "0";
  77. jsonObj.getJSONObject("common").put("is_new", isNew);
  78. }
  79. } else {
  80. //如果还没记录设备的状态,将当前访问日志作为状态值
  81. firstVisitDateState.update(curDate);
  82. }
  83. }
  84. return jsonObj;
  85. }
  86. }
  87. );
  88. //jsonDSWithFlag.print(">>>>>>>>>>>");
  89. //TODO 5 .分流 根据日志数据内容,将日志数据分为3类, 页面日志、启动日志和 曝光日志。
  90. // 页面日志输出到主流,启动日志输出到启动侧输出流,曝光日志输出到曝光日志侧输出流
  91. // 侧输出流:1)接收迟到数据 2)分流
  92. //定义启动侧输出流标签
  93. OutputTag<String> startTag = new OutputTag<String>("start"){};
  94. //定义曝光侧输出流标签
  95. OutputTag<String> displayTag = new OutputTag<String>("display"){};
  96. SingleOutputStreamOperator<String> pageDS = jsonDSWithFlag.process(
  97. new ProcessFunction<JSONObject, String>() {
  98. @Override
  99. public void processElement(JSONObject jsonObj, Context ctx, Collector<String> out) throws Exception {
  100. //获取启动日志标记
  101. JSONObject startJsonObj = jsonObj.getJSONObject("start");
  102. //将json格式转换为字符串,方便向侧输出流输出以及向kafka中写入
  103. String dataStr = jsonObj.toString();
  104. //判断是否为启动日志
  105. if (startJsonObj != null && startJsonObj.size() > 0) {
  106. //如果是启动日志,输出到启动侧输出流
  107. ctx.output(startTag, dataStr);
  108. } else {
  109. //如果不是启动日志,获取曝光日志标记(曝光日志中也携带了页面)
  110. JSONArray displays = jsonObj.getJSONArray("displays");
  111. //判断是否为曝光日志
  112. if (displays != null && displays.size() > 0) {
  113. //如果是曝光日志,遍历输出到侧输出流
  114. for (int i = 0; i < displays.size(); i++) {
  115. //获取每一条曝光事件
  116. JSONObject displaysJsonObj = displays.getJSONObject(i);
  117. //获取页面id
  118. String pageId = jsonObj.getJSONObject("page").getString("page_id");
  119. //给每一条曝光事件加pageId
  120. displaysJsonObj.put("page_id", pageId);
  121. ctx.output(displayTag, displaysJsonObj.toString());
  122. }
  123. } else { //如果不是启动日志 说明是页面日志 ,输出到主流
  124. out.collect(dataStr);
  125. }
  126. }
  127. }
  128. }
  129. );
  130. //获取侧输出流
  131. DataStream<String> startDS = pageDS.getSideOutput(startTag);
  132. DataStream<String> displayDS = pageDS.getSideOutput(displayTag);
  133. //打印输出
  134. pageDS.print("page>>>>");
  135. startDS.print("start>>>>");
  136. displayDS.print("display>>>>");
  137. //TODO 6.将不同流的数据写回到kafka的不同topic中
  138. FlinkKafkaProducer<String> startSink = MyKafkaUtil.getKafkaSink(TOPIC_START);
  139. startDS.addSink(startSink);
  140. FlinkKafkaProducer<String> displaySink = MyKafkaUtil.getKafkaSink(TOPIC_DISPLAY);
  141. displayDS.addSink(displaySink);
  142. FlinkKafkaProducer<String> pageSink = MyKafkaUtil.getKafkaSink(TOPIC_PAGE);
  143. pageDS.addSink(pageSink);
  144. env.execute();
  145. }
  146. }

二、业务数据 DWD 层

1、实现动态分流功能 

        业务数据的变化,我们可以通过 Maxwell 采集到,但是 MaxWell 是把全部数据统一写入一个 Topic 中, 这些数据包括业务数据,也包含维度数据,这样显然不利于日后的数据处理,所以需要把各个表拆开处理。但是由于每个表有不同的特点,有些表是维度表,有些表是事实表,有的表既是事实表在某种情况下也是维度表。

        所以这个功能是从 Kafka 的业务数据 ODS 层读取数据,在实时计算中,经过处理后, 一般把维度数据写入存储容器,一般是方便通过主键查询的数据库比如HBase/ Redis / MySQL等。一般把事实数据写入流中,将事实数据写回 Kafka 作为业务数据的 DWD 层,进行进一步处理,最终形成宽表。

  • 业务数据保存到 Kafka 的主题中
  • 维度数据保存到 Hbase 的表中

        但是作为 Flink 实时计算任务,如何得知哪些表是维度表,哪些是事实表呢?而这些表又应该采集哪些字段呢?
        我们可以将上面的内容放到某一个地方,集中配置。这样的配置不适合写在配置文件中,因为业务端随着需求变化每增加一张表,就要修改配置重启计算程序。所以这里需要一种动态配置方案,把这种配置长期保存起来,一旦配置有变化,实时计算可以自动感知。
这种可以有两个方案实现:

  • ➢ 一种是用 Zookeeper 存储,通过 Watch 感知数据变化。
  • ➢ 另一种是用 mysql 数据库存储,周期性的同步。

        这里选择第二种方案,主要是 mysql 对于配置数据初始化和维护管理,用 sql 都比较方便,虽然周期性操作时效性差一点,但是配置变化并不频繁。

  1. /**
  2. * Desc: 准备业务数据的DWD层
  3. */
  4. public class BaseDBApp {
  5. public static void main(String[] args) throws Exception {
  6. //TODO 1.准备环境
  7. //1.1 创建流处理执行环境
  8. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  9. //1.2 设置并新度
  10. env.setParallelism(1);
  11. //1.3 开启Checkpoint,并设置相关的参数
  12. //env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
  13. //env.getCheckpointConfig().setCheckpointTimeout(60000);
  14. //env.setStateBackend(new FsStateBackend("hdfs://hadoop202:8020/gmall/checkpoint/basedbapp"));
  15. //重启策略
  16. // 如果说没有开启重启Checkpoint,那么重启策略就是noRestart
  17. // 如果说没有开Checkpoint,那么重启策略会尝试自动帮你进行重启 重启Integer.MaxValue
  18. //env.setRestartStrategy(RestartStrategies.noRestart());
  19. //TODO 2.从Kafka的ODS层读取数据
  20. String topic = "ods_base_db_m";
  21. String groupId = "base_db_app_group";
  22. //2.1 通过工具类获取Kafka的消费者
  23. FlinkKafkaConsumer<String> kafkaSource = MyKafkaUtil.getKafkaSource(topic, groupId);
  24. DataStreamSource<String> jsonStrDS = env.addSource(kafkaSource);
  25. //TODO 3.对DS中数据进行结构的转换 String-->Json
  26. //jsonStrDS.map(JSON::parseObject);
  27. SingleOutputStreamOperator<JSONObject> jsonObjDS = jsonStrDS.map(jsonStr -> JSON.parseObject(jsonStr));
  28. //jsonStrDS.print("json>>>>");
  29. //TODO 4.对数据进行ETL 如果table为空 或者 data为空,或者长度<3 ,将这样的数据过滤掉
  30. SingleOutputStreamOperator<JSONObject> filteredDS = jsonObjDS.filter(
  31. jsonObj -> {
  32. boolean flag = jsonObj.getString("table") != null
  33. && jsonObj.getJSONObject("data") != null
  34. && jsonObj.getString("data").length() > 3;
  35. return flag;
  36. }
  37. );
  38. //filteredDS.print("json>>>>>");
  39. //TODO 5. 动态分流 事实表放到主流,写回到kafka的DWD层;如果维度表,通过侧输出流,写入到Hbase
  40. //5.1定义输出到Hbase的侧输出流标签
  41. OutputTag<JSONObject> hbaseTag = new OutputTag<JSONObject>(TableProcess.SINK_TYPE_HBASE){};
  42. //5.2 主流 写回到Kafka的数据
  43. SingleOutputStreamOperator<JSONObject> kafkaDS = filteredDS.process(
  44. new TableProcessFunction(hbaseTag)
  45. );
  46. //5.3获取侧输出流 写到Hbase的数据
  47. DataStream<JSONObject> hbaseDS = kafkaDS.getSideOutput(hbaseTag);
  48. kafkaDS.print("事实>>>>");
  49. hbaseDS.print("维度>>>>");
  50. //TODO 6.将维度数据保存到Phoenix对应的维度表中
  51. hbaseDS.addSink(new DimSink());
  52. //TODO 7.将事实数据写回到kafka的dwd层
  53. FlinkKafkaProducer<JSONObject> kafkaSink = MyKafkaUtil.getKafkaSinkBySchema(
  54. new KafkaSerializationSchema<JSONObject>() {
  55. @Override
  56. public void open(SerializationSchema.InitializationContext context) throws Exception {
  57. System.out.println("kafka序列化");
  58. }
  59. @Override
  60. public ProducerRecord<byte[], byte[]> serialize(JSONObject jsonObj, @Nullable Long timestamp) {
  61. String sinkTopic = jsonObj.getString("sink_table");
  62. JSONObject dataJsonObj = jsonObj.getJSONObject("data");
  63. return new ProducerRecord<>(sinkTopic,dataJsonObj.toString().getBytes());
  64. }
  65. }
  66. );
  67. kafkaDS.addSink(kafkaSink);
  68. env.execute();
  69. }
  70. }

2) 程序流程分析

TableProcessFunction 是一个自定义算子,主要包括三条时间线任务

➢  图中紫线,这个时间线与数据流入无关,只要任务启动就会执行。主要的任务方法是open()这个方法在任务启动时就会执行。他的主要工作就是初始化一些连接,开启周期调度。

➢  图中绿线,这个时间线也与数据流入无关,只要周期调度启动,会自动周期性执行。主要的任务是同步配置表(tableProcessMap)。通过在 open()方法中加入 timer定时器实现。同时还有个附带任务就是如果发现不存在数据表,要根据配置自动创建数据库表。

➢  图中黑线,这个时间线就是随着数据的流入持续发生,这部分的任务就是根据同步到内存的 tableProcessMap,来为流入的数据进行标识,同时清理掉没用的字段。

2、对业务数据进行分流处理的自定义处理函数

  1. /**
  2. * Desc: 用于对业务数据进行分流处理的自定义处理函数
  3. */
  4. public class TableProcessFunction extends ProcessFunction<JSONObject, JSONObject> {
  5. //因为要将维度数据通过侧输出流输出,所以我们在这里定义一个侧输出流标记
  6. private OutputTag<JSONObject> outputTag;
  7. //用于在内存中存放配置表信息的Map <表名:操作,tableProcess>
  8. private Map<String, TableProcess> tableProcessMap = new HashMap<>();
  9. //用于在内存中存放已经处理过的表(在phoenix中已经建过的表)
  10. private Set<String> existsTables = new HashSet<>();
  11. //声明Phoenix的连接对象
  12. Connection conn = null;
  13. ………………
  14. ………………
  15. //根据sinkType,将数据输出到不同的流
  16. if (tableProcess != null && tableProcess.getSinkType().equals(TableProcess.SINK_TYPE_HBASE)) {
  17. //如果sinkType = hbase ,说明是维度数据,通过侧输出流输出
  18. ctx.output(outputTag, jsonObj);
  19. } else if (tableProcess != null && tableProcess.getSinkType().equals(TableProcess.SINK_TYPE_KAFKA)) {
  20. //如果sinkType = kafka ,说明是事实数据,通过主流输出
  21. out.collect(jsonObj);
  22. }

3、自定义函数 TableProcessFunction-open

生命周期方法,初始化连接,初始化配置表信息并开启定时任务,用于不断读取配置表信息 

  1. //在函数被调用的时候执行的方法,执行一次
  2. @Override
  3. public void open(Configuration parameters) throws Exception {
  4. //初始化Phoenix连接
  5. Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
  6. conn = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);
  7. //初始化配置表信息
  8. refreshMeta(); //========1.从MySQL数据库配置表中查询配置信息
  9. //开启一个定时任务
  10. // 因为配置表的数据可能会发生变化,每隔一段时间就从配置表中查询一次数据,更新到map,并检查建表
  11. //从现在起过delay毫秒后,每隔period执行一次
  12. Timer timer = new Timer();
  13. timer.schedule(new TimerTask() {
  14. @Override
  15. public void run() {
  16. refreshMeta();
  17. }
  18. }, 5000, 5000);
  19. }

 4、分流 Sink 之保存维度到 HBase(Phoenix) 、通过 Phoenix 向 Hbase 表中写数据

注意:为了开启 hbase 的 namespace 和 phoenix 的 schema 的映射,在程序中需要加这
个配置文件,另外在 linux 服务上,也需要在 hbase 以及 phoenix 的 hbase-site.xml 配置
文件中,加上以上两个配置,并使用 xsync 进行同步

  1. /**
  2. * Desc: 通过 Phoenix 向 Hbase 表中写数据
  3. */
  4. public class DimSink extends RichSinkFunction<JSONObject> {

四、总结

        DWD 的实时计算核心就是数据分流,其次是状态识别。在开发过程中我们实践了几个
灵活度较强算子,比如 RichMapFunction, ProcessFunction, RichSinkFunction。 那这
几个我们什么时候会用到呢?如何选择?

 从对比表中能明显看出,Rich 系列能功能强大,ProcessFunction 功能更强大,但是相对的越全面的算子使用起来也更加繁琐。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/943105
推荐阅读
相关标签
  

闽ICP备14008679号