  1. 如果你了解flink架构中taskmanager是如何进行数据交换的,那么这个问题也就不难理解,一般一个taskmanager会有两个网关(输入和输出),每一个taskmanager输入网关和上游的taskmanager的输出网关是通过netty进行交互的,有了一个buffer数据上游就可以下发(当然下发策略好多种,比如赞批下发)
  2. source节点接收数据的思想和其他operator节点交换数据是类似的,是一个append模式,简单来说就是数据是一条一条追加的,不是向批处理模式那样一次性处理完交给下游。
  3. 最后想想kafka消息系统的模式,明显一个队列就是append模式啊,从队尾append的数据,从队头消费数据,所以对于kafka来说作为source和sink是非常合理的source节点可以一条一条的消费数据,处理完了可以一条一条的append数据。
  4. 再来看看截图有哪些source(kafka、RabbitMQ、NIFI、Twitter streaming api、pubsub、activeMq、Netty),首先看到kafka、RabbitMq、activeMq三个消息队列就知道,队列flink是几乎可以支持的,所以其他的消息队列系统他肯定也是支持的,只是没有做出来。其次看到了netty,netty是一个网络通信的框架,目的是为了高性能传输数据,并不是一个数据源,应该是作为某个数据源与flink通信媒介,这里暂时不讨论这个(因为我还没分析netty source)。最后看看其他的,都是应用于流数据的自然能和流处理的flink集成
  5. 再来看看截图有哪些sink(kafka、cassandra、kinesis、es、hdfs、rabbitmq、nifi、pubsub、jdbc、activemq、flume、akka、redis、netty),看到了吗,sink支持的数据源似乎很广,为啥?因为flink是流处理,处理的数据自然是动态append进来的,可是我处理完了,我可以静态存入一个数据源,没必要是一个一个的追加到某个数据源。


知道了上述基本知识之后,咱们就来分析这些source和sink(当然不可能全部分析,这样要累死,我就分析一个kafka source和es sink,只要懂了一个source或一个sink,其他的也可以快速弄懂原理),好了,废话不多说,上干货。



  1. public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T> implements
  2. CheckpointListener,
  3. ResultTypeQueryable<T>,
  4. CheckpointedFunction{}
  5. public abstract class RichParallelSourceFunction<OUT> extends AbstractRichFunction
  6. implements ParallelSourceFunction<OUT> {
  7. private static final long serialVersionUID = 1L;
  8. }
  9. public interface ParallelSourceFunction<OUT> extends SourceFunction<OUT> {
  10. }

从代码中可以看到,继承了RichParallelSourceFunction,RichParallelSourceFunction继承了AbstractRichFunction,然后还实现了ParallelSourceFunction,可以看到ParallelSourceFunction是继承的SourceFunction,这么一来,那FlinkKafkaConsumerBase符合刚才我的说法,确实是继承AbstractRichFunction,和实现了SourceFunction。那其实AbstractRichFunction最重要的是获取运行时上下文,方便source节点获取运行时各种信息。SourceFunction则是有一个很重要的接口run,所以大胆猜测一下,这个run就是用来获取数据源的数据的。AbstractRichFunction还有一个很重要的特征是有个open接口,这个接口就是在task启动之前需要执行的。总结就是:AbstractRichFunction用来获取运行时上下文信息,以及开放一个生命后期方法open,用来启动每个task,SourceFunction则是用来获取数据的逻辑。那FlinkKafkaConsumerBase还实现了CheckpointListener、CheckpointedFunction以及ResultTypeQueryable,很明显从字面上来看,前两个接口都是和检查点相关的,咱们暂时不分析,ResultTypeQueryable也不重要。好,那咱们就开始分析kafka source了。


  1. public class Kafka011Example {
  2. public static void main(String[] args) throws Exception {
  3. // parse input arguments
  4. final ParameterTool parameterTool = ParameterTool.fromArgs(args);
  5. StreamExecutionEnvironment env = KafkaExampleUtil.prepareExecutionEnv(parameterTool);
  6. DataStream<KafkaEvent> input = env
  7. .addSource(
  8. new FlinkKafkaConsumer011<>(
  9. parameterTool.getRequired("input-topic"),
  10. new KafkaEventSchema(),
  11. parameterTool.getProperties())
  12. .assignTimestampsAndWatermarks(new CustomWatermarkExtractor()))
  13. .keyBy("word")
  14. .map(new RollingAdditionMapper());
  15. input.addSink(
  16. new FlinkKafkaProducer011<>(
  17. parameterTool.getRequired("output-topic"),
  18. new KafkaEventSchema(),
  19. parameterTool.getProperties()));
  20. env.execute("Kafka 0.11 Example");
  21. }
  22. }

