赞
踩
<properties> <flink.version>1.17.0</flink.version> </properties> <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-runtime-web</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-jdbc</artifactId> <version>1.17-SNAPSHOT</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-statebackend-rocksdb</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-statebackend-changelog</artifactId> <version>${flink.version}</version> <scope>runtime</scope> </dependency> </dependencies>
import org.apache.commons.lang3.time.DateFormatUtils; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.eventtime.*; import org.apache.flink.api.common.functions.*; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.common.state.*; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.connector.sink.Sink; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.jdbc.JdbcConnectionOptions; import org.apache.flink.connector.jdbc.JdbcExecutionOptions; import org.apache.flink.connector.jdbc.JdbcSink; import org.apache.flink.connector.jdbc.JdbcStatementBuilder; import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; import org.apache.flink.connector.kafka.sink.KafkaSink; import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend; import org.apache.flink.runtime.state.hashmap.HashMapStateBackend; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.TimerService; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows; import org.apache.flink.streaming.api.windowing.assigners.SessionWindowTimeGapExtractor; import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.util.Collector; import org.apache.flink.util.OutputTag; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.TopicPartition; import java.io.IOException; import java.sql.PreparedStatement; import java.sql.SQLException; import java.time.Duration; import java.util.Arrays; import java.util.HashSet; import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH; public class ModuleFlink { public static void main(String[] args) throws Exception { Configuration configuration = new Configuration(); //开启最终检查点 configuration.set(ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration); //自动模式由程序根据输入数据源是否有界,来自动选择执行模式 //env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); //全局并行度,不设即默认机器核心数为并行度 env.setParallelism(1); //全局禁用合并算子链 //env.disableOperatorChaining(); // 使用 hashmap状态后端 HashMapStateBackend hashMapStateBackend = new HashMapStateBackend(); env.setStateBackend(hashMapStateBackend); // 使用 rocksdb状态后端,true表示开始增量备份 EmbeddedRocksDBStateBackend embeddedRocksDBStateBackend = new EmbeddedRocksDBStateBackend(true); env.setStateBackend(embeddedRocksDBStateBackend); //开启 Changelog env.enableChangelogStateBackend(true); // 代码中用到hdfs,需要导入hadoop依赖、指定访问hdfs的用户名 System.setProperty("HADOOP_USER_NAME", "hadoop"); //检查点常用配置 // 1、启用检查点 env.enableCheckpointing(5000,CheckpointingMode.EXACTLY_ONCE );//默认是barrier对齐的,默认周期500ms,周期为5s, 精准一次 //env.enableCheckpointing(5000,CheckpointingMode.AT_LEAST_ONCE);//默认是barrier对齐的,默认周期500ms,周期为5s, 至少一次 // 2、指定检查点的存储位置 CheckpointConfig checkpointConfig = env.getCheckpointConfig(); checkpointConfig.setCheckpointStorage("hdfs://node:8020/chk"); // 3、checkpoint的超时时间: 默认10分钟 checkpointConfig.setCheckpointTimeout(60000); // 4、同时运行中的checkpoint的最大数量 checkpointConfig.setMaxConcurrentCheckpoints(1); // 5、最小等待间隔: 上一轮checkpoint结束 到 下一轮checkpoint开始 之间的时间间隔 checkpointConfig.setMinPauseBetweenCheckpoints(1000); // 6、取消作业时,checkpoint的数据 是否保留在外部系统 // DELETE_ON_CANCELLATION:主动cancel时,删除存在外部系统的chk-xx目录 (如果是程序突然挂掉,不会删) // RETAIN_ON_CANCELLATION:主动cancel时,外部系统的chk-xx目录会保存下来 checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // 7、允许 checkpoint 连续失败的次数,默认0次,表示checkpoint一但失败,job就挂掉 checkpointConfig.setTolerableCheckpointFailureNumber(10); // T开启 非对齐检查点(barrier非对齐) // 开启的要求: Checkpoint模式必须是精准一次,最大并发必须设为1 checkpointConfig.enableUnalignedCheckpoints(); // 开启非对齐检查点才生效: 默认0,表示一开始就用非对齐的检查点,如果大于0,一开始用对齐的检查点(barrier对齐),对齐的时间超过这个参数,自动切换成 非对齐检查点(barrier非对齐) checkpointConfig.setAlignedCheckpointTimeout(Duration.ofSeconds(1)); // 如果是精准一次,必须开启checkpoint env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE); env.setRestartStrategy(RestartStrategies.noRestart()); //预先定义kafkaSource信息 KafkaSource<String> kafkaSource = KafkaSource .<String>builder() //指定kafka节点的地址和端口 .setBootstrapServers("localhost:9091,localhost:9092,localhost:9093") //指定消费的 Topic .setTopics("kafkaSource") //可以消费多个 //.setTopics("topic-a", "topic-b") //同时指定topic和分区 //.setPartitions(new HashSet<>(Arrays.asList(new TopicPartition("topic-a", 0)))) //指定value反序列化器 .setValueOnlyDeserializer(new SimpleStringSchema()) //flink消费kafka的策略 .setStartingOffsets(OffsetsInitializer.earliest()) //指定消费者组的id .setGroupId("WaterSensor") .build(); //预先定义kafkaSink信息,最终输出什么类型,这里的泛型就应该是什么类型 KafkaSink<String> kafkaSink = KafkaSink .<String>builder() //指定kafka节点的地址和端口 .setBootstrapServers("localhost:9091,localhost:9092,localhost:9093") //指定序列化器:指定Topic名称、具体的序列化 .setRecordSerializer( KafkaRecordSerializationSchema .<String>builder() .setTopic("kafkaSink") .setValueSerializationSchema(new SimpleStringSchema()) .build()) // todo 如果要使用 精准一次 必须开启checkpoint,设置事务前缀,设置事务超时时间:checkpoint间隔 < 事务超时时间 < max的15分钟 // 写到kafka的一致性级别: 精准一次、至少一次 .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE) // 如果是精准一次,必须设置 事务的前缀 .setTransactionalIdPrefix("superQiu") // 如果是精准一次,必须设置 事务超时时间: 大于checkpoint间隔,小于 max 15分钟 .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10*60*1000+"") .build(); //预先定义mysql jdbc sink SinkFunction<WaterSensor> jdbc = JdbcSink.sink( "insert into watersensor values(?,?,?)" , new JdbcStatementBuilder<WaterSensor>() { @Override public void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException { //每收到一条WaterSensor,如何去填充占位符 preparedStatement.setString(1, waterSensor.getId()); preparedStatement.setLong(2, waterSensor.ts); preparedStatement.setInt(3, waterSensor.vc); } } , JdbcExecutionOptions .builder() // 重试次数 .withMaxRetries(3) // 批次的大小:条数 .withBatchSize(100) // 批次的时间毫秒 .withBatchIntervalMs(5000) .build() , new JdbcConnectionOptions .JdbcConnectionOptionsBuilder() .withUrl("jdbc:mysql://localhost:3306/test?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8") .withUsername("root") .withPassword("123456") .withDriverName("com.mysql.cj.jdbc.Driver") .withConnectionCheckTimeoutSeconds(60) .build()); //预先定义测输出流 OutputTag(标签名,放入侧输出流中的 数据的 类型,TypeInformation) OutputTag<WaterSensor> tag = new OutputTag<>("tag", Types.POJO(WaterSensor.class)); // 预先定义Watermark策略 WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy //指定watermark生成:升序的watermark,没有等待时间 .<WaterSensor>forMonotonousTimestamps() // 指定watermark生成:乱序的,等待3s //.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) //自定义 周期性生成 //.<WaterSensor>forGenerator(ctx -> new MyPeriodWatermarkGenerator<>(3000L)) //自定义的 断点式生成 //.<WaterSensor>forGenerator(ctx -> new MyPuntuatedWatermarkGenerator<>(3000L)) // 指定 时间戳分配器,从数据中提取 .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() { @Override public long extractTimestamp(WaterSensor element, long recordTimestamp) { // 返回的时间戳,要 毫秒 System.out.println("数据=" + element + ",recordTs=" + recordTimestamp); return element.getTs() * 1000L; } }) .withIdleness(Duration.ofSeconds(5)) //空闲等待5s ; DataStreamSource<String> dataStreamSource = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka"); //env.fromSource(kafkaSource, WatermarkStrategy.forMonotonousTimestamps(),"MonotonousTimestamps"); //env.fromSource(kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)),"BoundedOutOfOrderness"); SingleOutputStreamOperator<WaterSensor> reduce = dataStreamSource //shuffle随机分区低层源码: random.nextInt(下游算子并行度) //.shuffle() //rebalance轮询低层源码:nextChannelToSendTo = (nextChannelToSendTo + 1) % 下游算子并行度 //如果是 读取数据源倾斜的场景:在source后面调用rebalance,就可以解决数据源数据倾斜 //.rebalance() //rescale缩放:局部组队,实现轮询,比rebalance更高效 //.rescale() //broadcast广播:发送给下游所有的子任务 //.broadcast() //global全局:全部发往第一个子任务 //.global() //自定义实现Partitioner接口 // .partitionCustom(new Partitioner<String>(){ // @Override // public int partition(String key, int numPartitions) { // return Integer.parseInt(key) % numPartitions; // } // }, value -> value) //map映射(一进一出) MapFunction<in,out> .map(new MapFunction<String, WaterSensor>() { @Override public WaterSensor map(String v) throws Exception { String[] value = v.split(","); return new WaterSensor(value[0], Long.parseLong(value[1]), Integer.parseInt(value[2])); } }).uid("flatmap-wc").name("wc-flatmap")//uid是给程序看的,name算子别名是给程序员看的,建议开发时指定uid和别名 //RichXXXFunction: 富函数,多了生命周期管理方法 .map(new RichMapFunction<WaterSensor, WaterSensor>() { @Override public WaterSensor map(WaterSensor waterSensor) throws Exception { return waterSensor; } @Override//open(): 每个子任务,在启动时,调用一次 public void open(Configuration parameters) throws Exception { //多了一个 运行时上下文,可以获取一些运行时的环境信息,比如 子任务编号、名称、其他的..... RuntimeContext runtimeContext = getRuntimeContext(); int indexOfThisSubtask = runtimeContext.getIndexOfThisSubtask(); String taskNameWithSubtasks = runtimeContext.getTaskNameWithSubtasks(); System.out.println("子任务编号:"+indexOfThisSubtask+"启动,子任务名称:"+taskNameWithSubtasks+"调用open"); } @Override//close():每个子任务,在结束时,调用一次,如果是flink程序异常挂掉,不会调用close,如果是正常调用 cancel命令,可以close public void close() throws Exception { super.close(); } }) // 禁用算子链 //.disableChaining() //flatMap扁平映射(一进多出) FlatMapFunction<in,out> .flatMap(new FlatMapFunction<WaterSensor, WaterSensor>() { @Override public void flatMap(WaterSensor waterSensor, Collector<WaterSensor> collector) throws Exception { collector.collect(waterSensor); } }) // 从当前算子开始新链 //.startNewChain() //当前算子并行度 //.setParallelism(2) //filter过滤 FilterFunction<in> ,ture进入下流计算 .filter(new FilterFunction<WaterSensor>() { @Override public boolean filter(WaterSensor waterSensor) throws Exception { return "s1".equals(waterSensor.getId()); } }) //算子共享slot,slotSharingGroup(自定义槽名) //.slotSharingGroup("1") //指定 watermark策略,env.fromSource()指定和下面指定二选一 .assignTimestampsAndWatermarks(watermarkStrategy) //keyBy按键分区 KeySelector<in, key> .keyBy(new KeySelector<WaterSensor, String>() { @Override public String getKey(WaterSensor waterSensor) throws Exception { return waterSensor.getId(); } }) // .process(new KeyedProcessFunction<String, WaterSensor, WaterSensor>() { // // 定义状态 // ValueState<Integer> lastVcState; // ListState<Integer> vcListState; // MapState<Integer, Integer> vcCountMapState; // ReducingState<Integer> vcSumReducingState; // AggregatingState<Integer, Double> vcAvgAggregatingState; // @Override // public void open(Configuration parameters) throws Exception { // super.open(parameters); // //TTL 创建 StateTtlConfig // StateTtlConfig stateTtlConfig = StateTtlConfig // .newBuilder(Time.minutes(5)) // 过期时间5s // //.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // 状态 创建和写入(更新) 更新 过期时间 // .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite) // 状态 读取、创建和写入(更新) 更新 过期时间 // .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) // 不返回过期的状态值 // .build(); // // //状态描述器 启用 TTL // ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<>("lastVcState", Types.INT); // stateDescriptor.enableTimeToLive(stateTtlConfig); // lastVcState = getRuntimeContext().getState(stateDescriptor); // // //必须在open方法中,初始化状态 // lastVcState = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("lastVcState", Types.INT)); // vcListState = getRuntimeContext().getListState(new ListStateDescriptor<Integer>("vcListState", Types.INT)); // vcCountMapState = getRuntimeContext().getMapState(new MapStateDescriptor<Integer, Integer>("vcCountMapState", Types.INT, Types.INT)); // vcSumReducingState = getRuntimeContext().getReducingState( // new ReducingStateDescriptor<Integer>( // "vcSumReducingState", // new ReduceFunction<Integer>() { // @Override // public Integer reduce(Integer value1, Integer value2) throws Exception { // return value1 + value2; // } // }, // Types.INT // ) // ); // vcAvgAggregatingState = getRuntimeContext() // .getAggregatingState( // new AggregatingStateDescriptor<Integer, Tuple2<Integer, Integer>, Double>( // "vcAvgAggregatingState", // new AggregateFunction<Integer, Tuple2<Integer, Integer>, Double>() { // @Override // public Tuple2<Integer, Integer> createAccumulator() { // return Tuple2.of(0, 0); // } // // @Override // public Tuple2<Integer, Integer> add(Integer value, Tuple2<Integer, Integer> accumulator) { // return Tuple2.of(accumulator.f0 + value, accumulator.f1 + 1); // } // // @Override // public Double getResult(Tuple2<Integer, Integer> accumulator) { // return accumulator.f0 * 1D / accumulator.f1; // } // // @Override // public Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) { return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1); // return null; // } // }, // Types.TUPLE(Types.INT, Types.INT)) // ); // } // // @Override // public void processElement(WaterSensor value, Context ctx, Collector<WaterSensor> out) throws Exception { // //lastVcState.value(); // 取出 本组 值状态 的数据 // //lastVcState.update(); // 更新 本组 值状态 的数据 // //lastVcState.clear(); // 清除 本组 值状态 的数据 // // //vcListState.get(); // 取出 list状态 本组的数据,是一个Iterable // //vcListState.add(); // 向 list状态 本组 添加一个元素 // //vcListState.addAll(); // 向 list状态 本组 添加多个元素 // //vcListState.update(); // 更新 list状态 本组数据(覆盖) // //vcListState.clear(); // 清空 List状态 本组数据 // // //vcCountMapState.get(); // 对本组的Map状态,根据key,获取value // //vcCountMapState.contains(); // 对本组的Map状态,判断key是否存在 // //vcCountMapState.put(, ); // 对本组的Map状态,添加一个 键值对 // //vcCountMapState.putAll(); // 对本组的Map状态,添加多个 键值对 // //vcCountMapState.entries(); // 对本组的Map状态,获取所有键值对 // //vcCountMapState.keys(); // 对本组的Map状态,获取所有键 // //vcCountMapState.values(); // 对本组的Map状态,获取所有值 // //vcCountMapState.remove(); // 对本组的Map状态,根据指定key,移除键值对 // //vcCountMapState.isEmpty(); // 对本组的Map状态,判断是否为空 // //vcCountMapState.iterator(); // 对本组的Map状态,获取迭代器 // //vcCountMapState.clear(); // 对本组的Map状态,清空 // // //vcSumReducingState.get(); // 对本组的Reducing状态,获取结果 // //vcSumReducingState.add(); // 对本组的Reducing状态,添加数据 // //vcSumReducingState.clear(); // 对本组的Reducing状态,清空数据 // // //vcAvgAggregatingState.get(); // 对 本组的聚合状态 获取结果 // //vcAvgAggregatingState.add(); // 对 本组的聚合状态 添加数据,会自动进行聚合 // //vcAvgAggregatingState.clear(); // 对 本组的聚合状态 清空数据 // // //获取当前数据的key // String currentKey = ctx.getCurrentKey(); // // 定时器注册 // TimerService timerService = ctx.timerService(); // //定时器keyed才有,事件时间定时器,通过watermark来触发的 // //TimerService是Flink关于时间和定时器的基础服务接口,包含以下六个方法 // // 获取当前的处理时间 // timerService.currentProcessingTime(); // // 获取当前的水位线(事件时间) // timerService.currentWatermark(); // // 注册处理时间定时器,当处理时间超过time时触发 // timerService.registerProcessingTimeTimer(5000L); // // 注册事件时间定时器,当水位线超过time时触发 // timerService.registerEventTimeTimer(5000L); // // 删除触发时间为time的处理时间定时器 // timerService.deleteProcessingTimeTimer(5000L); // // 删除触发时间为time的处理时间定时器 // timerService.deleteEventTimeTimer(5000L); // // // // 1、事件时间的案例 // Long currentEventTime = ctx.timestamp(); // 数据中提取出来的事件时间 // timerService.registerEventTimeTimer(5000L); // System.out.println("当前key=" + currentKey + ",当前时间=" + currentEventTime + ",注册了一个5s的定时器"); // // // 2、处理时间的案例 // //long currentTs = timerService.currentProcessingTime(); // //timerService.registerProcessingTimeTimer(currentTs + 5000L); // //System.out.println("当前key=" + currentKey + ",当前时间=" + currentTs + ",注册了一个5s后的定时器"); // // // 3、获取 process的 当前watermark // //long currentWatermark = timerService.currentWatermark(); // //System.out.println("当前数据=" + value + ",当前watermark=" + currentWatermark); // // // 删除定时器: 处理时间、事件时间 // //timerService.deleteEventTimeTimer(); // //timerService.deleteProcessingTimeTimer(); // // // 获取当前时间进展: 处理时间=当前系统时间, 事件时间=当前watermark // //long currentTs = timerService.currentProcessingTime(); // //long wm = timerService.currentWatermark(); // } // /** // * 时间进展到定时器注册的时间,调用该方法,TimerService会以键(key)和时间戳为标准,对定时器进行去重; // * 也就是说对于每个key和时间戳,最多只有一个定时器,如果注册了多次,onTimer()方法也将只被调用一次。 // * @param timestamp 当前时间进展,就是定时器被触发时的时间 // * @param ctx 上下文 // * @param out 采集器 // * @throws Exception // */ // @Override // public void onTimer(long timestamp, OnTimerContext ctx, Collector<WaterSensor> out) throws Exception { // super.onTimer(timestamp, ctx, out); // String currentKey = ctx.getCurrentKey(); // System.out.println("key=" + currentKey + "现在时间是" + timestamp + "定时器触发"); // } // }) //时间窗口 .window(TumblingProcessingTimeWindows.of(org.apache.flink.streaming.api.windowing.time.Time.seconds(10))) // 滚动窗口,窗口长度10秒 //.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))) //滑动窗口,长度10s,步长5s //.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5))) //会话窗口,间隔5s // .window(ProcessingTimeSessionWindows.withDynamicGap( // new SessionWindowTimeGapExtractor<WaterSensor>() { // @Override // public long extract(WaterSensor element) { // // 从数据中提取ts,作为间隔,单位ms // return element.getTs() * 1000L; // } // } // ))// 会话窗口,动态间隔,每条来的数据都会更新 间隔时间 //计数窗口 //.countWindow(5) // 滚动窗口,窗口长度5条数据 //.countWindow(5, 2)//滑动窗口,窗口长度5条数据,滑动步长2条数据(每经过一个步长,都有一个窗口触发输出,第一次输出在第2条数据来的时候) .allowedLateness(org.apache.flink.streaming.api.windowing.time.Time.seconds(2)) // 推迟2s关窗 .sideOutputLateData(tag) // 关窗后的迟到数据,放入侧输出流 //简单聚合,Tuple类型传位置索引,POJO只能通过字段名称来指定 //.sum("vc") //指定字段求和,未指定字段保留第一条数据的值 //.min("vc") //指定字段求最小,未指定字段保留第一条数据的值 //.max("vc") //指定字段求最大,未指定字段保留第一条数据的值 //.minBy("vc") //指定字段求最小,未指定字段取指定字段当前最小值的整条数据 //.maxBy("vc") //指定字段求最大,未指定字段取指定字段当前最大值的整条数据 //归约聚合(增量聚合)reduce ReduceFunction<in>,输入类型 = 输出类型,value1之前的计算结果,value2现在来的数据 // .reduce(new ReduceFunction<WaterSensor>() { // @Override // public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception { // return new WaterSensor(value1.id, value1.ts, value1.vc + value2.vc); // } // }) //增量聚合 Aggregate,若有window则属于本窗口的第一条数据来创建窗口,否则对所有数据累加 //new AggregateFunction<IN,累加器,OUT> 创建累加器,来一条计算一条(调用一次add方法),输出时调用一次getrResult方法 // .aggregate(new AggregateFunction<WaterSensor,WaterSensor, WaterSensor>() { // // @Override //创建累加器,初始化累加器 // public WaterSensor createAccumulator() { // return new WaterSensor("",0l,0); // } // // @Override //聚合逻辑,v是新来的数据,acc是上次累计的结果 // public WaterSensor add(WaterSensor v, WaterSensor acc) { // return new WaterSensor(v.id,v.ts,v.vc+acc.vc); // } // // @Override //获取最终结果,窗口触发时输出 // public WaterSensor getResult(WaterSensor waterSensor) { // return waterSensor; // } // // @Override // 只有会话窗口才会用到 // public WaterSensor merge(WaterSensor waterSensor, WaterSensor acc1) { // return null; // } // }) //基本处理函数 ProcessFunction<in, out> ,window后面必须经过聚合才可以调用,要不然不使用window或者调用ProcessWindowFunction //(1)ProcessFunction 最基本的处理函数,基于DataStream直接调用.process()时作为参数传入。 //(2)KeyedProcessFunction 对流按键分区后的处理函数,基于KeyedStream调用.process()时作为参数传入。要想使用定时器,比如基于KeyedStream。 //(3)ProcessWindowFunction 开窗之后的处理函数,也是全窗口函数的代表。基于WindowedStream调用.process()时作为参数传入。 //(4)ProcessAllWindowFunction 同样是开窗之后的处理函数,基于AllWindowedStream调用.process()时作为参数传入。 //(5)CoProcessFunction 合并(connect)两条流之后的处理函数,基于ConnectedStreams调用.process()时作为参数传入。 //(6)ProcessJoinFunction 间隔连接(interval join)两条流之后的处理函数,基于IntervalJoined调用.process()时作为参数传入。 //(7)BroadcastProcessFunction 广播连接流处理函数,基于BroadcastConnectedStream调用.process()时作为参数传入。 // 这里的“广播连接流”BroadcastConnectedStream,是一个未keyBy的普通DataStream与一个广播流(BroadcastStream)做连接(conncet)之后的产物。 //(8)KeyedBroadcastProcessFunction 按键分区的广播连接流处理函数,同样是基于BroadcastConnectedStream调用.process()时作为参数传入。 // 与BroadcastProcessFunction不同的是,这时的广播连接流,是一个KeyedStream与广播流(BroadcastStream)做连接之后的产物。 // .process(new ProcessFunction<WaterSensor, WaterSensor>() { // @Override//ctx:类型是ProcessFunction中定义的内部抽象类Context,表示当前运行的上下文,可以获取到当前的时间戳, // // 并提供了用于查询时间和注册定时器的“定时服务”(TimerService),以及可以将数据发送到“侧输出流”(side output)的方法.output()。 // public void processElement(WaterSensor value, Context ctx, Collector<WaterSensor> out) throws Exception { // out.collect(value); // } // // @Override//这个方法只有在注册好的定时器触发的时候才会调用,而定时器是通过“定时服务”TimerService来注册的。 // //timestamp是指设定好的触发时间,事件时间语义下当然就是水位线了 // public void onTimer(long timestamp, OnTimerContext ctx, Collector<WaterSensor> out) throws Exception { // super.onTimer(timestamp, ctx, out); // } // }) //全窗口函数ProcessWindowFunction<in, out,key,窗口类型>,窗口触发时才会调用一次,统一计算窗口的所有数据 // .process(new ProcessWindowFunction<WaterSensor, WaterSensor,String, TimeWindow>(){ // @Override//上下文可以拿到window对象,还有其他东西:侧输出流 等等 // public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<WaterSensor> out) throws Exception { // long startTs = context.window().getStart(); // long endTs = context.window().getEnd(); // String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS"); // String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS"); // //获取数据量 // long count = elements.spliterator().estimateSize(); // // } // }) //增全结合,实际就是增量进行计算,再把结果给到全量进行数据封装输出 .reduce( new ReduceFunction<WaterSensor>() { @Override public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception { return new WaterSensor(value1.id, value1.ts, value1.vc + value2.vc); } } , new ProcessWindowFunction<WaterSensor, WaterSensor,String, TimeWindow>(){ @Override//上下文可以拿到window对象,还有其他东西:侧输出流 等等 public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<WaterSensor> out) throws Exception { long startTs = context.window().getStart(); long endTs = context.window().getEnd(); String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS"); String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS"); //获取数据量 long count = elements.spliterator().estimateSize(); } } ) ; //上面kafkaSink定义的是字符串,所以pojo转string //reduce.map(data -> data.toString()).sinkTo(kafkaSink); reduce.print(); env.execute(); } //自定义水位线生成器:周期性水位线生成器(Periodic Generator) public static class MyPeriodWatermarkGenerator<T> implements WatermarkGenerator<T>{ // 乱序等待时间 private long delayTs; // 用来保存 当前为止 最大的事件时间 private long maxTs; public MyPeriodWatermarkGenerator(long delayTs) { this.delayTs = delayTs; this.maxTs = Long.MIN_VALUE + this.delayTs + 1; } /** * 每条数据来,都会调用一次: 用来提取最大的事件时间,保存下来 * @param event * @param eventTimestamp 提取到的数据的 事件时间 * @param output */ @Override public void onEvent(T event, long eventTimestamp, WatermarkOutput output) { maxTs = Math.max(maxTs, eventTimestamp); System.out.println("调用onEvent方法,获取目前为止的最大时间戳=" + maxTs); } /** * 周期性调用: 发射 watermark * @param output */ @Override public void onPeriodicEmit(WatermarkOutput output) { output.emitWatermark(new Watermark(maxTs - delayTs - 1)); System.out.println("调用onPeriodicEmit方法,生成watermark=" + (maxTs - delayTs - 1)); } } //自定义水位线生成器:断点式水位线生成器(Punctuated Generator) public static class MyPuntuatedWatermarkGenerator<T> implements WatermarkGenerator<T> { // 乱序等待时间 private long delayTs; // 用来保存 当前为止 最大的事件时间 private long maxTs; public MyPuntuatedWatermarkGenerator(long delayTs) { this.delayTs = delayTs; this.maxTs = Long.MIN_VALUE + this.delayTs + 1; } /** * 每条数据来,都会调用一次: 用来提取最大的事件时间,保存下来,并发射watermark * @param event * @param eventTimestamp 提取到的数据的 事件时间 * @param output */ @Override public void onEvent(T event, long eventTimestamp, WatermarkOutput output) { maxTs = Math.max(maxTs, eventTimestamp); output.emitWatermark(new Watermark(maxTs - delayTs - 1)); System.out.println("调用onEvent方法,获取目前为止的最大时间戳=" + maxTs+",watermark="+(maxTs - delayTs - 1)); } /** * 周期性调用: 不需要 * @param output */ @Override public void onPeriodicEmit(WatermarkOutput output) { } } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。