当前位置:   article > 正文

Flink nc -l -p 监听端口测试

Flink nc -l -p 监听端口测试

1、确认 9999 端口未被占用(下面的命令没有任何输出即表示端口空闲)

netstat -apn|grep 9999

2、消息发送端

nc -l -k -p 9999
  1. {"user":"ming","url":"www.baidu1.com", "timestamp":1200L, "score":1}
  2. {"user":"xiaohu","url":"www.baidu5.com","timestamp":1267L, "score":10}
  3. {"user":"ming","url":"www.baidu7.com","timestamp":4200L, "score":9}
  4. {"user":"xiaohu","url":"www.baidu8.com","timestamp":5500L, "score":90}
  5. {"user":"Biu","url":"www.baidu8.com","timestamp":5500L, "score":1000}
  6. {"user":"ming","url":"www.baidu1.com", "timestamp":1717171200000, "score":1}
  7. {"user":"xiaohu","url":"www.baidu5.com","timestamp":1717171202000, "score":10}
  8. {"user":"ming","url":"www.baidu7.com","timestamp":1717171260000, "score":9}
  9. {"user":"xiaohu","url":"www.baidu8.com","timestamp":1717264860000, "score":90}
  10. {"user":"Biu","url":"www.baidu8.com","timestamp":1718780790000, "score":1000}

3、运行

周期性水位线

  1. import com.alibaba.fastjson2.JSONObject;
  2. import org.apache.flink.api.common.eventtime.*;
  3. import org.apache.flink.api.common.functions.MapFunction;
  4. import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
  5. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  6. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  7. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  8. import java.sql.Timestamp;
  9. import java.util.ArrayList;
  10. /**
  11. * Description:
  12. * forMonotonousTimestamps->AscendingTimestampsWatermarks 有序流 -> 自定义断点式水位线(周期延迟时间=0ms)\
  13. * forBoundedOutOfOrderness->BoundedOutOfOrdernessWatermarks 无序流 -> 自定义周期性水位线
  14. */
  15. public class FlinkPeriodicWatermarkGeneratorTestJob {
  16. public static void main(String[] args) throws Exception {
  17. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  18. // ArrayList<Event> list = new ArrayList<>();
  19. // list.add(new Event("ming","www.baidu1.com",1200L));
  20. // list.add(new Event("xiaohu","www.baidu5.com",1267L));
  21. // list.add(new Event("ming","www.baidu7.com",4200L));
  22. // list.add(new Event("xiaohu","www.baidu8.com",5500L));
  23. //
  24. // DataStreamSource<Event> ds = env.fromCollection(list, BasicTypeInfo.of(Event.class));
  25. DataStreamSource<String> dss = env.socketTextStream("test002", 9999);
  26. SingleOutputStreamOperator<Event> ds = dss.map(new MapFunction<String, Event>() {
  27. @Override
  28. public Event map(String value) throws Exception {
  29. Event event = new Event();
  30. event.toEvent(value);
  31. return event;
  32. }
  33. });
  34. // ds.print();
  35. SingleOutputStreamOperator<Event> watermarks = ds
  36. // AscendingTimestampsWatermarks 有序流 查看源码,实际上是延迟时间=0ms的乱序流
  37. // .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()
  38. // BoundedOutOfOrdernessWatermarks 无序流 5ms固定延迟时间/表示最大乱序程度 处理乱序流数据
  39. .assignTimestampsAndWatermarks(new WatermarkStrategy<Event>() {
  40. @Override
  41. public TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
  42. return new SerializableTimestampAssigner<Event>() {
  43. @Override
  44. public long extractTimestamp(Event element, long recordTimestamp) {
  45. return element.getTimestamp();
  46. }
  47. };
  48. }
  49. @Override
  50. public WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
  51. return new WatermarkGenerator<Event>() {
  52. private Long delayTime = 5000L; // 延迟时间
  53. private Long maxTs = Long.MIN_VALUE + delayTime + 1L;
  54. @Override
  55. public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
  56. // 每来一条数据就调用一次
  57. maxTs = Math.max(event.timestamp, maxTs);// 更新最大时间戳
  58. }
  59. @Override
  60. public void onPeriodicEmit(WatermarkOutput output) {
  61. // 发射水位线,默认 200ms 调用一次 可以使用 env.getConfig().setAutoWatermarkInterval(60 * 1000L); 调整周期时间 flink时间窗口(左开,右闭]
  62. output.emitWatermark(new Watermark(maxTs - delayTime - 1L));
  63. }
  64. };
  65. }
  66. });
  67. ds.print();
  68. env.setParallelism(1);
  69. env.execute();
  70. }
  71. public static class Event{
  72. String user;
  73. String url;
  74. Long timestamp;
  75. public Event(){
  76. }
  77. public Event(String user, String url, Long timestamp) {
  78. this.user = user;
  79. this.url = url;
  80. this.timestamp = timestamp;
  81. }
  82. public String getUser() {
  83. return user;
  84. }
  85. public String getUrl() {
  86. return url;
  87. }
  88. public Long getTimestamp() {
  89. return timestamp;
  90. }
  91. @Override
  92. public String toString() {
  93. return "Event{" +
  94. "user='" + user + '\'' +
  95. ", url='" + url + '\'' +
  96. ", timestamp=" + new Timestamp(timestamp) +
  97. '}';
  98. }
  99. public void toEvent(String val){
  100. JSONObject js = JSONObject.parseObject(val);
  101. this.user = js.getString("user");
  102. this.url = js.getString("url");
  103. this.timestamp = js.getLong("timestamp");
  104. }
  105. }
  106. }

