当前位置:   article > 正文

Kafka请求处理模块(七):KafkaApis_describeclientquotas

describeclientquotas

请求的实际处理逻辑是封装在 KafkaApis 类中,KafkaApis 类的定义代码如下:

  // Constructor parameters of KafkaApis as quoted by the article; this listing shows an empty class body.
  1. class KafkaApis(val requestChannel: RequestChannel, // request channel
  2. val replicaManager: ReplicaManager, // replica manager; controls state transitions of all replicas in the cluster
  3. val adminManager: AdminManager, // manager for topic and partition configuration, etc.
  4. val groupCoordinator: GroupCoordinator, // consumer group coordinator component
  5. val txnCoordinator: TransactionCoordinator, // transaction manager component
  6. val controller: KafkaController, // controller component; manages and stores metadata
  7. val zkClient: KafkaZkClient, // ZooKeeper client; Kafka relies on this class to interact with ZooKeeper
  8. val brokerId: Int, // value of the broker.id setting
  9. val config: KafkaConfig, // Kafka configuration class
  10. val metadataCache: MetadataCache, // metadata cache class
  11. val metrics: Metrics,
  12. val authorizer: Option[Authorizer],
  13. val quotas: QuotaManagers, // quota manager component
  14. val fetchManager: FetchManager,
  15. brokerTopicStats: BrokerTopicStats,
  16. val clusterId: String,
  17. time: Time,
  18. val tokenManager: DelegationTokenManager) extends Logging {
  19. }

KafkaApis 的 handle 方法对各种请求进行处理。

  // Entry point for request processing: dispatches the request to the handler matching its API key.
  // FatalExitError is rethrown, any other Throwable is passed to handleError, and the local completion
  // time is stamped in the finally block if it has not been set yet.
  1. def handle(request: RequestChannel.Request): Unit = {
  2. try {
  3. trace(s"Handling request:${request.requestDesc(true)} from connection ${request.context.connectionId};" +
  4. s"securityProtocol:${request.context.securityProtocol},principal:${request.context.principal}")
  5. // Determine the request type from the apiKey field of the request header,
  6. // then invoke the corresponding handle*** method.
  7. // To add a new RPC protocol type:
  8. // 1. add a new apiKey identifying the new request type
  9. // 2. add a new case branch here
  10. // 3. add the matching handle*** method
  11. request.header.apiKey match {
  12. case ApiKeys.PRODUCE => handleProduceRequest(request)
  13. case ApiKeys.FETCH => handleFetchRequest(request)
  14. case ApiKeys.LIST_OFFSETS => handleListOffsetRequest(request)
  15. case ApiKeys.METADATA => handleTopicMetadataRequest(request)
  16. case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
  17. case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
  18. case ApiKeys.UPDATE_METADATA => handleUpdateMetadataRequest(request)
  19. case ApiKeys.CONTROLLED_SHUTDOWN => handleControlledShutdownRequest(request)
  20. case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request)
  21. case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
  22. case ApiKeys.FIND_COORDINATOR => handleFindCoordinatorRequest(request)
  23. case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request)
  24. case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)
  25. case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
  26. case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request)
  27. case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)
  28. case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
  29. case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
  30. case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
  31. case ApiKeys.CREATE_TOPICS => handleCreateTopicsRequest(request)
  32. case ApiKeys.DELETE_TOPICS => handleDeleteTopicsRequest(request)
  33. case ApiKeys.DELETE_RECORDS => handleDeleteRecordsRequest(request)
  34. case ApiKeys.INIT_PRODUCER_ID => handleInitProducerIdRequest(request)
  35. case ApiKeys.OFFSET_FOR_LEADER_EPOCH => handleOffsetForLeaderEpochRequest(request)
  36. case ApiKeys.ADD_PARTITIONS_TO_TXN => handleAddPartitionToTxnRequest(request)
  37. case ApiKeys.ADD_OFFSETS_TO_TXN => handleAddOffsetsToTxnRequest(request)
  38. case ApiKeys.END_TXN => handleEndTxnRequest(request)
  39. case ApiKeys.WRITE_TXN_MARKERS => handleWriteTxnMarkersRequest(request)
  40. case ApiKeys.TXN_OFFSET_COMMIT => handleTxnOffsetCommitRequest(request)
  41. case ApiKeys.DESCRIBE_ACLS => handleDescribeAcls(request)
  42. case ApiKeys.CREATE_ACLS => handleCreateAcls(request)
  43. case ApiKeys.DELETE_ACLS => handleDeleteAcls(request)
  44. case ApiKeys.ALTER_CONFIGS => handleAlterConfigsRequest(request)
  45. case ApiKeys.DESCRIBE_CONFIGS => handleDescribeConfigsRequest(request)
  46. case ApiKeys.ALTER_REPLICA_LOG_DIRS => handleAlterReplicaLogDirsRequest(request)
  47. case ApiKeys.DESCRIBE_LOG_DIRS => handleDescribeLogDirsRequest(request)
  48. case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request)
  49. case ApiKeys.CREATE_PARTITIONS => handleCreatePartitionsRequest(request)
  50. case ApiKeys.CREATE_DELEGATION_TOKEN => handleCreateTokenRequest(request)
  51. case ApiKeys.RENEW_DELEGATION_TOKEN => handleRenewTokenRequest(request)
  52. case ApiKeys.EXPIRE_DELEGATION_TOKEN => handleExpireTokenRequest(request)
  53. case ApiKeys.DESCRIBE_DELEGATION_TOKEN => handleDescribeTokensRequest(request)
  54. case ApiKeys.DELETE_GROUPS => handleDeleteGroupsRequest(request)
  55. case ApiKeys.ELECT_LEADERS => handleElectReplicaLeader(request)
  56. case ApiKeys.INCREMENTAL_ALTER_CONFIGS => handleIncrementalAlterConfigsRequest(request)
  57. case ApiKeys.ALTER_PARTITION_REASSIGNMENTS => handleAlterPartitionReassignmentsRequest(request)
  58. case ApiKeys.LIST_PARTITION_REASSIGNMENTS => handleListPartitionReassignmentsRequest(request)
  59. case ApiKeys.OFFSET_DELETE => handleOffsetDeleteRequest(request)
  60. case ApiKeys.DESCRIBE_CLIENT_QUOTAS => handleDescribeClientQuotasRequest(request)
  61. case ApiKeys.ALTER_CLIENT_QUOTAS => handleAlterClientQuotasRequest(request)
  62. }
  63. } catch {
  64. // A fatal error is rethrown to the caller unchanged
  65. case e: FatalExitError => throw e
  66. // Any other throwable is delegated to handleError (its body is not part of this listing)
  67. case e: Throwable => handleError(request, e)
  68. } finally {
  69. // Record the request's local completion time, i.e. when the broker finished handling it;
  69. // a negative value means it has not been set yet
  70. if (request.apiLocalCompleteTimeNanos < 0)
  71. request.apiLocalCompleteTimeNanos = time.nanoseconds
  72. }
  73. }

        其他重要方法

        抛开 KafkaApis 的定义和 handle 方法,还有几个常用的方法也很重要,比如,用于发送 Response 的一组方法,以及用于鉴权的方法。
        特别是前者,它是任何一类请求被处理之后都要做的必要步骤。毕竟,请求被处理完成还不够,Kafka 还需要把处理结果发送给请求发送方。
        首先就是 sendResponse 系列方法。源码中带有 sendResponse 字眼的方法有 7 个之多。

