当前位置:   article > 正文

Kafka——Broker与客户端通信网络模型_broker框架

broker框架

初学一个技术的方法:

初学一个技术,怎么了解该技术的源码至关重要。

对我而言,最佳的阅读源码的方式,那就是:不求甚解,观其大略

你如果进到庐山里头,二话不说,蹲下头来,弯下腰,就对着某棵树某棵小草猛研究而不是说先把庐山的整体脉络研究清楚了,那么你的学习方法肯定效率巨低而且特别痛苦。

最重要的还是慢慢地打击你的积极性,说我的学习怎么那么不 happy 啊,怎么那么没劲那,因为你的学习方法错了,大体读明白,先拿来用,用着用着,很多道理你就明白了。

先从整体上把关源码,再去扣一些细节问题。

举个简单的例子:

如果你刚接触 HashMap,你刚有兴趣去看其源码,在看 HashMap 的时候,有一个知识:当链表长度达到 8 之后,就变为了红黑树,小于 6 就变成了链表,当然,还和当前的长度有关。

这个时候,如果你去深究红黑树、为什么是 8 不是别的,又去查 泊松分布,最终会慢慢的搞死自己。

所以,正确的做法,我们先把这一部分给略过去,知道这个概念即可,等后面我们把整个庐山看完之后,再回过头抠细节。

一、服务端网络架构

我们讲过了生产者整个的调用流程以及发送流程,今天我们讲一下服务端是如何与客户端连接的。

我们都知道,Kafka作为一个吞吐量极高的中间件,其通信过程自然而然也起到了关键性的作用,之前我们聊过,Kafka并未用Netty作为其通信的框架,而是自己自研的。

那么,这个自研的框架是怎么做的呢?和Netty相比又如何?

同样,还有一个前提问题,希望在看本篇博客的时候,能够思考一下:Kafka如何在高吞吐的状态下仍然保证单Partition的有序性?

废话不多说,我们直接开车!

首先看一下服务端的网络架构图:

我们现在这里解释下整体的流程:

  • Acceptor初始化的候会注册OP_ACCEPT事件,当有客户端进来时,会触发该事件并将该事件轮询的方式分发给Processor处理。

  • Processor 收到 Acceptor 分发的连接时,会注册 OP_READ 事件并与内部的 selector 绑定,当下次客户端发送信息时,直接触发 ProcessorOP_READ 事件进行处理。

  • Processor 将客户端的连接请求放入 RequestQueue(仅有一个) 里面,所有的 Processor 共用一个 RequestQueue

  • KafkaRequestHandlerRequestQueue 中取出请求,通过调用 KafkaApis 得到响应结果,将响应结果放入到 responseQueues ,这里需要注意一点:Processor 有几个 responseQueue

  • Processor 从对应的 responseQueue 中取出 response,将其通过 SockerChannel 发送给对应的客户端、

  • 这些就是 Kafka 服务端网络的整体架构

  • 下面我们详细的拆解每一部分的实现细节

二、服务端源代码剖析

kafka 服务端的启动类为 kafka.scala,主要启动 KafkaServer 服务端

KafkaServer 的启动代码如下:

  1. socketServer = new SocketServer(config, metrics, kafkaMetricsTime)
  2. socketServer.startup()

实际上真正的服务启动是 SocketServer

1、SocketServer

  1. class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time) extends Logging with KafkaMetricsGroup {
  2. private val endpoints = config.listeners // 开放的端口数
  3. private val numProcessorThreads = config.numNetworkThreads // 默认为 3个,即 processor
  4. private val maxQueuedRequests = config.queuedMaxRequests // request 队列中允许的最多请求数,默认是500
  5. private val totalProcessorThreads = numProcessorThreads * endpoints.size // 每个端口会对应 N 个 processor
  6. val requestChannel = new RequestChannel(totalProcessorThreads, maxQueuedRequests)
  7. private val processors = new Array[Processor](totalProcessorThreads)
  8. private[network] val acceptors = mutable.Map[EndPoint, Acceptor]()
  9. }
  10. // requestQueue:只有一个
  11. // responseQueues:每个 Processor 都对应一个
  12. class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMetricsGroup {
  13. private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize)
  14. private val responseQueues = new Array[BlockingQueue[RequestChannel.Response]](numProcessors)
  15. for(i <- 0 until numProcessors)
  16. responseQueues(i) = new LinkedBlockingQueue[RequestChannel.Response]()
  17. )

