当前位置:   article > 正文

kafka三节点集群2.8.0平滑升级到3.4.0过程指导_kafka 版本升级

kafka 版本升级

一、前言

在这里插入图片描述

Apache Kafka作为常用的开源分布式流媒体平台,可以实时发布、订阅、存储和处理数据流,多用于作为消息队列获取实时数据,构建对数据流的变化进行实时反应的应用程序,已被数千家公司用于高性能数据管道、流分析、数据集成和任务关键型应用程序。而其中Apache Kafka Connect 作为 Kafka 中用于和其他数据系统流式传输数据的服务,其独立运行版本可以在 Kafka 发布包中通过 bin/connect-standalone.sh 启动,默认会在 8083 端口开启 HTTP REST API 服务,攻击者可以利用基于SASLJAAS 配置和SASL 协议的任意Kafka客户端,对可对连接器(Connector)的配置进行操作,将连接器中的 Kafka 客户端 sasl.jaas.config 属性值设置为 com.sun.security.auth.module.JndiLoginModule(通过 producer.override.sasl.jaas.config, consumer.override.sasl.jaas.configadmin.override.sasl.jaas.config 属性进行配置)时,如果此时连接器连接到攻击者可控的 LDAP 服务器时容易受到反序列化攻击,也称JNDI 注入来实现远程任意代码执行。云平台中,Kafka Connect 服务通常用于提供 Kafka 数据迁移、数据同步的管道能力,其默认 HTTP API 开放于 8083 端口。

    因现场kafka选用版本较低,安全扫描时触发安全风险告警,低于 Kafka 升级3.4.0版本,涉及【Apache Kafka JNDI注入漏洞(CVE-2023-25194)】漏洞,该漏洞可允许远程代码执行,当攻击者可控制kafka-clients连接时的属性,可通过设置 ’ sasl.jaas.config ’ 属性为 ’ com.sun.security.auth.module.JndiLoginModule ’ 进行JNDI注入或反序列化利用,当JDK版本过低或者存在Gadgets时可导致远程代码执行。现场版本kafka_2.13-2.8.0,java version “1.8.0_361”,sasl.jaas.config 配置采用:

sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule;

☬ 漏洞复现: 执行创建文件,相关安全软件会报:JNDI注入的告警;影响版本: 2.3.0 至 3.3.2 版本Kafka Connect,原则上不影响 Kafka server (broker),但是会级联影响,最好还是升级到3.4.0及以上版本,升级JDK版本,可采用OpenJDK替换,相关经验已验证:OpenJDK1.8.0_362 + Zookeeper3.6.3 + Kafka3.4.0。

在这里插入图片描述

关联资源官网升级指导kafka部署步骤

二、软件升级

本次软件要升级到3.4.0版本,版本说明见Kafka - Version 3.4.0,升级步骤也可参考官网升级指导。注意这里咱们是从2.1.x升级到3.x,不同于3.x升级,需注意存储 consumer offsets的schema和inter.broker.protocol.version里的版本,一旦升级后不支持降级。Apache Kafka 3.4.0以来,新增了一个系统属性:org.apache.kafka.disallowed.login.modules,用来在SASL JAAS配置中禁用有问题的登录模块,另外默认com.sun.security.auth.module.JndiLoginModule 在该版本中被禁用;另外需注意的是, Kafka 3.0中 Java 8 已注明废弃, 在Apache Kafka 4.0将不再支持;如果启用TLS,Java 11及更高版本的性能会明显更好,因此强烈建议使用它们。对应的zk版本稳定版为 3.5,注意zk需要足够的堆空间(3-5G,看数据量大小);另zk集群不宜过大,尤其是在写操作频繁的使用模式中,意味着会造成大量的集群内通信(写操作和随后的集群成员更新的配额),尽量让ZooKeeper系统尽可能小,并尽可能保持其独立性,以处理负载。

1)滚动升级步骤

1、升级前注意:在待升级节点server.properties文件中添加:inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 3.3, 3.2, etc.),如果是从0.11.0.x或更高版本升级的,并且没有重写message.format.version,那么只需要配置覆写: inter-broker protocol version参数的kafka版本即可,否者还需要设置:log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION,现场我们只需要添加::inter.broker.protocol.version=3.4;

