赞
踩
业务场景如下:
概述:采集工厂设备的数据。
flink连接emqtt,采集工业物联网的数据,进行流处理,工厂设备数据内没有machID, 需要从mysq的设备信息基础表根据gateMac获取对应的machID。
访问mysql实现思路(一):
flink项目在初始化的时候从mysql获取所有设备的基础信息,所遇问题:如果新增一个设备,flink项目需要重启一次,从mysql来获取全部的machID,这样效果很差,被否定。
实现思路(二):
将flink读取mysql写成一个单流, 每5分钟重新获取一次,定时刷新, 将结果写入map中。
private static MapdeviceMap = new Hashtable();
/**
* 从mysql获取machID, 五分钟刷新一次 本博客讲解的地方
*/
DataStream> deviceStream = env.addSource(new JdbcReader());
deviceStream.broadcast().map(new MapFunction, Object>() {
@Override
public Object map(Mapvalue) {
deviceMap = value;
return null;
}
});
SourceMain.java (flink处理数据的主项目)
package com.flink;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.flink.config.flinkConstants;
import com.flink.model.DeviceAlarm;
import com.flink.model.DeviceData;
import com.flink.utils.emqtt.EmqttSource;
import com.flink.utils.mysql.JdbcReader;
import com.flink.utils.mysql.JdbcWriter;
import com.flink.utils.opentsdb.OpnetsdbWriter;
import com.flink.utils.redis.RedisWriter;
import com.google.gson.Gson;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.*;
public class EmqttFlinkMain {
private static MapdeviceMap = new Hashtable();
public static void main(String[] args) throws Exception {
flinkConstants fc = flinkConstants.getInstance();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
/**
* 从mysql获取machID, 五分钟刷新一次 本博客讲解的地方
*/
DataStream> deviceStream &
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。