当前位置:   article > 正文

基于 Kafka + Flink + Redis 的电商大屏实时计算案_flink kafka实时

flink kafka实时
  • 前言

  • 数据格式与接入

  • 统计站点指标

  • 商品Top N

  • The End


前言

阿里的双11销量大屏可以说是一道特殊的风景线。实时大屏(real-time dashboard)正在被越来越多的企业采用,用来及时呈现关键的数据指标。并且在实际操作中,肯定也不会仅仅计算一两个维度。由于Flink的“真·流式计算”这一特点,它比Spark Streaming要更适合大屏应用。本文从笔者的实际工作经验抽象出简单的模型,并简要叙述计算流程(当然大部分都是源码)。

数据格式与接入

简化的子订单消息体如下。

  1. {
  2. "userId": 234567,
  3. "orderId": 2902306918400,
  4. "subOrderId": 2902306918401,
  5. "siteId": 10219,
  6. "siteName": "site_blabla",
  7. "cityId": 101,
  8. "cityName": "北京市",
  9. "warehouseId": 636,
  10. "merchandiseId": 187699,
  11. "price": 299,
  12. "quantity": 2,
  13. "orderStatus": 1,
  14. "isNewOrder": 0,
  15. "timestamp": 1572963672217
  16. }

由于订单可能会包含多种商品,故会被拆分成子订单来表示,每条JSON消息表示一个子订单。现在要按照自然日来统计以下指标,并以1秒的刷新频率呈现在大屏上:

  • 每个站点(站点ID即siteId)的总订单数、子订单数、销量与GMV;

  • 当前销量排名前N的商品(商品ID即merchandiseId)与它们的销量。

由于大屏的最大诉求是实时性,等待迟到数据显然不太现实,因此我们采用处理时间作为时间特征,并以1分钟的频率做checkpointing。

  1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2. env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
  3. env.enableCheckpointing(60 * 1000, CheckpointingMode.EXACTLY_ONCE);
  4. env.getCheckpointConfig().setCheckpointTimeout(30 * 1000);

然后订阅Kafka的订单消息作为数据源。

  1. Properties consumerProps = ParameterUtil.getFromResourceFile("kafka.properties");
  2. DataStream<String> sourceStream = env
  3. .addSource(new FlinkKafkaConsumer011<>(
  4. ORDER_EXT_TOPIC_NAME, // topic
  5. new SimpleStringSchema(), // deserializer
  6. consumerProps // consumer properties
  7. ))
  8. .setParallelism(PARTITION_COUNT)
  9. .name("source_kafka_" + ORDER_EXT_TOPIC_NAME)
  10. .uid("source_kafka_" + ORDER_EXT_TOPIC_NAME);

给带状态的算子设定算子ID(通过调用uid()方法)是个好习惯,能够保证Flink应用从保存点重启时能够正确恢复状态现场。为了尽量稳妥,Flink官方也建议为每个算子都显式地设定ID,参考:https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#should-i-assign-ids-to-all-operators-in-my-job

接下来将JSON数据转化为POJO,JSON框架采用FastJSON。

  1. DataStream<SubOrderDetail> orderStream = sourceStream
  2. .map(message -> JSON.parseObject(message, SubOrderDetail.class))
  3. .name("map_sub_order_detail").uid("map_sub_order_detail");

JSON已经是预先处理好的标准化格式,所以POJO类SubOrderDetail的写法可以通过Lombok极大地简化。如果JSON的字段有不规范的,那么就需要手写Getter和Setter,并用@JSONField注解来指明。

  1. @Getter
  2. @Setter
  3. @NoArgsConstructor
  4. @AllArgsConstructor
  5. @ToString
  6. publicclass SubOrderDetail implements Serializable {
  7. privatestaticfinallong serialVersionUID = 1L;
  8. privatelong userId;
  9. privatelong orderId;
  10. privatelong subOrderId;
  11. privatelong siteId;
  12. private String siteName;
  13. privatelong cityId;
  14. private String cityName;
  15. privatelong warehouseId;
  16. privatelong merchandiseId;
  17. privatelong price;
  18. privatelong quantity;
  19. privateint orderStatus;
  20. privateint isNewOrder;
  21. privatelong timestamp;
  22. }

统计站点指标

将子订单流按站点ID分组,开1天的滚动窗口,并同时设定ContinuousProcessingTimeTrigger触发器,以1秒周期触发计算。注意处理时间的时区问题,这是老生常谈了。

  1. WindowedStream<SubOrderDetail, Tuple, TimeWindow> siteDayWindowStream = orderStream
  2. .keyBy("siteId")
  3. .window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
  4. .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)));

