
Flink stream broadcast variables: reading broadcast data in open() inside a process function

The official Apache Flink documentation describes the broadcast state feature and provides a detailed API guide. Keep the following four points in mind when using broadcast state:

  • When using broadcast state, operator tasks do not communicate with each other
  • The order of events in the broadcast state may differ across parallel instances
  • All operator tasks snapshot their broadcast state
  • The RocksDB state backend does not support broadcast state yet

Once created, a broadcast variable can be used by any function running in the cluster without being shipped to the cluster nodes repeatedly.
Also remember that a broadcast variable should not be modified, so that every node sees exactly the same value.

In one sentence: a broadcast variable is a shared, read-only value. You broadcast a DataSet once, every task on every node can then read it, and the data is kept only once per node, which saves memory.

Usage
  • 1. Define a MapStateDescriptor that describes the format of the data to broadcast
  • 2. Provide a stream whose data is broadcast to the downstream operators
  • 3. Add the data source and register it as a broadcast stream
  • 4. Connect the broadcast stream with the stream that processes the data (see the minimal sketch below)
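
A minimal sketch of these four steps (configSource and dataSource are placeholder DataStreams assumed here; the complete, runnable examples follow below):

// 1. A MapStateDescriptor describing the broadcast data (String keys and values here)
MapStateDescriptor<String, String> descriptor = new MapStateDescriptor<>(
        "configFilter", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

// 2 + 3. Register the configuration source as a broadcast stream
BroadcastStream<String> broadcastConfig = configSource.broadcast(descriptor);

// 4. Connect the main stream with the broadcast stream and process both sides
dataSource.connect(broadcastConfig)
        .process(new BroadcastProcessFunction<String, String, String>() {
            @Override
            public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
                // read-only access to the broadcast state via ctx.getBroadcastState(descriptor)
            }

            @Override
            public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {
                // update the broadcast state via ctx.getBroadcastState(descriptor)
            }
        });
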
Examples

1. Flink batch

package flink.batch;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

/**
 * Broadcast variable demo
 *
 * Requirement:
 * Flink reads user names from the data source
 * and has to print each name together with the user's age.
 *
 * The age information is therefore needed inside the intermediate map step,
 * so it is recommended to ship the user data set as a broadcast variable.
 *
 * Note: if several operators need the same data set, the broadcast variable
 * has to be registered separately on each of those operators.
 */
public class BatchDemoBroadcast {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 1. Prepare the data for the broadcast variable
        ArrayList<Tuple2<String, Integer>> broadData = new ArrayList<>();
        broadData.add(new Tuple2<>("python",18));
        broadData.add(new Tuple2<>("scala",20));
        broadData.add(new Tuple2<>("java",17));
        DataSource <Tuple2 <String, Integer>> dataBroad = env.fromCollection(broadData);

        // 2. Pre-process the data to broadcast: convert each Tuple2 into a HashMap
        DataSet<HashMap <String, Integer>> baseData = dataBroad.map(new MapFunction <Tuple2 <String, Integer>, HashMap <String, Integer>>() {
            @Override
            public HashMap <String, Integer> map(Tuple2 <String, Integer> value) throws Exception {
                HashMap <String, Integer> res = new HashMap <>();
                res.put(value.f0, value.f1);
                return res;
            }
        });

        DataSet <String> mainData = env.fromElements("python", "java","java","kafka","scala","redis");

        DataSet <String> result = mainData.map(new RichMapFunction <String, String>() {
            List <HashMap <String, Integer>> broadCastMap = new ArrayList <HashMap <String, Integer>>();
            HashMap <String, Integer> allMap = new HashMap <String, Integer>();

            /**
             * This method is executed only once
             * and can be used for initialization work,
             * so the broadcast variable can be fetched here in open().
             */
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                // 3. Fetch the broadcast data
                this.broadCastMap = getRuntimeContext().getBroadcastVariable("broadCastMapName");
                for (HashMap<String, Integer> map : broadCastMap) {
                    allMap.putAll(map);
                }

            }

            @Override
            public String map(String value) throws Exception {
                Integer age = allMap.get(value);
                return value + "," + age;
            }
        }).withBroadcastSet(baseData,"broadCastMapName");

        result.print();
    }
}
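
Running the batch job prints one line per input element; names missing from the broadcast map resolve to null. The output should look roughly like this (order may vary):

python,18
java,17
java,17
kafka,null
scala,20
redis,null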


2. Flink stream

package flink.stream.addsource;

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.util.Collector;

import java.util.concurrent.TimeUnit;

/**
 * Purpose:
 * use a broadcast stream to dynamically reconfigure a data stream
 *
 * @author unisinsight/tu.tengfei
 * @date 2019/5/13 11:07
 */
public class StreamBroadcastDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // set the parallelism
        env.setParallelism(1);

        // Custom source for the broadcast stream: it produces the keyword used to intercept data
        DataStreamSource<String> filterData = env.addSource(new RichSourceFunction <String>() {

            private boolean isRunning = true;
            // test data
            String[] data = new String[]{"java", "python", "scala"};

            /**
             * Mock source: emits one element every minute to simulate configuration updates
             * @param cxt
             * @throws Exception
             */
            @Override
            public void run(SourceContext <String> cxt) throws Exception {
                int size = data.length;
                while (isRunning) {
                    TimeUnit.MINUTES.sleep(1);
                    int seed = (int) (Math.random() * size);
                    // pick a random element from the data set and emit it
                    cxt.collect(data[seed]);
                    System.out.println("发送的关键字是:" + data[seed]);
                }
            }

            @Override
            public void cancel() {
                isRunning = false;
            }
        });
        // 1. Define the descriptor for the broadcast data
        MapStateDescriptor <String, String> configFilter = new MapStateDescriptor <>("configFilter", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

        // 2. Broadcast filterData
        BroadcastStream <String> broadcastConfig = filterData.setParallelism(1).broadcast(configFilter);

        // Main data stream
        DataStreamSource <String> dataStream = env.addSource(new RichSourceFunction <String>() {
            private boolean isRunning = true;
            // test data
            String[] data = new String[]{
                    "java is quite verbose",
                    "python needs little code and is easy to learn",
                    "php is a web development language",
                    "scala is a stream-processing language, mainly used in big data scenarios",
                    "go is a statically typed, compiled, concurrent language with garbage collection"
            };

            /**
             * Mock source: emits one element every 3 seconds
             * @param ctx
             * @throws Exception
             */
            @Override
            public void run(SourceContext <String> ctx) throws Exception {
                int size = data.length;
                while (isRunning) {
                    TimeUnit.SECONDS.sleep(3);
                    int seed = (int) (Math.random() * size);
                    // pick a random element from the data set and emit it
                    ctx.collect(data[seed]);
                    System.out.println("上游发送的消息:" + data[seed]);
                }
            }

            @Override
            public void cancel() {
                isRunning = false;
            }
        });

        // 3. Connect dataStream with the broadcast stream (via connect)
        DataStream <String> result = dataStream.connect(broadcastConfig).process(new BroadcastProcessFunction <String, String, String>() {

            // keyword to intercept
            private String keyWords = null;

            /**
             * open() is executed only once and can be used for initialization.
             * 4. Give keyWords an initial value, otherwise a java.lang.NullPointerException
             * is thrown before the first broadcast element arrives.
             * @param parameters
             * @throws Exception
             */
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                keyWords="java";
                System.out.println("初始化keyWords:java");
            }

            /**
             * 6. Process the elements of the main stream
             * @param value
             * @param ctx
             * @param out
             * @throws Exception
             */
            @Override
            public void processElement(String value, ReadOnlyContext ctx, Collector <String> out) throws Exception {
                if (value.contains(keyWords)) {
                    out.collect("拦截消息:" + value + ", 原因:包含拦截关键字:" + keyWords);
                }
            }

            /**
             * 5. Receive updates of the broadcast value
             * @param value
             * @param ctx
             * @param out
             * @throws Exception
             */
            @Override
            public void processBroadcastElement(String value, Context ctx, Collector <String> out) throws Exception {
                keyWords = value;
                System.out.println("更新关键字:" + value);
            }
        });

        result.print();

        env.execute("broadcast test");
    }
}
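
Note: for simplicity this example keeps the current keyword in a plain instance field instead of writing it into the broadcast state via ctx.getBroadcastState(...). Such a field is not checkpointed and is not guaranteed to stay consistent across restores, so it is only suitable for a demo. Example 3 below stores the rules in the broadcast state itself, which is the pattern the Flink documentation recommends.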


3. Reading configuration data from a database to process a stream, i.e. joining a stream table with a dimension table

package flink.stream.addsource;

import flink.BasicConf;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.typeutils.MapTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.util.Collector;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * Requirement:
 * Read the configuration data (code and name) from PostgreSQL into streamPgSql,
 * and broadcast streamPgSql to reduce memory consumption.
 *
 * Join the Kafka stream with the PostgreSQL data to clean and enrich it.
 *
 * Broadcast replicates the state to every task.
 * Note that the state is not propagated across tasks:
 * a modification only affects the task it is made in.
 */
public class StreamKafkaJoinPostgres {
    public static void main(String[] args) throws Exception {
        final String bootstrap = BasicConf.KafkaConf.bootstrap;
        final String zookeeper = BasicConf.KafkaConf.zookeeper;
        final String topic = "web";
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
//        env.enableCheckpointing(5000);  // checkpoint every 5000 ms
//        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

//        final StreamTableEnvironment tenv = TableEnvironment.getTableEnvironment(env);
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", bootstrap);//kafka的节点的IP或者hostName,多个使用逗号分隔
        properties.setProperty("zookeeper.connect", zookeeper);//zookeeper的节点的IP或者hostName,多个使用逗号进行分隔
        properties.setProperty("group.id", "flinkStream");//flink consumer flink的消费者的group.id

        // 1. Read the configuration records from PostgreSQL
        DataStream <String> streamPgSql = env.addSource(new PostgresqlSource());

        final DataStream <HashMap <String, String>> conf = streamPgSql.map(new MapFunction <String, HashMap <String, String>>() {
            @Override
            public HashMap <String, String> map(String value) throws Exception {
                String[] tokens = value.split("\\t");
                HashMap <String, String> hashMap = new HashMap <>();
                hashMap.put(tokens[0], tokens[1]);
                System.out.println(tokens[0]+" : "+tokens[1]);
                return hashMap;
            }
        });


        // 2. Create the MapStateDescriptor that describes the type of the broadcast data
        MapStateDescriptor <String,Map<String,String>> ruleStateDescriptor = new MapStateDescriptor <>("RulesBroadcastState"
                ,BasicTypeInfo.STRING_TYPE_INFO
                ,new MapTypeInfo<>(String.class,String.class));
        // 3. Broadcast conf, which returns a BroadcastStream
        final BroadcastStream <HashMap <String, String>> confBroadcast = conf.broadcast(ruleStateDescriptor);

        // Read the stream from Kafka
        FlinkKafkaConsumer011 <String> webStream = new FlinkKafkaConsumer011 <>(topic, new SimpleStringSchema(), properties);
        webStream.setStartFromEarliest();
        DataStream <String> kafkaData = env.addSource(webStream).setParallelism(1);
        //192.168.108.209	2019-05-07 16:11:09	"GET /class/2.html"	503	https://search.yahoo.com/search?p=java核心编程
        DataStream <Tuple5 <String, String, String, String, String>> map = kafkaData.map(new MapFunction <String, Tuple5 <String, String, String, String, String>>() {
            @Override
            public Tuple5 <String, String, String, String, String> map(String value) throws Exception {
                String[] tokens = value.split("\\t");
                return new Tuple5 <>(tokens[0], tokens[1], tokens[2], tokens[3], tokens[4]);
            }
        })
                // connect the stream with the BroadcastStream, then process the resulting BroadcastConnectedStream
                .connect(confBroadcast)
                .process(new BroadcastProcessFunction <Tuple5 <String, String, String, String, String>, HashMap <String, String>, Tuple5 <String, String, String, String, String>>() {
                    private HashMap<String,String> keyWords = new HashMap <>();
                    MapStateDescriptor <String,Map<String,String>> ruleStateDescriptor = new MapStateDescriptor <>("RulesBroadcastState"
                            ,BasicTypeInfo.STRING_TYPE_INFO
                            ,new MapTypeInfo<>(String.class,String.class));

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                    }

                    @Override
                    public void processElement(Tuple5 <String, String, String, String, String> value, ReadOnlyContext ctx, Collector <Tuple5 <String, String, String, String, String>> out) throws Exception {
                        Map <String, String> map = ctx.getBroadcastState(ruleStateDescriptor).get("keyWords");
                        // the broadcast side may not have delivered any data yet; forward the record unchanged in that case
                        if (map == null) {
                            out.collect(value);
                            return;
                        }
                        String result = map.get(value.f3);
                        if (result == null) {
                            out.collect(new Tuple5 <>(value.f0, value.f1, value.f2, value.f3, value.f4));
                        } else {
                            out.collect(new Tuple5 <>(value.f0, value.f1, value.f2, result, value.f4));
                        }
                    }

                    /**
                     * Receive the data from the broadcast side
                     * @param value
                     * @param ctx
                     * @param out
                     * @throws Exception
                     */
                    @Override
                    public void processBroadcastElement(HashMap <String, String> value, Context ctx, Collector <Tuple5 <String, String, String, String, String>> out) throws Exception {
//                        System.out.println("Received broadcast data: " + value.values());
                        BroadcastState <String, Map <String, String>> broadcastState = ctx.getBroadcastState(ruleStateDescriptor);
                        keyWords.putAll(value);
                        broadcastState.put("keyWords", keyWords);
                    }
                });

        map.print();
        env.execute("Broadcast test kafka");
    }
}
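
Two details worth noting: the MapStateDescriptor used inside the BroadcastProcessFunction must match the one passed to broadcast(), which is why the descriptor is re-declared with the same name and types inside the function; and all configuration entries are stored under the single broadcast-state key "keyWords", whose value is the complete code-to-name map.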

PostgresqlSource

package flink.stream.addsource;

/**
 * Reads the rows from the database and emits them into Flink.
 */

import flink.BasicConf;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class PostgresqlSource extends RichSourceFunction<String> {

    private static final long serialVersionUID = 1L;

    private Connection connection;

    private boolean isRunning = true;

    private PreparedStatement preparedStatement;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        String driver = BasicConf.PostgresConf.DRIVERNAME;
        String url = BasicConf.PostgresConf.URL;
        String user = BasicConf.PostgresConf.USERNAME;
        String password = BasicConf.PostgresConf.PASSWORD;

        Class.forName(driver);
        connection = DriverManager.getConnection(url,user, password);
        String sql = " SELECT code,name FROM  public.config ";
        preparedStatement = connection.prepareStatement(sql);
    }


    @Override
    public void run(SourceContext <String> sourceContext) throws Exception {
        while (isRunning) {
            try {
                ResultSet resultSet = preparedStatement.executeQuery();
                while (resultSet.next()) {
                    Word word = new Word();
                    word.setCode(resultSet.getString("code"));
                    word.setName(resultSet.getString("name"));
                    sourceContext.collect(String.valueOf(word));
                }
            } catch (Exception e) {
                e.printStackTrace();
            }

            Thread.sleep(3000);
        }
    }



    @Override
    public void cancel() {
        isRunning=false;
    }

    @Override
    public void close() throws Exception {
        super.close();
        // close the statement before the connection
        if (preparedStatement != null) {
            preparedStatement.close();
        }
        if (connection != null) {
            connection.close();
        }
    }

    private class Word {
        private String code;
        private String name;

        public String getCode() {
            return code;
        }

        public void setCode(String code) {
            this.code = code;
        }

        public void setName(String name) {
            this.name = name;
        }

        public String getName() {
            return name;
        }

        public Word() {
            // empty no-arg constructor; code and name are set via the setters
        }

        @Override
        public String toString() {
            return code+"\t"+name;
        }
    }
}
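
The examples above reference a project-specific flink.BasicConf configuration holder that the original post does not show. A minimal sketch of what it presumably looks like, with placeholder values (all addresses and credentials here are assumptions):

package flink;

// Hypothetical configuration holder matching the usages above; values are placeholders.
public class BasicConf {
    public static class KafkaConf {
        public static final String bootstrap = "localhost:9092"; // placeholder broker list
        public static final String zookeeper = "localhost:2181"; // placeholder ZooKeeper address
    }

    public static class PostgresConf {
        public static final String DRIVERNAME = "org.postgresql.Driver";
        public static final String URL = "jdbc:postgresql://localhost:5432/flinkdb"; // placeholder
        public static final String USERNAME = "postgres"; // placeholder
        public static final String PASSWORD = "postgres"; // placeholder
    }
}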