赞
踩
在日常Flink开发中,有时需要给正在运行的Flink程序修改参数,比如需要过滤的字段、字段值等,而这些参数往往配置在MySQL中。如果在进行高吞吐计算的Function中直接动态查询这些配置,有可能使任务阻塞,甚至导致任务失败。
遇到上述场景时,可以考虑用一个单独的数据源定时读取配置,并通过广播流把配置广播到某个operator的所有并发实例中,然后与另一条数据流连接进行计算。
实现步骤:
1、首先定义一个与MySQL配置表对应的实体类,下面例子中的属性名可以根据实际MySQL表中的字段自行调整
/**
 * Mutable configuration record describing one data flow.
 *
 * Instances are created with default values and populated field by field
 * (for example from a row of a MySQL configuration table). The class is
 * `Serializable` so that it can safely be held by, or shipped between,
 * serialized Flink functions, and it overrides `toString` so that log lines
 * such as `"load flow: " + flow` show the actual configuration instead of an
 * object hash.
 */
class Flow extends Serializable {
  // Identifier of the flow. NOTE(review): presumably the primary key of the config row; confirm.
  var flowId: Int = 0
  // Mode of the flow. NOTE(review): the original left this undocumented; meaning of the values is unknown.
  var mode: Int = 0
  // Database name.
  var databaseName: String = ""
  // MySQL table name.
  var tableName: String = ""
  // HBase table name.
  var hbaseTable: String = ""
  // HBase column family name.
  var family: String = ""
  // Whether qualifier (field) names are converted to upper case; defaults to true.
  var uppercaseQualifier: Boolean = true
  // Batch size for commits, used by the ETL.
  var commitBatch: Int = 0
  // Names of the fields that make up the row key; must be comma-separated.
  var rowKey: String = ""
  // Status of the flow.
  var status: Int = 0
  // NOTE(review): presumably the target Kudu table name; confirm.
  var kuduTable: String = ""
  // NOTE(review): presumably the target TiDB table name; confirm.
  var tidbTable: String = ""
  // NOTE(review): presumably the fields to be masked; format is not visible here.
  var mask_fields: String = ""

  /** Renders every field as `name=value` so that log output is meaningful. */
  override def toString: String =
    s"Flow(flowId=${flowId}, mode=${mode}, databaseName=${databaseName}, " +
      s"tableName=${tableName}, hbaseTable=${hbaseTable}, family=${family}, " +
      s"uppercaseQualifier=${uppercaseQualifier}, commitBatch=${commitBatch}, " +
      s"rowKey=${rowKey}, status=${status}, kuduTable=${kuduTable}, " +
      s"tidbTable=${tidbTable}, mask_fields=${mask_fields})"
}
2、定义一个MapStateDescriptor来描述广播的数据格式
// Descriptor of the broadcast state: a map named "flowBroadCastState" from a
// String key to the Flow configuration stored under that key.
private val flowStateDescriptor: MapStateDescriptor[String, Flow] = {
  val keyTypeInfo = BasicTypeInfo.STRING_TYPE_INFO
  val valueTypeInfo = TypeInformation.of(new TypeHint[Flow] {})
  new MapStateDescriptor[String, Flow]("flowBroadCastState", keyTypeInfo, valueTypeInfo)
}
3、创建一个源Stream,用于把配置数据广播给下游的operator
/**
 * Flink source that periodically reloads the flow configuration from MySQL.
 *
 * While running, it queries the `data_warehouse_cfg` table, emits one new
 * [[Flow]] per row, then sleeps for 60 seconds before polling again, so that
 * configuration changes take effect without restarting the job.
 *
 * All JDBC resources opened for a poll are closed before sleeping, and a fresh
 * `Flow` is created for every row so that already-emitted records are never
 * mutated afterwards.
 */
@SerialVersionUID(3519222623348229907L)
class FlowSource extends RichSourceFunction[Flow] {

  // Not shipped with the serialized function; re-created lazily where the source runs.
  @transient private lazy val log: Logger = LoggerFactory.getLogger(classOf[FlowSource])

  // Running flag. Set to false by cancel(), which is invoked from a different
  // thread than run(), hence @volatile.
  @volatile var isRunning: Boolean = true

