当前位置:   article > 正文

JAVA实现Flink Table 基于事件时间的滑动窗口_flink 滑动窗口java实例

flink 滑动窗口java实例

JAVA实现Flink Table基于事件时间的滑动窗口代码样例

  1. package org.fenghua.example.table.windos;
  2. import org.apache.flink.api.common.typeinfo.TypeInformation;
  3. import org.apache.flink.api.common.typeinfo.Types;
  4. import org.apache.flink.streaming.api.TimeCharacteristic;
  5. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  6. import org.apache.flink.table.api.Table;
  7. import org.apache.flink.table.api.TableEnvironment;
  8. import org.apache.flink.table.api.java.Slide;
  9. import org.apache.flink.table.api.java.StreamTableEnvironment;
  10. import org.apache.flink.table.descriptors.Json;
  11. import org.apache.flink.table.descriptors.Kafka;
  12. import org.apache.flink.table.descriptors.Rowtime;
  13. import org.apache.flink.table.descriptors.Schema;
  14. import org.apache.flink.types.Row;
  15. /**
  16. * 滑动窗口
  17. * {"id":2,"product":"test","amount":66,"createTime":"1637215294000"}
  18. *
  19. * @author: fenghua
  20. * @date: 2021/11/17
  21. */
  22. public class KafkaSlidingEventTimeWindos {
  23. private final static String SOURCE_TOPIC = "topic3";
  24. private final static String ZOOKEEPER_CONNECT = "127.0.0.1:2181";
  25. private final static String METADATA_BROKER_LIST = "127.0.0.1:9092";
  26. public static void main(String[] args) throws Exception {
  27. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  28. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  29. StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
  30. tEnv.connect(
  31. new Kafka()
  32. .version("0.10")
  33. .topic(SOURCE_TOPIC)
  34. .startFromLatest()
  35. .property("zookeeper.connect", ZOOKEEPER_CONNECT)
  36. .property("bootstrap.servers", METADATA_BROKER_LIST)
  37. )
  38. .withFormat(
  39. new Json()
  40. .schema(
  41. org.apache.flink.table.api.Types.ROW(
  42. new String[]{"id", "product", "amount","createTime"},
  43. new TypeInformation[]{
  44. Types.LONG,
  45. Types.STRING,
  46. Types.INT,
  47. Types.SQL_TIMESTAMP,
  48. }))
  49. .failOnMissingField(true)
  50. )
  51. .withSchema(
  52. new Schema()
  53. .field("id", Types.LONG)
  54. .field("product", Types.STRING)
  55. .field("amount", Types.INT)
  56. .field("rowTime", Types.SQL_TIMESTAMP)
  57. .rowtime(new Rowtime()
  58. .timestampsFromField("createTime")
  59. .watermarksPeriodicBounded(1000))
  60. )
  61. .inAppendMode()
  62. .registerTableSource("sourceTable");
  63. Table table1 = tEnv.sqlQuery("select id, rowTime from sourceTable");
  64. Table table2 = table1
  65. .window(Slide.over("20.second").every("10.second").on("rowTime").as("w"))
  66. .groupBy("id,w")
  67. .select("id,id.count");
  68. tEnv.toRetractStream(table2, Row.class).print();
  69. env.execute(" test ");
  70. }
  71. }

代码仓库:https://gitee.com/xuguoxi/FlinkLearn/blob/master/src/main/java/org/fenghua/example/table/windos/KafkaSlidingEventTimeWindos.java (仓库里面有其他样例代码。)

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小丑西瓜9/article/detail/274797
推荐阅读
相关标签
  

闽ICP备14008679号