上篇介绍了NetworkReceive
当接收到NetworkReceive, Processor会构造了Request实例,发送给RequestChannel
- private def processCompletedReceives() {
- selector.completedReceives.asScala.foreach { receive =>
- val openChannel = selector.channel(receive.source)
- val openOrClosingChannel = if (openChannel != null) openChannel else selector.closingChannel(receive.source)
- val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, openOrClosingChannel.principal.getName), openOrClosingChannel.socketAddress)
- // 创建Request实例
- val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session,
- buffer = receive.payload, startTimeNanos = time.nanoseconds,
- listenerName = listenerName, securityProtocol = securityProtocol)
- requestChannel.sendRequest(req)
- selector.mute(receive.source)
- }
- }
Request
Request表示请求,它有两个主要的属性。
header是通用请求的头部
bodyAndSize是请求的数据部分,它根据不同类型的请求,返回不同的实例
- case class Request(processor: Int, connectionId: String, session: Session, private var buffer: ByteBuffer,
- startTimeNanos: Long, listenerName: ListenerName, securityProtocol: SecurityProtocol) {
- val requestId = buffer.getShort()
- // 这里只是为了支持v0版本的shutdown请求
- val requestObj: RequestOrResponse = if (requestId == ApiKeys.CONTROLLED_SHUTDOWN_KEY.id)
- ControlledShutdownRequest.readFrom(buffer)
- else
- null
-
- val header: RequestHeader =
- if (requestObj == null) {
- buffer.rewind
- // 使用RequestHeader的类方法解析
- try RequestHeader.parse(buffer)
- catch {
- case ex: Throwable =>
- throw new InvalidRequestException(s"Error parsing request header. Our best guess of the apiKey is: $requestId", ex)
- }
- } else
- null
- val bodyAndSize: RequestAndSize =
- if (requestObj == null)
- try {
- if (header.apiKey == ApiKeys.API_VERSIONS.id && !Protocol.apiVersionSupported(header.apiKey, header.apiVersion)) {
- new RequestAndSize(new ApiVersionsRequest.Builder().build(), 0)
- }
- else
- // 根据apiKey,apiVersion和buffer,实例化RequestAndSize
- AbstractRequest.getRequest(header.apiKey, header.apiVersion, buffer)
- } catch {
- case ex: Throwable =>
- throw new InvalidRequestException(s"Error getting request for apiKey: ${header.apiKey} and apiVersion: ${header.apiVersion}", ex)
- }
- else
- null
-
- buffer = null