赞
踩
在学习了Flink之后,笔者通过以下案例对Flink API 进行简单复习
目录
编写主程序(点此跳转至代码)
以下数据为某网站的访问日志。现要求根据这些数据统计出最近10s内最热门的N个页面(即url链接),并且每5s更新一次;即求出页面访问量的TOP N。
{user='Alice', url='./home', time=1679043205254}
{user='Alice', url='./prod?id=1', time=1679043206254}
{user='Mary', url='./prod?id=2', time=1679043207256}
{user='Cary', url='./fav', time=1679043208257}
{user='Mary', url='./home', time=1679043209258}
{user='Cary', url='./prod?id=2', time=1679043210259}
......
为方便编写程序 我们把以上数据封装为Event类
Event.class
- package com.flink.wc.myflink.bean;
-
/**
 * One page-view record from the simulated access log: who visited which URL, and when.
 *
 * <p>NOTE(review): the fields are public and a no-argument constructor is provided,
 * presumably so Flink can treat this class as a POJO type; confirm before tightening
 * visibility.
 */
public class Event {
    /** Name of the visiting user. */
    public String user;

    /** URL that was visited, e.g. {@code ./home}. */
    public String url;

    /** Time of the visit, in milliseconds since the epoch. */
    public long time;

    /** Creates an empty event; fields keep their default values. */
    public Event() {
    }

    /**
     * Creates a fully populated event.
     *
     * @param user name of the visiting user
     * @param url  URL that was visited
     * @param time time of the visit in epoch milliseconds
     */
    public Event(String user, String url, long time) {
        this.user = user;
        this.url = url;
        this.time = time;
    }

    /**
     * Renders the event as {@code Event{user='..', url='..', time=..}}.
     *
     * @return the textual form of this event
     */
    @Override
    public String toString() {
        return "Event{" +
                "user='" + user + '\'' +
                ", url='" + url + '\'' +
                ", time=" + time +
                '}';
    }
}
同时我们需要自定义源算子 模拟网站实时访问日志
ClickSource.class
- package com.flink.wc.myflink.source_sink;
-
- import com.flink.wc.myflink.bean.Event;
- import org.apache.flink.streaming.api.functions.source.SourceFunction;
-
- import java.util.Calendar;
- import java.util.Random;
-
- public class ClickSource implements SourceFunction<Event> {
-
- private Boolean running = true;
-
- @Override
- public void run(SourceContext<Event> ctx) throws Exception {
- Random random = new Random();
- String[] users = {"Mary","Alice","Bob","Cary"};
- String[] urls = {"./home", "./cart","./fav", "./prod?id=1","./prod?id=2"};
-
- while (running) {
- ctx.collect(new Event(
- users[random.nextInt(users.length)], // user 和 url 随机组合
- urls[random.nextInt(urls.length)],
- Calendar.getInstance().getTimeInMillis() //getTimeInMillis 方法返回当前时间
- ));
-
- // 在这个循环中 每隔一秒 就collect(发送)一个数据
- Thread.sleep(1000);
- }
- }
-
- @Override
- public void cancel() {
- running = false;
- }
- }
为方便程序输出,我们把输出结果封装为UrlViewCount类
UrlViewCount.class
- package com.flink.wc.myflink.bean;
-
/**
 * Aggregated result for one URL in one window: the view count together with the
 * window's start and end timestamps.
 *
 * <p>NOTE(review): the fields are public and a no-argument constructor is provided,
 * presumably so Flink can handle this class as a POJO type; confirm before
 * tightening visibility.
 */
public class UrlViewCount {
    /** URL that was counted. */
    public String url;

    /** Number of views of {@link #url} inside the window. */
    public Long count;

    /** Start timestamp of the window. */
    public Long windowStart;

    /** End timestamp of the window. */
    public Long windowEnd;

    /** Creates an empty result; all fields are {@code null}. */
    public UrlViewCount() {
    }

