当前位置:   article > 正文

kafka消息队列最常用的两种模式,以及应用场景_kafka订阅模式

kafka订阅模式

目录

一、发布-订阅模式

二、点对点模式

三、应用场景


 

一、发布-订阅模式

发布-订阅模式是最常见的消息传递模式,其中消息发布者将消息发送到一个或多个主题(Topic),而订阅者可以选择订阅一个或多个主题来接收消息。每个订阅者都可以独立地消费消息,而发布者和订阅者之间没有直接的联系。

在Kafka中,使用KafkaProducer类进行消息发布,KafkaConsumer类进行消息订阅。以下是一个简单的Java代码示例:

  1. import org.apache.kafka.clients.producer.*;
  2. import org.apache.kafka.clients.consumer.*;
  3. import org.apache.kafka.common.serialization.StringSerializer;
  4. import org.apache.kafka.common.serialization.StringDeserializer;
  5. import java.util.Properties;
  6. public class PubSubExample {
  7. private static final String TOPIC = "my_topic";
  8. private static final String BOOTSTRAP_SERVERS = "localhost:9092";
  9. public static void main(String[] args) {
  10. // Kafka Producer
  11. Properties producerProps = new Properties();
  12. producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
  13. producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  14. producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  15. KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
  16. // Publish messages
  17. for (int i = 0; i < 10; i++) {
  18. String message = "Message " + i;
  19. ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, message);
  20. producer.send(record, new Callback() {
  21. @Override
  22. public void onCompletion(RecordMetadata metadata, Exception exception) {
  23. if (exception != null) {
  24. System.err.println("Error publishing message: " + exception.getMessage());
  25. } else {
  26. System.out.println("Message published successfully: " + metadata.offset());
  27. }
  28. }
  29. });
  30. }
  31. producer.close();
  32. // Kafka Consumer
  33. Properties consumerProps = new Properties();
  34. consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
  35. consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "my_consumer_group");
  36. consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  37. consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  38. KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
  39. consumer.subscribe(Collections.singletonList(TOPIC));
  40. // Consume messages
  41. while (true) {
  42. ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
  43. for (ConsumerRecord<String, String> record : records) {
  44. System.out.println("Received message: " + record.value());
  45. // Process the message
  46. }
  47. }
  48. }
  49. }

 

二、点对点模式

点对点模式中,消息发送者将消息发送到一个指定的队列(Queue),而消息接收者从相同的队列中接收消息。每个消息只能被一个接收者消费。

在Kafka中,点对点模式可以通过创建单个消费者组来实现。以下是一个简单的Java代码示例:

  1. import org.apache.kafka.clients.producer.*;
  2. import org.apache.kafka.clients.consumer.*;
  3. import org.apache.kafka.common.serialization.StringSerializer;
  4. import org.apache.kafka.common.serialization.StringDeserializer;
  5. import java.util.Properties;
  6. public class PointToPointExample {
  7. private static final String QUEUE = "my_queue";
  8. private static final String BOOTSTRAP_SERVERS = "localhost:9092";
  9. public static void main(String[] args) {
  10. // Kafka Producer
  11. Properties producerProps = new Properties();
  12. producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
  13. producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  14. producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  15. KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
  16. // Publish messages
  17. for (int i = 0; i < 10; i++) {
  18. String message = "Message " + i;
  19. ProducerRecord<String, String> record = new ProducerRecord<>(QUEUE, message);
  20. producer.send(record, new Callback() {
  21. @Override
  22. public void onCompletion(RecordMetadata metadata, Exception exception) {
  23. if (exception != null) {
  24. System.err.println("Error publishing message: " + exception.getMessage());
  25. } else {
  26. System.out.println("Message published successfully: " + metadata.offset());
  27. }
  28. }
  29. });
  30. }
  31. producer.close();
  32. // Kafka Consumer
  33. Properties consumerProps = new Properties();
  34. consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
  35. consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "my_consumer_group");
  36. consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  37. consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  38. KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
  39. consumer.subscribe(Collections.singletonList(QUEUE));
  40. // Consume messages
  41. while (true) {
  42. ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
  43. for (ConsumerRecord<String, String> record : records) {
  44. System.out.println("Received message: " + record.value());
  45. // Process the message
  46. consumer.commitAsync();
  47. }
  48. }
  49. }
  50. }

以上代码示例演示了如何使用Kafka的Java客户端库进行发布和订阅消息以及点对点消息传递。请注意,代码中的BOOTSTRAP_SERVERS需要根据你的实际环境进行配置。

 

三、应用场景

Kafka消息队列具有高吞吐量、低延迟、可扩展性等特点,因此广泛应用于以下场景:

  1. 日志收集和数据管道:Kafka可以用作集中式日志收集系统,可以将不同服务、应用程序、服务器生成的日志集中到一个中心化的消息队列中,再通过消费者进行处理、分析和存储。同时,Kafka还可以作为数据管道,将不同数据源的数据通过消息队列进行传输和处理。

  2. 实时流处理:Kafka与流处理框架(如Apache Flink、Apache Spark)结合使用,可以实现实时的数据流处理。Kafka可以作为输入源和输出源,将数据流传输给流处理框架进行实时分析、计算和处理。

  3. 微服务架构:Kafka可以用作微服务之间的异步通信机制,不同的微服务各自独立地生产和消费消息,实现解耦和扩展性。同时,Kafka还可以用于实现事件驱动架构,不同的微服务通过订阅事件的方式进行通信和协作。

  4. 网络爬虫和数据采集:Kafka可以用于构建高可靠的网络爬虫系统和数据采集系统。爬虫可以将抓取的数据写入Kafka队列,然后其他系统可以消费这些数据进行进一步的处理和分析。

  5. 消息系统和通信中间件:Kafka提供了可靠的消息传递机制,可以作为消息系统和通信中间件,用于构建分布式系统、实现异步通信和跨系统的数据传输。

总之,Kafka消息队列的应用场景非常广泛,适用于大数据处理、实时数据流处理、异步通信等各种场景。它具有高性能、可靠性和可扩展性的特点,可以帮助解决数据流处理和消息传递的各种问题。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/木道寻08/article/detail/839388
推荐阅读
相关标签
  

闽ICP备14008679号