  /**
   * Polls the configuration table until the source is cancelled.
   *
   * @param ctx source context used to emit the loaded [[Flow]] records
   */
  override def run(ctx: SourceFunction.SourceContext[Flow]): Unit = {
    while (isRunning) {
      loadFlows(ctx)
      // Wait before the next poll so that updated configuration is picked up periodically.
      Thread.sleep(60 * 1000L)
    }
  }

  /** Stops the polling loop after the current iteration. */
  override def cancel(): Unit = {
    isRunning = false
  }

  /** Runs one query against the configuration table and emits every row as a [[Flow]]. */
  private def loadFlows(ctx: SourceFunction.SourceContext[Flow]): Unit = {
    // NOTE(review): URL and credentials are hard-coded; consider moving them to job configuration.
    val conn = MysqlJdbcUtils.getConnection(
      "jdbc:mysql://10.101.40.197:3306/hdb_data_warehouse?useUnicode=true&characterEncoding=utf8",
      "canal",
      "canal"
    )
    try {
      val statement = conn.createStatement()
      try {
        val rs = statement.executeQuery("select * from data_warehouse_cfg")
        try {
          while (rs.next()) {
            val flow = readFlow(rs)
            log.debug("load flow: " + flow.toString)
            ctx.collect(flow)
          }
        } finally {
          rs.close()
        }
      } finally {
        statement.close()
      }
    } finally {
      // Without this, every poll would leak one connection.
      conn.close()
    }
  }

  /** Builds a new [[Flow]] from the row the result set is currently positioned on. */
  private def readFlow(rs: java.sql.ResultSet): Flow = {
    val flow = new Flow
    flow.flowId = rs.getInt("flow_id")
    flow.databaseName = rs.getString("mysql_db")
    flow.tableName = rs.getString("mysql_table")
    flow.hbaseTable = rs.getString("hbase_table")
    flow.family = rs.getString("hbase_col_family")
    // NOTE(review): commitBatch is read from the "status" column, same as `status` below.
    // This looks like a copy-paste slip; confirm the intended column name before changing it.
    flow.commitBatch = rs.getInt("status")
    flow.status = rs.getInt("status")
    flow.rowKey = rs.getString("hbase_rowkey")
    flow.kuduTable = rs.getString("kudu_table")
    flow.tidbTable = rs.getString("tidb_table")
    flow.mask_fields = rs.getString("mask_fields")
    flow
  }
}
4、添加数据源并把数据源注册成广播流
// Attach the configuration source to the job and turn its output into a
// broadcast stream backed by the flow state descriptor.
val broadcast: BroadcastStream[Flow] = {
  val flowConfigStream = env.addSource(new FlowSource)
  flowConfigStream.broadcast(flowStateDescriptor)
}
5、连接广播流和处理数据的流
// Connects the keyed message stream with the broadcast configuration stream.
// Each message is paired with the Flow stored in broadcast state under
// "<database><table>"; messages with no matching Flow are dropped.
val connectedStream: DataStream[(FlatMessage, Flow)] = keyedMessage
  .connect(broadcast)
  .process(new KeyedBroadcastProcessFunction[String, FlatMessage, Flow, (FlatMessage, Flow)] {

    /**
     * Looks up the configuration for the message's database and table and,
     * if one is present, emits the (message, flow) pair.
     */
    override def processElement(
        message: FlatMessage,
        ctx: KeyedBroadcastProcessFunction[String, FlatMessage, Flow, (FlatMessage, Flow)]#ReadOnlyContext,
        out: Collector[(FlatMessage, Flow)]): Unit = {
      val lookupKey = message.getDatabase + message.getTable
      // The state returns null when no configuration is stored for the key.
      Option(ctx.getBroadcastState(flowStateDescriptor).get(lookupKey))
        .foreach(flow => out.collect((message, flow)))
    }

    /** Stores (or overwrites) the broadcast configuration element in broadcast state. */
    override def processBroadcastElement(
        flow: Flow,
        ctx: KeyedBroadcastProcessFunction[String, FlatMessage, Flow, (FlatMessage, Flow)]#Context,
        out: Collector[(FlatMessage, Flow)]): Unit = {
      val flowState: BroadcastState[String, Flow] = ctx.getBroadcastState(flowStateDescriptor)
      // NOTE(review): the original article elided how `key` is derived. It is built here
      // to mirror the lookup key used in processElement (database name + table name);
      // confirm against the real job, including any extra logic that was elided.
      val key = flow.databaseName + flow.tableName
      flowState.put(key, flow)
    }
  })
需要注意到的问题:
赞
踩
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。