赞
踩
1、确认 9999 端口未被占用（下面的命令没有输出即表示端口空闲）
netstat -apn|grep 9999
2、启动消息发送端：用 nc 在 9999 端口监听，然后逐行输入下面的 JSON，每行一条事件
nc -l -k -p 9999
- {"user":"ming","url":"www.baidu1.com", "timestamp":1200L, "score":1}
- {"user":"xiaohu","url":"www.baidu5.com","timestamp":1267L, "score":10}
- {"user":"ming","url":"www.baidu7.com","timestamp":4200L, "score":9}
- {"user":"xiaohu","url":"www.baidu8.com","timestamp":5500L, "score":90}
- {"user":"Biu","url":"www.baidu8.com","timestamp":5500L, "score":1000}
-
- {"user":"ming","url":"www.baidu1.com", "timestamp":1717171200000, "score":1}
- {"user":"xiaohu","url":"www.baidu5.com","timestamp":1717171202000, "score":10}
- {"user":"ming","url":"www.baidu7.com","timestamp":1717171260000, "score":9}
- {"user":"xiaohu","url":"www.baidu8.com","timestamp":1717264860000, "score":90}
- {"user":"Biu","url":"www.baidu8.com","timestamp":1718780790000, "score":1000}
3、运行
周期性水位线
- import com.alibaba.fastjson2.JSONObject;
- import org.apache.flink.api.common.eventtime.*;
- import org.apache.flink.api.common.functions.MapFunction;
- import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
- import org.apache.flink.streaming.api.datastream.DataStreamSource;
- import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
- import java.sql.Timestamp;
- import java.util.ArrayList;
-
- /**
- * Description:
- * forMonotonousTimestamps->AscendingTimestampsWatermarks 有序流 -> 自定义断点式水位线(周期延迟时间=0ms)\
- * forBoundedOutOfOrderness->BoundedOutOfOrdernessWatermarks 无序流 -> 自定义周期性水位线
- */
- public class FlinkPeriodicWatermarkGeneratorTestJob {
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- // ArrayList<Event> list = new ArrayList<>();
- // list.add(new Event("ming","www.baidu1.com",1200L));
- // list.add(new Event("xiaohu","www.baidu5.com",1267L));
- // list.add(new Event("ming","www.baidu7.com",4200L));
- // list.add(new Event("xiaohu","www.baidu8.com",5500L));
- //
- // DataStreamSource<Event> ds = env.fromCollection(list, BasicTypeInfo.of(Event.class));
-
- DataStreamSource<String> dss = env.socketTextStream("test002", 9999);
- SingleOutputStreamOperator<Event> ds = dss.map(new MapFunction<String, Event>() {
- @Override
- public Event map(String value) throws Exception {
- Event event = new Event();
- event.toEvent(value);
- return event;
- }
- });
- // ds.print();
- SingleOutputStreamOperator<Event> watermarks = ds
- // AscendingTimestampsWatermarks 有序流 查看源码,实际上是延迟时间=0ms的乱序流
- // .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()
- // BoundedOutOfOrdernessWatermarks 无序流 5ms固定延迟时间/表示最大乱序程度 处理乱序流数据
- .assignTimestampsAndWatermarks(new WatermarkStrategy<Event>() {
- @Override
- public TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
- return new SerializableTimestampAssigner<Event>() {
- @Override
- public long extractTimestamp(Event element, long recordTimestamp) {
- return element.getTimestamp();
- }
- };
- }
-
- @Override
- public WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
- return new WatermarkGenerator<Event>() {
- private Long delayTime = 5000L; // 延迟时间
-
- private Long maxTs = Long.MIN_VALUE + delayTime + 1L;
-
- @Override
- public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
- // 每来一条数据就调用一次
- maxTs = Math.max(event.timestamp, maxTs);// 更新最大时间戳
- }
-
- @Override
- public void onPeriodicEmit(WatermarkOutput output) {
- // 发射水位线,默认 200ms 调用一次 可以使用 env.getConfig().setAutoWatermarkInterval(60 * 1000L); 调整周期时间 flink时间窗口(左开,右闭]
- output.emitWatermark(new Watermark(maxTs - delayTime - 1L));
- }
- };
- }
- });
-
- ds.print();
-
- env.setParallelism(1);
- env.execute();
- }
-
- public static class Event{
- String user;
- String url;
- Long timestamp;
-
- public Event(){
- }
- public Event(String user, String url, Long timestamp) {
- this.user = user;
- this.url = url;
- this.timestamp = timestamp;
- }
-
- public String getUser() {
- return user;
- }
-
- public String getUrl() {
- return url;
- }
-
- public Long getTimestamp() {
- return timestamp;
- }
-
- @Override
- public String toString() {
- return "Event{" +
- "user='" + user + '\'' +
- ", url='" + url + '\'' +
- ", timestamp=" + new Timestamp(timestamp) +
- '}';
- }
-
- public void toEvent(String val){
- JSONObject js = JSONObject.parseObject(val);
- this.user = js.getString("user");
- this.url = js.getString("url");
- this.timestamp = js.getLong("timestamp");
- }
- }
- }

