
Flink in Java (14): Counting hot items in e-commerce user behavior analysis, with a Flink Table API implementation (using map, aggregate, process, timers, and filter)

Continuing my study of Flink, I followed an online hands-on Flink video today and wrote the code along with it.

Because this is a learning exercise, the source data is read from a file for convenience; the job mainly exercises sliding windows, keyBy, filter and related operators. (Reading the same data from Kafka instead would look roughly like the sketch below.)
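A minimal sketch of a Kafka-based source for Flink 1.10 with the universal Kafka connector already declared in the POM. The topic name, broker address and consumer group are placeholders and do not come from the original project:

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaSourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.setProperty("group.id", "hot-items");               // assumed consumer group

        // Each Kafka record is expected to be one CSV line in the same format as UserBehavior.csv
        DataStream<String> inputStream = env.addSource(
                new FlinkKafkaConsumer<>("user-behavior", new SimpleStringSchema(), props)); // assumed topic

        // The rest of the pipeline (map to UserBehavior, window, aggregate) stays the same as below
        inputStream.print();
        env.execute("kafka source sketch");
    }
}
```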

Problem:

Within a given time window, find the top N most-viewed (hottest) items.

First, the regular implementation with the DataStream API.

1. Directory structure:

One package holds the bean classes, one holds the job code, and the resources folder holds the data file; a sketch of the layout follows.
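Reconstructed from the package names and file paths used in the code, the layout is roughly:

```
UserBehavior/                        // parent project (packaging: pom)
└── HottltemsAnalysis/               // module
    └── src/main/
        ├── java/
        │   ├── Bean/                // bean classes: UserBehavior, ItemViewCount
        │   └── Project/             // job code: HotItems, HotitemsWithSql
        └── resources/
            └── UserBehavior.csv     // source data file
```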

2. POM file

```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>UserBehavior</artifactId>
    <packaging>pom</packaging>
    <version>1.0-SNAPSHOT</version>

    <modules>
        <module>HottltemsAnalysis</module>
    </modules>

    <properties>
        <flink.version>1.10.1</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
        <kafka.version>2.2.0</kafka.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_${scala.binary.version}</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
</project>
```

3. Source data file contents (excerpt). Each line contains userId, itemId, categoryId, behavior (e.g. pv, fav, cart) and a timestamp in seconds:

```
464646,2512167,2733371,pv,1511658001
1007641,5046581,2355072,pv,1511658001
723938,4719377,1464116,pv,1511658001
513008,3472922,401357,pv,1511658001
769215,22738,2355072,pv,1511658002
652863,4967749,1320293,pv,1511658002
801610,900305,634390,pv,1511658002
411478,3259235,2667323,pv,1511658002
431664,764155,2520377,pv,1511658002
487768,4125503,2465336,pv,1511658002
223813,4104826,2042400,pv,1511658002
672849,1822977,4801426,fav,1511658002
550127,4602135,65362,pv,1511658002
205752,1467139,171529,pv,1511658002
64419,2029769,2729260,pv,1511658002
756093,2881426,2520377,pv,1511658002
48353,4362292,583014,pv,1511658002
355509,4712764,4082778,pv,1511658003
826492,4016552,2735466,pv,1511658003
624915,2243311,2520377,pv,1511658003
682317,655740,982926,fav,1511658003
677621,1051389,4801426,pv,1511658003
422974,4649255,4818107,pv,1511658003
86512,563566,4756105,pv,1511658003
565218,2331370,3607361,pv,1511658003
232313,4182588,1730376,pv,1511658003
436966,1329977,3607361,cart,1511658003
561158,269170,2342116,fav,1511658003
```

4. Code

First, the wrapper class used for the returned results:

```java
package Bean;

// Wrapper class for the result type that is returned and displayed
public class ItemViewCount {
    private Long itemId;     // item ID (grouping key)
    private Long windowEnd;  // window end timestamp
    private Long count;      // view count

    public ItemViewCount() {
    }

    public ItemViewCount(Long itemId, Long windowEnd, Long count) {
        this.itemId = itemId;
        this.windowEnd = windowEnd;
        this.count = count;
    }

    public Long getItemId() {
        return itemId;
    }

    public void setItemId(Long itemId) {
        this.itemId = itemId;
    }

    public Long getWindowEnd() {
        return windowEnd;
    }

    public void setWindowEnd(Long windowEnd) {
        this.windowEnd = windowEnd;
    }

    public Long getCount() {
        return count;
    }

    public void setCount(Long count) {
        this.count = count;
    }

    @Override
    public String toString() {
        return "ItemViewCount{" +
                "itemId=" + itemId +
                ", windowEnd=" + windowEnd +
                ", count=" + count +
                '}';
    }
}
```

 

Next, the wrapper (POJO) class used to receive the input data:

```java
package Bean;

// POJO for one line of user-behavior input data
public class UserBehavior {
    // private fields
    private Long userId;        // user ID
    private Long itemId;        // item ID (grouping key)
    private Integer categoryId; // category ID
    private String behavior;    // behavior type (pv, fav, cart, ...)
    private Long timestamp;     // event timestamp (seconds)

    public UserBehavior() {
    }

    public UserBehavior(Long userId, Long itemId, Integer categoryId, String behavior, Long timestamp) {
        this.userId = userId;
        this.itemId = itemId;
        this.categoryId = categoryId;
        this.behavior = behavior;
        this.timestamp = timestamp;
    }

    public Long getUserId() {
        return userId;
    }

    public void setUserId(Long userId) {
        this.userId = userId;
    }

    public Long getItemId() {
        return itemId;
    }

    public void setItemId(Long itemId) {
        this.itemId = itemId;
    }

    public Integer getCategoryId() {
        return categoryId;
    }

    public void setCategoryId(Integer categoryId) {
        this.categoryId = categoryId;
    }

    public String getBehavior() {
        return behavior;
    }

    public void setBehavior(String behavior) {
        this.behavior = behavior;
    }

    public Long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Long timestamp) {
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return "UserBehavior{" +
                "userId=" + userId +
                ", itemId=" + itemId +
                ", categoryId=" + categoryId +
                ", behavior='" + behavior + '\'' +
                ", timestamp=" + timestamp +
                '}';
    }
}
```

The job code:

```java
package Project;

import Bean.ItemViewCount;
import Bean.UserBehavior;
import org.apache.commons.compress.utils.Lists;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Comparator;

/**
 * Hot items statistics
 */
public class HotItems {
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Parallelism
        env.setParallelism(1);
        // Use event time
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Read the data and create a DataStream (Kafka could be used here; a file is read for convenience)
        DataStream<String> inputStream = env.readTextFile("D:\\idle\\UserBehavior\\HottltemsAnalysis\\src\\main\\resources\\UserBehavior.csv");

        // Convert to POJOs, assign timestamps and watermarks
        DataStream<UserBehavior> dataStream = inputStream
                .map(new MapFunction<String, UserBehavior>() {
                    public UserBehavior map(String s) throws Exception {
                        String[] fields = s.split(",");
                        return new UserBehavior(new Long(fields[0]), new Long(fields[1]), new Integer(fields[2]), fields[3], new Long(fields[4]));
                    }
                })
                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserBehavior>() {
                    @Override
                    public long extractAscendingTimestamp(UserBehavior element) {
                        return element.getTimestamp() * 1000L;
                    }
                });

        // Group, window and aggregate to get each item's count per window
        DataStream<ItemViewCount> windowAggStream = dataStream
                .filter(new FilterFunction<UserBehavior>() {
                    public boolean filter(UserBehavior userBehavior) throws Exception {
                        // keep only "pv" (page view) events
                        return "pv".equals(userBehavior.getBehavior());
                    }
                })
                .keyBy("itemId")                            // group by item ID
                .timeWindow(Time.hours(1), Time.minutes(5)) // sliding window: 1 hour size, sliding every 5 minutes
                .aggregate(new ItemCountAgg(), new WindowItemCountResult()); // incremental aggregate plus window function to wrap the result

        // Collect all item counts of the same window, sort and output the top N
        DataStream<String> resultStream = windowAggStream
                .keyBy("windowEnd")            // group by window
                .process(new TopNHotItems(5)); // custom process function: sort and take the top 5

        resultStream.print();
        env.execute("hot items analysis");
    }

    // Custom incremental aggregate function
    public static class ItemCountAgg implements AggregateFunction<UserBehavior, Long, Long> {
        public Long createAccumulator() {
            return 0L;
        }

        public Long add(UserBehavior userBehavior, Long aLong) {
            return aLong + 1;
        }

        public Long getResult(Long aLong) {
            return aLong;
        }

        public Long merge(Long aLong, Long acc1) {
            return aLong + acc1;
        }
    }

    // Custom window function
    public static class WindowItemCountResult implements WindowFunction<Long, ItemViewCount, Tuple, TimeWindow> {
        public void apply(Tuple tuple, TimeWindow window, Iterable<Long> input, Collector<ItemViewCount> out) throws Exception {
            Long itemId = tuple.getField(0);
            Long windowEnd = window.getEnd();
            Long count = input.iterator().next(); // the single value produced by ItemCountAgg
            out.collect(new ItemViewCount(itemId, windowEnd, count));
        }
    }

    // Custom process function
    public static class TopNHotItems extends KeyedProcessFunction<Tuple, ItemViewCount, String> {
        // top N size
        private Integer topSize;

        public TopNHotItems(Integer topSize) {
            this.topSize = topSize;
        }

        // list state holding all ItemViewCount results of the current window
        ListState<ItemViewCount> itemViewCountListState;

        // register the list state through the runtime context
        @Override
        public void open(Configuration parameters) throws Exception {
            itemViewCountListState = getRuntimeContext().getListState(new ListStateDescriptor<ItemViewCount>("item-view-count-list", ItemViewCount.class));
        }

        public void processElement(ItemViewCount value, Context ctx, Collector<String> out) throws Exception {
            // store every incoming record in state
            itemViewCountListState.add(value);
            // and register an event-time timer just after the window end
            ctx.timerService().registerEventTimeTimer(value.getWindowEnd() + 1);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            // the timer fired, so all data for this window has arrived: sort and output
            ArrayList<ItemViewCount> itemViewCounts = Lists.newArrayList(itemViewCountListState.get().iterator());
            itemViewCounts.sort(new Comparator<ItemViewCount>() {
                public int compare(ItemViewCount o1, ItemViewCount o2) {
                    // descending by count (compareTo avoids int overflow)
                    return o2.getCount().compareTo(o1.getCount());
                }
            });

            // format the ranking as a string for printing
            StringBuilder resultBuilder = new StringBuilder();
            resultBuilder.append("==========\n");
            resultBuilder.append("Window end time: ").append(new Timestamp(timestamp - 1)).append("\n");
            // iterate the list and output the top N
            for (int i = 0; i < Math.min(topSize, itemViewCounts.size()); i++) {
                ItemViewCount currentItemViewCount = itemViewCounts.get(i);
                resultBuilder.append("NO ").append(i + 1).append(":")
                        .append(" ID=").append(currentItemViewCount.getItemId())
                        .append(" popularity: ").append(currentItemViewCount.getCount())
                        .append("\n");
            }
            resultBuilder.append("==========\n");

            // this window is finished, so its state can be cleared
            itemViewCountListState.clear();

            Thread.sleep(1000L); // slow down the output so it is easier to read
            out.collect(resultBuilder.toString());
        }
    }
}
```

5. Results

 

Implementation with the Flink Table API

Additional dependencies:
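For Flink 1.10.x with the Blink planner used below, the Table API dependencies would typically look like this (a sketch; the exact artifacts in the original project may differ):

```xml
<!-- Assumed Table API dependencies for Flink 1.10.x with the Blink planner -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
```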

Code:

```java
package Project;

import Bean.UserBehavior;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Slide;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class HotitemsWithSql {
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Parallelism
        env.setParallelism(1);
        // Use event time
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Read the data and create a DataStream (Kafka could be used here; a file is read for convenience)
        DataStream<String> inputStream = env.readTextFile("D:\\idle\\UserBehavior\\HottltemsAnalysis\\src\\main\\resources\\UserBehavior.csv");

        // Convert to POJOs, assign timestamps and watermarks
        DataStream<UserBehavior> dataStream = inputStream
                .map(new MapFunction<String, UserBehavior>() {
                    public UserBehavior map(String s) throws Exception {
                        String[] fields = s.split(",");
                        return new UserBehavior(new Long(fields[0]), new Long(fields[1]), new Integer(fields[2]), fields[3], new Long(fields[4]));
                    }
                })
                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserBehavior>() {
                    @Override
                    public long extractAscendingTimestamp(UserBehavior element) {
                        return element.getTimestamp() * 1000L;
                    }
                });

        // Create the table environment (Blink planner)
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        // Convert the stream into a table, keeping only the fields we need
        Table dataTable = tableEnv.fromDataStream(dataStream, "itemId, behavior, timestamp.rowtime as ts");

        // Group and window with the Table API
        Table windowAggTable = dataTable
                .filter("behavior = 'pv'")
                .window(Slide.over("1.hours").every("5.minutes").on("ts").as("w"))
                .groupBy("itemId, w")
                .select("itemId, w.end as windowEnd, itemId.count as cnt");

        // Use the ROW_NUMBER window function in SQL to rank the counts and take the top N
        DataStream<Row> aggStream = tableEnv.toAppendStream(windowAggTable, Row.class);
        tableEnv.createTemporaryView("agg", aggStream, "itemId, windowEnd, cnt");

        // Group and rank
        Table resultTable = tableEnv.sqlQuery("select * from " +
                " ( select *, ROW_NUMBER() over (partition by windowEnd order by cnt desc) as row_num from agg)" +
                " where row_num <= 5");

        // ROW_NUMBER keeps updating the ranking, so a retract stream (not an append stream) is needed here
        tableEnv.toRetractStream(resultTable, Row.class).print();
        env.execute("hot items with sql job");
    }
}
```

Results:
