当前位置:   article > 正文

Flink 应用案例——求网页访问量Top N 实时计算(附可执行代码)_flink api 计算topn

flink api 计算topn

在学习了Flink之后,笔者通过以下案例对Flink API 进行简单复习

目录

案例要求

前置准备

编写主程序(点此跳转至代码)

运行截图


案例要求

以下数据 为某网站的访问日志 现要求通过以下数据 统计出最近10s内最热门的N个页面(即url链接),并且每5s更新一次;即求出页面访问量中的TOP N

{user='Alice', url='./home', time=1679043205254}

{user='Alice', url='./prod?id=1', time=1679043206254}

{user='Mary', url='./prod?id=2', time=1679043207256}

{user='Cary', url='./fav', time=1679043208257}

{user='Mary', url='./home', time=1679043209258}

{user='Cary', url='./prod?id=2', time=1679043210259}

......

前置准备

为方便编写程序 我们把以上数据封装为Event类

Event.class

  1. package com.flink.wc.myflink.bean;
  2. public class Event {
  3. public String user;
  4. public String url;
  5. public long time;
  6. public Event() {
  7. }
  8. public Event(String user, String url, long time) {
  9. this.user = user;
  10. this.url = url;
  11. this.time = time;
  12. }
  13. @Override
  14. public String toString() {
  15. return "Event{" +
  16. "user='" + user + '\'' +
  17. ", url='" + url + '\'' +
  18. ", time=" + time +
  19. '}';
  20. }
  21. }

同时我们需要自定义源算子 模拟网站实时访问日志

ClickSource.class

  1. package com.flink.wc.myflink.source_sink;
  2. import com.flink.wc.myflink.bean.Event;
  3. import org.apache.flink.streaming.api.functions.source.SourceFunction;
  4. import java.util.Calendar;
  5. import java.util.Random;
  6. public class ClickSource implements SourceFunction<Event> {
  7. private Boolean running = true;
  8. @Override
  9. public void run(SourceContext<Event> ctx) throws Exception {
  10. Random random = new Random();
  11. String[] users = {"Mary","Alice","Bob","Cary"};
  12. String[] urls = {"./home", "./cart","./fav", "./prod?id=1","./prod?id=2"};
  13. while (running) {
  14. ctx.collect(new Event(
  15. users[random.nextInt(users.length)], // user 和 url 随机组合
  16. urls[random.nextInt(urls.length)],
  17. Calendar.getInstance().getTimeInMillis() //getTimeInMillis 方法返回当前时间
  18. ));
  19. // 在这个循环中 每隔一秒 就collect(发送)一个数据
  20. Thread.sleep(1000);
  21. }
  22. }
  23. @Override
  24. public void cancel() {
  25. running = false;
  26. }
  27. }

为方便程序输出,我们把输出结果封装为UrlViewCount类

UrlViewCount.class

  1. package com.flink.wc.myflink.bean;
  2. public class UrlViewCount {
  3. public String url;
  4. public Long count;
  5. public Long windowStart;
  6. public Long windowEnd;
  7. public UrlViewCount() {
  8. }
  9. public UrlViewCount(String url, Long count, Long windowStart, Long windowEnd) {
  10. this.url = url;
  11. this.count = count;
  12. this.windowStart = windowStart;
  13. this.windowEnd = windowEnd;
  14. }
  15. @Override
  16. public String toString() {
  17. return "UrlViewCount{" +
  18. "url='" + url + '\'' +
  19. ", count=" + count +
  20. ", windowStart=" + windowStart +
  21. ", windowEnd=" + windowEnd +
  22. '}';
  23. }
  24. }

做好了以上前置工作 下面我们开始编写主程序 求y页面pv的TOP N

编写主程序

大致分为以下三个步骤:

读取源数据 设置水位线

首先通过url进行聚合统计

再根据聚合统计的结果进行排序 求出Top N

