当前位置:   article > 正文

Flink 自定义addSource 后 SourceFunction 的 run 方法如何执行?源码分析。_sourcefunction run

sourcefunction run

正片文章分两个阶段来分析。
第一阶段在 AbstractUdfStreamOperator 中将 userFunction 指向 UserDefineSource, env.addSource(new UserDefineSource) 为入口。
第二阶段在 StreamSource 中 通过 userFunction.run() 调用了 UserDefineSource 中的 run 方法。

第一阶段

  1. 用户自定义的类
    def main(args: Array[String]): Unit = {
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        // 注册自定义 SourceFunction 类到 StreamExecutionEnvironment
        val uds: DataStream[SensorReading] = env.addSource(new UserDefineSource) // ⭐️⭐️⭐️ 此处开始分析
        uds.print("ReadFromUDS Test")
        env.execute()
    }

    class UserDefineSource extends SourceFunction[SensorReading] {

        var running: Boolean = true

        override def run(ctx: SourceFunction.SourceContext[SensorReading]): Unit = {
            //模拟随机数据, 可以是其他源 mysql, redis, elastic search, ...
            val random: Random = new Random()
            //初始温度
            val initTemp: immutable.IndexedSeq[(String, Double)] = 1.to(10).map(
                i => {
                    ("sensor_" + i, random.nextDouble() * 100)
                }
            )
            while (running) {
                //将上次数据温度值微调,生成当前温度
                val curTemp: immutable.IndexedSeq[(String, Double)] = {
                    initTemp.map(
                        data => (data._1, data._2 + random.nextGaussian())
                    )
                }
                // 生成 数据产生的时间戳
                val curTime: Long = System.currentTimeMillis()
                //数据放入 SourceContext 中。
                curTemp.foreach(
                    data => {
                        ctx.collect(SensorReading(data._1, curTime, data._2))
                    }
                )
                Thread.sleep(2000)
            }
        }

        override def cancel(): Unit = {
            running = false
        }
    }

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  1. StreamExecutionEnvironment
    在这里插入图片描述

  2. StreamExecutionEnvironment
    在这里插入图片描述

  3. StreamExecutionEnvironment
    在这里插入图片描述

  4. StreamSource
    在这里插入图片描述

  5. AbstractUdfStreamOperator
    在这里插入图片描述


第一阶段结束。 以上 userFunction 就指向我们自定义的 SourceFunction 子类 UserDefineSource


第二阶段

执行任务时调用 run 方法。

env.execute() 执行后,就会经过一系类操作后,在 SourceStreamTask 启动一个线程。

此处一系列操作主要是通过反射进行的。为了简化问题此处省略这一过程。

Task ( run()  -> doRun() -> invokable.invoke() -> runMailboxLoop() ) 
=> StreamTask( mailboxProcessor.runMailboxLoop()  )     
=> MailboxProcessor (mailboxDefaultAction.runDefaultAction(defaultActionContext) ) 
=>SourceStreamTask ( processInput() )
=> 接下面 
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/484419
推荐阅读
相关标签