当前位置:   article > 正文

kafka(组件分析 整合springboot集成 实战)_spingboot的kafka 组件

spingboot的kafka 组件

kafka 组件 搭建 springboot集成 实战

1、应用场景

1.1 kafka场景

Kafka最初是由LinkedIn公司采用Scala语言开发,基于ZooKeeper,现在已经捐献给了Apache基金会。目前Kafka已经定位为一个分布式流式处理平台,它以 高吞吐、可持久化、可水平扩展、支持流处理等多种特性而被广泛应用。
Apache Kafka能够支撑海量数据的数据传递。在离线和实时的消息处理业务系统中,Kafka都有广泛的应用。
(1)日志收集:收集各种服务的log,通过kafka以统一接口服务的方式开放 给各种consumer,例如Hadoop、Hbase、Solr等;
(2)消息系统:解耦和生产者和消费者、缓存消息等;
(3)用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到Hadoop、数据仓库中做离线分析和挖掘;
(4)运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告;
(5)流式处理:比如spark streaming和storm;

1.2 kafka特性

kafka以高吞吐量著称,主要有以下特性:
(1)高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒;
(2)可扩展性:kafka集群支持热扩展;
(3)持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失;
(4)容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败);
(5)高并发:支持数千个客户端同时读写;

1.3 消息对比

  • 如果普通的业务消息解耦,消息传输,rabbitMq是首选,它足够简单,管理方便,性能够用。
  • 如果在上述,日志、消息收集、访问记录等高吞吐,实时性场景下,推荐kafka,它基于分布式,扩容便捷
  • 如果很重的业务,要做到极高的可靠性,考虑rocketMq,但是它太重。需要你有足够的了解

1.4 大厂应用

京东通过kafka搭建数据平台,用于用户购买、浏览等行为的分析。成功抗住6.18的流量洪峰
阿里借鉴kafka的理念,推出自己的rocketmq。在设计上参考了kafka的架构体系

2、基础组件

2.1 角色

在这里插入图片描述

  • broker:节点,就是你看到的机器
  • provider:生产者,发消息的
  • consumer:消费者,读消息的
  • zookeeper:信息中心,记录kafka的各种信息的地方
  • controller:其中的一个broker,作为leader身份来负责管理整个集群。如果挂掉,借助zk重新选主

2.2 逻辑组件

在这里插入图片描述

  • topic:主题,一个消息的通道,收发总得知道消息往哪投
  • partition:分区,每个主题可以有多个分区分担数据的传递,多条路并行,吞吐量大
  • Replicas:副本,每个分区可以设置多个副本,副本之间数据一致。相当于备份,有备胎更可靠
  • leader & follower:主从,上面的这些副本里有1个身份为leader,其他的为follower。leader处理partition的所有读写请求

2.3 副本集合

  • AR(Assigned Repllicas):所有副本的统称,AR=ISR+OSR
  • ISR(In-Sync Replicas):同步中的副本,可以参与leader选主。一旦落后太多(数量滞后和时间滞后两个维度)会被踢到OSR。
  • OSR(Out-Sync Replicas):踢出同步的副本,一直追赶leader,追上后会进入ISR

2.4 消息标记

在这里插入图片描述
offset:偏移量,消息消费到哪一条了?每个消费者都有自己的偏移量

在这里插入图片描述
HW:(high watermark)副本的高水印值,客户端最多能消费到的位置,HW值为8,代表offset为[0,8]的9条消息都可以被消费到,它们是对消费者可见的,而[9,12]这4条消息由于未提交,对消费者是不可见的
LEO:(log end offset)日志末端位移,代表日志文件中下一条待写入消息的offset,这个offset上实际是没有消息的。不管是leader副本还是follower副本,都有这个值。

那么这三者有什么关系呢?
比如在副本数等于3的情况下,消息发送到Leader A之后会更新LEO的值,Follower B和Follower C也会实时拉取Leader A中的消息来更新自己,HW就表示A、B、C三者同时达到的日志位移,也就是A、B、 C三者中LEO最小的那个值。由于B、C拉取A消息之间延时问题,所以HW一般会小于LEO,即LEO>=HW。

LEO>=HW>=OFFSET

3、架构探索

3.1 发展历程

http://kafka.apache.org/downloads
在这里插入图片描述

3.1.1 版本命名

Kafka在1.0.0版本前的命名规则是4位,比如0.8.2.1,0.8是大版本号,2是小版本号,1表示打过1个补丁
现在的版本号命名规则是3位,格式是“大版本号”+“小版本号”+“修订补丁数”,比如2.5.0,前面的2代表的是大版本号,中间的5代表的是小版本号,0表示没有打过补丁
我们所看到的下载包,前面是scala编译器的版本,后面才是真正的kafka版本。

3.1.2 演进历史

0.7版本 只提供了最基础的消息队列功能。
0.8版本 引入了副本机制,至此Kafka成为了一个真正意义上完备的分布式高可靠消息队列解决方案。
0.9版本 增加权限和认证,使用Java重写了新的consumer API,Kafka Connect功能;不建议使用consumer API;
0.10版本 引入Kafka Streams功能,正式升级成分布式流处理平台;建议版本0.10.2.2;建议使用新版consumer API
0.11版本 producer API幂等,事务API,消息格式重构;建议版本0.11.0.3;谨慎对待消息格式变化
1.0和2.0版本 Kafka Streams改进;建议版本2.0;

3.2 集群搭建

1)原生启动
kafka启动需要zookeeper,第一步启动zk:

