当前位置:   article > 正文

flink实战-模拟简易双11实时统计大屏_flink模拟某猫双十一实时统计大屏系统,1.实时计算出当天零点截止到当前时间的销售

flink模拟某猫双十一实时统计大屏系统,1.实时计算出当天零点截止到当前时间的销售

背景

在大数据的实时处理中,实时的大屏展示已经成了一个很重要的展示项,比如最有名的双十一大屏实时销售总价展示。除了这个,还有一些其他场景的应用,比如我们在我们的后台系统实时的展示我们网站当前的pv、uv等等,其实做法都是类似的。

今天我们就做一个最简单的模拟电商统计大屏的小例子,我们抽取一下最简单的需求。

  • 实时计算出当天零点截止到当前时间的销售总额
  • 计算出各个分类的销售top3
  • 每秒钟更新一次统计结果

实例讲解

构造数据

首先我们通过自定义source 模拟订单的生成,生成了一个Tuple2,第一个元素是分类,第二个元素表示这个分类下产生的订单金额,金额我们通过随机生成.

  1. /**
  2. * 模拟生成某一个分类下的订单生成
  3. */
  4. public static class MySource implements SourceFunction<Tuple2<String,Double>>{
  5. private volatile boolean isRunning = true;
  6. private Random random = new Random();
  7. String category[] = {
  8. "女装", "男装",
  9. "图书", "家电",
  10. "洗护", "美妆",
  11. "运动", "游戏",
  12. "户外", "家具",
  13. "乐器", "办公"
  14. };
  15. @Override
  16. public void run(SourceContext<Tuple2<String,Double>> ctx) throws Exception{
  17. while (isRunning){
  18. Thread.sleep(10);
  19. //某一个分类
  20. String c = category[(int) (Math.random() * (category.length - 1))];
  21. //某一个分类下产生了price的成交订单
  22. double price = random.nextDouble() * 100;
  23. ctx.collect(Tuple2.of(c, price));
  24. }
  25. }
  26. @Override
  27. public void cancel(){
  28. isRunning = false;
  29. }
  30. }
  31. 复制代码

构造统计结果类

  1. public static class CategoryPojo{
  2. // 分类名称
  3. private String category;
  4. // 改分类总销售额
  5. private double totalPrice;
  6. // 截止到当前时间的时间
  7. private String dateTime;
  8. getter and setter ........
  9. }
  10. 复制代码

定义窗口和触发器

  1. DataStream<CategoryPojo> result = dataStream.keyBy(0)
  2. .window(TumblingProcessingTimeWindows.of(Time.days(
  3. 1), Time.hours(-8)))
  4. .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(
  5. 1)))
  6. .aggregate(
  7. new PriceAggregate(),
  8. new WindowResult()
  9. );
  10. 复制代码

首先我们定义一个窗口期是一天的滚动窗口,然后设置一个1秒钟的触发器,之后进行聚合计算.

集合计算

  1. private static class PriceAggregate
  2. implements AggregateFunction<Tuple2<String,Double>,Double,Double>{
  3. @Override
  4. public Double createAccumulator(){
  5. return 0D;
  6. }
  7. @Override
  8. public Double add(Tuple2<String,Double> value, Double accumulator){
  9. return accumulator + value.f1;
  10. }
  11. @Override
  12. public Double getResult(Double accumulator){
  13. return accumulator;
  14. }
  15. @Override
  16. public Double merge(Double a, Double b){
  17. return a + b;
  18. }
  19. }
  20. 复制代码

聚合计算也比较简单,其实就是对price的简单sum操作

收集窗口结果数据

  1. private static class WindowResult
  2. implements WindowFunction<Double,CategoryPojo,Tuple,TimeWindow>{
  3. SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  4. @Override
  5. public void apply(
  6. Tuple key,
  7. TimeWindow window,
  8. Iterable<Double> input,
  9. Collector<CategoryPojo> out) throws Exception{
  10. CategoryPojo categoryPojo = new CategoryPojo();
  11. categoryPojo.setCategory(((Tuple1<String>) key).f0);
  12. BigDecimal bg = new BigDecimal(input.iterator().next());
  13. double p = bg.setScale(2, BigDecimal.ROUND_HALF_UP).doubleValue();
  14. categoryPojo.setTotalPrice(p);
  15. categoryPojo.setDateTime(simpleDateFormat.format(new Date()));
  16. out.collect(categoryPojo);
  17. }
  18. }
  19. 复制代码

