The official Apache Flink documentation covers broadcast state and its API in detail. When working with broadcast variables and broadcast state, keep the following points in mind:
Once a broadcast variable has been created, it can be used by any function running anywhere in the cluster without being shipped to the cluster nodes over and over again.
A broadcast variable should not be modified after it is broadcast; this guarantees that every node sees exactly the same value.
In one sentence: a broadcast variable is a shared, read-only dataset. You broadcast a DataSet once, every task on every node can then read it, and the data is kept only once per node, which saves memory.
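Stripped down to its essentials, the batch API comes down to two calls: register the dataset on an operator with withBroadcastSet(dataSet, "name"), and read it back inside a rich function with getRuntimeContext().getBroadcastVariable("name"). Below is a minimal sketch; the variable name "ages" and the surrounding datasets are illustrative, and it assumes the same imports and ExecutionEnvironment as the full example in section 1.

DataSet<Tuple2<String, Integer>> toBroadcast = env.fromElements(new Tuple2<>("java", 17));
DataSet<String> names = env.fromElements("java", "scala");

names.map(new RichMapFunction<String, String>() {
    private final HashMap<String, Integer> ages = new HashMap<>();

    @Override
    public void open(Configuration parameters) throws Exception {
        // open() runs once per task: fetch the broadcast variable here
        List<Tuple2<String, Integer>> broadcast = getRuntimeContext().getBroadcastVariable("ages");
        for (Tuple2<String, Integer> t : broadcast) {
            ages.put(t.f0, t.f1);
        }
    }

    @Override
    public String map(String value) {
        return value + "," + ages.get(value);
    }
}).withBroadcastSet(toBroadcast, "ages")   // register the dataset under the name "ages"
  .print();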
1. Flink batch
package flink.batch;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

/**
 * Broadcast variable demo.
 *
 * Requirement:
 * Flink reads user names from the data source,
 * and the final output must contain both the user name and the user's age.
 * The map operator in the middle therefore needs access to the age information,
 * so the name/age dataset is handed to it as a broadcast variable.
 *
 * Note: if several operators need the same dataset, the broadcast variable has to be
 * registered on each of those operators separately.
 */
public class BatchDemoBroadcast {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 1. Prepare the data that will be broadcast
        ArrayList<Tuple2<String, Integer>> broadData = new ArrayList<>();
        broadData.add(new Tuple2<>("python", 18));
        broadData.add(new Tuple2<>("scala", 20));
        broadData.add(new Tuple2<>("java", 17));
        DataSource<Tuple2<String, Integer>> dataBroad = env.fromCollection(broadData);

        // 2. Convert the Tuple2 records into HashMap records
        DataSet<HashMap<String, Integer>> baseData = dataBroad.map(new MapFunction<Tuple2<String, Integer>, HashMap<String, Integer>>() {
            @Override
            public HashMap<String, Integer> map(Tuple2<String, Integer> value) throws Exception {
                HashMap<String, Integer> res = new HashMap<>();
                res.put(value.f0, value.f1);
                return res;
            }
        });

        DataSet<String> mainData = env.fromElements("python", "java", "java", "kafka", "scala", "redis");

        DataSet<String> result = mainData.map(new RichMapFunction<String, String>() {
            List<HashMap<String, Integer>> broadCastMap = new ArrayList<>();
            HashMap<String, Integer> allMap = new HashMap<>();

            /**
             * open() is executed only once per task and is the place for
             * initialization work, so the broadcast variable is fetched here.
             */
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                // 3. Fetch the broadcast data
                this.broadCastMap = getRuntimeContext().getBroadcastVariable("broadCastMapName");
                for (HashMap<String, Integer> map : broadCastMap) {
                    allMap.putAll(map);
                }
            }

            @Override
            public String map(String value) throws Exception {
                Integer age = allMap.get(value);
                return value + "," + age;
            }
        }).withBroadcastSet(baseData, "broadCastMapName");

        result.print();
    }
}
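With the inputs above, each name in the main dataset is looked up in the map built from the broadcast data during open(), so the job prints, in some order, python,18, java,17 (twice), scala,20, and kafka,null / redis,null for the two keys that have no entry in the broadcast dataset. Building allMap in open() rather than in map() means the merge is done once per task instead of once per record.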
2. Flink stream
package flink.stream.addsource;

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.util.Collector;

import java.util.concurrent.TimeUnit;

/**
 * Use a broadcast stream to push dynamic configuration into a data stream.
 *
 * @author unisinsight/tu.tengfei
 * @date 2019/5/13 11:07
 */
public class StreamBroadcastDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // set the parallelism
        env.setParallelism(1);

        // Custom broadcast source that produces the filter-keyword configuration
        DataStreamSource<String> filterData = env.addSource(new RichSourceFunction<String>() {
            private boolean isRunning = true;
            // test data
            String[] data = new String[]{"java", "python", "scala"};

            /**
             * Mock source: emits one keyword per minute to simulate a configuration update.
             */
            @Override
            public void run(SourceContext<String> cxt) throws Exception {
                int size = data.length;
                while (isRunning) {
                    TimeUnit.MINUTES.sleep(1);
                    int seed = (int) (Math.random() * size);
                    // pick a random keyword from the test data and emit it
                    cxt.collect(data[seed]);
                    System.out.println("发送的关键字是:" + data[seed]);
                }
            }

            @Override
            public void cancel() {
                isRunning = false;
            }
        });

        // 1. Define the descriptor that describes the broadcast state
        MapStateDescriptor<String, String> configFilter = new MapStateDescriptor<>(
                "configFilter", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

        // 2. Broadcast filterData
        BroadcastStream<String> broadcastConfig = filterData.setParallelism(1).broadcast(configFilter);

        // main data stream
        DataStreamSource<String> dataStream = env.addSource(new RichSourceFunction<String>() {
            private boolean isRunning = true;
            // test data
            String[] data = new String[]{
                    "java代码量太大",
                    "python代码量少,易学习",
                    "php是web开发语言",
                    "scala流式处理语言,主要应用于大数据开发场景",
                    "go是一种静态强类型、编译型、并发型,并具有垃圾回收功能的编程语言"
            };

            /**
             * Mock source: emits one record every 3 seconds.
             */
            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                int size = data.length;
                while (isRunning) {
                    TimeUnit.SECONDS.sleep(3);
                    int seed = (int) (Math.random() * size);
                    // pick a random record from the test data and emit it
                    ctx.collect(data[seed]);
                    System.out.println("上游发送的消息:" + data[seed]);
                }
            }

            @Override
            public void cancel() {
                isRunning = false;
            }
        });

        // 3. Connect dataStream with the broadcast stream
        DataStream<String> result = dataStream.connect(broadcastConfig).process(new BroadcastProcessFunction<String, String, String>() {
            // keyword used for interception
            private String keyWords = null;

            /**
             * open() runs only once and is used for initialization.
             * 4. Give keyWords an initial value, otherwise a java.lang.NullPointerException is thrown.
             */
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                keyWords = "java";
                System.out.println("初始化keyWords:java");
            }

            /**
             * 6. Process the elements of the main stream.
             */
            @Override
            public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
                if (value.contains(keyWords)) {
                    out.collect("拦截消息:" + value + ", 原因:包含拦截关键字:" + keyWords);
                }
            }

            /**
             * 5. Receive updates to the broadcast value.
             */
            @Override
            public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {
                keyWords = value;
                System.out.println("更新关键字:" + value);
            }
        });

        result.print();
        env.execute("broadcast test");
    }
}
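Note that this example keeps the current keyword in a plain member field instead of in the broadcast state described by configFilter. That works because every parallel instance of a BroadcastProcessFunction receives every broadcast element, but the field is not part of Flink's checkpointed state. Example 3 below shows the recommended pattern of writing the configuration into ctx.getBroadcastState(...) inside processBroadcastElement.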
3. Reading configuration from the database to process the stream, i.e. joining the stream table with a dimension table
package flink.stream.addsource;

import flink.BasicConf;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.typeutils.MapTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.util.Collector;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * Requirement:
 * Read the config data (code and name) from PostgreSQL into streamPgSql,
 * broadcast streamPgSql to reduce memory consumption,
 * then join the Kafka stream with the PostgreSQL config data to clean the records.
 *
 * Broadcast replicates the state to every task.
 * Note that the state is not propagated across tasks;
 * modifying it only affects the task that holds it.
 */
public class StreamKafkaJoinPostgres {
    public static void main(String[] args) throws Exception {
        final String bootstrap = BasicConf.KafkaConf.bootstrap;
        final String zookeeper = BasicConf.KafkaConf.zookeeper;
        final String topic = "web";

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // env.enableCheckpointing(5000); // checkpoint every 5000 ms
        // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // final StreamTableEnvironment tenv = TableEnvironment.getTableEnvironment(env);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", bootstrap);   // Kafka brokers (IP or hostname), comma separated
        properties.setProperty("zookeeper.connect", zookeeper);   // ZooKeeper nodes (IP or hostname), comma separated
        properties.setProperty("group.id", "flinkStream");        // Flink consumer group id

        // 1. Read the config records from PostgreSQL
        DataStream<String> streamPgSql = env.addSource(new PostgresqlSource());
        final DataStream<HashMap<String, String>> conf = streamPgSql.map(new MapFunction<String, HashMap<String, String>>() {
            @Override
            public HashMap<String, String> map(String value) throws Exception {
                String[] tokens = value.split("\\t");
                HashMap<String, String> hashMap = new HashMap<>();
                hashMap.put(tokens[0], tokens[1]);
                System.out.println(tokens[0] + " : " + tokens[1]);
                return hashMap;
            }
        });

        // 2. MapStateDescriptor describing the type of the broadcast state
        MapStateDescriptor<String, Map<String, String>> ruleStateDescriptor = new MapStateDescriptor<>(
                "RulesBroadcastState",
                BasicTypeInfo.STRING_TYPE_INFO,
                new MapTypeInfo<>(String.class, String.class));

        // 3. Broadcast conf, which returns a BroadcastStream
        final BroadcastStream<HashMap<String, String>> confBroadcast = conf.broadcast(ruleStateDescriptor);

        // Read the main stream from Kafka
        FlinkKafkaConsumer011<String> webStream = new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(), properties);
        webStream.setStartFromEarliest();
        DataStream<String> kafkaData = env.addSource(webStream).setParallelism(1);

        // Sample record:
        // 192.168.108.209  2019-05-07 16:11:09  "GET /class/2.html"  503  https://search.yahoo.com/search?p=java核心编程
        DataStream<Tuple5<String, String, String, String, String>> map = kafkaData
                .map(new MapFunction<String, Tuple5<String, String, String, String, String>>() {
                    @Override
                    public Tuple5<String, String, String, String, String> map(String value) throws Exception {
                        String[] tokens = value.split("\\t");
                        return new Tuple5<>(tokens[0], tokens[1], tokens[2], tokens[3], tokens[4]);
                    }
                })
                // connect with the BroadcastStream, then process the resulting BroadcastConnectedStream
                .connect(confBroadcast)
                .process(new BroadcastProcessFunction<Tuple5<String, String, String, String, String>, HashMap<String, String>, Tuple5<String, String, String, String, String>>() {

                    private HashMap<String, String> keyWords = new HashMap<>();

                    MapStateDescriptor<String, Map<String, String>> ruleStateDescriptor = new MapStateDescriptor<>(
                            "RulesBroadcastState",
                            BasicTypeInfo.STRING_TYPE_INFO,
                            new MapTypeInfo<>(String.class, String.class));

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                    }

                    @Override
                    public void processElement(Tuple5<String, String, String, String, String> value, ReadOnlyContext ctx,
                                               Collector<Tuple5<String, String, String, String, String>> out) throws Exception {
                        Map<String, String> map = ctx.getBroadcastState(ruleStateDescriptor).get("keyWords");
                        // the broadcast state may still be empty if no config record has arrived yet
                        if (map == null) {
                            out.collect(value);
                            return;
                        }
                        String result = map.get(value.f3);
                        if (result == null) {
                            out.collect(new Tuple5<>(value.f0, value.f1, value.f2, value.f3, value.f4));
                        } else {
                            out.collect(new Tuple5<>(value.f0, value.f1, value.f2, result, value.f4));
                        }
                    }

                    /**
                     * Receive the broadcast records and store them in the broadcast state.
                     */
                    @Override
                    public void processBroadcastElement(HashMap<String, String> value, Context ctx,
                                                        Collector<Tuple5<String, String, String, String, String>> out) throws Exception {
                        BroadcastState<String, Map<String, String>> broadcastState = ctx.getBroadcastState(ruleStateDescriptor);
                        keyWords.putAll(value);
                        broadcastState.put("keyWords", keyWords);
                    }
                });

        map.print();
        env.execute("Broadcast test kafka");
    }
}
PostgresqlSource
package flink.stream.addsource;

import flink.BasicConf;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

/**
 * Reads config rows from the database and feeds them into Flink.
 */
public class PostgresqlSource extends RichSourceFunction<String> {
    private static final long serialVersionUID = 1L;
    private Connection connection;
    private boolean isRunning = true;
    private PreparedStatement preparedStatement;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        String driver = BasicConf.PostgresConf.DRIVERNAME;
        String url = BasicConf.PostgresConf.URL;
        String user = BasicConf.PostgresConf.USERNAME;
        String password = BasicConf.PostgresConf.PASSWORD;
        Class.forName(driver);
        connection = DriverManager.getConnection(url, user, password);
        String sql = "SELECT code, name FROM public.config";
        preparedStatement = connection.prepareStatement(sql);
    }

    @Override
    public void run(SourceContext<String> sourceContext) throws Exception {
        while (isRunning) {
            try {
                ResultSet resultSet = preparedStatement.executeQuery();
                while (resultSet.next()) {
                    Word word = new Word();
                    word.setCode(resultSet.getString("code"));
                    word.setName(resultSet.getString("name"));
                    sourceContext.collect(String.valueOf(word));
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            // poll the table every 3 seconds so configuration changes are picked up
            Thread.sleep(3000);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (preparedStatement != null) {
            preparedStatement.close();
        }
        if (connection != null) {
            connection.close();
        }
    }

    private static class Word {
        private String code;
        private String name;

        public Word() {
            // fields are populated via the setters below
        }

        public String getCode() {
            return code;
        }

        public void setCode(String code) {
            this.code = code;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        @Override
        public String toString() {
            return code + "\t" + name;
        }
    }
}
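PostgresqlSource simply re-runs the SELECT against public.config every 3 seconds, so any row that changes in the table is re-emitted and eventually overwrites the corresponding entry in the broadcast state of example 3. BasicConf (imported as flink.BasicConf) is the author's own holder for the Kafka and PostgreSQL connection settings and is not shown in this post.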