赞
踩
累加器的灵感来自MapReduce和Spark中的计数器。Flink的累加器指从用户函数和Operator
中收集分布式的统计信息或聚合信息。累加器工作原理如下:
累加器的场景也是非常之多,系统内部一些功能也会使用累加器实现。常见场景如下:
创建累加器(注:在operate中使用)
// 1:创建累加器
val counter = new IntCounter(0)
注册累加器(注:operate 实现 Rich…接口)
//实现Rich...接口后的open()方法
override def open(parameters: Configuration): Unit = {
getRuntimeContext.addAccumulator("counters", counter) //注册累加器,counters:累加器标识,用于后续获取最终结果
}
使用累加器
this.counter.add(1)
获取最终累加结果
val result: JobExecutionResult = env.execute("acc1")
val counters: Int = result.getAccumulatorResult[Int]("counters") //在触发执行后获取
Flink自带多种常用的累加器,分别有实现SimpleAccumulator和Accumulator两种(注:SimpleAccumulator本身也是实现Accumulator
)。
累加器顶级接口(Accumulator<V, R>
),类型参数:
V:输入到累加器的值的类型
R:累加器返回结果值类型
实现SimpleAccumulator
实现Accumulator
自定义
自定义累加器:实现SimpleAccumulator或Accumulator),并实现其核心方法:add()、merge()等。
对相同的key的值进行累加,如下是实现两个值的累加。
package com.qianfeng.day04 import org.apache.flink.api.common.JobExecutionResult import org.apache.flink.api.common.accumulators.{IntCounter, LongCounter} import org.apache.flink.api.common.functions.RichMapFunction import org.apache.flink.configuration.{Configuration, RestOptions} import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.api.scala._ /** * 多个累加器 * 注:不能根据key来进行分别累加 * * 输入: 2022-5-18 beijing 2 3 2022-5-18 beijing 3 3 2022-5-18 shanghai 6 9 */ object Demo04_DataStream_Acc { def main(args: Array[String]): Unit = { //val env = StreamExecutionEnvironment.getExecutionEnvironment //本地env,打包到服务器运行时,注释掉本地env,本地env仅为方便查看web ui而写 val conf: Configuration = new Configuration() conf.setInteger(RestOptions.PORT, 8081) //自定义端口 val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf) //获取数据源 env.socketTextStream("hadoop01",6666) .map(x=>{ val fields: Array[String] = x.split(" ") val date = fields(0).trim val province = fields(1) val add = fields(2).trim.toInt val possible = fields(3).trim.toInt (date+"_"+province,add,possible) }) .keyBy(0) .map(new RichMapFunction[(String,Int,Int),String] { //定义累加器 val adds = new IntCounter(0) val possibles = new LongCounter(0) override def open(parameters: Configuration): Unit = { //添加到累加器 getRuntimeContext.addAccumulator("adds", adds) getRuntimeContext.addAccumulator("possibles", possibles) } override def map(value: (String, Int, Int)): String = { //累加 adds.add(value._2) possibles.add(value._3) value+"" } }) .print() //执行 val result: JobExecutionResult = env.execute("acc1") val adds: Int = result.getAccumulatorResult[Int]("adds") val possibles: Long = result.getAccumulatorResult[Int]("possibles") println(s"累计新增:${adds},累计怀疑:${possibles}") } }
本地提交运行测试执行:
Vertex Accumulators:总的累加值,即所有的子任务累加值之和。
SubTask Accumulators:子任务累加值,若子任务未分配到值,则累加值是初始值。
退出nc,运行测试结束,运行结果如下:
2> (2022-5-18_beijing,1,2)
2> (2022-5-18_beijing,2,3)
累计新增:3,累计怀疑:5
Process finished with exit code 0
赞
踩
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。