
Flink Broadcast Streams: BroadcastStream


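In day-to-day Flink development, you sometimes need to change the parameters of a running Flink job, for example which fields or field values to filter. These values are often stored in MySQL, but querying the configuration from inside a function that performs high-throughput computation can block the task or even cause the job to fail.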

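In that situation, consider a broadcast stream: a source reads the configuration and broadcasts it to every parallel instance of an operator, where it is connected with the other data stream for processing.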
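To make the delivery model concrete, here is a minimal plain-Scala sketch that does not use Flink at all. The names `BroadcastSketch`, `Subtask`, `broadcast`, and `route` are hypothetical and exist only for illustration: a configuration element is handed to every simulated parallel instance, while a data element goes to exactly one instance and is matched against that instance's local copy of the configuration.

```scala
import scala.collection.mutable

object BroadcastSketch {
  // Each simulated parallel instance keeps its own copy of the configuration.
  final class Subtask {
    val config = mutable.Map.empty[String, String]
    def onConfig(key: String, value: String): Unit = config.put(key, value)
    // A data record is emitted only when a matching config entry is present.
    def onRecord(key: String): Option[(String, String)] =
      config.get(key).map(v => (key, v))
  }

  val subtasks: Vector[Subtask] = Vector.fill(3)(new Subtask)

  // A broadcast element is delivered to every subtask.
  def broadcast(key: String, value: String): Unit =
    subtasks.foreach(_.onConfig(key, value))

  // A data element is delivered to exactly one subtask, chosen by key hash.
  def route(key: String): Option[(String, String)] =
    subtasks(Math.floorMod(key.hashCode, subtasks.size)).onRecord(key)
}
```

Because every instance holds the full configuration, the lookup succeeds no matter which instance a data record is routed to.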

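Implementation steps:

1. First define an entity class for the MySQL configuration row. The property names in the example below can be adapted to match your own MySQL table.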

class Flow {
  var flowId: Int = 0
  var mode: Int = 0
  // Database name
  var databaseName: String = ""
  // MySQL table name
  var tableName: String = ""
  // HBase table name
  var hbaseTable: String = ""
  // Column family name
  var family: String = ""
  // Whether to convert field names to upper case; defaults to true
  var uppercaseQualifier: Boolean = true
  // Batch size for commits, used in ETL
  var commitBatch: Int = 0
  // Field names that make up the rowkey; must be comma-separated
  var rowKey: String = ""
  // Status
  var status: Int = 0
  var kuduTable: String = ""
  var tidbTable: String = ""
  var mask_fields: String = ""
}

2. Define a MapStateDescriptor that describes the format of the broadcast data

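import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeHint, TypeInformation}

private val flowStateDescriptor: MapStateDescriptor[String, Flow] = new MapStateDescriptor[String, Flow](
  "flowBroadCastState",
  BasicTypeInfo.STRING_TYPE_INFO,
  TypeInformation.of(new TypeHint[Flow] {}))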

3. Create a source stream whose records are broadcast to the downstream operator

import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.slf4j.{Logger, LoggerFactory}

@SerialVersionUID(3519222623348229907L)
class FlowSource extends RichSourceFunction[Flow] {
  // Transient and lazy so the logger is not serialized along with the function
  @transient private lazy val log: Logger = LoggerFactory.getLogger(classOf[FlowSource])
  // Running flag; @volatile because cancel() is called from a different thread
  @volatile private var isRunning: Boolean = true

  override def run(ctx: SourceFunction.SourceContext[Flow]): Unit = {
    // Periodically read the flow table from the database and emit Flow records
    while (isRunning) {
      val conn = MysqlJdbcUtils.getConnection(
        "jdbc:mysql://10.101.40.197:3306/hdb_data_warehouse?useUnicode=true&characterEncoding=utf8",
        "canal",
        "canal"
      )
      try {
        val statement = conn.createStatement()
        val rs = statement.executeQuery("select * from data_warehouse_cfg")
        while (rs.next()) {
          // Create a fresh Flow per row instead of mutating one shared instance
          val flow = new Flow
          flow.flowId = rs.getInt("flow_id")
          flow.databaseName = rs.getString("mysql_db")
          flow.tableName = rs.getString("mysql_table")
          flow.hbaseTable = rs.getString("hbase_table")
          flow.family = rs.getString("hbase_col_family")
          // Reading "status" here as well looks like a copy-paste slip;
          // point this at the actual batch-size column of your table.
          flow.commitBatch = rs.getInt("status")
          flow.status = rs.getInt("status")
          flow.rowKey = rs.getString("hbase_rowkey")
          flow.kuduTable = rs.getString("kudu_table")
          flow.tidbTable = rs.getString("tidb_table")
          flow.mask_fields = rs.getString("mask_fields")
          log.debug("load flow: " + flow.toString)
          ctx.collect(flow)
        }
      } finally {
        // Close the connection so each polling round does not leak one
        conn.close()
      }
      // Re-read after an interval so that updated configuration takes effect
      Thread.sleep(60 * 1000L)
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }
}
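A source that emits mutable objects is safest when it creates a new instance per row. Whether Flink copies a record before the source mutates it again depends on operator chaining and the object-reuse setting, so the following plain-Scala sketch only illustrates the underlying aliasing problem. `Cfg`, `ReuseDemo`, and the `ArrayBuffer` standing in for a downstream consumer are hypothetical names, not part of the original job.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for the Flow config class; one field is enough here.
class Cfg { var table: String = "" }

object ReuseDemo {
  val rows = Seq("orders", "users", "payments")

  // Reuses one mutable instance: every buffered reference points at the same object.
  def emitShared(): List[String] = {
    val sink = ArrayBuffer.empty[Cfg]
    val shared = new Cfg
    rows.foreach { r => shared.table = r; sink += shared }
    sink.map(_.table).toList
  }

  // Creates a new instance per row: each buffered record keeps its own values.
  def emitFresh(): List[String] = {
    val sink = ArrayBuffer.empty[Cfg]
    rows.foreach { r => val c = new Cfg; c.table = r; sink += c }
    sink.map(_.table).toList
  }
}
```

With the shared instance, a consumer that holds on to references ends up seeing only the values of the last row; with one instance per row, each record is preserved.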

4. Add the source and register it as a broadcast stream

val broadcast: BroadcastStream[Flow] = env.addSource(new FlowSource).broadcast(flowStateDescriptor)

5. Connect the broadcast stream with the stream that carries the data to process

val connectedStream: DataStream[(FlatMessage, Flow)] = keyedMessage.connect(broadcast).process(new KeyedBroadcastProcessFunction[String, FlatMessage, Flow, (FlatMessage, Flow)] {
  override def processElement(
      message: FlatMessage,
      ctx: KeyedBroadcastProcessFunction[String, FlatMessage, Flow, (FlatMessage, Flow)]#ReadOnlyContext,
      out: Collector[(FlatMessage, Flow)]): Unit = {
    // Look up the configuration for this message in the (read-only) broadcast state
    val flow = ctx.getBroadcastState(flowStateDescriptor).get(message.getDatabase + message.getTable)
    // Messages without a matching configuration entry are dropped
    if (flow != null) {
      out.collect((message, flow))
    }
  }

  override def processBroadcastElement(
      flow: Flow,
      ctx: KeyedBroadcastProcessFunction[String, FlatMessage, Flow, (FlatMessage, Flow)]#Context,
      out: Collector[(FlatMessage, Flow)]): Unit = {
    // Writable broadcast state; every parallel instance receives and stores each Flow
    val broadcast: BroadcastState[String, Flow] = ctx.getBroadcastState(flowStateDescriptor)
    ...
    broadcast.put(key, flow)
  }
})
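The key construction is elided above, but the lookup in `processElement` concatenates the database and table names, so the key written in `processBroadcastElement` has to be built in the same way. One option is to route both sides through a single helper. The sketch below is plain Scala with a `mutable.Map` standing in for `BroadcastState`; `KeySketch`, `stateKey`, and the `.` separator are assumptions made for illustration, not the original author's code.

```scala
import scala.collection.mutable

object KeySketch {
  // Hypothetical helper: one function builds the key for both writes and lookups.
  // The "." separator is an assumption added here, not part of the original code.
  def stateKey(database: String, table: String): String = s"$database.$table"

  // Stand-in for BroadcastState[String, Flow]; the value is just an HBase table name.
  val state = mutable.Map.empty[String, String]

  // Mirrors processBroadcastElement: store the config under its key.
  def onConfig(database: String, table: String, hbaseTable: String): Unit =
    state.put(stateKey(database, table), hbaseTable)

  // Mirrors processElement: look the config up with the same key function.
  def lookup(database: String, table: String): Option[String] =
    state.get(stateKey(database, table))
}
```

A separator also avoids ambiguous keys: with bare concatenation, database `ab` with table `c` and database `a` with table `bc` would both map to `abc`.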

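Points to note:

  1. If the data emitted by the source is a collection, it must be a thread-safe collection class
  2. The BroadcastState you obtain is a map, so a put with an existing key overwrites the previous value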
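The overwrite behaviour in the second point can be sketched in plain Scala, again with a `mutable.Map` as a stand-in for `BroadcastState`. `OverwriteSketch` and `applyRound` are hypothetical names; each call models one polling round of the periodic source.

```scala
import scala.collection.mutable

object OverwriteSketch {
  // Stand-in for BroadcastState[String, Flow]; the value is just a status code.
  val state = mutable.Map.empty[String, Int]

  // Applies one polling round of (key, status) rows, as the periodic source would emit them.
  def applyRound(rows: Seq[(String, Int)]): Unit =
    rows.foreach { case (key, status) => state.put(key, status) }
}
```

A later round replaces the value of any key it contains, but keys that no longer appear in a round are left untouched. If rows can be deleted from the configuration table, the job would therefore need something extra, such as checking a status flag or calling `remove` on the broadcast state; that is a design consideration beyond what the code above covers.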

 