我们可以看到,FlinkKafkaConsumer011被env对象add了,其实这里的逻辑我大概说一下,就不细讲了(env把FlinkKafkaConsumer011 add之后,用operator包装起来,组成transformation,最后形成jobgraph的节点),FlinkKafkaConsumer011本质继承的FlinkKafkaConsumerBase。那他是怎么工作起来了呢,首先节点被打包成jobgraph送到集群中再次改造成executionGraph(多了并行度),然后这时候的jobmanager就把source节点当成一个subtask发出去(假定并行度为1),把task发到taskmanager之后,在taskmanager中就形成了一个task类

  1. public class Task implements Runnable, TaskSlotPayload, TaskActions, PartitionProducerStateProvider, CheckpointListener, BackPressureSampleableTask {
  2. /** The class logger. */
  3. private static final Logger LOG = LoggerFactory.getLogger(Task.class);
  4. /** The thread group that contains all task threads. */
  5. private static final ThreadGroup TASK_THREADS_GROUP = new ThreadGroup("Flink Task Threads");
  6. /** For atomic state updates. */
  7. private static final AtomicReferenceFieldUpdater<Task, ExecutionState> STATE_UPDATER =
  8. AtomicReferenceFieldUpdater.newUpdater(Task.class, ExecutionState.class, "executionState");
  9. // ------------------------------------------------------------------------
  10. // Constant fields that are part of the initial Task construction
  11. // ------------------------------------------------------------------------
  12. /** The job that the task belongs to. */
  13. private final JobID jobId;
  14. /** The vertex in the JobGraph whose code the task executes. */
  15. private final JobVertexID vertexId;
  16. /** The execution attempt of the parallel subtask. */
  17. private final ExecutionAttemptID executionId;
  18. /** ID which identifies the slot in which the task is supposed to run. */
  19. private final AllocationID allocationId;
  20. /** TaskInfo object for this task. */
  21. private final TaskInfo taskInfo;
  22. /** The name of the task, including subtask indexes. */
  23. private final String taskNameWithSubtask;
  24. /** The job-wide configuration object. */
  25. private final Configuration jobConfiguration;
  26. /** The task-specific configuration. */
  27. private final Configuration taskConfiguration;
  28. /** The jar files used by this task. */
  29. private final Collection<PermanentBlobKey> requiredJarFiles;
  30. /** The classpaths used by this task. */
  31. private final Collection<URL> requiredClasspaths;
  32. /** The name of the class that holds the invokable code. */
  33. private final String nameOfInvokableClass;
  34. /** Access to task manager configuration and host names. */
  35. private final TaskManagerRuntimeInfo taskManagerConfig;
  36. /** The memory manager to be used by this task. */
  37. private final MemoryManager memoryManager;
  38. /** The I/O manager to be used by this task. */
  39. private final IOManager ioManager;
  40. /** The BroadcastVariableManager to be used by this task. */
  41. private final BroadcastVariableManager broadcastVariableManager;
  42. private final TaskEventDispatcher taskEventDispatcher;
  43. /** The manager for state of operators running in this task/slot. */
  44. private final TaskStateManager taskStateManager;
  45. /** Serialized version of the job specific execution configuration (see {@link ExecutionConfig}). */
  46. private final SerializedValue<ExecutionConfig> serializedExecutionConfig;
  47. private final ResultPartitionWriter[] consumableNotifyingPartitionWriters;
  48. private final InputGate[] inputGates;
  49. /** Connection to the task manager. */
  50. private final TaskManagerActions taskManagerActions;
  51. /** Input split provider for the task. */
  52. private final InputSplitProvider inputSplitProvider;
  53. /** Checkpoint notifier used to communicate with the CheckpointCoordinator. */
  54. private final CheckpointResponder checkpointResponder;
  55. /** GlobalAggregateManager used to update aggregates on the JobMaster. */
  56. private final GlobalAggregateManager aggregateManager;
  57. /** The BLOB cache, from which the task can request BLOB files. */
  58. private final BlobCacheService blobService;
  59. /** The library cache, from which the task can request its class loader. */
  60. private final LibraryCacheManager libraryCache;
  61. /** The cache for user-defined files that the invokable requires. */
  62. private final FileCache fileCache;
  63. /** The service for kvState registration of this task. */
  64. private final KvStateService kvStateService;
  65. /** The registry of this task which enables live reporting of accumulators. */
  66. private final AccumulatorRegistry accumulatorRegistry;
  67. /** The thread that executes the task. */
  68. private final Thread executingThread;
  69. /** Parent group for all metrics of this task. */
  70. private final TaskMetricGroup metrics;
  71. /** Partition producer state checker to request partition states from. */
  72. private final PartitionProducerStateChecker partitionProducerStateChecker;
  73. /** Executor to run future callbacks. */
  74. private final Executor executor;
  75. /** Future that is completed once {@link #run()} exits. */
  76. private final CompletableFuture<ExecutionState> terminationFuture = new CompletableFuture<>();
  77. // ------------------------------------------------------------------------
  78. // Fields that control the task execution. All these fields are volatile
  79. // (which means that they introduce memory barriers), to establish
  80. // proper happens-before semantics on parallel modification
  81. // ------------------------------------------------------------------------
  82. /** atomic flag that makes sure the invokable is canceled exactly once upon error. */
  83. private final AtomicBoolean invokableHasBeenCanceled;
  84. /** The invokable of this task, if initialized. All accesses must copy the reference and
  85. * check for null, as this field is cleared as part of the disposal logic. */
  86. @Nullable
  87. private volatile AbstractInvokable invokable;
  88. /** The current execution state of the task. */
  89. private volatile ExecutionState executionState = ExecutionState.CREATED;
  90. /** The observed exception, in case the task execution failed. */
  91. private volatile Throwable failureCause;
  92. /** Initialized from the Flink configuration. May also be set at the ExecutionConfig */
  93. private long taskCancellationInterval;
  94. /** Initialized from the Flink configuration. May also be set at the ExecutionConfig */
  95. private long taskCancellationTimeout;
  96. /** This class loader should be set as the context class loader for threads that may dynamically load user code. */
  97. private ClassLoader userCodeClassLoader;


从task run开始:

  1. public void run() {
  2. try {
  3. doRun();
  4. } finally {
  5. terminationFuture.complete(executionState);
  6. }
  7. }
  8. private void doRun() {
  9. ......
  10. invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);
  11. ......
  12. invokable.invoke();
  13. ......
  14. }


  1. public final void invoke() throws Exception {
  2. try {
  3. beforeInvoke();
  4. ......
  5. runMailboxLoop();
  6. ......
  7. afterInvoke();
  8. }
  9. finally {
  10. cleanUpInvoke();
  11. }
  12. }


  1. private void beforeInvoke() throws Exception {
  2. ......
  3. operatorChain = new OperatorChain<>(this, recordWriter);
  4. headOperator = operatorChain.getHeadOperator();
  5. // task specific initialization
  6. //所以具体的任务有具体的初始化
  7. init();
  8. ......
  9. actionExecutor.runThrowing(() -> {
  10. initializeStateAndOpen();
  11. });
  12. }
  13. private void initializeStateAndOpen() throws Exception {
  14. StreamOperator<?>[] allOperators = operatorChain.getAllOperators();
  15. for (StreamOperator<?> operator : allOperators) {
  16. if (null != operator) {
  17. //初始化operatorStateBackend、keyedStateBackend
  18. operator.initializeState();
  19. //
  20. operator.open();
  21. }
  22. }
  23. }
  24. public void open() throws Exception {
  25. super.open();
  26. FunctionUtils.openFunction(userFunction, new Configuration());
  27. }
  28. public static void openFunction(Function function, Configuration parameters) throws Exception{
  29. if (function instanceof RichFunction) {
  30. RichFunction richFunction = (RichFunction) function;
  31. richFunction.open(parameters);
  32. }
  33. }


  1. public void open(Configuration configuration) throws Exception {
  2. ......
  3. this.partitionDiscoverer = createPartitionDiscoverer(
  4. topicsDescriptor,
  5. getRuntimeContext().getIndexOfThisSubtask(),
  6. getRuntimeContext().getNumberOfParallelSubtasks());
  7. this.partitionDiscoverer.open();
  8. ......
  9. if (restoredState != null) {
  10. for (KafkaTopicPartition partition : allPartitions) {
  11. if (!restoredState.containsKey(partition)) {
  12. restoredState.put(partition, KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
  13. }
  14. }
  15. ......
  16. }


  1. protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
  2. ......
  3. sourceThread.setTaskDescription(getName());
  4. sourceThread.start();
  5. sourceThread.getCompletionFuture().whenComplete((Void ignore, Throwable sourceThreadThrowable) -> {
  6. if (isCanceled() && ExceptionUtils.findThrowable(sourceThreadThrowable, InterruptedException.class).isPresent()) {
  7. mailboxProcessor.reportThrowable(new CancelTaskException(sourceThreadThrowable));
  8. } else if (!isFinished && sourceThreadThrowable != null) {
  9. mailboxProcessor.reportThrowable(sourceThreadThrowable);
  10. } else {
  11. mailboxProcessor.allActionsCompleted();
  12. }
  13. });
  14. }


  1. public void run() {
  2. try {
  3. headOperator.run(getCheckpointLock(), getStreamStatusMaintainer(), operatorChain);
  4. completionFuture.complete(null);
  5. } catch (Throwable t) {
  6. // Note, t can be also an InterruptedException
  7. completionFuture.completeExceptionally(t);
  8. }
  9. }


  1. public void run(final Object lockingObject,
  2. final StreamStatusMaintainer streamStatusMaintainer,
  3. final Output<StreamRecord<OUT>> collector,
  4. final OperatorChain<?, ?> operatorChain) throws Exception {
  5. ......
  6. try {
  7. userFunction.run(ctx);
  8. ......
  9. } finally {
  10. if (latencyEmitter != null) {
  11. latencyEmitter.close();
  12. }
  13. }
  14. }


  1. public void run(SourceContext<T> sourceContext) throws Exception {
  2. ......
  3. this.kafkaFetcher = createFetcher(
  4. sourceContext,
  5. subscribedPartitionsToStartOffsets,
  6. periodicWatermarkAssigner,
  7. punctuatedWatermarkAssigner,
  8. (StreamingRuntimeContext) getRuntimeContext(),
  9. offsetCommitMode,
  10. getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),
  11. useMetrics);
  12. ......
  13. if (discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED) {
  14. kafkaFetcher.runFetchLoop();
  15. } else {
  16. runWithPartitionDiscovery();
  17. }
  18. }


  1. public void runFetchLoop() throws Exception {
  2. try {
  3. final Handover handover = this.handover;
  4. consumerThread.start();
  5. while (running) {
  6. final ConsumerRecords<byte[], byte[]> records = handover.pollNext();
  7. for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitionStates()) {
  8. List<ConsumerRecord<byte[], byte[]>> partitionRecords =
  9. records.records(partition.getKafkaPartitionHandle());
  10. for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
  11. final T value = deserializer.deserialize(record);
  12. if (deserializer.isEndOfStream(value)) {
  13. // end of stream signaled
  14. running = false;
  15. break;
  16. }
  17. emitRecord(value, partition, record.offset(), record);
  18. }
  19. }
  20. }
  21. }
  22. finally {
  23. consumerThread.shutdown();
  24. }
  25. try {
  26. consumerThread.join();
  27. }
  28. catch (InterruptedException e) {
  29. Thread.currentThread().interrupt();
  30. }
  31. }


  1. public void run() {
  2. ......
  3. final Handover handover = this.handover;
  4. ......
  5. try {
  6. this.consumer = getConsumer(kafkaProperties);
  7. }
  8. catch (Throwable t) {
  9. handover.reportError(t);
  10. return;
  11. }
  12. try {
  13. ......
  14. ConsumerRecords<byte[], byte[]> records = null;
  15. List<KafkaTopicPartitionState<TopicPartition>> newPartitions;
  16. // main fetch loop
  17. while (running) {
  18. ......
  19. if (records == null) {
  20. try {
  21. records = consumer.poll(pollTimeout);
  22. }
  23. catch (WakeupException we) {
  24. continue;
  25. }
  26. }
  27. try {
  28. handover.produce(records);
  29. records = null;
  30. }
  31. catch (Handover.WakeupException e) {
  32. // fall through the loop
  33. }
  34. }
  35. // end main fetch loop
  36. }
  37. ......
  38. }

