赞
踩
做了那么多次flink统计,发现我居然没写过uv,pv统计(因为uv,pv实时统计,公共平台帮做了),最近找了一些资料当练手了。
// Formatter for the event-time field carried in each message ("createTime").
// DateTimeFormatter is thread-safe, so a shared static constant is correct.
public static final DateTimeFormatter TIME_FORMAT_YYYY_MM_DD_HHMMSS =
        DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
...
Properties propsConsumer = ...; // Kafka consumer config (omitted)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

FlinkKafkaConsumer011<String> detailLog =
        new FlinkKafkaConsumer011<>("test-topic", new SimpleStringSchema(), propsConsumer);
detailLog.setStartFromLatest();

DataStream<String> detailStream =
        env.addSource(detailLog).name("uv-pv_log").disableChaining();
detailStream.print();

DataStream<Tuple2<UMessage, Integer>> detail = detailStream
        .map(new MapFunction<String, Tuple2<UMessage, Integer>>() {
            @Override
            public Tuple2<UMessage, Integer> map(String value) throws Exception {
                try {
                    UMessage uMessage = JSON.parseObject(value, UMessage.class);
                    return Tuple2.of(uMessage, 1);
                } catch (Exception e) {
                    // Malformed JSON: log and fall through to the null marker
                    // that the downstream filter drops. (Prefer a real logger
                    // over printStackTrace in production.)
                    e.printStackTrace();
                }
                return Tuple2.of(null, null);
            }
        })
        .filter(s -> s != null && s.f0 != null)
        .assignTimestampsAndWatermarks(
                new AscendingTimestampExtractor<Tuple2<UMessage, Integer>>() {
                    @Override
                    public long extractAscendingTimestamp(Tuple2<UMessage, Integer> element) {
                        // BUG FIX: the original parsed with LocalDate and used
                        // atStartOfDay(), which collapses every event of a day to
                        // midnight — watermarks then never advance within the day.
                        // Parse the full "yyyy-MM-dd HH:mm:ss" date-time instead.
                        java.time.LocalDateTime eventTime = java.time.LocalDateTime.parse(
                                element.f0.getCreateTime(), TIME_FORMAT_YYYY_MM_DD_HHMMSS);
                        return eventTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
                    }
                });
采用event时间;propsConsumer是kafka配置信息(此处省略);UMessage是消息对应的POJO类:
import lombok.Builder;
import lombok.Data;

/**
 * Event payload deserialized from each Kafka JSON record.
 *
 * <p>Lombok {@code @Data} generates getters/setters/equals/hashCode/toString;
 * {@code @Builder} generates a fluent builder backed by the explicit
 * all-args constructor below.
 */
@Data
@Builder
public class UMessage {

    // User identifier — drives the UV (distinct-user) count.
    private String uid;

    // Event time as "yyyy-MM-dd HH:mm:ss" — drives event-time windowing
    // and (its first 10 chars) the per-day key in the keyed variants.
    private String createTime;

    // No-arg constructor required by JSON deserialization (fastjson).
    public UMessage() {
    }

