赞
踩
kafka集群生产者基本代码实现
pom.xml
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.12</artifactId>
- <version>0.10.2.1</version>
- <scope>compile</scope>
- </dependency>
创建一个
public class kafkaProducer,先按照最基本的main来测试生产者和消费者
- public static void main(String[] args ){
- //构造一个java.util.Properties对象
- Properties props = new Properties();
- // 指定bootstrap.servers属性。必填,无默认值。用于创建向kafka broker服务器的连接。
- props.put("bootstrap.servers", "192.168.3.101:9092,192.168.3.102:9092,192.168.3.156:9092");
- //acks参数用于控制producer生产消息的持久性(durability)。参数可选值,0、1、-1(all)。
- props.put("acks", "all");
- //在producer内部自动实现了消息重新发送。默认值0代表不进行重试。
- props.put("retries", 3);
- //调优producer吞吐量和延时性能指标都有非常重要作用。默认值16384即16KB。
- props.put("batch.size", 323840);
- //控制消息发送延时行为的,该参数默认值是0。表示消息需要被立即发送,无须关系batch是否被填满。
- props.put("linger.ms", 10);
- //指定了producer端用于缓存消息的缓冲区的大小,单位是字节,默认值是33554432即32M。
- props.put("buffer.memory", 33554432);
- props.put("max.block.ms", 3000);
- // 指定key.serializer属性。必填,无默认值。被发送到broker端的任何消息的格式都必须是字节数组。
- // 因此消息的各个组件都必须首先做序列化,然后才能发送到broker。该参数就是为消息的key做序列化只用的。
- props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- // 指定value.serializer属性。必填,无默认值。和key.serializer类似。此被用来对消息体即消息value部分做序列化。
- // 将消息value部分转换成字节数组。
- props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- Producer<Object, String> producer = new KafkaProducer<Object, String>(props);
- Random rnd = new Random();
- for (long nEvents = 0; nEvents < 10000; nEvents++) {
- long runtime = new Date().getTime();
- String ip = String.format("192.168.2.%d", rnd.nextInt(255));
- String msg = runtime + ",www.example.com," + ip;
- ProducerRecord<Object, String> data = new ProducerRecord<Object, String>("heartbeat", ip, msg);
- producer.send(data);
- try {
- Thread.sleep(1000);
- System.out.println("kafka producer data sleep one second ");
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
-
- }
- producer.close();
- }

执行中出现一个异常 DEBUG org.apache.kafka.clients.NetworkClient - Error connecting to node 3 at bigdata3:9092:
原因:linux环境下面 已经配置了隐射地址,但是idea在windows环境下,无法获取到隐射,因此需要配置window10环境下的hosts。
C:\Windows\System32\drivers\etc下面找到hosts,然后把linux下面配置的隐射地址拷贝一份放到window10的隐射下面
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。