其实可以看到,这里逻辑非常清晰,这里用consumer直接拉取数据,然后放到handover,而且这里consumer拉取数据不是一个数据,而是一批。至于这个consumer是啥,之前说过,他就是KafkaConsumer,是Kafka的client用来访问Kafka server的。那他是怎么来的呢,其实就是由咱们主函数提供的Properties创建出来的。那这样一说来,open的执行似乎并没有起到很关键的作用,其实刚才说到open有一个partitionDiscover会请求到kafka的所有kafkapartition,然后放到restoreState中,所以我们并没有用到这个restoreState,其实这个我们确实在获取数据的时候没有用到这个,但是他在checkpoint中用到了呀,你可以看看这个属性的注释就会知道,这个变量是先initialState中先初始化,然后在open中记录kafka的分区,offset等记录。所以这么看来open和run是没关系的。


  1. FlinkKafkaConsumerBase在用户自己编写的主程序创建之后,就交给env包装成operator,最后打包成jobgraph由client送到jobmanager手中
  2. jobmanager经过并行度、chain等优化,把jobgraph编程executiongraph
  3. executiongraph把子任务下发到taskmanager,子任务抽象为一个Task类
  4. task类会调用真正的执行类AbstractInvokable的invoke方法
  5. invoke方法分为三步:beforeInvoke、runLoop、afterInvoke
  6. beforeInvoke会调用FlinkKafkaConsumerBase的open方法,保存checkpoint需要的各种信息(比如需要保存kafka的分区、消费到那个offset的了)
  7. runLoop会启动一个线程sourceThread去真正处理source节点该干的事
  8. sourceThread会去调用operator的run方法,而operator就会去调用FlinkKafkaConsumerBase的run方法
  9. FlinkKafkaConsumerBase的run方法会去创建一个fetcher真正的来获取数据
  10. fetcher有两个重要的类来完成获取数据(一个是用来拉数据的consumerThread,一个是用来放数据的Handover)
  11. consumerThread直接利用kafkaConsumer(kafka client的类)获取服务器的数据
  12. 获取到之后就放进handover,然后fetcher就循环阻塞从handover拿数据,拿到之后先反序列化一波
  13. 最后就是把数据emitRecord
  14. emitRecord不是直接发送到下游,而是直接交给他的输出网关。