sendResponse
        该方法接收的实际上是 Request,而非 Response,因此,它会在内部构造出 Response 对象之后,再调用 requestChannel.sendResponse 方法将其发送出去。

  // Turns an optional AbstractResponse into a RequestChannel response object and passes it to
  // requestChannel.sendResponse. Some(response) becomes a SendResponse (its string form is only
  // built when request logging is enabled); None becomes a NoOpResponse.
  1. private def sendResponse(request: RequestChannel.Request,
  2. responseOpt: Option[AbstractResponse],
  3. onComplete: Option[Send => Unit]): Unit = {
  4. // Update error metrics for each error code in the response including Errors.NONE
  5. responseOpt.foreach(response => requestChannel.updateErrorMetrics(request.header.apiKey, response.errorCounts.asScala))
  6. val response = responseOpt match {
  7. case Some(response) =>
  8. val responseSend = request.context.buildResponse(response)
  9. val responseString =
  10. if (RequestChannel.isRequestLoggingEnabled) Some(response.toString(request.context.apiVersion))
  11. else None
  12. new RequestChannel.SendResponse(request, responseSend, responseString, onComplete)
  13. case None =>
  14. new RequestChannel.NoOpResponse(request)
  15. }
  16. requestChannel.sendResponse(response)
  17. }

  sendNoOpResponseExemptThrottle

        发送 NoOpResponse 类型的 Response 而不受请求通道上限流(throttling)的限制。所谓的 NoOpResponse,是指 Processor 线程取出该类型的 Response 后,不执行真正的 I/O 发送操作。

  // Marks the request via quotas.request.maybeRecordExempt, then calls sendResponse with no
  // response and no completion callback.
  1. private def sendNoOpResponseExemptThrottle(request: RequestChannel.Request): Unit = {
  2. quotas.request.maybeRecordExempt(request)
  3. sendResponse(request, None, None)
  4. }