1.1 初始化

对于初始化来说,主要完成 Processoracceptor 的创建

  1. def startup() {
  2. this.synchronized {
  3. // 发送和接受的缓存区大小
  4. val sendBufferSize = config.socketSendBufferBytes
  5. val recvBufferSize = config.socketReceiveBufferBytes
  6. val brokerId = config.brokerId
  7. var processorBeginIndex = 0
  8. // endpoint:开放的端口数,默认一个 Broker 开放一个
  9. endpoints.values.foreach { endpoint =>
  10. val protocol = endpoint.protocolType
  11. val processorEndIndex = processorBeginIndex + numProcessorThreads
  12. // Processor:默认为三个
  13. for (i <- processorBeginIndex until processorEndIndex 默认为 3
  14. processors(i) = newProcessor(i, connectionQuotas, protocol)
  15. // Acceptor: 默认一个
  16. val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,
  17. processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)
  18. acceptors.put(endpoint, acceptor)
  19. // 等待线程的启动
  20. acceptor.awaitStartup()
  21. processorBeginIndex = processorEndIndex
  22. }
  23. }

1.2 Acceptor 处理

上面我们创建完了Acceptor 和 Processor,首先看一下 Acceptor 的处理

  • 首先向 nioSelector 注册接受 OP_ACCEPT 事件,监听是否有新的连接请求

  • 如果有新的连接请求接入,将该连接的 SocketChannel 交于 processors 进行处理

  • 由于 processor 存在多个,以轮询的方式去交付,保证 processor 的负载均衡

  1. def run() {
  2. // 注册OP_ACCEPT事件
  3. serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
  4. // 线程启动完成
  5. startupComplete()
  6. try {
  7. var currentProcessor = 0
  8. // 死循环
  9. while (isRunning) {
  10. try {
  11. // 查看有没有注册的连接进来
  12. val ready = nioSelector.select(500)
  13. if (ready > 0) {
  14. // 拿出所有的keys并遍历
  15. val keys = nioSelector.selectedKeys()
  16. val iter = keys.iterator()
  17. while (iter.hasNext && isRunning) {
  18. try {
  19. val key = iter.next
  20. // 用完即删
  21. iter.remove()
  22. // 如果当前的是接受事件,则进行接受事件相应的处理
  23. if (key.isAcceptable)
  24. accept(key, processors(currentProcessor))
  25. // 轮询的方式选择下一个Processor线程
  26. currentProcessor = (currentProcessor + 1) % processors.length
  27. }
  28. }
  29. }
  30. }
  31. }
  32. }
  33. }

交于 processors 处理的逻辑:

  • 拿到当前 ServerSocketChannel 上的 socketChannel 并进行一些对应的配置

  • socketChannel 放入 newConnections 中并唤醒我们的 processor

  1. /*
  2. * 接受一个新连接
  3. */
  4. def accept(key: SelectionKey, processor: Processor) {
  5. // accept 事件发生时,获取注册到 selector 上的 ServerSocketChannel
  6. val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
  7. val socketChannel = serverSocketChannel.accept()
  8. try {
  9. // socketChannel的各种配置
  10. connectionQuotas.inc(socketChannel.socket().getInetAddress)
  11. socketChannel.configureBlocking(false)
  12. socketChannel.socket().setTcpNoDelay(true)
  13. socketChannel.socket().setKeepAlive(true)
  14. socketChannel.socket().setSendBufferSize(sendBufferSize)
  15. processor.accept(socketChannel)
  16. }
  17. }
  18. // 将新的 SocketChannel放入到newConnections中去
  19. def accept(socketChannel: SocketChannel) {
  20. newConnections.add(socketChannel)
  21. // 唤醒 Processor 的 selector(如果此时在阻塞的话)
  22. wakeup()
  23. }

1.3 Processor 处理

前面我们讲过,Acceptor 将 socketChannel 放到了 newConnections 队列中并唤醒我们的 Processor 线程

我们可以猜测到,Processor 肯定是从 newConnections 中拿出 socketChannel 去处理

我们的猜测正不正确呢?来看看源码怎么说

  1. override def run() {
  2. startupComplete()
  3. while (isRunning) {
  4. try {
  5. /**
  6. * 从 newConnections 弹出当前的 channel
  7. * 将当前的 channel 绑定到 nioSelector 并注册 OP_READ 事件
  8. */
  9. configureNewConnections()
  10. /**
  11. * 拿到属于自己的 responseQueues 并处理其中的 response
  12. * 其中 response 分为三类:
  13. * NoOpAction:如果这个请求不需要返回 response,再次注册 OP_READ 监听事件
  14. * SendAction:需要发送,后续注册 OP_WRITE 监听事件,最终通过 poll 发送(类似我们的生产者消息发送)
  15. * CloseConnectionAction:需要关闭的 response
  16. */
  17. processNewResponses()
  18. /**
  19. * 选择器轮询各种事件,请求和发送响应
  20. * 比如上面我们需要发送的 response,就通过 poll 发送出去(代码逻辑和生产者类似,不再细讲)
  21. */
  22. poll()
  23. /**
  24. * 服务端处理器的运行方法在调用选择器的轮询后,处理已经完成的请求接收
  25. * 请求接受:将请求放入到 requestQueue 中并删除掉 OP_READ 事件注册
  26. */
  27. processCompletedReceives()
  28. /**
  29. * 服务端处理器的运行方法在调用选择器的轮询后,处理已经完成的响应发送
  30. * 响应发送:当有写请求时加入inflightResponses,当写请求完成后删除并添加 OP_READ 事件监听
  31. */
  32. processCompletedSends()
  33. processDisconnected()
  34. }
  35. }
  36. swallowError(closeAll())
  37. shutdownComplete()
  38. }

我们通过源码可以看到,总共分五个步骤:

  • Processor 从 newConnections 取出 socketChannel 并注册 OP_READ 事件监听

  • 处理 responseQueues 的 response,总共分三个类型

  • NoOpAction:如果这个请求不需要返回 response,再次注册 OP_READ 监听事件

  • SendAction:需要发送,后续注册 OP_WRITE 监听事件,最终通过 poll 发送(类似我们的生产者消息发送)

  • seConnectionAction:需要关闭的 response

  • 上面我们注册了 OP_WRITE 事件,在 poll 阶段会被监听到并发送至客户端

  • 处理客户端的一些请求,将其放入到 requestQueue 并删除掉 OP_READ 事件监听

  • 处理响应请求,当有写请求时加入 inflightResponses,当写请求完成后删除并添加 OP_READ 事件监听

当然,我们可以简单的理解一下整个流程:

这里给大家留一个小问题:为什么要频繁的删除掉 OP_READ 事件监听、增加 OP_READ 事件监听?

2、KafkaRequestHandlerPool

  • 按照我们架构图所示,不出所料的话,应该到 KafkaRequestHandlerPool 这一部分了

  • 通过架构图我们可以得知这部分的主要功能:

  • 获取 requestQueue 中的请求,通过 KafkaApis 得到对应的结果

  • 将结果放入到响应队列(responseQueues)中

2.1 初始化

  • KafkaRequestHandlerPool 中创建 numThreads 个 KafkaRequestHandler 并启动

  • 在初始化 KafkaRequestHandler的时候,我们发现其入参有个 requestChannel,这个入参是 Processor 存放 request 请求的地方,也是 Handler 处理完请求存放 response 的地方

  1. class KafkaRequestHandlerPool(val brokerId: Int,
  2. val requestChannel: RequestChannel,
  3. val apis: KafkaApis,
  4. numThreads: Int) extends Logging with KafkaMetricsGroup {
  5. private val aggregateIdleMeter = newMeter("RequestHandlerAvgIdlePercent", "percent", TimeUnit.NANOSECONDS)
  6. val threads = new Array[Thread](numThreads)
  7. val runnables = new Array[KafkaRequestHandler](numThreads)
  8. for(i <- 0 until numThreads) {
  9. // 开启 numThreads 个 KafkaRequestHandler 并启动
  10. runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis)
  11. threads(i) = Utils.daemonThread("kafka-request-handler-" + i, runnables(i))
  12. threads(i).start()
  13. }
  14. }

