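Accumulators are inspired by the counters in MapReduce and Spark. In Flink, an accumulator collects distributed statistics or aggregated values from user functions and operators. It works as follows: each parallel subtask updates its own local accumulator instance, Flink merges the partial results from all subtasks, and when the job finishes the merged value is returned in the JobExecutionResult.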
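Conceptually, every parallel subtask keeps its own local accumulator, and the job-level result is the merge of all local values. The small Scala model below illustrates that idea without Flink. It is a simplified sketch, not Flink's implementation. LocalCounter and AccumulatorModel are hypothetical names, and records are dealt to the simulated subtasks round-robin.

```scala
// Flink-free model of accumulator semantics (illustration only, not Flink's code)
object AccumulatorModel {
  // Hypothetical stand-in for one subtask's local accumulator
  final class LocalCounter {
    private var value = 0L
    def add(v: Long): Unit = value += v
    def localValue: Long = value
    // Fold another local value into this one
    def merge(other: LocalCounter): Unit = value += other.localValue
  }

  // Distributes records round-robin over `parallelism` simulated subtasks and
  // returns (each subtask's local value, the merged job-level value)
  def run(records: Seq[Long], parallelism: Int): (Seq[Long], Long) = {
    val subtasks = Seq.fill(parallelism)(new LocalCounter)
    records.zipWithIndex.foreach { case (r, i) => subtasks(i % parallelism).add(r) }
    val locals = subtasks.map(_.localValue)
    // At "job end", merge every local counter into a fresh one
    val total = new LocalCounter
    subtasks.foreach(total.merge)
    (locals, total.localValue)
  }
}
```

Suppose the values 2, 3, 4, 2 are spread over two subtasks. The subtasks then hold the local values 6 and 5, and the merged result is 11. Only the merged value corresponds to what the job returns at the end.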
Accumulators have many use cases, and some of Flink's internal features use them as well. Typical usage looks like this:
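// 1: Create the accumulator (a field of the Rich* function)
val counter = new IntCounter(0)
// 2: Register it in open() of a function that extends a Rich* class such as RichMapFunction
override def open(parameters: Configuration): Unit = {
  // "adds" is the accumulator's name; it is used later to fetch the final result
  getRuntimeContext.addAccumulator("adds", counter)
}
// 3: Update it wherever records are processed, e.g. in map()
this.counter.add(1)
// 4: After the job finishes, read the merged result from the JobExecutionResult
val result: JobExecutionResult = env.execute("test01")
val adds: Int = result.getAccumulatorResult[Int]("adds")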
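Flink ships with several commonly used accumulators, which implement either SimpleAccumulator or Accumulator (note: SimpleAccumulator itself extends Accumulator). For example, IntCounter, LongCounter and DoubleCounter implement SimpleAccumulator, while Histogram implements Accumulator directly.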
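A SimpleAccumulator returns a result of the same type it accepts: IntCounter takes Integer values and returns an Integer. Histogram accepts Integer values but returns a TreeMap<Integer, Integer> of value counts. The minimal sketch below calls both classes directly, outside a job, and assumes flink-core is on the classpath. Two IntCounter instances stand in for two parallel subtasks, and the object name BuiltInAccumulators is hypothetical.

```scala
import org.apache.flink.api.common.accumulators.{Histogram, IntCounter}

object BuiltInAccumulators {
  // Sample input values (the third field of the sample records)
  val values: Seq[Int] = Seq(2, 3, 4, 2)

  // SimpleAccumulator: Integer in, Integer out.
  // Two IntCounter instances play the role of two parallel subtasks.
  def mergedCount(): Int = {
    val left = new IntCounter()
    val right = new IntCounter()
    values.take(2).foreach(v => left.add(v))
    values.drop(2).foreach(v => right.add(v))
    left.merge(right) // fold the second "subtask" into the first
    left.getLocalValuePrimitive
  }

  // Accumulator<Integer, TreeMap<Integer, Integer>>: counts occurrences of each value
  def histogram(): java.util.TreeMap[Integer, Integer] = {
    val h = new Histogram()
    values.foreach(v => h.add(v))
    h.getLocalValue
  }
}
```

mergedCount() returns 11. The histogram maps 2 to 2 and maps each of 3 and 4 to 1.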
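The top-level accumulator interface is Accumulator<V, R>, with the type parameters:
V: the type of the values added to the accumulator
R: the type of the accumulator's result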
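The sketch below makes the roles of V and R concrete with a custom accumulator. It assumes flink-core is on the classpath. DistinctKeys is a hypothetical example, not a Flink class. It takes String values (V) and returns a java.util.HashSet[String] (R) of the distinct values it has seen. merge() is the method used to combine the local instances of different subtasks.

```scala
import java.util.{HashSet => JHashSet}
import org.apache.flink.api.common.accumulators.Accumulator

// Hypothetical custom accumulator: V = String, R = java.util.HashSet[String]
class DistinctKeys extends Accumulator[String, JHashSet[String]] {
  private var keys = new JHashSet[String]()

  // Called for every value the user function adds
  override def add(value: String): Unit = { keys.add(value); () }

  // Result held by this local instance
  override def getLocalValue(): JHashSet[String] = keys

  override def resetLocal(): Unit = keys.clear()

  // Combine another instance (e.g. from a different subtask) into this one
  override def merge(other: Accumulator[String, JHashSet[String]]): Unit = {
    keys.addAll(other.getLocalValue())
    ()
  }

  // Deep copy, so the clone does not share the underlying set
  override def clone(): Accumulator[String, JHashSet[String]] = {
    val copy = new DistinctKeys
    copy.keys = new JHashSet[String](keys)
    copy
  }
}
```

Inside a rich function, you would register it the same way as the built-in accumulators. For example, call getRuntimeContext.addAccumulator("keys", new DistinctKeys) in open(), where the name "keys" is arbitrary.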
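This example runs locally for testing. The code below does not read the results from the JobExecutionResult. Instead, map() returns the accumulators' current values. The socket source is unbounded, so env.execute() does not return while the job is running, and the final results are never available there.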
Data source:
2020-2-18 beijing 2 3
2020-2-18 beijing 3 3
2020-2-18 beijing 4 5
2020-2-18 shanghai 2 3
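Goal: join the date and the city into one key, group by that key, and accumulate the third and fourth fields. Note that keyBy only determines which parallel subtask processes each key. Accumulators are not kept per key, so the values returned from map() are running totals over all records that subtask has seen.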
package 复习

import org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.api.common.accumulators.{IntCounter, LongCounter}
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.{Configuration, RestOptions}
import org.apache.flink.streaming.api.scala._

object leijiaqi {
  def main(args: Array[String]): Unit = {
    // Build a local environment (uncomment to get the Web UI on port 8081)
    // val conf = new Configuration()
    // conf.setInteger(RestOptions.PORT, 8081)
    // val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.socketTextStream("localhost", 6666)
      .map(x => {
        val fields: Array[String] = x.split(" ")
        val date = fields(0).trim
        val province = fields(1)
        val add = fields(2).trim.toInt
        val prossbile = fields(3).trim.toInt
        // Key = date_city, plus the two numeric fields
        (date + "_" + province, add, prossbile)
      })
      .keyBy(_._1)
      .map(new RichMapFunction[(String, Int, Int), String] {
        // Define the accumulators
        val adds: IntCounter = new IntCounter(0)
        val prossbiles: LongCounter = new LongCounter(0)

        // Register the accumulators
        override def open(parameters: Configuration): Unit = {
          getRuntimeContext.addAccumulator("adds", adds)
          getRuntimeContext.addAccumulator("prossbiles", prossbiles)
        }

        // Called once per input record
        override def map(in: (String, Int, Int)): String = {
          adds.add(in._2)
          prossbiles.add(in._3)
          // Return the key with this subtask's running totals
          // (getLocalValue gives the number; printing the accumulator itself gives e.g. "IntCounter 5")
          in._1 + "," + adds.getLocalValue + "," + prossbiles.getLocalValue
        }

        override def close(): Unit = super.close()
      })
      .print()

    // With the unbounded socket source this call blocks while the job runs,
    // so the accumulator results are never read here
    val result: JobExecutionResult = env.execute("test01")
  }
}
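Because the socket job above never finishes, its JobExecutionResult is never available. Below is a minimal sketch of a bounded variant. It assumes the same (pre-2.0) Flink Scala DataStream API as the example above and replaces the socket with env.fromElements over the four sample lines. With a bounded source, env.execute() returns and the merged accumulator values can be read. The object name BoundedAccumulatorJob and the job name are hypothetical.

```scala
import org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.api.common.accumulators.{IntCounter, LongCounter}
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._

object BoundedAccumulatorJob {
  // Runs the job to completion and returns the merged (adds, prossbiles) totals
  def run(): (Int, Long) = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.fromElements(
        "2020-2-18 beijing 2 3",
        "2020-2-18 beijing 3 3",
        "2020-2-18 beijing 4 5",
        "2020-2-18 shanghai 2 3")
      .map(new RichMapFunction[String, String] {
        private val adds = new IntCounter(0)
        private val prossbiles = new LongCounter(0)

        override def open(parameters: Configuration): Unit = {
          getRuntimeContext.addAccumulator("adds", adds)
          getRuntimeContext.addAccumulator("prossbiles", prossbiles)
        }

        override def map(line: String): String = {
          val f = line.split(" ")
          adds.add(f(2).trim.toInt)
          prossbiles.add(f(3).trim.toLong)
          line
        }
      })
      .print() // a sink is required, otherwise the topology has no operators to run

    // The source is bounded, so execute() returns once all records are processed
    val result: JobExecutionResult = env.execute("accumulator-bounded")
    (result.getAccumulatorResult[Int]("adds"), result.getAccumulatorResult[Long]("prossbiles"))
  }
}
```

Accumulators are job-wide, so the results cover all keys together. They are 11 for the third field and 14 for the fourth, regardless of how many map subtasks run. The per-key totals are different: 9 and 11 for 2020-2-18_beijing, and 2 and 3 for 2020-2-18_shanghai. To compute those totals, you would need keyed state or a keyed aggregation rather than an accumulator.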