当前位置:   article > 正文

Kafka 生产者消费者模式 写入/读取数据 [ 命令行/Java 代码 ]

命令行向kafka写入数据

 

一.确认配置文件:


打开config/server.properties 文件,修改broker.id,listeners,port,log.dirs

vi config/server.properties 

broker.id=0
listeners=PLAINTEXT://192.168.105.110:9092
port=9092
log.dirs=kafka-logs
zookeeper.connect=192.168.105.110:2181

备注:

listeners一定要配置成为IP地址;

如果配置为localhost或服务器的hostname,在使用java获取数据时会拿不到数据,或者发送数据时就会抛出异 常:org.apache.kafka.common.errors.TimeoutException: Batch Expired 。

因为在没有配置advertised.host.name 的情况下,Kafka并没有像官方文档宣称的那样改为广播我们配置的host.name,而是广播了主机配置的hostname。远端的客户端并没有配置 hosts,所以自然是连接不上这个hostname的
 

 

二.命令行创建生产者消费者



1.创建主题:


bin/kafka-topics.sh --create --zookeeper 192.168.105.110:2181 --replication-factor 1   --partitions 1 --topic Hello-Kafka


2.查看主题列表


bin/kafka-topics.sh --list --zookeeper 192.168.105.110:2181


3. 启动生产者以发送消息

 

bin/kafka-console-producer.sh --broker-list 192.168.105.110:9092 --topic Hello-Kafka

 

4. 启动消费者,消费数据 (两种方式均可)


bin/kafka-console-consumer.sh --zookeeper 192.168.105.110:2181 --topic Hello-Kafka --from-beginning


bin/kafka-console-consumer.sh --bootstrap-server 192.168.105.110:9092  --topic  Hello-Kafka --from-beginning


 

 

三. java 代码

 

生产者

  1. import org.apache.kafka.clients.producer.KafkaProducer;
  2. import org.apache.kafka.clients.producer.Producer;
  3. import org.apache.kafka.clients.producer.ProducerRecord;
  4. import java.io.BufferedReader;
  5. import java.io.File;
  6. import java.io.FileReader;
  7. import java.util.Properties;
  8. import java.util.Random;
  9. import java.util.UUID;
  10. /**
  11. * @author admin
  12. * @title SimpleProducer
  13. * @projectName KafkaStreamDemo
  14. * @description TODO
  15. * @date 2019/9/1016:45
  16. */
  17. public class SimpleProducer {
  18. public static void main(String[] args) throws Exception {
  19. // Assign topicName to string variable
  20. String topicName = "Hello-Kafka";
  21. // create instance for properties to access producer configs
  22. Properties props = new Properties();
  23. // Assign localhost id, 参考http://kafka.apache.org/documentation/#producerapi
  24. props.put("bootstrap.servers", "192.168.105.110:9092");
  25. // Set acknowledgements for producer requests.
  26. props.put("acks", "all");
  27. // If the request fails, the producer can automatically retry,
  28. props.put("retries", 0);
  29. // Specify buffer size in config
  30. props.put("batch.size", 16384);
  31. // Reduce the no of requests less than 0
  32. props.put("linger.ms", 1);
  33. // The buffer.memory controls the total amount of memory available to the
  34. // producer for buffering.
  35. props.put("buffer.memory", 33554432);
  36. props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  37. props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  38. Producer<String, String> producer = new KafkaProducer<String, String>(props);
  39. int i = 0 ;
  40. while (i < 100) {
  41. String tempString = UUID.randomUUID().toString();
  42. System.out.println("----------"+tempString);
  43. producer.send(new ProducerRecord<String, String>(topicName, tempString));
  44. Thread.sleep(1000);
  45. i++ ;
  46. }
  47. System.out.println("Message sent successfully");
  48. producer.close();
  49. }
  50. }

 

 

 

消费者

 

  1. import org.apache.kafka.clients.consumer.ConsumerRecord;
  2. import org.apache.kafka.clients.consumer.ConsumerRecords;
  3. import org.apache.kafka.clients.consumer.KafkaConsumer;
  4. import java.time.Duration;
  5. import java.util.Arrays;
  6. import java.util.Collections;
  7. import java.util.Properties;
  8. /**
  9. * @author admin
  10. * @title SimpleConsumer
  11. * @projectName KafkaStreamDemo
  12. * @description TODO
  13. * @date 2019/9/1016:47
  14. */
  15. public class SimpleConsumer {
  16. public static void main(String[] args) throws Exception {
  17. // Kafka consumer configuration settings
  18. String topicName = "Hello-Kafka";
  19. Properties props = new Properties();
  20. props.put("bootstrap.servers", "192.168.105.110:9092");
  21. props.put("group.id", "CountryCounter");
  22. props.put("auto.offset.reset", "latest");
  23. props.put("enable.auto.commit", "true");
  24. props.put("auto.commit.interval.ms", "1000");
  25. props.put("session.timeout.ms", "30000");
  26. props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  27. props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  28. KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(props);
  29. // Kafka Consumer subscribes list of topics here.
  30. kafkaConsumer.subscribe(Collections.singletonList(topicName) );
  31. while (true) {
  32. ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
  33. for (ConsumerRecord<String, String> record : records) {
  34. // print the offset,key and value for the consumer records.
  35. System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
  36. }
  37. }
  38. }
  39. }

 

 

 

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/614338
推荐阅读
相关标签
  

闽ICP备14008679号