当前位置:   article > 正文

spring boot 使用 Kafka

spring boot 使用 Kafka

一、Kafka作为消息队列的好处

  1. 吞吐量:Kafka能够处理大规模的数据流,并支持高吞吐量的消息传输。

  2. 持久性:Kafka将消息持久化到磁盘上,保证了消息不会因为系统故障而丢失。

  3. 分布式:Kafka是一个分布式系统,可以在多个节点上运行,具有良好的可扩展性和容错性。

  4. 支持多种协议:Kafka支持多种协议,如TCP、HTTP、UDP等,可以与不同的系统进行集成。

  5. 灵活的消费模式:Kafka支持多种消费模式,如拉取和推送,可以根据需要选择合适的消费模式。

  6. 可配置性强:Kafka的配置参数非常丰富,可以根据需要进行灵活配置。

  7. 社区支持:Kafka作为Apache旗下的开源项目,拥有庞大的用户基础和活跃的社区支持,方便用户得到及时的技术支持。

二、springboot中使用Kafka

  1. 添加依赖:在pom.xml文件中添加Kafka的依赖,包括spring-kafka和kafka-clients。确保版本与你的项目兼容。

  2. 创建生产者:创建一个Kafka生产者类,实现Producer接口,并使用KafkaTemplate发送消息。

  3. 配置生产者:在Spring Boot的配置文件中配置Kafka生产者的相关参数,例如bootstrap服务器地址、Kafka主题等。

  4. 发送消息:在需要发送消息的地方,注入Kafka生产者,并使用其发送消息到指定的Kafka主题。

  5. 创建消费者:创建一个Kafka消费者类,实现Consumer接口,并使用KafkaTemplate订阅指定的Kafka主题。

  6. 配置消费者:在Spring Boot的配置文件中配置Kafka消费者的相关参数,例如group id、auto offset reset等。

  7. 接收消息:在需要接收消息的地方,注入Kafka消费者,并使用其接收消息。

  8. 处理消息:对接收到的消息进行处理,例如保存到数据库或进行其他业务逻辑处理。

三、使用Kafka

pom中填了依赖

  1. <dependency>
  2. <groupId>org.springframework.kafka</groupId>
  3. <artifactId>spring-kafka</artifactId>
  4. <version>2.8.1</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.apache.kafka</groupId>
  8. <artifactId>kafka-clients</artifactId>
  9. <version>2.8.1</version>
  10. </dependency>
  1. 创建生产者:创建一个Kafka生产者类,实现Producer接口,并使用KafkaTemplate发送消息。

  1. import org.apache.kafka.clients.producer.*;
  2. import org.springframework.beans.factory.annotation.Value;
  3. import org.springframework.kafka.core.KafkaTemplate;
  4. import org.springframework.stereotype.Component;
  5. @Component
  6. public class KafkaProducer {
  7. @Value("${kafka.bootstrap}")
  8. private String bootstrapServers;
  9. @Value("${kafka.topic}")
  10. private String topic;
  11. private KafkaTemplate<String, String> kafkaTemplate;
  12. public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
  13. this.kafkaTemplate = kafkaTemplate;
  14. }
  15. public void sendMessage(String message) {
  16. Producer<String, String> producer = new KafkaProducer<>(bootstrapServers, new StringSerializer(), new StringSerializer());
  17. try {
  18. producer.send(new ProducerRecord<>(topic, message));
  19. } catch (Exception e) {
  20. e.printStackTrace();
  21. } finally {
  22. producer.close();
  23. }
  24. }
  25. }
  1. 配置生产者:在Spring Boot的配置文件中配置Kafka生产者的相关参数,例如bootstrap服务器地址、Kafka主题等。

  1. import org.springframework.context.annotation.Bean;
  2. import org.springframework.context.annotation.Configuration;
  3. import org.springframework.kafka.core.DefaultKafkaProducerFactory;
  4. import org.springframework.kafka.core.KafkaTemplate;
  5. import org.springframework.kafka.core.ProducerFactory;
  6. import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
  7. import org.springframework.kafka.core.ConsumerFactory;
  8. import org.springframework.kafka.core.ConsumerConfig;
  9. import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
  10. import org.springframework.kafka.listener.MessageListener;
  11. import org.springframework.context.annotation.PropertySource;
  12. import java.util.*;
  13. import org.springframework.beans.factory.*;
  14. import org.springframework.*;
  15. import org.springframework.*;expression.*;value; @Value("${kafka}") Properties kafkaProps = new Properties(); @Bean public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf){ KafkaTemplate<String, String> template = new KafkaTemplate<>(pf); template .setMessageConverter(new StringJsonMessageConverter()); template .setSendTimeout(Duration .ofSeconds(30)); return template ; } @Bean public ProducerFactory<String, String> producerFactory(){ DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(kafkaProps); factory .setBootstrapServers(bootstrapServers); factory .setKeySerializer(new StringSerializer()); factory .setValueSerializer(new StringSerializer()); return factory ; } @Bean public ConsumerFactory<String, String> consumerFactory(){ DefaultKafkaConsumerFactory<String, String> factory = new DefaultKafkaConsumerFactory<>(consumerConfigProps); factory .setBootstrapServers(bootstrapServers); factory .setKeyDeserializer(new StringDeserializer()); factory .setValueDeserializer(new StringDeserializer()); return factory ; } @Bean public ConcurrentMessageListenerContainer<String, String> container(ConsumerFactory<String, String> consumerFactory, MessageListener listener){ ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(consumerFactory); container .setMessageListener(listener); container .setConcurrency(3); return container ; } @Bean public MessageListener

