赞
踩
/**
* 物联采集数据
*/
@Data
public class IotData {
/\*\*
* 设备ID
*/
private Integer deviceId;
/\*\*
* 时间戳
*/
private Long timestamp;
/\*\*
* 采集参数
*/
private Map<String , Double> data;
}
##### 监控报警规则 `alarmRule`
/**
* 报警规则
*/
@Data
public class AlarmRule {
/\*\*
* 规则ID
*/
private Integer id;
/\*\*
* 设备ID
*/
private Integer deviceId;
/\*\*
* 监控的变量名称
*/
private String varName;
/\*\*
* 最小值
*/
private Double min;
/\*\*
* 最大值
*/
private Double max;
}
##### 报警事件
/**
* 报警消息
*/
@Data
public class AlarmMessage {
/\*\*
* 设备
*/
private Integer deviceId;
/\*\*
* 报警时间
*/
private Long timestamp;
/**
* 触发报警的采集变量名称
*/
private String alarmVar;
/\*\*
* 触发报警的采集值
*/
private Number alarmValue;
}
#### 开始实现
public class IotMonitorJob {
public static void main(String[] args) throws Exception { StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); environment.setParallelism(1); // 采集数据Stream DataStreamSource<IotData> iotDataStream = getIotStream(environment); // 报警规则Stream DataStreamSource<AlarmRule> ruleConfig = getRuleConfig(environment); // 缓存报警规则 并监控报警数据 SingleOutputStreamOperator<AlarmMessage> alarmStream = iotDataStream.connect(ruleConfig) .keyBy(IotData::getDeviceId, AlarmRule::getDeviceId) .process(new CoProcessFunction<IotData, AlarmRule, AlarmMessage>() { // 用临时保存设备的报警规则 ,这里的状态交由flink维护 private MapState<Integer, AlarmRule> alarmRuleValueState; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); // 初始化 ValueState alarmRuleValueState = getRuntimeContext().getMapState(new MapStateDescriptor<>("alarm-rule-state", Integer.class, AlarmRule.class)); } @Override public void processElement1(IotData iotData, CoProcessFunction<IotData, AlarmRule, AlarmMessage>.Context context, Collector<AlarmMessage> collector) throws Exception { Map<String, Double> data = iotData.getData(); // 遍历每个规则 alarmRuleValueState.values().forEach(rule -> { String varName = rule.getVarName(); // 获取变量值 Double val = data.get(varName); if (val == null) { // 变量里没有值 return; } if (val <= rule.getMin() || val > rule.getMax()) { // 超过限制,输出报警信息 AlarmMessage alarmMessage = new AlarmMessage(); alarmMessage.setDeviceId(iotData.getDeviceId()); alarmMessage.setTimestamp(iotData.getTimestamp()); alarmMessage.setAlarmVar(varName); alarmMessage.setAlarmValue(val); collector.collect(alarmMessage); } }); } @Override public void processElement2(AlarmRule alarmRule, CoProcessFunction<IotData, AlarmRule, AlarmMessage>.Context context, Collector<AlarmMessage> collector) throws Exception { // 接收到AlarmRule, 仅更新 alarmRuleValueState alarmRuleValueState.put(alarmRule.getId(), alarmRule); } }); alarmStream.print(); environment.execute(); } /\*\*
* 获取物联采集数据
*
* @param environment
* @return
*/
private static DataStreamSource getIotStream(StreamExecutionEnvironment environment) {
return environment.addSource(new SourceFunction<>() {
private boolean running = true;
@Override public void run(SourceContext<IotData> sourceContext) throws Exception { while (running) { // 模拟100个设备 每秒一次上报数据 long ts = System.currentTimeMillis(); ts = ts - ts % 1000; for (int i = 0; i < 100; i++) { IotData iotData = new IotData(); iotData.setTimestamp(ts); iotData.setDeviceId(i); Map<String, Double> data = new HashMap<>(); data.put("var1", RandomUtils.nextDouble()); data.put("var2", RandomUtils.nextDouble()); iotData.setData(data); sourceContext.collect(iotData); } Thread.sleep(1000 - ts % 1000); } } @Override public void cancel() { running = false; } }); } /\*\*
* 获取规则配置
*/
public static DataStreamSource getRuleConfig(StreamExecutionEnvironment environment) {
// 仅针对部分设备监控
自我介绍一下,小编13年上海交大毕业,曾经在小公司待过,也去过华为、OPPO等大厂,18年进入阿里一直到现在。
深知大多数大数据工程师,想要提升技能,往往是自己摸索成长或者是报班学习,但对于培训机构动则几千的学费,着实压力不小。自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!
因此收集整理了一份《2024年大数据全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友。
既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,基本涵盖了95%以上大数据开发知识点,真正体系化!
由于文件比较大,这里只是将部分目录大纲截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且后续会持续更新
如果你觉得这些内容对你有帮助,可以添加VX:vip204888 (备注大数据获取)
一个人可以走的很快,但一群人才能走的更远。不论你是正从事IT行业的老鸟或是对IT行业感兴趣的新人,都欢迎扫码加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!
目录大纲截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且后续会持续更新**
如果你觉得这些内容对你有帮助,可以添加VX:vip204888 (备注大数据获取)
[外链图片转存中…(img-hYoezG2W-1713106630358)]
一个人可以走的很快,但一群人才能走的更远。不论你是正从事IT行业的老鸟或是对IT行业感兴趣的新人,都欢迎扫码加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。