赞
踩
RequestChannel,顾名思义,就是传输 Request/Response 的通道。现在我们先看看RequestChannel 源码中的 Request 定义
Request
- sealed trait BaseRequest
- case object ShutdownRequest extends BaseRequest
-
- class Request(val processor: Int, //processor 是 Processor 线程的序号,即这个请求是由哪个 Processor 线程接收处理的
- //为什么要保存 Processor 线程序号呢?这是因为,当 Request 被后面的 I/O 线程处理完成后,还要依靠 Processor 线程发送 Response 给请求发送方,
- //因此,Request 中必须记录它之前是被哪个 Processor 线程接收的。
- // Processor 线程仅仅是网络接收线程,不会执行真正的 Request 请求处理逻辑,那是 I/O 线程负责的事情。
- val context: RequestContext, // context 是用来标识请求上下文信息,具体定义见下面RequestContext
- val startTimeNanos: Long, // startTimeNanos 记录了 Request 对象被创建的时间,主要用于各种时间统计指标的计算。
- memoryPool: MemoryPool, // memoryPool 表示源码定义的一个非阻塞式的内存缓冲区,主要作用是避免 Request 对象无限使用内存。
- @volatile private var buffer: ByteBuffer, //buffer 是真正保存 Request 对象内容的字节缓冲区。
- // Request 发送方必须按照 Kafka RPC 协议规定的格式向该缓冲区写入字节,否则将抛出 InvalidRequestException 异常
- metrics: RequestChannel.Metrics // metrics 是 Request 相关的各种监控指标的一个管理类。它里面构建了一个 Map,封装了所有的请求 JMX 指标。
- ) extends BaseRequest {
- ......
- }
RequestContext
context 是用来标识请求上下文信息的。Kafka 源码中定义了 RequestContext 类,顾名思义,它保存了有关 Request 的所有上下文信息。RequestContext 类定义在 clients 工程中
- public class RequestContext implements AuthorizableRequestContext {
- public final RequestHeader header; // Request头部数据,主要是一些对用户不可见的元数据信息,如Request类型、Request API版本、clientId等
- public final String connectionId; // Request发送方的TCP连接串标识,由Kafka根据一定规则定义,主要用于表示TCP连接
- public final InetAddress clientAddress; // Request发送方IP地址
- public final KafkaPrincipal principal; // Kafka用户认证类,用于认证授权
- public final ListenerName listenerName; // 监听器名称,可以是预定义的监听器(如PLAINTEXT),也可自行定义
- public final SecurityProtocol securityProtocol; // 安全协议类型,目前支持4种:PLAINTEXT、SSL、SASL_PLAINTEXT、SASL_SSL
- public final ClientInformation clientInformation; // 用户自定义的一些连接方信息
- // 从给定的ByteBuffer中提取出Request和对应的Size值
- public RequestAndSize parseRequest(ByteBuffer buffer) {
- if (isUnsupportedApiVersionsRequest()) {
- // Unsupported ApiVersion requests are treated as v0 requests and are not parsed
- // 不支持的ApiVersions请求类型被视为是V0版本的请求,并且不做解析操作,直接返回
- ApiVersionsRequest apiVersionsRequest = new ApiVersionsRequest(new ApiVersionsRequestData(), (short) 0, header.apiVersion());
- return new RequestAndSize(apiVersionsRequest, 0);
- } else {
- // 从请求头部数据中获取ApiKeys信息
- ApiKeys apiKey = header.apiKey();
- try {
- // 从请求头部数据中获取版本信息
- short apiVersion = header.apiVersion();
- // 解析请求
- Struct struct = apiKey.parseRequest(apiVersion, buffer);
- AbstractRequest body = AbstractRequest.parseRequest(apiKey, apiVersion, struct);
- // 封装解析后的请求对象以及请求大小返回
- return new RequestAndSize(body, struct.sizeOf());
- } catch (Throwable ex) {
- // 解析过程中出现任何问题都视为无效请求,抛出异常
- throw new InvalidRequestException("Error getting request for apiKey: " + apiKey +
- ", apiVersion: " + header.apiVersion() +
- ", connectionId: " + connectionId +
- ", listenerName: " + listenerName +
- ", principal: " + principal, ex);
- }
- }
- }
- // 其他Getter方法
- ......
- }
Response
Kafka 为 Response 定义了 1 个抽象父类和 5 个具体子类
Response 相关的代码部分
- abstract class Response(val request: Request) {
- locally {
- val nowNs = Time.SYSTEM.nanoseconds
- request.responseCompleteTimeNanos = nowNs
- if (request.apiLocalCompleteTimeNanos == -1L)
- request.apiLocalCompleteTimeNanos = nowNs
- }
- def processor: Int = request.processor
- def responseString: Option[String] = Some("")
- def onComplete: Option[Send => Unit] = None
- override def toString: String
- }
这个抽象基类只有一个属性字段:request。这就是说,每个 Response 对象都要保存它对应的 Request 对象。
onComplete 方法是调用指定回调逻辑的地方。SendResponse 类就是复写(Override)了这个方法
这里的 SendResponse 类继承了 Response 父类,并重新定义了 onComplete 方法。复写的逻辑很简单,就是指定输入参数 onCompleteCallback。
- class SendResponse(request: Request,
- val responseSend: Send,
- val responseAsString: Option[String],
- val onCompleteCallback: Option[Send => Unit]) extends Response(request) {
- override def responseString: Option[String] = responseAsString
-
- override def onComplete: Option[Send => Unit] = onCompleteCallback
-
- override def toString: String =
- s"Response(type=Send, request=$request, send=$responseSend, asString=$responseAsString)"
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。