赞
踩
目录
九、Kafka集群中的controller、rebalance、HW
消息队列(Message Queue,简称MQ),具体地说,是为了解决通信问题。
同步:顺序执行,存在性能和稳定性的问题;
问题一:系统开销大,响应时间长;
问题二:执行过程中需要保证每个服务的顺利执行,用户体验较差;
异步:通过消息队列,进行异步处理;
优势一:极大提高系统的吞吐量;
优势二:即使执行失败,也可以使用分布式事务来保证最终是成功的(最终一致性);
主流MQ分为以下几个:
KafKa:目前性能最好、速度最快;
RocketMQ:阿里根据Kafka封装,功能性更强;
RabbitMQ:功能性强,模式多;
ZeroMQ:看重的是MQ的通信能力,基于Socket封装。
功能性区分:
有Broker:通常有一台服务器作为Broker,所有消息都通过它中转。生产者将消息发送给Broker,Broker则把消息主动推送给消费者。
重Topic:Kafka、JMS(ActiveMQ),在消息队列中,Topic必须存在。
轻Topic:RabbitMQ(AMQP),Topic只是其中的一种中转模式。
无Broker:ZeroMQ,认为MQ是一种更高级的Socket,是为了解决通信问题。故ZeroMQ被设计为了一个库,而不是中间件,ZeroMQ做的事情就是封装出了一套类似Socket的API去完成发送、读取消息。
官网地址:Apache Kafka
依赖准备:
安装JDK:jdk1.8 +;
安装Zookeeper:自己安装,或者使用Kafka安装包内自带的也可以;
Kafka安装:
官网下载:当前版本为:kafka_2.13-2.8.0.tgz;
解压缩:tar -xzvf kafka_2.13-2.8.0.tgz
;
目录说明:
bin:二进制启动文件;
config:相关配置文件;
libs:依赖的jar包;
licenses:许可证文件;
logs:某些日志文件;
site-docs:压缩文档参考;
修改配置文件(config目录内):
zookeeper.properties
:修改日志目录,改为自定义路径;
server.properties
:
- # 单节点使用默认值,集群修改为唯一id
- broker.id=0
- # 监听的kafka服务ip地址
- listeners=PLAINTEXT://localhost:9092
- # 消息存储日志文件
- log.dirs=D://IT/Kafka/tmp/kafka-logs
- # 连接的zk服务器ip地址
- zookeeper.connect=localhost:2181
启动测试(win10启动):
启动zk服务器:创建脚本启动,命名为 zookeeper.bat
,并运行;
start cmd /k "cd D:\IT\Kafka\kafka_2.13-2.8.0&&bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties"
启动Kafka服务器:创建脚本启动,命名为 kafka.bat
,并运行;
start cmd /k "cd D:\IT\Kafka\kafka_2.13-2.8.0&&bin\windows\kafka-server-start.bat .\config\server.properties"
校验是否启动成功:进入zk-Cli,查看是否有kafka的 broker.id
节点;
ls /brokers/ids
Kafka基本概念:
名称 | 解释 |
---|---|
Broker | 消息中间件处理节点,一个Kafka节点就是一个Broker,一个或多个组成一个Kafka集群 |
Topic | Kafka根据Topic对消息进行归类,发布到Kafka集群的每条消息都需要指定一个Topic |
Producer | 消息生产者,向Broker发送消息的客户端 |
Consumer | 消息消费者,从Broker读取消息的客户端 |
创建Topic:
创建Topic:
./kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic demo1
修改Topic:
- # 修改分区数
- ./kafka-topics.bat --zookeeper localhost:2181 -alter --partitions 3 --topic demo1
查询所有的Topic:
./kafka-topics.bat --list --zookeeper localhost:2181
查询具体某个Topic详细信息:
./kafka-topics.bat --zookeeper localhost:2181 --topic demo1 --describe
发送消息:
使用Kafka自带的生产者命令客户端,既可以从本地文件读取内容,也可以在命令行直接输入消息。默认情况下,每一行都是一条独立的消息。发送消息时,需要指定发送到具体哪个Kafka服务器和Topic名称;
./kafka-console-producer.bat --broker-list localhost:9092 --topic demo1
消费消息:
从头开始消费:消费全部消息。
./kafka-console-consumer.bat --bootstrap-server localhost:9092 --from-beginning --topic demo1
从最新的消息开始消费(默认):从最后一条消息的偏移量 + 1开始消费。
./kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic demo1
注意:
消息可被存储,且是顺序结构,通过偏移量offset来描述消息的有序性;
消息消费时可以指定偏移量进行描述消费的消息位置;
消费组:多个消费者可以组成一个消费组。
查询所有的消费组:
./kafka-consumer-groups.bat --bootstrap-server localhost:9092 --list
查询某个消费组中具体信息:
./kafka-consumer-groups.bat --bootstrap-server localhost:9092 --describe --group demoGroup1
current-offset:当前消费组已消费偏移量;
log-end-offset:消息总量(最后一条消息的偏移量);
lag:积压消息总量;
单播消息:一个生产者,一个消费组(Group)。同一消费组中,只有一个消费者能收到Topic中的消息。
消费组:
消费者一:
./kafka-console-consumer.bat --bootstrap-server localhost:9092 --consumer-property group.id=demoGroup --topic demo1
消费者二:
./kafka-console-consumer.bat --bootstrap-server localhost:9092 --consumer-property group.id=demoGroup --topic demo1
多播消息:一个生产者,多个消费组(Group)。每个消费组中,只有一个消费者能收到Topic中的消息。
消费组一 demoGroup1
:
./kafka-console-consumer.bat --bootstrap-server localhost:9092 --consumer-property group.id=demoGroup1 --topic demo1
消费组二 demoGroup2
:
./kafka-console-consumer.bat --bootstrap-server localhost:9092 --consumer-property group.id=demoGroup2 --topic demo1
主题Topic:
topic是一个逻辑的概念,Kafka通过Topic将消息进行分类,不同的topic将被订阅的消费组进行消费。
当topic中的消息非常多,由于消息会保存在日志中,导致内存占用过大,由此提出分区Partition的概念。
分区Partition:
通过partition将一个topic中的消息进行分区存储。好处如下:
分区存储,可以解决存储问题过大的问题;
提升了消息日志读写的吞吐量,读和写可以在多个分区中同时进行;
消息日志文件分析(kafka-logs):
0000000000.log
:文件中保存的就是消息内容;
__consumer_offsets-x
文件夹:Kafka内部默认会创建50个 __consumer_offsets-x
分区(0-49),目的是为了存放消费者消费某个主题Topic的偏移量。当消费者消费完成后就会把偏移量上报给对应的主题Topic进行保存。目的是为了提升当前的Topic的并发性。
消费完成提交至对应分区的计算方式:hash(消费组ID) % __consumer_offsets分区总数;
提交到主题内容的 Key
的方式:消费组ID + topic名称 + __consumer_offsets分区号;
提交到主题内容的 Value
的方式:当前offset的值;
0000000000.index
:日志文件的索引;
0000000000.timeindex
:日志文件按时间点的索引;
日志文件,默认保存周期为七天,到期后自动删除。
集群搭建(三个Broker):
创建三个 server.properties
文件:
第一个 server0.properties
:
- broker.id=0
- listeners=PLAINTEXT://localhost:9092
- log.dirs=D://IT/Kafka/tmp/kafka-logs-0
- zookeeper.connect=localhost:2181
第二个 server1.properties
:
- broker.id=1
- listeners=PLAINTEXT://localhost:9093
- log.dirs=D://IT/Kafka/tmp/kafka-logs-1
- zookeeper.connect=localhost:2181
第三个 server2.properties
:
- broker.id=2
- listeners=PLAINTEXT://localhost:9094
- log.dirs=D://IT/Kafka/tmp/kafka-logs-2
- zookeeper.connect=localhost:2181
启动三个Broker:
- start cmd /k "cd D:\IT\Kafka\kafka_2.13-2.8.0&&bin\windows\kafka-server-start.bat .\config\server0.properties"
-
- start cmd /k "cd D:\IT\Kafka\kafka_2.13-2.8.0&&bin\windows\kafka-server-start.bat .\config\server1.properties"
-
- start cmd /k "cd D:\IT\Kafka\kafka_2.13-2.8.0&&bin\windows\kafka-server-start.bat .\config\server2.properties"
使用zkCli客户端检测是否启动成功:
ls /brokers/ids # 启动成功显示三个broker的id
副本(replication-factor)的概念:
副本:是为主题中的分区创建的备份,一般来说,有几个Broker就应该创建几个副本。其中,会选举出一个副本作为 leader
,其他的则是 follower
。
leader:Kafka的读写操作,都发生在leader上。leader负责把数据同步至其他的follower,当leader宕机时,将会进行主从选举,选出一个新的leader。
follower:介绍leader的同步数据;
isr:可以同步和已同步的节点会存入 ISR 集合中,主从选举从 ISR 集合内选出leader。当节点的性能较差时,ISR 集合将会踢出该节点。
集群消费问题:
向集群发送消息:
./kafka-console-producer.bat --broker-list localhost:9092,localhost:9093,localhost:9094 --topic demo1
从集群中消费消息:
./kafka-console-consumer.bat --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --from-beginning --consumer-property group.id=demoGroup --topic demo1
分区消费的细节:
一个分区partition只能被一个消费组中的一个消费者消费,目的是为了保证消费的顺序性。多个分区partition的多个消费者的顺序性无法保证(后续有方法可以保证)。
分区partition的数量决定了消费组中的消费者数量,建议消费组中的消费者数量不要超过分区partition的数量,防止多出的消费者消费不到消息造成资源的浪费。
消费者宕机时,将会触发 rebalance 机制,选出其他消费者进行消费该分区的消息。
引入依赖(版本号与Kafka版本一致):
- <!--kafka-clients-->
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
- <version>2.8.0</version>
- </dependency>
简单测试:
创建一个Topic:
./kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic my-test-topic
启动三台Broker作为集群
代码测试:
- public class ProducerTest {
-
- private static final String TOPIC_NAME = "my-test-topic";
- private static final String BOOTSTRAP_SERVER = "localhost:9092,localhost:9093,localhost:9094";
- private static final String KEY = "test:";
- private static final String VALUE = "This is a test for Kafka-producer!";
-
- private static final Properties prop = new Properties();
-
- static {
- // Kafka集群IP
- prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
- // ACK:消息持久化配置(-1、0、1)
- prop.put(ProducerConfig.ACKS_CONFIG, "1");
- // 重试次数配置
- prop.put(ProducerConfig.RETRIES_CONFIG, "3");
- // 重试间隔配置,单位:ms
- prop.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "300");
-
- // 缓冲区大小,32Mb
- prop.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 1024 * 1024 * 32);
- // 发送消息大小,16Kb
- prop.put(ProducerConfig.BATCH_SIZE_CONFIG, 1024 * 16);
- // 未能拉取到16Kb数据,间隔 10ms 后,立即发送
- prop.put(ProducerConfig.LINGER_MS_CONFIG, 10);
-
- // key的序列化方式
- prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
- // value的序列化方式
- prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
- }
-
- @Test
- public void test1() throws Exception {
- // 1.创建消息生产者对象
- Producer<String, String> producer = new KafkaProducer<>(prop);
-
- for (int i = 0; i < 6; i++) {
- // 2.封装消息载体对象
- // 分区计算方式:hash(key) % partitionNum,即:key的哈希值 取余 分区的总数
- ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, KEY + UUID.randomUUID(), VALUE);
- // 自定义发送的分区位置
- // ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, 2, KEY + UUID.randomUUID(), VALUE);
-
- // 3.同步发送消息
- Future<RecordMetadata> future = producer.send(record);
-
- // 4.获取消息发送成功后的元数据
- RecordMetadata metadata = future.get();
- System.out.println("主题Topic名称:" + metadata.topic());
- System.out.println("保存partition分区的位置:" + metadata.partition());
- System.out.println("总消息数(offset偏移量):" + metadata.offset());
- }
- producer.close();
- }
-
- @Test
- public void test2() {
- // 1.创建消息生产者对象
- Producer<String, String> producer = new KafkaProducer<>(prop);
-
- // 2.封装消息载体对象
- // 分区计算方式:hash(key) % partitionNum,即:key的哈希值 取余 分区的总数
- ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, KEY + UUID.randomUUID(), VALUE);
- // 自定义发送的分区位置
- // ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, 2, KEY + UUID.randomUUID(), VALUE);
-
- // 3.异步发送消息
- producer.send(record, (data, exception) -> {
- // 4.消息发送异常,要做的事
- if (exception != null) {
- System.out.println("发送失败,异常原因:" + exception.getMessage());
- }
-
- // 5.消息发送成功要做的事
- if (data != null) {
- System.out.println("主题Topic名称:" + data.topic());
- System.out.println("保存partition分区的位置:" + data.partition());
- System.out.println("总消息数(offset偏移量):" + data.offset());
- }
- });
-
- // 延时等待,异步消息进程执行结束
- ThreadUtils.sleep(1000);
- producer.close();
- }
-
- }
同步发送消息:如果发送消息后没有收到 ack,生产者将阻塞等待 3 秒,之后会进行重试,重试三次后仍然失败,则抛出异常。执行慢,但是不会丢失消息;
异步发送消息:生产者发送完成后就可以执行后续业务,broker收到消息后,进行异步调用生产者提供的 callback 回调方法。当网络异常时可能出现回调方法未执行的问题,即消息丢失;
消息持久化机制参数(ACK配置,同步发送时用到 ACK 配置)
acks=0
:发送消息后无需等待broker进行确认,就能发送下一条。性能最高,消息最容易丢失;
acks=1
(默认值):至少需要等待 leader 成功写入本地日志,才能发送下一条。如果此时 leader 宕机,则会发生消息丢失;
acks=-1/all
:需要等待多个节点写入日志(在 min.insync.replicas
中进行设置,默认值为1,推荐大于等于 2),才能发送下一条。只要有一个备份存活就不会丢失消息。金融级别的常用配置,性能最差,安全性最高;
其他配置:
重试次数配置
重试间隔配置
配置相关代码:
- // ACK:消息持久化配置(-1、0、1)
- prop.put(ProducerConfig.ACKS_CONFIG, "1");
- // 重试次数配置
- prop.put(ProducerConfig.RETRIES_CONFIG, "3");
- // 重试间隔配置,单位:ms
- prop.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "300");
消息缓冲区配置:
Kafka会创建一个消息缓冲区,存放要发送的消息,缓冲区大小为 32Mb;
- // 缓冲区大小,32Mb
- prop.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 1024 * 1024 * 32);
Kafka本地线程会去缓冲区拉取数据,发送至 Broker,一次拉取 16Kb;
- // 发送消息大小 16Kb
- prop.put(ProducerConfig.BATCH_SIZE_CONFIG, 1024 * 16);
如果线程拉取不到 16Kb 的数据,间隔 10ms 后,也会将拉取的数据发送至Broker;
- // 未能拉取到16Kb数据,间隔 10ms 后,立即发送
- prop.put(ProducerConfig.LINGER_MS_CONFIG, 10);
引入依赖(版本号与Kafka版本一致):
- <!--kafka-clients-->
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
- <version>2.8.0</version>
- </dependency>
简单测试:
- public class ConsumerTest {
-
- private static final String TOPIC_NAME = "my-test-topic";
- private static final String BOOTSTRAP_SERVER = "localhost:9092,localhost:9093,localhost:9094";
- private static final String CONSUMER_GROUP_NAME = "test-group-0";
-
- private static final Properties prop = new Properties();
-
- static {
- // Kafka集群IP
- prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
- // 消费组的名称
- prop.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);
- // key的反序列化方式
- prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
- // value的反序列化方式
- prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
- }
-
- @Test
- public void test1() {
- // 1.创建消费者对象
- Consumer<String, String> consumer = new KafkaConsumer<>(prop);
-
- // 2.订阅Topic主题的消息列表
- consumer.subscribe(Collections.singletonList(TOPIC_NAME));
-
- // 3.长轮询进行监听消息
- while (true) {
- // 4.拉取消息列表
- ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
-
- // 5.循环处理每一条消息
- for (ConsumerRecord<String, String> record : records) {
- System.out.println("主题Topic名称:" + record.topic());
- System.out.println("分区partition的位置:" + record.partition());
- System.out.println("消息offset偏移量:" + record.offset());
- System.out.println("消息的键key:" + record.key());
- System.out.println("消息的值value:" + record.value());
- }
- }
-
- }
-
- }
消费者提交 offset:
提交的内容:所属的消费组 + 消费的 Topic + 消费的 partition + 偏移量 offset;
自动提交:消息 poll 之后,消费消息之前进行 offset 提交。当消费者宕机时,会发生消息丢失问题;
- // 是否开启自动提交 offset,默认为 true
- prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
- // 自动提交 间隔时间
- prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
手动提交:消息进行消费后,手动提交 offset;
- // 是否开启自动提交 offset,默认为 true
- prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
同步提交(更推荐使用):
- // 拉取消息列表
- Duration delay = Duration.ofMillis(1000);
- ConsumerRecords<String, String> records = consumer.poll(delay);
- if (records.count() > 0) {
- // 6.手动同步提交,等待Broker返回ACK才会执行后续业务,否则阻塞等待
- try {
- consumer.commitAsync();
- } catch (Exception e) {
- // 提交失败执行的逻辑
- e.printStackTrace();
- }
- }
异步提交:
- // 拉取消息列表
- Duration delay = Duration.ofMillis(1000);
- ConsumerRecords<String, String> records = consumer.poll(delay);
- if (records.count() > 0) {
- // 6.手动异步提交,无需返回ACK,提交成功后异步调用回调方法
- consumer.commitAsync((offset, exception) -> {
- // 提交失败执行的逻辑
- if (exception != null) {
- System.out.println("手动提交异常:" + exception.getMessage());
- }
-
- // 提交成功执行的逻辑
- if (CollUtil.isNotEmpty(offset)) {
- offset.entrySet().forEach(System.out::println);
- }
- });
- }
消费者长轮询 poll 的配置:
单次拉取条数:默认500条;
- // 单次拉取的消息条数
- prop.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
单次轮询的时间:poll方法的入参,此处为 1000 毫秒;
- Duration delay = Duration.ofMillis(1000);
- ConsumerRecords<String, String> records = consumer.poll(delay);
poll的机制:在轮询内时,重复拉取消息,直至拉取 500 条消息,或超过轮询时长停止;
当首次拉取消息,够了 500 条,则停止拉取,执行后续业务逻辑;
首次没有拉取到 500 条,进行重复拉取,直至 拉取够,或超过轮询时长停止;
若多次拉取都不够 500 条,且超过了轮询时长,则停止拉取,执行后续业务逻辑;
当前后两次轮询的间隔时间超过了 30 秒,集群将判定此消费者能力弱,并踢出消费组,并处罚 rebalance
机制, rebalance
机制会造成性能开销。可以修改如下配置,提升消费者的速度;
- // 单次拉取的消息条数,默认500
- prop.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 200);
- // 单次长轮询的间隔时间,默认 1000 ms
- prop.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 5 * 1000);
消费者的健康状态检测配置:
- // 消费者发送心跳间隔时长,心跳频率
- prop.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
- // 心跳超时将触发 rebalance机制,并踢出消费组 的超时时长
- prop.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);
消费组指定partition分区进行消费:
- // 参数一,Topic名称。参数二,分区位置
- TopicPartition topicPartition = new TopicPartition(TOPIC_NAME, 0);
- consumer.assign(Collections.singletonList(topicPartition));
消费者的消息回溯:指定分区位置,并从头开始消费;
- // 参数一,Topic名称。参数二,分区位置
- TopicPartition topicPartition = new TopicPartition(TOPIC_NAME, 0);
- consumer.assign(Collections.singletonList(topicPartition));
- consumer.seekToBeginning(Collections.singletonList(topicPartition));
指定 offset 位置消费:
- // 指定offset位置进行消费
- consumer.seek(topicPartition, new OffsetAndMetadata(10));
指定时间点,获取 offset 进行消费:
- // 获取此Topic的全部分区
- List<PartitionInfo> infos = consumer.partitionsFor(TOPIC_NAME);
-
- // 消费当前时间前一天的消息
- long time = new Date().getTime() - 24 * 60 * 60 * 1000;
-
- // 封装分区和消费时间的参数
- HashMap<TopicPartition, Long> map = new HashMap<>();
- for (PartitionInfo info : infos) {
- map.put(new TopicPartition(TOPIC_NAME, info.partition()), time);
- }
-
- // 根据时间节点,找到消息偏移量 offset
- Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(map);
- for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : offsets.entrySet()) {
- TopicPartition key = entry.getKey();
- OffsetAndTimestamp value = entry.getValue();
-
- // 进行消息订阅,指定消费的偏移量 offset
- consumer.assign(Collections.singletonList(key));
- consumer.seek(key, new OffsetAndMetadata(value.offset()));
- }
新消费组的消费 offset 规则:
当创建新的消费组启动消费者时,默认只会消费最新的消息。通过修改以下配置,使消费者首次启动时,从头开始消费,后续启动从最新消息开始消费。
- // 创建新的消费组时的消费规则,默认latest。latest:从最新开始 / earliest:从头开始
- prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
latest:从最新开始,只消费最新的消息;
earliest:首次启动从头开始,后续启动从最新开始;
区别: seekToBeginning()
方法是每次启动都从头开始消费,earliest
只有首次启动从头开始,后续启动从最新开始;
引入依赖:
- <dependency>
- <groupId>org.springframework.kafka</groupId>
- <artifactId>spring-kafka</artifactId>
- </dependency>
配置文件:
- spring:
- kafka:
- bootstrap-servers: localhost:9092,localhost:9093,localhost:9094
- producer:
- acks: 1 # 0:无需ack确认;1:broker写入 leader日志后返回ack;-1:broker写入多个副本日志后返回ack
- retries: 3 # 重试次数
- batch-size: 16384 # 单次发送消息大小 16Kb
- buffer-memory: 33554432 # 缓存区大小 32Mb
- key-serializer: org.apache.kafka.common.serialization.StringSerializer
- value-serializer: org.apache.kafka.common.serialization.StringSerializer
- consumer:
- group-id: test-group-0
- max-poll-records: 500 # 单次拉取消息条数
- enable-auto-commit: false # 消费后是否自动提交
- auto-offset-reset: earliest # 新消费组消费策略,earliest:首次从头消费,后续从最新消息消费
- key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
- value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
- listener:
- ack-mode: manual # 手动提交,监听器处理一次轮询的消息后(默认500条)调用ack.acknowledge()进行提交
- # ack-mode: manual_immediate # 手动提交,监听器处理单条消息后调用ack.acknowledge()进行提交
消息生产者:
- @RestController
- @RequestMapping("/kafka")
- public class KafkaController {
-
- private static final String TOPIC_NAME = "my-test-topic";
- private static final String KEY = "test:";
- private static final String VALUE = "This is a test for Kafka-producer!";
-
- @Autowired
- private KafkaTemplate<String, String> kafkaTemplate;
-
- @GetMapping("/send")
- public AjaxResult sendMessage() {
- ProducerRecord<String, String> record =
- new ProducerRecord<>(TOPIC_NAME,
- KEY + UUID.randomUUID(),
- VALUE);
- // 返回值是Future
- kafkaTemplate.send(record);
- return AjaxResult.success();
- }
-
- }
消费者监听:
- @Component
- public class MyConsumer {
-
- private static final String TOPIC_NAME = "my-test-topic";
- private static final String CONSUMER_GROUP_NAME_0 = "test-group-0";
-
- /**
- * 自动监听是否有消息
- *
- * @param record 可以是 ConsumerRecords,也可以是 ConsumerRecord,前者一次处理多条消息,后者一次处理一条消息
- * @param ack 当关闭自动提交时,需要使用此参数进行 ack 确认提交
- */
- @KafkaListener(topics = TOPIC_NAME, groupId = GROUP_NAME)
- public void listenMessage(ConsumerRecord<String, String> record, Acknowledgment ack) {
- System.out.println(record);
- ack.acknowledge();
- }
-
- }
消费者配置主题Topic、分区partition、偏移量offset:
- /**
- * 创建一个消费组,包含 3 个消费者(concurrency),持续监听两个Topic的消息。
- * 第一个Topic,监听分区(0、1),分区 0的偏移量初始为 5;分区 1的偏移量初始为 10。
- * 第二个Topic,监听分区(1、2),分区 1的偏移量初始为 15;分区 2的偏移量初始为 20。
- *
- * @param record 可以是 ConsumerRecords,也可以是 ConsumerRecord,前者一次处理多条消息,后者一次处理一条消息
- * @param ack 当关闭自动提交时,需要使用此参数进行 ack 确认提交
- */
- @KafkaListener(groupId = GROUP_NAME, concurrency = "3", topicPartitions = {
- @TopicPartition(topic = TOPIC_NAME_1, partitions = {"0", "1"}, partitionOffsets = {
- @PartitionOffset(partition = "0", initialOffset = "5"),
- @PartitionOffset(partition = "1", initialOffset = "10")
- }),
- @TopicPartition(topic = TOPIC_NAME_2, partitions = {"1", "2"}, partitionOffsets = {
- @PartitionOffset(partition = "1", initialOffset = "15"),
- @PartitionOffset(partition = "2", initialOffset = "20")
- })
- })
- public void listenMessage2(ConsumerRecord<String, String> record, Acknowledgment ack) {
- System.out.println(record);
- ack.acknowledge();
- }
选举机制:使用zk的机制,当 broker 创建时,会在 zookeeper 中创建一个临时序号节点,序号最小的节点代表的 broker 将作为集群中的 controller。
作用:
当集群中某一个副本的 leader 宕机,需要在集群中选出一个新的 leader,选举的规则是 ISR 集合中最左边的元素(ISR 集合会按照性能排序,性能越好越靠前);
当集群中有 broker 的增加或减少,controller 会同步信息给其他的 broker;
当集群中有 partition 分区的增加或减少,controller 会同步信息给其他的 broker;
rebalance 的前提:当消费组中的消费者没有指定分区进行消费,由 Kafka 决定消息分区的分配。
触发 rebalance 的条件:当消费组中的消费者和分区的关系发生变化时;
分区分配策略(rebalance之前,分区分配有三种策略):
range
:根据公式计算消费者消费的分区。
轮询:分区逐一分配到消费者上。
sticky:粘合策略。如果需要rebalance,会在已分配的基础上进行调整,而不会影响之前的分配情况。建议开启此策略,因为 rebalance 机制会重新分配会造成资源浪费;
HW:Height Water,称为高水位。它是 Broker 已完成同步的分界线,当消息写入 Broker,并同步到所有副本之后,HW才会变化。HW变化之前,消费者无法消费到未同步完成的消息,这么做的目的是为了防止消息丢失。
LEO:Log End Offset,指的是某个副本的最后一条消息的位置。
关系图如下:
如何防止消息丢失?
生产者:
发送消息时,使用同步发送的方式;
设置 ACK 的级别,设为1 或 -1即可,-1时可以做到99.99%防丢失率,需要修改 min.insync.replicas
分区备份数,推荐大于等于 2;
消费者:拉取消息后的自动提交,修改为手动提交;
如何防止重复消费?
生产者:
关闭 retry 重试:不推荐,这样做可能导致消息丢失;
消费者:
开启自动提交:不推荐,这样做可能导致消息丢失;
保证消费的幂等性(指多次访问结果一致):
方式一:使用 redis / zk 分布式锁,主流方案,推荐使用;
方式二:创建联合主键,保证消息的唯一性,防止插入多条记录;
如何做到顺序消费?
生产者:
使用同步发送,且 ACK 不为 0(0会导致消息丢失),确保消息发送顺序是正确的;
消费者:
主题 Topic 只能设置一个分区 Partition,且消费组只能有一个消费者;
Kafka的顺序消费会牺牲性能,可以考虑使用 RocketMQ 代替 ;
如何处理消息积压?
消息积压的原因:由于消费的速度远赶不上生产的速度,导致
解决方案:
消费者使用多线程,充分利用机器的性能;
优化业务架构,提升业务层的消费速度;
创建多个消费组,多个消费者,提升消费者的速度;
消息分发:创建一个消费者,进行转发消息,接收方为新的 Topic ,且配置了多个分区及多个消费者进行消费(不常用)。
如何实现延时队列?
场景:创建订单后,如果 30 分钟未支付,则取消订单。
解决方案:
单独创建相应的主题;
消费者消费该主题的消息(轮询);
消费前进行判断,当前时间是否与消息的创建时间相差 30 分钟,且未支付;
如果是:修改数据库状态为取消订单;
如果否:记录当前 offset,且不再继续消费之后的消息。等待一分钟后,根据记录的 offset 拉取消息,继续进行判断。
未完待续...
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。