当前位置:   article > 正文

kafka 2.11-2.4.1集群搭建(二)_kafka 2.4.1 与0.10.21

kafka 2.4.1 与0.10.21

kafka集群生产者基本代码实现

pom.xml

  1. <dependency>
  2. <groupId>org.apache.kafka</groupId>
  3. <artifactId>kafka_2.12</artifactId>
  4. <version>0.10.2.1</version>
  5. <scope>compile</scope>
  6. </dependency>

创建一个

public class kafkaProducer,先按照最基本的main来测试生产者和消费者
  1. public static void main(String[] args ){
  2. //构造一个java.util.Properties对象
  3. Properties props = new Properties();
  4. // 指定bootstrap.servers属性。必填,无默认值。用于创建向kafka broker服务器的连接。
  5. props.put("bootstrap.servers", "192.168.3.101:9092,192.168.3.102:9092,192.168.3.156:9092");
  6. //acks参数用于控制producer生产消息的持久性(durability)。参数可选值,0、1、-1(all)。
  7. props.put("acks", "all");
  8. //在producer内部自动实现了消息重新发送。默认值0代表不进行重试。
  9. props.put("retries", 3);
  10. //调优producer吞吐量和延时性能指标都有非常重要作用。默认值16384即16KB。
  11. props.put("batch.size", 323840);
  12. //控制消息发送延时行为的,该参数默认值是0。表示消息需要被立即发送,无须关系batch是否被填满。
  13. props.put("linger.ms", 10);
  14. //指定了producer端用于缓存消息的缓冲区的大小,单位是字节,默认值是33554432即32M。
  15. props.put("buffer.memory", 33554432);
  16. props.put("max.block.ms", 3000);
  17. // 指定key.serializer属性。必填,无默认值。被发送到broker端的任何消息的格式都必须是字节数组。
  18. // 因此消息的各个组件都必须首先做序列化,然后才能发送到broker。该参数就是为消息的key做序列化只用的。
  19. props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  20. // 指定value.serializer属性。必填,无默认值。和key.serializer类似。此被用来对消息体即消息value部分做序列化。
  21. // 将消息value部分转换成字节数组。
  22. props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  23. Producer<Object, String> producer = new KafkaProducer<Object, String>(props);
  24. Random rnd = new Random();
  25. for (long nEvents = 0; nEvents < 10000; nEvents++) {
  26. long runtime = new Date().getTime();
  27. String ip = String.format("192.168.2.%d", rnd.nextInt(255));
  28. String msg = runtime + ",www.example.com," + ip;
  29. ProducerRecord<Object, String> data = new ProducerRecord<Object, String>("heartbeat", ip, msg);
  30. producer.send(data);
  31. try {
  32. Thread.sleep(1000);
  33. System.out.println("kafka producer data sleep one second ");
  34. } catch (InterruptedException e) {
  35. e.printStackTrace();
  36. }
  37. }
  38. producer.close();
  39. }

执行中出现一个异常  DEBUG org.apache.kafka.clients.NetworkClient - Error connecting to node 3 at bigdata3:9092:

原因:linux环境下面 已经配置了隐射地址,但是idea在windows环境下,无法获取到隐射,因此需要配置window10环境下的hosts。

C:\Windows\System32\drivers\etc下面找到hosts,然后把linux下面配置的隐射地址拷贝一份放到window10的隐射下面

本文内容由网友自发贡献,转载请注明出处:https://www.wpsshop.cn/w/Gausst松鼠会/article/detail/700523
推荐阅读
相关标签
  

闽ICP备14008679号