2、对Broker滚动升级,一次升级一个节点或实例:关闭待升级的broker,解压新版本,然后迁移更新配置,重启新的代理,验证数据同步;这时,最新版本的broker程序会运行,之后可以验证kafka集群的业务行为和性能是否符合预期。如果出现任何问题,目前还可以进行降级回滚。Kafka集群的完整升级过程涵盖了broker侧和client侧,因broker是向下兼容的,升级过程中必需先成功升级所有的broker,对于Client(producer 和 consumer)在broker完成升级之后再升级。

3、一旦验证了集群的业务行为和性能满足预期,就可以通过更改协议版本来应用:inter.broker.protocol.version=3.4

4、然后逐个重启kafka brokers,以让inter.broker.protocol.version=3.4生效;这是,就不再支持降级了;

5、最后完成kafka整个集群状态及数据分布验证。inter.broker.protocol.version的值可参考如下,但官网升级指导里看也可直接写成:inter.broker.protocol.version=2.8(针对本次待升级的2.8版本也可),之后再修改为inter.broker.protocol.version=3.4;这种格式是可以的。
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

2)关停替换升级

即: 如果可接受停机,可将所有broker关闭后,更新版本替换后重启启动。但是会存在一个问题,关闭老版本,启动新版本的过程中,存在部分线上数据丢失的情况,此种情况建议在凌晨数据量少的时候进行;

3)升级计划或流程

在这里插入图片描述
测试方案:
在这里插入图片描述

2.1、软件下载

#MD5: CF 6B 8B 1C A1 12 9E 69  41 39 92 99 B6 CC 47 8C
wget https://archive.apache.org/dist/kafka/3.4.0/kafka_2.12-3.4.0.tgz
md5sum kafka_2.12-3.4.0.tgz  #输出
cf6b8b1ca1129e6941399299b6cc478c  ./kafka_2.12-3.4.0.tgz
  • 1
  • 2
  • 3
  • 4

2.2、单节点/实例kafka升级

注意: 停止 Kafka 集群时,一定要等 Kafka 所有节点进程全部停止后再停止 Zookeeper 集群。因为 Zookeeper 集群当中记录着 Kafka 集群相关信息,Zookeeper 集群一旦先停止,Kafka 集群就没有办法再获取停止进程的信息,只能手动杀死 Kafka 进程。特别注意的是,broker之间的通讯协议和message的传输协议要与旧版本的一致,否则升级完的broker会因为通讯协议版本不一致导致节点一直报错(Connection to “broker id” was disconnected before the response was read),即升级新版本需现将旧版本的信息写入新版的配置文件中,以兼容当前(旧版)环境适配。从2.6.0版开始,对于Java 11或更新版本,TLSv1.3是默认启用的。客户机和服务器将协商是否支持TLSv1.3,否则将退回到TLSv1.2。

#1、解压缩新版本kafka
tar -xzf kafka_2.12-3.4.0.tgz
cd ./kafka_2.12-3.4.0/config

#2、修改新旧版配置文件,添加版本参数,broker启动时会读取server.properties
vim config/server.properties  #新增
inter.broker.protocol.version=2.8 #旧版本号(即官网说的当前的kafka版本),完成后重启当前kafka实例

#顺序重启三个kafka实例
./bin/kafka-server-stop.sh ./config/server.properties
./bin/kafka-server-start.sh -daemon ./config/server.properties

#3、旧版查看topic
./bin/zkCli.sh -server 10.100.1.94:2181  #查看节点
[zk: 10.100.1.94:2181(CONNECTED) 10] get /brokers/ids/1
{"features":{},"listener_security_protocol_map":{"INTERNAL":"SASL_PLAINTEXT"},"endpoints":["INTERNAL://10.100.1.94:9092"],"jmx_port":-1,"port":-1,"host":null,"version":5,"timestamp":"1709263320647"}

