赞
踩
在大数据的实时处理中,实时的大屏展示已经成了一个很重要的展示项,比如最有名的双十一大屏实时销售总价展示。除了这个,还有一些其他场景的应用,比如我们在我们的后台系统实时的展示我们网站当前的pv、uv等等,其实做法都是类似的。
今天我们就做一个最简单的模拟电商统计大屏的小例子,我们抽取一下最简单的需求。
首先我们通过自定义source 模拟订单的生成,生成了一个Tuple2,第一个元素是分类,第二个元素表示这个分类下产生的订单金额,金额我们通过随机生成.
- /**
- * 模拟生成某一个分类下的订单生成
- */
- public static class MySource implements SourceFunction<Tuple2<String,Double>>{
- private volatile boolean isRunning = true;
- private Random random = new Random();
- String category[] = {
- "女装", "男装",
- "图书", "家电",
- "洗护", "美妆",
- "运动", "游戏",
- "户外", "家具",
- "乐器", "办公"
- };
-
- @Override
- public void run(SourceContext<Tuple2<String,Double>> ctx) throws Exception{
- while (isRunning){
- Thread.sleep(10);
- //某一个分类
- String c = category[(int) (Math.random() * (category.length - 1))];
- //某一个分类下产生了price的成交订单
- double price = random.nextDouble() * 100;
- ctx.collect(Tuple2.of(c, price));
- }
- }
-
- @Override
- public void cancel(){
- isRunning = false;
- }
- }
-
- 复制代码
data:image/s3,"s3://crabby-images/deb9d/deb9d52e6c78f73fbfaadc6e519fd00d286664e1" alt=""
- public static class CategoryPojo{
- // 分类名称
- private String category;
- // 改分类总销售额
- private double totalPrice;
- // 截止到当前时间的时间
- private String dateTime;
-
- getter and setter ........
- }
-
- 复制代码
-
- DataStream<CategoryPojo> result = dataStream.keyBy(0)
- .window(TumblingProcessingTimeWindows.of(Time.days(
- 1), Time.hours(-8)))
- .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(
- 1)))
- .aggregate(
- new PriceAggregate(),
- new WindowResult()
- );
-
- 复制代码
首先我们定义一个窗口期是一天的滚动窗口,然后设置一个1秒钟的触发器,之后进行聚合计算.
- private static class PriceAggregate
- implements AggregateFunction<Tuple2<String,Double>,Double,Double>{
-
- @Override
- public Double createAccumulator(){
- return 0D;
- }
-
- @Override
- public Double add(Tuple2<String,Double> value, Double accumulator){
- return accumulator + value.f1;
- }
-
- @Override
- public Double getResult(Double accumulator){
- return accumulator;
- }
-
- @Override
- public Double merge(Double a, Double b){
- return a + b;
- }
- }
-
- 复制代码
data:image/s3,"s3://crabby-images/deb9d/deb9d52e6c78f73fbfaadc6e519fd00d286664e1" alt=""
聚合计算也比较简单,其实就是对price的简单sum操作
-
- private static class WindowResult
- implements WindowFunction<Double,CategoryPojo,Tuple,TimeWindow>{
- SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-
- @Override
- public void apply(
- Tuple key,
- TimeWindow window,
- Iterable<Double> input,
- Collector<CategoryPojo> out) throws Exception{
- CategoryPojo categoryPojo = new CategoryPojo();
- categoryPojo.setCategory(((Tuple1<String>) key).f0);
-
- BigDecimal bg = new BigDecimal(input.iterator().next());
- double p = bg.setScale(2, BigDecimal.ROUND_HALF_UP).doubleValue();
- categoryPojo.setTotalPrice(p);
- categoryPojo.setDateTime(simpleDateFormat.format(new Date()));
- out.collect(categoryPojo);
- }
- }
-
- 复制代码
data:image/s3,"s3://crabby-images/deb9d/deb9d52e6c78f73fbfaadc6e519fd00d286664e1" alt=""
我们最聚合的结果进行简单的封装,封装成CategoryPojo类以便后续处理
-
- result.keyBy("dateTime")
- .window(TumblingProcessingTimeWindows.of(Time.seconds(
- 1)))
- .process(new WindowResultProcess());
-
- 复制代码
接下来我们要使用上面聚合的结果,所以我们使用上面的window聚合结果流又定义了时间是1秒的滚动窗口.
如何使用窗口的结果,可以参考flink的官网[1]
接下来我们做最后的结果统计,在这里,我们会把各个分类的总价加起来,就是全站的总销量金额,然后我们同时使用优先级队列计算出分类销售的Top3,打印出结果,在生产过程中我们可以把这个结果数据发到hbase或者redis等外部存储,以供前端的实时页面展示。
-
- private static class WindowResultProcess
- extends ProcessWindowFunction<CategoryPojo,Object,Tuple,TimeWindow>{
-
- @Override
- public void process(
-
- Tuple tuple,
- Context context,
- Iterable<CategoryPojo> elements,
- Collector<Object> out) throws Exception{
- String date = ((Tuple1<String>) tuple).f0;
-
- Queue<CategoryPojo> queue = new PriorityQueue<>(
- 3,
- (o1, o2)->o1.getTotalPrice() >= o2.getTotalPrice() ? 1 : -1);
- double price = 0D;
- Iterator<CategoryPojo> iterator = elements.iterator();
- int s = 0;
- while (iterator.hasNext()){
- CategoryPojo categoryPojo = iterator.next();
- if (queue.size() < 3){
- queue.add(categoryPojo);
- } else {
- CategoryPojo tmp = queue.peek();
- if (categoryPojo.getTotalPrice() > tmp.getTotalPrice()){
- queue.poll();
- queue.add(categoryPojo);
- }
- }
- price += categoryPojo.getTotalPrice();
- }
-
- List<String> list = queue.stream()
- .sorted((o1, o2)->o1.getTotalPrice() <=
- o2.getTotalPrice() ? 1 : -1)
- .map(f->"(分类:" + f.getCategory() + " 销售额:" +
- f.getTotalPrice() + ")")
-
- .collect(
- Collectors.toList());
- System.out.println("时间 : " + date + " 总价 : " + price + " top3 " +
- StringUtils.join(list, ","));
- System.out.println("-------------");
- }
-
- }
-
-
- 复制代码
data:image/s3,"s3://crabby-images/deb9d/deb9d52e6c78f73fbfaadc6e519fd00d286664e1" alt=""
-
- 3> CategoryPojo{category='户外', totalPrice=734.45, dateTime=2020-06-13 22:55:34}
- 2> CategoryPojo{category='游戏', totalPrice=862.86, dateTime=2020-06-13 22:55:34}
- 4> CategoryPojo{category='洗护', totalPrice=926.83, dateTime=2020-06-13 22:55:34}
- 3> CategoryPojo{category='运动', totalPrice=744.98, dateTime=2020-06-13 22:55:34}
- 2> CategoryPojo{category='乐器', totalPrice=648.81, dateTime=2020-06-13 22:55:34}
- 4> CategoryPojo{category='图书', totalPrice=1010.12, dateTime=2020-06-13 22:55:34}
- 1> CategoryPojo{category='家具', totalPrice=880.35, dateTime=2020-06-13 22:55:34}
- 3> CategoryPojo{category='家电', totalPrice=1225.34, dateTime=2020-06-13 22:55:34}
- 2> CategoryPojo{category='男装', totalPrice=796.06, dateTime=2020-06-13 22:55:34}
- 1> CategoryPojo{category='女装', totalPrice=1018.88, dateTime=2020-06-13 22:55:34}
- 1> CategoryPojo{category='美妆', totalPrice=768.37, dateTime=2020-06-13 22:55:34}
- 时间 : 2020-06-13 22:55:34 总价 : 9617.050000000001 top3 (分类:家电 销售额:1225.34),(分类:女装 销售额:1018.88),(分类:图书 销售额:1010.12)
-
- 复制代码
完整的代码请参考
https://github.com/zhangjun0x01/bigdata-examples/blob/master/flink/src/main/java/windows/BigScreem.java
文章来源:https://juejin.cn/post/6844904192180486158
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。