/**
 * Producer that sends [[KeyedMessage]]s either synchronously on the caller's thread or
 * asynchronously through an internal bounded queue drained by a [[ProducerSendThread]].
 *
 * The mode is selected by `config.producerType`, which must be "sync" or "async".
 *
 * @param config       producer configuration
 * @param eventHandler handler that dispatches the messages (this two-argument constructor
 *                     is only for unit testing; production code uses `this(config)`)
 */
class Producer[K,V](val config: ProducerConfig,
                    private val eventHandler: EventHandler[K,V]) // only for unit testing
  extends Logging {

  private val hasShutdown = new AtomicBoolean(false)

  // Queue used by the asynchronous send path; bounded by config.queueBufferingMaxMessages.
  private val queue = new LinkedBlockingQueue[KeyedMessage[K,V]](config.queueBufferingMaxMessages)

  private var sync: Boolean = true

  // Background thread that drains `queue` in async mode; stays null in sync mode.
  private var producerSendThread: ProducerSendThread[K,V] = null

  // Guards send() and close() so that no message is accepted once shutdown has started.
  private val lock = new Object()

  // Decide whether sends are synchronous or asynchronous; async mode starts the send thread.
  config.producerType match {
    case "sync" =>
    case "async" =>
      sync = false
      producerSendThread =
        new ProducerSendThread[K,V]("ProducerSendThread-" + config.clientId,
                                    queue,
                                    eventHandler,
                                    config.queueBufferingMaxMs,
                                    config.batchNumMessages,
                                    config.clientId)
      producerSendThread.start()
    case other =>
      // Fail with a descriptive error instead of an opaque scala.MatchError.
      throw new IllegalArgumentException(
        "Invalid producer type: " + other + " (expected \"sync\" or \"async\")")
  }

  private val producerTopicStats = ProducerTopicStatsRegistry.getProducerTopicStats(config.clientId)

  KafkaMetricsReporter.startReporters(config.props)
  AppInfo.registerInfo()

  /**
   * Builds a producer whose event handler is a DefaultEventHandler wired with the partitioner,
   * value serializer and key serializer classes named in `config`, plus a new ProducerPool.
   */
  def this(config: ProducerConfig) =
    this(config,
         new DefaultEventHandler[K,V](config,
                                      Utils.createObject[Partitioner](config.partitionerClass, config.props),
                                      Utils.createObject[Encoder[V]](config.serializerClass, config.props),
                                      Utils.createObject[Encoder[K]](config.keySerializerClass, config.props),
                                      new ProducerPool(config)))

  /**
   * Sends the data, partitioned by key to the topic using either the
   * synchronous or the asynchronous producer
   *
   * @param messages the producer data object that encapsulates the topic, key and message data
   * @throws ProducerClosedException if close() has already been called
   * @throws QueueFullException      in async mode, if a message could not be enqueued
   */
  def send(messages: KeyedMessage[K,V]*): Unit = {
    lock synchronized {
      if (hasShutdown.get)
        throw new ProducerClosedException
      recordStats(messages)
      if (sync) eventHandler.handle(messages)
      else asyncSend(messages)
    }
  }

  /** Marks the per-topic and all-topics message rate meters once per message. */
  private def recordStats(messages: Seq[KeyedMessage[K,V]]): Unit = {
    for (message <- messages) {
      producerTopicStats.getProducerTopicStats(message.topic).messageRate.mark()
      producerTopicStats.getProducerAllTopicsStats.messageRate.mark()
    }
  }

  /**
   * Asynchronous send path: puts each message on `queue` for the send thread to pick up.
   *
   * The enqueue policy is driven by config.queueEnqueueTimeoutMs:
   *   - 0: fail immediately if the queue is full
   *   - negative: block until space is available
   *   - positive: wait at most that many milliseconds
   *
   * @throws QueueFullException if a message could not be enqueued (full queue, timeout or
   *                            interruption); earlier messages of `messages` stay enqueued
   */
  private def asyncSend(messages: Seq[KeyedMessage[K,V]]): Unit = {
    for (message <- messages) {
      val added = config.queueEnqueueTimeoutMs match {
        case 0 =>
          queue.offer(message)
        case timeoutMs =>
          try {
            if (timeoutMs < 0) {
              queue.put(message)
              true
            } else {
              queue.offer(message, timeoutMs, TimeUnit.MILLISECONDS)
            }
          } catch {
            case e: InterruptedException =>
              // Re-assert the interrupt so the calling thread can still observe it.
              Thread.currentThread().interrupt()
              false
          }
      }
      if (!added) {
        producerTopicStats.getProducerTopicStats(message.topic).droppedMessageRate.mark()
        producerTopicStats.getProducerAllTopicsStats.droppedMessageRate.mark()
        throw new QueueFullException("Event queue is full of unsent messages, could not send event: " + message.toString)
      } else {
        trace("Added to send queue an event: " + message.toString)
        trace("Remaining queue size: " + queue.remainingCapacity)
      }
    }
  }

  /**
   * Shuts the producer down: removes its metrics, stops the async send thread (if any) and
   * closes the event handler (which presumably releases broker connections — confirm in
   * the EventHandler implementation). Only the first call does anything; later calls are no-ops.
   */
  def close(): Unit = {
    lock synchronized {
      val canShutdown = hasShutdown.compareAndSet(false, true)
      if (canShutdown) {
        info("Shutting down producer")
        val startTime = System.nanoTime()
        KafkaMetricsGroup.removeAllProducerMetrics(config.clientId)
        if (producerSendThread != null)
          producerSendThread.shutdown
        eventHandler.close
        info("Producer shutdown completed in " + (System.nanoTime() - startTime) / 1000000 + " ms")
      }
    }
  }
}
/**
 * A message addressed to `topic`, carrying an optional key and an optional partition key.
 *
 * @param topic   destination topic
 * @param key     message key (may be null)
 * @param partKey value used only to choose the partition; it is not stored with the message
 * @param message message payload
 */