2.2 KafkaRequestHandler

  • 从 RequestChannel 得到 Requests 并交由 KafkaApis 去处理

  1. def run() {
  2. while(true) {
  3. try {
  4. var req : RequestChannel.Request = null
  5. while (req == null) {
  6. val startSelectTime = SystemTime.nanoseconds
  7. req = requestChannel.receiveRequest(300)
  8. val idleTime = SystemTime.nanoseconds - startSelectTime
  9. aggregateIdleMeter.mark(idleTime / totalHandlerThreads)
  10. }
  11. req.requestDequeueTimeMs = SystemTime.milliseconds
  12. apis.handle(req)
  13. }
  14. }
  15. }

3、KafkaApis

上面讲到 KafkaRequestHandlerRequestChannel 得到 Requests 并交由 KafkaApis 去处理

那么到底是一个怎么样的处理逻辑呢?

  1. def handle(request: RequestChannel.Request) {
  2. try {
  3. format(request.requestDesc(true), request.connectionId, request.securityProtocol, request.session.principal))
  4. ApiKeys.forId(request.requestId) match {
  5. case ApiKeys.PRODUCE => handleProducerRequest(request)
  6. case ApiKeys.FETCH => handleFetchRequest(request)
  7. case ApiKeys.LIST_OFFSETS => handleOffsetRequest(request)
  8. case ApiKeys.METADATA => handleTopicMetadataRequest(request)
  9. case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
  10. case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
  11. case ApiKeys.UPDATE_METADATA_KEY => handleUpdateMetadataRequest(request)
  12. case ApiKeys.CONTROLLED_SHUTDOWN_KEY => handleControlledShutdownRequest(request)
  13. case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request)
  14. case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
  15. case ApiKeys.GROUP_COORDINATOR => handleGroupCoordinatorRequest(request)
  16. case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request)
  17. case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)
  18. case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
  19. case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request)
  20. case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)
  21. case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
  22. case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
  23. case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
  24. case requestId => throw new KafkaException("Unknown api code " + requestId)
  25. }
  26. } finally
  27. request.apiLocalCompleteTimeMs = SystemTime.milliseconds
  28. }
  • 根据 ApiKeys 不同的类别,走不同的处理方式

  • 这里的类别

  1. PRODUCE(0, "Produce"),
  2. FETCH(1, "Fetch"),
  3. LIST_OFFSETS(2, "Offsets"),
  4. METADATA(3, "Metadata"),
  5. LEADER_AND_ISR(4, "LeaderAndIsr"),
  6. STOP_REPLICA(5, "StopReplica"),
  7. UPDATE_METADATA_KEY(6, "UpdateMetadata"),
  8. CONTROLLED_SHUTDOWN_KEY(7, "ControlledShutdown"),
  9. OFFSET_COMMIT(8, "OffsetCommit"),
  10. OFFSET_FETCH(9, "OffsetFetch"),
  11. GROUP_COORDINATOR(10, "GroupCoordinator"),
  12. JOIN_GROUP(11, "JoinGroup"),
  13. HEARTBEAT(12, "Heartbeat"),
  14. LEAVE_GROUP(13, "LeaveGroup"),
  15. SYNC_GROUP(14, "SyncGroup"),
  16. DESCRIBE_GROUPS(15, "DescribeGroups"),
  17. LIST_GROUPS(16, "ListGroups"),
  18. SASL_HANDSHAKE(17, "SaslHandshake"),
  19. API_VERSIONS(18, "ApiVersions");

