
Kafka: Request Header Parsing


The previous post introduced NetworkReceive.

When a NetworkReceive arrives, the Processor constructs a Request instance and sends it to the RequestChannel.

    private def processCompletedReceives() {
      selector.completedReceives.asScala.foreach { receive =>
        // Look up the channel; fall back to the closing channel if it is no longer open
        val openChannel = selector.channel(receive.source)
        val openOrClosingChannel = if (openChannel != null) openChannel else selector.closingChannel(receive.source)
        val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, openOrClosingChannel.principal.getName), openOrClosingChannel.socketAddress)
        // Create the Request instance
        val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session,
          buffer = receive.payload, startTimeNanos = time.nanoseconds,
          listenerName = listenerName, securityProtocol = securityProtocol)
        requestChannel.sendRequest(req)
        // Stop reading from this connection for now
        selector.mute(receive.source)
      }
    }
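
The hand-off above can be pictured as a producer/consumer queue. The following is a minimal sketch, not Kafka's actual code: `SketchRequest` and `SketchRequestChannel` are hypothetical names, and the sketch assumes the channel buffers requests in a bounded blocking queue, so that a full queue makes the network thread wait.

```scala
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

// Hypothetical stand-in for RequestChannel.Request: just enough fields for the sketch.
final case class SketchRequest(processor: Int, connectionId: String)

// Hypothetical stand-in for RequestChannel, assuming a bounded blocking queue.
final class SketchRequestChannel(queueSize: Int) {
  private val requestQueue = new ArrayBlockingQueue[SketchRequest](queueSize)

  // Called by the network thread; blocks while the queue is full.
  def sendRequest(request: SketchRequest): Unit = requestQueue.put(request)

  // Called by a handler thread; returns None if nothing arrives within timeoutMs.
  def receiveRequest(timeoutMs: Long): Option[SketchRequest] =
    Option(requestQueue.poll(timeoutMs, TimeUnit.MILLISECONDS))
}
```

In this sketch the network thread only enqueues; parsing and handling happen on whichever thread calls `receiveRequest`.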

Request

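Request represents a single request. It has two main attributes.

header is the request header, which has the same layout for every request type.

bodyAndSize is the data part of the request; depending on the request type, it holds a different request instance.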

    case class Request(processor: Int, connectionId: String, session: Session, private var buffer: ByteBuffer,
                       startTimeNanos: Long, listenerName: ListenerName, securityProtocol: SecurityProtocol) {
      // Read the first two bytes (the api key); the buffer is rewound below before the header is parsed
      val requestId = buffer.getShort()
      // This exists only to support the v0 ControlledShutdown request
      val requestObj: RequestOrResponse = if (requestId == ApiKeys.CONTROLLED_SHUTDOWN_KEY.id)
        ControlledShutdownRequest.readFrom(buffer)
      else
        null
      val header: RequestHeader =
        if (requestObj == null) {
          buffer.rewind
          // Parse with the static RequestHeader.parse method
          try RequestHeader.parse(buffer)
          catch {
            case ex: Throwable =>
              throw new InvalidRequestException(s"Error parsing request header. Our best guess of the apiKey is: $requestId", ex)
          }
        } else
          null
      val bodyAndSize: RequestAndSize =
        if (requestObj == null)
          try {
            // An ApiVersions request with an unsupported version is replaced by a default ApiVersionsRequest
            if (header.apiKey == ApiKeys.API_VERSIONS.id && !Protocol.apiVersionSupported(header.apiKey, header.apiVersion)) {
              new RequestAndSize(new ApiVersionsRequest.Builder().build(), 0)
            }
            else
              // Instantiate RequestAndSize from apiKey, apiVersion and buffer
              AbstractRequest.getRequest(header.apiKey, header.apiVersion, buffer)
          } catch {
            case ex: Throwable =>
              throw new InvalidRequestException(s"Error getting request for apiKey: ${header.apiKey} and apiVersion: ${header.apiVersion}", ex)
          }
        else
          null
      // Drop the reference to the buffer once the header and body have been parsed
      buffer = null
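
To make the header parsing step concrete, here is a minimal sketch of reading a request header straight from a ByteBuffer. It is not Kafka's RequestHeader class: `SketchHeader`, `peekApiKey` and `parse` are hypothetical names, and the layout is assumed to be the classic request header (api_key INT16, api_version INT16, correlation_id INT32, client_id as a nullable string).

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// Hypothetical, simplified model of a request header (not Kafka's RequestHeader class).
final case class SketchHeader(apiKey: Short, apiVersion: Short, correlationId: Int, clientId: Option[String])

object SketchHeader {
  // Reads the api key without moving the buffer position; the listing above
  // gets a similar effect with getShort() followed by rewind.
  def peekApiKey(buffer: ByteBuffer): Short = buffer.getShort(buffer.position())

  // Assumed layout: api_key INT16, api_version INT16, correlation_id INT32,
  // client_id as an INT16 length (negative meaning null) followed by UTF-8 bytes.
  def parse(buffer: ByteBuffer): SketchHeader = {
    val apiKey = buffer.getShort()
    val apiVersion = buffer.getShort()
    val correlationId = buffer.getInt()
    val clientIdLength = buffer.getShort()
    val clientId =
      if (clientIdLength < 0) None
      else {
        val bytes = new Array[Byte](clientIdLength.toInt)
        buffer.get(bytes)
        Some(new String(bytes, StandardCharsets.UTF_8))
      }
    SketchHeader(apiKey, apiVersion, correlationId, clientId)
  }
}
```

After `parse` returns, the buffer position sits at the first byte of the request body, which is why the body can then be read from the same buffer. The sketch relies on ByteBuffer's default big-endian byte order.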