case class KeyedMessage[K, V](topic: String, key: K, partKey: Any, message: V) {

  /** Builds a message that is partitioned by its own `key` (i.e. `partKey == key`). */
  def this(topic: String, key: K, message: V) = this(topic, key, key, message)

  /** Value to partition on: `partKey` when it is non-null, otherwise `key` (possibly null). */
  def partitionKey: Any = if (partKey != null) partKey else key
}
当使用三个参数的构造函数时，partKey 会等于 key。partKey 只用于决定消息被发送到哪个分区（partition），它不会被当作消息的一部分存储。
/**
 * Partitions `messages`, groups them by leader broker and sends one request per broker.
 *
 * @param messages serialized messages to dispatch
 * @return the messages that were not sent successfully, so the caller can retry them;
 *         all of `messages` when they could not be partitioned at all
 */
private def dispatchSerializedData(messages: Seq[KeyedMessage[K,Message]]): Seq[KeyedMessage[K, Message]] = {
  import scala.util.control.NonFatal

  // Partition the messages and collate them per leader broker and topic-partition.
  partitionAndCollate(messages) match {
    case Some(partitionedData) =>
      val failedProduceRequests = new ArrayBuffer[KeyedMessage[K,Message]]
      for ((brokerid, messagesPerBrokerMap) <- partitionedData) {
        if (logger.isTraceEnabled)
          messagesPerBrokerMap.foreach(partitionAndEvent =>
            trace("Handling event for Topic: %s, Broker: %d, Partitions: %s".format(partitionAndEvent._1, brokerid, partitionAndEvent._2)))
        // Failures are handled per broker: an exception for one broker must neither skip the
        // remaining brokers nor drop this broker's messages from the retry list.
        val failedForBroker = new ArrayBuffer[KeyedMessage[K,Message]]
        try {
          val messageSetPerBroker = groupMessagesToSet(messagesPerBrokerMap)
          val failedTopicPartitions = send(brokerid, messageSetPerBroker)
          failedTopicPartitions.foreach(topicPartition =>
            messagesPerBrokerMap.get(topicPartition).foreach(data => failedForBroker.appendAll(data)))
        } catch {
          case NonFatal(t) =>
            error("Failed to send messages", t)
            // The outcome for this broker is unknown, so report all of its messages as failed
            // (a retry may therefore resend some that were in fact delivered).
            failedForBroker.clear()
            messagesPerBrokerMap.values.foreach(data => failedForBroker.appendAll(data))
        }
        failedProduceRequests.appendAll(failedForBroker)
      }
      failedProduceRequests
    case None => // all produce requests failed
      messages
  }
}
- TopicMetadata -->PartitionMetadata
/**
 * Metadata describing one partition of a topic.
 *
 * @param partitionId id of the partition within its topic
 * @param leader      broker hosting the leader partition, if one is known
 * @param replicas    brokers hosting replicas of this partition
 * @param isr         brokers in the in-sync replica set (ISR); empty by default
 * @param errorCode   error code for this partition; ErrorMapping.NoError by default
 */
