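Today I'm finishing the last two metrics of the DWS-layer trade domain. It should take about a morning, since these two requirements use much the same techniques as yesterday's.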
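The first requirement, order count and order amount per province per window, is fairly simple. province_id is a column of the order table, and we already kept it in the DWD-layer order transaction fact table, so we only need to read that table. That fact table is derived from the order preprocessing table, which is built with left joins against the activity and coupon tables, so late-arriving right-side rows once again produce duplicate records. This does not hurt us here: we use none of the activity or coupon columns, so keeping only the first record per order-detail id is enough to deduplicate.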
As before, the columns are grain + window start/end time + measures.
```sql
create table if not exists dws_trade_province_order_window
(
    stt           DateTime,
    edt           DateTime,
    province_id   String,
    province_name String,
    order_count   UInt64,
    order_amount  Decimal(38, 20),
    ts            UInt64
) engine = ReplacingMergeTree(ts)
      partition by toYYYYMMDD(stt)
      order by (stt, edt, province_id);
```
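ReplacingMergeTree(ts) is the reason the job later overwrites ts with the current system time: when ClickHouse merges data parts, it keeps only one row per ORDER BY key (stt, edt, province_id), namely the one with the largest ts. If a window is written again, for example after a restart, the newer row eventually replaces the older one. The replacement happens only during background merges, so queries can still see duplicates until then (`SELECT ... FINAL` forces it at query time). The plain-Java sketch below mimics that merge rule; it involves no ClickHouse, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ReplacingMergeSketch {
    // One row of dws_trade_province_order_window, reduced to the columns used here
    static final class Row {
        final String stt, edt, provinceId;
        final long orderCount;
        final long ts; // version column of ReplacingMergeTree(ts)

        Row(String stt, String edt, String provinceId, long orderCount, long ts) {
            this.stt = stt;
            this.edt = edt;
            this.provinceId = provinceId;
            this.orderCount = orderCount;
            this.ts = ts;
        }
    }

    // Keeps, per sorting key (stt, edt, province_id), the row with the largest ts;
    // on equal ts the later row wins in this sketch
    static List<Row> merge(List<Row> rows) {
        Map<String, Row> latest = new LinkedHashMap<>();
        for (Row r : rows) {
            String key = r.stt + "|" + r.edt + "|" + r.provinceId;
            Row current = latest.get(key);
            if (current == null || r.ts >= current.ts) {
                latest.put(key, r);
            }
        }
        return new ArrayList<>(latest.values());
    }

    public static void main(String[] args) {
        List<Row> rows = List.of(
                new Row("2022-06-14 10:00:00", "2022-06-14 10:00:10", "1", 3, 1000L),
                new Row("2022-06-14 10:00:00", "2022-06-14 10:00:10", "1", 4, 2000L), // window written again
                new Row("2022-06-14 10:00:00", "2022-06-14 10:00:10", "2", 5, 1500L));
        for (Row r : merge(rows)) {
            System.out.println(r.provinceId + ": order_count=" + r.orderCount + ", ts=" + r.ts);
        }
    }
}
```

Province 1 ends up with the row written at ts=2000. This only works because ts is a value that grows with each write, which the event-time create_time would not guarantee for a re-emitted window.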
```java
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;

import java.util.Set;

@Data
@AllArgsConstructor
@Builder
public class TradeProvinceOrderWindow {
    // Window start time
    String stt;

    // Window end time
    String edt;

    // Province ID
    String provinceId;

    // Province name
    @Builder.Default
    String provinceName = "";

    // Number of orders in the window
    Long orderCount;

    // Set of order IDs, used to count distinct orders (not written to ClickHouse)
    @TransientSink
    Set<String> orderIdSet;

    // Order amount in the window
    Double orderAmount;

    // Timestamp
    Long ts;
}
```

```java
// TODO 2. Read the Kafka topic dwd_trade_order_detail
String groupId = "dws_trade_province_order_window";
DataStreamSource<String> orderDetailDS = env.addSource(MyKafkaUtil.getFlinkKafkaConsumer("dwd_trade_order_detail", groupId));

// TODO 3. Convert to a JSON stream
SingleOutputStreamOperator<JSONObject> jsonDS = orderDetailDS.flatMap(new RichFlatMapFunction<String, JSONObject>() {
    @Override
    public void flatMap(String value, Collector<JSONObject> out) throws Exception {
        try {
            JSONObject jsonObject = JSONObject.parseObject(value);
            out.collect(jsonObject);
        } catch (Exception e) {
            // Malformed records could be sent to a side output instead
            e.printStackTrace();
        }
    }
});
```

The fields we need do not depend on the right-hand tables of those left joins, so we simply keep the first record:
```java
// TODO 4. Deduplicate (key by the order_detail id)
KeyedStream<JSONObject, String> keyedByIdStream = jsonDS.keyBy(json -> json.getString("id"));
SingleOutputStreamOperator<JSONObject> filterDS = keyedByIdStream.filter(new RichFilterFunction<JSONObject>() {
    // Marks that the first record for this id has been seen; later records
    // (duplicates caused by the upstream left join) are dropped
    private ValueState<String> state;

    @Override
    public void open(Configuration parameters) throws Exception {
        // State TTL takes org.apache.flink.api.common.time.Time
        StateTtlConfig ttlConfig = new StateTtlConfig.Builder(Time.seconds(5))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .build();

        ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("first-value", String.class);
        stateDescriptor.enableTimeToLive(ttlConfig);
        state = getRuntimeContext().getState(stateDescriptor);
    }

    @Override
    public boolean filter(JSONObject value) throws Exception {
        String data = state.value();
        if (data == null) {
            state.update("1"); // any non-null value works
            return true;
        }
        return false;
    }
});
```
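The filter keeps the first record per order_detail id and drops anything with the same id while the state is alive. Because the state is written once and never rewritten, OnCreateAndWrite means it expires 5 s after the first record. The Flink-free sketch below reproduces that rule under one simplifying assumption: "now" is passed in explicitly, whereas Flink's state TTL runs on processing time. The class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

class FirstRecordDedup {
    private final long ttlMillis;
    // id -> time the marker state was written (written once, so the TTL is never refreshed)
    private final Map<String, Long> writtenAt = new HashMap<>();

    FirstRecordDedup(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    // Mirrors filter(): true means "keep this record"
    boolean keep(String id, long nowMillis) {
        Long written = writtenAt.get(id);
        boolean expired = written != null && nowMillis - written >= ttlMillis;
        if (written == null || expired) {
            writtenAt.put(id, nowMillis); // like state.update("1")
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        FirstRecordDedup dedup = new FirstRecordDedup(5_000L);
        System.out.println(dedup.keep("1001", 0L));     // true: first record
        System.out.println(dedup.keep("1001", 1_000L)); // false: left-join duplicate
        System.out.println(dedup.keep("1002", 1_200L)); // true: different id
        System.out.println(dedup.keep("1001", 6_000L)); // true: state expired after 5 s
    }
}
```

The last call shows the trade-off of the TTL: it bounds state size, but a duplicate arriving after the state has expired would pass again, so the TTL has to exceed the expected delay of the late left-join rows.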

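When converting to the JavaBean stream, store create_time in the ts field so the next step can extract it as the event time (ts is later overwritten with the system time, where it serves as the version column of the ClickHouse table):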
```java
// TODO 5. Convert to a JavaBean stream
SingleOutputStreamOperator<TradeProvinceOrderWindow> provinceOrderDS = filterDS.map(new MapFunction<JSONObject, TradeProvinceOrderWindow>() {
    @Override
    public TradeProvinceOrderWindow map(JSONObject value) throws Exception {
        HashSet<String> orderIds = new HashSet<>();
        orderIds.add(value.getString("order_id"));

        return TradeProvinceOrderWindow.builder()
                .orderAmount(value.getDouble("split_total_amount"))
                .orderIdSet(orderIds)
                // create_time as epoch milliseconds, used as the event time
                .ts(DateFormatUtil.toTs(value.getString("create_time"), true))
                .provinceId(value.getString("province_id"))
                .build();
    }
});

// TODO 6. Extract the event time and assign watermarks
SingleOutputStreamOperator<TradeProvinceOrderWindow> provinceOrderWithWmDS = provinceOrderDS.assignTimestampsAndWatermarks(
        WatermarkStrategy.<TradeProvinceOrderWindow>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                .withTimestampAssigner(new SerializableTimestampAssigner<TradeProvinceOrderWindow>() {
                    @Override
                    public long extractTimestamp(TradeProvinceOrderWindow element, long recordTimestamp) {
                        return element.getTs();
                    }
                }));
```
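To see when a 10 s window actually closes, it helps to do the watermark arithmetic by hand. As far as I know, Flink's bounded-out-of-orderness generator emits `maxTimestamp - outOfOrderness - 1`, and an event-time window [start, end) fires once the watermark reaches `end - 1`; tumbling windows with no offset start at `ts - ts % size` for non-negative timestamps. The sketch below assumes a watermark is emitted after every event (Flink actually emits periodically, every 200 ms by default), and its names are hypothetical.

```java
class WatermarkSketch {
    static final long OUT_OF_ORDERNESS_MS = 2_000L; // Duration.ofSeconds(2)
    static final long WINDOW_SIZE_MS = 10_000L;     // Time.seconds(10)

    // Start of the tumbling window containing ts (non-negative ts, zero offset)
    static long windowStart(long ts) {
        return ts - ts % WINDOW_SIZE_MS;
    }

    // Watermark after the largest event time seen so far is maxTs
    static long watermark(long maxTs) {
        return maxTs - OUT_OF_ORDERNESS_MS - 1;
    }

    // Window [start, start + size) fires once the watermark reaches its last millisecond
    static boolean fires(long windowStart, long watermark) {
        return watermark >= windowStart + WINDOW_SIZE_MS - 1;
    }

    public static void main(String[] args) {
        long[] eventTs = {1_000L, 9_500L, 11_000L, 12_000L};
        long maxTs = Long.MIN_VALUE;
        for (long ts : eventTs) {
            maxTs = Math.max(maxTs, ts);
            long wm = watermark(maxTs);
            System.out.println("event " + ts + " -> window start " + windowStart(ts)
                    + ", watermark " + wm + ", [0, 10000) fires: " + fires(0L, wm));
        }
    }
}
```

Only the event at 12 000 ms pushes the watermark to 9 999 and closes the first window, i.e. results appear about 2 s (the out-of-orderness bound) after the window end in event time, and never if no watermark reaches the windowed stream.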

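Key by the grain and merge the measures (union the order-ID sets, sum the amounts); once the window closes, fill in the window start and end times, derive the order count from the set size, and set the timestamp to the current time: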
```java
// TODO 7. Key by province, open windows, aggregate
// Key the watermarked stream: without watermarks the event-time windows never fire
SingleOutputStreamOperator<TradeProvinceOrderWindow> reduceDS = provinceOrderWithWmDS.keyBy(TradeProvinceOrderWindow::getProvinceId)
        .window(TumblingEventTimeWindows.of(org.apache.flink.streaming.api.windowing.time.Time.seconds(10)))
        .reduce(new ReduceFunction<TradeProvinceOrderWindow>() {
            @Override
            public TradeProvinceOrderWindow reduce(TradeProvinceOrderWindow value1, TradeProvinceOrderWindow value2) throws Exception {
                value1.getOrderIdSet().addAll(value2.getOrderIdSet());
                value1.setOrderAmount(value1.getOrderAmount() + value2.getOrderAmount());
                return value1;
            }
        }, new WindowFunction<TradeProvinceOrderWindow, TradeProvinceOrderWindow, String, TimeWindow>() {
            @Override
            public void apply(String provinceId, TimeWindow window, Iterable<TradeProvinceOrderWindow> input, Collector<TradeProvinceOrderWindow> out) throws Exception {
                // After the reduce there is exactly one element per key and window
                TradeProvinceOrderWindow next = input.iterator().next();

                next.setStt(DateFormatUtil.toYmdHms(window.getStart()));
                next.setEdt(DateFormatUtil.toYmdHms(window.getEnd()));
                next.setTs(System.currentTimeMillis());
                next.setOrderCount((long) next.getOrderIdSet().size());

                out.collect(next);
            }
        });
```
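Because the input is at order-detail grain, one order_id can show up in several rows of the same window, which is why the count comes from a set rather than a counter, while split_total_amount is summed per detail row. A plain-Java sketch of that per-province aggregation for a single window, with a hypothetical class name and input rows given as String arrays for brevity:

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

class ProvinceWindowAggSketch {
    // Running aggregate for one province inside one window (what reduce() accumulates)
    static final class Agg {
        final Set<String> orderIds = new HashSet<>();
        double orderAmount;
    }

    // rows: {provinceId, orderId, splitTotalAmount}, one row per order detail
    static Map<String, Agg> aggregate(String[][] rows) {
        Map<String, Agg> byProvince = new TreeMap<>();
        for (String[] r : rows) {
            Agg agg = byProvince.computeIfAbsent(r[0], k -> new Agg());
            agg.orderIds.add(r[1]);                      // set union -> distinct orders
            agg.orderAmount += Double.parseDouble(r[2]); // sum of split amounts
        }
        return byProvince;
    }

    public static void main(String[] args) {
        String[][] rows = {
                {"1", "o1", "10.5"},
                {"1", "o1", "20.0"}, // second detail row of the same order
                {"1", "o2", "5.0"},
                {"2", "o3", "8.0"}};
        aggregate(rows).forEach((province, agg) ->
                System.out.println(province + ": order_count=" + agg.orderIds.size()
                        + ", order_amount=" + agg.orderAmount));
        // prints:
        // 1: order_count=2, order_amount=35.5
        // 2: order_count=1, order_amount=8.0
    }
}
```

The sketch keeps the amount as a double to match the bean's Double field; summing BigDecimal values instead would avoid floating-point rounding before the value lands in the Decimal(38, 20) column.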

```java
// TODO 8. Look up the province dimension table (province_info)
SingleOutputStreamOperator<TradeProvinceOrderWindow> resultDS = AsyncDataStream.unorderedWait(reduceDS, new DimAsyncFunction<TradeProvinceOrderWindow>("DIM_BASE_PROVINCE") {
    @Override
    public String getKey(TradeProvinceOrderWindow input) {
        return input.getProvinceId();
    }

    @Override
    public void addAttribute(TradeProvinceOrderWindow pojo, JSONObject dimInfo) {
        pojo.setProvinceName(dimInfo.getString("NAME"));
    }
}, 100, TimeUnit.SECONDS);

// TODO 9. Write to ClickHouse (write resultDS so province_name is included)
// 7 placeholders, one per non-@TransientSink field:
// stt, edt, province_id, province_name, order_count, order_amount, ts
resultDS.addSink(ClickHouseUtil.getSinkFunction("insert into dws_trade_province_order_window values (?,?,?,?,?,?,?)"));

// TODO 10. Start the job
env.execute("DwsTradeProvinceOrderWindow");
```
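ClickHouseUtil is not shown in this post. Assuming it fills the `?` placeholders from the bean's fields via reflection, skipping fields marked @TransientSink, the number of placeholders has to equal the number of remaining fields and their order has to match the table's columns. The sketch below performs that check for a trimmed copy of TradeProvinceOrderWindow; the annotation `TransientSinkStub` and all class names are hypothetical stand-ins. Note that the Javadoc of `Class.getDeclaredFields()` does not promise any particular order, even though HotSpot returns declaration order in practice.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for the project's @TransientSink annotation
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface TransientSinkStub {}

class SinkColumnCheck {
    // Trimmed copy of TradeProvinceOrderWindow's fields, in the same order
    static class ProvinceRow {
        String stt;
        String edt;
        String provinceId;
        String provinceName;
        Long orderCount;
        @TransientSinkStub Set<String> orderIdSet;
        Double orderAmount;
        Long ts;
    }

    // Names of the fields that would be bound to '?' placeholders
    static List<String> sinkFields(Class<?> clazz) {
        List<String> names = new ArrayList<>();
        for (Field f : clazz.getDeclaredFields()) {
            if (f.isSynthetic() || f.isAnnotationPresent(TransientSinkStub.class)) {
                continue;
            }
            names.add(f.getName());
        }
        return names;
    }

    static long placeholders(String sql) {
        return sql.chars().filter(c -> c == '?').count();
    }

    public static void main(String[] args) {
        String sql = "insert into dws_trade_province_order_window values (?,?,?,?,?,?,?)";
        List<String> fields = sinkFields(ProvinceRow.class);
        System.out.println(fields);
        System.out.println("fields=" + fields.size() + ", placeholders=" + placeholders(sql));
    }
}
```

A check like this, run once in a unit test per bean, catches a missing or extra placeholder before the job starts failing at insert time.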

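The second requirement is about refunds: refund count per trademark, category and user per window. The DWD-layer refund transaction fact table is built only from the refund records filtered out of ODS plus tables such as the dictionary table (joined twice, once for the refund type and once for the refund reason), so it does not have the left-join duplication problem above and can be aggregated without a deduplication step.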
```sql
create table if not exists dws_trade_trademark_category_user_refund_window
(
    stt            DateTime,
    edt            DateTime,
    trademark_id   String,
    trademark_name String,
    category1_id   String,
    category1_name String,
    category2_id   String,
    category2_name String,
    category3_id   String,
    category3_name String,
    user_id        String,
    refund_count   UInt64,
    ts             UInt64
) engine = ReplacingMergeTree(ts)
      partition by toYYYYMMDD(stt)
      order by (stt, edt, trademark_id, trademark_name, category1_id,
                category1_name, category2_id, category2_name, category3_id, category3_name, user_id);
```

```java
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;

import java.util.Set;

@Data
@AllArgsConstructor
@Builder
public class TradeTrademarkCategoryUserRefundBean {
    // Window start time
    String stt;
    // Window end time
    String edt;
    // Trademark ID
    String trademarkId;
    // Trademark name
    String trademarkName;
    // Level-1 category ID
    String category1Id;
    // Level-1 category name
    String category1Name;
    // Level-2 category ID
    String category2Id;
    // Level-2 category name
    String category2Name;
    // Level-3 category ID
    String category3Id;
    // Level-3 category name
    String category3Name;

    // Order IDs, used to count refunds (not written to ClickHouse)
    @TransientSink
    Set<String> orderIdSet;

    // sku_id, only needed for the sku_info lookup (not written to ClickHouse)
    @TransientSink
    String skuId;

    // User ID
    String userId;
    // Refund count
    Long refundCount;
    // Timestamp
    Long ts;
}
```

```java
// TODO 2. Read the Kafka topic dwd_trade_order_refund
String groupId = "dws_trade_trademark_category_user_refund_window";
DataStreamSource<String> orderRefundDS = env.addSource(MyKafkaUtil.getFlinkKafkaConsumer("dwd_trade_order_refund", groupId));

// TODO 3. Convert to a JavaBean stream
SingleOutputStreamOperator<TradeTrademarkCategoryUserRefundBean> tmCateUserDS = orderRefundDS.flatMap(new RichFlatMapFunction<String, TradeTrademarkCategoryUserRefundBean>() {
    @Override
    public void flatMap(String value, Collector<TradeTrademarkCategoryUserRefundBean> out) throws Exception {
        try {
            JSONObject jsonObject = JSONObject.parseObject(value);

            HashSet<String> orderIds = new HashSet<>();
            orderIds.add(jsonObject.getString("order_id"));

            out.collect(TradeTrademarkCategoryUserRefundBean.builder()
                    .userId(jsonObject.getString("user_id"))
                    // Parse the create_time value (not the literal "create_time") into epoch millis
                    .ts(DateFormatUtil.toTs(jsonObject.getString("create_time"), true))
                    .skuId(jsonObject.getString("sku_id"))
                    .orderIdSet(orderIds)
                    .build()
            );
        } catch (Exception e) {
            // Malformed records could be sent to a side output instead
            e.printStackTrace();
        }
    }
});
```

Look up the sku_info dimension table to fill in tm_id and category3_id, which are part of the grain and therefore needed before keying:
```java
// TODO 4. Look up DIM_SKU_INFO to fill in tm_id and category3_id
SingleOutputStreamOperator<TradeTrademarkCategoryUserRefundBean> withSkuCateUserIdDS = AsyncDataStream.unorderedWait(tmCateUserDS, new DimAsyncFunction<TradeTrademarkCategoryUserRefundBean>("DIM_SKU_INFO") {
    @Override
    public String getKey(TradeTrademarkCategoryUserRefundBean input) {
        return input.getSkuId();
    }

    @Override
    public void addAttribute(TradeTrademarkCategoryUserRefundBean pojo, JSONObject dimInfo) {
        pojo.setCategory3Id(dimInfo.getString("CATEGORY3_ID"));
        pojo.setTrademarkId(dimInfo.getString("TM_ID"));
    }
}, 60 * 5, TimeUnit.SECONDS);

// TODO 5. Assign watermarks
SingleOutputStreamOperator<TradeTrademarkCategoryUserRefundBean> withWmDS = withSkuCateUserIdDS.assignTimestampsAndWatermarks(
        WatermarkStrategy.<TradeTrademarkCategoryUserRefundBean>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                .withTimestampAssigner(new SerializableTimestampAssigner<TradeTrademarkCategoryUserRefundBean>() {
                    @Override
                    public long extractTimestamp(TradeTrademarkCategoryUserRefundBean element, long recordTimestamp) {
                        return element.getTs();
                    }
                }));
```
```java
// TODO 6. Key by the grain (category3, trademark, user), open windows, aggregate
// An anonymous KeySelector keeps the Tuple3 type parameters visible to Flink;
// a lambda returning Tuple3 fails Flink's type extraction because of type erasure
SingleOutputStreamOperator<TradeTrademarkCategoryUserRefundBean> reduceDS = withWmDS
        .keyBy(new KeySelector<TradeTrademarkCategoryUserRefundBean, Tuple3<String, String, String>>() {
            @Override
            public Tuple3<String, String, String> getKey(TradeTrademarkCategoryUserRefundBean data) {
                return Tuple3.of(data.getCategory3Id(), data.getTrademarkId(), data.getUserId());
            }
        })
        .window(TumblingEventTimeWindows.of(org.apache.flink.streaming.api.windowing.time.Time.seconds(10)))
        .reduce(new ReduceFunction<TradeTrademarkCategoryUserRefundBean>() {
            @Override
            public TradeTrademarkCategoryUserRefundBean reduce(TradeTrademarkCategoryUserRefundBean value1, TradeTrademarkCategoryUserRefundBean value2) throws Exception {
                value1.getOrderIdSet().addAll(value2.getOrderIdSet());
                return value1;
            }
        }, new WindowFunction<TradeTrademarkCategoryUserRefundBean, TradeTrademarkCategoryUserRefundBean, Tuple3<String, String, String>, TimeWindow>() {
            @Override
            public void apply(Tuple3<String, String, String> key, TimeWindow window, Iterable<TradeTrademarkCategoryUserRefundBean> input, Collector<TradeTrademarkCategoryUserRefundBean> out) throws Exception {
                // After the reduce there is exactly one element per key and window
                TradeTrademarkCategoryUserRefundBean next = input.iterator().next();
                next.setTs(System.currentTimeMillis());
                next.setStt(DateFormatUtil.toYmdHms(window.getStart()));
                next.setEdt(DateFormatUtil.toYmdHms(window.getEnd()));
                next.setRefundCount((long) next.getOrderIdSet().size());
                out.collect(next);
            }
        });
```
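The refund count is again a distinct count of order IDs, this time per composite key (category3_id, tm_id, user_id): several skus of one order can be refunded together and should count once. The plain-Java sketch below groups by an `Arrays.asList` key, which, like Tuple3, compares by value and tolerates null elements (for example when a dimension lookup found nothing). The class name and sample data are hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class RefundGrainSketch {
    // rows: {category3Id, trademarkId, userId, orderId}, one row per refunded order detail
    static Map<List<String>, Integer> refundCounts(String[][] rows) {
        Map<List<String>, Set<String>> orderIdsByKey = new HashMap<>();
        for (String[] r : rows) {
            // Value-based equals/hashCode, like the Tuple3 key in keyBy
            List<String> key = Arrays.asList(r[0], r[1], r[2]);
            orderIdsByKey.computeIfAbsent(key, k -> new HashSet<>()).add(r[3]);
        }
        Map<List<String>, Integer> counts = new HashMap<>();
        orderIdsByKey.forEach((key, ids) -> counts.put(key, ids.size()));
        return counts;
    }

    public static void main(String[] args) {
        String[][] rows = {
                {"61", "tm1", "u1", "o1"},
                {"61", "tm1", "u1", "o1"}, // two skus of the same order refunded
                {"61", "tm1", "u1", "o2"},
                {"61", "tm2", "u1", "o3"}};
        refundCounts(rows).forEach((key, count) ->
                System.out.println(key + " -> refund_count=" + count));
    }
}
```

HashMap iteration order is unspecified, so the two output lines may appear in either order; the counts are 2 for (61, tm1, u1) and 1 for (61, tm2, u1).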

```java
// TODO 7. Look up the remaining dimension tables
// TODO 7.1 Trademark
SingleOutputStreamOperator<TradeTrademarkCategoryUserRefundBean> reduceWithTmDS = AsyncDataStream.unorderedWait(reduceDS,
        new DimAsyncFunction<TradeTrademarkCategoryUserRefundBean>("DIM_BASE_TRADEMARK") {
            @Override
            public String getKey(TradeTrademarkCategoryUserRefundBean input) {
                return input.getTrademarkId();
            }

            @Override
            public void addAttribute(TradeTrademarkCategoryUserRefundBean pojo, JSONObject dimInfo) {
                pojo.setTrademarkName(dimInfo.getString("TM_NAME"));
            }
        },
        100, TimeUnit.SECONDS);
// TODO 7.2 Category3 (also yields category2_id)
SingleOutputStreamOperator<TradeTrademarkCategoryUserRefundBean> reduceWithCate3DS = AsyncDataStream.unorderedWait(reduceWithTmDS,
        new DimAsyncFunction<TradeTrademarkCategoryUserRefundBean>("DIM_BASE_CATEGORY3") {
            @Override
            public String getKey(TradeTrademarkCategoryUserRefundBean input) {
                return input.getCategory3Id();
            }

            @Override
            public void addAttribute(TradeTrademarkCategoryUserRefundBean pojo, JSONObject dimInfo) {
                pojo.setCategory3Name(dimInfo.getString("NAME"));
                // Read the ID from the dimension row, not the column name itself
                pojo.setCategory2Id(dimInfo.getString("CATEGORY2_ID"));
            }
        },
        100, TimeUnit.SECONDS);
// TODO 7.3 Category2 (also yields category1_id)
SingleOutputStreamOperator<TradeTrademarkCategoryUserRefundBean> reduceWithCate2DS = AsyncDataStream.unorderedWait(reduceWithCate3DS,
        new DimAsyncFunction<TradeTrademarkCategoryUserRefundBean>("DIM_BASE_CATEGORY2") {
            @Override
            public String getKey(TradeTrademarkCategoryUserRefundBean input) {
                return input.getCategory2Id();
            }

            @Override
            public void addAttribute(TradeTrademarkCategoryUserRefundBean pojo, JSONObject dimInfo) {
                pojo.setCategory2Name(dimInfo.getString("NAME"));
                pojo.setCategory1Id(dimInfo.getString("CATEGORY1_ID"));
            }
        },
        100, TimeUnit.SECONDS);
// TODO 7.4 Category1
SingleOutputStreamOperator<TradeTrademarkCategoryUserRefundBean> reduceWithCate1DS = AsyncDataStream.unorderedWait(reduceWithCate2DS,
        new DimAsyncFunction<TradeTrademarkCategoryUserRefundBean>("DIM_BASE_CATEGORY1") {
            @Override
            public String getKey(TradeTrademarkCategoryUserRefundBean input) {
                return input.getCategory1Id();
            }

            @Override
            public void addAttribute(TradeTrademarkCategoryUserRefundBean pojo, JSONObject dimInfo) {
                pojo.setCategory1Name(dimInfo.getString("NAME"));
            }
        },
        100, TimeUnit.SECONDS);

// TODO 8. Write to ClickHouse (13 placeholders = 13 non-@TransientSink fields)
reduceWithCate1DS.addSink(ClickHouseUtil.getSinkFunction("insert into dws_trade_trademark_category_user_refund_window values(?,?,?,?,?,?,?,?,?,?,?,?,?)"));

// TODO 9. Start the job
env.execute("DwsTradeTrademarkCategoryUserRefundWindow");
```
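The category lookups form a chain: the category3 row supplies category2_id, which is the key for the category2 lookup, which in turn supplies category1_id. That is why the three async stages must run in this order, and why each ID has to be read out of dimInfo; passing the literal column name would make the next stage look for a row whose id is the string "CATEGORY2_ID". The sketch below walks the same chain over in-memory maps standing in for the Phoenix dimension tables; the data and class name are hypothetical.

```java
import java.util.Map;

class CategoryChainSketch {
    // Hypothetical dimension rows keyed by id, with column names as in the DIM tables
    static final Map<String, Map<String, String>> DIM_BASE_CATEGORY3 =
            Map.of("61", Map.of("NAME", "Phone", "CATEGORY2_ID", "13"));
    static final Map<String, Map<String, String>> DIM_BASE_CATEGORY2 =
            Map.of("13", Map.of("NAME", "Mobile", "CATEGORY1_ID", "2"));
    static final Map<String, Map<String, String>> DIM_BASE_CATEGORY1 =
            Map.of("2", Map.of("NAME", "Electronics"));

    // Returns {category1Id, category1Name, category2Id, category2Name, category3Name}
    static String[] resolve(String category3Id) {
        Map<String, String> c3 = DIM_BASE_CATEGORY3.get(category3Id);
        String c2Id = c3.get("CATEGORY2_ID"); // value from the dimension row
        Map<String, String> c2 = DIM_BASE_CATEGORY2.get(c2Id);
        String c1Id = c2.get("CATEGORY1_ID");
        Map<String, String> c1 = DIM_BASE_CATEGORY1.get(c1Id);
        return new String[]{c1Id, c1.get("NAME"), c2Id, c2.get("NAME"), c3.get("NAME")};
    }

    public static void main(String[] args) {
        System.out.println(String.join(", ", resolve("61")));
        // prints: 2, Electronics, 13, Mobile, Phone
    }
}
```

Since every stage waits on the previous one, the chain adds the latency of three lookups per record; in the job, the cache inside DimAsyncFunction (if it has one) is what keeps this cheap.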

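With that, the DWS layer is done. Overall the logic is much the same at every layer; the hard part is building the right mental model of real-time data. Next up is the ADS layer. Let's finish it today!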