当前位置:   article > 正文

从零开始搭建flink流式计算项目-2小试牛刀-物联网场景下,如何实现设备采集参数监控报警功能

从零开始搭建flink流式计算项目-2小试牛刀-物联网场景下,如何实现设备采集参数监控报警功能

物联网场景下,如何实现设备采集参数监控报警功能

本文主要介绍如何使用 Flink DataStream API 实现该功能,业务场景已做简化。

需求描述

物联设备实时上报采集参数,对每个设备的采集参数进行监控
发现某个采集参数存在异常时,触发报警

需求分析

source:

设备采集数据 deviceIotData
  • 采集变量 : 采集值

/**
 * One telemetry record reported by an IoT device.
 *
 * <p>Accessors, {@code equals}/{@code hashCode} and {@code toString} are not written by hand;
 * they are presumably generated by Lombok's {@code @Data} (the import is not visible here).
 */
@Data
public class IotData {

    /**
     * ID of the device that reported this record.
     */
    private Integer deviceId;

    /**
     * Time of the report (presumably epoch milliseconds — confirm against the producer).
     */
    private Long timestamp;

    /**
     * Collected parameters: variable name mapped to its collected value.
     */
    private Map<String , Double> data;


}


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
监控报警规则 alarmRule

/**
 * Alarm rule describing the allowed value range of one collected variable on one device.
 *
 * <p>Accessors, {@code equals}/{@code hashCode} and {@code toString} are presumably generated
 * by Lombok's {@code @Data} (the import is not visible here).
 */
@Data
public class AlarmRule {


    /**
     * Rule ID. Consumers that cache rules by ID need it to be unique among the rules
     * of a device, otherwise later rules replace earlier ones.
     */
    private Integer id;

    /**
     * ID of the device this rule applies to.
     */
    private Integer deviceId;

    /**
     * Name of the monitored variable (a key of the device's collected-parameter map).
     */
    private String varName;

    /**
     * Minimum value (lower bound of the allowed range).
     */
    private Double min;

    /**
     * Maximum value (upper bound of the allowed range).
     */
    private Double max;

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
报警事件

/**
 * Alarm message emitted when a collected value violates an alarm rule.
 *
 * <p>Accessors, {@code equals}/{@code hashCode} and {@code toString} are presumably generated
 * by Lombok's {@code @Data} (the import is not visible here).
 */
@Data
public class AlarmMessage {


    /**
     * ID of the device that triggered the alarm.
     */
    private Integer deviceId;

    /**
     * Alarm time.
     */
    private Long timestamp;
    /**
     * Name of the collected variable that triggered the alarm.
     */
    private String alarmVar;

    /**
     * Collected value that triggered the alarm.
     */
    private Number alarmValue;
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

开始实现


/**
 * Flink streaming job that monitors IoT telemetry against per-device alarm rules.
 *
 * <p>The telemetry stream ({@link IotData}) and the rule stream ({@link AlarmRule}) are connected
 * and keyed by device ID. Rules are cached in keyed map state; every telemetry record is checked
 * against all rules cached for its device and an {@link AlarmMessage} is emitted for each variable
 * whose value lies outside the configured range.
 */
public class IotMonitorJob {

    /** Number of simulated devices reporting telemetry. */
    private static final int DEVICE_COUNT = 100;

    /** Number of devices (IDs {@code 0..MONITORED_DEVICE_COUNT-1}) that get alarm rules. */
    private static final int MONITORED_DEVICE_COUNT = 20;

    /** Interval between two simulated reporting rounds, in milliseconds. */
    private static final long REPORT_INTERVAL_MS = 1000L;

    /**
     * Builds and runs the job: telemetry + rules in, alarm messages printed to stdout.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails to build or execute
     */
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);

