
Kafka Basic Concepts

1. Preface

A project at work uses Kafka, and a certain rookie (me) didn't know it, so I had no choice but to spend a week of evening self-study cramming the basics.

The crammed knowledge ended up scattered, though, so I'm using this weekend to write a blog post, take stock of what I learned, and fill in the gaps along the way.

2. Kafka

2.1 What is Kafka

Kafka was originally developed at LinkedIn. It is a distributed, partitioned, replicated message system that uses ZooKeeper for coordination. Its biggest strength is processing large volumes of data in real time, which serves a wide range of scenarios: Hadoop-based batch systems, low-latency real-time systems, Storm/Spark streaming engines, web/nginx logs and access logs, message services, and so on. It is written in Scala; LinkedIn open-sourced it through the Apache Incubator in 2011, and it became a top-level Apache project in 2012.

2.2 Kafka concepts

Broker: a Kafka server, used to store messages; the one or more servers in a Kafka cluster are collectively called brokers.

Producer: the producer of messages, i.e., the source that generates messages and sends them to the Kafka servers.

Consumer: the consumer of messages, i.e., the party that reads and processes messages from the Kafka servers.

Group: a consumer group, used to group consumers of the same kind. In Kafka, multiple consumers can jointly consume the messages under one topic, each consumer handling a share of them; these consumers form a group with a shared group name, sometimes called a consumer cluster. For a topic the group subscribes to, each partition is assigned to at most one consumer within that group (the same partition can still be assigned to consumers in other groups).

Offset: messages are stored on Kafka brokers, and while pulling message data a consumer needs to know each message's position in the partition's log file; that position is the offset.

Topic: a topic, defined by users and configured on the Kafka server, establishes the subscription relationship between producers and consumers: producers send messages to a given topic, and consumers consume messages from that topic.

Partition: a message partition. A topic is divided into multiple partitions. For example, the topic "kafka-test" could have 6 partitions served by two brokers, typically 3 partitions per broker; if the broker IDs are 0 and 1, the partitions would be 0-0, 0-1, 0-2 and 1-0, 1-1, 1-2. Partitions are the physical grouping of a topic: a topic consists of one or more partitions, each of which is an ordered queue, and every message in a partition is assigned a sequential id (its offset).
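To make partitions and offsets concrete: when a record has a key, Kafka's default partitioner hashes the key and takes it modulo the partition count, so all records with the same key land in the same partition and keep their relative order. A simplified sketch of that idea (illustrative only; the real partitioner uses murmur2 over the serialized key bytes):

import java.util.concurrent.ThreadLocalRandom;

public class PartitionSketch {

    // simplified stand-in for Kafka's default partitioner
    static int choosePartition(String key, int numPartitions) {
        if (key == null) {
            // keyless records are spread across partitions
            // (round-robin or sticky, depending on client version)
            return ThreadLocalRandom.current().nextInt(numPartitions);
        }
        // same key -> same partition -> per-key ordering
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // always the same partition for "user-42" with 6 partitions
        System.out.println(choosePartition("user-42", 6));
    }
}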

2.3 Pros and cons of Kafka

Pros:
High throughput
Low latency
Distributed
Message-broker capabilities
High concurrency
Batch processing (ETL-style workloads)
Real-time processing

Cons:
No complete set of monitoring tools
Issues with message tweaking (modifying messages in the broker defeats zero-copy delivery and hurts performance)
No wildcard topic selection
Lacks some consistency guarantees
Performance degrades with very large messages
Can behave clumsily

 

2.4 Kafka vs. other message queues

Reference: https://blog.csdn.net/yunfeng482/article/details/72856762

 

2.5 Integrating Kafka with Spring Boot

This assumes you already have Kafka installed.

Add the dependency and configure application.yml; choose the spring-kafka version that matches your Kafka version.

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

kafka:
  bootstrap_servers_config: 192.168.75.128:9092
  retries_config: 0
  batch_size_config: 16384
  buffer_memory_config: 33554432
  topic: one,two,three
  group_id: asdf
  auto_offset_reset: earliest
  enable-auto-commit: true
  auto_commit_interval: 100
  consumeOneTopic: one
  consumeTwoTopic: two
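Two of the producer settings above are worth spelling out, since they are the point of this post's title: batch_size_config (16384 bytes = 16 KB) is how many bytes the producer batches per partition before sending, while buffer_memory_config (33554432 bytes = 32 MB) caps the total memory for records that have not been sent yet; when that buffer fills, send() blocks (up to max.block.ms) and then fails with a timeout. Note also that these underscore-style keys are custom names read via @Value in the configuration classes below, not Spring Boot's built-in spring.kafka.* auto-configuration properties.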

 

Write the Kafka producer configuration class, KafkaProducerConfig.java

package com.example.demokafka.config;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

/**
 * className: KafkaProducerConfig <br/>
 * packageName: com.example.demokafka.config <br/>
 * description: producer-side Kafka configuration <br/>
 *
 * @author yuwen <br/>
 * @date: 2020-4-7 22:44 <br/>
 */
@Configuration
@EnableKafka
public class KafkaProducerConfig {

    @Value("${kafka.bootstrap_servers_config}")
    private String bootstrap_servers_config;

    @Value("${kafka.retries_config}")
    private String retries_config;

    @Value("${kafka.batch_size_config}")
    private String batch_size_config;

    @Value("${kafka.buffer_memory_config}")
    private String buffer_memory_config;

    @Bean
    public Map<String, Object> producerConfigs() {
        HashMap<String, Object> configs = new HashMap<>();
        // producer settings (Kafka accepts the numeric configs as strings)
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_servers_config);
        configs.put(ProducerConfig.RETRIES_CONFIG, retries_config);
        configs.put(ProducerConfig.BATCH_SIZE_CONFIG, batch_size_config);
        configs.put(ProducerConfig.BUFFER_MEMORY_CONFIG, buffer_memory_config);
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return configs;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        // note: do not turn on auto flush here; flushing after every send is very inefficient
        return new KafkaTemplate<>(producerFactory());
    }
}
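With the template bean in place, delivery can be confirmed asynchronously. A small usage sketch, assuming spring-kafka 2.x where send() returns a ListenableFuture (the topic "one" is just the sample topic from application.yml):

import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;

// anywhere the KafkaTemplate is injected:
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("one", "hello kafka");
future.addCallback(
        result -> System.out.println("sent, offset = " + result.getRecordMetadata().offset()),
        ex -> ex.printStackTrace());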

 

Write the consumer configuration class, KafkaConsumerConfig.java

package com.example.demokafka.config;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

/**
 * className: KafkaConsumerConfig <br/>
 * packageName: com.example.demokafka.config <br/>
 * description: consumer-side Kafka configuration <br/>
 *
 * @author yuwen <br/>
 * @date: 2020-4-7 22:44 <br/>
 */
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${kafka.bootstrap_servers_config}")
    private String bootstrap_servers_config;

    @Value("${kafka.group_id}")
    private String groupId;

    @Value("${kafka.auto_offset_reset}")
    private String autoOffsetReset;

    @Value("${kafka.enable-auto-commit}")
    private String enableAutoCommit;

    @Value("${kafka.auto_commit_interval}")
    private String autoCommitInterval;

    @Bean
    public Map<String, Object> consumerConfigs() {
        HashMap<String, Object> configs = new HashMap<>();
        // consumer settings, all taken from application.yml
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_servers_config);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        configs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return configs;
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setPollTimeout(1000);
        return factory;
    }

    @Bean
    public Consumer<String, String> consumer() {
        return consumerFactory().createConsumer();
    }

    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }
}
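The configuration above leaves auto-commit on, which can re-process or skip messages around a crash. If at-least-once handling matters, a common variant (my own sketch, not part of the original setup) is to disable auto-commit and commit only after the records have been processed:

// in consumerConfigs(): turn auto-commit off
configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

// in the poll loop: commit after the batch has been handled
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        handle(record); // hypothetical per-record handler
    }
    consumer.commitSync(); // mark the batch as consumed only now
}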

Write the Kafka message sender, KafkaSender.java

package com.example.demokafka.producer;

import com.alibaba.fastjson.JSON;
import com.example.demokafka.entity.Message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import java.util.UUID;
import java.util.concurrent.Executors;

/**
 * className: KafkaSender <br/>
 * packageName: com.example.demokafka.producer <br/>
 * description: sends a test message to the given topic once per second <br/>
 *
 * @author yuwen <br/>
 * @date: 2020-4-7 22:45 <br/>
 */
@Component
@Slf4j
public class KafkaSender {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // send one message per second on a background thread
    public void send(String topic) {
        Executors.newSingleThreadExecutor().submit(() -> {
            while (true) {
                Message message = new Message();
                message.setId(System.currentTimeMillis());
                message.setMsg(UUID.randomUUID().toString());
                kafkaTemplate.send(topic, JSON.toJSONString(message));
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.info("send success");
            }
        });
    }
}
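One caveat about this sender: every call to send() creates a fresh single-thread executor that loops forever and is never shut down, so repeated calls leak threads. A sketch of a stoppable variant (my adjustment, not from the original; these members replace the executor created inside send()):

// requires: import java.util.concurrent.ExecutorService;
private final ExecutorService executor = Executors.newSingleThreadExecutor();
private volatile boolean running = false;

public void send(String topic) {
    if (running) {
        return; // already sending; ignore repeated calls
    }
    running = true;
    executor.submit(() -> {
        while (running) {
            Message message = new Message();
            message.setId(System.currentTimeMillis());
            message.setMsg(UUID.randomUUID().toString());
            kafkaTemplate.send(topic, JSON.toJSONString(message));
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    });
}

public void stop() {
    running = false; // the loop exits after the current iteration
}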

 

Next, write the consumer code.

1. Create a consumer base class, MsgConsumer; every consumer must implement its abstract methods.

package com.example.demokafka.consumer.base;

import lombok.Data;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.stereotype.Component;

/**
 * className: MsgConsumer <br/>
 * packageName: com.example.demokafka.consumer.base <br/>
 * description: base class for topic-specific consumers <br/>
 *
 * @author yuwen <br/>
 * @date: 2020-4-8 22:21 <br/>
 */
@Component
@Data
public abstract class MsgConsumer {

    /** the topic this consumer is responsible for */
    public abstract String getTopic();

    /** process a single record from that topic */
    public abstract void doWork(ConsumerRecord consumerRecord);
}

2. Write a consumer that extends it.

package com.example.demokafka.consumer;

import com.example.demokafka.consumer.base.MsgConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * className: ConsumeOne <br/>
 * packageName: com.example.demokafka.consumer <br/>
 * description: <br/>
 *
 * @author yuwen <br/>
 * @date: 2020-4-25 13:38 <br/>
 */
@Component
public class ConsumeOne extends MsgConsumer {

    @Value("${kafka.consumeOneTopic}")
    private String topic;

    @Override
    public String getTopic() {
        return topic;
    }

    @Override
    public void doWork(ConsumerRecord consumerRecord) {
        System.out.println("it is one");
    }
}
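The startup class in step 5 also wires a ConsumeTwo, which the original listing omits; it mirrors ConsumeOne and reads its topic from kafka.consumeTwoTopic (a sketch along the same lines):

package com.example.demokafka.consumer;

import com.example.demokafka.consumer.base.MsgConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class ConsumeTwo extends MsgConsumer {

    @Value("${kafka.consumeTwoTopic}")
    private String topic;

    @Override
    public String getTopic() {
        return topic;
    }

    @Override
    public void doWork(ConsumerRecord consumerRecord) {
        System.out.println("it is two");
    }
}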

3. Create a consumer "group". This group holds a list of consumers; when a message arrives, the group's list is iterated and each consumer handles the messages of its own topic. (Note that this in-app ConsumerGroup is just a dispatch registry; it is unrelated to Kafka's own consumer-group concept configured via group_id.)

package com.example.demokafka.consumer.base;

import lombok.Data;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;

/**
 * className: ConsumerGroup <br/>
 * packageName: com.example.demokafka.consumer.base <br/>
 * description: <br/>
 *
 * @author yuwen <br/>
 * @date: 2020-4-25 13:35 <br/>
 */
@Component
@Data
public class ConsumerGroup {
    private List<MsgConsumer> consumerList = new ArrayList<>();
}

4. With the above in place, create KafkaReceive.java to consume messages.

package com.example.demokafka.consumer;

import com.example.demokafka.consumer.base.ConsumerGroup;
import com.example.demokafka.consumer.base.MsgConsumer;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.Executors;

/**
 * className: KafkaReceive <br/>
 * packageName: com.example.demokafka.consumer <br/>
 * description: polls Kafka and dispatches records to the registered consumers <br/>
 *
 * @author yuwen <br/>
 * @date: 2020-4-8 22:14 <br/>
 */
@Component
public class KafkaReceive {

    @Autowired
    private Consumer consumer;

    @Value("${kafka.topic}")
    private String topic;

    public void consumer(ConsumerGroup consumerGroup) {
        // KafkaConsumer is not thread-safe, so the consumer bean is only ever
        // touched from this one background thread
        Executors.newSingleThreadExecutor().submit(() -> {
            try {
                // subscribe to all configured topics ("one,two,three")
                consumer.subscribe(Arrays.asList(topic.split(",")));
                while (true) {
                    ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
                    for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                        // dispatch each record to the consumer registered for its topic
                        for (MsgConsumer msgConsumer : consumerGroup.getConsumerList()) {
                            // match the topic exactly (contains() would also match substrings)
                            if (msgConsumer.getTopic().equals(consumerRecord.topic())) {
                                msgConsumer.doWork(consumerRecord);
                            }
                        }
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }
}

5. Finally, register the Kafka consume task at application startup.

package com.example.demokafka.start;

import com.example.demokafka.consumer.ConsumeOne;
import com.example.demokafka.consumer.ConsumeTwo;
import com.example.demokafka.consumer.KafkaReceive;
import com.example.demokafka.consumer.base.ConsumerGroup;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.annotation.Configuration;

/**
 * className: KafkaTask <br/>
 * packageName: com.example.demokafka.start <br/>
 * description: registers the consumers and starts the poll loop at startup <br/>
 *
 * @author yuwen <br/>
 * @date: 2020-4-25 19:10 <br/>
 */
@Configuration
public class KafkaTask implements CommandLineRunner {

    @Autowired
    private ConsumerGroup consumerGroup;

    @Autowired
    private KafkaReceive kafkaReceive;

    @Autowired
    private ConsumeOne consumeOne;

    @Autowired
    private ConsumeTwo consumeTwo;

    @Override
    public void run(String... args) throws Exception {
        startConsume();
    }

    /**
     * start Kafka consumption
     */
    public void startConsume() {
        consumerGroup.getConsumerList().add(consumeOne);
        consumerGroup.getConsumerList().add(consumeTwo);
        kafkaReceive.consumer(consumerGroup);
    }
}
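A small design note: since every MsgConsumer subclass is a @Component, Spring can inject all of them as a list, so new consumers register themselves without adding another @Autowired field each time. A sketch of that variant:

@Autowired
private List<MsgConsumer> consumers; // Spring injects every MsgConsumer bean it finds

public void startConsume() {
    consumerGroup.getConsumerList().addAll(consumers);
    kafkaReceive.consumer(consumerGroup);
}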

OK, that is basically everything. Let's write an endpoint to test that messages are actually produced and consumed.

package com.example.demokafka.controller;

import com.example.demokafka.producer.KafkaSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
 * className: KafkaController <br/>
 * packageName: com.example.demokafka.controller <br/>
 * description: <br/>
 *
 * @author yuwen <br/>
 * @date: 2020-4-7 22:51 <br/>
 */
@RestController
public class KafkaController {

    @Autowired
    private KafkaSender kafkaSender;

    @GetMapping("/send")
    public String send(@RequestParam(name = "topic") String topic) {
        kafkaSender.send(topic);
        return "SUCCESS";
    }
}

 

Send an HTTP request from a browser: http://localhost:8080/send?topic=one

The console then prints the sender's "send success" log once per second, and ConsumeOne prints "it is one" for every message it receives.

Besides the polling approach above, messages can also be consumed with an annotation-based listener, as in KafkaListen.java:

package com.example.demokafka.consumer;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Optional;

/**
 * className: KafkaListen <br/>
 * packageName: com.example.demokafka.consumer <br/>
 * description: annotation-driven consumer <br/>
 *
 * @author yuwen <br/>
 * @date: 2020-4-7 22:48 <br/>
 */
@Component
@Slf4j
public class KafkaListen {

    @KafkaListener(topics = {"yuwen"})
    public void listen(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            log.info("----------------- record = " + record);
            log.info("------------------ message = " + message);
        }
    }
}
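Note that this listener is bound to the hard-coded topic "yuwen", which is not among the topics configured earlier, so it only fires if that topic exists. @KafkaListener also resolves property placeholders, so the topic and group can come from application.yml instead (a sketch using the keys defined above):

@KafkaListener(topics = "${kafka.consumeOneTopic}", groupId = "${kafka.group_id}")
public void listenOne(ConsumerRecord<?, ?> record) {
    log.info("received from topic {}: {}", record.topic(), record.value());
}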

 

3. Wrapping up

My first pass at Kafka has reached its goal: I can now use it. As for Kafka's internals, those call for much deeper study before they are worth writing about, otherwise I would only mislead myself and others, so I'll stop here for now.

This post is a small product of that study, and it will make revisiting Kafka later much more convenient.

 

end