3.1 响应返回

当我们的 ApiKeys 处理完相对应的请求时,会执行以下方法:

  1. // 将响应发送回套接字服务器,以便通过网络发送
  2. def sendResponse(response: RequestChannel.Response) {
  3. // 将得到的响应放入到 responseQueues 中
  4. responseQueues(response.processor).put(response)
  5. for(onResponse <- responseListeners)
  6. // 调用对应 processor 的 wakeup 方法
  7. onResponse(response.processor)
  8. }

至于每个类型的请求是如何处理的,这一章我们暂时不讲

我们继续完善一下上面的图片:

三、问题解析

经过我们上面的讲述,相信大家对整个 服务端网络整体架构 有了更深的认识

还记得我在文中提到的两个问题嘛?

  • Kafka 如何在高吞吐的状态下仍然能保证单 Partition 的有序性?

  • 为什么要频繁的删除掉 OP_READ 事件监听、增加 OP_READ 事件监听?

接下来就是见证奇迹的时刻,也是面试的时候装逼的时刻,这一刻,你就是天选!

首先,我们从生产者的发送讲起,众所周知,生产者在发送服务端时会将相同 Partition 的放到一起,具体可见:Kafka 生产者全流程

所以我们的客户端与服务端的请求如下:

从上面我们可以看到,客户端(Producer)向服务端发送了 1、2、3 总共三条数据且三条数据处于一个 Partition。