docker run --name zookeeper-1 -d -p 2181 zookeeper:3.4.13
  • 1

原生安装:下载后解压启动即可 http://kafka.apache.org/downloads

bin/kafka-server-start.sh config/server.properties
  • 1
#server.properties配置说明 
#表示broker的编号,如果集群中有多个broker,则每个broker的编号需要设置的不同 
broker.id=0 
#brokder对外提供的服务入口地址,默认9092 
listeners=PLAINTEXT://:9092 
#设置存放消息日志文件的地址 
log.dirs=/tmp/kafka/log 
#Kafka所需Zookeeper集群地址,这里是关键!加入同一个zk的kafka为同一集群 
zookeeper.connect=zookeeper:2181 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

2)推荐docker-compose 一键启动

#docker-compose.yml
#注意hostname问题,ip地址:52.82.98.209,换成你自己服务器的
#docker-compose up -d 启动
version: '3'
services:
    zookeeper:
        image: zookeeper:3.6.3

    kafka-1:
        container_name: kafka-1
        image: wurstmeister/kafka
        ports:
            - 10903:9092
        environment:
            KAFKA_BROKER_ID: 1 
            HOST_IP: 192.168.31.236
            KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
            #docker部署必须设置外部可访问ip和端口,否则注册进zk的地址将不可达造成外部无法连接
            KAFKA_ADVERTISED_HOST_NAME: 192.168.31.236
            KAFKA_ADVERTISED_PORT: 10903 
        volumes:
            - /etc/localtime:/etc/localtime
        depends_on:
            - zookeeper           
    kafka-2:
        container_name: kafka-2
        image: wurstmeister/kafka
        ports:
            - 10904:9092
        environment:
            KAFKA_BROKER_ID: 2 
            HOST_IP: 192.168.31.236
            KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
            KAFKA_ADVERTISED_HOST_NAME: 192.168.31.236
            KAFKA_ADVERTISED_PORT: 10904 
        volumes:
            - /etc/localtime:/etc/localtime
        depends_on:
            - zookeeper 


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41

3.3 组件探秘

命令行工具是管理kafka集群最直接的工具。官方自带,不需要额外安装。

3.2.1 主题创建
#进入容器 
docker exec -it kafka-1 sh 
#进入bin目录 
cd /opt/kafka/bin 
#创建 
kafka-topics.sh --zookeeper zookeeper:2181 --create --topic test --partitions 2 
--replication-factor 1
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
3.2.2 查看主题
kafka-topics.sh --zookeeper zookeeper:2181 --list
  • 1
3.2.3 主题详情
kafka-topics.sh --zookeeper zookeeper:2181 --describe --topic test
#分析输出: 
#注意格式:Partition: (分区号,从0开始),Leader、 Replicas、Isr: (机器号,也就是启动 时配置的broker_id) 


Topic: test	PartitionCount: 2	ReplicationFactor: 1	Configs:
	Topic: test	Partition: 0	Leader: 2	Replicas: 2	Isr: 2
	Topic: test	Partition: 1	Leader: 1	Replicas: 1	Isr: 1
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
3.2.4 消息收发
#使用docker连接任意集群中的一个容器 
docker exec -it kafka-1 sh 
#进入kafka的容器内目录 
cd /opt/kafka/bin 
#客户端监听 
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test 
#另起一个终端,验证发送 
./kafka-console-producer.sh --broker-list localhost:9092 --topic test
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

如果起两个客户端都监听的话,发现是广播形式

3.2.5 分组消费
#启动两个consumer时,如果不指定group信息,消息被广播 
#指定相同的group,让多个消费者分工消费(画图:group原理) 
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test -- 
group aaa 
#结果:在发送方,连续发送 1-4 ,4条消息,同一group下的两台consumer交替消费,并发执行
  • 1
  • 2
  • 3
  • 4
  • 5

注意!!!
这是在消费者和分区数相等(都是2)的情况下。 如果同一group下的 ( 消费者数量 > 分区数量 ) 那么就会有消费者闲置。

验证方式:
可以再多启动几个消费者试一试,会发现,超出2个的时候,有的始终不会消费到消息。 停掉可以消费到的,那么闲置的会被激活,进入工作状态

3.2.6 指定分区
#指定分区通过参数 --partition,注意!需要去掉上面的group 
#指定分区的意义在于,保障消息传输的顺序性(画图:kafka顺序性原理) 
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test -- 
partition 0 
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test -- 
partition 1 
#结果:发送1-4条消息,交替出现。说明消息被均分到各个分区中投递 
#默认的发送是没有指定key的 
#要指定分区发送,就需要定义key。那么相同的key被路由到同一个分区 
./kafka-console-producer.sh --broker-list kafka-1:9092 --topic test --property 
parse.key=true 
#携带key再发送,注意key和value之间用tab分割 
>1 1111 
>1 2222 
>2 3333 
>2 4444 

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
3.2.7 偏移量
#偏移量决定了消息从哪开始消费,支持:开头,还是末尾 
# earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开 始消费 
# latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产 生的该分区下的数据 
# none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的 offset,则抛出异常 
#--offset [earliest|latest(默认)] , 或者 --from-beginning 
#新起一个终端,指定offset位置 
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test -- 
partition 0 --offset earliest 
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test -- 
partition 0 --from-beginning 
#结果:之前发送的消息,从头又消费了一遍!