我们最聚合的结果进行简单的封装,封装成CategoryPojo类以便后续处理

使用聚合窗口的结果

  1. result.keyBy("dateTime")
  2. .window(TumblingProcessingTimeWindows.of(Time.seconds(
  3. 1)))
  4. .process(new WindowResultProcess());
  5. 复制代码

接下来我们要使用上面聚合的结果,所以我们使用上面的window聚合结果流又定义了时间是1秒的滚动窗口.

如何使用窗口的结果,可以参考flink的官网[1]

结果统计

接下来我们做最后的结果统计,在这里,我们会把各个分类的总价加起来,就是全站的总销量金额,然后我们同时使用优先级队列计算出分类销售的Top3,打印出结果,在生产过程中我们可以把这个结果数据发到hbase或者redis等外部存储,以供前端的实时页面展示。

  1. private static class WindowResultProcess
  2. extends ProcessWindowFunction<CategoryPojo,Object,Tuple,TimeWindow>{
  3. @Override
  4. public void process(
  5. Tuple tuple,
  6. Context context,
  7. Iterable<CategoryPojo> elements,
  8. Collector<Object> out) throws Exception{
  9. String date = ((Tuple1<String>) tuple).f0;
  10. Queue<CategoryPojo> queue = new PriorityQueue<>(
  11. 3,
  12. (o1, o2)->o1.getTotalPrice() >= o2.getTotalPrice() ? 1 : -1);
  13. double price = 0D;
  14. Iterator<CategoryPojo> iterator = elements.iterator();
  15. int s = 0;
  16. while (iterator.hasNext()){
  17. CategoryPojo categoryPojo = iterator.next();
  18. if (queue.size() < 3){
  19. queue.add(categoryPojo);
  20. } else {
  21. CategoryPojo tmp = queue.peek();
  22. if (categoryPojo.getTotalPrice() > tmp.getTotalPrice()){
  23. queue.poll();
  24. queue.add(categoryPojo);
  25. }
  26. }
  27. price += categoryPojo.getTotalPrice();
  28. }
  29. List<String> list = queue.stream()
  30. .sorted((o1, o2)->o1.getTotalPrice() <=
  31. o2.getTotalPrice() ? 1 : -1)
  32. .map(f->"(分类:" + f.getCategory() + " 销售额:" +
  33. f.getTotalPrice() + ")")
  34. .collect(
  35. Collectors.toList());
  36. System.out.println("时间 : " + date + " 总价 : " + price + " top3 " +
  37. StringUtils.join(list, ","));
  38. System.out.println("-------------");
  39. }
  40. }
  41. 复制代码

示例运行结果

  1. 3> CategoryPojo{category='户外', totalPrice=734.45, dateTime=2020-06-13 22:55:34}
  2. 2> CategoryPojo{category='游戏', totalPrice=862.86, dateTime=2020-06-13 22:55:34}
  3. 4> CategoryPojo{category='洗护', totalPrice=926.83, dateTime=2020-06-13 22:55:34}
  4. 3> CategoryPojo{category='运动', totalPrice=744.98, dateTime=2020-06-13 22:55:34}
  5. 2> CategoryPojo{category='乐器', totalPrice=648.81, dateTime=2020-06-13 22:55:34}
  6. 4> CategoryPojo{category='图书', totalPrice=1010.12, dateTime=2020-06-13 22:55:34}
  7. 1> CategoryPojo{category='家具', totalPrice=880.35, dateTime=2020-06-13 22:55:34}
  8. 3> CategoryPojo{category='家电', totalPrice=1225.34, dateTime=2020-06-13 22:55:34}
  9. 2> CategoryPojo{category='男装', totalPrice=796.06, dateTime=2020-06-13 22:55:34}
  10. 1> CategoryPojo{category='女装', totalPrice=1018.88, dateTime=2020-06-13 22:55:34}
  11. 1> CategoryPojo{category='美妆', totalPrice=768.37, dateTime=2020-06-13 22:55:34}
  12. 时间 : 2020-06-13 22:55:34 总价 : 9617.050000000001 top3 (分类:家电 销售额:1225.34),(分类:女装 销售额:1018.88),(分类:图书 销售额:1010.12)
  13. 复制代码

完整的代码请参考

https://github.com/zhangjun0x01/bigdata-examples/blob/master/flink/src/main/java/windows/BigScreem.java


文章来源:https://juejin.cn/post/6844904192180486158
 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/480562
推荐阅读
相关标签
  

闽ICP备14008679号