赞
踩
正片文章分两个阶段来分析。
第一阶段在 AbstractUdfStreamOperator 中将 userFunction 指向 UserDefineSource, env.addSource(new UserDefineSource) 为入口。
第二阶段在 StreamSource 中 通过 userFunction.run()
调用了 UserDefineSource 中的 run 方法。
def main(args: Array[String]): Unit = { val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment // 注册自定义 SourceFunction 类到 StreamExecutionEnvironment val uds: DataStream[SensorReading] = env.addSource(new UserDefineSource) // ⭐️⭐️⭐️ 此处开始分析 uds.print("ReadFromUDS Test") env.execute() } class UserDefineSource extends SourceFunction[SensorReading] { var running: Boolean = true override def run(ctx: SourceFunction.SourceContext[SensorReading]): Unit = { //模拟随机数据, 可以是其他源 mysql, redis, elastic search, ... val random: Random = new Random() //初始温度 val initTemp: immutable.IndexedSeq[(String, Double)] = 1.to(10).map( i => { ("sensor_" + i, random.nextDouble() * 100) } ) while (running) { //将上次数据温度值微调,生成当前温度 val curTemp: immutable.IndexedSeq[(String, Double)] = { initTemp.map( data => (data._1, data._2 + random.nextGaussian()) ) } // 生成 数据产生的时间戳 val curTime: Long = System.currentTimeMillis() //数据放入 SourceContext 中。 curTemp.foreach( data => { ctx.collect(SensorReading(data._1, curTime, data._2)) } ) Thread.sleep(2000) } } override def cancel(): Unit = { running = false } }
StreamExecutionEnvironment
StreamExecutionEnvironment
StreamExecutionEnvironment
StreamSource
AbstractUdfStreamOperator
第一阶段结束。 以上 userFunction
就指向我们自定义的 SourceFunction
子类 UserDefineSource
执行任务时调用 run 方法。
env.execute()
执行后,就会经过一系类操作后,在 SourceStreamTask
启动一个线程。
此处一系列操作主要是通过反射进行的。为了简化问题此处省略这一过程。
Task ( run() -> doRun() -> invokable.invoke() -> runMailboxLoop() )
=> StreamTask( mailboxProcessor.runMailboxLoop() )
=> MailboxProcessor (mailboxDefaultAction.runDefaultAction(defaultActionContext) )
=>SourceStreamTask ( processInput() )
=> 接下面 声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/484419
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。