case class PartitionMetadata(
  partitionId: Int,
  leader: Option[Broker],
  replicas: Seq[Broker],
  isr: Seq[Broker] = Seq.empty,
  errorCode: Short = ErrorMapping.NoError
)
也就是说，Topic 元数据包含了 partition 元数据；partition 元数据又包含 partitionId、leader（leader partition 所在的 broker）、replicas（备份 partition 所在的 broker 列表）、isr（与 leader 保持同步的副本，即 in-sync replicas）以及 errorCode 等信息。
/**
 * Assigns every message to a partition of its topic and groups the messages first by the
 * partition leader's broker id, then by topic-partition.
 *
 * Messages whose partition has no known leader are grouped under broker id -1, so the
 * failure surfaces at send time while requests for the other brokers are still handled.
 *
 * @param messages serialized messages to partition
 * @return `Some(brokerId -> (topicAndPartition -> messages))`, or `None` if partitioning
 *         failed with a non-fatal exception (the caller may then retry the whole batch)
 */
def partitionAndCollate(messages: Seq[KeyedMessage[K,Message]]): Option[Map[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]] = {
  import scala.util.control.NonFatal

  val ret = new HashMap[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]
  try {
    for (message <- messages) {
      // Partition list (with leaders) of the message's topic.
      val topicPartitionsList = getPartitionListForTopic(message)
      // Choose the target partition index from the message's partition key.
      val partitionIndex = getPartition(message.topic, message.partitionKey, topicPartitionsList)
      val brokerPartition = topicPartitionsList(partitionIndex)
      // postpone the failure until the send operation, so that requests for other brokers are handled correctly
      val leaderBrokerId = brokerPartition.leaderBrokerIdOpt.getOrElse(-1)
      val dataPerBroker =
        ret.getOrElseUpdate(leaderBrokerId, new HashMap[TopicAndPartition, Seq[KeyedMessage[K,Message]]])
      val topicAndPartition = TopicAndPartition(message.topic, brokerPartition.partitionId)
      dataPerBroker.getOrElseUpdate(topicAndPartition, new ArrayBuffer[KeyedMessage[K,Message]]) match {
        // Values are ArrayBuffers created right here, so append in place...
        case buffer: ArrayBuffer[KeyedMessage[K,Message] @unchecked] => buffer.append(message)
        // ...this fallback only keeps the code type-safe without a downcast.
        case other => dataPerBroker(topicAndPartition) = other :+ message
      }
    }
    Some(ret)
  } catch { // Swallow recoverable exceptions and return None so that they can be retried.
    case ute: UnknownTopicOrPartitionException => warn("Failed to collate messages by topic,partition due to: " + ute.getMessage); None
    case lnae: LeaderNotAvailableException => warn("Failed to collate messages by topic,partition due to: " + lnae.getMessage); None
    // Unexpected errors: keep the stack trace instead of logging only the message.
    case NonFatal(oe) => error("Failed to collate messages by topic, partition due to: " + oe.getMessage, oe); None
  }
}
/**
 * Looks up the partitions (with their leaders) currently known for the topic of `m`.
 *
 * Each call consumes one value from `correlationId` for the metadata lookup.
 *
 * @param m message whose topic is looked up
 * @return the non-empty partition list for `m.topic`
 * @throws NoBrokersForPartitionException if no partition is known for the topic
 */
private def getPartitionListForTopic(m: KeyedMessage[K,Message]): Seq[PartitionAndLeader] = {
  val partitions = brokerPartitionInfo.getBrokerPartitionInfo(m.topic, correlationId.getAndIncrement)
  debug("Broker partitions registered for topic: %s are %s"
    .format(m.topic, partitions.map(_.partitionId).mkString(",")))
  if (partitions.isEmpty)
    throw new NoBrokersForPartitionException("Partition key = " + m.key)
  else
    partitions
}