对于这三条数据来说,发送时是有序的,按照 1、2、3 的顺序,服务端落日志肯定也是有序的 1、2、3

问题来了,我们上面讲了客户端的请求都会被扔到 requestQueue 中,让 KafkaRequestHandler 去通过 KafkaApis 处理并将响应扔到 responseQueues 中

假如,我们全程没有不去删除 OP_READ 事件监听,会发生什么情况?大家可以想一下,给个提示:KafkaRequestHandler是多线程的

如上图所示,如果我们 不去删除 OP_READ 事件监听的话,我们的 1、2、3 三条信息会都放入到 requestQueue 中,那么我们的 KafkaRequestHandler 去拉取的时候,会出现乱序的现象。

比如,我们三个 KafkaRequestHandler 分别拉取到一条消息:

这个时候,三个 KafkaRequestHandler 线程同时去调用 KafkaApis 落日志,那么这种方式怎么可能保证有序性呢?

kafka 的开发者采取了 mute 的解决方式,将所有接受的事件先放到 kernel 中,每次只取一个请求,取完就关闭,等该请求的 response 过来后,再重新增加 OP_READ 事件的监听。

通过上述的方式,kafka 做到了分区落日志的有序性。

四、总结

这一篇文章主要从 Kafka 服务端的网络架构入手,剖析了服务端网络如何连接、如何处理、如何返回的。

1 + N + M 的架构思想

  • 1Acceptor

  • NProcessor

  • MKafkaRequestHandler

对应关系如下:

  • boss ====》Acceptor ===》前台

  • work ====》Processor ===》服务员

这里讲一个故事更形象化一些:

当你去酒店住宿的时候,首先需要去前台登记入住手续,登记完成后,前台会给你一个房间的钥匙。这个就相当于我们连接初始化连接的时候,boss 为刚连接进来的客户端分配 SocketChannel。

之后,前台会让服务员领你去房间,如果你有什么需要,都可以跟这个服务员说。这个相当于我们的 boss 将该客户端的连接交给了 work 线程,任何的业务处理都交由 work 线程去做。

最后强调一下:kafka 的网络架构使用了 Reactor 模型,利用 1 + N + M 的架构模式,将 kafka 的通信支撑起来,最后通过 mute 的方法保障了分区有序性。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/菜鸟追梦旅行/article/detail/142566
推荐阅读
相关标签
  

闽ICP备14008679号