当前位置:   article > 正文

kafka的基础配置+简单使用_kafka 配置

kafka 配置

KAFKA

Kafka 是一种高性能、分布式、可横向扩展的消息队列系统,主要用于解决大规模数据处理中的数据传输和存储问题。

Kafka 的主要作用包括:

  1. 数据传输:在分布式系统中,各个组件之间需要互相通信并传递数据,Kafka 作为中间件可以方便地将生产者产生的消息传递给消费者进行处理。

  2. 数据存储:Kafka 可以把生产者产生的消息以队列的形式持久化到磁盘上,并支持数据副本和数据备份等功能,确保数据的可靠性和可恢复性。

  3. 实时流处理:Kafka 还可以与流处理框架结合使用,在数据流处理中扮演缓存和消息分发的角色,支持实时计算、实时监控等场景。

  4. 数据订阅和分发:Kafka 支持发布-订阅模式,消费者可以通过订阅感兴趣的消息主题来获取生产者产生的消息,并且支持多个消费者对同一个主题进行并行消费。

总之,Kafka 在大数据场景中有着广泛的应用,可以解决数据传输、数据存储、实时流处理等问题,是构建可靠、高效、弹性的分布式系统的重要组件之一。

kafka的依赖 

  1. <!--SpringBoot Kafka-->
  2. <dependency>
  3. <groupId>org.springframework.kafka</groupId>
  4. <artifactId>spring-kafka</artifactId>
  5. </dependency>

配置

  1. spring:
  2. kafka:
  3. bootstrap-servers: ruowu.com:9092 #这个是kafka的<IP地址:端口号>
  4. producer: #生产者配置
  5. key-serializer: org.apache.kafka.common.serialization.StringSerializer #配置key的序列化器 默认值
  6. value-serializer: org.apache.kafka.common.serialization.StringSerializer #配置value的序列化器 默认值
  7. retries: 10 #消息发送失败的时候重试的次数 默认值为0
  8. compression-type: lz4 #指定消息的压缩类型 默认不压缩
  9. acks: all #指定producer(生产者)等待broker(消息代理节点)的确认策略 默认值为1 可选值只有01、all
  10. consumer: #消费者配置
  11. group-id: ${spring.application.name}-test #指定消费者所属的GroupID(分组ID) 默认值为空字符串
  12. key-deserializer: org.apache.kafka.common.serialization.StringDeserializer #key的反序列化器 默认值
  13. value-deserializer: org.apache.kafka.common.serialization.StringDeserializer #value的反序列化器 默认值
  14. enable-auto-commit: false #是否开启自动提交消费位移 默认值为true
  15. auto-commit-interval: 100 #控制自动提交消费位移的时间间隔 默认值为5000 单位ms

使用方式

  1. @Component
  2. @Slf4j
  3. public class KafkaProducer {
  4. @Autowired
  5. private KafkaTemplate<String,String> kafkaTemplate;
  6. public void send(){
  7. ArrayList<String> list = new ArrayList<>();
  8. list.add("第一个");
  9. list.add("第二个");
  10. list.add("第三个");
  11. kafkaTemplate.send("consumer", JSONObject.toJSONString(list));
  12. }
  13. }

​​​​​​​

  1. @Component
  2. @Slf4j
  3. public class KafkaPerson implements ApplicationRunner {
  4. @KafkaListener(topics = {"consumer"})
  5. public void deploy(ConsumerRecord<String,String> record){
  6. //解析record
  7. String value = record.value();
  8. log.info("kafka接收到值了,开始消费了:{}",value);
  9. List<String> list = JSONObject.parseArray(value, String.class);
  10. //在这里对传过来的数据进行操作
  11. log.info("现在是结束消费了");
  12. }
  13. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/AllinToyou/article/detail/428584
推荐阅读
相关标签
  

闽ICP备14008679号