# 注意点!!!有提交偏移量的话,仍然以提交的为主,即便使用earliest,比提交点更早的也不会被提 取
# 以下指令,在group=aa这个组下,只有第一次会从头开始 
# 后续即便是指定beginning,依然不会从最早的开始,因为第一次消费后,已经有了偏移量,以它优 先!
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test -- 
group begining-group --from-beginning 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

3.4 zk探秘

前面说过,zk存储了kafka集群的相关信息,本节来探索内部的秘密。
kafka的信息记录在zk中,进入zk容器,查看相关节点和信息

docker exec -it kafka_zookeeper_1 sh 
>./bin/zkCli.sh 
>ls / 
#结果:得到以下配置信息 
[admin, brokers, cluster, config, consumers, controller, controller_epoch, feature, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper]
  • 1
  • 2
  • 3
  • 4
  • 5

在这里插入图片描述

3.4.1 broker信息
[zk: localhost:2181(CONNECTED) 2] ls /brokers
[ids, seqid, topics]
[zk: localhost:2181(CONNECTED) 3] ls /brokers/ids
[1, 2, 3]
[zk: localhost:2181(CONNECTED) 4] get /brokers/ids/1
{"features":{},"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://192.168.31.236:10903"],"jmx_port":-1,"port":10903,"host":"192.168.31.236","version":5,"timestamp":"1625978230130"}
cZxid = 0x27 
ctime = Tue Jan 05 05:40:45 GMT 2021 
mZxid = 0x27 
mtime = Tue Jan 05 05:40:45 GMT 2021 
pZxid = 0x27 
cversion = 0 
dataVersion = 1 
aclVersion = 0 
ephemeralOwner = 0x105a2db626b0000 
dataLength = 196 
numChildren = 0

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
3.4.2 主题与分区
[zk: localhost:2181(CONNECTED) 8] ls /brokers/topics
[__consumer_offsets, test]
[zk: localhost:2181(CONNECTED) 9]  ls /brokers/topics/test
[partitions]
[zk: localhost:2181(CONNECTED) 10] ls /brokers/topics/test/partitions
[0, 1]
[zk: localhost:2181(CONNECTED) 11]  ls /brokers/topics/test/partitions/0
[state]
[zk: localhost:2181(CONNECTED) 12] get /brokers/topics/test/partitions/0/state
{"controller_epoch":2,"leader":1,"version":1,"leader_epoch":2,"isr":[1]}
[zk: localhost:2181(CONNECTED) 13]
cZxid = 0xb0 
ctime = Tue Jan 05 05:56:06 GMT 2021 
mZxid = 0xb0 
mtime = Tue Jan 05 05:56:06 GMT 2021 
pZxid = 0xb0 
cversion = 0 
dataVersion = 0 
aclVersion = 0 
ephemeralOwner = 0x0 
dataLength = 72 
numChildren = 0

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
3.4.3 消费者与偏移量
[zk: localhost:2181(CONNECTED) 15] ls /consumers 
[]
#空的??? 
#那么,消费者以及它的偏移记在哪里呢???
  • 1
  • 2
  • 3
  • 4

kafka 消费者记录 group 的消费 偏移量 有两种方式 :
1)kafka 自维护 (新)
2)zookpeer 维护 (旧) ,已经逐渐被废弃

查看方式:
上面的消费用的是控制台工具,这个工具使用–bootstrap-server,不经过zk,也就不会记录 到/consumers下。
其消费者的offffset会更新到一个kafka自带的topic【__consumer_offffsets】下面

#先起一个消费端,指定group 
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group aaa 
#使用控制台工具查看消费者及偏移量情况 
./kafka-consumer-groups.sh --bootstrap-server kafka-1:9092 --list 
KMOffsetCache-44acff134cad 
aaa 
#查看偏移量详情 
./kafka-consumer-groups.sh --bootstrap-server kafka-1:9092 --describe --group 
aaa 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

当前与LEO保持一致,说明消息都完整的被消费过
在这里插入图片描述
停掉consumer后,往provider中再发几条记录,offset开始滞后:
在这里插入图片描述
重新启动consumer,消费到最新的消息,同时再返回看偏移量,消息得到同步
在这里插入图片描述

3.4.4 controller
#当前集群中的主控节点是谁
[zk: localhost:2181(CONNECTED) 17] get /controller 
{"version":1,"brokerid":1,"timestamp":"1609825245694"} 
cZxid = 0x2a 
ctime = Tue Jan 05 05:40:45 GMT 2021 
mZxid = 0x2a 
mtime = Tue Jan 05 05:40:45 GMT 2021 
pZxid = 0x2a 
cversion = 0 
dataVersion = 0 
aclVersion = 0 
ephemeralOwner = 0x105a2db626b0000 
dataLength = 54 
numChildren = 0 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
3.5.1 启动

kafka-manager是目前最受欢迎的kafka集群管理工具,最早由雅虎开源。提供可视化kafka集群操作
官网:https://github.com/yahoo/kafka-manager/releases
注意它的版本,docker社区的景象版本滞后于kafka,我们自己来打镜像

