赞
踩
假设我们有两个服务:生产者A每秒能生产200个消息,消费者B每秒能消费100个消息。
那么B服务是处理不了A这么多消息的,那么怎么使B不被压垮的同时还能处理A的消息呢,我们引入一个中间件,即Kafka
。(当然着并不能使消费者的处理速度上升)
那么我们可以在B服务中加入一个队列,也就是一个链表,链表的每个节点相当于一条消息,每个节点有一个序号即offset
,记录消息的位置。
但是这样也会有个问题,还没有处理的消息是存储在内存中的,如果B服务挂掉,那么消息也就丢失了。
所以我们可以把队列移出,变成一个单独的进程,即使B服务挂掉,消息也不会丢失。
B服务由于性能差,队列中未处理的消息会越来越多,我们可以增加更多的消费者来处理消息,相对的也可以增加更多的生产者来生成消息。
但是,生产者与消费者会争抢同一个队列,没有抢到就要等待,那么怎么解决呢?
我们可以将消息进行分类,每一类消息是一个topic
,生产者按消息的类型投递到不同的topic
中,消费者也按照不同的topic进行消费。
但是单个topic的消息还是有可能过多,我们可以将单个队列拆分,每段是一个partition
分区,每个消费者负责一个partition
。
随着partition
过多,所有的partition
都在同一个机器上,就可能会导致单机的cpu和内存过高,影响性能,那么我们可以使用多台机器,将partition
分散部署在不同的机器上。每台机器就代表一个broker
。
我们可以增加broker
来缓解服务器的cpu过高的性能问题。
假如某个broker
挂了, 那么其中partition
中的消息也就都丢失了,那么这个问题怎么解决呢?
我们可以给partition
多加几个副本,统称replicas
,并将它们分为leader
和follower
。
leader
负责生产者和消费者的读写,follower
只负责同步leader
的数据。假如leader
挂了,也不会影响follower
,随后在follower
中选出一个leader
,保证消息队列的高可用。
在上面讲述了leader
挂掉的情况,如果所有的broker
都挂了,消息不就都丢失了?
为了解决这个问题,就不能只把数据存在内存中,也要存在磁盘中。
但是如果所有消息一直保存在磁盘中,那磁盘也会被占满,所以引入保留策略。
如果我想在原有的基础上增加一个消费者,那么它只能跟着最新的offset
接着消费,如果我想从某个offset
开始消费呢?
我们引入消费者组,实现不同消费者维护自己的消费进度。
上面介绍了很多的组件,每个组件都有自己的状态信息,那么就需要有一个组件去统一维护这些组件的状态信息,于是引入了Zookeeper
组件,它会定期与broker
通信,获取Kafka
集群的状态,判断哪些broker
挂了,消费者组消费到哪了等等。
1、官网地址
2、下载
选择稳定版本下载
3、解压,修改配置文件
解压后,复制 zoo_sample.cfg,重命名为 zoo.cfg
修改数据文件目录位置
4、启动
我们是在windows系统下安装的,运行 bin 目录下的 zkServer.cmd
1、官网地址
2、下载
3、解压,修改配置文件
修改 config 目录下 server.properties 文件
修改日志文件位置,其他参数(如zookeeper端口,根据需要修改)
4、启动
bin\windows\kafka-server-start.bat config\server.properties
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
public static void main(String[] args) throws InterruptedException { Properties prop = new Properties(); prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); prop.put(ProducerConfig.ACKS_CONFIG, "all"); prop.put(ProducerConfig.RETRIES_CONFIG, 0); prop.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); prop.put(ProducerConfig.LINGER_MS_CONFIG, 1); prop.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); String topic = "hello"; KafkaProducer<String, String> producer = new KafkaProducer<>(prop); for (int i = 0; i < 100; i++) { producer.send(new ProducerRecord<String, String>(topic, Integer.toString(2), "hello kafka" + i)); System.out.println("生产消息:" + i); Thread.sleep(1000); } producer.close(); }
public static void main(String[] args) { Properties prop = new Properties(); prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092"); prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); prop.put(ConsumerConfig.GROUP_ID_CONFIG, "con-1"); // 消费者组 prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); //自动提交偏移量 prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000); //自动提交时间 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop); ArrayList<String> topics = new ArrayList<>(); //可以订阅多个消息 topics.add("hello"); consumer.subscribe(topics); try { while(true) { ConsumerRecords<String, String> poll = consumer.poll(Duration.ofSeconds(10)); for (TopicPartition topicPartition : poll.partitions()) { // 通过TopicPartition获取指定的消息集合,获取到的就是当前topicPartition下面所有的消息 List<ConsumerRecord<String, String>> partitionRecords = poll.records(topicPartition); // 获取TopicPartition对应的主题名称 String topic = topicPartition.topic(); // 获取TopicPartition对应的分区位置 int partition = topicPartition.partition(); // 获取当前TopicPartition下的消息条数 int size = partitionRecords.size(); System.out.printf("--- 获取topic: %s, 分区位置:%s, 消息总数: %s%n", topic, partition, size); for(int i = 0; i < size; i++) { ConsumerRecord<String, String> consumerRecord = partitionRecords.get(i); // 实际的数据内容 String key = consumerRecord.key(); // 实际的数据内容 String value = consumerRecord.value(); // 当前获取的消息偏移量 long offset = consumerRecord.offset(); // 表示下一次从什么位置(offset)拉取消息 long commitOffser = offset + 1; System.out.printf("消费消息 key:%s, value:%s, 消息offset: %s, 提交offset: %s%n", key, value, offset, commitOffser); Thread.sleep(1500); } } } } catch (Exception e) { e.printStackTrace(); } finally { consumer.close(); } }
生产消息
消费消息
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。