    public UMessage(String uid, String createTime) {
        this.uid = uid;
        this.createTime = createTime;
    }
}
这段代码读数据源解析json文本,然后指定eventTime。
DataStream<Tuple2<String,Integer>> statsResult=detail.windowAll(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))) // .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10))) .trigger(CountTrigger.of(1)) .process(new ProcessAllWindowFunction<Tuple2<UMessage, Integer>, Tuple2<String,Integer>, TimeWindow>() { @Override public void process(Context context, Iterable<Tuple2<UMessage, Integer>> elements, Collector<Tuple2< String, Integer>> out) throws Exception { Set<String> uvNameSet=new HashSet<String>(); Integer pv=0; Iterator<Tuple2<UMessage,Integer>> mapIterator=elements.iterator(); while(mapIterator.hasNext()){ pv+=1; String uvName=mapIterator.next().f0.getUid(); uvNameSet.add(uvName); } out.collect(Tuple2.of("uv", uvNameSet.size())); out.collect(Tuple2.of("pv",pv)); } }); statsResult.print();
这里注意一点windowAll(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))),
窗口是从每天凌晨开始计算的,跨天会自动清空。
发送原始消息
{"uid":"uid001","createTime":"2020-04-08 23:50:55"}
{"uid":"uid002","createTime":"2020-04-08 23:54:55"}
{"uid":"uid003","createTime":"2020-04-08 23:55:55"}
{"uid":"uid001","createTime":"2020-04-08 23:56:55"}
{"uid":"uid001","createTime":"2020-04-09 00:01:55"}
{"uid":"uid002","createTime":"2020-04-09 00:02:55"}
{"uid":"uid001","createTime":"2020-04-09 00:03:55"}
统计结果
1> {"uid":"uid001","createTime":"2020-04-08 23:50:55"} 1> (uv,1) 2> (pv,1) 1> {"uid":"uid002","createTime":"2020-04-08 23:54:55"} 4> (pv,2) 3> (uv,2) 1> {"uid":"uid003","createTime":"2020-04-08 23:55:55"} 6> (pv,3) 5> (uv,3) 1> {"uid":"uid001","createTime":"2020-04-08 23:56:55"} 2> (pv,4) 1> (uv,3) ##这里开始清空 1> {"uid":"uid001","createTime":"2020-04-09 00:01:55"} 3> (uv,1) 4> (pv,1) 1> {"uid":"uid002","createTime":"2020-04-09 00:02:55"} 5> (uv,2) 6> (pv,2) 1> {"uid":"uid001","createTime":"2020-04-09 00:03:55"} 2> (pv,3) 1> (uv,2)
发现在1> {"uid":"uid001","createTime":"2020-04-09 00:01:55"}
,uv和pv又从0开始计算了。
但是还是有问题的:原始数据量特别大时,TumblingEventTimeWindows会缓存一整天的数据,每次触发都重新计算,内存吃不消啊。
采用keyBy,window,重新修改这段代码。
//写法二 DataStream<Tuple3<String,String, Integer>> statsResult=detail.keyBy(new KeySelector<Tuple2<UMessage, Integer>, String>() { @Override public String getKey(Tuple2<UMessage, Integer> value) throws Exception { return value.f0.getCreateTime().substring(0,10); } }).window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))) // .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10))) .trigger(CountTrigger.of(1)) .evictor(TimeEvictor.of(Time.seconds(0), true)) .process(new ProcessWindowFunction<Tuple2<UMessage, Integer>, Tuple3<String,String, Integer>, String, TimeWindow>() { private transient MapState<String, String> uvCountState; private transient ValueState<Integer> pvCountState; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(org.apache.flink.api.common.time.Time.minutes(60 * 6)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).build(); MapStateDescriptor<String, String> uvDescriptor = new MapStateDescriptor<String, String>("uv_count", String.class, String.class); ValueStateDescriptor<Integer> pvDescriptor= new ValueStateDescriptor<Integer>("pv_count", Integer.class); uvDescriptor.enableTimeToLive(ttlConfig); pvDescriptor.enableTimeToLive(ttlConfig); uvCountState=getRuntimeContext().getMapState(uvDescriptor); pvCountState=getRuntimeContext().getState(pvDescriptor); } @Override public void process(String key, Context context, Iterable<Tuple2<UMessage, Integer>> elements, Collector<Tuple3<String,String, Integer>> out) throws Exception { Integer pv=0; Iterator<Tuple2<UMessage,Integer>> mapIterator=elements.iterator(); while(mapIterator.hasNext()){ pv+=1; UMessage uMessage=mapIterator.next().f0; String uvName=uMessage.getUid(); uvCountState.put(uvName,null); } Integer uv=0; Iterator<String> uvIterator=uvCountState.keys().iterator(); while(uvIterator.hasNext()){ 
uvIterator.next(); uv+=1; } Integer originPv=pvCountState.value(); if(originPv==null){ pvCountState.update(pv); }else{ pvCountState.update(originPv+pv); } out.collect(Tuple3.of(key,"uv",uv)); out.collect(Tuple3.of(key,"pv",pvCountState.value())); } }); statsResult.print();
evictor(TimeEvictor.of(Time.seconds(0), true))
每次窗口计算后剔除已经消费过的数据。
发送消息测试
{"uid":"uid001","createTime":"2020-04-08 23:50:55"}
{"uid":"uid002","createTime":"2020-04-08 23:54:55"}
{"uid":"uid003","createTime":"2020-04-08 23:55:55"}
{"uid":"uid001","createTime":"2020-04-08 23:56:55"}
{"uid":"uid001","createTime":"2020-04-09 00:01:55"}
{"uid":"uid002","createTime":"2020-04-09 00:02:55"}
{"uid":"uid001","createTime":"2020-04-09 00:03:55"}
{"uid":"uid002","createTime":"2020-04-09 00:04:55"}
{"uid":"uid003","createTime":"2020-04-09 00:05:55"}
{"uid":"uid005","createTime":"2020-04-09 00:10:55"}
{"uid":"uid003","createTime":"2020-04-09 23:55:55"}
{"uid":"uid005","createTime":"2020-04-10 00:03:55"}
{"uid":"uid001","createTime":"2020-04-10 00:05:55"}
统计结果
1> {"uid":"uid001","createTime":"2020-04-08 23:50:55"} 1> (2020-04-08,uv,1) 1> (2020-04-08,pv,1) 1> {"uid":"uid002","createTime":"2020-04-08 23:54:55"} 1> (2020-04-08,uv,2) 1> (2020-04-08,pv,2) 1> {"uid":"uid003","createTime":"2020-04-08 23:55:55"} 1> (2020-04-08,uv,3) 1> (2020-04-08,pv,3) 1> {"uid":"uid001","createTime":"2020-04-08 23:56:55"} 1> (2020-04-08,uv,3) 1> (2020-04-08,pv,4) 1> {"uid":"uid001","createTime":"2020-04-09 00:01:55"} 2> (2020-04-09,uv,1) 2> (2020-04-09,pv,1) 1> {"uid":"uid002","createTime":"2020-04-09 00:02:55"} 2> (2020-04-09,uv,2) 2> (2020-04-09,pv,2) 1> {"uid":"uid001","createTime":"2020-04-09 00:03:55"} 2> (2020-04-09,uv,2) 2> (2020-04-09,pv,3) 1> {"uid":"uid002","createTime":"2020-04-09 00:04:55"} 2> (2020-04-09,uv,2) 2> (2020-04-09,pv,4) 1> {"uid":"uid003","createTime":"2020-04-09 00:05:55"} 2> (2020-04-09,uv,3) 2> (2020-04-09,pv,5) 1> {"uid":"uid005","createTime":"2020-04-09 00:10:55"} 2> (2020-04-09,uv,4) 2> (2020-04-09,pv,6) 1> {"uid":"uid003","createTime":"2020-04-09 23:55:55"} 2> (2020-04-09,uv,4) 2> (2020-04-09,pv,7) 1> {"uid":"uid005","createTime":"2020-04-10 00:03:55"} 1> (2020-04-10,uv,1) 1> (2020-04-10,pv,1) 1> {"uid":"uid001","createTime":"2020-04-10 00:05:55"} 1> (2020-04-10,uv,2) 1> (2020-04-10,pv,2)
UV如果很多,MapState太大了,而且每次都要遍历。可以改用BloomFilter。
- 写法三
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
.....
// 写法三 — replace the per-day uid MapState with a Guava BloomFilter held in
// ValueState: near-constant memory per day, at the cost of a small
// false-positive rate (uv may be slightly under-counted).
DataStream<Tuple3<String, String, Integer>> statsResult = detail
        .keyBy(new KeySelector<Tuple2<UMessage, Integer>, String>() {
            @Override
            public String getKey(Tuple2<UMessage, Integer> value) throws Exception {
                // Key = the "yyyy-MM-dd" day prefix of the event time.
                return value.f0.getCreateTime().substring(0, 10);
            }
        })
        .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
        .trigger(CountTrigger.of(1))
        // Keep the window buffer empty between firings; totals live in state.
        .evictor(TimeEvictor.of(Time.seconds(0), true))
        .process(new ProcessWindowFunction<Tuple2<UMessage, Integer>, Tuple3<String, String, Integer>, String, TimeWindow>() {

            // Approximate set of uids already counted for the current day.
            private transient ValueState<BloomFilter<String>> bloomFilterState;
            private transient ValueState<Integer> uvCountState;
            private transient ValueState<Integer> pvCountState;

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                StateTtlConfig ttlConfig = StateTtlConfig
                        .newBuilder(org.apache.flink.api.common.time.Time.minutes(60 * 6))
                        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                        .build();
                ValueStateDescriptor<BloomFilter<String>> bloomFilterDescriptor =
                        new ValueStateDescriptor<BloomFilter<String>>("bloom_filter",
                                TypeInformation.of(new TypeHint<BloomFilter<String>>() {
                                }));
                ValueStateDescriptor<Integer> pvDescriptor =
                        new ValueStateDescriptor<Integer>("pv_count", Integer.class);
                ValueStateDescriptor<Integer> uvDescriptor =
                        new ValueStateDescriptor<Integer>("uv_count", Integer.class);
                bloomFilterDescriptor.enableTimeToLive(ttlConfig);
                pvDescriptor.enableTimeToLive(ttlConfig);
                uvDescriptor.enableTimeToLive(ttlConfig);
                bloomFilterState = getRuntimeContext().getState(bloomFilterDescriptor);
                pvCountState = getRuntimeContext().getState(pvDescriptor);
                uvCountState = getRuntimeContext().getState(uvDescriptor);
            }

            @Override
            public void process(String key, Context context,
                                Iterable<Tuple2<UMessage, Integer>> elements,
                                Collector<Tuple3<String, String, Integer>> out) throws Exception {
                BloomFilter<String> bloomFilter = bloomFilterState.value();
                if (bloomFilter == null) {
                    // Sized for ~10M expected uids per day (default ~3% fpp).
                    bloomFilter = BloomFilter.create(Funnels.unencodedCharsFunnel(), 10 * 1000 * 1000L);
                }
                Integer uv = uvCountState.value();
                Integer pv = pvCountState.value();
                // FIX: null-check the counters independently of the filter.
                // With TTL each state entry expires on its own schedule, and
                // the original only reset uv/pv when the *filter* was null —
                // an expired counter with a live filter would NPE on `pv += 1`.
                if (uv == null) {
                    uv = 0;
                }
                if (pv == null) {
                    pv = 0;
                }
                for (Tuple2<UMessage, Integer> element : elements) {
                    pv += 1;
                    String uid = element.f0.getUid();
                    if (!bloomFilter.mightContain(uid)) {
                        bloomFilter.put(uid); // definitely-new uid: add and count
                        uv += 1;
                    }
                }
                bloomFilterState.update(bloomFilter);
                uvCountState.update(uv);
                pvCountState.update(pv);
                out.collect(Tuple3.of(key, "uv", uv));
                out.collect(Tuple3.of(key, "pv", pv));
            }
        });
