
Flink Real-Time Data Warehouse (10): DWS Layer Construction (4) — Creating the Trade-Domain Summary Tables

Preface

        Today we finish the two remaining trade-domain metrics in the DWS layer; it should only take a morning, since these two requirements use almost the same knowledge points as yesterday's.

1、Trade-Domain Province-Granularity Order Window Summary Table

1.1、Approach

        This requirement is fairly simple. province_id is a field of the order table, and we already kept it in the DWD-layer order-detail transaction fact table, so all we need to do is read that table. Note that the order-detail fact table is built from the order pre-processing table, which is produced with left joins against the activity and coupon tables, so late data from those joins again causes duplicated records. That still does not matter here: we do not use any fields from those two right-side tables, so keeping only the first record per order-detail id is enough to deduplicate.

  • Consume dwd_trade_order_detail
  • Convert to a JSON stream
  • First deduplication (just keep the first of the late duplicates)
  • Convert to a JavaBean stream
  • Extract the event time (create_time) and assign watermarks
  • Key by the granularity (province_id)
  • Open windows (again a 10 s tumbling window)
  • Aggregate (again incremental + full-window aggregation)
  • Join the province dimension to fill in province_name (done through DimAsyncFunction; see the sketch after this list)
  • Write out to ClickHouse
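The dimension join in the next-to-last step reuses the DimAsyncFunction helper built in an earlier post. As a reminder, a minimal sketch of such a helper is shown below, assuming the actual row lookup is delegated to a DimUtil.getDimInfo(tableName, key) call; that lookup, the thread-pool size and the class name here are illustrative assumptions, not the project's exact code:

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch only: an async dimension-join template in the spirit of DimAsyncFunction.
// getKey()/addAttribute() are the two hooks that the jobs in this post implement.
public abstract class DimAsyncFunctionSketch<T> extends RichAsyncFunction<T, T> {

    private final String tableName;          // dimension table name, e.g. "DIM_BASE_PROVINCE"
    private transient ExecutorService pool;  // runs the blocking lookup off the operator thread

    public DimAsyncFunctionSketch(String tableName) {
        this.tableName = tableName;
    }

    public abstract String getKey(T input);                        // which id to look up
    public abstract void addAttribute(T pojo, JSONObject dimInfo); // how to copy dimension columns back

    @Override
    public void open(Configuration parameters) {
        pool = Executors.newFixedThreadPool(8); // pool size is an arbitrary choice for this sketch
    }

    @Override
    public void asyncInvoke(T input, ResultFuture<T> resultFuture) {
        pool.submit(() -> {
            // DimUtil.getDimInfo is assumed to query the dimension store (see the cached-lookup sketch in section 2.1)
            JSONObject dimInfo = DimUtil.getDimInfo(tableName, getKey(input));
            if (dimInfo != null) {
                addAttribute(input, dimInfo);
            }
            resultFuture.complete(Collections.singletonList(input));
        });
    }

    @Override
    public void timeout(T input, ResultFuture<T> resultFuture) {
        // On timeout, emit the record without the dimension attributes instead of failing the job
        resultFuture.complete(Collections.singletonList(input));
    }
}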

1.2、Code Implementation

1.2.1、Create the ClickHouse Table and Its Entity Class

The fields are, as before, granularity + window start/end times + measures.

create table if not exists dws_trade_province_order_window
(
    stt           DateTime,
    edt           DateTime,
    province_id   String,
    province_name String,
    order_count   UInt64,
    order_amount  Decimal(38, 20),
    ts            UInt64
) engine = ReplacingMergeTree(ts)
      partition by toYYYYMMDD(stt)
      order by (stt, edt, province_id);
The corresponding entity class:

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;

import java.util.Set;

@Data
@AllArgsConstructor
@Builder
public class TradeProvinceOrderWindow {
    // Window start time
    String stt;
    // Window end time
    String edt;
    // Province ID
    String provinceId;
    // Province name
    @Builder.Default
    String provinceName = "";
    // Cumulative order count
    Long orderCount;
    // Set of order IDs, used to count orders
    @TransientSink
    Set<String> orderIdSet;
    // Cumulative order amount
    Double orderAmount;
    // Timestamp
    Long ts;
}
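@TransientSink is the project's own marker annotation: fields that carry it (orderIdSet here) are skipped when the ClickHouse sink builds and fills its INSERT statement. As a reminder, it is just an empty field-level runtime annotation, roughly like this sketch:

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Marker annotation: the ClickHouse sink ignores any bean field annotated with it
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface TransientSink {
}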

1.2.2、Read the Order-Detail Fact Table and Convert to a JSON Stream

// TODO 2. Read Kafka topic dwd_trade_order_detail
String groupId = "dws_trade_province_order_window";
DataStreamSource<String> orderDetailDS = env.addSource(MyKafkaUtil.getFlinkKafkaConsumer("dwd_trade_order_detail", groupId));

// TODO 3. Convert to a JSON stream
SingleOutputStreamOperator<JSONObject> jsonDS = orderDetailDS.flatMap(new RichFlatMapFunction<String, JSONObject>() {
    @Override
    public void flatMap(String value, Collector<JSONObject> out) throws Exception {
        try {
            JSONObject jsonObject = JSONObject.parseObject(value);
            out.collect(jsonObject);
        } catch (Exception e) {
            // Dirty data could also be routed to a side output here
            e.printStackTrace();
        }
    }
});

1.2.3、First Deduplication (Late Data from the Upstream left join)

Because our measures have nothing to do with the right-side tables of the left join, simply keeping the first record is enough:

// TODO 4. First deduplication (key by order_detail_id)
KeyedStream<JSONObject, String> keyedByIdStream = jsonDS.keyBy(json -> json.getString("id"));
SingleOutputStreamOperator<JSONObject> filterDS = keyedByIdStream.filter(new RichFilterFunction<JSONObject>() {

    // Marks that the first record has been seen; later records for the same id
    // (late data produced by the upstream left join) are all dropped
    private ValueState<String> state;

    @Override
    public void open(Configuration parameters) throws Exception {
        StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(5))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .build();
        ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("first-value", String.class);
        stateDescriptor.enableTimeToLive(ttlConfig);
        state = getRuntimeContext().getState(stateDescriptor);
    }

    @Override
    public boolean filter(JSONObject value) throws Exception {
        String data = state.value();
        if (data == null) {
            state.update("1"); // any non-null marker will do
            return true;
        }
        return false;
    }
});

1.2.4、Convert to a JavaBean Stream and Assign Watermarks

When converting to the JavaBean stream, take create_time as the ts field so the event time can be extracted from it below (even though the field is later overwritten with the system time to serve as the version column of the ClickHouse table); a reminder sketch of DateFormatUtil.toTs follows the snippet.

// TODO 5. Convert to a JavaBean stream
SingleOutputStreamOperator<TradeProvinceOrderWindow> provinceOrderDS = filterDS.map(new MapFunction<JSONObject, TradeProvinceOrderWindow>() {
    @Override
    public TradeProvinceOrderWindow map(JSONObject value) throws Exception {
        HashSet<String> orderIds = new HashSet<>();
        orderIds.add(value.getString("order_id"));
        return TradeProvinceOrderWindow.builder()
                .orderAmount(value.getDouble("split_total_amount"))
                .orderIdSet(orderIds)
                .ts(DateFormatUtil.toTs(value.getString("create_time"), true))
                .provinceId(value.getString("province_id"))
                .build();
    }
});

// TODO 6. Extract the event time and assign watermarks
SingleOutputStreamOperator<TradeProvinceOrderWindow> provinceOrderWithWmDS = provinceOrderDS.assignTimestampsAndWatermarks(
        WatermarkStrategy.<TradeProvinceOrderWindow>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                .withTimestampAssigner(new SerializableTimestampAssigner<TradeProvinceOrderWindow>() {
                    @Override
                    public long extractTimestamp(TradeProvinceOrderWindow element, long recordTimestamp) {
                        return element.getTs();
                    }
                }));
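DateFormatUtil is the date utility from earlier posts; as a reminder, toTs(str, true) is expected to parse a "yyyy-MM-dd HH:mm:ss" string into epoch milliseconds. A rough sketch under that assumption (class name and time zone are illustrative, not the project's exact code):

import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class DateFormatUtilSketch {

    private static final DateTimeFormatter DTF_FULL =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // isFull = true -> "yyyy-MM-dd HH:mm:ss"; false -> "yyyy-MM-dd" (interpreted as midnight)
    public static Long toTs(String dtStr, boolean isFull) {
        String full = isFull ? dtStr : dtStr + " 00:00:00";
        LocalDateTime localDateTime = LocalDateTime.parse(full, DTF_FULL);
        return localDateTime.atZone(ZoneId.of("Asia/Shanghai")).toInstant().toEpochMilli();
    }
}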

1.2.5、Key By, Window, Aggregate

Key by the granularity, sum the measure fields, and when the window fires fill in the window start and end times; the ts field is reset to the current system time:

// TODO 7. Key by, window, aggregate
SingleOutputStreamOperator<TradeProvinceOrderWindow> reduceDS = provinceOrderWithWmDS.keyBy(TradeProvinceOrderWindow::getProvinceId)
        .window(TumblingEventTimeWindows.of(org.apache.flink.streaming.api.windowing.time.Time.seconds(10)))
        .reduce(new ReduceFunction<TradeProvinceOrderWindow>() {
            @Override
            public TradeProvinceOrderWindow reduce(TradeProvinceOrderWindow value1, TradeProvinceOrderWindow value2) throws Exception {
                value1.getOrderIdSet().addAll(value2.getOrderIdSet());
                value1.setOrderAmount(value1.getOrderAmount() + value2.getOrderAmount());
                return value1;
            }
        }, new WindowFunction<TradeProvinceOrderWindow, TradeProvinceOrderWindow, String, TimeWindow>() {
            @Override
            public void apply(String provinceId, TimeWindow window, Iterable<TradeProvinceOrderWindow> input, Collector<TradeProvinceOrderWindow> out) throws Exception {
                TradeProvinceOrderWindow next = input.iterator().next();
                next.setStt(DateFormatUtil.toYmdHms(window.getStart()));
                next.setEdt(DateFormatUtil.toYmdHms(window.getEnd()));
                next.setTs(System.currentTimeMillis());
                next.setOrderCount((long) next.getOrderIdSet().size());
                out.collect(next);
            }
        });

1.2.6、Join the Dimension Table dim_base_province and Write Out to ClickHouse

// TODO 8. Join the dimension table dim_base_province
SingleOutputStreamOperator<TradeProvinceOrderWindow> resultDS = AsyncDataStream.unorderedWait(reduceDS,
        new DimAsyncFunction<TradeProvinceOrderWindow>("DIM_BASE_PROVINCE") {
            @Override
            public String getKey(TradeProvinceOrderWindow input) {
                return input.getProvinceId();
            }

            @Override
            public void addAttribute(TradeProvinceOrderWindow pojo, JSONObject dimInfo) {
                pojo.setProvinceName(dimInfo.getString("NAME"));
            }
        }, 100, TimeUnit.SECONDS);

// TODO 9. Write out to ClickHouse (the table has 7 columns, so 7 placeholders)
resultDS.addSink(ClickHouseUtil.getSinkFunction("insert into dws_trade_province_order_window values (?,?,?,?,?,?,?)"));

// TODO 10. Start the job
env.execute("DwsTradeProvinceOrderWindow");
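ClickHouseUtil.getSinkFunction comes from an earlier post in this series; it is typically built on Flink's JdbcSink, filling the "?" placeholders from the bean's fields by reflection and skipping fields annotated with @TransientSink. A minimal sketch under those assumptions (the driver class, JDBC URL and batch settings here are illustrative, not the project's exact values):

import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.lang.reflect.Field;

public class ClickHouseUtilSketch {

    public static <T> SinkFunction<T> getSinkFunction(String sql) {
        return JdbcSink.sink(
                sql,
                (JdbcStatementBuilder<T>) (preparedStatement, bean) -> {
                    // Map the bean's fields onto the "?" placeholders in declaration order,
                    // skipping fields marked with @TransientSink
                    Field[] fields = bean.getClass().getDeclaredFields();
                    int skipped = 0;
                    for (int i = 0; i < fields.length; i++) {
                        Field field = fields[i];
                        if (field.getAnnotation(TransientSink.class) != null) {
                            skipped++;
                            continue;
                        }
                        field.setAccessible(true);
                        try {
                            preparedStatement.setObject(i + 1 - skipped, field.get(bean));
                        } catch (IllegalAccessException e) {
                            throw new RuntimeException(e);
                        }
                    }
                },
                JdbcExecutionOptions.builder()
                        .withBatchSize(5)            // flush every 5 records (illustrative)
                        .withBatchIntervalMs(1000L)  // or at least once per second
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withDriverName("ru.yandex.clickhouse.ClickHouseDriver") // assumed ClickHouse JDBC driver
                        .withUrl("jdbc:clickhouse://hadoop102:8123/gmall")       // assumed connection URL
                        .build()
        );
    }
}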

2、Trade-Domain Trademark-Category-User Granularity Refund Window Summary Table

2.1、Approach

This metric covers refunds. The data in the DWD-layer refund transaction fact table comes only from the refund records filtered out of ODS plus the dictionary table (which is joined twice: once for the refund type and once for the refund reason), and so on.

  • Read data from dwd_trade_order_refund
  • Convert to a JavaBean stream (String -> JSON -> JavaBean)
  • Join the dimension table sku_info to fill in tm_id and category3_id
  • Extract the event time and generate watermarks
  • Key by the granularity, window, aggregate
  • Join the remaining dimension tables (a cached-lookup sketch follows this list)
    • base_trademark for the trademark name
    • base_category3, base_category2, base_category1 for their ids and names
  • Write out to ClickHouse
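These chained dimension lookups hit the dimension store once per record per table, which is why the DimUtil.getDimInfo lookup assumed in the DimAsyncFunction sketch of section 1.1 usually puts a Redis side cache in front of the HBase/Phoenix query. A rough sketch under those assumptions (the Redis key scheme, the 1-day TTL, and the RedisUtil/PhoenixUtil helpers are illustrative assumptions, not the project's exact code):

import com.alibaba.fastjson.JSONObject;
import redis.clients.jedis.Jedis;

public class DimUtil {

    public static JSONObject getDimInfo(String tableName, String key) {
        // 1. Try the Redis side cache first
        String redisKey = "DIM:" + tableName + ":" + key;
        Jedis jedis = RedisUtil.getJedis();          // assumed pooled-connection helper
        String cached = jedis.get(redisKey);
        if (cached != null) {
            jedis.expire(redisKey, 24 * 60 * 60);    // refresh the 1-day TTL on access
            jedis.close();
            return JSONObject.parseObject(cached);
        }

        // 2. Cache miss: query the dimension table (e.g. via Phoenix/JDBC)
        String sql = "select * from GMALL_REALTIME." + tableName + " where id='" + key + "'";
        JSONObject dimInfo = PhoenixUtil.queryOne(sql); // assumed JDBC helper returning one row as JSON

        // 3. Write the result back to the cache for later lookups
        if (dimInfo != null) {
            jedis.setex(redisKey, 24 * 60 * 60, dimInfo.toJSONString());
        }
        jedis.close();
        return dimInfo;
    }
}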

2.2、Code Implementation

2.2.1、Create the ClickHouse Table and Its JavaBean

create table if not exists dws_trade_trademark_category_user_refund_window
(
    stt            DateTime,
    edt            DateTime,
    trademark_id   String,
    trademark_name String,
    category1_id   String,
    category1_name String,
    category2_id   String,
    category2_name String,
    category3_id   String,
    category3_name String,
    user_id        String,
    refund_count   UInt64,
    ts             UInt64
) engine = ReplacingMergeTree(ts)
      partition by toYYYYMMDD(stt)
      order by (stt, edt, trademark_id, trademark_name, category1_id,
                category1_name, category2_id, category2_name, category3_id, category3_name, user_id);
The corresponding JavaBean:

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;

import java.util.Set;

@Data
@AllArgsConstructor
@Builder
public class TradeTrademarkCategoryUserRefundBean {
    // Window start time
    String stt;
    // Window end time
    String edt;
    // Trademark ID
    String trademarkId;
    // Trademark name
    String trademarkName;
    // Level-1 category ID
    String category1Id;
    // Level-1 category name
    String category1Name;
    // Level-2 category ID
    String category2Id;
    // Level-2 category name
    String category2Name;
    // Level-3 category ID
    String category3Id;
    // Level-3 category name
    String category3Name;
    // Set of order IDs, used to count refunds
    @TransientSink
    Set<String> orderIdSet;
    // sku_id
    @TransientSink
    String skuId;
    // User ID
    String userId;
    // Refund count
    Long refundCount;
    // Timestamp
    Long ts;
}

2.2.2、Read the Refund Fact Table and Convert the Format

// TODO 2. Read Kafka topic dwd_trade_order_refund
String groupId = "dws_trade_trademark_category_user_refund_window";
DataStreamSource<String> refundDS = env.addSource(MyKafkaUtil.getFlinkKafkaConsumer("dwd_trade_order_refund", groupId));

// TODO 3. Convert to a JavaBean stream
SingleOutputStreamOperator<TradeTrademarkCategoryUserRefundBean> tmCateUserDS = refundDS.flatMap(new RichFlatMapFunction<String, TradeTrademarkCategoryUserRefundBean>() {
    @Override
    public void flatMap(String value, Collector<TradeTrademarkCategoryUserRefundBean> out) throws Exception {
        try {
            JSONObject jsonObject = JSONObject.parseObject(value);
            HashSet<String> orderIds = new HashSet<>();
            orderIds.add(jsonObject.getString("order_id"));
            out.collect(TradeTrademarkCategoryUserRefundBean.builder()
                    .userId(jsonObject.getString("user_id"))
                    .ts(DateFormatUtil.toTs(jsonObject.getString("create_time"), true))
                    .skuId(jsonObject.getString("sku_id"))
                    .orderIdSet(orderIds)
                    .build()
            );
        } catch (Exception e) {
            // Dirty data could also be routed to a side output here
            e.printStackTrace();
        }
    }
});

2.2.3、Join the Dimension Table sku_info

Join the dimension table sku_info to fill in the tm_id and category3_id fields:

// TODO 4. Join the dimension table sku_info to fill in tm_id and category3_id
SingleOutputStreamOperator<TradeTrademarkCategoryUserRefundBean> withSkuCateUserIdDS = AsyncDataStream.unorderedWait(tmCateUserDS,
        new DimAsyncFunction<TradeTrademarkCategoryUserRefundBean>("DIM_SKU_INFO") {
            @Override
            public String getKey(TradeTrademarkCategoryUserRefundBean input) {
                return input.getSkuId();
            }

            @Override
            public void addAttribute(TradeTrademarkCategoryUserRefundBean pojo, JSONObject dimInfo) {
                pojo.setCategory3Id(dimInfo.getString("CATEGORY3_ID"));
                pojo.setTrademarkId(dimInfo.getString("TM_ID"));
            }
        }, 60 * 5, TimeUnit.SECONDS);

2.2.4、Assign Watermarks, Then Key By, Window, and Aggregate

// TODO 5. Assign watermarks
SingleOutputStreamOperator<TradeTrademarkCategoryUserRefundBean> withWmDS = withSkuCateUserIdDS.assignTimestampsAndWatermarks(
        WatermarkStrategy.<TradeTrademarkCategoryUserRefundBean>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                .withTimestampAssigner(new SerializableTimestampAssigner<TradeTrademarkCategoryUserRefundBean>() {
                    @Override
                    public long extractTimestamp(TradeTrademarkCategoryUserRefundBean element, long recordTimestamp) {
                        return element.getTs();
                    }
                }));

// TODO 6. Key by the granularity, window, aggregate
// The key type is given explicitly because a lambda returning Tuple3 loses its generic parameters to type erasure
SingleOutputStreamOperator<TradeTrademarkCategoryUserRefundBean> reduceDS = withWmDS
        .keyBy(data -> Tuple3.of(data.getCategory3Id(), data.getTrademarkId(), data.getUserId()),
                Types.TUPLE(Types.STRING, Types.STRING, Types.STRING))
        .window(TumblingEventTimeWindows.of(org.apache.flink.streaming.api.windowing.time.Time.seconds(10)))
        .reduce(new ReduceFunction<TradeTrademarkCategoryUserRefundBean>() {
            @Override
            public TradeTrademarkCategoryUserRefundBean reduce(TradeTrademarkCategoryUserRefundBean value1, TradeTrademarkCategoryUserRefundBean value2) throws Exception {
                value1.getOrderIdSet().addAll(value2.getOrderIdSet());
                return value1;
            }
        }, new WindowFunction<TradeTrademarkCategoryUserRefundBean, TradeTrademarkCategoryUserRefundBean, Tuple3<String, String, String>, TimeWindow>() {
            @Override
            public void apply(Tuple3<String, String, String> key, TimeWindow window, Iterable<TradeTrademarkCategoryUserRefundBean> input, Collector<TradeTrademarkCategoryUserRefundBean> out) throws Exception {
                TradeTrademarkCategoryUserRefundBean next = input.iterator().next();
                next.setTs(System.currentTimeMillis());
                next.setStt(DateFormatUtil.toYmdHms(window.getStart()));
                next.setEdt(DateFormatUtil.toYmdHms(window.getEnd()));
                next.setRefundCount((long) next.getOrderIdSet().size());
                out.collect(next);
            }
        });

2.2.5、Join the Other Dimension Tables and Write Out to ClickHouse

// TODO 7. Join the remaining dimension tables
// TODO 7.1 Join base_trademark
SingleOutputStreamOperator<TradeTrademarkCategoryUserRefundBean> reduceWithTmDS = AsyncDataStream.unorderedWait(reduceDS,
        new DimAsyncFunction<TradeTrademarkCategoryUserRefundBean>("DIM_BASE_TRADEMARK") {
            @Override
            public String getKey(TradeTrademarkCategoryUserRefundBean input) {
                return input.getTrademarkId();
            }

            @Override
            public void addAttribute(TradeTrademarkCategoryUserRefundBean pojo, JSONObject dimInfo) {
                pojo.setTrademarkName(dimInfo.getString("TM_NAME"));
            }
        },
        100, TimeUnit.SECONDS);

// TODO 7.2 Join base_category3
SingleOutputStreamOperator<TradeTrademarkCategoryUserRefundBean> reduceWithCate3DS = AsyncDataStream.unorderedWait(reduceWithTmDS,
        new DimAsyncFunction<TradeTrademarkCategoryUserRefundBean>("DIM_BASE_CATEGORY3") {
            @Override
            public String getKey(TradeTrademarkCategoryUserRefundBean input) {
                return input.getCategory3Id();
            }

            @Override
            public void addAttribute(TradeTrademarkCategoryUserRefundBean pojo, JSONObject dimInfo) {
                pojo.setCategory3Name(dimInfo.getString("NAME"));
                pojo.setCategory2Id(dimInfo.getString("CATEGORY2_ID"));
            }
        },
        100, TimeUnit.SECONDS);

// TODO 7.3 Join base_category2
SingleOutputStreamOperator<TradeTrademarkCategoryUserRefundBean> reduceWithCate2DS = AsyncDataStream.unorderedWait(reduceWithCate3DS,
        new DimAsyncFunction<TradeTrademarkCategoryUserRefundBean>("DIM_BASE_CATEGORY2") {
            @Override
            public String getKey(TradeTrademarkCategoryUserRefundBean input) {
                return input.getCategory2Id();
            }

            @Override
            public void addAttribute(TradeTrademarkCategoryUserRefundBean pojo, JSONObject dimInfo) {
                pojo.setCategory2Name(dimInfo.getString("NAME"));
                pojo.setCategory1Id(dimInfo.getString("CATEGORY1_ID"));
            }
        },
        100, TimeUnit.SECONDS);

// TODO 7.4 Join base_category1
SingleOutputStreamOperator<TradeTrademarkCategoryUserRefundBean> reduceWithCate1DS = AsyncDataStream.unorderedWait(reduceWithCate2DS,
        new DimAsyncFunction<TradeTrademarkCategoryUserRefundBean>("DIM_BASE_CATEGORY1") {
            @Override
            public String getKey(TradeTrademarkCategoryUserRefundBean input) {
                return input.getCategory1Id();
            }

            @Override
            public void addAttribute(TradeTrademarkCategoryUserRefundBean pojo, JSONObject dimInfo) {
                pojo.setCategory1Name(dimInfo.getString("NAME"));
            }
        },
        100, TimeUnit.SECONDS);

// TODO 8. Write out to ClickHouse (13 columns, 13 placeholders)
reduceWithCate1DS.addSink(ClickHouseUtil.getSinkFunction("insert into dws_trade_trademark_category_user_refund_window values(?,?,?,?,?,?,?,?,?,?,?,?,?)"));

// TODO 9. Start the job
env.execute("DwsTradeTrademarkCategoryUserRefundWindow");

Summary

        With this, the DWS layer is complete. Overall the logic of each layer is quite similar; the hard part is the abstract reasoning about real-time data. Next up is the ADS layer; let's knock it out today!
