赞
踩
在 Apache Flink 中,Accumulator 是一个用于收集作业执行期间聚合信息的工具。它允许在用户定义的函数(如 MapFunction, FlatMapFunction, ProcessFunction 等)中累积值,并在作业完成后检索这些值。这对于跟踪诸如事件数量、处理延迟等统计信息非常有用。
要使用 Accumulator,需要首先定义一个 Accumulator 接口的实现,然后在用户定义函数中注册和使用它。
通常,不需要直接定义 Accumulator 接口的实现,因为 Flink 已经为提供了一些内置的 Accumulator 类型,如 IntCounter, LongCounter, DoubleCounter 等。但如果需要自定义的聚合逻辑,可以实现 Accumulator 接口。
在用户定义函数中,可以通过 getRuntimeContext().getAccumulator(“name”) 来获取或注册一个 Accumulator。然后,可以在逻辑中更新它的值。
在作业执行完成后,可以通过 JobExecutionResult 的 getAccumulatorResult() 方法来检索 Accumulator 的值。
但请注意,由于 Accumulator 已经被 Metric 系统所取代,以下是一个使用 Metric 的示例,它提供了类似的功能:
import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.metrics.Counter; import org.apache.flink.streaming.api.functions.source.SourceFunction; public class MySourceFunction implements SourceFunction<String> { private transient Counter counter; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); // 获取或注册一个 Counter this.counter = getRuntimeContext().getMetricGroup().counter("my-counter"); } @Override public void run(SourceContext<String> ctx) throws Exception { // ... 数据源逻辑 ... // 更新 Counter 的值 counter.inc(); // 发送数据到下游 ctx.collect("some data"); } // ... 其他必要的方法 ... }
在这个示例中,我们使用了 Flink 的 Metric 系统来创建一个计数器,并在数据源函数中更新它的值。这样,就可以在作业执行期间跟踪和检索这个计数器的值了。
package com.wfg.flink.connector.mongodb; import cn.hutool.core.date.DateUtil; import com.alibaba.fastjson2.JSONObject; import com.wfg.flink.connector.dto.KafkaPvDto; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.accumulators.LongCounter; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.MemorySize; import org.apache.flink.connector.mongodb.source.MongoSource; import org.apache.flink.connector.mongodb.source.enumerator.splitter.PartitionStrategy; import org.apache.flink.connector.mongodb.source.reader.deserializer.MongoDeserializationSchema; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.runtime.state.hashmap.HashMapStateBackend; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.bson.BsonDocument; import static com.wfg.flink.connector.constants.Constants.MONGO_TEST_PV_COLLECTION; /** * @author wfg * 根据名字统计访问次数 */ public class MongoAccumulatorCounts { public static void main(String[] args) throws Exception { String startTime = DateUtil.format(DateUtil.date(), "yyyy-MM-dd HH:mm:ss"); System.out.println("StartTime:" + startTime); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 开启Checkpointing,设置Checkpoint间隔 env.enableCheckpointing(30000); // 设置Checkpoint模式 env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // 设置最小Checkpoint间隔 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); // 设置最大并发Checkpoint数目 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // 使用RocksDB作为状态后端 env.setStateBackend(new HashMapStateBackend()); env.setParallelism(10); // 配置MongoDB源 MongoSource<String> mongoSource = MongoSource.<String>builder() .setUri("mongodb://root:123456@127.0.0.1:27017,127.0.0.1:27018,127.0.0.1:27019/admin?replicaSet=rs0&authSource=admin") .setDatabase("sjzz") .setCollection(MONGO_TEST_PV_COLLECTION) .setFetchSize(2048) // .setLimit(10000) .setNoCursorTimeout(true) .setPartitionStrategy(PartitionStrategy.SINGLE) .setPartitionSize(MemorySize.ofMebiBytes(64)) .setDeserializationSchema(new MongoDeserializationSchema<>() { @Override public String deserialize(BsonDocument document) { return document.toJson(); } @Override public TypeInformation<String> getProducedType() { return BasicTypeInfo.STRING_TYPE_INFO; } }) .build(); // 创建MongoDB数据流 DataStream<String> sourceStream = env.fromSource(mongoSource, WatermarkStrategy.noWatermarks(), "kafka Mongo Source"); // 转换数据,提取人名作为Key DataStream<Tuple2<String, Integer>> nameCountStream = sourceStream .map(new RichMapFunction<String, Tuple2<String, Integer>>() { private LongCounter elementCounter = new LongCounter(); Long count = 0L; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); //-2注册累加器 getRuntimeContext().addAccumulator("elementCounter", elementCounter); } @Override public Tuple2<String, Integer> map(String value) { KafkaPvDto data = JSONObject.parseObject(value, KafkaPvDto.class); //-3.使用累加器 this.elementCounter.add(1); count += 1; System.out.println("不使用累加器统计的结果:" + count); return Tuple2.of(data.getUserName(), 1); } }).setParallelism(10); // .keyBy(value->value.f0) // .sum("f1"); sourceStream.writeAsText("data/output/test", FileSystem.WriteMode.OVERWRITE); //-4.获取加强结果 JobExecutionResult jobResult = env.execute(); long nums = jobResult.getAccumulatorResult("elementCounter"); System.out.println("使用累加器统计的结果:" + nums); System.out.println("-----------------------------------"); System.out.println("startTime: " + startTime); System.out.println("EndTime: " + DateUtil.format(DateUtil.date(), "yyyy-MM-dd HH:mm:ss")); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。