接下来写个聚合函数。

  1. DataStream<OrderAccumulator> siteAggStream = siteDayWindowStream
  2. .aggregate(new OrderAndGmvAggregateFunc())
  3. .name("aggregate_site_order_gmv").uid("aggregate_site_order_gmv");
  4. publicstaticfinalclass OrderAndGmvAggregateFunc
  5. implements AggregateFunction<SubOrderDetail, OrderAccumulator, OrderAccumulator> {
  6. privatestaticfinallong serialVersionUID = 1L;
  7. @Override
  8. public OrderAccumulator createAccumulator() {
  9. returnnew OrderAccumulator();
  10. }
  11. @Override
  12. public OrderAccumulator add(SubOrderDetail record, OrderAccumulator acc) {
  13. if (acc.getSiteId() == 0) {
  14. acc.setSiteId(record.getSiteId());
  15. acc.setSiteName(record.getSiteName());
  16. }
  17. acc.addOrderId(record.getOrderId());
  18. acc.addSubOrderSum(1);
  19. acc.addQuantitySum(record.getQuantity());
  20. acc.addGmv(record.getPrice() * record.getQuantity());
  21. return acc;
  22. }
  23. @Override
  24. public OrderAccumulator getResult(OrderAccumulator acc) {
  25. return acc;
  26. }
  27. @Override
  28. public OrderAccumulator merge(OrderAccumulator acc1, OrderAccumulator acc2) {
  29. if (acc1.getSiteId() == 0) {
  30. acc1.setSiteId(acc2.getSiteId());
  31. acc1.setSiteName(acc2.getSiteName());
  32. }
  33. acc1.addOrderIds(acc2.getOrderIds());
  34. acc1.addSubOrderSum(acc2.getSubOrderSum());
  35. acc1.addQuantitySum(acc2.getQuantitySum());
  36. acc1.addGmv(acc2.getGmv());
  37. return acc1;
  38. }
  39. }

累加器类OrderAccumulator的实现很简单,看源码就大概知道它的结构了,因此不再多废话。唯一需要注意的是订单ID可能重复,所以需要用名为orderIds的HashSet来保存它。HashSet应付我们目前的数据规模还是没太大问题的,如果是海量数据,就考虑换用HyperLogLog吧。

接下来就该输出到Redis供呈现端查询了。这里有个问题:一秒内有数据变化的站点并不多,而ContinuousProcessingTimeTrigger每次触发都会输出窗口里全部的聚合数据,这样做了很多无用功,并且还会增大Redis的压力。所以,我们在聚合结果后再接一个ProcessFunction,代码如下。

  1. DataStream<Tuple2<Long, String>> siteResultStream = siteAggStream
  2. .keyBy(0)
  3. .process(new OutputOrderGmvProcessFunc(), TypeInformation.of(new TypeHint<Tuple2<Long, String>>() {}))
  4. .name("process_site_gmv_changed").uid("process_site_gmv_changed");
  5. publicstaticfinalclass OutputOrderGmvProcessFunc
  6. extends KeyedProcessFunction<Tuple, OrderAccumulator, Tuple2<Long, String>> {
  7. privatestaticfinallong serialVersionUID = 1L;
  8. private MapState<Long, OrderAccumulator> state;
  9. @Override
  10. public void open(Configuration parameters) throws Exception {
  11. super.open(parameters);
  12. state = this.getRuntimeContext().getMapState(new MapStateDescriptor<>(
  13. "state_site_order_gmv",
  14. Long.class,
  15. OrderAccumulator.class)
  16. );
  17. }
  18. @Override
  19. public void processElement(OrderAccumulator value, Context ctx, Collector<Tuple2<Long, String>> out) throws Exception {
  20. long key = value.getSiteId();
  21. OrderAccumulator cachedValue = state.get(key);
  22. if (cachedValue == null || value.getSubOrderSum() != cachedValue.getSubOrderSum()) {
  23. JSONObject result = new JSONObject();
  24. result.put("site_id", value.getSiteId());
  25. result.put("site_name", value.getSiteName());
  26. result.put("quantity", value.getQuantitySum());
  27. result.put("orderCount", value.getOrderIds().size());
  28. result.put("subOrderCount", value.getSubOrderSum());
  29. result.put("gmv", value.getGmv());
  30. out.collect(new Tuple2<>(key, result.toJSONString());
  31. state.put(key, value);
  32. }
  33. }
  34. @Override
  35. public void close() throws Exception {
  36. state.clear();
  37. super.close();
  38. }
  39. }

说来也简单,就是用一个MapState状态缓存当前所有站点的聚合数据。由于数据源是以子订单为单位的,因此如果站点ID在MapState中没有缓存,或者缓存的子订单数与当前子订单数不一致,表示结果有更新,这样的数据才允许输出。

最后就可以安心地接上Redis Sink了,结果会被存进一个Hash结构里。

  1. // 看官请自己构造合适的FlinkJedisPoolConfig
  2. FlinkJedisPoolConfig jedisPoolConfig = ParameterUtil.getFlinkJedisPoolConfig(false, true);
  3. siteResultStream
  4. .addSink(new RedisSink<>(jedisPoolConfig, new GmvRedisMapper()))
  5. .name("sink_redis_site_gmv").uid("sink_redis_site_gmv")
  6. .setParallelism(1);
  7. publicstaticfinalclass GmvRedisMapper implements RedisMapper<Tuple2<Long, String>> {
  8. privatestaticfinallong serialVersionUID = 1L;
  9. privatestaticfinal String HASH_NAME_PREFIX = "RT:DASHBOARD:GMV:";
  10. @Override
  11. public RedisCommandDescription getCommandDescription() {
  12. returnnew RedisCommandDescription(RedisCommand.HSET, HASH_NAME_PREFIX);
  13. }
  14. @Override
  15. public String getKeyFromData(Tuple2<Long, String> data) {
  16. return String.valueOf(data.f0);
  17. }
  18. @Override
  19. public String getValueFromData(Tuple2<Long, String> data) {
  20. return data.f1;
  21. }
  22. @Override
  23. public Optional<String> getAdditionalKey(Tuple2<Long, String> data) {
  24. return Optional.of(
  25. HASH_NAME_PREFIX +
  26. new LocalDateTime(System.currentTimeMillis()).toString(Consts.TIME_DAY_FORMAT) +
  27. "SITES"
  28. );
  29. }
  30. }

商品Top N

我们可以直接复用前面产生的orderStream,玩法与上面的GMV统计大同小异。这里用1秒滚动窗口就可以了。

  1. WindowedStream<SubOrderDetail, Tuple, TimeWindow> merchandiseWindowStream = orderStream
  2. .keyBy("merchandiseId")
  3. .window(TumblingProcessingTimeWindows.of(Time.seconds(1)));
  4. DataStream<Tuple2<Long, Long>> merchandiseRankStream = merchandiseWindowStream
  5. .aggregate(new MerchandiseSalesAggregateFunc(), new MerchandiseSalesWindowFunc())
  6. .name("aggregate_merch_sales").uid("aggregate_merch_sales")
  7. .returns(TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() { }));