        // Telemetry stream
        DataStreamSource<IotData> iotDataStream = getIotStream(environment);
        // Alarm rule stream
        DataStreamSource<AlarmRule> ruleConfig = getRuleConfig(environment);
        // Cache the rules per device and check every telemetry record against them
        SingleOutputStreamOperator<AlarmMessage> alarmStream = iotDataStream.connect(ruleConfig)
                .keyBy(IotData::getDeviceId, AlarmRule::getDeviceId)
                .process(new CoProcessFunction<IotData, AlarmRule, AlarmMessage>() {

                    // Rules of the current key (device), indexed by rule ID; the state is managed by Flink
                    private MapState<Integer, AlarmRule> alarmRuleState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        // Initialise the MapState that holds the rules
                        alarmRuleState = getRuntimeContext().getMapState(
                                new MapStateDescriptor<>("alarm-rule-state", Integer.class, AlarmRule.class));
                    }

                    @Override
                    public void processElement1(IotData iotData, CoProcessFunction<IotData, AlarmRule, AlarmMessage>.Context context, Collector<AlarmMessage> collector) throws Exception {
                        Map<String, Double> data = iotData.getData();
                        if (data == null || data.isEmpty()) {
                            // Nothing was collected, so there is nothing to check
                            return;
                        }

                        // Check the record against every rule cached for this device.
                        // Records that arrive before a rule has been received are not checked by it.
                        for (AlarmRule rule : alarmRuleState.values()) {
                            String varName = rule.getVarName();
                            Double val = data.get(varName);
                            if (val == null) {
                                // The record does not contain the monitored variable
                                continue;
                            }

                            if (isOutOfRange(val, rule)) {
                                // Limit exceeded: emit an alarm message
                                AlarmMessage alarmMessage = new AlarmMessage();
                                alarmMessage.setDeviceId(iotData.getDeviceId());
                                alarmMessage.setTimestamp(iotData.getTimestamp());
                                alarmMessage.setAlarmVar(varName);
                                alarmMessage.setAlarmValue(val);
                                collector.collect(alarmMessage);
                            }
                        }
                    }

                    @Override
                    public void processElement2(AlarmRule alarmRule, CoProcessFunction<IotData, AlarmRule, AlarmMessage>.Context context, Collector<AlarmMessage> collector) throws Exception {
                        // A rule only updates the cache. Rules are stored by ID, so a rule with the
                        // same ID as an existing one replaces it.
                        alarmRuleState.put(alarmRule.getId(), alarmRule);
                    }
                });


        alarmStream.print();
        environment.execute();
    }

    /**
     * Tells whether a value violates a rule, i.e. lies outside {@code [min, max]}.
     * A {@code null} bound is treated as "no limit on that side".
     *
     * @param val  collected value
     * @param rule rule to check against
     * @return {@code true} if the value is below the minimum or above the maximum
     */
    private static boolean isOutOfRange(double val, AlarmRule rule) {
        Double min = rule.getMin();
        Double max = rule.getMax();
        boolean belowMin = min != null && val < min;
        boolean aboveMax = max != null && val > max;
        return belowMin || aboveMax;
    }

    /**
     * Creates the simulated telemetry source: every device reports {@code var1} and {@code var2}
     * once per second.
     *
     * @param environment execution environment the source is added to
     * @return the telemetry stream
     */
    private static DataStreamSource<IotData> getIotStream(StreamExecutionEnvironment environment) {
        return environment.addSource(new SourceFunction<IotData>() {
            // Written by cancel() from another thread, hence volatile
            private volatile boolean running = true;

            @Override
            public void run(SourceContext<IotData> sourceContext) throws Exception {
                while (running) {

                    // All records of one round share a timestamp truncated to the whole second
                    long now = System.currentTimeMillis();
                    long ts = now - now % REPORT_INTERVAL_MS;

                    for (int i = 0; i < DEVICE_COUNT; i++) {
                        IotData iotData = new IotData();
                        iotData.setTimestamp(ts);
                        iotData.setDeviceId(i);

                        Map<String, Double> data = new HashMap<>();
                        data.put("var1", RandomUtils.nextDouble());
                        data.put("var2", RandomUtils.nextDouble());
                        iotData.setData(data);

                        sourceContext.collect(iotData);
                    }

                    // Sleep until the next second boundary. The current time must be re-read here:
                    // ts is already truncated, so "ts % 1000" would always be 0.
                    Thread.sleep(REPORT_INTERVAL_MS - System.currentTimeMillis() % REPORT_INTERVAL_MS);
                }
            }

            @Override
            public void cancel() {
                running = false;
            }
        });
    }


    /**
     * Creates the rule stream. Only devices {@code 0..19} are monitored; each of them gets one
     * rule for {@code var1} (range 0..20) and one for {@code var2} (range 0..10).
     *
     * @param environment execution environment the source is added to
     * @return the rule stream
     */
    public static DataStreamSource<AlarmRule> getRuleConfig(StreamExecutionEnvironment environment) {
        List<AlarmRule> ruleList = new ArrayList<>();
        // Every rule needs its own ID: the job caches rules by ID, so rules that share an ID
        // (including a missing one) would overwrite each other and only the last would be applied.
        int nextRuleId = 1;
        for (int deviceId = 0; deviceId < MONITORED_DEVICE_COUNT; deviceId++) {
            ruleList.add(newRule(nextRuleId++, deviceId, "var1", 0.0, 20.0));
            ruleList.add(newRule(nextRuleId++, deviceId, "var2", 0.0, 10.0));
        }
        return environment.fromCollection(ruleList);
    }

    /**
     * Builds a fully populated alarm rule.
     */
    private static AlarmRule newRule(int id, int deviceId, String varName, double min, double max) {
        AlarmRule rule = new AlarmRule();
        rule.setId(id);
        rule.setDeviceId(deviceId);
        rule.setVarName(varName);
        rule.setMin(min);
        rule.setMax(max);
        return rule;
    }

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142