public abstract class ElasticsearchSinkBase<T, C extends AutoCloseable> extends RichSinkFunction<T> implements CheckpointedFunction


  1. public class Elasticsearch7SinkExample {
  2. public static void main(String[] args) throws Exception {
  3. final ParameterTool parameterTool = ParameterTool.fromArgs(args);
  4. if (parameterTool.getNumberOfParameters() < 2) {
  5. System.out.println("Missing parameters!\n" +
  6. "Usage: --numRecords <numRecords> --index <index>");
  7. return;
  8. }
  9. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  10. env.enableCheckpointing(5000);
  11. DataStream<Tuple2<String, String>> source = env.generateSequence(0, parameterTool.getInt("numRecords") - 1)
  12. .flatMap(new FlatMapFunction<Long, Tuple2<String, String>>() {
  13. @Override
  14. public void flatMap(Long value, Collector<Tuple2<String, String>> out) {
  15. final String key = String.valueOf(value);
  16. final String message = "message #" + value;
  17. out.collect(Tuple2.of(key, message + "update #1"));
  18. out.collect(Tuple2.of(key, message + "update #2"));
  19. }
  20. });
  21. List<HttpHost> httpHosts = new ArrayList<>();
  22. httpHosts.add(new HttpHost("", 9200, "http"));
  23. ElasticsearchSink.Builder<Tuple2<String, String>> esSinkBuilder = new ElasticsearchSink.Builder<>(
  24. httpHosts,
  25. (Tuple2<String, String> element, RuntimeContext ctx, RequestIndexer indexer) -> {
  26. indexer.add(createIndexRequest(element.f1, parameterTool));
  27. indexer.add(createUpdateRequest(element, parameterTool));
  28. });
  29. esSinkBuilder.setFailureHandler(
  30. new CustomFailureHandler(parameterTool.getRequired("index")));
  31. // this instructs the sink to emit after every element, otherwise they would be buffered
  32. esSinkBuilder.setBulkFlushMaxActions(1);
  33. source.addSink(esSinkBuilder.build());
  34. env.execute("Elasticsearch 7.x end to end sink test example");
  35. }
  36. private static class CustomFailureHandler implements ActionRequestFailureHandler {
  37. private static final long serialVersionUID = 942269087742453482L;
  38. private final String index;
  39. CustomFailureHandler(String index) {
  40. this.index = index;
  41. }
  42. @Override
  43. public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
  44. if (action instanceof IndexRequest) {
  45. Map<String, Object> json = new HashMap<>();
  46. json.put("data", ((IndexRequest) action).source());
  47. indexer.add(
  48. Requests.indexRequest()
  49. .index(index)
  50. .id(((IndexRequest) action).id())
  51. .source(json));
  52. } else {
  53. throw new IllegalStateException("unexpected");
  54. }
  55. }
  56. }
  57. private static IndexRequest createIndexRequest(String element, ParameterTool parameterTool) {
  58. Map<String, Object> json = new HashMap<>();
  59. json.put("data", element);
  60. String index;
  61. String type;
  62. if (element.startsWith("message #15")) {
  63. index = ":intentional invalid index:";
  64. type = ":intentional invalid type:";
  65. } else {
  66. index = parameterTool.getRequired("index");
  67. }
  68. return Requests.indexRequest()
  69. .index(index)
  70. .id(element)
  71. .source(json);
  72. }
  73. private static UpdateRequest createUpdateRequest(Tuple2<String, String> element, ParameterTool parameterTool) {
  74. Map<String, Object> json = new HashMap<>();
  75. json.put("data", element.f1);
  76. return new UpdateRequest(
  77. parameterTool.getRequired("index"),
  78. parameterTool.getRequired("type"),
  79. element.f0)
  80. .doc(json)
  81. .upsert(json);
  82. }
  83. }