聚合函数与窗口函数的实现更加简单了,最终返回的是商品ID与商品销量的二元组。

  1. publicstaticfinalclass MerchandiseSalesAggregateFunc
  2. implements AggregateFunction<SubOrderDetail, Long, Long> {
  3. privatestaticfinallong serialVersionUID = 1L;
  4. @Override
  5. public Long createAccumulator() {
  6. return0L;
  7. }
  8. @Override
  9. public Long add(SubOrderDetail value, Long acc) {
  10. return acc + value.getQuantity();
  11. }
  12. @Override
  13. public Long getResult(Long acc) {
  14. return acc;
  15. }
  16. @Override
  17. public Long merge(Long acc1, Long acc2) {
  18. return acc1 + acc2;
  19. }
  20. }
  21. publicstaticfinalclass MerchandiseSalesWindowFunc
  22. implements WindowFunction<Long, Tuple2<Long, Long>, Tuple, TimeWindow> {
  23. privatestaticfinallong serialVersionUID = 1L;
  24. @Override
  25. public void apply(
  26. Tuple key,
  27. TimeWindow window,
  28. Iterable<Long> accs,
  29. Collector<Tuple2<Long, Long>> out) throws Exception {
  30. long merchId = ((Tuple1<Long>) key).f0;
  31. long acc = accs.iterator().next();
  32. out.collect(new Tuple2<>(merchId, acc));
  33. }
  34. }

既然数据最终都要落到Redis,那么我们完全没必要在Flink端做Top N的统计,直接利用Redis的有序集合(zset)就行了,商品ID作为field,销量作为分数值,简单方便。不过flink-redis-connector项目中默认没有提供ZINCRBY命令的实现(必须再吐槽一次),我们可以自己加,步骤参照之前写过的那篇加SETEX的命令的文章,不再赘述。RedisMapper的写法如下。

  1. publicstaticfinalclass RankingRedisMapper implements RedisMapper<Tuple2<Long, Long>> {
  2. privatestaticfinallong serialVersionUID = 1L;
  3. privatestaticfinal String ZSET_NAME_PREFIX = "RT:DASHBOARD:RANKING:";
  4. @Override
  5. public RedisCommandDescription getCommandDescription() {
  6. returnnew RedisCommandDescription(RedisCommand.ZINCRBY, ZSET_NAME_PREFIX);
  7. }
  8. @Override
  9. public String getKeyFromData(Tuple2<Long, Long> data) {
  10. return String.valueOf(data.f0);
  11. }
  12. @Override
  13. public String getValueFromData(Tuple2<Long, Long> data) {
  14. return String.valueOf(data.f1);
  15. }
  16. @Override
  17. public Optional<String> getAdditionalKey(Tuple2<Long, Long> data) {
  18. return Optional.of(
  19. ZSET_NAME_PREFIX +
  20. new LocalDateTime(System.currentTimeMillis()).toString(Consts.TIME_DAY_FORMAT) + ":" +
  21. "MERCHANDISE"
  22. );
  23. }
  24. }

后端取数时,用ZREVRANGE命令即可取出指定排名的数据了。只要数据规模不是大到难以接受,并且有现成的Redis,这个方案完全可以作为各类Top N需求的通用实现。

The End

大屏的实际呈现需要保密,截图自然是没有的。以下是提交执行时Flink Web UI给出的执行计划(实际有更多的统计任务,不止3个Sink)。通过复用源数据,可以在同一个Flink job内实现更多统计需求。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/AllinToyou/article/detail/480549
推荐阅读
相关标签
  

闽ICP备14008679号