当前位置:   article > 正文

Flink累加器_flink 累加器

flink 累加器

Flink累加器

1 累加器

1.1 累加器原理

累加器的灵感来自MapReduce和Spark中的计数器。Flink的累加器指从用户函数和Operator中收集分布式的统计信息或聚合信息。累加器工作原理如下:

  1. 每个并行实例创建和更新它自己的累加器对象。
  2. 不同的并行累加器实例稍后会合并。
  3. 在作业结束时由系统合并。
  4. 结果可以从作业执行的结果中获得,也可以从web运行时监视器获得。
1.2 累加器使用场景

累加器的场景也是非常之多,系统内部一些功能也会使用累加器实现。常见场景如下:

  • 累计数据次数,比如输入无效数据、输入有效数据、输入总数据行数、代码片段调用次数等。用于评价性能等。
  • 累加聚合值,根据key分类,然后对度量值进行累加。如累计用户数、累计会员数等。
1.3 累加器执行步骤
  1. 创建累加器(注:在operate中使用)

    // 1:创建累加器
    val counter = new IntCounter(0)
    
    • 1
    • 2
  2. 注册累加器(注:operate 实现 Rich…接口)

    //实现Rich...接口后的open()方法
    override def open(parameters: Configuration): Unit = {
      getRuntimeContext.addAccumulator("counters", counter) //注册累加器,counters:累加器标识,用于后续获取最终结果
    }
    
    • 1
    • 2
    • 3
    • 4
  3. 使用累加器

    this.counter.add(1)
    
    • 1
  4. 获取最终累加结果

    val result: JobExecutionResult = env.execute("acc1")
    val counters: Int = result.getAccumulatorResult[Int]("counters")  //在触发执行后获取
    
    • 1
    • 2
1.4 累加器分类

Flink自带多种常用的累加器,分别有实现SimpleAccumulator和Accumulator两种(注:SimpleAccumulator本身也是实现Accumulator)。

累加器顶级接口(Accumulator<V, R>),类型参数:

V:输入到累加器的值的类型

R:累加器返回结果值类型

  • 实现SimpleAccumulator

    • IntCounter:Int类型累加器,只能是输入Int类型,返回Int类型。
    • LongCounter:Long类型的累加器。
    • DoubleCounter:Double类型累加器。
    • AverageAccumulator:平均值累加器,输入Int、Long或Double类型,返回Double类型。
  • 实现Accumulator

    • ListAccumulator : 集合累加器实现Accumulator<T, ArrayList>,该累加器输入一个类型,返回一个带输入类型的集合对象。
    • SerializedListAccumulator:序列化结合累加器,实现Accumulator<T, ArrayList<byte[]>>,可以指定集合对象的序列化和反序列化方式。
    • Histogram(直方图):实现Accumulator<Integer, TreeMap<Integer, Integer>>,即输入整数,返回Integer类型TreeMap。
  • 自定义

    自定义累加器:实现SimpleAccumulator或Accumulator),并实现其核心方法:add()、merge()等。

1.5 累加器案例

对相同的key的值进行累加,如下是实现两个值的累加。

package com.qianfeng.day04

import org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.api.common.accumulators.{IntCounter, LongCounter}
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.{Configuration, RestOptions}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._

/**
 * 多个累加器
 * 注:不能根据key来进行分别累加
 *
 * 输入:
    2022-5-18 beijing 2 3
    2022-5-18 beijing 3 3
    2022-5-18 shanghai 6 9
 */
object Demo04_DataStream_Acc {
  def main(args: Array[String]): Unit = {
    //val env = StreamExecutionEnvironment.getExecutionEnvironment

    //本地env,打包到服务器运行时,注释掉本地env,本地env仅为方便查看web ui而写
    val conf: Configuration = new Configuration()
    conf.setInteger(RestOptions.PORT, 8081)   //自定义端口
    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    //获取数据源
    env.socketTextStream("hadoop01",6666)
      .map(x=>{
        val fields: Array[String] = x.split(" ")
        val date = fields(0).trim
        val province = fields(1)
        val add = fields(2).trim.toInt
        val possible = fields(3).trim.toInt
        (date+"_"+province,add,possible)
      })
      .keyBy(0)
      .map(new RichMapFunction[(String,Int,Int),String] {
        //定义累加器
        val adds = new IntCounter(0)
        val possibles = new LongCounter(0)

        override def open(parameters: Configuration): Unit = {
          //添加到累加器
          getRuntimeContext.addAccumulator("adds", adds)
          getRuntimeContext.addAccumulator("possibles", possibles)
        }

        override def map(value: (String, Int, Int)): String = {
          //累加
          adds.add(value._2)
          possibles.add(value._3)
          value+""
        }
      })
      .print()

    //执行
    val result: JobExecutionResult = env.execute("acc1")
    val adds: Int = result.getAccumulatorResult[Int]("adds")
    val possibles: Long = result.getAccumulatorResult[Int]("possibles")
    println(s"累计新增:${adds},累计怀疑:${possibles}")
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64

本地提交运行测试执行:

image-20220325161703869

Vertex Accumulators:总的累加值,即所有的子任务累加值之和。

SubTask Accumulators:子任务累加值,若子任务未分配到值,则累加值是初始值。

退出nc,运行测试结束,运行结果如下:

2> (2022-5-18_beijing,1,2)
2> (2022-5-18_beijing,2,3)
累计新增:3,累计怀疑:5

Process finished with exit code 0
  • 1
  • 2
  • 3
  • 4
  • 5
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/696179
推荐阅读
相关标签
  

闽ICP备14008679号