Background:
A simple Scala demo that writes data to Kafka only ever writes to one partition per run; the next run picks a different partition and again writes only to that one.
Sending example:
import java.util.Properties

import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import org.json.JSONObject

import scala.util.Random

object KafkaSendDemo {

  // Hypothetical stand-ins for helpers not shown in the original demo
  def getUserID: String = Random.nextInt(10).toString
  def click: String = Random.nextInt(5).toString

  def main(args: Array[String]): Unit = {
    val topic = "test02"
    val brokers = "demo169.test.com:6667,demo167.test.com:6667,demo168.test.com:6667"
    val props = new Properties()
    props.put("metadata.broker.list", brokers)
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    props.put("request.required.acks", "1")

    val kafkaConfig = new ProducerConfig(props)
    val producer = new Producer[String, String](kafkaConfig)

    while (true) {
      // prepare event data
      val event = new JSONObject()
      event
        .put("uid", getUserID)
        .put("event_time", System.currentTimeMillis.toString)
        .put("os_type", "Android")
        .put("click_count", click)

      // produce event message; no key is given, so the two-argument
      // KeyedMessage constructor is used and the key is null
      // producer.send(new KeyedMessage[String, String](topic, getUserID, event.toString))
      producer.send(new KeyedMessage[String, String](topic, event.toString))
      println("Message sent: " + event)
      Thread.sleep(1000)
    }
  }
}
Root cause:
The behavior comes from KeyedMessage having two constructors:
def this(topic: String, message: V) = this(topic, null.asInstanceOf[K], null, message)
def this(topic: String, key: K, message: V) = this(topic, key, key, message)

The demo calls the two-argument constructor, so the message key is null. In the old Scala producer, when the key is null the producer picks a random partition and caches it, reusing that partition until the next topic-metadata refresh (topic.metadata.refresh.interval.ms, 600000 ms = 10 minutes by default). That is why each run of the demo keeps writing to a single partition, and a fresh run may land on a different one. Passing a key, as in the commented-out send call above, makes the partitioner hash the key and spread messages across partitions.
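A minimal sketch of the fix under those assumptions, using the same old kafka.producer API as the demo (the object name, broker address, and payloads are placeholders): send with the three-argument KeyedMessage constructor so the default partitioner hashes the key.

import java.util.Properties

import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

// Hypothetical fix sketch: supplying a key makes the default partitioner
// hash it, so messages spread across partitions instead of sticking to one.
object KeyedSendFix {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("metadata.broker.list", "demo169.test.com:6667") // placeholder broker
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    props.put("key.serializer.class", "kafka.serializer.StringEncoder")
    props.put("request.required.acks", "1")

    val producer = new Producer[String, String](new ProducerConfig(props))
    for (uid <- 0 until 10) {
      // three-argument constructor: key = uid, hashed to choose the partition
      producer.send(new KeyedMessage[String, String]("test02", uid.toString, s"payload-" + uid))
    }
    producer.close()
  }
}

Alternatively, if unkeyed messages are acceptable, lowering topic.metadata.refresh.interval.ms makes the producer re-pick a random partition more frequently, at the cost of more metadata requests.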