#Dockerfile 
FROM daocloud.io/library/java:openjdk-8u40-jdk 
ADD kafka-manager-2.0.0.2/ /opt/km2002/ 
CMD ["/opt/km2002/bin/kafka-manager","- 
Dconfig.file=/opt/km2002/conf/application.conf"] 
#打包,注意将kafka-manager-2.0.0.2放到同一目录 
docker build -t km:2002 . 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
#启动:在上面的yml里,services节点下加一段 
#参考资料:km.yml 
#执行: docker-compose -f km.yml up -d 
km: 
image: km:2002 
ports:
- 10906:9000 
depends_on: 
- zookeeper
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
3.5.2 使用

使用km可以方便的查看以下信息:

  • cluster:创建集群,填写zk地址,选中jmx,consumer信息等选项
  • brokers:列表,机器信息
  • topic:主题信息,主题内的分区信息。创建新的主题,增加分区
  • cosumers: 消费者信息,偏移量等

4、深入应用

4.1 springboot-kafka

1)配置文件

  kafka:
    bootstrap-servers: 52.82.98.209:10903,52.82.98.209:10904
    producer: # producer 生产者
      retries: 0 # 重试次数
      acks: 1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
      batch-size: 16384 # 批量大小
      buffer-memory: 33554432 # 生产端缓冲区大小
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
#      value-serializer: com.itheima.demo.config.MySerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

    consumer: # consumer消费者
      group-id: javagroup # 默认的消费组ID
      enable-auto-commit: true # 是否自动提交offset
      auto-commit-interval: 100  # 提交offset延时(接收到消息后多久提交offset)

      # earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
      # latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
      # none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
      auto-offset-reset: latest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
#      value-deserializer: com.itheima.demo.config.MyDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

pom.xml

    <name>kafka</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.62</version>
        </dependency>

        <!--swagger2增强,官方ui太low , 访问地址: /doc.html  -->
        <dependency>
            <groupId>com.github.xiaoymin</groupId>
            <artifactId>swagger-bootstrap-ui</artifactId>
            <version>1.9.6</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.9.2</version>
        </dependency>

    </dependencies>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

2)启动信息

在这里插入图片描述

4.2 消息发送

4.2.1 发送类型

KafkaTemplate调用send时默认采用异步发送,如果需要同步获取发送结果,调用get方法
详细代码参考:AsyncProducer.java

@RestController
public class AsyncProducer {
    private final static Logger logger = LoggerFactory.getLogger(AsyncProducer.class);
    @Resource
    private KafkaTemplate<String, Object> kafkaTemplate;

    //同步发送
    @GetMapping("/kafka/sync/{msg}")
    public void sync(@PathVariable("msg") String msg) throws Exception {
        Message message = new Message();
        message.setMessage(msg);
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send("test", JSON.toJSONString(message));
        SendResult<String, Object> result = future.get(3,TimeUnit.SECONDS);
        logger.info("send result:{}",result.getProducerRecord().value());
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

消费者使用:KafkaConsumer.java

//默认消费组消费
@Component
public class KafkaConsumer {
    private final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);

    //不指定group,默认取yml里配置的
    @KafkaListener(topics = {"test"})
    public void onMessage1(ConsumerRecord<?, ?> consumerRecord) {
        Optional<?> optional = Optional.ofNullable(consumerRecord.value());
        if (optional.isPresent()) {
            Object msg = optional.get();
            logger.info("message:{}", msg);
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

1)同步发送

ListenableFuture<SendResult<String, Object>> future = 
kafkaTemplate.send("test", JSON.toJSONString(message)); 
//注意,可以设置等待时间,超出后,不再等候结果 
SendResult<String, Object> result = future.get(3,TimeUnit.SECONDS); 
logger.info("send result:{}",result.getProducerRecord().value()); 

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

通过swagger发送,控制台可以正常打印send result

2)阻断
在服务器上,将kafka暂停服务

在swagger发送消息
调同步发送:请求被阻断,一直等待,超时后返回错误
在这里插入图片描述
而调异步发送的(默认发送接口),请求立刻返回。
在这里插入图片描述
那么,异步发送的消息怎么确认发送情况呢???往下看!
3)注册监听
代码参考: KafkaListener.java
可以给kafkaTemplate设置Listener来监听消息发送情况,实现内部的对应方法

//@Configuration
public class KafkaListener {
    private final static Logger logger = LoggerFactory.getLogger(KafkaListener.class);

    @Autowired
    KafkaTemplate kafkaTemplate;