可以看到这里并不是直接把ESsink创建出来扔进env的,而是先创建一个builder,然后在build出来再扔进去。为啥要这样呀?那这就是经典的builder设计模式了,你会发现,创建builder的时候传了一个httphost和一个esFunction,这个esfunction就是咱们把获取到的数据组装成es需要的格式,然后扔给es client,然后builder还会set一下其他属性。这些属性到时候都是要传给essink的,所以用builder模式可以达到灵活插拔,想给essink注入啥属性就注入啥属性。说完这个builder之后其实最重要的还是要关注invoke和open是如何调用的以及何时调用的。


  1. public void open(Configuration parameters) throws Exception {
  2. client = callBridge.createClient(userConfig);
  3. bulkProcessor = buildBulkProcessor(new BulkProcessorListener());
  4. requestIndexer = callBridge.createBulkProcessorIndexer(bulkProcessor, flushOnCheckpoint, numPendingRequests);
  5. failureRequestIndexer = new BufferingNoOpRequestIndexer();
  6. }

原来open方法就这么简单就创建client、bulkprocessor、requestIndexer,看看这几行代码就知道这三个都是由callBridge创建出来的,这个callbridge就是咱们在用户主程序中传入的参数创建的呀。首先说明这个client就是咱们的es client,类似于kafkaconsumer,是和es server交互的client,其次这个bulkprocessor是为了批处理准备的,flink是流处理为什么要批写入呢,原因时,你老是处理完一条数据就push一条数据到服务器上,这样大大滴浪费了时间呀,就像jdbc一条一条的写入mysql一样性能很低,所以为了提高性能就使用了bulkprocessor。最后这个requestIndexer就是用来接受用户的一条一条的请求的,在我们的用户主程序可以看到,用户最终是要执行indexer.add(request)方法的,这个indexer便是requestIndexer了。整体的结构是requestIndexer包含bulkProcessor,bulkProcessor包含client。先来看看RequestIndexer的代码:

  1. public void add(IndexRequest... indexRequests) {
  2. for (IndexRequest indexRequest : indexRequests) {
  3. if (flushOnCheckpoint) {
  4. numPendingRequestsRef.getAndIncrement();
  5. }
  6. this.bulkProcessor.add(indexRequest);
  7. }
  8. }

可以看到RequestIndexer只是一个媒介呀,当执行add的时候,其实是把request add 到bulkProcessor,所以我们来看看bulkProcessor

  1. private void internalAdd(DocWriteRequest<?> request) {
  2. Tuple<BulkRequest, Long> bulkRequestToExecute = null;
  3. this.lock.lock();
  4. try {
  5. this.ensureOpen();
  6. this.bulkRequest.add(request);
  7. bulkRequestToExecute = this.newBulkRequestIfNeeded();
  8. } finally {
  9. this.lock.unlock();
  10. }
  11. if (bulkRequestToExecute != null) {
  12. this.execute((BulkRequest)bulkRequestToExecute.v1(), (Long)bulkRequestToExecute.v2());
  13. }
  14. }

其实bulkProcessor的add方法最终是往bulkRequest add request,其实很好理解,咱们每一条请求都是一条数据,这些数据最终汇集到bulkRequest(顾名思义就是块请求,也就是批请求),而且add的时候还上锁了。那这个bulkRequest是怎么push到es的server的呢?要解决这个问题那就是要看BulkProcessor这个类了,先来看看他的构造函数

  1. BulkProcessor(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BackoffPolicy backoffPolicy, BulkProcessor.Listener listener, int concurrentRequests, int bulkActions, ByteSizeValue bulkSize, @Nullable TimeValue flushInterval, Scheduler scheduler, Runnable onClose, Supplier<BulkRequest> bulkRequestSupplier) {
  2. this.bulkActions = bulkActions;
  3. this.bulkSize = bulkSize.getBytes();
  4. this.bulkRequest = (BulkRequest)bulkRequestSupplier.get();
  5. this.bulkRequestSupplier = bulkRequestSupplier;
  6. this.bulkRequestHandler = new BulkRequestHandler(consumer, backoffPolicy, listener, scheduler, concurrentRequests);
  7. this.cancellableFlushTask = this.startFlushTask(flushInterval, scheduler);
  8. this.onClose = onClose;
  9. }

有没有看到关键的东西?他在构造函数就执行了startFlushTask,看这个名字就知道就是刷新任务呗,那还能刷新啥任务,不就是把数据push到es上嘛,但是我怎么确定我有数据呢?这其实就是设置了一个定时器(flushInterval)比如10秒,10秒内都可以把request add到bulkRequest中,一旦时间一到,就会执行client.bulkAsync方法把bulkrequest中的数据一次性全部push到es server。startFlushTask最终会调用如下代码:

  1. private void execute() {
  2. BulkRequest bulkRequest = this.bulkRequest;
  3. long executionId = this.executionIdGen.incrementAndGet();
  4. this.bulkRequest = (BulkRequest)this.bulkRequestSupplier.get();
  5. this.execute(bulkRequest, executionId);
  6. }

先把bulkRequest拿出来,然后立马new一个新的bulkRequest,最后把这个bulkRequest发出去,不知道大家理不理解这几句代码。其实就是先把有一定数据的bulkrequest用临时变量存起来,然后就new一个新的用于下次的接受用户的request,接着把刚才用临时变量保存的数据push到es server,真整过程就这样以定时器的方式运行着,即是没有数据也是这样运行着。所以回顾一下open干了啥?其实open就是初始化了要连接es server的所有组件,就等着invoke方法产生数据放到bulkRequest中,一旦到了定时器时间,就往es server push。


  1. protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
  2. InputStatus status = inputProcessor.processInput();
  3. if (status == InputStatus.MORE_AVAILABLE && recordWriter.isAvailable()) {
  4. return;
  5. }
  6. if (status == InputStatus.END_OF_INPUT) {
  7. controller.allActionsCompleted();
  8. return;
  9. }
  10. CompletableFuture<?> jointFuture = getInputOutputJointFuture(status);
  11. MailboxDefaultAction.Suspension suspendedDefaultAction = controller.suspendDefaultAction();
  12. jointFuture.thenRun(suspendedDefaultAction::resume);
  13. }


  1. public void processElement(StreamRecord<IN> element) throws Exception {
  2. sinkContext.element = element;
  3. userFunction.invoke(element.getValue(), sinkContext);
  4. }

其实invoke方法很简单,就是调用用户主程序传给builder的函数ElasticsearchSinkFunction,调用这个函数不就是给RequestIndexer发request,最终还是发到bulkRequest中,然后随着定时器触发就push到es server中。


  1. 用户主程序创建esSinkbuilder,重要的是创一个ElasticsearchSinkFunction进去,但是时候给esSink的invoke调用,
  2. 然后设置属性之后就build,扔给env
  3. 还是组装成jobgraph给jobmanager,最后形成executiongraph
  4. excutiongraph把sink任务交给taskmanager抽象为task,最后执行StreamTask的invoke方法
  5. 和kafka source一样,先执行beforeInvoke,调用open方法,初始化所有与es server通信的组件,client(发数据的核心类)、bulkprocessor(把数据攒成一批再调用client发出去)、requestIndexer(用户发出请求到bulkProcessor的bulkRequest的媒介),这时与es的通信已经开始了
  6. 调用runloop,执行processInput,最终调用operator的processElement,processElement调用esSink的invoke方法
  7. esSink的invoke调用用户主程序传进来的函数,在这个函数里把数据组装成请求,通过requestIndexer add到bulkrequest
  8. bulkrequest被定时发送到es server中。