断点式水位线
- import com.alibaba.fastjson2.JSONObject;
- import org.apache.flink.api.common.eventtime.*;
- import org.apache.flink.api.common.functions.MapFunction;
- import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
- import org.apache.flink.streaming.api.datastream.DataStreamSource;
- import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
- import java.sql.Timestamp;
- import java.util.ArrayList;
-
- /**
- * Description:
- * forMonotonousTimestamps->AscendingTimestampsWatermarks 有序流 -> 自定义断点式水位线(周期延迟时间=0ms)\
- * forBoundedOutOfOrderness->BoundedOutOfOrdernessWatermarks 无序流 -> 自定义周期性水位线
- */
- public class FlinkPunctuatedWatermarkGeneratorTestJob {
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- DataStreamSource<String> dss = env.socketTextStream("test002", 9999);
- SingleOutputStreamOperator<Event> ds = dss.map(new MapFunction<String, Event>() {
- @Override
- public Event map(String value) throws Exception {
- Event event = new Event();
- event.toEvent(value);
- return event;
- }
- });
- // ds.print();
- SingleOutputStreamOperator<Event> watermarks = ds
- // AscendingTimestampsWatermarks 有序流 查看源码,实际上是延迟时间=0ms的乱序流
- // .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()
- // BoundedOutOfOrdernessWatermarks 无序流 5ms固定延迟时间/表示最大乱序程度 处理乱序流数据
- .assignTimestampsAndWatermarks(new WatermarkStrategy<Event>() {
- @Override
- public TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
- return new SerializableTimestampAssigner<Event>() {
- @Override
- public long extractTimestamp(Event element, long recordTimestamp) {
- return element.getTimestamp();
- }
- };
- }
-
- @Override
- public WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
- return new WatermarkGenerator<Event>() {
- @Override
- public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
- // 只有在遇到特定的 itemId 时,才发出水位线
- if (event.getUser().equals("Biu")) {
- output.emitWatermark(new Watermark(event.getTimestamp() - 1));
- }
- }
-
- @Override
- public void onPeriodicEmit(WatermarkOutput output) {
- // 不需要做任何事情,因为我们在 onEvent 方法中发射了水位线
- }
- };
- }
- });
-
- ds.print();
-
- env.setParallelism(1);
- env.execute();
- }
-
- public static class Event{
- String user;
- String url;
- Long timestamp;
-
- public Event(){
- }
- public Event(String user, String url, Long timestamp) {
- this.user = user;
- this.url = url;
- this.timestamp = timestamp;
- }
-
- public String getUser() {
- return user;
- }
-
- public String getUrl() {
- return url;
- }
-
- public Long getTimestamp() {
- return timestamp;
- }
-
- @Override
- public String toString() {
- return "Event{" +
- "user='" + user + '\'' +
- ", url='" + url + '\'' +
- ", timestamp=" + new Timestamp(timestamp) +
- '}';
- }
-
- public void toEvent(String val){
- JSONObject js = JSONObject.parseObject(val);
- this.user = js.getString("user");
- this.url = js.getString("url");
- this.timestamp = js.getLong("timestamp");
- }
- }
- }
-
-

4、打印结果（行首的 3>、4> 等是 print 算子的子任务编号；并行度为 1 时不会出现该前缀）
- 3> Event{user='ming', url='www.baidu1.com', timestamp=1970-01-01 08:00:01.2}
- 4> Event{user='xiaohu', url='www.baidu5.com', timestamp=1970-01-01 08:00:01.267}
- 5> Event{user='ming', url='www.baidu7.com', timestamp=1970-01-01 08:00:04.2}
- 6> Event{user='xiaohu', url='www.baidu8.com', timestamp=1970-01-01 08:00:05.5}
参考:
【Flink】Flink 中的时间和窗口之水位线(Watermark)-CSDN博客
Flink watermark_nc -lp 9999-CSDN博客
NoteWarehouse/05_BigData/09_Flink(1).md at main · FGL12321/NoteWarehouse · GitHub
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。