Learning the Kafka Source Code: Producer Analysis

This chapter analyzes the business logic of the Kafka Producer; both the message dispatch logic and the load-balancing logic are maintained inside the Producer.

 

I. Overall Kafka Architecture

(The overall architecture diagram is an image reposted in the original article and is not reproduced here.)

 

II. Producer Source Code Analysis

 

class Producer[K,V](val config: ProducerConfig,
                    private val eventHandler: EventHandler[K,V])  // only for unit testing
  extends Logging {

  private val hasShutdown = new AtomicBoolean(false)
  // queue that buffers messages for the asynchronous send path
  private val queue = new LinkedBlockingQueue[KeyedMessage[K,V]](config.queueBufferingMaxMessages)

  private var sync: Boolean = true
  // background thread that drains the queue in async mode
  private var producerSendThread: ProducerSendThread[K,V] = null
  private val lock = new Object()

  // The settings loaded from the configuration are wrapped in a ProducerConfig.
  // Decide whether sends are synchronous or asynchronous; for async, start the send thread.
  config.producerType match {
    case "sync" =>
    case "async" =>
      sync = false
      producerSendThread = new ProducerSendThread[K,V]("ProducerSendThread-" + config.clientId,
                                                       queue,
                                                       eventHandler,
                                                       config.queueBufferingMaxMs,
                                                       config.batchNumMessages,
                                                       config.clientId)
      producerSendThread.start()
  }

  private val producerTopicStats = ProducerTopicStatsRegistry.getProducerTopicStats(config.clientId)

  KafkaMetricsReporter.startReporters(config.props)
  AppInfo.registerInfo()

  def this(config: ProducerConfig) =
    this(config,
         new DefaultEventHandler[K,V](config,
                                      Utils.createObject[Partitioner](config.partitionerClass, config.props),
                                      Utils.createObject[Encoder[V]](config.serializerClass, config.props),
                                      Utils.createObject[Encoder[K]](config.keySerializerClass, config.props),
                                      new ProducerPool(config)))

  /**
   * Sends the data, partitioned by key to the topic using either the
   * synchronous or the asynchronous producer
   * @param messages the producer data object that encapsulates the topic, key and message data
   */
  def send(messages: KeyedMessage[K,V]*) {
    lock synchronized {
      if (hasShutdown.get)
        throw new ProducerClosedException
      recordStats(messages)
      sync match {
        case true => eventHandler.handle(messages)
        case false => asyncSend(messages)
      }
    }
  }

  private def recordStats(messages: Seq[KeyedMessage[K,V]]) {
    for (message <- messages) {
      producerTopicStats.getProducerTopicStats(message.topic).messageRate.mark()
      producerTopicStats.getProducerAllTopicsStats.messageRate.mark()
    }
  }

  // Asynchronous send path: put the messages onto the queue and let the background thread pick them up.
  private def asyncSend(messages: Seq[KeyedMessage[K,V]]) {
    for (message <- messages) {
      val added = config.queueEnqueueTimeoutMs match {
        case 0 =>
          queue.offer(message)
        case _ =>
          try {
            config.queueEnqueueTimeoutMs < 0 match {
              case true =>
                queue.put(message)
                true
              case _ =>
                queue.offer(message, config.queueEnqueueTimeoutMs, TimeUnit.MILLISECONDS)
            }
          }
          catch {
            case e: InterruptedException =>
              false
          }
      }
      if(!added) {
        producerTopicStats.getProducerTopicStats(message.topic).droppedMessageRate.mark()
        producerTopicStats.getProducerAllTopicsStats.droppedMessageRate.mark()
        throw new QueueFullException("Event queue is full of unsent messages, could not send event: " + message.toString)
      } else {
        trace("Added to send queue an event: " + message.toString)
        trace("Remaining queue size: " + queue.remainingCapacity)
      }
    }
  }

  /**
   * Close API to close the producer pool connections to all Kafka brokers. Also closes
   * the zookeeper client connection if one exists
   */
  def close() = {
    lock synchronized {
      val canShutdown = hasShutdown.compareAndSet(false, true)
      if(canShutdown) {
        info("Shutting down producer")
        val startTime = System.nanoTime()
        KafkaMetricsGroup.removeAllProducerMetrics(config.clientId)
        if (producerSendThread != null)
          producerSendThread.shutdown
        eventHandler.close
        info("Producer shutdown completed in " + (System.nanoTime() - startTime) / 1000000 + " ms")
      }
    }
  }
}

 

Notes:

I have annotated many of the methods above with comments. The constructor first initializes a number of fields: the asynchronous message queue (queue), the sync flag that records whether sends are synchronous, and the background send thread ProducerSendThread. The key class is ProducerSendThread, which takes messages off the queue and hands them to kafka.producer.EventHandler to be sent to the brokers. The code is short but covers a lot: config.producerType decides whether sends are synchronous or asynchronous, and each mode is backed by its own supporting classes. Both modes are covered in detail below; a rough sketch of the send-thread loop follows.
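The actual ProducerSendThread source is not shown in this article, but the idea is easy to sketch. The snippet below is a minimal, self-contained approximation (not the real class): it drains a LinkedBlockingQueue and flushes a batch whenever either batchSize messages have accumulated or queueTimeMs milliseconds have passed, which is what config.batchNumMessages and config.queueBufferingMaxMs control.

import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
import scala.collection.mutable.ArrayBuffer

// Minimal sketch of a queue-draining send loop (illustrative, not the real ProducerSendThread).
def drainLoop[T](queue: LinkedBlockingQueue[T],
                 queueTimeMs: Long,
                 batchSize: Int)(flush: Seq[T] => Unit): Unit = {
  val batch = new ArrayBuffer[T]
  var deadline = System.currentTimeMillis + queueTimeMs
  while (true) {
    // wait at most until the current batch deadline for the next message
    val item = queue.poll(math.max(0L, deadline - System.currentTimeMillis), TimeUnit.MILLISECONDS)
    if (item != null)
      batch += item
    // flush on size or on time, whichever comes first
    if (batch.size >= batchSize || System.currentTimeMillis >= deadline) {
      if (batch.nonEmpty) {
        flush(batch.toList)
        batch.clear()
      }
      deadline = System.currentTimeMillis + queueTimeMs
    }
  }
}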

 

The class we use to send a message has the following form:

case class KeyedMessage[K, V](val topic: String, val key: K, val partKey: Any, val message: V)

Notes:

When the three-argument constructor is used, partKey is set equal to key. partKey is used only for partitioning; it is not stored as part of the message. A minimal end-to-end usage example follows.
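To make the sync/async switch and the KeyedMessage shape concrete, here is a small usage sketch of this old Scala producer API. The broker address, topic name, key, and property values are placeholders; adjust them for your environment.

import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

val props = new Properties()
props.put("metadata.broker.list", "localhost:9092")        // placeholder broker list
props.put("serializer.class", "kafka.serializer.StringEncoder")
props.put("producer.type", "async")                         // "sync" (default) or "async"
props.put("queue.buffering.max.ms", "5000")                 // flush at least every 5 s in async mode
props.put("batch.num.messages", "200")                      // or once 200 messages have queued up

val producer = new Producer[String, String](new ProducerConfig(props))
// three-argument constructor: partKey defaults to the key ("user-42" here)
producer.send(new KeyedMessage[String, String]("my-topic", "user-42", "hello kafka"))
producer.close()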

 

1. Synchronous Send

 

private def dispatchSerializedData(messages: Seq[KeyedMessage[K,Message]]): Seq[KeyedMessage[K, Message]] = {
  // partition the messages and collate them by broker
  val partitionedDataOpt = partitionAndCollate(messages)
  partitionedDataOpt match {
    case Some(partitionedData) =>
      val failedProduceRequests = new ArrayBuffer[KeyedMessage[K,Message]]
      try {
        for ((brokerid, messagesPerBrokerMap) <- partitionedData) {
          if (logger.isTraceEnabled)
            messagesPerBrokerMap.foreach(partitionAndEvent =>
              trace("Handling event for Topic: %s, Broker: %d, Partitions: %s".format(partitionAndEvent._1, brokerid, partitionAndEvent._2)))
          val messageSetPerBroker = groupMessagesToSet(messagesPerBrokerMap)
          val failedTopicPartitions = send(brokerid, messageSetPerBroker)
          failedTopicPartitions.foreach(topicPartition => {
            messagesPerBrokerMap.get(topicPartition) match {
              case Some(data) => failedProduceRequests.appendAll(data)
              case None => // nothing
            }
          })
        }
      } catch {
        case t: Throwable => error("Failed to send messages", t)
      }
      failedProduceRequests
    case None => // all produce requests failed
      messages
  }
}

Notes:

This method touches on two important pieces. The first is partitionAndCollate, which works out the topic, partition and broker for each message; it is important and is analyzed below. The second is groupMessagesToSet, which applies the compression settings to the data about to be sent: if compression is not configured, no topic's message set is compressed; if compression is configured and no per-topic list is given, every topic is compressed; otherwise only the topics explicitly configured for compression are compressed.

groupMessagesToSet does not itself contain the actual compression logic; it simply returns a ByteBufferMessageSet built with the chosen codec. The decision rule is sketched below.
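The per-topic decision just described can be summarized in a few lines. This is only a sketch of the rule (the real check lives inside groupMessagesToSet and is driven by the "compression.codec" and "compressed.topics" producer settings); the helper name is made up.

import kafka.message.{CompressionCodec, NoCompressionCodec}

// Illustrative helper (not part of the Kafka source): which codec should a topic's
// message set be built with, given the producer's compression configuration?
def codecForTopic(topic: String,
                  configuredCodec: CompressionCodec,
                  compressedTopics: Seq[String]): CompressionCodec =
  configuredCodec match {
    case NoCompressionCodec => NoCompressionCodec               // compression disabled globally
    case codec if compressedTopics.isEmpty => codec             // no per-topic list: compress everything
    case codec if compressedTopics.contains(topic) => codec     // topic explicitly listed: compress it
    case _ => NoCompressionCodec                                 // everything else stays uncompressed
  }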

 

Before we dig into the partitionAndCollate method, let's first look at the following class structure:

  

TopicMetadata --> PartitionMetadata

case class PartitionMetadata(partitionId: Int,
                             val leader: Option[Broker],
                             replicas: Seq[Broker],
                             isr: Seq[Broker] = Seq.empty,
                             errorCode: Short = ErrorMapping.NoError)
In other words, a topic's metadata contains the metadata of its partitions, and each partition's metadata contains the partitionId, the leader (i.e. which broker hosts the leader of that partition), which brokers hold the replica partitions, which replicas are in the ISR, and so on.
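As a purely illustrative example (all values invented, assuming the kafka.api.PartitionMetadata case class shown above), a partition that has not yet elected a leader carries leader = None; as we will see below, the producer treats such a partition as temporarily unsendable and postpones the failure until send time.

import kafka.api.PartitionMetadata

// Made-up example: partition 3 of some topic currently has no leader and no known replicas.
val leaderlessPartition = PartitionMetadata(partitionId = 3,
                                            leader = None,
                                            replicas = Seq.empty)
// isr defaults to Seq.empty and errorCode defaults to ErrorMapping.NoError (see the case class above)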

 

 

def partitionAndCollate(messages: Seq[KeyedMessage[K,Message]]): Option[Map[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]] = {
  val ret = new HashMap[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]
  try {
    for (message <- messages) {
      // fetch the list of partitions (and their leaders) for this message's topic
      val topicPartitionsList = getPartitionListForTopic(message)
      // use the partitioner (a hash by default) to decide which partition the message goes to
      val partitionIndex = getPartition(message.topic, message.partitionKey, topicPartitionsList)
      val brokerPartition = topicPartitionsList(partitionIndex)

      // postpone the failure until the send operation, so that requests for other brokers are handled correctly
      val leaderBrokerId = brokerPartition.leaderBrokerIdOpt.getOrElse(-1)

      var dataPerBroker: HashMap[TopicAndPartition, Seq[KeyedMessage[K,Message]]] = null
      ret.get(leaderBrokerId) match {
        case Some(element) =>
          dataPerBroker = element.asInstanceOf[HashMap[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]
        case None =>
          dataPerBroker = new HashMap[TopicAndPartition, Seq[KeyedMessage[K,Message]]]
          ret.put(leaderBrokerId, dataPerBroker)
      }

      val topicAndPartition = TopicAndPartition(message.topic, brokerPartition.partitionId)
      var dataPerTopicPartition: ArrayBuffer[KeyedMessage[K,Message]] = null
      dataPerBroker.get(topicAndPartition) match {
        case Some(element) =>
          dataPerTopicPartition = element.asInstanceOf[ArrayBuffer[KeyedMessage[K,Message]]]
        case None =>
          dataPerTopicPartition = new ArrayBuffer[KeyedMessage[K,Message]]
          dataPerBroker.put(topicAndPartition, dataPerTopicPartition)
      }
      dataPerTopicPartition.append(message)
    }
    Some(ret)
  } catch {  // Swallow recoverable exceptions and return None so that they can be retried.
    case ute: UnknownTopicOrPartitionException => warn("Failed to collate messages by topic,partition due to: " + ute.getMessage); None
    case lnae: LeaderNotAvailableException => warn("Failed to collate messages by topic,partition due to: " + lnae.getMessage); None
    case oe: Throwable => error("Failed to collate messages by topic, partition due to: " + oe.getMessage); None
  }
}

Notes:

partitionAndCollate groups the topics' messages by broker: each message is assigned to a dataPerBroker map (one map per target broker), and the handler then calls SyncProducer.send once per broker to batch-send that broker's messages; SyncProducer wraps the blocking socket communication.

The main job of partitionAndCollate is therefore to find, for each partition, the leaderBrokerId it lives on (i.e. which broker hosts that partition's leader) and to build a HashMap[Int, Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]] that groups the messages by broker id, as preparation for the per-broker SyncProducer sends. Before we can decide which broker a message goes to, we first have to decide which partition it goes to; only then can the partitionId be used to look up the broker hosting that partition's leader.
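The shape of the returned structure is easiest to see with a made-up example. The broker ids, topic name and partition numbers below are invented purely for illustration:

import kafka.common.TopicAndPartition
import kafka.message.Message
import kafka.producer.KeyedMessage
import scala.collection.mutable

// Hypothetical result for two brokers and one topic ("orders"): broker 0 leads partition 0
// and broker 1 leads partition 1. The per-partition sequences are left empty here; in the
// real producer they hold the serialized messages waiting to be sent to that broker.
val noMessages = Seq.empty[KeyedMessage[String, Message]]
val grouped: mutable.Map[Int, mutable.Map[TopicAndPartition, Seq[KeyedMessage[String, Message]]]] =
  mutable.Map(
    0 -> mutable.Map(TopicAndPartition("orders", 0) -> noMessages),
    1 -> mutable.Map(TopicAndPartition("orders", 1) -> noMessages)
  )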

 

Let's step into getPartitionListForTopic and see what this method actually does.

  

private def getPartitionListForTopic(m: KeyedMessage[K,Message]): Seq[PartitionAndLeader] = {
  val topicPartitionsList = brokerPartitionInfo.getBrokerPartitionInfo(m.topic, correlationId.getAndIncrement)
  debug("Broker partitions registered for topic: %s are %s"
    .format(m.topic, topicPartitionsList.map(p => p.partitionId).mkString(",")))
  val totalNumPartitions = topicPartitionsList.length
  if(totalNumPartitions == 0)
    throw new NoBrokersForPartitionException("Partition key = " + m.key)
  topicPartitionsList
}
Notes: there is not much to this method itself; the interesting part is the call to getBrokerPartitionInfo. KeyedMessage is the message we want to send, and the return value is a Seq[PartitionAndLeader].

 

  

def getBrokerPartitionInfo(topic: String, correlationId: Int): Seq[PartitionAndLeader] = {
  debug("Getting broker partition info for topic %s".format(topic))
  // check if the cache has metadata for this topic
  val topicMetadata = topicPartitionInfo.get(topic)
  val metadata: TopicMetadata =
    topicMetadata match {
      case Some(m) => m
      case None =>
        // refresh the topic metadata cache
        updateInfo(Set(topic), correlationId)
        val topicMetadata = topicPartitionInfo.get(topic)
        topicMetadata match {
          case Some(m) => m
          case None => throw new KafkaException("Failed to fetch topic metadata for topic: " + topic)
        }
    }
  val partitionMetadata = metadata.partitionsMetadata
  if(partitionMetadata.size == 0) {
    if(metadata.errorCode != ErrorMapping.NoError) {
      throw new KafkaException(ErrorMapping.exceptionFor(metadata.errorCode))
    } else {
      throw new KafkaException("Topic metadata %s has empty partition metadata and no error code".format(metadata))
    }
  }
  partitionMetadata.map { m =>
    m.leader match {
      case Some(leader) =>
        debug("Partition [%s,%d] has leader %d".format(topic, m.partitionId, leader.id))
        new PartitionAndLeader(topic, m.partitionId, Some(leader.id))
      case None =>
        debug("Partition [%s,%d] does not have a leader yet".format(topic, m.partitionId))
        new PartitionAndLeader(topic, m.partitionId, None)
    }
  }.sortWith((s, t) => s.partitionId < t.partitionId)
}

Notes:

This method matters. Look first at the topicPartitionInfo object: it is a HashMap[String, TopicMetadata] whose key is the topic name and whose value is that topic's metadata.

The topic's metadata is looked up in this map and pattern-matched: if an entry exists (Some(m)) it is used as metadata directly; if there is none (None), the producer connects to the brokers over the network to refresh the topic information and then reads the cache again.

(The original post illustrates this lookup with a flow diagram; the image is not reproduced here.)

  

Next, the source of updateInfo:

 

def updateInfo(topics: Set[String], correlationId: Int) {
  var topicsMetadata: Seq[TopicMetadata] = Nil
  // send a metadata request to the brokers to fetch the latest metadata for these topics
  val topicMetadataResponse = ClientUtils.fetchTopicMetadata(topics, brokers, producerConfig, correlationId)
  // parse the topic and partition metadata out of the response and put it into the cache
  topicsMetadata = topicMetadataResponse.topicsMetadata
  // throw partition specific exception
  topicsMetadata.foreach(tmd => {
    trace("Metadata for topic %s is %s".format(tmd.topic, tmd))
    if(tmd.errorCode == ErrorMapping.NoError) {
      topicPartitionInfo.put(tmd.topic, tmd)
    } else
      warn("Error while fetching metadata [%s] for topic [%s]: %s ".format(tmd, tmd.topic, ErrorMapping.exceptionFor(tmd.errorCode).getClass))
    tmd.partitionsMetadata.foreach(pmd => {
      if (pmd.errorCode != ErrorMapping.NoError && pmd.errorCode == ErrorMapping.LeaderNotAvailableCode) {
        warn("Error while fetching metadata %s for topic partition [%s,%d]: [%s]".format(pmd, tmd.topic, pmd.partitionId,
          ErrorMapping.exceptionFor(pmd.errorCode).getClass))
      } // any other error code (e.g. ReplicaNotAvailable) can be ignored since the producer does not need to access the replica and isr metadata
    })
  })
  producerPool.updateProducer(topicsMetadata)
}
Pay special attention here: after ClientUtils.fetchTopicMetadata returns, execution continues in BrokerPartitionInfo.updateInfo, and at its very end the pool rebuilds all the SyncProducers (i.e. the socket channels) from the freshly fetched metadata via producerPool.updateProducer(topicsMetadata). In other words, updateInfo is executed repeatedly over the producer's lifetime (for example on every cache miss, as shown above), continually refreshing the cache with the latest data.
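If you want to inspect the same metadata the producer caches, the fetch can also be driven by hand. The sketch below assumes the 0.8.x ClientUtils API used in updateInfo above; the broker address and topic name are placeholders.

import java.util.Properties
import kafka.client.ClientUtils
import kafka.producer.ProducerConfig

val props = new Properties()
props.put("metadata.broker.list", "localhost:9092")        // placeholder broker list
val config  = new ProducerConfig(props)
val brokers = ClientUtils.parseBrokerList("localhost:9092")

// same call updateInfo makes: topics, brokers, producer config, correlation id
val response = ClientUtils.fetchTopicMetadata(Set("my-topic"), brokers, config, 0)
response.topicsMetadata.foreach { tmd =>
  println("topic=%s error=%d partitions=%d".format(tmd.topic, tmd.errorCode, tmd.partitionsMetadata.size))
}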
 
private def getPartition(topic: String, key: Any, topicPartitionList: Seq[PartitionAndLeader]): Int = {
  val numPartitions = topicPartitionList.size
  if(numPartitions <= 0)
    throw new UnknownTopicOrPartitionException("Topic " + topic + " doesn't exist")
  val partition =
    if(key == null) {
      // If the key is null, we don't really need a partitioner
      // So we look up in the send partition cache for the topic to decide the target partition
      val id = sendPartitionPerTopicCache.get(topic)
      id match {
        case Some(partitionId) =>
          // directly return the partitionId without checking availability of the leader,
          // since we want to postpone the failure until the send operation anyways
          partitionId
        case None =>
          val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined)
          if (availablePartitions.isEmpty)
            throw new LeaderNotAvailableException("No leader for any partition in topic " + topic)
          val index = Utils.abs(Random.nextInt) % availablePartitions.size
          val partitionId = availablePartitions(index).partitionId
          sendPartitionPerTopicCache.put(topic, partitionId)
          partitionId
      }
    } else
      partitioner.partition(key, numPartitions)
  if(partition < 0 || partition >= numPartitions)
    throw new UnknownTopicOrPartitionException("Invalid partition id: " + partition + " for topic " + topic +
      "; Valid values are in the inclusive range of [0, " + (numPartitions-1) + "]")
  trace("Assigning message of topic %s and key %s to a selected partition %d".format(topic, if (key == null) "[none]" else key.toString, partition))
  partition
}
Notes: the method first checks how many partitions the topic has. If the key is null it consults sendPartitionPerTopicCache, a Map that caches one partitionId per topic: if a partitionId was previously cached via sendPartitionPerTopicCache.put(topic, partitionId), it is returned directly; if not, one is chosen at random among the partitions that currently have a leader, using Utils.abs(Random.nextInt) % availablePartitions.size, and cached in sendPartitionPerTopicCache. The effect is that while a cached partitionId exists, many keyless messages are all sent to that same partition.

When the key is not null, the partitioner handed to the handler decides the target partition from the partKey and numPartitions. Note that numPartitions comes from topicPartitionList.size, so the chosen partition may currently have no available leader; that problem is deferred to send time. When it happens, partitionAndCollate assigns the message to the pseudo broker id -1. The code below is the default partition-selection formula: key.hashCode % numPartitions.
class DefaultPartitioner(props: VerifiableProperties = null) extends Partitioner {
  private val random = new java.util.Random

  def partition(key: Any, numPartitions: Int): Int = {
    Utils.abs(key.hashCode) % numPartitions
  }
}
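Because the partitioner class is pluggable (it is loaded from config.partitionerClass, i.e. the "partitioner.class" property, in the Producer constructor shown earlier), you can substitute your own implementation. Below is a hedged sketch with an invented class that routes string keys by their first character; the class and package names are made up.

import kafka.producer.Partitioner
import kafka.utils.{Utils, VerifiableProperties}

// Illustrative custom partitioner (not part of Kafka): keys starting with the same
// character land on the same partition.
class FirstLetterPartitioner(props: VerifiableProperties = null) extends Partitioner {
  def partition(key: Any, numPartitions: Int): Int =
    Utils.abs(key.toString.headOption.getOrElse('_').toInt) % numPartitions
}

// enabled via the producer properties, e.g.:
// props.put("partitioner.class", "com.example.FirstLetterPartitioner")   // hypothetical package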
 
Finally, the send method itself, which performs the socket communication using blocking I/O.
private def send(brokerId: Int, messagesPerTopic: collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]) = {
  if(brokerId < 0) {
    warn("Failed to send data since partitions %s don't have a leader".format(messagesPerTopic.map(_._1).mkString(",")))
    messagesPerTopic.keys.toSeq
  } else if(messagesPerTopic.size > 0) {
    val currentCorrelationId = correlationId.getAndIncrement
    val producerRequest = new ProducerRequest(currentCorrelationId, config.clientId, config.requestRequiredAcks,
      config.requestTimeoutMs, messagesPerTopic)
    var failedTopicPartitions = Seq.empty[TopicAndPartition]
    try {
      val syncProducer = producerPool.getProducer(brokerId)
      debug("Producer sending messages with correlation id %d for topics %s to broker %d on %s:%d"
        .format(currentCorrelationId, messagesPerTopic.keySet.mkString(","), brokerId, syncProducer.config.host, syncProducer.config.port))
      val response = syncProducer.send(producerRequest)
      debug("Producer sent messages with correlation id %d for topics %s to broker %d on %s:%d"
        .format(currentCorrelationId, messagesPerTopic.keySet.mkString(","), brokerId, syncProducer.config.host, syncProducer.config.port))
      if(response != null) {
        if (response.status.size != producerRequest.data.size)
          throw new KafkaException("Incomplete response (%s) for producer request (%s)".format(response, producerRequest))
        if (logger.isTraceEnabled) {
          val successfullySentData = response.status.filter(_._2.error == ErrorMapping.NoError)
          successfullySentData.foreach(m => messagesPerTopic(m._1).foreach(message =>
            trace("Successfully sent message: %s".format(if(message.message.isNull) null else Utils.readString(message.message.payload)))))
        }
        val failedPartitionsAndStatus = response.status.filter(_._2.error != ErrorMapping.NoError).toSeq
        failedTopicPartitions = failedPartitionsAndStatus.map(partitionStatus => partitionStatus._1)
        if(failedTopicPartitions.size > 0) {
          val errorString = failedPartitionsAndStatus
            .sortWith((p1, p2) => p1._1.topic.compareTo(p2._1.topic) < 0 ||
                                  (p1._1.topic.compareTo(p2._1.topic) == 0 && p1._1.partition < p2._1.partition))
            .map {
              case(topicAndPartition, status) =>
                topicAndPartition.toString + ": " + ErrorMapping.exceptionFor(status.error).getClass.getName
            }.mkString(",")
          warn("Produce request with correlation id %d failed due to %s".format(currentCorrelationId, errorString))
        }
        failedTopicPartitions
      } else {
        Seq.empty[TopicAndPartition]
      }
    } catch {
      case t: Throwable =>
        warn("Failed to send producer request with correlation id %d to broker %d with data for partitions %s"
          .format(currentCorrelationId, brokerId, messagesPerTopic.map(_._1).mkString(",")), t)
        messagesPerTopic.keys.toSeq
    }
  } else {
    List.empty
  }
}
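Note that send (like dispatchSerializedData above) does not throw on a per-partition failure; it returns the failed TopicAndPartitions, and the caller retries the corresponding messages. The function below is a simplified sketch of how DefaultEventHandler.handle is expected to drive those retries, governed by the "message.send.max.retries" and "retry.backoff.ms" settings; it is not a verbatim copy of the source, and the dispatch parameter stands in for dispatchSerializedData.

import kafka.common.FailedToSendMessageException
import kafka.message.Message
import kafka.producer.KeyedMessage

// Simplified, illustrative retry driver (not the actual DefaultEventHandler source).
// `dispatch` must return the messages that could not be delivered.
def sendWithRetries[K](serialized: Seq[KeyedMessage[K, Message]],
                       maxRetries: Int,        // "message.send.max.retries"
                       retryBackoffMs: Long)   // "retry.backoff.ms"
                      (dispatch: Seq[KeyedMessage[K, Message]] => Seq[KeyedMessage[K, Message]]): Unit = {
  var outstanding = serialized
  var remainingAttempts = maxRetries + 1
  while (outstanding.nonEmpty && remainingAttempts > 0) {
    outstanding = dispatch(outstanding)        // only the failed messages come back
    if (outstanding.nonEmpty) {
      Thread.sleep(retryBackoffMs)             // back off before refreshing metadata and retrying
      remainingAttempts -= 1
    }
  }
  if (outstanding.nonEmpty)
    throw new FailedToSendMessageException("Failed to send messages after " + maxRetries + " tries.", null)
}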

  

 
