赞
踩
物联网场景下,如何实现设备采集参数监控报警功能
本文主要描述如何使用 Flink DataStream API 实现该功能,业务场景经过简化:
物联设备实时上报采集参数,对每个设备的采集参数进行监控
发现某个采集参数存在异常时,触发报警
deviceIotData
/**
 * A single batch of IoT parameters reported by one device.
 *
 * <p>NOTE(review): getters/setters are presumably generated by Lombok's
 * {@code @Data}; the import is not visible in this snippet — confirm.
 */
@Data
public class IotData {
    /** Device ID. */
    private Integer deviceId;
    /** Timestamp of the sample (presumably epoch milliseconds — confirm against the producer). */
    private Long timestamp;
    /** Collected parameters, keyed by variable name. */
    private Map<String , Double> data;
}
alarmRule
/**
 * Alarm rule: the allowed value range of one monitored variable on one device.
 *
 * <p>NOTE(review): getters/setters are presumably generated by Lombok's
 * {@code @Data}; the import is not visible in this snippet — confirm.
 */
@Data
public class AlarmRule {
    /** Rule ID. */
    private Integer id;
    /** Device ID the rule applies to. */
    private Integer deviceId;
    /** Name of the monitored variable. */
    private String varName;
    /** Minimum value. */
    private Double min;
    /** Maximum value. */
    private Double max;
}
/**
 * Alarm message describing one collected value that triggered an alarm.
 *
 * <p>NOTE(review): getters/setters are presumably generated by Lombok's
 * {@code @Data}; the import is not visible in this snippet — confirm.
 */
