当前位置:   article > 正文

Kafka 使用java实现,快速入门_java kafka

java kafka

一、kafka的生产者和消费者

1. 生产者发送消息的流程

 2. 消费者接收消息的流程

 二、 java 代码实现

1. 添加依赖:

  1. <dependency>
  2. <groupId>org.apache.kafka</groupId>
  3. <artifactId>kafka_2.12</artifactId>
  4. </dependency>

2. 实现生产者

  1. public class NormalProducer {
  2. public static void main(String[] args) {
  3. Properties properties = new Properties();
  4. // 1.配置生产者启动的关键属性参数
  5. // 1.1 BOOTSTRAP_SERVERS_CONFIG:连接kafka集群的服务列表,如果有多个,使用"逗号"进行分隔
  6. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.101:9092");
  7. // 1.2 CLIENT_ID_CONFIG:这个属性的目的是标记kafkaclient的ID
  8. properties.put(ProducerConfig.CLIENT_ID_CONFIG, "quickstart-producer");
  9. // 1.3 KEY_SERIALIZER_CLASS_CONFIG VALUE_SERIALIZER_CLASS_CONFIG
  10. // Q: 对 kafka的 key 和 value 做序列化,为什么需要序列化?
  11. // A: 因为KAFKA Broker 在接收消息的时候,必须要以二进制的方式接收,所以必须要对KEY和VALUE进行序列化
  12. // 字符串序列化类:org.apache.kafka.common.serialization.StringSerializer
  13. // KEY: 是kafka用于做消息投递计算具体投递到对应的主题的哪一个partition而需要的
  14. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  15. // VALUE: 实际发送消息的内容
  16. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  17. // 2.创建kafka生产者对象 传递properties属性参数集合
  18. KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
  19. for(int i = 0; i <10; i ++) {
  20. // 3.构造消息内容
  21. User user = new User("00" + i, "张三");
  22. ProducerRecord<String, String> record =
  23. // arg1:topic , arg2:实际的消息体内容,quick_start 是 topic 名称
  24. new ProducerRecord<String, String>("quick_start",
  25. JSON.toJSONString(user));
  26. // 4.发送消息
  27. producer.send(record);
  28. }
  29. // 5.关闭生产者
  30. producer.close();
  31. }
  32. }

其中的 User 对象为:

  1. public class User {
  2. private String id;
  3. private String name;
  4. public User() {
  5. }
  6. public User(String id, String name) {
  7. this.id = id;
  8. this.name = name;
  9. }
  10. public String getId() {
  11. return id;
  12. }
  13. public void setId(String id) {
  14. this.id = id;
  15. }
  16. public String getName() {
  17. return name;
  18. }
  19. public void setName(String name) {
  20. this.name = name;
  21. }
  22. }

3. 实现消费者

  1. public class NormalConsumer {
  2. public static void main(String[] args) {
  3. // 1. 配置属性参数
  4. Properties properties = new Properties();
  5. properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.101:9092");
  6. // org.apache.kafka.common.serialization.StringDeserializer
  7. properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  8. properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  9. // 非常重要的属性配置:与我们消费者订阅组有关系
  10. properties.put(ConsumerConfig.GROUP_ID_CONFIG, "quickstart-group");
  11. // 常规属性:会话连接超时时间
  12. properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
  13. // 消费者提交offset: 自动提交 & 手工提交,默认是自动提交
  14. properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
  15. properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
  16. // 2. 创建消费者对象
  17. KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
  18. // 3. 订阅你感兴趣的主题:quick_start
  19. consumer.subscribe(Collections.singletonList("quick_start"));
  20. System.err.println("quickstart consumer started...");
  21. try {
  22. // 4.采用拉取消息的方式消费数据
  23. while(true) {
  24. // 等待多久拉取一次消息
  25. // 拉取TOPIC_QUICKSTART主题里面所有的消息
  26. // topic 和 partition是 一对多的关系,一个topic可以有多个partition
  27. ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
  28. // 因为消息是在partition中存储的,所以需要遍历partition集合
  29. for(TopicPartition topicPartition : records.partitions()) {
  30. // 通过TopicPartition获取指定的消息集合,获取到的就是当前topicPartition下面所有的消息
  31. List<ConsumerRecord<String, String>> partitionRecords = records.records(topicPartition);
  32. // 获取TopicPartition对应的主题名称
  33. String topic = topicPartition.topic();
  34. // 获取当前topicPartition下的消息条数
  35. int size = partitionRecords.size();
  36. System.err.println(String.format("--- 获取topic: %s, 分区位置:%s, 消息总数: %s",
  37. topic,
  38. topicPartition.partition(),
  39. size));
  40. for(int i = 0; i < size; i++) {
  41. ConsumerRecord<String, String> consumerRecord = partitionRecords.get(i);
  42. // 实际的数据内容
  43. String value = consumerRecord.value();
  44. // 当前获取的消息偏移量
  45. long offset = consumerRecord.offset();
  46. // ISR : High Watermark, 如果要提交的话,比如提交当前消息的offset+1
  47. // 表示下一次从什么位置(offset)拉取消息
  48. long commitOffser = offset + 1;
  49. System.err.println(String.format("获取实际消息 value:%s, 消息offset: %s, 提交offset: %s",
  50. value, offset, commitOffser));
  51. }
  52. }
  53. }
  54. } finally {
  55. consumer.close();
  56. }
  57. }
  58. }

4. 测试结果

生产者发送的消息在消费者端可以正常接收:

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/195703
推荐阅读
相关标签
  

闽ICP备14008679号