详细步骤如下:

  • 读取数据 设置水位线
  • 根据url进行keyby分组
  • 设置窗口大小为10s 每5s 执行一次 设置窗口函数 统计各url被访问量
    • 通过 AggregateFunction 进行聚合统计
    • 通过 ProcessWindowFunction 输出聚合统计的结果 和 窗口信息 以便后续计算
  • 根据窗口时间进行keyby分组
  • 在窗口范围内对各数据进行排序
    • 来一个数据就把数据存储在状态变量中 (要利用到之前的数据进行排序 因此要用到状态变量)
    • 来一个数据就根据windowsEnd 注册定时器 (根据定时器的特性 同一个key的同一个时间戳不会被重复注册 因此这里相同的windowsEndtime只会只执行一次定时器,即一个窗口只会执行一次定时器)
      • 触发定时器
      • 触发定时器则说明在该水位线之前的数据已经全部到达 全部存储在了状态变量中
      • 把状态变量列表中的值全部赋值给java中List列表 进行排序
      • 将排序后的结果输出 out.collect()

 代码如下:

TopnTest.class
  1. package com.flink.wc.myflink.process;
  2. // 这三个是自定义的java类
  3. import com.flink.wc.myflink.bean.Event;
  4. import com.flink.wc.myflink.bean.UrlViewCount;
  5. import com.flink.wc.myflink.source_sink.ClickSource;
  6. import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
  7. import org.apache.flink.api.common.eventtime.WatermarkStrategy;
  8. import org.apache.flink.api.common.functions.AggregateFunction;
  9. import org.apache.flink.api.common.state.ListState;
  10. import org.apache.flink.api.common.state.ListStateDescriptor;
  11. import org.apache.flink.api.common.typeinfo.Types;
  12. import org.apache.flink.configuration.Configuration;
  13. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  14. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  15. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  16. import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
  17. import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
  18. import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
  19. import org.apache.flink.streaming.api.windowing.time.Time;
  20. import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
  21. import org.apache.flink.util.Collector;
  22. import java.sql.Timestamp;
  23. import java.time.Duration;
  24. import java.util.ArrayList;
  25. import java.util.Comparator;
  26. public class TopnTest {
  27. public static void main(String[] args) throws Exception {
  28. // step1 读取数据 并设置水位线
  29. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  30. env.setParallelism(1);
  31. DataStreamSource<Event> streamSource = env.addSource(new ClickSource());
  32. SingleOutputStreamOperator<Event> stream01 = streamSource.assignTimestampsAndWatermarks(
  33. WatermarkStrategy
  34. .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(0))
  35. .withTimestampAssigner((SerializableTimestampAssigner<Event>) (x, l) -> x.time)
  36. );
  37. // step2 聚合统计 每个url的访问量
  38. SingleOutputStreamOperator<UrlViewCount> stream02 = stream01
  39. .keyBy(x -> x.url)
  40. .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
  41. .aggregate(new UrlViewCountAgg(), new UrlViewCountResult());
  42. stream02.print("stream02");
  43. // step3 合并排序各个url的访问量 计算出Top N
  44. stream02.keyBy(x -> x.windowEnd)
  45. .process(new TopN(2))
  46. .print();
  47. env.execute();
  48. }
  49. // 累加器 AggregateFunction<IN, ACC, OUT> extends Function, Serializable
  50. private static class UrlViewCountAgg implements AggregateFunction<Event, Long, Long> {
  51. @Override
  52. public Long createAccumulator() {
  53. return 0L;
  54. }
  55. @Override
  56. public Long add(Event event, Long aLong) {
  57. return aLong + 1;
  58. }
  59. @Override
  60. public Long getResult(Long aLong) {
  61. return aLong; // 返回UrlViewCountAgg 的结果 这里是url的计数count
  62. }
  63. @Override
  64. public Long merge(Long aLong, Long acc1) {
  65. return null;
  66. }
  67. }
  68. // 增量聚合函数的 getResult作为 全窗口函数的输入
  69. // just 为了输出窗口信息 ProcessWindowFunction<IN, OUT, KEY, W extends Window> extends AbstractRichFunction
  70. private static class UrlViewCountResult extends ProcessWindowFunction<Long, UrlViewCount, String, TimeWindow> {
  71. @Override
  72. // 这里的迭代器对象 Iterable<Long> 就是增量聚合函数UrlViewCountAgg 中累加器聚合的结果,即每个url的count
  73. public void process(String url, Context context, Iterable<Long> elements, Collector<UrlViewCount> out) throws Exception {
  74. Long start = context.window().getStart();
  75. Long end = context.window().getEnd();
  76. // 这里的迭代器只有一个元素 就是聚合函数中增量聚合的结果
  77. out.collect(new UrlViewCount(url, elements.iterator().next(), start, end));
  78. }
  79. }
  80. private static class TopN extends KeyedProcessFunction<Long, UrlViewCount, String> {
  81. private Integer n;
  82. private ListState<UrlViewCount> urlViewCountListState;
  83. public TopN(Integer n){
  84. this.n = n;
  85. }
  86. @Override
  87. public void open(Configuration parmeters) throws Exception{
  88. // 从环境中获取列表状态句柄
  89. urlViewCountListState =
  90. getRuntimeContext()
  91. .getListState(new ListStateDescriptor<UrlViewCount>("url-view-count-list", Types.POJO(UrlViewCount.class)));
  92. }
  93. @Override
  94. public void processElement(UrlViewCount value, KeyedProcessFunction<Long, UrlViewCount, String>.Context ctx, Collector<String> out) throws Exception {
  95. // 来一个数据就加入状态列表中 UrlViewCount{url='./prod?id=2', count=3, windowStart=1678676045000, windowEnd=1678676055000}
  96. // System.out.println("value: " + value);
  97. urlViewCountListState.add(value);
  98. // 这个key是windowEnd 同一个窗口end 说明在同一个窗口 就在这个窗口排序 同一个定时器 过了时间windowEnd+1就执行
  99. ctx.timerService().registerEventTimeTimer(ctx.getCurrentKey() + 1);
  100. // System.out.println("ctx.getCurrentKey():" + ctx.getCurrentKey());
  101. }
  102. @Override
  103. public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception{
  104. ArrayList<UrlViewCount> urlViewCountArrayList = new ArrayList<>();
  105. for (UrlViewCount urlViewCount : urlViewCountListState.get()){
  106. urlViewCountArrayList.add(urlViewCount);
  107. }
  108. urlViewCountListState.clear(); // 一个窗口内的数据已经全部赋值给了列表 把这个状态列表清空 让他去装其他窗口的数据(状态)
  109. // 接下来就对该窗口的列表进行排序咯
  110. // 也可以写为lambda表达式
  111. urlViewCountArrayList.sort(new Comparator<UrlViewCount>() {
  112. @Override
  113. public int compare(UrlViewCount o1, UrlViewCount o2) {
  114. return o2.count.intValue() - o1.count.intValue();
  115. }
  116. });
  117. // System.out.println("urlViewCountArrayList:" + urlViewCountArrayList);
  118. StringBuilder result = new StringBuilder();
  119. result.append("=================================================\n");
  120. result.append("窗口结束时间:").append(new Timestamp(timestamp - 1)).append("\n");
  121. for (int i = 0; i < this.n; i=i+1){ // 这个循环 关于i 有点问题!!!
  122. UrlViewCount UrlViewCount = urlViewCountArrayList.get(i);
  123. String info ="No." + (i +1) + " "
  124. + "url:" + UrlViewCount.url + " "
  125. +"浏览量:" + UrlViewCount.count + "\n";
  126. result.append(info);
  127. }
  128. result.append("=================================================\n");
  129. out.collect(result.toString());
  130. }
  131. }
  132. }

运行截图

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/772670
推荐阅读
相关标签
  

闽ICP备14008679号