    //配置监听
    @PostConstruct
    private void listener(){
        kafkaTemplate.setProducerListener(new ProducerListener<String, Object>() {
            @Override
            public void onSuccess(ProducerRecord<String, Object> producerRecord, RecordMetadata recordMetadata) {
                logger.info("ok,message={}",producerRecord.value());
            }

            @Override
            public void onError(ProducerRecord<String, Object> producerRecord, Exception exception) {
                logger.error("error!message={}",producerRecord.value());
            }
        });
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

查看控制台,等待一段时间后,异步发送失败的消息会被回调给注册过的listener

com.itheima.demo.config.KafkaListener:error!message= {"message":"1","sendTime":1609920296374}
  • 1

启动kafka

docker-compose unpause kafka-1 kafka-2
  • 1

再次发送消息时,同步异步均可以正常收发,并且监听进入success回调

com.itheima.demo.config.KafkaListener$1:ok,message= 
{"message":"1","sendTime":1610089315395} 
com.itheima.demo.controller.PartitionConsumer:patition=1,message: 
[{"message":"1","sendTime":1610089315395}]
  • 1
  • 2
  • 3
  • 4

可以看到,在内部类 KafkaListener$1 中,即注册的Listener的消息

4.2.2 序列化

消费者使用:KafkaConsumer.java

@Component
public class KafkaConsumer {
    private final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);

    //不指定group,默认取yml里配置的
    @KafkaListener(topics = {"test"})
    public void onMessage1(ConsumerRecord<?, ?> consumerRecord) {
        Optional<?> optional = Optional.ofNullable(consumerRecord.value());
        if (optional.isPresent()) {
            Object msg = optional.get();
            logger.info("message:{}", msg);
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

1)序列化详解

  • 前面用到的是Kafka自带的字符串序列化器
    (org.apache.kafka.common.serialization.StringSerializer)
  • 除此之外还有:ByteArray、ByteBuffer、Bytes、Double、Integer、Long 等
  • 这些序列化器都实现了接口 (org.apache.kafka.common.serialization.Serializer)
  • 基本上,可以满足绝大多数场景

2)自定义序列化
自己实现,实现对应的接口即可,有以下方法:

public interface Serializer<T> extends Closeable {
    default void configure(Map<String, ?> configs, boolean isKey) {
    }
    //理论上,只实现这个即可正常运行
    byte[] serialize(String var1, T var2);
    //默认调上面的方法
    default byte[] serialize(String topic, Headers headers, T data) {
        return this.serialize(topic, data);
    }

    default void close() {
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

案例,参考: MySerializer.java

public class MySerializer implements Serializer {

    @Override
    public byte[] serialize(String s, Object o) {
        String json = JSON.toJSONString(o);
        return json.getBytes();
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

在yaml中配置自己的编码器

value-serializer: com.itheima.demo.config.MySerializer
  • 1

重新发送,发现:消息发送端编码回调一切正常。但是消费端消息内容不对!

com.itheima.demo.controller.KafkaListener$1:ok,message= 
{"message":"1","sendTime":1609923570477} 
com.itheima.demo.controller.KafkaConsumer:message:" 
{\"message\":\"1\",\"sendTime\":1609923570477}" 

  • 1
  • 2
  • 3
  • 4
  • 5

怎么办?
3)解码
发送端有编码并且我们自己定义了编码,那么接收端自然要配备对应的解码策略
代码参考:MyDeserializer.java,实现方式与编码器几乎一样!

在yaml中配置自己的解码器

value-deserializer: com.itheima.demo.config.MyDeserializer
  • 1
public class MyDeserializer implements Deserializer {
    private final static Logger logger = LoggerFactory.getLogger(MyDeserializer.class);

    @Override
    public Object deserialize(String s, byte[] bytes) {
        try {
            String json = new String(bytes,"utf-8");
            return JSON.parse(json);
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        return null;
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

再次收发,消息正常

com.itheima.demo.controller.AsyncProducer$1:ok,message= 
{"message":"1","sendTime":1609924855896} 
com.itheima.demo.controller.KafkaConsumer:message: 
{"message":"1","sendTime":1609924855896}

  • 1
  • 2
  • 3
  • 4
  • 5
4.2.3 分区策略

分区策略决定了消息根据key投放到哪个分区,也是顺序消费保障的基石。

  • 给定了分区号,直接将数据发送到指定的分区里面去
  • 没有给定分区号,给定数据的key值,通过key取上hashCode进行分区
  • 既没有给定分区号,也没有给定key值,直接轮循进行分区
  • 自定义分区,你想怎么做就怎么做

1)验证默认分区规则

发送者代码参考:PartitionProducer.java

@RestController
public class PartitionProducer {
    @Resource
    private KafkaTemplate<String, Object> kafkaTemplate;

//    指定分区发送
//    不管你key是什么,到同一个分区
    @GetMapping("/kafka/partitionSend/{key}")
    public void setPartition(@PathVariable("key") String key) {
        kafkaTemplate.send("test", 0,key,"key="+key+",msg=指定0号分区");
    }

//    指定key发送,不指定分区
//    根据key做hash,相同的key到同一个分区
    @GetMapping("/kafka/keysend/{key}")
    public void setKey(@PathVariable("key") String key) {
        kafkaTemplate.send("test", key,"key="+key+",msg=不指定分区");
    }


}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

消费者代码使用:PartitionConsumer.java

//@Component
public class PartitionConsumer {
    private final Logger logger = LoggerFactory.getLogger(PartitionConsumer.class);

    //分区消费
    @KafkaListener(topics = {"test"},topicPattern = "0")
    public void onMessage(ConsumerRecord<?, ?> consumerRecord) {
        Optional<?> optional = Optional.ofNullable(consumerRecord.value());
        if (optional.isPresent()) {
            Object msg = optional.get();
            logger.info("partition=0,message:[{}]", msg);
        }
    }
    @KafkaListener(topics = {"test"},topicPattern = "1")
    public void onMessage1(ConsumerRecord<?, ?> consumerRecord) {
        Optional<?> optional = Optional.ofNullable(consumerRecord.value());
        if (optional.isPresent()) {
            Object msg = optional.get();
            logger.info("partition=1,message:[{}]", msg);
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

通过swagger访问setKey:
在这里插入图片描述
看控制台:
在这里插入图片描述
再访问setPartition来设置分区号0来发送
在这里插入图片描述
看控制台:
在这里插入图片描述
2)自定义分区
你想自己定义规则,根据我的要求,把消息投放到对应的分区去? 可以!
参考代码:MyPartitioner.java

public class MyPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
//        定义自己的分区策略
//                如果key以0开头,发到0号分区
//                其他都扔到1号分区
        String keyStr = key+"";
        if (keyStr.startsWith("0")){
            return 0;
        }else {
            return 1;
        }
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

MyPartitionTemplate.java ,

@Configuration
public class MyPartitionTemplate {
 
    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    KafkaTemplate kafkaTemplate;

    @PostConstruct
    public void setKafkaTemplate() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        //注意分区器在这里!!!
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class);
        this.kafkaTemplate = new KafkaTemplate<String, String>(new DefaultKafkaProducerFactory<>(props));
    }

    public KafkaTemplate getKafkaTemplate(){
        return kafkaTemplate;
    }
 

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

发送使用:MyPartitionProducer.java

@RestController
public class MyPartitionProducer {

    @Autowired
    MyPartitionTemplate template;

//    使用0开头和其他任意字母开头的key发送消息
//    看控制台的输出,在哪个分区里?
    @GetMapping("/kafka/myPartitionSend/{key}")
    public void setPartition(@PathVariable("key") String key) {
        template.getKafkaTemplate().send("test", key,"key="+key+",msg=自定义分区策略");
    }


}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

使用swagger,发送0开头和非0开头两种key试一试!在这里插入图片描述
备注:
自己定义config参数,比较麻烦,需要打破默认的KafkaTemplate设置
可以将KafkaConfiguration.java中的getTemplate加上@Bean注解来覆盖系统默认bean
这里为了避免混淆,采用@Autowire注入

4.3 消息消费

4.3.1 消息组别

发送者使用:KafkaProducer.java

@RestController
public class KafkaProducer {
    @Resource
    private KafkaTemplate<String, Object> kafkaTemplate;

    @GetMapping("/kafka/test/{msg}")
    public void sendMessage(@PathVariable("msg") String msg) {
        Message message = new Message();
        message.setMessage(msg);
        kafkaTemplate.send("test", JSON.toJSONString(message));
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

1)代码参考:GroupConsumer.java,Listener拷贝3份,分别赋予两组group,验证分组消费:

@Component
public class GroupConsumer {
    private final Logger logger = LoggerFactory.getLogger(GroupConsumer.class);

    //组1,消费者1
    @KafkaListener(topics = {"test"},groupId = "group1")
    public void onMessage1(ConsumerRecord<?, ?> consumerRecord) {
        Optional<?> optional = Optional.ofNullable(consumerRecord.value());
        if (optional.isPresent()) {
            Object msg = optional.get();
            logger.info("group:group1-1 , message:{}", msg);
        }
    }

    //组1,消费者2
    @KafkaListener(topics = {"test"},groupId = "group1")
    public void onMessage2(ConsumerRecord<?, ?> consumerRecord) {
        Optional<?> optional = Optional.ofNullable(consumerRecord.value());
        if (optional.isPresent()) {
            Object msg = optional.get();
            logger.info("group:group1-2 , message:{}", msg);
        }
    }

    //组2,只有一个消费者
    @KafkaListener(topics = {"test"},groupId = "group2")
    public void onMessage3(ConsumerRecord<?, ?> consumerRecord) {
        Optional<?> optional = Optional.ofNullable(consumerRecord.value());
        if (optional.isPresent()) {
            Object msg = optional.get();
            logger.info("group:group2 , message:{}", msg);
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

2)启动
在这里插入图片描述
3)通过swagger发送2条消息
在这里插入图片描述
同一group下的两个消费者,在group1均分消息
group2下只有一个消费者,得到全部消息

4)消费端闲置
注意分区数与消费者数的搭配,如果 ( 消费者数 > 分区数量 ),将会出现消费者闲置,浪费资源!

验证方式:
停掉项目,删掉test主题,重新建一个 ,这次只给它分配一个分区。
重新发送两条消息,试一试
在这里插入图片描述
解析:
group2可以消费到1、2两条消息
group1下有两个消费者,但是只分配给了 -1 , -2这个进程被闲置

4.3.2 位移提交

1)自动提交
前面的案例中,我们设置了以下两个选项,则kafka会按延时设置自动提交

enable-auto-commit: true # 是否自动提交offset 
auto-commit-interval: 100 # 提交offset延时(接收到消息后多久提交offset)
  • 1
  • 2

2)手动提交
有些时候,我们需要手动控制偏移量的提交时机,比如确保消息严格消费后再提交,以防止丢失或重复。
下面我们自己定义配置,覆盖上面的参数

@Configuration
public class MyOffsetConfig {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public KafkaListenerContainerFactory<?> manualKafkaListenerContainerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // 注意这里!!!设置手动提交
        configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(configProps));

        // ack模式:
        //          AckMode针对ENABLE_AUTO_COMMIT_CONFIG=false时生效,有以下几种:
        //
        //          RECORD
        //          每处理一条commit一次
        //
        //          BATCH(默认)
        //          每次poll的时候批量提交一次,频率取决于每次poll的调用频率
        //
        //          TIME
        //          每次间隔ackTime的时间去commit(跟auto commit interval有什么区别呢?)
        //
        //          COUNT
        //          累积达到ackCount次的ack去commit
        //
        //          COUNT_TIME
        //          ackTime或ackCount哪个条件先满足,就commit
        //
        //          MANUAL
        //          listener负责ack,但是背后也是批量上去
        //
        //          MANUAL_IMMEDIATE
        //          listner负责ack,每调用一次,就立即commit

        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);

        return factory;
    }


}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51

通过在消费端的Consumer来提交偏移量,有如下几种方式:

//@Component
public class MyOffsetConsumer {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @KafkaListener(topics = "test",groupId = "myoffset-group-1",containerFactory = "manualKafkaListenerContainerFactory")
    public void manualCommit(@Payload String message,
                        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                        Consumer consumer,
                        Acknowledgment ack) {
        logger.info("手动提交偏移量 , partition={}, msg={}", partition, message);
        // 同步提交
        consumer.commitSync();
        //异步提交
        //consumer.commitAsync();

        // ack提交也可以,会按设置的ack策略走(参考MyOffsetConfig.java里的ack模式)
        // ack.acknowledge();
    }

    @KafkaListener(topics = "test",groupId = "myoffset-group-2",containerFactory = "manualKafkaListenerContainerFactory")
    public void noCommit(@Payload String message,
                        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                        Consumer consumer,
                        Acknowledgment ack) {
        logger.info("忘记提交偏移量, partition={}, msg={}", partition, message);
        // 不做commit!
    }

    /**
     * 现实状况:
     * commitSync和commitAsync组合使用
     *
     * 手工提交异步 consumer.commitAsync();
     * 手工同步提交 consumer.commitSync()
     *
     * commitSync()方法提交最后一个偏移量。在成功提交或碰到无怯恢复的错误之前,
     * commitSync()会一直重试,但是commitAsync()不会。
     *
     * 一般情况下,针对偶尔出现的提交失败,不进行重试不会有太大问题
     * 因为如果提交失败是因为临时问题导致的,那么后续的提交总会有成功的。
     * 但如果这是发生在关闭消费者或再均衡前的最后一次提交,就要确保能够提交成功。否则就会造成重复消费
     * 因此,在消费者关闭前一般会组合使用commitAsync()和commitSync()。
     */
//   @KafkaListener(topics = "test", groupId = "myoffset-group-3",containerFactory = "manualKafkaListenerContainerFactory")
    public void manualOffset(@Payload String message,
                       @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                       @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                       Consumer consumer,
                       Acknowledgment ack) {
        try {
            logger.info("同步异步搭配 , partition={}, msg={}", partition, message);
            //先异步提交
            consumer.commitAsync();
            //继续做别的事
        } catch (Exception e) {
            System.out.println("commit failed");
        } finally {
            try {
                consumer.commitSync();
            } finally {
                consumer.close();
            }
        }

    }


    /**
     * 甚至可以手动提交,指定任意位置的偏移量
     * 不推荐日常使用!!!
     */
//    @KafkaListener(topics = "test", groupId = "myoffset-group-4",containerFactory = "manualKafkaListenerContainerFactory")
    public void offset(ConsumerRecord record, Consumer consumer) {
        logger.info("手动指定任意偏移量, partition={}, msg={}",record.partition(),record);
        Map<TopicPartition, OffsetAndMetadata> currentOffset = new HashMap<>();
        currentOffset.put(new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1));
        consumer.commitSync(currentOffset);
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83

同步提交、异步提交:manualCommit() ,同步异步的差别,下面会详细讲到。
指定偏移量提交:offset()
3)重复消费问题
如果手动提交模式被打开,一定不要忘记提交偏移量。否则会造成重复消费!
代码参考和对比:manualCommit() , noCommit()

验证过程
用km将test主题删除,新建一个test空主题。方便观察消息偏移 注释掉其他Consumer的Component注解,只保留当前MyOffsetConsumer.java 启动项目,使用swagger的KafkaProducer发送连续几条消息 留心控制台,都能消费,没问题:
在这里插入图片描述
但是!重启试试:
在这里插入图片描述
无论重启多少次,不提交偏移量的消费组,会重复消费一遍!!!

再通过命令行查询偏移量试试:
在这里插入图片描述
4)经验与总结

commitSync()方法,即同步提交,会提交最后一个偏移量。在成功提交或碰到无怯恢复的错误之前, 
commitSync()会一直重试,但是commitAsync()不会。 

这就造成一个陷阱: 
如果异步提交,针对偶尔出现的提交失败,不进行重试不会有太大问题,因为如果提交失败是因为临时问题 
导致的,那么后续的提交总会有成功的。只要成功一次,偏移量就会提交上去。 
但是!如果这是发生在关闭消费者时的最后一次提交,就要确保能够提交成功,如果还没提交完就停掉了进 
程。就会造成重复消费! 
因此,在消费者关闭前一般会组合使用commitAsync()和commitSync()。 
详细代码参考:MyOffsetConsumer.manualOffset() 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

5、高级特性

5.1 扩展性

5.1.1 broker扩容

1)在yaml中复制kafka-2,拷贝为新的节点,注意以下标注修改的地方!

#docker-compose.yml
#注意hostname问题,ip地址:52.82.98.209,换成你自己服务器的
#docker-compose up -d 启动
version: '3'
services:
    zookeeper:
        image: zookeeper:3.4.13
    km:
        image: km:2002
        ports:
          - 10906:9000
        depends_on:
          - zookeeper
    kafka-1:
        container_name: kafka-1
        image: wurstmeister/kafka:2.12-2.2.2
        ports:
            - 10903:9092
        environment:
            KAFKA_BROKER_ID: 1 
            HOST_IP: 52.82.98.209
            KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
            #docker部署必须设置外部可访问ip和端口,否则注册进zk的地址将不可达造成外部无法连接
            KAFKA_ADVERTISED_HOST_NAME: 52.82.98.209
            KAFKA_ADVERTISED_PORT: 10903 
        volumes:
            - /etc/localtime:/etc/localtime
        depends_on:
            - zookeeper           
    kafka-2:
        container_name: kafka-2
        image: wurstmeister/kafka:2.12-2.2.2
        ports:
            - 10904:9092
        environment:
            KAFKA_BROKER_ID: 2 
            HOST_IP: 52.82.98.209
            KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
            KAFKA_ADVERTISED_HOST_NAME: 52.82.98.209
            KAFKA_ADVERTISED_PORT: 10904 
        volumes:
            - /etc/localtime:/etc/localtime
        depends_on:
            - zookeeper 

    kafka-3:
        container_name: kafka-3
        image: wurstmeister/kafka:2.12-2.2.2
        ports:
            - 10905:9092
        environment:
            KAFKA_BROKER_ID: 3
            HOST_IP: 52.82.98.209
            KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
            KAFKA_ADVERTISED_HOST_NAME: 52.82.98.209
            KAFKA_ADVERTISED_PORT: 10905
        volumes:
            - /etc/localtime:/etc/localtime
        depends_on:
            - zookeeper

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61

2)更新docker集群信息

docker-compose -f cluster.yml up -d
  • 1
#启动消息 
kafka_zookeeper_1 is up-to-date 
kafka_km_1 is up-to-date 
kafka-1 is up-to-date 
kafka-2 is up-to-date 
Creating kafka-3 ... done 

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

3)进命令行,或打开km查看新的broker信息
在这里插入图片描述

5.1.2 分区扩容

1)使用km对test主题增加分区到3个,看分区分配机器情况
在这里插入图片描述
可以指定新分区数量,及分配到的机器
在这里插入图片描述
2)注意问题
新加分区或重新调整分区,已经启动的客户端会动态更新对应的分配信息,不需要重启。
但是!!!
在同步变更消息的过程中有可能会丢失消息!想想为什么?(答案在下面)

