赞
踩
max.in.flight.requests.per.connection=1 可以保证的某个主题分区发送的绝对顺序,不会因为报错重试导致批次乱序
Sender 发送数据 逻辑模拟 while (true) { if (!ready(cluster)) { return; } dataList = accumulator.drain(cluster, node) // 发送之后再判断 mute 由于没有返回, 所以阻止取某个tp的下一批数据,否则取到了tp的下一批数据 也发出去了, 但是上一批数据报错了, 再重试就乱序了, 取的数据在max.request.size范围内 for data in dataList: mutePartition(data.tp) nioSend() // 异步发送, } nioResponse() { // 异步接收 if (ex) { if (canRtry()) { accumulator.requeue(). // 加到first保证下次还是取到的这个请求 } } unmutePartition } public xxx drain() { for tp in node: // 遍历某个节点上的所有主题分区 if mute(tp): continue queue = getDeque(tp) // tp 就是 主题分区 batch =-queue.peekFirst() // 注意是peek boolean backoff = first.attempts() > 0 && first.waitedTimeMs(now) < retryBackoffMs; // 如果异常了重试,并且超过重试时间间隔就可以获取到当前批次数据, 否则获取不到 if (backoff) return batch = queue.pollFirst() list.add(batch) return list; }
org.apache.kafka.clients.producer.internals.Sender#sendProducerData
drain 获取到了一批数据
org.apache.kafka.clients.producer.internals.Sender#completeBatch(org.apache.kafka.clients.producer.internals.ProducerBatch, org.apache.kafka.common.requests.ProduceResponse.PartitionResponse, long, long)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。