当前位置:   article > 正文

kafka发送消息分区策略详解_kafka 分区后没起作用,消息还是存储到了某一个分区

kafka 分区后没起作用,消息还是存储到了某一个分区

背景:

     一个简单的用scala往kafka里写数据demo,每次运行只往一个分区写入数据,下次运行又选另一个分区一直写入。

发送例子:

  1. def main(args: Array[String]): Unit = {
  2. val topic = "test02"
  3. val brokers = "demo169.test.com:6667,demo167.test.com:6667,demo168.test.com:6667"
  4. val props = new Properties()
  5. props.put("metadata.broker.list", brokers)
  6. props.put("serializer.class", "kafka.serializer.StringEncoder")
  7. props.put("request.required.acks", "1")
  8. val kafkaConfig = new ProducerConfig(props)
  9. val producer = new Producer[String, String](kafkaConfig)
  10. while(true) {
  11. // prepare event data
  12. val event = new JSONObject()
  13. event
  14. .put("uid", getUserID)
  15. .put("event_time", System.currentTimeMillis.toString)
  16. .put("os_type", "Android")
  17. .put("click_count", click)
  18. // produce event message
  19. // producer.send(new KeyedMessage[String, String](topic,getUserID, event.toString))
  20. producer.send(new KeyedMessage[String, String](topic, event.toString))
  21. println("Message sent: " + event)
  22. Thread.sleep(1000)
  23. }
  24. }

原因探索:

 KeyedMessage有两种实例化方式导致:

def this(topic: String, message: V) = this(topic, null.asInstanceOf[K], null, message)

def this(topic:
本文内容由网友自发贡献,转载请注明出处:https://www.wpsshop.cn/w/寸_铁/article/detail/794076
推荐阅读
相关标签
  

闽ICP备14008679号