(注意!以下场景不保证100%会重现!)
在这里插入图片描述
答案:
回顾一下消费偏移量的默认提交配置:latest,因为新分区没有任何offset提交记录
所以会在重新分配分区后从末尾开始消费!
那么分配前的那些消息就不会消费到(调整分区的时候还有消息进来,这部分数据会消费不到)。而分配后再发送的不会受影响,可以正常消费
分区分配正常后,查看偏移量提交信息,没问题:
在这里插入图片描述
km的Consumer页签里也可以查看偏移量信息:
在这里插入图片描述

5.2 高可用

以上动态扩容操作是怎么实现的呢?集群中必然有一个节点协调了相关操作。
这台协调者,就是controller节点。
controller节点是其中的一台broker,所有broker都有可能成为controller
当前controller宕机后,其他就会参与竞争,选出新的controller,保持集群对外的高可用

5.2.1 节点选举

1)查找controller,找到它所在的broker

#查找docker进程,找到zookeeper的容器 
[root@iZ8vb3a9qxofwannyywl6zZ ~]# docker ps --format 
"table{{.ID}}\t{{.Names}}\t{{.Ports}}" 
CONTAINER ID NAMES PORTS 
75318748caab kafka-3 0.0.0.0:10905->9092/tcp 
4807d188a180 kafka_km_1 0.0.0.0:10906->9000/tcp 
4453eb0b2a36 kafka-2 0.0.0.0:10904->9092/tcp 
d6fd814a0851 kafka-1 0.0.0.0:10903->9092/tcp 
8c1fc2cc6e9a kafka_zookeeper_1 2181/tcp, 2888/tcp, 3888/tcp 
#进入容器,连上zk 
[root@iZ8vb3a9qxofwannyywl6zZ ~]# docker exec -it kafka_zookeeper_1 sh 
/zookeeper-3.4.13 # 
/zookeeper-3.4.13 # zkCli.sh 
Connecting to localhost:2181 
#查询当前controller是哪个节点,发现是2号机器(有可能是其他节点,找到这个brokerid,下面要 
用!) 
[zk: localhost:2181(CONNECTED) 6] get /controller 
{"version":1,"brokerid":2,"timestamp":"1610500701187"}
#controller变更的次数 
[zk: localhost:2181(CONNECTED) 7] get /controller_epoch 
1
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

