Continuing with Flink: today I followed a hands-on Flink video course online and wrote the code along with it.
Since this is a learning exercise, the source data is simply read from a file for convenience; the main features used are sliding windows, keyBy, filter, and so on.
The goal: within a given time range, count the top N most-viewed items.
First, the regular DataStream API implementation.
1. Directory structure:
Folder 1 holds the bean classes, folder 2 holds the job code, and folder 3 is the path of the data file.
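Based on the package names and the file path used in the code below, the layout is roughly the following (a sketch inferred from the code, not copied from the course):

- UserBehavior/ (parent project, pom packaging)
- HottltemsAnalysis/ (module)
- src/main/java/Bean/ (1: ItemViewCount, UserBehavior)
- src/main/java/Project/ (2: HotItems, HotitemsWithSql)
- src/main/resources/UserBehavior.csv (3: source data file)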
2. POM file
- <?xml version="1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <groupId>org.example</groupId>
- <artifactId>UserBehavior</artifactId>
- <packaging>pom</packaging>
- <version>1.0-SNAPSHOT</version>
- <modules>
- <module>HottltemsAnalysis</module>
- </modules>
-
- <properties>
- <flink.version>1.10.1</flink.version>
- <scala.binary.version>2.12</scala.binary.version>
- <kafka.version>2.2.0</kafka.version>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-java</artifactId>
- <version>${flink.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
- <version>${flink.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka_${scala.binary.version}</artifactId>
- <version>${kafka.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
- <version>${flink.version}</version>
- </dependency>
- </dependencies>
-
- </project>
3. Source data file contents (partial)
464646,2512167,2733371,pv,1511658001
1007641,5046581,2355072,pv,1511658001
723938,4719377,1464116,pv,1511658001
513008,3472922,401357,pv,1511658001
769215,22738,2355072,pv,1511658002
652863,4967749,1320293,pv,1511658002
801610,900305,634390,pv,1511658002
411478,3259235,2667323,pv,1511658002
431664,764155,2520377,pv,1511658002
487768,4125503,2465336,pv,1511658002
223813,4104826,2042400,pv,1511658002
672849,1822977,4801426,fav,1511658002
550127,4602135,65362,pv,1511658002
205752,1467139,171529,pv,1511658002
64419,2029769,2729260,pv,1511658002
756093,2881426,2520377,pv,1511658002
48353,4362292,583014,pv,1511658002
355509,4712764,4082778,pv,1511658003
826492,4016552,2735466,pv,1511658003
624915,2243311,2520377,pv,1511658003
682317,655740,982926,fav,1511658003
677621,1051389,4801426,pv,1511658003
422974,4649255,4818107,pv,1511658003
86512,563566,4756105,pv,1511658003
565218,2331370,3607361,pv,1511658003
232313,4182588,1730376,pv,1511658003
436966,1329977,3607361,cart,1511658003
561158,269170,2342116,fav,1511658003
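Each record consists of five comma-separated fields: userId, itemId, categoryId, behavior and a timestamp in seconds, matching the UserBehavior POJO shown later. As a quick standalone illustration (not part of the project code), the first record parses like this:

- // Standalone sketch: split one record into the five UserBehavior fields.
- String line = "464646,2512167,2733371,pv,1511658001";
- String[] f = line.split(",");
- long userId = Long.parseLong(f[0]); // 464646
- long itemId = Long.parseLong(f[1]); // 2512167
- int categoryId = Integer.parseInt(f[2]); // 2733371
- String behavior = f[3]; // "pv" (page view); other values in the data include fav and cart
- long eventTimeSeconds = Long.parseLong(f[4]); // 1511658001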
4. Code
First, the wrapper class used for the result output:
- package Bean;
-
- //Wrapper class, used as the output (result) type
- public class ItemViewCount {
- private Long itemId;//item Id (the grouping key)
- private Long windowEnd;//window end timestamp
- private Long count;//view count
-
- public ItemViewCount() {
- }
-
- public ItemViewCount(Long itemId, Long windowEnd, Long count) {
- this.itemId = itemId;
- this.windowEnd = windowEnd;
- this.count = count;
- }
-
- public Long getItemId() {
- return itemId;
- }
-
- public void setItemId(Long itemId) {
- this.itemId = itemId;
- }
-
- public Long getWindowEnd() {
- return windowEnd;
- }
-
- public void setWindowEnd(Long windowEnd) {
- this.windowEnd = windowEnd;
- }
-
- public Long getCount() {
- return count;
- }
-
- public void setCount(Long count) {
- this.count = count;
- }
-
- @Override
- public String toString() {
- return "ItemViewCount{" +
- "itemId=" + itemId +
- ", windowEnd=" + windowEnd +
- ", count=" + count +
- '}';
- }
- }
Next, the wrapper class used to receive the input records:
- package Bean;
-
- public class UserBehavior {
- //private fields
- private Long userId; //user Id
- private Long itemId; //item Id (grouping key)
- private Integer categoryId; //category Id
- private String behavior; //behavior type (pv, fav, cart, ...)
- private Long timestamp; //timestamp in seconds
-
- public UserBehavior() {
- }
-
- public UserBehavior(Long userId, Long itemId, Integer categoryId, String behavior, Long timestamp) {
- this.userId = userId;
- this.itemId = itemId;
- this.categoryId = categoryId;
- this.behavior = behavior;
- this.timestamp = timestamp;
- }
-
- public Long getUserId() {
- return userId;
- }
-
- public void setUserId(Long userId) {
- this.userId = userId;
- }
-
- public Long getItemId() {
- return itemId;
- }
-
- public void setItemId(Long itemId) {
- this.itemId = itemId;
- }
-
- public Integer getCategoryId() {
- return categoryId;
- }
-
- public void setCategoryId(Integer categoryId) {
- this.categoryId = categoryId;
- }
-
- public String getBehavior() {
- return behavior;
- }
-
- public void setBehavior(String behavior) {
- this.behavior = behavior;
- }
-
- public Long getTimestamp() {
- return timestamp;
- }
-
- public void setTimestamp(Long timestamp) {
- this.timestamp = timestamp;
- }
-
- @Override
- public String toString() {
- return "UserBehavior{" +
- "userId=" + userId +
- ", itemId=" + itemId +
- ", categoryId=" + categoryId +
- ", behavior='" + behavior + '\'' +
- ", timestamp=" + timestamp +
- '}';
- }
- }
The main job code:
- package Project;
-
- import Bean.ItemViewCount;
- import Bean.UserBehavior;
- import org.apache.commons.compress.utils.Lists;
- import org.apache.flink.api.common.functions.AggregateFunction;
- import org.apache.flink.api.common.functions.FilterFunction;
- import org.apache.flink.api.common.functions.MapFunction;
- import org.apache.flink.api.common.state.ListState;
- import org.apache.flink.api.common.state.ListStateDescriptor;
- import org.apache.flink.api.java.tuple.Tuple;
- import org.apache.flink.configuration.Configuration;
- import org.apache.flink.streaming.api.TimeCharacteristic;
- import org.apache.flink.streaming.api.datastream.DataStream;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
- import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
- import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
- import org.apache.flink.streaming.api.windowing.time.Time;
- import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
- import org.apache.flink.util.Collector;
-
- import java.sql.Timestamp;
- import java.util.ArrayList;
- import java.util.Comparator;
-
- /**
- * Hot items statistics: top-N viewed items per sliding window
- */
- public class HotItems {
- public static void main(String[] args) throws Exception{
- //create the execution environment
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- //set the parallelism
- env.setParallelism(1);
- //use event-time semantics
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-
- //read the data and create the DataStream (Kafka could be used here, but reading a file is simpler for learning)
- DataStream<String> inputStream = env.readTextFile("D:\\idle\\UserBehavior\\HottltemsAnalysis\\src\\main\\resources\\UserBehavior.csv");
- //convert to POJOs and assign timestamps and watermarks
- DataStream<UserBehavior> dataStream = inputStream
- .map(new MapFunction<String, UserBehavior>() {
- public UserBehavior map(String s) throws Exception {
- String[] fields = s.split(",");
- return new UserBehavior(Long.parseLong(fields[0]), Long.parseLong(fields[1]), Integer.parseInt(fields[2]), fields[3], Long.parseLong(fields[4]));
- }
- }).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserBehavior>() {
- @Override
- public long extractAscendingTimestamp(UserBehavior element) {
- return element.getTimestamp()*1000L;
- }
- });
-
- //group, window and aggregate: get the count of each item within each window
- DataStream<ItemViewCount> windowAggStream = dataStream
- .filter(new FilterFunction<UserBehavior>() {
- public boolean filter(UserBehavior userBehavior) throws Exception {
- //keep only "pv" (page view) records
- return "pv".equals(userBehavior.getBehavior());
- }
- })
- .keyBy("itemId")//group by item ID
- .timeWindow(Time.hours(1), Time.minutes(5))//sliding window: 1-hour windows sliding every 5 minutes
- .aggregate(new ItemCountAgg(), new WindowItemCountResult());//aggregate with an incremental function plus a window function that wraps the result
- //collect all item counts of the same window, sort them and output the top N
- DataStream<String> resultStream = windowAggStream
- .keyBy("windowEnd")//group by window end
- .process(new TopNHotItems(5)); //custom process function that sorts and takes the top 5
- resultStream.print();
- env.execute("hot items analysis");
- }
- //custom incremental aggregate function: counts records per key
- public static class ItemCountAgg implements AggregateFunction<UserBehavior, Long, Long>{
- public Long createAccumulator() {
- return 0L;
- }
-
- public Long add(UserBehavior userBehavior, Long aLong) {
- return aLong+1;
- }
-
- public Long getResult(Long aLong) {
- return aLong;
- }
-
- public Long merge(Long aLong, Long acc1) {
- return aLong+acc1;
- }
- }
- //custom window function: wraps the aggregated count with the item ID and window end
- public static class WindowItemCountResult implements WindowFunction<Long, ItemViewCount, Tuple, TimeWindow>{
- public void apply(Tuple tuple, TimeWindow window, Iterable<Long> input, Collector<ItemViewCount> out) throws Exception {
- Long itemId = tuple.getField(0);
- Long windowEnd = window.getEnd();
- Long count = input.iterator().next(); //the pre-aggregated count produced by ItemCountAgg
-
- out.collect(new ItemViewCount(itemId, windowEnd, count));
- }
- }
- //custom KeyedProcessFunction: collects the counts of one window and emits the top N
- public static class TopNHotItems extends KeyedProcessFunction<Tuple, ItemViewCount, String>{
- //top N size
- private Integer topSize;
-
- public TopNHotItems(Integer topSize) {
- this.topSize = topSize;
- }
- //list state holding all ItemViewCount results of the current window
- ListState<ItemViewCount> itemViewCountListState;
-
- //register the list state via the runtime context
- @Override
- public void open(Configuration parameters) throws Exception {
- itemViewCountListState = getRuntimeContext().getListState(new ListStateDescriptor<ItemViewCount>("item-view-count-list", ItemViewCount.class));
- }
-
- public void processElement(ItemViewCount value, Context ctx, Collector<String> out) throws Exception {
- //add every incoming record to the state
- itemViewCountListState.add(value);
- //and register an event-time timer for just after the window end
- ctx.timerService().registerEventTimeTimer(value.getWindowEnd() + 1);
- }
-
- @Override
- public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
- //timer fired: all data of this window has been collected, sort and emit
- ArrayList<ItemViewCount> itemViewCounts = Lists.newArrayList(itemViewCountListState.get().iterator());
- //the data has been copied out, so the state for this window key can be cleared
- itemViewCountListState.clear();
- itemViewCounts.sort(new Comparator<ItemViewCount>() {
- public int compare(ItemViewCount o1, ItemViewCount o2) {
- return o2.getCount().compareTo(o1.getCount()); //descending by count, without int overflow
- }
- });
- //format the ranking into a string for printing
- StringBuilder resultBuilder = new StringBuilder();
- resultBuilder.append("==========\n");
- resultBuilder.append("Window end time: ").append( new Timestamp(timestamp-1)).append("\n");
- //iterate over the list and take the top N
- for(int i = 0;i<Math.min(topSize,itemViewCounts.size());i++){
- ItemViewCount currentItemViewCount = itemViewCounts.get(i);
- resultBuilder.append("NO ").append(i+1).append(":")
- .append(" ID=").append(currentItemViewCount.getItemId())
- .append("热门度: ").append(currentItemViewCount.getCount())
- .append("\n");
- }
- resultBuilder.append("==========\n");
- Thread.sleep(1000L); //slow down the output so it is easier to read
- out.collect(resultBuilder.toString());
- }
- }
-
- }
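As noted in the comment in main(), a Kafka source could be used instead of readTextFile. Since the POM already includes flink-connector-kafka, a minimal sketch of that swap might look like the following (topic name, broker address and group id are placeholder assumptions, not values from the course):

- // Sketch only: replace the readTextFile(...) source with a Kafka consumer.
- // Requires: java.util.Properties, org.apache.flink.api.common.serialization.SimpleStringSchema,
- //           org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
- Properties props = new Properties();
- props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
- props.setProperty("group.id", "hot-items-consumer");      // placeholder group id
- DataStream<String> inputStream = env.addSource(
-         new FlinkKafkaConsumer<>("user-behavior", new SimpleStringSchema(), props)); // placeholder topic

The rest of the pipeline (map, filter, window, aggregate, process) stays exactly the same.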
5. Results
The second implementation uses the Table API and Flink SQL, which needs some extra dependencies:
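The code below uses the Blink planner (useBlinkPlanner) and the Java bridge StreamTableEnvironment, so for Flink 1.10.1 the additions are likely something like this (the exact artifact choice is my assumption, not taken from the course):

- <!-- assumed additions for the Table API / SQL version -->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
- <version>${flink.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
- <version>${flink.version}</version>
- </dependency>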
Code:
- package Project;
-
- import Bean.UserBehavior;
- import org.apache.flink.api.common.functions.MapFunction;
- import org.apache.flink.streaming.api.TimeCharacteristic;
- import org.apache.flink.streaming.api.datastream.DataStream;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
- import org.apache.flink.table.api.EnvironmentSettings;
- import org.apache.flink.table.api.Slide;
- import org.apache.flink.table.api.Table;
- import org.apache.flink.table.api.java.StreamTableEnvironment;
- import org.apache.flink.types.Row;
-
- public class HotitemsWithSql {
- public static void main(String[] args) throws Exception{
- //create the execution environment
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- //set the parallelism
- env.setParallelism(1);
- //use event-time semantics
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-
- //read the data and create the DataStream (Kafka could be used here, but reading a file is simpler for learning)
- DataStream<String> inputStream = env.readTextFile("D:\\idle\\UserBehavior\\HottltemsAnalysis\\src\\main\\resources\\UserBehavior.csv");
- //convert to POJOs and assign timestamps and watermarks
- DataStream<UserBehavior> dataStream = inputStream
- .map(new MapFunction<String, UserBehavior>() {
- public UserBehavior map(String s) throws Exception {
- String[] fields = s.split(",");
- return new UserBehavior(Long.parseLong(fields[0]), Long.parseLong(fields[1]), Integer.parseInt(fields[2]), fields[3], Long.parseLong(fields[4]));
- }
- }).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserBehavior>() {
- @Override
- public long extractAscendingTimestamp(UserBehavior element) {
- return element.getTimestamp()*1000L;
- }
- });
- //create the table execution environment (Blink planner)
- EnvironmentSettings settings = EnvironmentSettings.newInstance()
- .useBlinkPlanner()
- .inStreamingMode()
- .build();
- StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
- //convert the stream into a table, selecting the fields we need (timestamp becomes the rowtime attribute ts)
- Table dataTable = tableEnv.fromDataStream(dataStream, "itemId, behavior, timestamp.rowtime as ts");
- //group and window
- //Table API
- Table windowAggTable = dataTable
- .filter("behavior='pv'")
- .window(Slide.over("1.hours").every("5.minutes").on("ts").as("w"))
- .groupBy("itemId, w")
- .select("itemId, w.end as windowEnd, itemId.count as cnt");
- //use the ROW_NUMBER window function to rank the counts per window and get the top N
- //SQL
- DataStream<Row> aggStream = tableEnv.toAppendStream(windowAggTable, Row.class);
- tableEnv.createTemporaryView("agg", aggStream, "itemId, windowEnd, cnt");
- //rank within each window
- Table resultTable = tableEnv.sqlQuery("select * from "+
- " ( select *, ROW_NUMBER() over (partition by windowEnd order by cnt desc) as row_num from agg)"+
- " where row_num <= 5");
- tableEnv.toRetractStream(resultTable, Row.class).print();
- env.execute("hot items with sql job");
-
-
- }
- }
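toRetractStream emits Tuple2&lt;Boolean, Row&gt; pairs: the Boolean flag is true for insert/update messages and false for retractions, and because ROW_NUMBER produces an updating result the raw printout contains both. If only the current rows are wanted, one option (my addition, not part of the original code) is to filter on that flag before printing:

- // Sketch: print only the insert messages from the retract stream.
- tableEnv.toRetractStream(resultTable, Row.class)
-         .filter(tuple -> tuple.f0) // true = insert/update, false = retraction
-         .print();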
Results: