当前位置:   article > 正文

Kafka系列二——消息发送send方法源码分析_kafka send

kafka send

一、send使用说明

1.1 客户端代码

public boolean producer(String key, String value) {
	//在客户端调用send方法时,需要先构造好ProducerRecord对象
	ProducerRecord kafkaMessage = new ProducerRecord(topic, key, value);
	
    try {
   	    //异步方式返回的是一个future对象,在这对象上调用get方法,将被阻塞直到返回结果,从而实现同步
        Future<RecordMetadata> metadataFuture = producer.send(kafkaMessage);
        RecordMetadata recordMetadata = (RecordMetadata)metadataFuture.get();
        logger.info("Produce ok:" + recordMetadata.offset());
        return true;
    } catch (Exception var6) {
        logger.error("Error occurred from topic:{},error:{}", topic, var6);
        return false;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

1.2 ProducerRecord

public class ProducerRecord<K, V> {

    private final String topic;
    private final Integer partition;
    private final Headers headers;
    private final K key;
    private final V value;
    private final Long timestamp;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

如果在发送时指定了partition,则消息将被保存到指定的tp分区队列,如果没有指定分区,将对key散列后来计算分区,相同key的消息将被写到同一个分区队列中,如果key是null,且使用默认的分区器,则分区器将用轮询的方法(Round Robin)将序列化后的消息均衡分布到不同的队列中,sender线程从Accumulator中取出批量数据组成一个batch发送

二、发送过程

在这里插入图片描述

在producer端,存在2个线程,一个是producer主线程,用户端调用send消息时,是在主线程执行的,数据被缓存到RecordAccumulator中,send方法即刻返回,也就是说此时并不能确定消息是否真正的发送到broker。另外一个是sender IO线程,其不断轮询RecordAccumulator,满足一定条件后,就进行真正的网络IO发送,使用的是异步非阻塞的NIO。主线程的send方法提供了一个用于回调的参数,当sender线程发送完后,回调函数将被调用,可以用来处理成功,失败或异常的逻辑
sender线程会发送多种类型的请求,以下是kafka集群元数据的请求过程
在这里插入图片描述

2.1 send

  @Override
    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        //执行用户自定义拦截器插件
        ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
        return doSend(interceptedRecord, callback);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

2.2 doSend关键代码

//计算推送到哪个partition
int partition = partition(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);

setReadOnly(record.headers());
Header[] headers = record.headers().toArray();
//校验消息内容不能超过限制值
int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
       compressionType, serializedKey, serializedValue, headers);
ensureValidRecordSize(serializedSize);
long timestamp = record.timestamp() == null ? nowMs : record.timestamp();
if (log.isTraceEnabled()) {
   log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
}
// 生产者回调将确保调用'回调'和拦截回调
Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);

if (transactionManager != null && transactionManager.isTransactional()) {
   transactionManager.failIfNotReadyForSend();
}
//在record被加入到accumulator时,会根据record所在的tp找到RecordBatch队列,如果不存在,就新建一个队列,在队列中取出最后一个RecordBatch,如果这个batch还有空间,就把record新追加到缓存后面,这样1个batch可能会有多个record,如果batch空间不够,就新创建一个batch,重新分配一个Max(16k, recordsize)的buffer,如果这个record超过16k,则这个batch中只会保存这1个record
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
       serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);

if (result.abortForNewBatch) {//默认false
   int prevPartition = partition;
   partitioner.onNewBatch(record.topic(), cluster, prevPartition);
   partition = partition(record, serializedKey, serializedValue, cluster);
   tp = new TopicPartition(record.topic(), partition);
   if (log.isTraceEnabled()) {
       log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
   }
   // producer callback will make sure to call both 'callback' and interceptor callback
   interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);

   result = accumulator.append(tp, timestamp, serializedKey,
       serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);
}
//发送事务处理
if (transactionManager != null && transactionManager.isTransactional())
   transactionManager.maybeAddPartitionToTransaction(tp);

if (result.batchIsFull || result.newBatchCreated) {
   //RecordBatch满了或者创建新的RecordBatch,触发sender线程。sender线程在KafkaProducer实例化时就已启动并阻塞在poll方法,wakeup()它的作用就是将 Sender 线程从poll方法的阻塞中唤醒,之后从accumulator取出RecordBatch并发送到broker
   log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
   this.sender.wakeup();
}
return result.future;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
2.2.1 RecordAccumulator原理

在这里插入图片描述
分区选择器计算出分区之后得到TopicPartition,在消息record(key、value)被加入到accumulator时,会根据TopicPartition找到RecordBatch队列,如果不存在,就新建一个队列,在队列中取出最后一个RecordBatch,如果这个batch还有空间,就把record新追加到缓存后面,这样1个batch可能会有多个record,如果batch空间不够,就新创建一个batch,重新分配一个Max(16k, recordsize)的buffer,如果这个record超过16k,则这个batch中只会保存这1个record

2.3 Sender关键代码

先看下kafka的网络设计,kafka的网络实际上是封装了java.nio的Selector、SocketChanel。
在这里插入图片描述

public class Sender implements Runnable {

    private final Logger log;

    //kafka 网络通信客户端,主要封装与 broker 的网络通信。
    private final KafkaClient client;

    //消息记录累积器,消息追加的入口(RecordAccumulator 的 append 方法)。
    private final RecordAccumulator accumulator;

    //元数据管理器,即 topic 的路由分区信息。
    private final ProducerMetadata metadata;

    //是否需要保证消息的顺序性。
    private final boolean guaranteeMessageOrder;

    //调用 send 方法发送的最大请求大小,包括 key、消息体序列化后的消息总大小不能超过该值。通过参数 max.request.size 来设置。
    private final int maxRequestSize;

	//用来定义消息“已提交”的条件(标准),就是 Broker 端向客户端承偌已提交的条件,可选值如下0、-1、1.
    private final short acks;

    //重试次数。
    private final int retries;

    /* the clock instance used for getting the time */
    private final Time time;

    //该线程状态,为 true 表示运行中。
    private volatile boolean running;

    //是否强制关闭,此时会忽略正在发送中的消息。
    private volatile boolean forceClose;

    //消息发送相关的统计指标收集器。
    private final SenderMetrics sensors;

    //请求的超时时间。
    private final int requestTimeoutMs;

    //请求失败之在重试之前等待的时间。
    private final long retryBackoffMs;

    //API版本信息。
    private final ApiVersions apiVersions;

 	//事务处理器。
    private final TransactionManager transactionManager;

    // 正在执行发送相关的消息批次。
    private final Map<TopicPartition, List<ProducerBatch>> inFlightBatches;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
2.3.1 wakeup()

Sender中的wakeup()方法最终调用WindowsSelectorImpl(基于windows操作系统实现的多路复用选择器)的wakeup(),它的作用就是将 Sender 线程从poll方法的阻塞中唤醒。poll方法的作用是轮询注册在多路复用器上的 Channel,它会一直阻塞在这个方法上,除非满足下面条件中的一个:

