
Flink (51): Flink Advanced Features - Broadcast State (BroadcastState)

Table of Contents

0. Related Articles

1. Introduction to BroadcastState

2. Requirement: Dynamic Configuration Update

3. Coding Steps

4. Implementation


0. Related Articles

Flink Article Index

1. Introduction to BroadcastState

        Broadcast State, introduced in Flink 1.5, is useful whenever a low-throughput stream of configuration, rules, or similar data needs to be sent (broadcast) to every downstream task.

        Downstream tasks receive this configuration or rule data, store it as BroadcastState, and then apply it while processing another data stream.

Example scenarios:

  1. Dynamically updating computation rules: when an event stream must always be evaluated against the latest rules, broadcast the rules as broadcast state to the downstream tasks.
  2. Enriching events with extra fields in real time: when an event stream must be joined with basic user information, broadcast that user information as broadcast state to the downstream tasks.
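Before looking at the Flink API, the core idea of both scenarios can be shown in a few lines of plain Java (no Flink involved): a shared rule map that can be swapped out at runtime while an event stream is filtered against it. The class and method names here are purely illustrative, not part of any Flink API.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

// Minimal plain-Java sketch of the "dynamic rule update" idea:
// a rule set that can be replaced at runtime, applied to incoming events.
class RuleBroadcastSketch {
    // the "broadcast state": eventType -> whether events of that type are kept
    private static final Map<String, Boolean> rules = new ConcurrentHashMap<>();

    // analogous to receiving a new broadcast element: replace the stale rule set
    static void updateRules(Map<String, Boolean> latest) {
        rules.clear();
        rules.putAll(latest);
    }

    // analogous to processing the event stream: apply the current rules
    static List<String> filter(List<String> eventTypes) {
        return eventTypes.stream()
                .filter(t -> rules.getOrDefault(t, false))
                .collect(Collectors.toList());
    }
}
```

In Flink, the same split exists, but the rule map lives in managed broadcast state and the "update" and "apply" steps become the two methods of a broadcast process function, described next.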

API overview:

        First create a keyed or non-keyed DataStream, then create a BroadcastStream, and finally connect the DataStream to the BroadcastStream (by calling connect). This makes the BroadcastState available in every downstream task of the DataStream.

Case 1:

        If the DataStream is a keyed stream, then after connecting it to the BroadcastStream, the process function must be a KeyedBroadcastProcessFunction. Its API looks like this:

    public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> extends BaseBroadcastProcessFunction {
        public abstract void processElement(final IN1 value, final ReadOnlyContext ctx, final Collector<OUT> out) throws Exception;
        public abstract void processBroadcastElement(final IN2 value, final Context ctx, final Collector<OUT> out) throws Exception;
    }

The type parameters above mean:

  • KS: the type of the key that keyBy was called with on the stream built from the source operator;
  • IN1: the record type of the non-broadcast data stream;
  • IN2: the record type of the broadcast stream;
  • OUT: the record type of the results emitted by processElement() and processBroadcastElement().
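To make the role of each type parameter concrete without pulling in Flink, here is a plain-Java mock of the same shape (these are not the real Flink classes; the simplified signatures and names are invented for illustration):

```java
import java.util.List;

// Mock of the KeyedBroadcastProcessFunction shape: KS = key type,
// IN1 = event record, IN2 = broadcast record, OUT = output record.
abstract class MockKeyedBroadcastFn<KS, IN1, IN2, OUT> {
    abstract void processElement(IN1 value, List<OUT> out);
    abstract void processBroadcastElement(IN2 value, List<OUT> out);
}

// Here KS=String (e.g. a userID key), IN1=Integer (an event),
// IN2=String (a broadcast rule), OUT=String (the enriched result).
class Demo extends MockKeyedBroadcastFn<String, Integer, String, String> {
    String rule = ""; // stands in for the broadcast state

    @Override
    void processElement(Integer value, List<String> out) {
        // event side: read the current rule and emit an output record
        out.add(rule + ":" + value);
    }

    @Override
    void processBroadcastElement(String value, List<String> out) {
        // broadcast side: update the "state"; usually emits nothing
        rule = value;
    }
}
```

Note that KS only appears as a type parameter: the key influences which state a keyed function sees, but does not change the method signatures.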

Case 2:

        If the DataStream is a non-keyed stream, then after connecting it to the BroadcastStream, the process function must be a BroadcastProcessFunction. Its API looks like this:

    public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction {
        public abstract void processElement(final IN1 value, final ReadOnlyContext ctx, final Collector<OUT> out) throws Exception;
        public abstract void processBroadcastElement(final IN2 value, final Context ctx, final Collector<OUT> out) throws Exception;
    }

        The type parameters here have the same meaning as the last three of KeyedBroadcastProcessFunction; since no keyBy is applied to partition the original stream, the KS parameter is not needed.

        The worked example below shows how to use these functions in practice, taking BroadcastProcessFunction (the non-keyed variant) as the example.

Notes:

  • Broadcast State is a Map, i.e. a key-value type.
  • Broadcast State can only be modified on the broadcast side, i.e. in the processBroadcastElement method of BroadcastProcessFunction or KeyedBroadcastProcessFunction. On the non-broadcast side (the processElement method) it is read-only.
  • The order of elements in Broadcast State may differ between tasks; be careful with any order-dependent processing.
  • On checkpoints, every task checkpoints its own copy of the broadcast state.
  • Broadcast State is kept in memory at runtime; it currently cannot be stored in the RocksDB state backend.
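The read-only versus writable split in the second note is the heart of the pattern. Stripped of the Flink state API, the two-method contract reduces to the following plain-Java sketch (class and method names are invented for illustration; the "state" is just a local map here, not managed Flink state):

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch of the two-method contract: the broadcast side
// replaces the state, the event side only reads it.
class EnrichSketch {
    // stands in for the broadcast state: userID -> "name,age"
    private final Map<String, String> broadcastState = new HashMap<>();

    // mirrors processBroadcastElement: clear the old config, install the latest
    void onConfig(Map<String, String> latest) {
        broadcastState.clear();
        broadcastState.putAll(latest);
    }

    // mirrors processElement: read-only lookup; returns null when the user
    // is not (yet) present in the config, which callers must handle
    String enrich(String userId, String event) {
        String info = broadcastState.get(userId);
        return info == null ? null : event + "," + info;
    }
}
```

The null return for an unknown user matters in practice: events can arrive before the first configuration broadcast, so the full implementation below guards its lookups the same way.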

2. Requirement: Dynamic Configuration Update

In real time, filter the event stream down to the users present in the configuration, and enrich those users' events with their basic information.

1. Event stream: each record says that a user browsed or clicked a product at some moment, formatted as:

    {"userID": "user_3", "eventTime": "2019-08-17 12:19:47", "eventType": "browse", "productID": 1}
    {"userID": "user_2", "eventTime": "2019-08-17 12:19:48", "eventType": "click", "productID": 1}

2. Configuration data: the users' details, stored in MySQL:

    DROP TABLE IF EXISTS `user_info`;
    CREATE TABLE `user_info` (
      `userID` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
      `userName` varchar(10) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
      `userAge` int(11) NULL DEFAULT NULL,
      PRIMARY KEY (`userID`) USING BTREE
    ) ENGINE = MyISAM CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

    -- ----------------------------
    -- Records of user_info
    -- ----------------------------
    INSERT INTO `user_info` VALUES ('user_1', '张三', 10);
    INSERT INTO `user_info` VALUES ('user_2', '李四', 20);
    INSERT INTO `user_info` VALUES ('user_3', '王五', 30);
    INSERT INTO `user_info` VALUES ('user_4', '赵六', 40);
    SET FOREIGN_KEY_CHECKS = 1;

3. Output:

    (user_3,2019-08-17 12:19:47,browse,1,王五,30)
    (user_2,2019-08-17 12:19:48,click,1,李四,20)

3. Coding Steps

    1. env
    2. source
       2.1 Build the real-time event stream from a custom random source
           <userID, eventTime, eventType, productID>
       2.2 Build the configuration stream from MySQL
           <userID, <name, age>>
    3. transformation
       3.1 Define the state descriptor
           MapStateDescriptor<Void, Map<String, Tuple2<String, Integer>>> descriptor =
               new MapStateDescriptor<>("config", Types.VOID, Types.MAP(Types.STRING, Types.TUPLE(Types.STRING, Types.INT)));
       3.2 Broadcast the configuration stream
           BroadcastStream<Map<String, Tuple2<String, Integer>>> broadcastDS = configDS.broadcast(descriptor);
       3.3 Connect the event stream with the broadcast stream
           BroadcastConnectedStream<Tuple4<String, String, String, Integer>, Map<String, Tuple2<String, Integer>>> connectDS = eventDS.connect(broadcastDS);
       3.4 Process the connected stream: enrich the events with user info from the configuration stream
    4. sink
    5. execute

4. Implementation

    import org.apache.flink.api.common.state.BroadcastState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple4;
    import org.apache.flink.api.java.tuple.Tuple6;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
    import org.apache.flink.streaming.api.datastream.BroadcastStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
    import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.util.Collector;

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Random;

    /**
     * Use Flink's BroadcastState to join an event stream with a configuration
     * stream (broadcast as state), supporting dynamic configuration updates.
     */
    public class BroadcastStateConfigUpdate {
        public static void main(String[] args) throws Exception {
            // 1. env
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // 2. source
            // 2.1 Real-time event stream from a custom random source: unbounded, high volume
            //     <userID, eventTime, eventType, productID>
            DataStreamSource<Tuple4<String, String, String, Integer>> eventDS = env.addSource(new MySource());
            // 2.2 Configuration stream: periodically query the latest data from MySQL, low volume
            //     <userID, <name, age>>
            DataStreamSource<Map<String, Tuple2<String, Integer>>> configDS = env.addSource(new MySQLSource());

            // 3. transformation
            // 3.1 Define a state descriptor so the configuration stream can be broadcast as state
            MapStateDescriptor<Void, Map<String, Tuple2<String, Integer>>> descriptor =
                    new MapStateDescriptor<>("config", Types.VOID, Types.MAP(Types.STRING, Types.TUPLE(Types.STRING, Types.INT)));
            // 3.2 Broadcast the configuration stream using the descriptor
            BroadcastStream<Map<String, Tuple2<String, Integer>>> broadcastDS = configDS.broadcast(descriptor);
            // 3.3 Connect the event stream with the broadcast stream
            BroadcastConnectedStream<Tuple4<String, String, String, Integer>, Map<String, Tuple2<String, Integer>>> connectDS =
                    eventDS.connect(broadcastDS);
            // 3.4 Process the connected stream: enrich events with user info from the broadcast state
            SingleOutputStreamOperator<Tuple6<String, String, String, Integer, String, Integer>> result = connectDS
                    // BroadcastProcessFunction<IN1, IN2, OUT>
                    .process(new BroadcastProcessFunction<
                            // <userID, eventTime, eventType, productID> -- event stream
                            Tuple4<String, String, String, Integer>,
                            // <userID, <name, age>> -- broadcast stream
                            Map<String, Tuple2<String, Integer>>,
                            // <userID, eventTime, eventType, productID, name, age> -- output
                            Tuple6<String, String, String, Integer, String, Integer>>() {

                        // Process elements of the event stream
                        @Override
                        public void processElement(Tuple4<String, String, String, Integer> value, ReadOnlyContext ctx,
                                                   Collector<Tuple6<String, String, String, Integer, String, Integer>> out) throws Exception {
                            // Extract the userID from the event
                            String userId = value.f0;
                            // Fetch the (read-only) broadcast state via the descriptor
                            ReadOnlyBroadcastState<Void, Map<String, Tuple2<String, Integer>>> broadcastState = ctx.getBroadcastState(descriptor);
                            if (broadcastState != null) {
                                // Get the Map<userID, <name, age>> out of the broadcast state
                                Map<String, Tuple2<String, Integer>> map = broadcastState.get(null);
                                if (map != null) {
                                    // Look up <name, age> by userID; may be absent if the
                                    // config has not yet been broadcast for this user
                                    Tuple2<String, Integer> tuple2 = map.get(userId);
                                    if (tuple2 != null) {
                                        out.collect(Tuple6.of(userId, value.f1, value.f2, value.f3, tuple2.f0, tuple2.f1));
                                    }
                                }
                            }
                        }

                        // Process elements of the broadcast stream
                        @Override
                        public void processBroadcastElement(Map<String, Tuple2<String, Integer>> value, Context ctx,
                                                            Collector<Tuple6<String, String, String, Integer, String, Integer>> out) throws Exception {
                            // value is the latest map fetched periodically by MySQLSource
                            // Fetch the (writable) broadcast state via the descriptor
                            BroadcastState<Void, Map<String, Tuple2<String, Integer>>> broadcastState = ctx.getBroadcastState(descriptor);
                            // Clear the old state, then store the latest configuration map
                            broadcastState.clear();
                            broadcastState.put(null, value);
                        }
                    });

            // 4. sink
            result.print();

            // 5. execute
            env.execute();
        }

        /**
         * <userID, eventTime, eventType, productID>
         */
        public static class MySource implements SourceFunction<Tuple4<String, String, String, Integer>> {
            private boolean isRunning = true;

            @Override
            public void run(SourceContext<Tuple4<String, String, String, Integer>> ctx) throws Exception {
                Random random = new Random();
                SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                String[] eventTypes = {"browse", "click"}; // matches the event format above
                while (isRunning) {
                    int id = random.nextInt(4) + 1;
                    String userId = "user_" + id;
                    String eventTime = df.format(new Date());
                    String eventType = eventTypes[random.nextInt(eventTypes.length)];
                    int productId = random.nextInt(4);
                    ctx.collect(Tuple4.of(userId, eventTime, eventType, productId));
                    Thread.sleep(500);
                }
            }

            @Override
            public void cancel() {
                isRunning = false;
            }
        }

        /**
         * <userID, <name, age>>
         */
        public static class MySQLSource extends RichSourceFunction<Map<String, Tuple2<String, Integer>>> {
            private boolean flag = true;
            private Connection conn = null;
            private PreparedStatement ps = null;
            private ResultSet rs = null;

            @Override
            public void open(Configuration parameters) throws Exception {
                conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata", "root", "root");
                String sql = "select `userID`, `userName`, `userAge` from `user_info`";
                ps = conn.prepareStatement(sql);
            }

            @Override
            public void run(SourceContext<Map<String, Tuple2<String, Integer>>> ctx) throws Exception {
                while (flag) {
                    Map<String, Tuple2<String, Integer>> map = new HashMap<>();
                    rs = ps.executeQuery(); // assign to the field so close() can release it
                    while (rs.next()) {
                        String userID = rs.getString("userID");
                        String userName = rs.getString("userName");
                        int userAge = rs.getInt("userAge");
                        map.put(userID, Tuple2.of(userName, userAge));
                    }
                    ctx.collect(map);
                    Thread.sleep(5000); // refresh the user configuration every 5 seconds
                }
            }

            @Override
            public void cancel() {
                flag = false;
            }

            @Override
            public void close() throws Exception {
                // release resources in reverse order of acquisition
                if (rs != null) rs.close();
                if (ps != null) ps.close();
                if (conn != null) conn.close();
            }
        }
    }

Note: this post is adapted from a 2020 training video -> Bilibili link

Note: for other related articles, see -> Flink Article Index

