赞
踩
首先创建kafka topic的命令是下面这个
bin/kafka-topics.sh --bootstrap-server broker_host:port --create --topic my_topic_name --partitions 20 --replication-factor 3 --config x=y
--bootstrap-server
某一台kafka服务器地址和端口
--create
代表这个命令是创建
--topic
后面是想创建的topic
--partitions
主动设置分区数
--replication-factor
主动设置每个分区有几个副本
--config x=y
在命令行上添加的配置会覆盖服务器的默认设置,例如数据应该保留的时间长度。此处记录了完整的每个主题配置集。选填
之后再看kafka-topics.sh
里面的命令
exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand "$@"
知道了其实是执行了源码core/src/main/scala/kafka/admin/TopicCommand.scala
文件中的方法
这里需要注意的是从kafka 2.8开始,引入了KRaft模式,可以不再依赖ZooKeeper,
而是通过KRaft进行自己的集群管理(ZooKeeper在后续版本才被彻底移除),
所以下面的源码中没有ZookeeperTopicService这个创建topic的方法了
object TopicCommand extends Logging { def main(args: Array[String]): Unit = { val opts = new TopicCommandOptions(args) opts.checkArgs() //初始化得到实例化的topicService val topicService = TopicService(opts.commandConfig, opts.bootstrapServer) var exitCode = 0 try { if (opts.hasCreateOption) //这个是通过判断命令中的是否是--create 关键字来判断是否执行createTopic topicService.createTopic(opts) else if (opts.hasAlterOption) topicService.alterTopic(opts) else if (opts.hasListOption) topicService.listTopics(opts) else if (opts.hasDescribeOption) topicService.describeTopic(opts) else if (opts.hasDeleteOption) topicService.deleteTopic(opts) } catch { case e: ExecutionException => if (e.getCause != null) printException(e.getCause) else printException(e) exitCode = 1 case e: Throwable => printException(e) exitCode = 1 } finally { topicService.close() Exit.exit(exitCode) } }
TopicService(opts.commandConfig, opts.bootstrapServer)
执行的是下面的方法中的apply
// Companion object for TopicService: builds the Admin client used to talk to brokers.
object TopicService {
// Creates a Kafka Admin client from the supplied properties.
// If --bootstrap-server was given on the command line, it overrides any
// bootstrap.servers value already present in commandConfig; otherwise the
// config is used as-is (case None => deliberately does nothing).
def createAdminClient(commandConfig: Properties, bootstrapServer: Option[String]): Admin = {
bootstrapServer match {
case Some(serverList) => commandConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, serverList)
case None =>
}
Admin.create(commandConfig)
}
// Factory entry point (called as TopicService(...)): wraps the freshly
// created Admin client in a TopicService instance.
def apply(commandConfig: Properties, bootstrapServer: Option[String]): TopicService =
new TopicService(createAdminClient(commandConfig, bootstrapServer))
}
之后又调用的createAdminClient
创建的一个客户端,来创建topic
下面就是验证参数,是否指定参数设置等等,之后调用新创建的client创建topic
case class TopicService private (adminClient: Admin) extends AutoCloseable { def createTopic(opts: TopicCommandOptions): Unit = { //创建一个topic,把输入参数,比如分区数,副本数等等参数设置上 val topic = new CommandTopicPartition(opts) if (Topic.hasCollisionChars(topic.name)) //检查topic名称中的特殊字符 println("WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could " + "collide. To avoid issues it is best to use either, but not both.") createTopic(topic) } def createTopic(topic: CommandTopicPartition): Unit = { // //如果配置了副本副本数--replication-factor 一定要大于0 if (topic.replicationFactor.exists(rf => rf > Short.MaxValue || rf < 1)) throw new IllegalArgumentException(s"The replication factor must be between 1 and ${Short.MaxValue} inclusive") //如果配置了--partitions 分区数 必须大于0 if (topic.partitions.exists(partitions => partitions < 1)) throw new IllegalArgumentException(s"The partitions must be greater than 0") try { val newTopic = if (topic.hasReplicaAssignment) // 如果指定了--replica-assignment参数;则按照指定的来分配副本 new NewTopic(topic.name, asJavaReplicaReassignment(topic.replicaAssignment.get)) else { new NewTopic( topic.name, topic.partitions.asJava, topic.replicationFactor.map(_.toShort).map(Short.box).asJava) } //将配置--config 解析成一个配置map val configsMap = topic.configsToAdd.stringPropertyNames() .asScala .map(name => name -> topic.configsToAdd.getProperty(name)) .toMap.asJava newTopic.configs(configsMap) //调用adminClient创建Topic val createResult = adminClient.createTopics(Collections.singleton(newTopic), new CreateTopicsOptions().retryOnQuotaViolation(false)) createResult.all().get() println(s"Created topic ${topic.name}.") } catch { case e : ExecutionException => if (e.getCause == null) throw e if (!(e.getCause.isInstanceOf[TopicExistsException] && topic.ifTopicDoesntExist())) throw e.getCause } }
在KafkaAdminClient.java
中的createTopics
方法
@Override public CreateTopicsResult createTopics(final Collection<NewTopic> newTopics, final CreateTopicsOptions options) { final Map<String, KafkaFutureImpl<TopicMetadataAndConfig>> topicFutures = new HashMap<>(newTopics.size()); final CreatableTopicCollection topics = new CreatableTopicCollection(); //遍历要创建的topic集合 for (NewTopic newTopic : newTopics) { if (topicNameIsUnrepresentable(newTopic.name())) { //topic名称不存在 KafkaFutureImpl<TopicMetadataAndConfig> future = new KafkaFutureImpl<>(); future.completeExceptionally(new InvalidTopicException("The given topic name '" + newTopic.name() + "' cannot be represented in a request.")); topicFutures.put(newTopic.name(), future); } else if (!topicFutures.containsKey(newTopic.name())) {//防止发一次创建多个topic时有重复的 topicFutures.put(newTopic.name(), new KafkaFutureImpl<>()); topics.add(newTopic.convertToCreatableTopic()); } } //如果topics不为null。则去创建 if (!topics.isEmpty()) { final long now = time.milliseconds(); final long deadline = calcDeadlineMs(now, options.timeoutMs()); //初始化创建topic的调用, final Call call = getCreateTopicsCall(options, topicFutures, topics, Collections.emptyMap(), now, deadline); //这里面才是调用,上面call只是初始化 runnable.call(call, now); } return new CreateTopicsResult(new HashMap<>(topicFutures)); }
为什么先讲解这个?而不是先
getCreateTopicsCall
?因为我觉得先看这个比较好理解,因为整个流程不是一步到位地顺序执行,如果先看getCreateTopicsCall
会有点迷糊
/** * Initiate a new call. *发起新呼叫 * This will fail if the AdminClient is scheduled to shut down. *如果 AdminClient 计划关闭,这将失败 * @param call The new call object. * @param now The current time in milliseconds. */ void call(Call call, long now) { if (hardShutdownTimeMs.get() != INVALID_SHUTDOWN_TIME) { log.debug("The AdminClient is not accepting new calls. Timing out {}.", call); call.handleTimeoutFailure(time.milliseconds(), new TimeoutException("The AdminClient thread is not accepting new calls.")); } else { enqueue(call, now); } } /** * Queue a call for sending. *排队发送呼叫 * If the AdminClient thread has exited, this will fail. Otherwise, it will succeed (even * if the AdminClient is shutting down). This function should called when retrying an * existing call. *如果 AdminClient 线程已退出,这将失败。否则,它将成功(即使 如果 AdminClient 正在关闭)。 * 重试现有调用时应调用此函数 * @param call The new call object. * @param now The current time in milliseconds. */ void enqueue(Call call, long now) { if (call.tries > maxRetries) { log.debug("Max retries {} for {} reached", maxRetries, call); call.handleTimeoutFailure(time.milliseconds(), new TimeoutException( "Exceeded maxRetries after " + call.tries + " tries.")); return; } if (log.isDebugEnabled()) { log.debug("Queueing {} with a timeout {} ms from now.", call, Math.min(requestTimeoutMs, call.deadlineMs - now)); } boolean accepted = false; //把call放到一个newCalls队列中 synchronized (this) { if (!closing) { newCalls.add(call); accepted = true; } } //唤醒线程去执行 if (accepted) { client.wakeup(); // wake the thread if it is in poll()如果线程处于轮询中,则唤醒线程 } else { log.debug("The AdminClient thread has exited. Timing out {}.", call); call.handleTimeoutFailure(time.milliseconds(), new TimeoutException("The AdminClient thread has exited.")); } }
client.wakeup()
唤醒的线程执行下面的
@Override public void run() { log.debug("Thread starting"); try { //这里是处理请求 processRequests(); } finally { closing = true; //省略 log.debug("Exiting AdminClientRunnable thread."); } } private void processRequests() { long now = time.milliseconds(); while (true) { // Copy newCalls into pendingCalls. //将 newCalls 复制到 pendingCalls drainNewCalls(); // Check if the AdminClient thread should shut down. //检查 AdminClient 线程是否应该关闭 long curHardShutdownTimeMs = hardShutdownTimeMs.get(); if ((curHardShutdownTimeMs != INVALID_SHUTDOWN_TIME) && threadShouldExit(now, curHardShutdownTimeMs)) break; // Handle timeouts. //处理超时 TimeoutProcessor timeoutProcessor = timeoutProcessorFactory.create(now); timeoutPendingCalls(timeoutProcessor); timeoutCallsToSend(timeoutProcessor); timeoutCallsInFlight(timeoutProcessor); long pollTimeout = Math.min(1200000, timeoutProcessor.nextTimeoutMs()); if (curHardShutdownTimeMs != INVALID_SHUTDOWN_TIME) { pollTimeout = Math.min(pollTimeout, curHardShutdownTimeMs - now); } // Choose nodes for our pending calls.为我们的待处理呼叫选择节点 pollTimeout = Math.min(pollTimeout, maybeDrainPendingCalls(now)); long metadataFetchDelayMs = metadataManager.metadataFetchDelayMs(now); if (metadataFetchDelayMs == 0) { metadataManager.transitionToUpdatePending(now); Call metadataCall = makeMetadataCall(now); // Create a new metadata fetch call and add it to the end of pendingCalls. //创建一个新的元数据获取调用并将其添加到 pendingCalls 的末尾 // Assign a node for just the new call (we handled the other pending nodes above). 
//为新调用分配一个节点(我们处理了上面的其他待处理节点)。 if (!maybeDrainPendingCall(metadataCall, now)) pendingCalls.add(metadataCall); } pollTimeout = Math.min(pollTimeout, sendEligibleCalls(now)); if (metadataFetchDelayMs > 0) { pollTimeout = Math.min(pollTimeout, metadataFetchDelayMs); } // Ensure that we use a small poll timeout if there are pending calls which need to be sent //如果有待发送的呼叫需要发送,请确保我们使用一个小的轮询超时 if (!pendingCalls.isEmpty()) pollTimeout = Math.min(pollTimeout, retryBackoffMs); // Wait for network responses. //等待网络响应 log.trace("Entering KafkaClient#poll(timeout={})", pollTimeout); List<ClientResponse> responses = client.poll(Math.max(0L, pollTimeout), now); log.trace("KafkaClient#poll retrieved {} response(s)", responses.size()); // unassign calls to disconnected nodes //取消对断开节点的调用 unassignUnsentCalls(client::connectionFailed); // Update the current time and handle the latest responses. //更新当前时间并处理最新响应 now = time.milliseconds(); handleResponses(now, responses); } }
sendEligibleCalls
这个方法是实际调用的call的方法
/** * Send the calls which are ready. *发送准备好的电话 * @param now The current time in milliseconds. * @return The minimum timeout we need for poll(). */ private long sendEligibleCalls(long now) { long pollTimeout = Long.MAX_VALUE; for (Iterator<Map.Entry<Node, List<Call>>> iter = callsToSend.entrySet().iterator(); iter.hasNext(); ) { Map.Entry<Node, List<Call>> entry = iter.next(); List<Call> calls = entry.getValue(); if (calls.isEmpty()) { iter.remove(); continue; } //省略。。。 while (!calls.isEmpty()) { Call call = calls.remove(0); int timeoutMs = Math.min(remainingRequestTime, calcTimeoutMsRemainingAsInt(now, call.deadlineMs)); AbstractRequest.Builder<?> requestBuilder; try { //获得call中的requestBuilder requestBuilder = call.createRequest(timeoutMs); } catch (Throwable t) { call.fail(now, new KafkaException(String.format( "Internal error sending %s to %s.", call.callName, node), t)); continue; } ClientRequest clientRequest = client.newClientRequest(node.idString(), requestBuilder, now, true, timeoutMs, null); log.debug("Sending {} to {}. correlationId={}, timeoutMs={}", requestBuilder, node, clientRequest.correlationId(), timeoutMs); //实际调用请求 client.send(clientRequest, now); callsInFlight.put(node.idString(), call); correlationIdToCalls.put(clientRequest.correlationId(), call); break; } } return pollTimeout; }
这里需要多注意一下requestBuilder = call.createRequest(timeoutMs);
这一行,下面getCreateTopicsCall
才是requestBuilder 的初始化
看完上面的runnable.call,下面接着看
getCreateTopicsCall
如何生成Call
的。
private Call getCreateTopicsCall(final CreateTopicsOptions options, final Map<String, KafkaFutureImpl<TopicMetadataAndConfig>> futures, final CreatableTopicCollection topics, final Map<String, ThrottlingQuotaExceededException> quotaExceededExceptions, final long now, final long deadline) { return new Call("createTopics", deadline, new ControllerNodeProvider()) { @Override public CreateTopicsRequest.Builder createRequest(int timeoutMs) { return new CreateTopicsRequest.Builder( new CreateTopicsRequestData() .setTopics(topics) .setTimeoutMs(timeoutMs) .setValidateOnly(options.shouldValidateOnly())); } @Override public void handleResponse(AbstractResponse abstractResponse) { //省略.. } private ConfigEntry configEntry(CreatableTopicConfigs config) { return new ConfigEntry( config.name(), config.value(), configSource(DescribeConfigsResponse.ConfigSource.forId(config.configSource())), config.isSensitive(), config.readOnly(), Collections.emptyList(), null, null); } @Override void handleFailure(Throwable throwable) { // If there were any topics retries due to a quota exceeded exception, we propagate // the initial error back to the caller if the request timed out. maybeCompleteQuotaExceededException(options.shouldRetryOnQuotaViolation(), throwable, futures, quotaExceededExceptions, (int) (time.milliseconds() - now)); // Fail all the other remaining futures completeAllExceptionally(futures.values(), throwable); } }; }
其中new ControllerNodeProvider()
返回的是controller列表,这样的话相当于服务端是用controller
接收的,
/**
 * Provides the controller node.
 * Used by admin calls (e.g. createTopics) that must be served by the
 * active controller rather than an arbitrary broker.
 */
private class ControllerNodeProvider implements NodeProvider {
@Override
public Node provide() {
// Only return the controller if cluster metadata is ready and a
// controller is currently known.
if (metadataManager.isReady() &&
(metadataManager.controller() != null)) {
return metadataManager.controller();
}
// Otherwise ask for a metadata refresh; returning null tells the
// caller the call cannot be assigned to a node yet and must wait.
metadataManager.requestUpdate();
return null;
}
}
为什么要加这一步?
1、主要是因为从kafka2.8开始,除了zk我们又有新的选择,用kraft来做zk的工作,并被称为革命性的,但是旧的zk其实没有被废弃,只是提供了新的选择,并且创建topic的命令,Kraft是用控制器API创建的
2、在kafka服务端启动时,如何选择Kraft还是zk,可以看我的另一篇文章:kafka 3.5 如何选择启用kraft还是ZooKeeper(选择哪个server实现,不涉及到server具体的初始化)
但是不管选择哪一种handler,是实现的下面这个接口
// Common interface implemented by both request handlers (KafkaApis for
// ZooKeeper mode, ControllerApis for KRaft mode): dispatches one incoming
// request from the request channel.
trait ApiRequestHandler {
def handle(request: RequestChannel.Request, requestLocal: RequestLocal): Unit
}
在KafkaServer.scala
的startup中,把KafkaApis
类放入到dataPlaneRequestProcessor
,再由KafkaRequestHandlerPool
开启守护线程
/** * Start up API for bringing up a single instance of the Kafka server. * Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers * 启动用于启动 Kafka 服务器的单个实例的 API。实例化 LogManager、SocketServer 和请求处理程序 - KafkaRequestHandlers */ override def startup(): Unit = { //省略干扰代码 /* start processing requests 处理请求需要初始化的逻辑*/ val zkSupport = ZkSupport(adminManager, kafkaController, zkClient, forwardingManager, metadataCache, brokerEpochManager) //创建KafkaApis def createKafkaApis(requestChannel: RequestChannel): KafkaApis = new KafkaApis( requestChannel = requestChannel, metadataSupport = zkSupport, replicaManager = replicaManager, groupCoordinator = groupCoordinator, txnCoordinator = transactionCoordinator, autoTopicCreationManager = autoTopicCreationManager, brokerId = config.brokerId, config = config, configRepository = configRepository, metadataCache = metadataCache, metrics = metrics, authorizer = authorizer, quotas = quotaManagers, fetchManager = fetchManager, brokerTopicStats = brokerTopicStats, clusterId = clusterId, time = time, tokenManager = tokenManager, apiVersionManager = apiVersionManager) dataPlaneRequestProcessor = createKafkaApis(socketServer.dataPlaneRequestChannel) //把KafkaApis放入到dataPlaneRequestHandlerPool dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time, config.numIoThreads, s"${DataPlaneAcceptor.MetricPrefix}RequestHandlerAvgIdlePercent", DataPlaneAcceptor.ThreadPrefix) //省略干扰代码 }
其中KafkaRequestHandlerPool
初始化时,会根据配置文件中的num.io.threads
数量,对handler开启多个守护线程
class KafkaRequestHandlerPool(val brokerId: Int, val requestChannel: RequestChannel, val apis: ApiRequestHandler, time: Time, numThreads: Int, requestHandlerAvgIdleMetricName: String, logAndThreadNamePrefix : String) extends Logging { private val threadPoolSize: AtomicInteger = new AtomicInteger(numThreads) //runnables 可以处理请求的handler数组,每一个handler都会有一个守护线程执行 val runnables = new mutable.ArrayBuffer[KafkaRequestHandler](numThreads) for (i <- 0 until numThreads) { //numThreads,服务器用于处理请求的线程数量,配置文件中的关键字num.io.threads,默认8 createHandler(i) } def createHandler(id: Int): Unit = synchronized { //把新生成的Handler加入到runnables进行管理 runnables += new KafkaRequestHandler(id, brokerId, aggregateIdleMeter, threadPoolSize, requestChannel, apis, time) //开启一个守护线程,运行KafkaRequestHandler KafkaThread.daemon(logAndThreadNamePrefix + "-kafka-request-handler-" + id, runnables(id)).start() } }
KafkaRequestHandler
类中执行方法是run()
/** * A thread that answers kafka requests. * 响应 kafka 请求的线程。 */ class KafkaRequestHandler(id: Int, brokerId: Int, val aggregateIdleMeter: Meter, val totalHandlerThreads: AtomicInteger, val requestChannel: RequestChannel, apis: ApiRequestHandler, time: Time) extends Runnable with Logging { //省略干扰代码 private val requestLocal = RequestLocal.withThreadConfinedCaching @volatile private var stopped = false //线程启动执行函数 def run(): Unit = { while (!stopped) { val req = requestChannel.receiveRequest(300) //省略干扰代码 req match { //省略干扰代码 case request: RequestChannel.Request => try { //省略干扰代码 //去调用ApiRequestHandler接口方法handle的某个实现,这里是zk,实现是KafkaApis类 apis.handle(request, requestLocal) } catch { //省略干扰代码 } finally { request.releaseBuffer() } case null => // continue } } completeShutdown() } }
下面是KafkaApis
类中和handle
方法关键的地方
/** * Logic to handle the various Kafka requests * 处理各种 Kafka 请求的逻辑 */ class KafkaApis(val requestChannel: RequestChannel, val metadataSupport: MetadataSupport, val replicaManager: ReplicaManager, val groupCoordinator: GroupCoordinator, val txnCoordinator: TransactionCoordinator, val autoTopicCreationManager: AutoTopicCreationManager, val brokerId: Int, val config: KafkaConfig, val configRepository: ConfigRepository, val metadataCache: MetadataCache, val metrics: Metrics, val authorizer: Option[Authorizer], val quotas: QuotaManagers, val fetchManager: FetchManager, brokerTopicStats: BrokerTopicStats, val clusterId: String, time: Time, val tokenManager: DelegationTokenManager, val apiVersionManager: ApiVersionManager ) extends ApiRequestHandler with Logging { //省略干扰代码 /** * Top-level method that handles all requests and multiplexes to the right api * 处理正确 API 的所有请求和多路复用的顶级方法 */ override def handle(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = { //省略干扰代码 request.header.apiKey match { case ApiKeys.PRODUCE => handleProduceRequest(request, requestLocal) case ApiKeys.FETCH => handleFetchRequest(request) case ApiKeys.LIST_OFFSETS => handleListOffsetRequest(request) case ApiKeys.METADATA => handleTopicMetadataRequest(request) case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request) case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request) case ApiKeys.UPDATE_METADATA => handleUpdateMetadataRequest(request, requestLocal) case ApiKeys.CONTROLLED_SHUTDOWN => handleControlledShutdownRequest(request) case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request, requestLocal).exceptionally(handleError) case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request).exceptionally(handleError) case ApiKeys.FIND_COORDINATOR => handleFindCoordinatorRequest(request) case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request, requestLocal).exceptionally(handleError) case ApiKeys.HEARTBEAT => 
handleHeartbeatRequest(request).exceptionally(handleError) case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request).exceptionally(handleError) case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request, requestLocal).exceptionally(handleError) case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupsRequest(request).exceptionally(handleError) case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request).exceptionally(handleError) case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request) case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request) case ApiKeys.CREATE_TOPICS => maybeForwardToController(request, handleCreateTopicsRequest) case ApiKeys.DELETE_TOPICS => maybeForwardToController(request, handleDeleteTopicsRequest) case ApiKeys.DELETE_RECORDS => handleDeleteRecordsRequest(request) case ApiKeys.INIT_PRODUCER_ID => handleInitProducerIdRequest(request, requestLocal) case ApiKeys.OFFSET_FOR_LEADER_EPOCH => handleOffsetForLeaderEpochRequest(request) case ApiKeys.ADD_PARTITIONS_TO_TXN => handleAddPartitionsToTxnRequest(request, requestLocal) case ApiKeys.ADD_OFFSETS_TO_TXN => handleAddOffsetsToTxnRequest(request, requestLocal) case ApiKeys.END_TXN => handleEndTxnRequest(request, requestLocal) case ApiKeys.WRITE_TXN_MARKERS => handleWriteTxnMarkersRequest(request, requestLocal) case ApiKeys.TXN_OFFSET_COMMIT => handleTxnOffsetCommitRequest(request, requestLocal).exceptionally(handleError) case ApiKeys.DESCRIBE_ACLS => handleDescribeAcls(request) case ApiKeys.CREATE_ACLS => maybeForwardToController(request, handleCreateAcls) case ApiKeys.DELETE_ACLS => maybeForwardToController(request, handleDeleteAcls) case ApiKeys.ALTER_CONFIGS => handleAlterConfigsRequest(request) case ApiKeys.DESCRIBE_CONFIGS => handleDescribeConfigsRequest(request) case ApiKeys.ALTER_REPLICA_LOG_DIRS => handleAlterReplicaLogDirsRequest(request) case ApiKeys.DESCRIBE_LOG_DIRS => handleDescribeLogDirsRequest(request) case ApiKeys.SASL_AUTHENTICATE => 
handleSaslAuthenticateRequest(request) case ApiKeys.CREATE_PARTITIONS => maybeForwardToController(request, handleCreatePartitionsRequest) case ApiKeys.CREATE_DELEGATION_TOKEN => maybeForwardToController(request, handleCreateTokenRequest) case ApiKeys.RENEW_DELEGATION_TOKEN => maybeForwardToController(request, handleRenewTokenRequest) case ApiKeys.EXPIRE_DELEGATION_TOKEN => maybeForwardToController(request, handleExpireTokenRequest) case ApiKeys.DESCRIBE_DELEGATION_TOKEN => handleDescribeTokensRequest(request) case ApiKeys.DELETE_GROUPS => handleDeleteGroupsRequest(request, requestLocal).exceptionally(handleError) case ApiKeys.ELECT_LEADERS => maybeForwardToController(request, handleElectLeaders) case ApiKeys.INCREMENTAL_ALTER_CONFIGS => handleIncrementalAlterConfigsRequest(request) case ApiKeys.ALTER_PARTITION_REASSIGNMENTS => maybeForwardToController(request, handleAlterPartitionReassignmentsRequest) case ApiKeys.LIST_PARTITION_REASSIGNMENTS => maybeForwardToController(request, handleListPartitionReassignmentsRequest) case ApiKeys.OFFSET_DELETE => handleOffsetDeleteRequest(request, requestLocal).exceptionally(handleError) case ApiKeys.DESCRIBE_CLIENT_QUOTAS => handleDescribeClientQuotasRequest(request) case ApiKeys.ALTER_CLIENT_QUOTAS => maybeForwardToController(request, handleAlterClientQuotasRequest) case ApiKeys.DESCRIBE_USER_SCRAM_CREDENTIALS => handleDescribeUserScramCredentialsRequest(request) case ApiKeys.ALTER_USER_SCRAM_CREDENTIALS => maybeForwardToController(request, handleAlterUserScramCredentialsRequest) case ApiKeys.ALTER_PARTITION => handleAlterPartitionRequest(request) case ApiKeys.UPDATE_FEATURES => maybeForwardToController(request, handleUpdateFeatures) case ApiKeys.ENVELOPE => handleEnvelope(request, requestLocal) case ApiKeys.DESCRIBE_CLUSTER => handleDescribeCluster(request) case ApiKeys.DESCRIBE_PRODUCERS => handleDescribeProducersRequest(request) case ApiKeys.UNREGISTER_BROKER => forwardToControllerOrFail(request) case 
ApiKeys.DESCRIBE_TRANSACTIONS => handleDescribeTransactionsRequest(request) case ApiKeys.LIST_TRANSACTIONS => handleListTransactionsRequest(request) case ApiKeys.ALLOCATE_PRODUCER_IDS => handleAllocateProducerIdsRequest(request) case ApiKeys.DESCRIBE_QUORUM => forwardToControllerOrFail(request) case ApiKeys.CONSUMER_GROUP_HEARTBEAT => handleConsumerGroupHeartbeat(request).exceptionally(handleError) case _ => throw new IllegalStateException(s"No handler for request api key ${request.header.apiKey}") //省略干扰代码 } }
创建topic命令,关注一下apiKeys
中的下面这条case
case ApiKeys.CREATE_TOPICS => maybeForwardToController(request, handleCreateTopicsRequest)
下面是handleCreateTopicsRequest()
方法
def handleCreateTopicsRequest(request: RequestChannel.Request): Unit = { val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request)) val controllerMutationQuota = quotas.controllerMutation.newQuotaFor(request, strictSinceVersion = 6) def sendResponseCallback(results: CreatableTopicResultCollection): Unit = { def createResponse(requestThrottleMs: Int): AbstractResponse = { val responseData = new CreateTopicsResponseData() .setThrottleTimeMs(requestThrottleMs) .setTopics(results) val responseBody = new CreateTopicsResponse(responseData) trace(s"Sending create topics response $responseData for correlation id " + s"${request.header.correlationId} to client ${request.header.clientId}.") responseBody } requestHelper.sendResponseMaybeThrottleWithControllerQuota(controllerMutationQuota, request, createResponse) } //首先,从请求中获取CreateTopicsRequest对象。 val createTopicsRequest = request.body[CreateTopicsRequest] val results = new CreatableTopicResultCollection(createTopicsRequest.data.topics.size) //,检查是否有活跃的控制器(controller) if (!zkSupport.controller.isActive) { //如果没有活跃的控制器,将为每个要创建的主题设置错误码为NOT_CONTROLLER,并调用sendResponseCallback方法发送响应。 createTopicsRequest.data.topics.forEach { topic => results.add(new CreatableTopicResult().setName(topic.name) .setErrorCode(Errors.NOT_CONTROLLER.code)) } sendResponseCallback(results) } else { //这是由活跃的控制器 createTopicsRequest.data.topics.forEach { topic => results.add(new CreatableTopicResult().setName(topic.name)) } //将检查集群是否具有创建主题的权限。 val hasClusterAuthorization = authHelper.authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME, logIfDenied = false) val allowedTopicNames = { val topicNames = createTopicsRequest .data .topics .asScala .map(_.name) .toSet /* The cluster metadata topic is an internal topic with a different implementation. The user should not be * allowed to create it as a regular topic. 
*/ if (topicNames.contains(Topic.CLUSTER_METADATA_TOPIC_NAME)) { info(s"Rejecting creation of internal topic ${Topic.CLUSTER_METADATA_TOPIC_NAME}") } topicNames.diff(Set(Topic.CLUSTER_METADATA_TOPIC_NAME)) } val authorizedTopics = if (hasClusterAuthorization) { allowedTopicNames.toSet } else { authHelper.filterByAuthorized(request.context, CREATE, TOPIC, allowedTopicNames)(identity) } val authorizedForDescribeConfigs = authHelper.filterByAuthorized( request.context, DESCRIBE_CONFIGS, TOPIC, allowedTopicNames, logIfDenied = false )(identity).map(name => name -> results.find(name)).toMap results.forEach { topic => if (results.findAll(topic.name).size > 1) { topic.setErrorCode(Errors.INVALID_REQUEST.code) topic.setErrorMessage("Found multiple entries for this topic.") } else if (!authorizedTopics.contains(topic.name)) { topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code) topic.setErrorMessage("Authorization failed.") } if (!authorizedForDescribeConfigs.contains(topic.name)) { topic.setTopicConfigErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code) } } //将符合条件的要创建的主题添加到toCreate映射中 val toCreate = mutable.Map[String, CreatableTopic]() createTopicsRequest.data.topics.forEach { topic => if (results.find(topic.name).errorCode == Errors.NONE.code) { toCreate += topic.name -> topic } } //回调函数:创建完成后调用handleCreateTopicsResults方法处理创建结果 def handleCreateTopicsResults(errors: Map[String, ApiError]): Unit = { errors.foreach { case (topicName, error) => val result = results.find(topicName) result.setErrorCode(error.error.code) .setErrorMessage(error.message) // Reset any configs in the response if Create failed if (error != ApiError.NONE) { result.setConfigs(List.empty.asJava) .setNumPartitions(-1) .setReplicationFactor(-1) .setTopicConfigErrorCode(Errors.NONE.code) } } sendResponseCallback(results) } //zkSupport.adminManager.createTopics方法创建主题 zkSupport.adminManager.createTopics( createTopicsRequest.data.timeoutMs, createTopicsRequest.data.validateOnly, toCreate, 
authorizedForDescribeConfigs, controllerMutationQuota, handleCreateTopicsResults) } }
zk模式创建topic到此结束,再细就自行再看吧
控制器API只负责元数据类请求,包括 topic 创建删除等
brokerAPI才是生产消费数据的API
ControllerServer.scala
中的startup方法,def startup(): Unit = { //省略干扰代码 controllerApis = new ControllerApis(socketServer.dataPlaneRequestChannel, authorizer, quotaManagers, time, controller, raftManager, config, sharedServer.metaProps, controllerNodes.asScala.toSeq, apiVersionManager) //把controllerApis放入到controllerApisHandlerPool controllerApisHandlerPool = new KafkaRequestHandlerPool(config.nodeId, socketServer.dataPlaneRequestChannel, controllerApis, time, config.numIoThreads, s"${DataPlaneAcceptor.MetricPrefix}RequestHandlerAvgIdlePercent", DataPlaneAcceptor.ThreadPrefix) //省略干扰代码 }
ControllerApis
也是实现的ApiRequestHandler
接口,KafkaRequestHandlerPool
看选择zk模式中的KafkaRequestHandlerPool
class ControllerApis(val requestChannel: RequestChannel, val authorizer: Option[Authorizer], val quotas: QuotaManagers, val time: Time, val controller: Controller, val raftManager: RaftManager[ApiMessageAndVersion], val config: KafkaConfig, val metaProperties: MetaProperties, val controllerNodes: Seq[Node], val apiVersionManager: ApiVersionManager) extends ApiRequestHandler with Logging { override def handle(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = { val handlerFuture: CompletableFuture[Unit] = request.header.apiKey match { case ApiKeys.FETCH => handleFetch(request) case ApiKeys.FETCH_SNAPSHOT => handleFetchSnapshot(request) case ApiKeys.CREATE_TOPICS => handleCreateTopics(request) case ApiKeys.DELETE_TOPICS => handleDeleteTopics(request) case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request) case ApiKeys.ALTER_CONFIGS => handleLegacyAlterConfigs(request) case ApiKeys.VOTE => handleVote(request) case ApiKeys.BEGIN_QUORUM_EPOCH => handleBeginQuorumEpoch(request) case ApiKeys.END_QUORUM_EPOCH => handleEndQuorumEpoch(request) case ApiKeys.DESCRIBE_QUORUM => handleDescribeQuorum(request) case ApiKeys.ALTER_PARTITION => handleAlterPartitionRequest(request) case ApiKeys.BROKER_REGISTRATION => handleBrokerRegistration(request) case ApiKeys.BROKER_HEARTBEAT => handleBrokerHeartBeatRequest(request) case ApiKeys.UNREGISTER_BROKER => handleUnregisterBroker(request) case ApiKeys.ALTER_CLIENT_QUOTAS => handleAlterClientQuotas(request) case ApiKeys.INCREMENTAL_ALTER_CONFIGS => handleIncrementalAlterConfigs(request) case ApiKeys.ALTER_PARTITION_REASSIGNMENTS => handleAlterPartitionReassignments(request) case ApiKeys.LIST_PARTITION_REASSIGNMENTS => handleListPartitionReassignments(request) case ApiKeys.ALTER_USER_SCRAM_CREDENTIALS => handleAlterUserScramCredentials(request) case ApiKeys.ENVELOPE => handleEnvelopeRequest(request, requestLocal) case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request) case ApiKeys.SASL_AUTHENTICATE => 
handleSaslAuthenticateRequest(request) case ApiKeys.ALLOCATE_PRODUCER_IDS => handleAllocateProducerIdsRequest(request) case ApiKeys.CREATE_PARTITIONS => handleCreatePartitions(request) case ApiKeys.DESCRIBE_ACLS => aclApis.handleDescribeAcls(request) case ApiKeys.CREATE_ACLS => aclApis.handleCreateAcls(request) case ApiKeys.DELETE_ACLS => aclApis.handleDeleteAcls(request) case ApiKeys.ELECT_LEADERS => handleElectLeaders(request) case ApiKeys.UPDATE_FEATURES => handleUpdateFeatures(request) case _ => throw new ApiException(s"Unsupported ApiKey ${request.context.header.apiKey}") } }
API是用的和zk一样的KafkaApis,加载在dataPlaneRequestHandlerPool
中
override def startup(): Unit = { //主要是找到哪个node为控制器节点,包括不限于投票,指定 clientToControllerChannelManager.start() forwardingManager = new ForwardingManagerImpl(clientToControllerChannelManager) //设置manager的的类型,这里设置为broker类型 val apiVersionManager = ApiVersionManager( ListenerType.BROKER, config, Some(forwardingManager), brokerFeatures, metadataCache ) // Create and start the socket server acceptor threads so that the bound port is known. // Delay starting processors until the end of the initialization sequence to ensure // that credentials have been loaded before processing authentications. socketServer = new SocketServer(config, metrics, time, credentialProvider, apiVersionManager) //省略代码 val raftSupport = RaftSupport(forwardingManager, metadataCache) dataPlaneRequestProcessor = new KafkaApis( requestChannel = socketServer.dataPlaneRequestChannel, metadataSupport = raftSupport, replicaManager = replicaManager, groupCoordinator = groupCoordinator, txnCoordinator = transactionCoordinator, autoTopicCreationManager = autoTopicCreationManager, brokerId = config.nodeId, config = config, configRepository = metadataCache, metadataCache = metadataCache, metrics = metrics, authorizer = authorizer, quotas = quotaManagers, fetchManager = fetchManager, brokerTopicStats = brokerTopicStats, clusterId = clusterId, time = time, tokenManager = tokenManager, apiVersionManager = apiVersionManager) //初始化kafka的handler dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.nodeId, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time, config.numIoThreads, s"${DataPlaneAcceptor.MetricPrefix}RequestHandlerAvgIdlePercent", DataPlaneAcceptor.ThreadPrefix) }
后续的大家可以直接看API的实现命令就可以了
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。