sendErrorResponseExemptThrottle

        发送携带错误信息的 Response 而不受限流限制。 

  // Marks the request via quotas.request.maybeRecordExempt, then delegates the error to
  // sendErrorOrCloseConnection with 0 as the last argument.
  // NOTE(review): the last argument is presumably the throttle time in ms; confirm against sendErrorOrCloseConnection.
  1. private def sendErrorResponseExemptThrottle(request: RequestChannel.Request, error: Throwable): Unit = {
  2. quotas.request.maybeRecordExempt(request)
  3. sendErrorOrCloseConnection(request, error, 0)
  4. }

sendResponseExemptThrottle

        发送普通 Response 而不受限流限制。

  // Marks the request via quotas.request.maybeRecordExempt, then sends the given response through
  // sendResponse. onComplete defaults to None and is forwarded as-is.
  1. private def sendResponseExemptThrottle(request: RequestChannel.Request,
  2. response: AbstractResponse,
  3. onComplete: Option[Send => Unit] = None): Unit = {
  4. quotas.request.maybeRecordExempt(request)
  5. sendResponse(request, Some(response), onComplete)
  6. }

sendErrorResponseMaybeThrottle

        发送携带错误信息的 Response 但接受限流的约束。

  // Obtains a throttle time from maybeRecordAndGetThrottleTimeMs, passes it to quotas.request.throttle
  // together with requestChannel.sendResponse, and then hands the error and the same throttle time to
  // sendErrorOrCloseConnection.
  1. private def sendErrorResponseMaybeThrottle(request: RequestChannel.Request, error: Throwable): Unit = {
  2. val throttleTimeMs = maybeRecordAndGetThrottleTimeMs(request)
  3. quotas.request.throttle(request, throttleTimeMs, requestChannel.sendResponse)
  4. sendErrorOrCloseConnection(request, error, throttleTimeMs)
  5. }

sendResponseMaybeThrottle

        发送普通 Response 但接受限流的约束。

  // Obtains a throttle time from maybeRecordAndGetThrottleTimeMs, passes it to quotas.request.throttle
  // together with requestChannel.sendResponse, then builds the response by calling createResponse with
  // that throttle time and sends it through sendResponse. onComplete defaults to None.
  1. private def sendResponseMaybeThrottle(request: RequestChannel.Request,
  2. createResponse: Int => AbstractResponse,
  3. onComplete: Option[Send => Unit] = None): Unit = {
  4. val throttleTimeMs = maybeRecordAndGetThrottleTimeMs(request)
  5. quotas.request.throttle(request, throttleTimeMs, requestChannel.sendResponse)
  6. sendResponse(request, Some(createResponse(throttleTimeMs)), onComplete)
  7. }

        KafkaApis 实际上是把处理完成的 Response 放回到前端 Processor 线程的 Response 队列中,而真正将 Response 返还给 Clients 或其他 Broker 的,其实是 Processor 线程,而不是执行 KafkaApis 逻辑的 KafkaRequestHandler 线程。

        KafkaApis 请求处理实例解析

        下面以 handleListGroupsRequest 方法为例来介绍一下。

  // Handles a ListGroups request: fetches the groups from the group coordinator, then returns either
  // all of them (client authorized for DESCRIBE on CLUSTER) or only those the client is authorized to
  // DESCRIBE individually. Both branches send the response through sendResponseMaybeThrottle.
  1. def handleListGroupsRequest(request: RequestChannel.Request): Unit = {
  2. val (error, groups) = groupCoordinator.handleListGroups() // obtain the error and group list from GroupCoordinator.handleListGroups
  3. // If the client has DESCRIBE permission on the CLUSTER resource
  4. if (authorize(request, DESCRIBE, CLUSTER, CLUSTER_NAME))
  5. // wrap the groups obtained above into the response unfiltered and send it
  6. sendResponseMaybeThrottle(request, requestThrottleMs =>
  7. new ListGroupsResponse(new ListGroupsResponseData()
  8. .setErrorCode(error.code)
  9. .setGroups(groups.map { group => new ListGroupsResponseData.ListedGroup()
  10. .setGroupId(group.groupId)
  11. .setProtocolType(group.protocolType)}.asJava
  12. )
  13. .setThrottleTimeMs(requestThrottleMs)
  14. ))
  15. else {
  16. // Keep only the groups on which the client has DESCRIBE permission for the GROUP resource, and return those
  17. val filteredGroups = groups.filter(group => authorize(request, DESCRIBE, GROUP, group.groupId))
  18. sendResponseMaybeThrottle(request, requestThrottleMs =>
  19. new ListGroupsResponse(new ListGroupsResponseData()
  20. .setErrorCode(error.code)
  21. .setGroups(filteredGroups.map { group => new ListGroupsResponseData.ListedGroup()
  22. .setGroupId(group.groupId)
  23. .setProtocolType(group.protocolType)}.asJava
  24. )
  25. .setThrottleTimeMs(requestThrottleMs)
  26. ))
  27. }
  28. }

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/626041
推荐阅读
相关标签
  

闽ICP备14008679号