  • at least one channel is selected;
  • this selector’s {@link #wakeup wakeup} method is invoked;
  • the current thread is interrupted;
  • the given timeout period expires.

否则 poll 将会一直轮询,阻塞在这个地方,直到条件满足。

因此,sender.wakeup() 方法的作用就是:当有新的 RecordBatch 创建后,旧的 RecordBatch 就可以发送了(或者此时有 Metadata 请求需要发送),如果线程阻塞在 select() 方法中,就将其唤醒,Sender 重新开始运行 run() 方法,在这个方法中,旧的 RecordBatch (或相应的 Metadata 请求)将会被选中,进而可以及时将这些请求发送出去。

2.3.2 run()
public void run() {
    log.debug("Starting Kafka producer I/O thread.");
    while (running) {   
        try {
            runOnce();    // @1
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }
    log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");
    while (!forceClose && (this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0)) {    // @2
        try {
            runOnce();
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }
    if (forceClose) {     // @3                                                                                                                                
        log.debug("Aborting incomplete batches due to forced shutdown");
        this.accumulator.abortIncompleteBatches();
    }
    try {
        this.client.close();     // @4                                                                                                                          
    } catch (Exception e) {
        log.error("Failed to close network client", e);
    }
    log.debug("Shutdown of Kafka producer I/O thread has completed.");
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

代码@1:Sender 线程在运行状态下主要的业务处理方法,将消息缓存区中的消息向 broker 发送。
代码@2:如果主动关闭 Sender 线程,如果不是强制关闭,则如果缓存区还有消息待发送,再次调用 runOnce 方法将剩余的消息发送完毕后再退出。
代码@3:如果强制关闭 Sender 线程,则拒绝未完成提交的消息。
代码@4:关闭 Kafka Client 即网络通信对象。

2.3.3 runOnce()
void runOnce() {
        if (transactionManager != null) {
            try {
                transactionManager.maybeResolveSequences();

                // do not continue sending if the transaction manager is in a failed state
                if (transactionManager.hasFatalError()) {
                    RuntimeException lastError = transactionManager.lastError();
                    if (lastError != null)
                        maybeAbortBatches(lastError);
                    client.poll(retryBackoffMs, time.milliseconds());
                    return;
                }

                // Check whether we need a new producerId. If so, we will enqueue an InitProducerId
                // request which will be sent below
                transactionManager.bumpIdempotentEpochAndResetIdIfNeeded();

                if (maybeSendAndPollTransactionalRequest()) {
                    return;
                }
            } catch (AuthenticationException e) {
                // This is already logged as error, but propagated here to perform any clean ups.
                log.trace("Authentication exception while processing transactional request", e);
                transactionManager.authenticationFailed(e);
            }
        }

        long currentTimeMs = time.milliseconds();
        // 调用 sendProducerData 方法,将消息封装成List<ProducerBatch>,再组装成ClientRequest。然后最终会通过 NetworkClient#send 方法,将该批数据(InFlightRequest)设置到 NetworkClient 的待发送数据中,此时并没有触发真正的网络调用。
        long pollTimeout = sendProducerData(currentTimeMs);
        //触发真正的网络通讯,该方法中会通过收到调用 NIO 中的 Selector#select() 方法,对通道的读写就绪事件进行处理,当写事件就绪后,就会将通道中的消息发送到远端的 broker,并且接收响应进行处理
        client.poll(pollTimeout, currentTimeMs);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

2.3 poll()关键代码

poll方法调用Selector的poll方法

   /* check ready keys */
        long startSelect = time.nanoseconds();
        //调用nio.selector,获取准备好的socket数量,并将socket放入this.nioSelector.selectedKeys()
        int numReadyKeys = select(timeout);
        long endSelect = time.nanoseconds();
        this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());

        if (numReadyKeys > 0 || !immediatelyConnectedKeys.isEmpty() || dataInBuffers) {
            Set<SelectionKey> readyKeys = this.nioSelector.selectedKeys();

            // Poll from channels that have buffered data (but nothing more from the underlying socket)
            if (dataInBuffers) {
                keysWithBufferedRead.removeAll(readyKeys); //so no channel gets polled twice
                Set<SelectionKey> toPoll = keysWithBufferedRead;
                keysWithBufferedRead = new HashSet<>(); //poll() calls will repopulate if needed
                pollSelectionKeys(toPoll, false, endSelect);
            }

            // 遍历readyKeys,处理注册的感兴趣事件
            pollSelectionKeys(readyKeys, false, endSelect);
            // Clear all selected keys so that they are included in the ready count for the next select
            readyKeys.clear();

            pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
            immediatelyConnectedKeys.clear();
        } else {
            madeReadProgressLastPoll = true; //no work is also "progress"
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

三、总结

消息最终的的发送是在sender线程的run()方法开始的,如图是执行过程:
在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/天景科技苑/article/detail/839420
推荐阅读
相关标签
  

闽ICP备14008679号