Simply put, a connector is an external data system that Flink integrates with, as a source, a sink, or both.
The screenshot above comes from the official documentation's connector list. Notice that it does not cover many common data systems, and even for a supported system, source and sink are not necessarily both available. Why is that?
With that background in place, let's analyze these sources and sinks. Of course we can't analyze all of them (that would be exhausting), so I'll analyze one source (Kafka) and one sink (Elasticsearch). Once you understand one source or one sink, you can quickly work out how the others operate. Enough talk, on to the real content.
First, let's look at the structure of a source (this is important).
The source node is responsible for ingesting data from the outside world, so it naturally needs the most functionality. In general, a source node must implement two interfaces: RichFunction and SourceFunction. Flink provides AbstractRichFunction, which implements RichFunction, so to build a source node you only need to extend AbstractRichFunction and implement SourceFunction. For Kafka, Flink provides the class FlinkKafkaConsumerBase, and that class is exactly our source node:
public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T> implements
        CheckpointListener,
        ResultTypeQueryable<T>,
        CheckpointedFunction {}

public abstract class RichParallelSourceFunction<OUT> extends AbstractRichFunction
        implements ParallelSourceFunction<OUT> {

    private static final long serialVersionUID = 1L;
}

public interface ParallelSourceFunction<OUT> extends SourceFunction<OUT> {
}
As the code shows, FlinkKafkaConsumerBase extends RichParallelSourceFunction; RichParallelSourceFunction extends AbstractRichFunction and implements ParallelSourceFunction, and ParallelSourceFunction extends SourceFunction. So FlinkKafkaConsumerBase matches the description above: it does extend AbstractRichFunction and implement SourceFunction. The most important thing AbstractRichFunction provides is access to the runtime context, which lets the source node query all kinds of runtime information. It also has another key feature: the open() method, a lifecycle hook executed before the task starts. SourceFunction, in turn, defines the crucial run() method, so it's a safe bet that run() is where the data actually gets fetched from the source. In short: AbstractRichFunction supplies the runtime context plus the open() lifecycle method invoked when each task starts, while SourceFunction carries the data-fetching logic. FlinkKafkaConsumerBase additionally implements CheckpointListener, CheckpointedFunction, and ResultTypeQueryable; the first two are obviously checkpoint-related (we won't analyze them for now), and ResultTypeQueryable isn't important here. Good, now let's analyze the Kafka source.
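To make that contract concrete, here is a minimal hand-rolled source following the same pattern. This is a hedged sketch of my own, not Flink code; the class name and the counter logic are invented purely for illustration:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

// A toy source: open() is the lifecycle hook, run() is the fetch loop.
public class CountingSource extends RichParallelSourceFunction<Long> {

    private volatile boolean running = true;
    private long counter;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Called once per subtask before run(); typically used to create
        // clients/connections. The runtime context is available here.
        int subtask = getRuntimeContext().getIndexOfThisSubtask();
        counter = subtask * 1_000_000L; // disjoint ranges per subtask
    }

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        // The fetch loop: keeps emitting until cancel() flips the flag.
        while (running) {
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(counter++);
            }
            Thread.sleep(10); // throttle outside the checkpoint lock
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

The shape is exactly what we'll see in FlinkKafkaConsumerBase: open() prepares clients and state using the runtime context, and run() loops, emitting records until cancellation.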
Let's start with this demo:
public class Kafka011Example {

    public static void main(String[] args) throws Exception {
        // parse input arguments
        final ParameterTool parameterTool = ParameterTool.fromArgs(args);
        StreamExecutionEnvironment env = KafkaExampleUtil.prepareExecutionEnv(parameterTool);

        DataStream<KafkaEvent> input = env
            .addSource(
                new FlinkKafkaConsumer011<>(
                    parameterTool.getRequired("input-topic"),
                    new KafkaEventSchema(),
                    parameterTool.getProperties())
                .assignTimestampsAndWatermarks(new CustomWatermarkExtractor()))
            .keyBy("word")
            .map(new RollingAdditionMapper());

        input.addSink(
            new FlinkKafkaProducer011<>(
                parameterTool.getRequired("output-topic"),
                new KafkaEventSchema(),
                parameterTool.getProperties()));

        env.execute("Kafka 0.11 Example");
    }

}
We can see that the FlinkKafkaConsumer011 instance is added to the env object. I'll only sketch the logic here rather than go through it in detail: env wraps the added FlinkKafkaConsumer011 in an operator, turns it into a transformation, and that eventually becomes a JobGraph node. FlinkKafkaConsumer011 ultimately extends FlinkKafkaConsumerBase. So how does it actually get to run? First the nodes are packaged into a JobGraph and shipped to the cluster, where it is reworked into an ExecutionGraph (which adds parallelism). The JobManager then sends the source node out as a subtask (assume parallelism 1). Once that task lands on a TaskManager, it materializes as a Task instance.
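As an aside, addSource boils down to roughly the following. This is a simplified sketch from memory of StreamExecutionEnvironment in the Flink 1.9/1.10 line, so treat the details as approximate rather than verbatim:

// Simplified sketch of StreamExecutionEnvironment#addSource (not verbatim).
public <OUT> DataStreamSource<OUT> addSource(
        SourceFunction<OUT> function, String sourceName, TypeInformation<OUT> typeInfo) {
    // The user function is wrapped in a source operator...
    boolean isParallel = function instanceof ParallelSourceFunction;
    StreamSource<OUT, ?> sourceOperator = new StreamSource<>(function);
    // ...and the operator becomes the head of a DataStream, whose
    // transformation later turns into a JobGraph vertex.
    return new DataStreamSource<>(this, typeInfo, sourceOperator, isParallel, sourceName);
}

Back to deployment: here is the Task class that our subtask becomes on the TaskManager.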
public class Task implements Runnable, TaskSlotPayload, TaskActions, PartitionProducerStateProvider, CheckpointListener, BackPressureSampleableTask {

    /** The class logger. */
    private static final Logger LOG = LoggerFactory.getLogger(Task.class);

    /** The thread group that contains all task threads. */
    private static final ThreadGroup TASK_THREADS_GROUP = new ThreadGroup("Flink Task Threads");

    /** For atomic state updates. */
    private static final AtomicReferenceFieldUpdater<Task, ExecutionState> STATE_UPDATER =
        AtomicReferenceFieldUpdater.newUpdater(Task.class, ExecutionState.class, "executionState");

    // ------------------------------------------------------------------------
    //  Constant fields that are part of the initial Task construction
    // ------------------------------------------------------------------------

    /** The job that the task belongs to. */
    private final JobID jobId;

    /** The vertex in the JobGraph whose code the task executes. */
    private final JobVertexID vertexId;

    /** The execution attempt of the parallel subtask. */
    private final ExecutionAttemptID executionId;

    /** ID which identifies the slot in which the task is supposed to run. */
    private final AllocationID allocationId;

    /** TaskInfo object for this task. */
    private final TaskInfo taskInfo;

    /** The name of the task, including subtask indexes. */
    private final String taskNameWithSubtask;

    /** The job-wide configuration object. */
    private final Configuration jobConfiguration;

    /** The task-specific configuration. */
    private final Configuration taskConfiguration;

    /** The jar files used by this task. */
    private final Collection<PermanentBlobKey> requiredJarFiles;

    /** The classpaths used by this task. */
    private final Collection<URL> requiredClasspaths;

    /** The name of the class that holds the invokable code. */
    private final String nameOfInvokableClass;

    /** Access to task manager configuration and host names. */
    private final TaskManagerRuntimeInfo taskManagerConfig;

    /** The memory manager to be used by this task. */
    private final MemoryManager memoryManager;

    /** The I/O manager to be used by this task. */
    private final IOManager ioManager;

    /** The BroadcastVariableManager to be used by this task. */
    private final BroadcastVariableManager broadcastVariableManager;

    private final TaskEventDispatcher taskEventDispatcher;

    /** The manager for state of operators running in this task/slot. */
    private final TaskStateManager taskStateManager;

    /** Serialized version of the job specific execution configuration (see {@link ExecutionConfig}). */
    private final SerializedValue<ExecutionConfig> serializedExecutionConfig;

    private final ResultPartitionWriter[] consumableNotifyingPartitionWriters;

    private final InputGate[] inputGates;

    /** Connection to the task manager. */
    private final TaskManagerActions taskManagerActions;

    /** Input split provider for the task. */
    private final InputSplitProvider inputSplitProvider;

    /** Checkpoint notifier used to communicate with the CheckpointCoordinator. */
    private final CheckpointResponder checkpointResponder;

    /** GlobalAggregateManager used to update aggregates on the JobMaster. */
    private final GlobalAggregateManager aggregateManager;

    /** The BLOB cache, from which the task can request BLOB files. */
    private final BlobCacheService blobService;

    /** The library cache, from which the task can request its class loader. */
    private final LibraryCacheManager libraryCache;

    /** The cache for user-defined files that the invokable requires. */
    private final FileCache fileCache;

    /** The service for kvState registration of this task. */
    private final KvStateService kvStateService;

    /** The registry of this task which enables live reporting of accumulators. */
    private final AccumulatorRegistry accumulatorRegistry;

    /** The thread that executes the task. */
    private final Thread executingThread;

    /** Parent group for all metrics of this task. */
    private final TaskMetricGroup metrics;

    /** Partition producer state checker to request partition states from. */
    private final PartitionProducerStateChecker partitionProducerStateChecker;

    /** Executor to run future callbacks. */
    private final Executor executor;

    /** Future that is completed once {@link #run()} exits. */
    private final CompletableFuture<ExecutionState> terminationFuture = new CompletableFuture<>();

    // ------------------------------------------------------------------------
    //  Fields that control the task execution. All these fields are volatile
    //  (which means that they introduce memory barriers), to establish
    //  proper happens-before semantics on parallel modification
    // ------------------------------------------------------------------------

    /** atomic flag that makes sure the invokable is canceled exactly once upon error. */
    private final AtomicBoolean invokableHasBeenCanceled;

    /** The invokable of this task, if initialized. All accesses must copy the reference and
     * check for null, as this field is cleared as part of the disposal logic. */
    @Nullable
    private volatile AbstractInvokable invokable;

    /** The current execution state of the task. */
    private volatile ExecutionState executionState = ExecutionState.CREATED;

    /** The observed exception, in case the task execution failed. */
    private volatile Throwable failureCause;

    /** Initialized from the Flink configuration. May also be set at the ExecutionConfig */
    private long taskCancellationInterval;

    /** Initialized from the Flink configuration. May also be set at the ExecutionConfig */
    private long taskCancellationTimeout;

    /** This class loader should be set as the context class loader for threads that may dynamically load user code. */
    private ClassLoader userCodeClassLoader;
This class is huge; it represents one independently running task. The key field is invokable: that is the object that does the real work. It assembles the operators, and those operators contain our FlinkKafkaConsumerBase, so the crucial question is how invokable ends up calling FlinkKafkaConsumerBase.
Start from Task.run():
public void run() {
    try {
        doRun();
    } finally {
        terminationFuture.complete(executionState);
    }
}

private void doRun() {
    ......
    invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);
    ......
    invokable.invoke();
    ......
}
doRun() essentially does one thing: it calls invokable.invoke(), and invoke() holds our answer:
public final void invoke() throws Exception {
    try {
        beforeInvoke();
        ......
        runMailboxLoop();
        ......
        afterInvoke();
    }
    finally {
        cleanUpInvoke();
    }
}
runMailboxLoop() is what actually starts the task; beforeInvoke() performs initialization before the start, and afterInvoke() runs the logic after the task has executed. Clearly our FlinkKafkaConsumerBase must initialize all sorts of things before the task starts; the demo above only constructed it and initialized nothing. Good, now let's look at beforeInvoke():
private void beforeInvoke() throws Exception {
    ......
    operatorChain = new OperatorChain<>(this, recordWriter);
    headOperator = operatorChain.getHeadOperator();

    // task specific initialization: each concrete task type initializes its own pieces
    init();
    ......
    actionExecutor.runThrowing(() -> {
        initializeStateAndOpen();
    });
}

private void initializeStateAndOpen() throws Exception {

    StreamOperator<?>[] allOperators = operatorChain.getAllOperators();

    for (StreamOperator<?> operator : allOperators) {
        if (null != operator) {
            // initializes the operatorStateBackend and keyedStateBackend
            operator.initializeState();
            // invokes the lifecycle hook
            operator.open();
        }
    }
}

public void open() throws Exception {
    super.open();
    FunctionUtils.openFunction(userFunction, new Configuration());
}

public static void openFunction(Function function, Configuration parameters) throws Exception {
    if (function instanceof RichFunction) {
        RichFunction richFunction = (RichFunction) function;
        richFunction.open(parameters);
    }
}
As you can see, this method does just two things: init() and initializeStateAndOpen(). init() sets up some Task-internal machinery, which we can ignore; what we need is the initialization of FlinkKafkaConsumerBase, and the answer lies in initializeStateAndOpen(). That function initializes the source node's state and invokes the lifecycle method open() we mentioned earlier (operator.open() calls the function's open()). In openFunction() we finally see RichFunction's open() method being called. Let's look at it:
public void open(Configuration configuration) throws Exception {
    ......
    this.partitionDiscoverer = createPartitionDiscoverer(
        topicsDescriptor,
        getRuntimeContext().getIndexOfThisSubtask(),
        getRuntimeContext().getNumberOfParallelSubtasks());
    this.partitionDiscoverer.open();
    ......
    if (restoredState != null) {
        for (KafkaTopicPartition partition : allPartitions) {
            if (!restoredState.containsKey(partition)) {
                restoredState.put(partition, KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
            }
        }
        ......
}
In open(), the main work is creating a Kafka partition discoverer; the discovered Kafka partitions are put into restoredState for checkpointing. The next step, obviously, is executing the run() method to fetch data. So how does execution reach run()? It's quite simple: as we said, beforeInvoke() does the pre-start initialization, and then runMailboxLoop() is executed; that method eventually calls processInput(), the method that truly processes data:
protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
    ......
    sourceThread.setTaskDescription(getName());
    sourceThread.start();
    sourceThread.getCompletionFuture().whenComplete((Void ignore, Throwable sourceThreadThrowable) -> {
        if (isCanceled() && ExceptionUtils.findThrowable(sourceThreadThrowable, InterruptedException.class).isPresent()) {
            mailboxProcessor.reportThrowable(new CancelTaskException(sourceThreadThrowable));
        } else if (!isFinished && sourceThreadThrowable != null) {
            mailboxProcessor.reportThrowable(sourceThreadThrowable);
        } else {
            mailboxProcessor.allActionsCompleted();
        }
    });
}
So processInput() simply starts a thread. Let's see what that thread does:
public void run() {
    try {
        headOperator.run(getCheckpointLock(), getStreamStatusMaintainer(), operatorChain);
        completionFuture.complete(null);
    } catch (Throwable t) {
        // Note, t can be also an InterruptedException
        completionFuture.completeExceptionally(t);
    }
}
See? It just executes the operator's run() method, and unsurprisingly the operator's run() calls FlinkKafkaConsumerBase's run():
public void run(final Object lockingObject,
        final StreamStatusMaintainer streamStatusMaintainer,
        final Output<StreamRecord<OUT>> collector,
        final OperatorChain<?, ?> operatorChain) throws Exception {

    ......
    try {
        userFunction.run(ctx);
        ......
    } finally {
        if (latencyEmitter != null) {
            latencyEmitter.close();
        }
    }
}
The userFunction here is exactly our FlinkKafkaConsumerBase. Its implementation:
public void run(SourceContext<T> sourceContext) throws Exception {
    ......
    this.kafkaFetcher = createFetcher(
        sourceContext,
        subscribedPartitionsToStartOffsets,
        periodicWatermarkAssigner,
        punctuatedWatermarkAssigner,
        (StreamingRuntimeContext) getRuntimeContext(),
        offsetCommitMode,
        getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),
        useMetrics);
    ......
    if (discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED) {
        kafkaFetcher.runFetchLoop();
    } else {
        runWithPartitionDiscovery();
    }
}
run() first creates a fetcher and then executes its runFetchLoop() method; the name alone tells you this class is there to fetch data. Let's look at the concrete implementation:
public void runFetchLoop() throws Exception {
    try {
        final Handover handover = this.handover;
        consumerThread.start();

        while (running) {
            final ConsumerRecords<byte[], byte[]> records = handover.pollNext();

            for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitionStates()) {

                List<ConsumerRecord<byte[], byte[]>> partitionRecords =
                    records.records(partition.getKafkaPartitionHandle());

                for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
                    final T value = deserializer.deserialize(record);

                    if (deserializer.isEndOfStream(value)) {
                        // end of stream signaled
                        running = false;
                        break;
                    }
                    emitRecord(value, partition, record.offset(), record);
                }
            }
        }
    }
    finally {
        consumerThread.shutdown();
    }
    try {
        consumerThread.join();
    }
    catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}
This logic is very important. Two classes do the work here: the consumer thread and Handover. First a Handover is created (think of it as a blocking queue); then the consumerThread is started, and that thread uses a KafkaConsumer to fetch content from the Kafka brokers and put it into the Handover. The fetch loop then blocks on handover.pollNext() inside a while loop, and whenever data arrives it calls emitRecord(). Note that emitRecord() does not deliver data straight to the downstream task: as discussed before, tasks on TaskManagers exchange data via Netty, so emitRecord() hands the record to the output gate, and the output gate deals with the downstream input gate.
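Before moving on, it helps to pin down what Handover does. Here is a minimal stand-in of my own; Flink's real Handover is a hand-written single-element exchange with error propagation and wakeup support, but a capacity-1 blocking queue captures the core idea:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Toy stand-in for Flink's Handover: the Kafka consumer thread produces
// batches, the fetch loop consumes them, one batch in flight at a time.
public class ToyHandover<T> {

    private final BlockingQueue<T> slot = new ArrayBlockingQueue<>(1);

    // Called by the consumer thread: blocks until the slot is free.
    public void produce(T batch) throws InterruptedException {
        slot.put(batch);
    }

    // Called by the fetch loop: blocks until a batch is available.
    public T pollNext() throws InterruptedException {
        return slot.take();
    }
}

With that mental model, the key question remains: how does the consumer thread get the data in the first place? Here is its run() method: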
public void run() {
    ......
    final Handover handover = this.handover;
    ......
    try {
        this.consumer = getConsumer(kafkaProperties);
    }
    catch (Throwable t) {
        handover.reportError(t);
        return;
    }
    try {
        ......
        ConsumerRecords<byte[], byte[]> records = null;
        List<KafkaTopicPartitionState<TopicPartition>> newPartitions;

        // main fetch loop
        while (running) {
            ......
            if (records == null) {
                try {
                    records = consumer.poll(pollTimeout);
                }
                catch (WakeupException we) {
                    continue;
                }
            }
            try {
                handover.produce(records);
                records = null;
            }
            catch (Handover.WakeupException e) {
                // fall through the loop
            }
        }
        // end main fetch loop
    }
    ......
}
The logic here is very clear: the consumer pulls data directly and puts it into the Handover, and note that each pull fetches a batch of records, not a single one. As for what this consumer is: as said before, it is a KafkaConsumer, Kafka's client for talking to the Kafka brokers. Where does it come from? It is created from the Properties supplied in our main method. Seen this way, open() does not appear to play any critical role in the fetch path. Recall that open()'s partition discoverer requests all of the Kafka partitions and puts them into restoredState; we indeed never touch restoredState while fetching data, but it is used for checkpointing. Read that field's comment and you will see that it is first initialized in initializeState(), and open() then records the Kafka partitions, offsets, and so on into it. So open() and run() are effectively independent of each other.
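For reference, getConsumer(kafkaProperties) amounts to constructing a plain Kafka client from the user-supplied Properties. Here is a sketch of the equivalent standalone usage; the broker address, group id, and topic are made-up values:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PlainConsumerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // made-up address
        props.setProperty("group.id", "demo-group");              // made-up group id
        props.setProperty("key.deserializer",
            "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.setProperty("value.deserializer",
            "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        // The same Properties the Flink job receives in main are handed
        // to a vanilla KafkaConsumer under the hood.
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("input-topic")); // made-up topic
            // poll() returns a whole batch (ConsumerRecords), not one record,
            // which is why the fetch loop iterates per partition and per record.
            ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
            System.out.println("fetched " + records.count() + " records");
        }
    }
}

(In the 0.11-era client, poll takes a long timeout in milliseconds; newer clients prefer the Duration overload.)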
To recap the Kafka source execution flow: Task.run() → doRun() → invokable.invoke() → beforeInvoke() (which calls operator.open() and thus FlinkKafkaConsumerBase.open(), creating the partition discoverer) → runMailboxLoop() → processInput() (starts the source thread) → headOperator.run() → FlinkKafkaConsumerBase.run() (creates the fetcher) → runFetchLoop(), where the consumer thread polls Kafka and feeds batches through the Handover to emitRecord().
Next is the analysis of the ES sink; having worked through the above, analyzing ES is easy.
First, what characterizes a sink? It is much like a source: it also extends AbstractRichFunction. The difference is that it implements SinkFunction, whose important method is invoke(); it plays the same role for a sink that run() plays for SourceFunction. First we need to know what the ES sink actually is:
public abstract class ElasticsearchSinkBase<T, C extends AutoCloseable> extends RichSinkFunction<T> implements CheckpointedFunction
The class declaration looks much like the Kafka source's, and it too implements CheckpointedFunction. Let's look at its demo first:
public class Elasticsearch7SinkExample {

    public static void main(String[] args) throws Exception {

        final ParameterTool parameterTool = ParameterTool.fromArgs(args);

        if (parameterTool.getNumberOfParameters() < 2) {
            System.out.println("Missing parameters!\n" +
                "Usage: --numRecords <numRecords> --index <index>");
            return;
        }

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);

        DataStream<Tuple2<String, String>> source = env.generateSequence(0, parameterTool.getInt("numRecords") - 1)
            .flatMap(new FlatMapFunction<Long, Tuple2<String, String>>() {
                @Override
                public void flatMap(Long value, Collector<Tuple2<String, String>> out) {
                    final String key = String.valueOf(value);
                    final String message = "message #" + value;
                    out.collect(Tuple2.of(key, message + "update #1"));
                    out.collect(Tuple2.of(key, message + "update #2"));
                }
            });

        List<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));

        ElasticsearchSink.Builder<Tuple2<String, String>> esSinkBuilder = new ElasticsearchSink.Builder<>(
            httpHosts,
            (Tuple2<String, String> element, RuntimeContext ctx, RequestIndexer indexer) -> {
                indexer.add(createIndexRequest(element.f1, parameterTool));
                indexer.add(createUpdateRequest(element, parameterTool));
            });

        esSinkBuilder.setFailureHandler(
            new CustomFailureHandler(parameterTool.getRequired("index")));

        // this instructs the sink to emit after every element, otherwise they would be buffered
        esSinkBuilder.setBulkFlushMaxActions(1);

        source.addSink(esSinkBuilder.build());

        env.execute("Elasticsearch 7.x end to end sink test example");
    }

    private static class CustomFailureHandler implements ActionRequestFailureHandler {

        private static final long serialVersionUID = 942269087742453482L;

        private final String index;

        CustomFailureHandler(String index) {
            this.index = index;
        }

        @Override
        public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
            if (action instanceof IndexRequest) {
                Map<String, Object> json = new HashMap<>();
                json.put("data", ((IndexRequest) action).source());

                indexer.add(
                    Requests.indexRequest()
                        .index(index)
                        .id(((IndexRequest) action).id())
                        .source(json));
            } else {
                throw new IllegalStateException("unexpected");
            }
        }
    }

    private static IndexRequest createIndexRequest(String element, ParameterTool parameterTool) {
        Map<String, Object> json = new HashMap<>();
        json.put("data", element);

        String index;
        String type;

        if (element.startsWith("message #15")) {
            index = ":intentional invalid index:";
            type = ":intentional invalid type:";
        } else {
            index = parameterTool.getRequired("index");
        }

        return Requests.indexRequest()
            .index(index)
            .id(element)
            .source(json);
    }

    private static UpdateRequest createUpdateRequest(Tuple2<String, String> element, ParameterTool parameterTool) {
        Map<String, Object> json = new HashMap<>();
        json.put("data", element.f1);

        return new UpdateRequest(
            parameterTool.getRequired("index"),
            parameterTool.getRequired("type"),
            element.f0)
            .doc(json)
            .upsert(json);
    }
}
Notice that the ES sink is not constructed directly and thrown into env: a builder is created first, and only the built result is added. Why? This is the classic builder design pattern. When the builder is created it receives the httpHosts and an ElasticsearchSinkFunction; that function assembles the incoming data into the format ES expects and hands it to the ES client. The builder also sets various other properties, all of which are eventually passed into the ES sink, so the builder pattern gives flexible composition: inject into the sink whatever properties you want. With the builder covered, what matters most is still how and when invoke() and open() are called.
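As an aside, the builder pattern in play boils down to something like this generic toy sketch; it is not the actual ElasticsearchSink.Builder code, and the class and field names are invented:

import java.util.List;

// Toy builder: required arguments go into the builder's constructor,
// optional knobs are chainable setters, and build() assembles the product.
public class ToySink {
    private final List<String> hosts;
    private final int bulkFlushMaxActions;

    private ToySink(List<String> hosts, int bulkFlushMaxActions) {
        this.hosts = hosts;
        this.bulkFlushMaxActions = bulkFlushMaxActions;
    }

    public static class Builder {
        private final List<String> hosts;       // required
        private int bulkFlushMaxActions = 1000; // optional, with a default

        public Builder(List<String> hosts) {
            this.hosts = hosts;
        }

        public Builder setBulkFlushMaxActions(int n) {
            this.bulkFlushMaxActions = n;
            return this; // chainable, like esSinkBuilder.set...()
        }

        public ToySink build() {
            return new ToySink(hosts, bulkFlushMaxActions);
        }
    }
}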
Following the same recipe: a sink is also a task, so it too eventually goes through beforeInvoke(), and beforeInvoke() is what calls open(). So we already know when open() is called; let's look at its concrete implementation:
public void open(Configuration parameters) throws Exception {
    client = callBridge.createClient(userConfig);
    bulkProcessor = buildBulkProcessor(new BulkProcessorListener());
    requestIndexer = callBridge.createBulkProcessorIndexer(bulkProcessor, flushOnCheckpoint, numPendingRequests);
    failureRequestIndexer = new BufferingNoOpRequestIndexer();
}
So open() is this simple: it creates the client, the bulkProcessor, and the requestIndexer. As these few lines show, all three are created through callBridge, and callBridge is built from the parameters passed in from the user's main program. First, the client is our ES client, analogous to the KafkaConsumer: it is the client that talks to the ES server. Second, the bulkProcessor exists for batching. Flink is a stream processor, so why write in batches? Because pushing to the server after every single processed record wastes an enormous amount of time, just as writing to MySQL row by row over JDBC performs poorly, so the bulkProcessor is used to improve throughput. Finally, the requestIndexer receives the user's individual requests: in the user's main program, the user ultimately calls indexer.add(request), and that indexer is the requestIndexer. The overall structure: the requestIndexer wraps the bulkProcessor, and the bulkProcessor wraps the client. First, the RequestIndexer code:
public void add(IndexRequest... indexRequests) {
    for (IndexRequest indexRequest : indexRequests) {
        if (flushOnCheckpoint) {
            numPendingRequestsRef.getAndIncrement();
        }
        this.bulkProcessor.add(indexRequest);
    }
}
So RequestIndexer is just an intermediary: when add() is executed, the request is actually added to the bulkProcessor. On to the bulkProcessor:
private void internalAdd(DocWriteRequest<?> request) {
    Tuple<BulkRequest, Long> bulkRequestToExecute = null;
    this.lock.lock();

    try {
        this.ensureOpen();
        this.bulkRequest.add(request);
        bulkRequestToExecute = this.newBulkRequestIfNeeded();
    } finally {
        this.lock.unlock();
    }

    if (bulkRequestToExecute != null) {
        this.execute((BulkRequest) bulkRequestToExecute.v1(), (Long) bulkRequestToExecute.v2());
    }

}
bulkProcessor's add() ultimately adds the request to a bulkRequest. That is easy to understand: every request carries one record, and the records accumulate into the bulkRequest (as the name says, a bulk, i.e. batch, request); note that add() also takes a lock. So how does this bulkRequest get pushed to the ES server? To answer that we need the BulkProcessor class itself. First, its constructor:
BulkProcessor(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BackoffPolicy backoffPolicy,
        BulkProcessor.Listener listener, int concurrentRequests, int bulkActions, ByteSizeValue bulkSize,
        @Nullable TimeValue flushInterval, Scheduler scheduler, Runnable onClose,
        Supplier<BulkRequest> bulkRequestSupplier) {
    this.bulkActions = bulkActions;
    this.bulkSize = bulkSize.getBytes();
    this.bulkRequest = (BulkRequest) bulkRequestSupplier.get();
    this.bulkRequestSupplier = bulkRequestSupplier;
    this.bulkRequestHandler = new BulkRequestHandler(consumer, backoffPolicy, listener, scheduler, concurrentRequests);
    this.cancellableFlushTask = this.startFlushTask(flushInterval, scheduler);
    this.onClose = onClose;
}
Spot the key detail? The constructor already calls startFlushTask. The name says it all: a flush task, and what else would it flush but the data to be pushed to ES? But how do we know there is data? A timer (flushInterval) is set, say 10 seconds: within those 10 seconds, requests can keep being added to the bulkRequest, and once the timer fires, client.bulkAsync is executed to push everything in the bulkRequest to the ES server in one go. startFlushTask eventually leads to the following code:
private void execute() {
    BulkRequest bulkRequest = this.bulkRequest;
    long executionId = this.executionIdGen.incrementAndGet();
    this.bulkRequest = (BulkRequest) this.bulkRequestSupplier.get();
    this.execute(bulkRequest, executionId);
}
It first grabs the current bulkRequest, immediately creates a fresh one, and finally sends the captured one. If those lines look odd: the bulkRequest that has accumulated data is saved in a temporary variable, a new one is created to receive the user's subsequent requests, and then the batch held in the temporary variable is pushed to the ES server. The whole process runs this way on the timer, even when there is no data. So, reviewing what open() did: it initialized every component needed to connect to the ES server, and then simply waits for invoke() to produce data into the bulkRequest; whenever the timer fires, the batch is pushed to ES.
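The swap-and-flush idea deserves a sketch of its own. This is my own minimal version using a ScheduledExecutorService; the names are invented, and the real BulkProcessor layers locking, size/count triggers, and retry/backoff on top of the same core:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ToyBatcher {

    private final Object lock = new Object();
    private List<String> batch = new ArrayList<>();
    private final ScheduledExecutorService timer =
        Executors.newSingleThreadScheduledExecutor();

    public ToyBatcher(long flushIntervalMillis) {
        // The timer fires whether or not data arrived, just like the flush task.
        timer.scheduleWithFixedDelay(this::flush,
            flushIntervalMillis, flushIntervalMillis, TimeUnit.MILLISECONDS);
    }

    public void add(String request) {
        synchronized (lock) {
            batch.add(request); // the invoke() side: requests accumulate here
        }
    }

    private void flush() {
        List<String> toSend;
        synchronized (lock) {
            if (batch.isEmpty()) {
                return;
            }
            toSend = batch;            // capture the filled batch...
            batch = new ArrayList<>(); // ...and swap in a fresh one
        }
        // Outside the lock: push the captured batch (stand-in for client.bulkAsync).
        System.out.println("flushing " + toSend.size() + " requests");
    }
}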
Next, how is invoke() called? Clearly it too is driven by processInput(); here is the sink side's processInput():
protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
    InputStatus status = inputProcessor.processInput();
    if (status == InputStatus.MORE_AVAILABLE && recordWriter.isAvailable()) {
        return;
    }
    if (status == InputStatus.END_OF_INPUT) {
        controller.allActionsCompleted();
        return;
    }
    CompletableFuture<?> jointFuture = getInputOutputJointFuture(status);
    MailboxDefaultAction.Suspension suspendedDefaultAction = controller.suspendDefaultAction();
    jointFuture.thenRun(suspendedDefaultAction::resume);
}
Following the call chain down, it eventually reaches the operator's processElement() method, which finally calls the ES sink's invoke():
public void processElement(StreamRecord<IN> element) throws Exception {
    sinkContext.element = element;
    userFunction.invoke(element.getValue(), sinkContext);
}
invoke() itself is simple: it calls the ElasticsearchSinkFunction that the user's main program handed to the builder. Calling that function just sends requests to the RequestIndexer, which ultimately routes them into the bulkRequest, and the timer then pushes them to the ES server when it fires.
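Putting the pieces together, a custom sink follows the same skeleton. Here is a minimal hand-rolled version of the pattern, hedged: this is not Flink's ES sink, the buffering is deliberately simplistic, and the class name is invented:

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// Toy sink: open() builds the client-side state, invoke() buffers one
// record at a time, close() flushes whatever is left.
public class BufferingPrintSink extends RichSinkFunction<String> {

    private transient List<String> buffer;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Real sinks create their client / bulk processor here.
        buffer = new ArrayList<>();
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        buffer.add(value);
        if (buffer.size() >= 100) { // size-triggered flush instead of a timer
            flush();
        }
    }

    @Override
    public void close() throws Exception {
        flush(); // don't lose the tail of the stream
    }

    private void flush() {
        System.out.println("flushing " + buffer.size() + " records");
        buffer.clear();
    }
}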
To recap the ES sink execution flow: beforeInvoke() → operator.open() → ElasticsearchSinkBase.open() (creates the client, bulkProcessor, requestIndexer, and the flush timer) → processInput() → processElement() → ElasticsearchSinkBase.invoke() → the user's ElasticsearchSinkFunction → requestIndexer.add() → bulkProcessor/bulkRequest → timer fires → push to the ES server.
To conclude: the core of both source and sink is two methods each, open() and run() for the source, open() and invoke() for the sink, and the two open()s clearly serve different purposes. The Kafka source's open() gathers Kafka metadata for checkpointing and has nothing to do with fetching data, whereas the sink's open() initializes all the components that talk to the external server and then waits for invoke() to feed data into those components, which push to the server on a timer.