statsResult.print();
统计结果为
1> {"uid":"uid001","createTime":"2020-04-08 23:50:55"} 1> (2020-04-08,uv,1) 1> (2020-04-08,pv,1) 1> {"uid":"uid002","createTime":"2020-04-08 23:54:55"} 1> (2020-04-08,uv,2) 1> (2020-04-08,pv,2) 1> {"uid":"uid003","createTime":"2020-04-08 23:55:55"} 1> (2020-04-08,uv,3) 1> (2020-04-08,pv,3) 1> {"uid":"uid001","createTime":"2020-04-08 23:56:55"} 1> (2020-04-08,uv,3) 1> (2020-04-08,pv,4) 1> {"uid":"uid001","createTime":"2020-04-09 00:01:55"} 2> (2020-04-09,uv,1) 2> (2020-04-09,pv,1) 1> {"uid":"uid002","createTime":"2020-04-09 00:02:55"} 2> (2020-04-09,uv,2) 2> (2020-04-09,pv,2) 1> {"uid":"uid001","createTime":"2020-04-09 00:03:55"} 2> (2020-04-09,uv,2) 2> (2020-04-09,pv,3) 1> {"uid":"uid002","createTime":"2020-04-09 00:04:55"} 2> (2020-04-09,uv,2) 2> (2020-04-09,pv,4) 1> {"uid":"uid003","createTime":"2020-04-09 00:05:55"} 2> (2020-04-09,uv,3) 2> (2020-04-09,pv,5) 1> {"uid":"uid005","createTime":"2020-04-09 00:10:55"} 2> (2020-04-09,uv,4) 2> (2020-04-09,pv,6) 1> {"uid":"uid003","createTime":"2020-04-09 23:55:55"} 2> (2020-04-09,uv,4) 2> (2020-04-09,pv,7) 1> {"uid":"uid005","createTime":"2020-04-10 00:03:55"} 1> (2020-04-10,uv,1) 1> (2020-04-10,pv,1) 1> {"uid":"uid001","createTime":"2020-04-10 00:05:55"} 1> (2020-04-10,uv,2) 1> (2020-04-10,pv,2)
与写法二,统计结果一样。
https://www.cnblogs.com/Springmoon-venn/p/10919648.html
https://blog.csdn.net/u010271601/article/details/104868913
https://blog.csdn.net/u010271601/article/details/104833153
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。