消费者

  1. import org.apache.kafka.clients.consumer.*;
  2. import org.springframework.kafka.core.KafkaTemplate;
  3. import org.springframework.stereotype.Component;
  4. @Component
  5. public class KafkaConsumer {
  6. @Value("${kafka.bootstrap}")
  7. private String bootstrapServers;
  8. @Value("${kafka.group}")
  9. private String groupId;
  10. @Value("${kafka.topic}")
  11. private String topic;
  12. private KafkaTemplate<String, String> kafkaTemplate;
  13. public KafkaConsumer(KafkaTemplate<String, String> kafkaTemplate) {
  14. this.kafkaTemplate = kafkaTemplate;
  15. }
  16. public void consume() {
  17. Consumer<String, String> consumer = new KafkaConsumer<>(consumerConfigs());
  18. consumer.subscribe(Collections.singletonList(topic));
  19. while (true) {
  20. ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
  21. for (ConsumerRecord<String, String> record : records) {
  22. System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
  23. }
  24. }
  25. }
  26. private Properties consumerConfigs() {
  27. Properties props = new Properties();
  28. props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
  29. props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
  30. props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
  31. props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
  32. return props;
  33. }
  34. }

四、kafka与rocketMQ比较

Kafka和RocketMQ都是开源的消息队列系统,它们具有许多相似之处,但在一些关键方面也存在差异。以下是它们在数据可靠性、性能、消息传递方式等方面的比较:

  1. 数据可靠性:
  • Kafka使用异步刷盘方式,而RocketMQ支持异步实时刷盘、同步刷盘、同步复制和异步复制。这使得RocketMQ在单机可靠性上比Kafka更高,因为它不会因为操作系统崩溃而导致数据丢失。此外,RocketMQ新增的同步刷盘机制也进一步保证了数据的可靠性。
  1. 性能:
  • Kafka和RocketMQ在性能方面各有千秋。由于Kafka的数据以partition为单位,一个Kafka实例上可能有多达上百个partition,而一个RocketMQ实例上只有一个partition。这使得RocketMQ可以充分利用IO组的commit机制,批量传输数据,从而在replication时具有更好的性能。然而,Kafka的异步replication性能理论上低于RocketMQ的replication,因为同步replication与异步replication相比,性能上会有约20%-30%的损耗。
  1. 消息传递方式:
  • Kafka和RocketMQ在消息传递方式上也有所不同。Kafka采用Producer发送消息后,broker马上把消息投递给consumer,这种方式实时性较高,但会增加broker的负载。而RocketMQ基于Pull模式和Push模式的长轮询机制,来平衡Push和Pull模式各自的优缺点。RocketMQ的消息及时性较好,严格的消息顺序得到了保证。
  1. 其他特性:
  • Kafka在单机支持的队列数超过64个队列,而RocketMQ最高支持5万个队列。队列越多,可以支持的业务就越多。

五、kafka使用场景

  1. 实时数据流处理:Kafka可以处理大量的实时数据流,这些数据流可以来自不同的源,如用户行为、传感器数据、日志文件等。通过Kafka,可以将这些数据流进行实时的处理和分析,例如进行实时数据分析和告警。
  2. 消息队列:Kafka可以作为一个消息队列使用,用于在分布式系统中传递消息。它能够处理高吞吐量的消息,并保证消息的有序性和可靠性。
  3. 事件驱动架构:Kafka可以作为事件驱动架构的核心组件,将事件数据发布到不同的消费者,以便进行实时处理。这种架构可以简化应用程序的设计和开发,提高系统的可扩展性和灵活性。
  4. 数据管道:Kafka可以用于数据管道,将数据从一个系统传输到另一个系统。例如,可以将数据从数据库或日志文件传输到大数据平台或数据仓库。
  5. 业务事件通知:Kafka可以用于通知业务事件,例如订单状态变化、库存更新等。通过订阅Kafka主题,相关的应用程序和服务可以实时地接收到这些事件通知,并进行相应的处理。
  6. 流数据处理框架集成:Kafka可以与流处理框架集成,如Apache Flink、Apache Spark等。通过集成,可以将流数据从Kafka中实时导入到流处理框架中进行处理,实现流式计算和实时分析。

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/算法研究专家/article/detail/61405
推荐阅读
相关标签
  

闽ICP备14008679号