bin/kafka-topics.sh --bootstrap-server 10.100.1.94:9092 --list
#或
./bin/kafka-topics.sh --zookeeper 10.100.1.94:2181 --list
CCOPS-ZYZX-ENC-001-AVG-1DAY
CCOPS-ZYZX-ENC-001-AVG-1DAY-PROVINCE
CCOPS-ZYZX-ENC-001-AVG-1HOUR
CCOPS-ZYZX-ENC-001-AVG-1HOUR-PROVINCE
#基数
./bin/kafka-topics.sh --zookeeper 10.100.1.94:2181 --list|wc -l
257

#查看kafka集群主题列表和分区信息,分区的ISR列表
./bin/kafka-topics.sh --bootstrap-server 10.100.1.94:9092 --topic YWPT-ZYZX-YYP-001-AVG-5MIN --describe
#或
./bin/kafka-topics.sh --zookeeper 10.100.1.94:2181 --list #可看到该主题有 3 个分区,每个分区只有一个副本,这意味着如果负责分区的代理发生故障,则该分区将不可用,直到代理恢复或创建新的副本为止,正常大于等于2。ISR是存活的副本数量,可以在 broker 或者主题级别进行配置,当可用副本数量小于配置值时,就认为整个分区处于不可用状态,此时客户端再向分区写入数据时候就会抛出异常。副本数由参数replication-factor 指定,主题多个副本中,其中一个副本是首领副本 (Leader replica),所有的事件都直接发送给首领副本;其他副本是跟随者副本 (Follower replica),需要通过复制来保持与首领副本数据一致,当首领副本不可用时,其中一个跟随者副本将成为新首领。单副本情况下,即使你配置了副本因子为 1,Kafka 仍然会将这样的主题视为具有副本的主题。在这种情况下,每个分区只有一个副本,且这个副本只存储在一个 broker 上。

#本案例分区里它是单副本,只有一个leader,副本就是它本身,ISR列表里也记录的是它本身,正常Replicas应显示多个副本,且ISR列表如果显示同步副本的信息同Replicas一致
Topic: YWPT-ZYZX-YYP-001-AVG-5MIN	TopicId: Df7GhiNzQmWpsxFYsfqXWQ	PartitionCount: 3	ReplicationFactor: 1	Configs: 

Topic: YWPT-ZYZX-YYP-001-AVG-5MIN	Partition: 0	Leader: 1	Replicas: 1	Isr: 1
Topic: YWPT-ZYZX-YYP-001-AVG-5MIN	Partition: 1	Leader: 2	Replicas: 2	Isr: 2
Topic: YWPT-ZYZX-YYP-001-AVG-5MIN	Partition: 2	Leader: 3	Replicas: 3	Isr: 3

#比如如下:结果里对于分区0,ISR列表是2,1,这意味着副本2和副本1都与leader副本此时已保持同步。加入某个分区的ISR列表中没有所有的副本,或者ISR列表为空,那么可能存在同步问题。这可能是由于网络问题、磁盘故障、副本落后太多等原因造成的。这种情况下,就需要手动执行同步
Topic:<topic_name>      PartitionCount:3        ReplicationFactor:2     Configs:
        Topic: <topic_name>  Partition: 0    Leader: 2       Replicas: 2,1     Isr: 2,1
        Topic: <topic_name>  Partition: 1    Leader: 1       Replicas: 1,0     Isr: 1,0
        Topic: <topic_name>  Partition: 2    Leader: 0       Replicas: 0,2     Isr: 0,2

