赞
踩
package Kafka010.Utils import java.util.Properties import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} /** * Created by Shi shuai RollerQing on 2019/12/24 20:19 */ object ProducerDemo { def main(args: Array[String]): Unit = { // 定义kafka的参数 val brokers = "hadoop01:9092,hadoop02:9092,hadoop03:9092" val topic = "topicB" val prop = new Properties() //prop.put("bootstraps", brokers) //prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") //prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer") prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers) prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") // KafkaProducer val producer: KafkaProducer[String, String] = new KafkaProducer[String, String](prop) // KafkaRecorder // 异步 for(i <- 1 to 100000){ val msg = new ProducerRecord[String, String](topic, i.toString, i.toString) //发送消息 producer.send(msg) println(s"i = $i") Thread.sleep(100) } } }
<!-- spark-streaming-kafka-0-10 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.2.0</version>
</dependency>
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。