赞
踩
大致说下处理思路哈:
1.程序启动,数据流元素和配置流元素开始加载,不分先后;
2.当接收到数据流元素时,首先判断配置流元素是否加载完毕(判断逻辑后面介绍),若未加载完毕,会把数据缓存起来,因为流处理是数据驱动的,数据不来,缓存的数据就不会被处理,所以为了解决这种情况,同时创建了一个定时器,默认30秒后自动把缓存的数据处理掉;缓存了一定的数据,如果当下一个数据流元素来的时候发现配置加载完毕了,然后首先把缓存的数据处理掉,同时清除缓存、定时处理标志等,接着处理收到的数据,后面就正常流转了;
3.然后说下配置加载完毕的判断逻辑,我这里是根据两个条件判断的,一是配置流元素加载的个数,即配置流元素每次加载都会累加,二是配置流元素更新的加载时间,即配置流元素每次加载都会把当前加载的时间更新下;如果加载的个数大于0,同时更新的加载时间和当前时间相差大于2秒,我即判断为加载完毕。
DataParseFunc
package com.yl.flink.processor; import cn.hutool.core.collection.ListUtil; import cn.hutool.core.date.DateUtil; import com.yl.constant.Const; import com.yl.entity.MultiDataEntity; import com.yl.entity.cdc.MysqlCdcEntity; import com.yl.util.FlinkUtil; import com.yl.util.PayloadParseUtil; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.state.*; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction; import org.apache.flink.util.Collector; import java.util.List; /** * @author wlf * @apiNote 数据解析 * @since 2022/9/2 */ @Slf4j public class DataParseFunc extends KeyedBroadcastProcessFunction<String, String, MysqlCdcEntity, MultiDataEntity> { // 配置信息未加载完毕时先缓存数据流元素 private transient ListState<String> payloads; // 未设置定时输出缓存流元素的定时器,默认未设置 private transient ValueState<Boolean> setTimer; /** * 初始化资源配置 */ @Override public void open(Configuration parameters) throws Exception { payloads = getRuntimeContext().getListState(new ListStateDescriptor<>(Const.S_PAYLOAD_STA, TypeInformation.of(new TypeHint<>() { }))); setTimer = getRuntimeContext().getState(new ValueStateDescriptor<>(Const.S_TIMER_SET, TypeInformation.of(new TypeHint<>() { }))); } /** * 处理数据流元素 * * @param payload 数据流元素 * @param ctx 上下文 * @param out 流输出器 */ @Override public void processElement(String payload, KeyedBroadcastProcessFunction<String, String, MysqlCdcEntity, MultiDataEntity>.ReadOnlyContext ctx, Collector<MultiDataEntity> out) throws Exception { ReadOnlyBroadcastState<String, List<MysqlCdcEntity>> state = ctx.getBroadcastState(FlinkUtil.getConfigDescriptor()); // 数据流元素来时首先判断一下配置信息是否加载完毕 if (configReady(state)) { // 如果加载完毕了,判断一下是否有缓存的数据需要处理一下 if (null != setTimer.value() && setTimer.value()) { for (String element : payloads.get()) { output(element, state, out); } // 处理后把缓存清空 payloads.clear(); // 同时修改定时处理缓存的标志,说明缓存已经没数据了,不需要定时处理缓存数据了 setTimer.update(false); } // 处理流元素 output(payload, state, out); } else { // 如果配置信息还没有加载完毕,先把流元素缓存起来 payloads.add(payload); // 只需要一个定时器就行了,到时执行一次就可以把缓存的所有数据处理掉 if (null == setTimer.value() || !setTimer.value()) { // 注册定时器,如果后面没来数据,默认30秒后把缓存的数据发送到下游 long fireTime = ctx.timerService().currentProcessingTime() + 30_000; ctx.timerService().registerProcessingTimeTimer(fireTime); setTimer.update(true); } } } /** * 处理配置流元素 * * @param config 配置流元素 * @param ctx 上下文 * @param out 流输出器 */ @Override public void processBroadcastElement(MysqlCdcEntity config, KeyedBroadcastProcessFunction<String, String, MysqlCdcEntity, MultiDataEntity>.Context ctx, Collector<MultiDataEntity> out) throws Exception { // 广播状态 BroadcastState<String, List<MysqlCdcEntity>> state = ctx.getBroadcastState(FlinkUtil.getConfigDescriptor()); // 每来一个配置流元素都会更新配置缓存的失效时间 updateCache(state); // 根据操作类型更新广播数据状态 FlinkUtil.updateState(state, config); } /** * 定时输出缓存的数据 * 配置流未加载完毕时缓存此时过来的数据流元素,缓存的数据流元素只有等下一个流元素来的时候才会触发后续的处理操作,这里的定时器可以定时输出缓存的数据 * * @param timestamp 定时器触发时间 * @param ctx 上下文 * @param out 输出器 */ @Override public void onTimer(long timestamp, KeyedBroadcastProcessFunction<String, String, MysqlCdcEntity, MultiDataEntity>.OnTimerContext ctx, Collector<MultiDataEntity> out) throws Exception { ReadOnlyBroadcastState<String, List<MysqlCdcEntity>> state = ctx.getBroadcastState(FlinkUtil.getConfigDescriptor()); for (String element : payloads.get()) { output(element, state, out); } // 输出后把缓存清空防止重复输出 payloads.clear(); // 执行完任务删除定时器 ctx.timerService().deleteProcessingTimeTimer(timestamp); // 同时更新定时器的状态 setTimer.update(false); } /** * 把流元素输出到下游 */ private void output(String payload, ReadOnlyBroadcastState<String, List<MysqlCdcEntity>> state, Collector<MultiDataEntity> out) throws Exception { // 解析数据,并把解析后的数据发送到下游 PayloadParseUtil .parse(payload, state) .forEach(out::collect); } /** * 每来一个配置流元素进行个数统计,同时更新加载时间 */ private void updateCache(BroadcastState<String, List<MysqlCdcEntity>> state) throws Exception { MysqlCdcEntity config; if (state.contains(Const.S_CONFIG_STA)) { config = state.get(Const.S_CONFIG_STA).get(0); // 统计的是流元素的个数 config.setConfigCount(config.getConfigCount() + 1); // 更新加载时间 config.setConfigTs(DateUtil.date().getTime()); } else { config = MysqlCdcEntity.builder() .configCount(1) .configTs(DateUtil.date().getTime()) .build(); } // 动态更新广播流状态 state.put(Const.S_CONFIG_STA, ListUtil.toList(config)); } /** * 配置流元素是否配置完毕 * 正常情况下配置流元素更新的时间和当前时间相差大于2秒代表配置完成 * 配置加载完毕判断的两个必要条件: * 1.配置流元素加载的个数大于0; * 2.配置流元素加载时更新的加载时间和当前时间相差大于2秒; */ private Boolean configReady(ReadOnlyBroadcastState<String, List<MysqlCdcEntity>> state) throws Exception { long nowTs = DateUtil.date().getTime(); if (state.contains(Const.S_CONFIG_STA)) { MysqlCdcEntity config = state.get(Const.S_CONFIG_STA).get(0); long configTs = config.getConfigTs(); return config.getConfigCount() > 0 && Math.abs((nowTs - configTs) / 1000) > 2; } return false; } }
赞
踩
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。