当前位置:   article > 正文

Kafka请求处理模块(一):RequestChannel的Request与Response_error getting request for apikey

error getting request for apikey

RequestChannel,顾名思义,就是传输 Request/Response 的通道。现在我们先看看RequestChannel 源码中的 Request 定义

Request

  1. sealed trait BaseRequest
  2. case object ShutdownRequest extends BaseRequest
  3. class Request(val processor: Int, //processor 是 Processor 线程的序号,即这个请求是由哪个 Processor 线程接收处理的
  4. //为什么要保存 Processor 线程序号呢?这是因为,当 Request 被后面的 I/O 线程处理完成后,还要依靠 Processor 线程发送 Response 给请求发送方,
  5. //因此,Request 中必须记录它之前是被哪个 Processor 线程接收的。
  6. // Processor 线程仅仅是网络接收线程,不会执行真正的 Request 请求处理逻辑,那是 I/O 线程负责的事情。
  7. val context: RequestContext, // context 是用来标识请求上下文信息,具体定义见下面RequestContext
  8. val startTimeNanos: Long, // startTimeNanos 记录了 Request 对象被创建的时间,主要用于各种时间统计指标的计算。
  9. memoryPool: MemoryPool, // memoryPool 表示源码定义的一个非阻塞式的内存缓冲区,主要作用是避免 Request 对象无限使用内存。
  10. @volatile private var buffer: ByteBuffer, //buffer 是真正保存 Request 对象内容的字节缓冲区。
  11. // Request 发送方必须按照 Kafka RPC 协议规定的格式向该缓冲区写入字节,否则将抛出 InvalidRequestException 异常
  12. metrics: RequestChannel.Metrics // metrics 是 Request 相关的各种监控指标的一个管理类。它里面构建了一个 Map,封装了所有的请求 JMX 指标。
  13. ) extends BaseRequest {
  14. ......
  15. }

RequestContext
context 是用来标识请求上下文信息的。Kafka 源码中定义了 RequestContext 类,顾名思义,它保存了有关 Request 的所有上下文信息。RequestContext 类定义在 clients 工程中

  1. public class RequestContext implements AuthorizableRequestContext {
  2. public final RequestHeader header; // Request头部数据,主要是一些对用户不可见的元数据信息,如Request类型、Request API版本、clientId等
  3. public final String connectionId; // Request发送方的TCP连接串标识,由Kafka根据一定规则定义,主要用于表示TCP连接
  4. public final InetAddress clientAddress; // Request发送方IP地址
  5. public final KafkaPrincipal principal; // Kafka用户认证类,用于认证授权
  6. public final ListenerName listenerName; // 监听器名称,可以是预定义的监听器(如PLAINTEXT),也可自行定义
  7. public final SecurityProtocol securityProtocol; // 安全协议类型,目前支持4种:PLAINTEXT、SSL、SASL_PLAINTEXT、SASL_SSL
  8. public final ClientInformation clientInformation; // 用户自定义的一些连接方信息
  9. // 从给定的ByteBuffer中提取出Request和对应的Size值
  10. public RequestAndSize parseRequest(ByteBuffer buffer) {
  11. if (isUnsupportedApiVersionsRequest()) {
  12. // Unsupported ApiVersion requests are treated as v0 requests and are not parsed
  13. // 不支持的ApiVersions请求类型被视为是V0版本的请求,并且不做解析操作,直接返回
  14. ApiVersionsRequest apiVersionsRequest = new ApiVersionsRequest(new ApiVersionsRequestData(), (short) 0, header.apiVersion());
  15. return new RequestAndSize(apiVersionsRequest, 0);
  16. } else {
  17. // 从请求头部数据中获取ApiKeys信息
  18. ApiKeys apiKey = header.apiKey();
  19. try {
  20. // 从请求头部数据中获取版本信息
  21. short apiVersion = header.apiVersion();
  22. // 解析请求
  23. Struct struct = apiKey.parseRequest(apiVersion, buffer);
  24. AbstractRequest body = AbstractRequest.parseRequest(apiKey, apiVersion, struct);
  25. // 封装解析后的请求对象以及请求大小返回
  26. return new RequestAndSize(body, struct.sizeOf());
  27. } catch (Throwable ex) {
  28. // 解析过程中出现任何问题都视为无效请求,抛出异常
  29. throw new InvalidRequestException("Error getting request for apiKey: " + apiKey +
  30. ", apiVersion: " + header.apiVersion() +
  31. ", connectionId: " + connectionId +
  32. ", listenerName: " + listenerName +
  33. ", principal: " + principal, ex);
  34. }
  35. }
  36. }
  37. // 其他Getter方法
  38. ......
  39. }

Response

Kafka 为 Response 定义了 1 个抽象父类和 5 个具体子类

  • Response:定义 Response 的抽象基类。每个 Response 对象都包含了对应的 Request 对象。这个类里最重要的方法是 onComplete 方法,用来实现每类 Response 被处理后需要执行的回调逻辑。
  • SendResponse:Kafka 大多数 Request 处理完成后都需要执行一段回调逻辑,SendResponse 就是保存返回结果的 Response 子类。里面最重要的字段是 onCompletionCallback,即指定处理完成之后的回调逻辑。
  • NoResponse:有些 Request 处理完成后无需单独执行额外的回调逻辑。NoResponse 就是为这类 Response 准备的。
  • CloseConnectionResponse:用于出错后需要关闭 TCP 连接的场景,此时返回 CloseConnectionResponse 给 Request 发送方,显式地通知它关闭连接。
  • StartThrottlingResponse:用于通知 Broker 的 Socket Server 组件(后面几节课我会讲到它)某个 TCP 连接通信通道开始被限流(throttling)。
  • EndThrottlingResponse:与 StartThrottlingResponse 对应,通知 Broker 的 SocketServer 组件某个 TCP 连接通信通道的限流已结束。

Response 相关的代码部分

  1. abstract class Response(val request: Request) {
  2. locally {
  3. val nowNs = Time.SYSTEM.nanoseconds
  4. request.responseCompleteTimeNanos = nowNs
  5. if (request.apiLocalCompleteTimeNanos == -1L)
  6. request.apiLocalCompleteTimeNanos = nowNs
  7. }
  8. def processor: Int = request.processor
  9. def responseString: Option[String] = Some("")
  10. def onComplete: Option[Send => Unit] = None
  11. override def toString: String
  12. }

这个抽象基类只有一个属性字段:request。这就是说,每个 Response 对象都要保存它对应的 Request 对象。
onComplete 方法是调用指定回调逻辑的地方。SendResponse 类就是复写(Override)了这个方法
这里的 SendResponse 类继承了 Response 父类,并重新定义了 onComplete 方法。复写的逻辑很简单,就是指定输入参数 onCompleteCallback。

  1. class SendResponse(request: Request,
  2. val responseSend: Send,
  3. val responseAsString: Option[String],
  4. val onCompleteCallback: Option[Send => Unit]) extends Response(request) {
  5. override def responseString: Option[String] = responseAsString
  6. override def onComplete: Option[Send => Unit] = onCompleteCallback
  7. override def toString: String =
  8. s"Response(type=Send, request=$request, send=$responseSend, asString=$responseAsString)"
  9. }

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/626045
推荐阅读
相关标签
  

闽ICP备14008679号