#注:Kafka的可用性依赖于副本机制,Kafka三节点集群中,如果每个分区有3个副本并且副本因子为2,那么每个分区都会有2个副本分布在不同的broker上。这样,即使停止其中一个Kafka broker,整个Kafka服务一般仍然应该是可用的,对于每个分区,Kafka会维护指定数量的副本,并且会自动在broker之间复制这些副本以保证高可用性。如果一个broker停止工作,Kafka会尝试在剩余的broker上重新平衡副本,以确保每个分区至少有一个副本是可用的。但如果每个分区只有一个副本(单副本),那么停止其中一个Kafka节点(broker)可能会导致该节点上的分区不可用,从而影响整个Kafka服务的可用性。单副本配置下,每个分区只有一个副本,这意味着如果该分区所在的broker停止工作,那么该分区的数据将不可访问,直到该broker恢复或手动干预来恢复数据。Kafka可能无法继续处理发送到该分区的消息,因为没有可用的副本可以接收和处理这些消息。此外,消费者可能无法从该分区读取消息,导致读取操作失败或阻塞。建议至少为每个分区配置两个或更多的副本,以确保即使某个broker停止工作,其他副本仍然可以提供服务。
#Kafka还提供了ISR(In-Sync Replicas)机制来确保只有与leader副本保持同步的副本才会被选为新的leader,从而提高了系统的可靠性和一致性。ISR列表是包含所有与leader副本保持同步的副本的集合。如果副本未与leader保持同步,它将被移出ISR列表,可能会导致分区不可用。

#查看分区的ISR列表
bin/kafka-topics.sh --zookeeper 10.100.1.94:9092 --topic YWPT-ZYZX-YYP-001-AVG-5MIN --describe


#手动执行副本同步和ISR列表更新
bin/kafka-replica-verification.sh --zookeeper 10.100.1.94:2181 --topic YWPT-ZYZX-YYP-001-AVG-5MIN --partition 1 --replication-id 1 --verify-only

#修改主题分区副本因子,不能直接命令修改,只能重分配,以下命令是不对的
./bin/kafka-topics.sh --alter --zookeeper 10.100.1.94:2181 --topic YWPT-ZYZX-YYP-001-AVG-5MIN --partitions 3 --replication-factor 2

#

./bin/kafka-server-start.sh ./config/server.properties
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62

2.3、剩余2节点kafka升级

2.4、集群状态确认

2.5、kafka性能测试

#使用 8 个线程向名为 test-update-perf 的主题发送 500000 条大小为 50000 字节的消息,并将性能统计信息写入位于 ./perf-test 目录中的 CSV 文件中。性能统计信息将每 3000 毫秒报告一次。

./bin/kafka-producer-perf-test.sh --messages 500000 --message-size 50000 --topic test-update-perf --threads 8 --broker-list * —show-detailed-stats --csv-reporter-enabled --metrics-dir ./perf-test --reporting-interval 3000

#参数说明
--messages 500000:指定要发送的消息总数。
--message-size 50000:指定要发送的每条消息的大小(以字节为单位)。
--topic test-update-perf:指定要发送消息的目标主题。
--threads 8:指定用于发送消息的线程数。
--broker-list *:指定 Kafka 集群中所有代理的列表。星号* 表示所有代理。
--show-detailed-stats:启用详细统计信息的显示,包括每个线程的统计信息。
--csv-reporter-enabled:启用 CSV 报告程序,它将性能统计信息写入 CSV 文件。
--metrics-dir ./perf-test:指定用于存储 CSV 报告程序输出的目录。
--reporting-interval 3000:指定性能统计信息报告的间隔(以毫秒为单位)

#示例2:perf-consumer-t4单线程向test-update-perf主题请求消费500000 条大小为 50000 字节的消息,测试性能
/bin/kafka-consumer-perf-test.sh --topic test-update-perf --zookeeper 10.100.1.94:2183 --threads 1 --group perf-consumer-t4 --message-size 50000 --messages 10

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

三、附录

3.1、Kafka消息发送流程

kafka在消息发送的过程中,涉及到两个线程:main 线程和 Sender 线程。其中,main 线程中会创建了一个队列 RecordAccumulator,main 线程将消息发送给 RecordAccumulator;Sender 线程则不断从 RecordAccumulator 中拉取消息发送到 Kafka Broker。示意如下:

在这里插入图片描述

3.2、kafka应用场景

在这里插入图片描述

3.3、kafka架构回顾

在这里插入图片描述

3.4、zookeeper中kafka信息结构

在这里插入图片描述

3.5、Kafka Broker 总工作流程

在这里插入图片描述

3.6、消费者组消费流程

在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/煮酒与君饮/article/detail/783124
推荐阅读
相关标签
  

闽ICP备14008679号