    /**
     * Creates a fully populated result.
     *
     * @param url         URL that was counted
     * @param count       number of views inside the window
     * @param windowStart start timestamp of the window
     * @param windowEnd   end timestamp of the window
     */
    public UrlViewCount(String url, Long count, Long windowStart, Long windowEnd) {
        this.url = url;
        this.count = count;
        this.windowStart = windowStart;
        this.windowEnd = windowEnd;
    }

    /**
     * Renders the result as
     * {@code UrlViewCount{url='..', count=.., windowStart=.., windowEnd=..}}.
     *
     * @return the textual form of this result
     */
    @Override
    public String toString() {
        return "UrlViewCount{" +
                "url='" + url + '\'' +
                ", count=" + count +
                ", windowStart=" + windowStart +
                ", windowEnd=" + windowEnd +
                '}';
    }
}
做好了以上前置工作,下面我们开始编写主程序,求页面pv的TOP N
大致分为以下三个步骤:
读取源数据 设置水位线
首先通过url进行聚合统计
再根据聚合统计的结果进行排序 求出Top N
详细步骤如下:
- 读取数据 设置水位线
- 根据url进行keyby分组
- 设置窗口大小为10s 每5s 执行一次 设置窗口函数 统计各url被访问量
- 通过 AggregateFunction 进行聚合统计
- 通过 ProcessWindowFunction 输出聚合统计的结果 和 窗口信息 以便后续计算
- 根据窗口时间进行keyby分组
- 在窗口范围内对各数据进行排序
- 来一个数据就把数据存储在状态变量中 (要利用到之前的数据进行排序 因此要用到状态变量)
- 来一个数据就根据windowEnd + 1注册定时器(根据定时器的特性,同一个key的同一个时间戳不会被重复注册,因此相同的windowEnd只会注册一个定时器,即一个窗口只会触发一次定时器)
- 触发定时器
- 触发定时器则说明在该水位线之前的数据已经全部到达 全部存储在了状态变量中
- 把状态变量列表中的值全部赋值给java中List列表 进行排序
- 将排序后的结果输出 out.collect()
代码如下:
TopnTest.class
- package com.flink.wc.myflink.process;
-
- // 这三个是自定义的java类
- import com.flink.wc.myflink.bean.Event;
- import com.flink.wc.myflink.bean.UrlViewCount;
- import com.flink.wc.myflink.source_sink.ClickSource;
-
- import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
- import org.apache.flink.api.common.eventtime.WatermarkStrategy;
- import org.apache.flink.api.common.functions.AggregateFunction;
- import org.apache.flink.api.common.state.ListState;
- import org.apache.flink.api.common.state.ListStateDescriptor;
- import org.apache.flink.api.common.typeinfo.Types;
- import org.apache.flink.configuration.Configuration;
- import org.apache.flink.streaming.api.datastream.DataStreamSource;
- import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
- import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
- import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
- import org.apache.flink.streaming.api.windowing.time.Time;
- import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
- import org.apache.flink.util.Collector;
-
- import java.sql.Timestamp;
- import java.time.Duration;
- import java.util.ArrayList;
- import java.util.Comparator;
-
- public class TopnTest {
-
- public static void main(String[] args) throws Exception {
-
- // step1 读取数据 并设置水位线
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- env.setParallelism(1);
-
- DataStreamSource<Event> streamSource = env.addSource(new ClickSource());
-
- SingleOutputStreamOperator<Event> stream01 = streamSource.assignTimestampsAndWatermarks(
- WatermarkStrategy
- .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(0))
- .withTimestampAssigner((SerializableTimestampAssigner<Event>) (x, l) -> x.time)
- );
-
-
- // step2 聚合统计 每个url的访问量
- SingleOutputStreamOperator<UrlViewCount> stream02 = stream01
- .keyBy(x -> x.url)
- .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
- .aggregate(new UrlViewCountAgg(), new UrlViewCountResult());
-
- stream02.print("stream02");
-
-
- // step3 合并排序各个url的访问量 计算出Top N
- stream02.keyBy(x -> x.windowEnd)
- .process(new TopN(2))
- .print();
-
- env.execute();
-
- }
-
-
- // 累加器 AggregateFunction<IN, ACC, OUT> extends Function, Serializable
- private static class UrlViewCountAgg implements AggregateFunction<Event, Long, Long> {
- @Override
- public Long createAccumulator() {
- return 0L;
- }
-
- @Override
- public Long add(Event event, Long aLong) {
- return aLong + 1;
- }
-
- @Override
- public Long getResult(Long aLong) {
- return aLong; // 返回UrlViewCountAgg 的结果 这里是url的计数count
- }
-
- @Override
- public Long merge(Long aLong, Long acc1) {
- return null;
- }
- }
-
- // 增量聚合函数的 getResult作为 全窗口函数的输入
- // just 为了输出窗口信息 ProcessWindowFunction<IN, OUT, KEY, W extends Window> extends AbstractRichFunction
- private static class UrlViewCountResult extends ProcessWindowFunction<Long, UrlViewCount, String, TimeWindow> {
-
- @Override
- // 这里的迭代器对象 Iterable<Long> 就是增量聚合函数UrlViewCountAgg 中累加器聚合的结果,即每个url的count
- public void process(String url, Context context, Iterable<Long> elements, Collector<UrlViewCount> out) throws Exception {
-
- Long start = context.window().getStart();
- Long end = context.window().getEnd();
- // 这里的迭代器只有一个元素 就是聚合函数中增量聚合的结果
- out.collect(new UrlViewCount(url, elements.iterator().next(), start, end));
- }
- }
-
- private static class TopN extends KeyedProcessFunction<Long, UrlViewCount, String> {
-
- private Integer n;
- private ListState<UrlViewCount> urlViewCountListState;
-
- public TopN(Integer n){
- this.n = n;
- }
-
- @Override
- public void open(Configuration parmeters) throws Exception{
- // 从环境中获取列表状态句柄
- urlViewCountListState =
- getRuntimeContext()
- .getListState(new ListStateDescriptor<UrlViewCount>("url-view-count-list", Types.POJO(UrlViewCount.class)));
- }
-
- @Override
- public void processElement(UrlViewCount value, KeyedProcessFunction<Long, UrlViewCount, String>.Context ctx, Collector<String> out) throws Exception {
- // 来一个数据就加入状态列表中 UrlViewCount{url='./prod?id=2', count=3, windowStart=1678676045000, windowEnd=1678676055000}
- // System.out.println("value: " + value);
- urlViewCountListState.add(value);
-
- // 这个key是windowEnd 同一个窗口end 说明在同一个窗口 就在这个窗口排序 同一个定时器 过了时间windowEnd+1就执行
- ctx.timerService().registerEventTimeTimer(ctx.getCurrentKey() + 1);
- // System.out.println("ctx.getCurrentKey():" + ctx.getCurrentKey());
- }
-
- @Override
- public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception{
- ArrayList<UrlViewCount> urlViewCountArrayList = new ArrayList<>();
-
- for (UrlViewCount urlViewCount : urlViewCountListState.get()){
- urlViewCountArrayList.add(urlViewCount);
- }
-
- urlViewCountListState.clear(); // 一个窗口内的数据已经全部赋值给了列表 把这个状态列表清空 让他去装其他窗口的数据(状态)
-
- // 接下来就对该窗口的列表进行排序咯
- // 也可以写为lambda表达式
- urlViewCountArrayList.sort(new Comparator<UrlViewCount>() {
- @Override
- public int compare(UrlViewCount o1, UrlViewCount o2) {
- return o2.count.intValue() - o1.count.intValue();
- }
- });
-
- // System.out.println("urlViewCountArrayList:" + urlViewCountArrayList);
- StringBuilder result = new StringBuilder();
-
- result.append("=================================================\n");
- result.append("窗口结束时间:").append(new Timestamp(timestamp - 1)).append("\n");
- for (int i = 0; i < this.n; i=i+1){ // 这个循环 关于i 有点问题!!!
- UrlViewCount UrlViewCount = urlViewCountArrayList.get(i);
- String info ="No." + (i +1) + " "
- + "url:" + UrlViewCount.url + " "
- +"浏览量:" + UrlViewCount.count + "\n";
- result.append(info);
- }
- result.append("=================================================\n");
- out.collect(result.toString());
- }
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。