当前位置:   article > 正文

从零开始搭建flink流式计算项目-2小试牛刀-物联网场景下,如何实现设备采集参数监控报警功能(1)

从零开始搭建flink流式计算项目-2小试牛刀-物联网场景下,如何实现设备采集参数监控报警功能(1)

/**
* 物联采集数据
*/
@Data
public class IotData {

/\*\*
  • 1

* 设备ID
*/
private Integer deviceId;

/\*\*
  • 1

* 时间戳
*/
private Long timestamp;

/\*\*
  • 1

* 采集参数
*/
private Map<String , Double> data;

}


##### 监控报警规则 `alarmRule`



  • 1
  • 2
  • 3
  • 4
  • 5

/**
* 报警规则
*/
@Data
public class AlarmRule {

/\*\*
  • 1

* 规则ID
*/
private Integer id;

/\*\*
  • 1

* 设备ID
*/
private Integer deviceId;

/\*\*
  • 1

* 监控的变量名称
*/
private String varName;

/\*\*
  • 1

* 最小值
*/
private Double min;

/\*\*
  • 1

* 最大值
*/
private Double max;

}


##### 报警事件



  • 1
  • 2
  • 3
  • 4
  • 5

/**
* 报警消息
*/
@Data
public class AlarmMessage {

/\*\*
  • 1

* 设备
*/
private Integer deviceId;

/\*\*
  • 1

* 报警时间
*/
private Long timestamp;
/**
* 触发报警的采集变量名称
*/
private String alarmVar;

/\*\*
  • 1

* 触发报警的采集值
*/
private Number alarmValue;
}


#### 开始实现



  • 1
  • 2
  • 3
  • 4
  • 5

public class IotMonitorJob {

public static void main(String[] args) throws Exception {


    StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    environment.setParallelism(1);

    // 采集数据Stream
    DataStreamSource<IotData> iotDataStream = getIotStream(environment);
    // 报警规则Stream
    DataStreamSource<AlarmRule> ruleConfig = getRuleConfig(environment);
    // 缓存报警规则 并监控报警数据
    SingleOutputStreamOperator<AlarmMessage> alarmStream = iotDataStream.connect(ruleConfig)
            .keyBy(IotData::getDeviceId, AlarmRule::getDeviceId)
            .process(new CoProcessFunction<IotData, AlarmRule, AlarmMessage>() {

                // 用临时保存设备的报警规则 ,这里的状态交由flink维护
                private MapState<Integer, AlarmRule> alarmRuleValueState;

                @Override
                public void open(Configuration parameters) throws Exception {
                    super.open(parameters);
                    // 初始化 ValueState
                    alarmRuleValueState = getRuntimeContext().getMapState(new MapStateDescriptor<>("alarm-rule-state", Integer.class, AlarmRule.class));
                }

                @Override
                public void processElement1(IotData iotData, CoProcessFunction<IotData, AlarmRule, AlarmMessage>.Context context, Collector<AlarmMessage> collector) throws Exception {
                    Map<String, Double> data = iotData.getData();

                    // 遍历每个规则
                    alarmRuleValueState.values().forEach(rule -> {

                        String varName = rule.getVarName();
                        // 获取变量值
                        Double val = data.get(varName);
                        if (val == null) {
                            // 变量里没有值
                            return;
                        }

                        if (val <= rule.getMin() || val > rule.getMax()) {
                            // 超过限制,输出报警信息
                            AlarmMessage alarmMessage = new AlarmMessage();
                            alarmMessage.setDeviceId(iotData.getDeviceId());
                            alarmMessage.setTimestamp(iotData.getTimestamp());
                            alarmMessage.setAlarmVar(varName);
                            alarmMessage.setAlarmValue(val);
                            collector.collect(alarmMessage);
                        }
                    });


                }

                @Override
                public void processElement2(AlarmRule alarmRule, CoProcessFunction<IotData, AlarmRule, AlarmMessage>.Context context, Collector<AlarmMessage> collector) throws Exception {
                    // 接收到AlarmRule, 仅更新 alarmRuleValueState
                    alarmRuleValueState.put(alarmRule.getId(), alarmRule);
                }
            });


    alarmStream.print();
    environment.execute();
}

/\*\*
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67

* 获取物联采集数据
*
* @param environment
* @return
*/
private static DataStreamSource getIotStream(StreamExecutionEnvironment environment) {
return environment.addSource(new SourceFunction<>() {
private boolean running = true;

        @Override
        public void run(SourceContext<IotData> sourceContext) throws Exception {
            while (running) {

                // 模拟100个设备 每秒一次上报数据

                long ts = System.currentTimeMillis();
                ts = ts - ts % 1000;

                for (int i = 0; i < 100; i++) {
                    IotData iotData = new IotData();
                    iotData.setTimestamp(ts);
                    iotData.setDeviceId(i);

                    Map<String, Double> data = new HashMap<>();
                    data.put("var1", RandomUtils.nextDouble());
                    data.put("var2", RandomUtils.nextDouble());
                    iotData.setData(data);

                    sourceContext.collect(iotData);
                }

                Thread.sleep(1000 - ts % 1000);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    });
}


/\*\*
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

* 获取规则配置
*/
public static DataStreamSource getRuleConfig(StreamExecutionEnvironment environment) {
// 仅针对部分设备监控

自我介绍一下,小编13年上海交大毕业,曾经在小公司待过,也去过华为、OPPO等大厂,18年进入阿里一直到现在。

深知大多数大数据工程师,想要提升技能,往往是自己摸索成长或者是报班学习,但对于培训机构动则几千的学费,着实压力不小。自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!

因此收集整理了一份《2024年大数据全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友。
img
img
img
img
img

既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,基本涵盖了95%以上大数据开发知识点,真正体系化!

由于文件比较大,这里只是将部分目录大纲截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且后续会持续更新

如果你觉得这些内容对你有帮助,可以添加VX:vip204888 (备注大数据获取)
img

一个人可以走的很快,但一群人才能走的更远。不论你是正从事IT行业的老鸟或是对IT行业感兴趣的新人,都欢迎扫码加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!

目录大纲截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且后续会持续更新**

如果你觉得这些内容对你有帮助,可以添加VX:vip204888 (备注大数据获取)
[外链图片转存中…(img-hYoezG2W-1713106630358)]

一个人可以走的很快,但一群人才能走的更远。不论你是正从事IT行业的老鸟或是对IT行业感兴趣的新人,都欢迎扫码加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/557330
推荐阅读
相关标签
  

闽ICP备14008679号