@Data
public class AlarmMessage {
    /** Device that raised the alarm. */
    private Integer deviceId;
    /** Alarm time. */
    private Long timestamp;
    /** Name of the collected variable that triggered the alarm. */
    private String alarmVar;
    /** Collected value that triggered the alarm. */
    private Number alarmValue;
}
public class IotMonitorJob { public static void main(String[] args) throws Exception { StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); environment.setParallelism(1); // 采集数据Stream DataStreamSource<IotData> iotDataStream = getIotStream(environment); // 报警规则Stream DataStreamSource<AlarmRule> ruleConfig = getRuleConfig(environment); // 缓存报警规则 并监控报警数据 SingleOutputStreamOperator<AlarmMessage> alarmStream = iotDataStream.connect(ruleConfig) .keyBy(IotData::getDeviceId, AlarmRule::getDeviceId) .process(new CoProcessFunction<IotData, AlarmRule, AlarmMessage>() { // 用临时保存设备的报警规则 ,这里的状态交由flink维护 private MapState<Integer, AlarmRule> alarmRuleValueState; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); // 初始化 ValueState alarmRuleValueState = getRuntimeContext().getMapState(new MapStateDescriptor<>("alarm-rule-state", Integer.class, AlarmRule.class)); } @Override public void processElement1(IotData iotData, CoProcessFunction<IotData, AlarmRule, AlarmMessage>.Context context, Collector<AlarmMessage> collector) throws Exception { Map<String, Double> data = iotData.getData(); // 遍历每个规则 alarmRuleValueState.values().forEach(rule -> { String varName = rule.getVarName(); // 获取变量值 Double val = data.get(varName); if (val == null) { // 变量里没有值 return; } if (val <= rule.getMin() || val > rule.getMax()) { // 超过限制,输出报警信息 AlarmMessage alarmMessage = new AlarmMessage(); alarmMessage.setDeviceId(iotData.getDeviceId()); alarmMessage.setTimestamp(iotData.getTimestamp()); alarmMessage.setAlarmVar(varName); alarmMessage.setAlarmValue(val); collector.collect(alarmMessage); } }); } @Override public void processElement2(AlarmRule alarmRule, CoProcessFunction<IotData, AlarmRule, AlarmMessage>.Context context, Collector<AlarmMessage> collector) throws Exception { // 接收到AlarmRule, 仅更新 alarmRuleValueState alarmRuleValueState.put(alarmRule.getId(), alarmRule); } }); alarmStream.print(); environment.execute(); } 
/** * 获取物联采集数据 * * @param environment * @return */ private static DataStreamSource<IotData> getIotStream(StreamExecutionEnvironment environment) { return environment.addSource(new SourceFunction<>() { private boolean running = true; @Override public void run(SourceContext<IotData> sourceContext) throws Exception { while (running) { // 模拟100个设备 每秒一次上报数据 long ts = System.currentTimeMillis(); ts = ts - ts % 1000; for (int i = 0; i < 100; i++) { IotData iotData = new IotData(); iotData.setTimestamp(ts); iotData.setDeviceId(i); Map<String, Double> data = new HashMap<>(); data.put("var1", RandomUtils.nextDouble()); data.put("var2", RandomUtils.nextDouble()); iotData.setData(data); sourceContext.collect(iotData); } Thread.sleep(1000 - ts % 1000); } } @Override public void cancel() { running = false; } }); } /** * 获取规则配置 */ public static DataStreamSource<AlarmRule> getRuleConfig(StreamExecutionEnvironment environment) { // 仅针对部分设备监控 List<AlarmRule> ruleList = new ArrayList<>(); for (int i = 0; i < 20; i++) { AlarmRule alarmRule1 = new AlarmRule(); alarmRule1.setDeviceId(i); alarmRule1.setVarName("var1"); alarmRule1.setMax(20.0); alarmRule1.setMin(0.0); ruleList.add(alarmRule1); AlarmRule alarmRule2 = new AlarmRule(); alarmRule2.setDeviceId(i); alarmRule2.setVarName("var2"); alarmRule2.setMax(10.0); alarmRule2.setMin(0.0); ruleList.add(alarmRule2); } return environment.fromCollection(ruleList); } }
实际运行环境:Java 11,Flink 1.18.1
输出结果:
AlarmMessage(deviceId=0, timestamp=1709732511000, alarmVar=var2, alarmValue=1.0408785873261203E308) AlarmMessage(deviceId=1, timestamp=1709732511000, alarmVar=var2, alarmValue=8.409717342118433E306) AlarmMessage(deviceId=2, timestamp=1709732511000, alarmVar=var2, alarmValue=6.955367711979709E307) AlarmMessage(deviceId=3, timestamp=1709732511000, alarmVar=var2, alarmValue=2.5403069646236554E307) AlarmMessage(deviceId=4, timestamp=1709732511000, alarmVar=var2, alarmValue=7.629789041713245E307) AlarmMessage(deviceId=5, timestamp=1709732511000, alarmVar=var2, alarmValue=6.918664964996954E307) AlarmMessage(deviceId=6, timestamp=1709732511000, alarmVar=var2, alarmValue=1.1660434456728436E308) AlarmMessage(deviceId=7, timestamp=1709732511000, alarmVar=var2, alarmValue=2.1272561368179368E307) AlarmMessage(deviceId=8, timestamp=1709732511000, alarmVar=var2, alarmValue=2.8693117885744695E307) AlarmMessage(deviceId=9, timestamp=1709732511000, alarmVar=var2, alarmValue=1.1232501067396574E308) AlarmMessage(deviceId=10, timestamp=1709732511000, alarmVar=var2, alarmValue=1.6192738031099514E308) AlarmMessage(deviceId=11, timestamp=1709732511000, alarmVar=var2, alarmValue=7.515829766654446E307) AlarmMessage(deviceId=12, timestamp=1709732511000, alarmVar=var2, alarmValue=1.6409410780574847E308) AlarmMessage(deviceId=13, timestamp=1709732511000, alarmVar=var2, alarmValue=7.372363635115241E307) AlarmMessage(deviceId=14, timestamp=1709732511000, alarmVar=var2, alarmValue=5.269385013806783E306) AlarmMessage(deviceId=15, timestamp=1709732511000, alarmVar=var2, alarmValue=9.736804956554577E307) AlarmMessage(deviceId=16, timestamp=1709732511000, alarmVar=var2, alarmValue=5.403962718372102E307) AlarmMessage(deviceId=17, timestamp=1709732511000, alarmVar=var2, alarmValue=1.7957965318588386E308) AlarmMessage(deviceId=18, timestamp=1709732511000, alarmVar=var2, alarmValue=6.546384330721207E307) AlarmMessage(deviceId=19, timestamp=1709732511000, alarmVar=var2, alarmValue=1.2797848722222382E308) 
AlarmMessage(deviceId=0, timestamp=1709732512000, alarmVar=var2, alarmValue=8.096850966966417E307) AlarmMessage(deviceId=1, timestamp=1709732512000, alarmVar=var2, alarmValue=1.1459880504481993E308) AlarmMessage(deviceId=2, timestamp=1709732512000, alarmVar=var2, alarmValue=1.6878563127635106E308) AlarmMessage(deviceId=3, timestamp=1709732512000, alarmVar=var2, alarmValue=1.3431398337246118E308) AlarmMessage(deviceId=4, timestamp=1709732512000, alarmVar=var2, alarmValue=6.503426414090896E307) AlarmMessage(deviceId=5, timestamp=1709732512000, alarmVar=var2, alarmValue=1.4354519016753652E308) AlarmMessage(deviceId=6, timestamp=1709732512000, alarmVar=var2, alarmValue=3.855378202880747E307) AlarmMessage(deviceId=7, timestamp=1709732512000, alarmVar=var2, alarmValue=1.536008451971638E308) AlarmMessage(deviceId=8, timestamp=1709732512000, alarmVar=var2, alarmValue=2.8070328671172436E307) AlarmMessage(deviceId=9, timestamp=1709732512000, alarmVar=var2, alarmValue=1.45270572341246E308) AlarmMessage(deviceId=10, timestamp=1709732512000, alarmVar=var2, alarmValue=4.4799744200415005E307) AlarmMessage(deviceId=11, timestamp=1709732512000, alarmVar=var2, alarmValue=1.712500895162964E307) AlarmMessage(deviceId=12, timestamp=1709732512000, alarmVar=var2, alarmValue=1.6557317203079603E308) AlarmMessage(deviceId=13, timestamp=1709732512000, alarmVar=var2, alarmValue=9.172029752661656E307) AlarmMessage(deviceId=14, timestamp=1709732512000, alarmVar=var2, alarmValue=5.171078383438277E307) AlarmMessage(deviceId=15, timestamp=1709732512000, alarmVar=var2, alarmValue=1.7828693572565125E308) AlarmMessage(deviceId=16, timestamp=1709732512000, alarmVar=var2, alarmValue=7.925392321319335E307) AlarmMessage(deviceId=17, timestamp=1709732512000, alarmVar=var2, alarmValue=6.268748480988385E307) AlarmMessage(deviceId=18, timestamp=1709732512000, alarmVar=var2, alarmValue=1.4266529007265143E308) AlarmMessage(deviceId=19, timestamp=1709732512000, alarmVar=var2, alarmValue=5.015200701556669E306) 
AlarmMessage(deviceId=0, timestamp=1709732513000, alarmVar=var2, alarmValue=6.777961007521458E307) AlarmMessage(deviceId=1, timestamp=1709732513000, alarmVar=var2, alarmValue=3.7468989126769943E307) AlarmMessage(deviceId=2, timestamp=1709732513000, alarmVar=var2, alarmValue=7.544125822456755E307) AlarmMessage(deviceId=3, timestamp=1709732513000, alarmVar=var2, alarmValue=1.2477230196962066E308) AlarmMessage(deviceId=4, timestamp=1709732513000, alarmVar=var2, alarmValue=1.2333434785469278E308) AlarmMessage(deviceId=5, timestamp=1709732513000, alarmVar=var2, alarmValue=1.4969296830188747E308) AlarmMessage(deviceId=6, timestamp=1709732513000, alarmVar=var2, alarmValue=7.975914480844549E306) AlarmMessage(deviceId=7, timestamp=1709732513000, alarmVar=var2, alarmValue=6.746069919870638E307) AlarmMessage(deviceId=8, timestamp=1709732513000, alarmVar=var2, alarmValue=1.7504936868379706E308) AlarmMessage(deviceId=9, timestamp=1709732513000, alarmVar=var2, alarmValue=2.624424900005404E307) AlarmMessage(deviceId=10, timestamp=1709732513000, alarmVar=var2, alarmValue=1.6390580210239323E308) AlarmMessage(deviceId=11, timestamp=1709732513000, alarmVar=var2, alarmValue=5.603466810531758E307) AlarmMessage(deviceId=12, timestamp=1709732513000, alarmVar=var2, alarmValue=2.7405925136993203E307) AlarmMessage(deviceId=13, timestamp=1709732513000, alarmVar=var2, alarmValue=3.622249477246505E307) AlarmMessage(deviceId=14, timestamp=1709732513000, alarmVar=var2, alarmValue=1.4597500938901865E308) AlarmMessage(deviceId=15, timestamp=1709732513000, alarmVar=var2, alarmValue=1.7106265100123184E308) AlarmMessage(deviceId=16, timestamp=1709732513000, alarmVar=var2, alarmValue=1.6232693977359527E308) AlarmMessage(deviceId=17, timestamp=1709732513000, alarmVar=var2, alarmValue=1.6454269356859472E308) AlarmMessage(deviceId=18, timestamp=1709732513000, alarmVar=var2, alarmValue=1.103212644573044E308) AlarmMessage(deviceId=19, timestamp=1709732513000, alarmVar=var2, alarmValue=8.534365652761233E307)
接下来尝试将数据写入到 MySQL 中。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。