断点式水位线

  1. import com.alibaba.fastjson2.JSONObject;
  2. import org.apache.flink.api.common.eventtime.*;
  3. import org.apache.flink.api.common.functions.MapFunction;
  4. import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
  5. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  6. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  7. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  8. import java.sql.Timestamp;
  9. import java.util.ArrayList;
  10. /**
  11. * Description:
  12. * forMonotonousTimestamps->AscendingTimestampsWatermarks 有序流 -> 自定义断点式水位线(周期延迟时间=0ms)\
  13. * forBoundedOutOfOrderness->BoundedOutOfOrdernessWatermarks 无序流 -> 自定义周期性水位线
  14. */
  15. public class FlinkPunctuatedWatermarkGeneratorTestJob {
  16. public static void main(String[] args) throws Exception {
  17. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  18. DataStreamSource<String> dss = env.socketTextStream("test002", 9999);
  19. SingleOutputStreamOperator<Event> ds = dss.map(new MapFunction<String, Event>() {
  20. @Override
  21. public Event map(String value) throws Exception {
  22. Event event = new Event();
  23. event.toEvent(value);
  24. return event;
  25. }
  26. });
  27. // ds.print();
  28. SingleOutputStreamOperator<Event> watermarks = ds
  29. // AscendingTimestampsWatermarks 有序流 查看源码,实际上是延迟时间=0ms的乱序流
  30. // .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()
  31. // BoundedOutOfOrdernessWatermarks 无序流 5ms固定延迟时间/表示最大乱序程度 处理乱序流数据
  32. .assignTimestampsAndWatermarks(new WatermarkStrategy<Event>() {
  33. @Override
  34. public TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
  35. return new SerializableTimestampAssigner<Event>() {
  36. @Override
  37. public long extractTimestamp(Event element, long recordTimestamp) {
  38. return element.getTimestamp();
  39. }
  40. };
  41. }
  42. @Override
  43. public WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
  44. return new WatermarkGenerator<Event>() {
  45. @Override
  46. public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
  47. // 只有在遇到特定的 itemId 时,才发出水位线
  48. if (event.getUser().equals("Biu")) {
  49. output.emitWatermark(new Watermark(event.getTimestamp() - 1));
  50. }
  51. }
  52. @Override
  53. public void onPeriodicEmit(WatermarkOutput output) {
  54. // 不需要做任何事情,因为我们在 onEvent 方法中发射了水位线
  55. }
  56. };
  57. }
  58. });
  59. ds.print();
  60. env.setParallelism(1);
  61. env.execute();
  62. }
  63. public static class Event{
  64. String user;
  65. String url;
  66. Long timestamp;
  67. public Event(){
  68. }
  69. public Event(String user, String url, Long timestamp) {
  70. this.user = user;
  71. this.url = url;
  72. this.timestamp = timestamp;
  73. }
  74. public String getUser() {
  75. return user;
  76. }
  77. public String getUrl() {
  78. return url;
  79. }
  80. public Long getTimestamp() {
  81. return timestamp;
  82. }
  83. @Override
  84. public String toString() {
  85. return "Event{" +
  86. "user='" + user + '\'' +
  87. ", url='" + url + '\'' +
  88. ", timestamp=" + new Timestamp(timestamp) +
  89. '}';
  90. }
  91. public void toEvent(String val){
  92. JSONObject js = JSONObject.parseObject(val);
  93. this.user = js.getString("user");
  94. this.url = js.getString("url");
  95. this.timestamp = js.getLong("timestamp");
  96. }
  97. }
  98. }

4、打印结果(作业控制台输出)

  1. 3> Event{user='ming', url='www.baidu1.com', timestamp=1970-01-01 08:00:01.2}
  2. 4> Event{user='xiaohu', url='www.baidu5.com', timestamp=1970-01-01 08:00:01.267}
  3. 5> Event{user='ming', url='www.baidu7.com', timestamp=1970-01-01 08:00:04.2}
  4. 6> Event{user='xiaohu', url='www.baidu8.com', timestamp=1970-01-01 08:00:05.5}

参考:

【Flink】Flink 中的时间和窗口之水位线(Watermark)-CSDN博客

Flink watermark_nc -lp 9999-CSDN博客

NoteWarehouse/05_BigData/09_Flink(1).md at main · FGL12321/NoteWarehouse · GitHub

声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号