启动job

实际运行环境:Java 11,Flink 1.18.1

输出结果:


AlarmMessage(deviceId=0, timestamp=1709732511000, alarmVar=var2, alarmValue=1.0408785873261203E308)
AlarmMessage(deviceId=1, timestamp=1709732511000, alarmVar=var2, alarmValue=8.409717342118433E306)
AlarmMessage(deviceId=2, timestamp=1709732511000, alarmVar=var2, alarmValue=6.955367711979709E307)
AlarmMessage(deviceId=3, timestamp=1709732511000, alarmVar=var2, alarmValue=2.5403069646236554E307)
AlarmMessage(deviceId=4, timestamp=1709732511000, alarmVar=var2, alarmValue=7.629789041713245E307)
AlarmMessage(deviceId=5, timestamp=1709732511000, alarmVar=var2, alarmValue=6.918664964996954E307)
AlarmMessage(deviceId=6, timestamp=1709732511000, alarmVar=var2, alarmValue=1.1660434456728436E308)
AlarmMessage(deviceId=7, timestamp=1709732511000, alarmVar=var2, alarmValue=2.1272561368179368E307)
AlarmMessage(deviceId=8, timestamp=1709732511000, alarmVar=var2, alarmValue=2.8693117885744695E307)
AlarmMessage(deviceId=9, timestamp=1709732511000, alarmVar=var2, alarmValue=1.1232501067396574E308)
AlarmMessage(deviceId=10, timestamp=1709732511000, alarmVar=var2, alarmValue=1.6192738031099514E308)
AlarmMessage(deviceId=11, timestamp=1709732511000, alarmVar=var2, alarmValue=7.515829766654446E307)
AlarmMessage(deviceId=12, timestamp=1709732511000, alarmVar=var2, alarmValue=1.6409410780574847E308)
AlarmMessage(deviceId=13, timestamp=1709732511000, alarmVar=var2, alarmValue=7.372363635115241E307)
AlarmMessage(deviceId=14, timestamp=1709732511000, alarmVar=var2, alarmValue=5.269385013806783E306)
AlarmMessage(deviceId=15, timestamp=1709732511000, alarmVar=var2, alarmValue=9.736804956554577E307)
AlarmMessage(deviceId=16, timestamp=1709732511000, alarmVar=var2, alarmValue=5.403962718372102E307)
AlarmMessage(deviceId=17, timestamp=1709732511000, alarmVar=var2, alarmValue=1.7957965318588386E308)
AlarmMessage(deviceId=18, timestamp=1709732511000, alarmVar=var2, alarmValue=6.546384330721207E307)
AlarmMessage(deviceId=19, timestamp=1709732511000, alarmVar=var2, alarmValue=1.2797848722222382E308)
AlarmMessage(deviceId=0, timestamp=1709732512000, alarmVar=var2, alarmValue=8.096850966966417E307)
AlarmMessage(deviceId=1, timestamp=1709732512000, alarmVar=var2, alarmValue=1.1459880504481993E308)
AlarmMessage(deviceId=2, timestamp=1709732512000, alarmVar=var2, alarmValue=1.6878563127635106E308)
AlarmMessage(deviceId=3, timestamp=1709732512000, alarmVar=var2, alarmValue=1.3431398337246118E308)
AlarmMessage(deviceId=4, timestamp=1709732512000, alarmVar=var2, alarmValue=6.503426414090896E307)
AlarmMessage(deviceId=5, timestamp=1709732512000, alarmVar=var2, alarmValue=1.4354519016753652E308)
AlarmMessage(deviceId=6, timestamp=1709732512000, alarmVar=var2, alarmValue=3.855378202880747E307)
AlarmMessage(deviceId=7, timestamp=1709732512000, alarmVar=var2, alarmValue=1.536008451971638E308)
AlarmMessage(deviceId=8, timestamp=1709732512000, alarmVar=var2, alarmValue=2.8070328671172436E307)
AlarmMessage(deviceId=9, timestamp=1709732512000, alarmVar=var2, alarmValue=1.45270572341246E308)
AlarmMessage(deviceId=10, timestamp=1709732512000, alarmVar=var2, alarmValue=4.4799744200415005E307)
AlarmMessage(deviceId=11, timestamp=1709732512000, alarmVar=var2, alarmValue=1.712500895162964E307)
AlarmMessage(deviceId=12, timestamp=1709732512000, alarmVar=var2, alarmValue=1.6557317203079603E308)
AlarmMessage(deviceId=13, timestamp=1709732512000, alarmVar=var2, alarmValue=9.172029752661656E307)
AlarmMessage(deviceId=14, timestamp=1709732512000, alarmVar=var2, alarmValue=5.171078383438277E307)
AlarmMessage(deviceId=15, timestamp=1709732512000, alarmVar=var2, alarmValue=1.7828693572565125E308)
AlarmMessage(deviceId=16, timestamp=1709732512000, alarmVar=var2, alarmValue=7.925392321319335E307)
AlarmMessage(deviceId=17, timestamp=1709732512000, alarmVar=var2, alarmValue=6.268748480988385E307)
AlarmMessage(deviceId=18, timestamp=1709732512000, alarmVar=var2, alarmValue=1.4266529007265143E308)
AlarmMessage(deviceId=19, timestamp=1709732512000, alarmVar=var2, alarmValue=5.015200701556669E306)
AlarmMessage(deviceId=0, timestamp=1709732513000, alarmVar=var2, alarmValue=6.777961007521458E307)
AlarmMessage(deviceId=1, timestamp=1709732513000, alarmVar=var2, alarmValue=3.7468989126769943E307)
AlarmMessage(deviceId=2, timestamp=1709732513000, alarmVar=var2, alarmValue=7.544125822456755E307)
AlarmMessage(deviceId=3, timestamp=1709732513000, alarmVar=var2, alarmValue=1.2477230196962066E308)
AlarmMessage(deviceId=4, timestamp=1709732513000, alarmVar=var2, alarmValue=1.2333434785469278E308)
AlarmMessage(deviceId=5, timestamp=1709732513000, alarmVar=var2, alarmValue=1.4969296830188747E308)
AlarmMessage(deviceId=6, timestamp=1709732513000, alarmVar=var2, alarmValue=7.975914480844549E306)
AlarmMessage(deviceId=7, timestamp=1709732513000, alarmVar=var2, alarmValue=6.746069919870638E307)
AlarmMessage(deviceId=8, timestamp=1709732513000, alarmVar=var2, alarmValue=1.7504936868379706E308)
AlarmMessage(deviceId=9, timestamp=1709732513000, alarmVar=var2, alarmValue=2.624424900005404E307)
AlarmMessage(deviceId=10, timestamp=1709732513000, alarmVar=var2, alarmValue=1.6390580210239323E308)
AlarmMessage(deviceId=11, timestamp=1709732513000, alarmVar=var2, alarmValue=5.603466810531758E307)
AlarmMessage(deviceId=12, timestamp=1709732513000, alarmVar=var2, alarmValue=2.7405925136993203E307)
AlarmMessage(deviceId=13, timestamp=1709732513000, alarmVar=var2, alarmValue=3.622249477246505E307)
AlarmMessage(deviceId=14, timestamp=1709732513000, alarmVar=var2, alarmValue=1.4597500938901865E308)
AlarmMessage(deviceId=15, timestamp=1709732513000, alarmVar=var2, alarmValue=1.7106265100123184E308)
AlarmMessage(deviceId=16, timestamp=1709732513000, alarmVar=var2, alarmValue=1.6232693977359527E308)
AlarmMessage(deviceId=17, timestamp=1709732513000, alarmVar=var2, alarmValue=1.6454269356859472E308)
AlarmMessage(deviceId=18, timestamp=1709732513000, alarmVar=var2, alarmValue=1.103212644573044E308)
AlarmMessage(deviceId=19, timestamp=1709732513000, alarmVar=var2, alarmValue=8.534365652761233E307)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61

接下来尝试将数据写入到mysql中


声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/557297
推荐阅读
相关标签
  

闽ICP备14008679号