当前位置:   article > 正文

max.in.flight.requests.per.connection保证顺序的实现

max.in.flight.requests.per.connection

max.in.flight.requests.per.connection=1 可以保证的某个主题分区发送的绝对顺序,不会因为报错重试导致批次乱序

Sender 发送数据 逻辑模拟

while (true) {
    if (!ready(cluster)) { 
    	return;
    }
	dataList = accumulator.drain(cluster, node) // 发送之后再判断 mute 由于没有返回, 所以阻止取某个tp的下一批数据,否则取到了tp的下一批数据 也发出去了, 但是上一批数据报错了, 再重试就乱序了,  取的数据在max.request.size范围内
	for data in dataList:
	  mutePartition(data.tp)

	nioSend() // 异步发送,
}


nioResponse() { // 异步接收
	if (ex) {
	   if (canRtry()) {
		   accumulator.requeue(). // 加到first保证下次还是取到的这个请求
	   }			
	}
	unmutePartition
}


public xxx drain() {
  for tp in node:  // 遍历某个节点上的所有主题分区
    if mute(tp):
       continue
	queue = getDeque(tp) // tp 就是 主题分区
	batch =-queue.peekFirst() // 注意是peek
	boolean backoff = first.attempts() > 0 && first.waitedTimeMs(now) < retryBackoffMs; // 如果异常了重试,并且超过重试时间间隔就可以获取到当前批次数据, 否则获取不到
	if (backoff) return
    
    batch = queue.pollFirst()
    list.add(batch)
  
  return list;
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

org.apache.kafka.clients.producer.internals.Sender#sendProducerData
在这里插入图片描述
drain 获取到了一批数据
在这里插入图片描述

org.apache.kafka.clients.producer.internals.Sender#completeBatch(org.apache.kafka.clients.producer.internals.ProducerBatch, org.apache.kafka.common.requests.ProduceResponse.PartitionResponse, long, long)
在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/839399
推荐阅读
相关标签
  

闽ICP备14008679号