2)docker-compose停掉它!

#docker pause 暂停容器的服务,注意是上面找到的那台broker 
[root@iZ8vb3a9qxofwannyywl6zZ ~]# docker pause kafka-2 
kafka-2 
#查看状态,发现(Paused) 
[root@iZ8vb3a9qxofwannyywl6zZ ~]# docker ps | grep kafka-2 
4453eb0b2a36 wurstmeister/kafka:2.12-2.2.2 "start- 
kafka.sh" 2 days ago Up 2 days (Paused) 0.0.0.0:10904- 
>9092/tcp 
kafka-2 
#再次按 1)的步骤进入zk容器,查看当前controller,已经变为3号 
[zk: localhost:2181(CONNECTED) 0] get /controller 
{"version":1,"brokerid":3,"timestamp":"1610679583216"} 
#变更次数加了1 
[zk: localhost:2181(CONNECTED) 1] get /controller_epoch 
2
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
5.2.2 原理剖析

当控制器被关闭或者与Zookeeper系统断开连接时,Zookeeper系统上的/controller临时节点就会被清除。
Kafka集群中的监听器会接收到变更通知,各个代理节点会尝试到Zookeeper系统中创建它。
第一个成功在Zookeeper系统中创建的代理节点,将会成为新的控制器。
每个新选举出来的控制器,会在Zookeeper系统中递增controller_epoch的值。

在这里插入图片描述

本文内容由网友自发贡献,转载请注明出处:https://www.wpsshop.cn/w/神奇cpp/article/detail/851996
推荐阅读
相关标签
  

闽ICP备14008679号