当前位置:   article > 正文

Kafka生产调优实践。Kafka消息安全性、消息丢失、消息积压、保证消息顺序性

Kafka生产调优实践。Kafka消息安全性、消息丢失、消息积压、保证消息顺序性

搭建Kafka监控平台

官网地址

在这里插入图片描述



下载efak-web-3.0.2-bin.tar.gz安装包之后,efak需要依赖JDK和数据库。数据库支持本地化的SQLLite以及集中式的MySQL。生产环境建议使用MySQL。

数据库不需要建表初始化,EFAK在执行过程中会自己完成初始化。



将efak压缩包解压

[root@worker1 ~]# tar -zxvf efak-web-3.0.2-bin.tar.gz -C /app/kafka/eagle
  • 1



修改efak解压目录下的conf/system-config.properties。 这个文件中提供了完整的配置,下面只列出需要修改的部分。

######################################
# multi zookeeper & kafka cluster list
# Settings prefixed with 'kafka.eagle.' will be deprecated, use 'efak.' instead
######################################
# 指向Zookeeper地址
efak.zk.cluster.alias=cluster1
cluster1.zk.list=worker1:2181,worker2:2181,worker3:2181

######################################
# zookeeper enable acl
######################################
# Zookeeper权限控制
cluster1.zk.acl.enable=false
cluster1.zk.acl.schema=digest
#cluster1.zk.acl.username=test
#cluster1.zk.acl.password=test123

######################################
# kafka offset storage
######################################
# offset选择存在kafka中。
cluster1.efak.offset.storage=kafka
#cluster2.efak.offset.storage=zk

######################################
# kafka mysql jdbc driver address
######################################
#指向自己的MySQL服务。库需要提前创建
efak.driver=com.mysql.cj.jdbc.Driver
efak.url=jdbc:mysql://worker1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
efak.username=root
efak.password=root
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32



配置EFAK的环境变量

[root@worker1 ~]# vi ~/.bash_profile
export KE_HOME=/app/kafka/eagle/efak-web-3.0.2
PATH=$PATH:#KE_HOME/bin:$HOME/.local/bin:$HOME/bin


[root@worker1 ~]# source ~/.bash_profile
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6



启动EFAK

配置完成后,先启动Zookeeper和Kafka服务,然后调用EFAK的bin目录下的ke.sh脚本启动服务

[root@worker1 bin]$ ./ke.sh start
-- 日志很长,看到以下内容表示服务启动成功
[2023-06-28 16:09:43] INFO: [Job done!]
Welcome to
    ______    ______    ___     __ __
   / ____/   / ____/   /   |   / //_/
  / __/     / /_      / /| |  / ,<   
 / /___    / __/     / ___ | / /| |  
/_____/   /_/       /_/  |_|/_/ |_|  
( Eagle For Apache Kafka® )

Version v3.0.2 -- Copyright 2016-2022
*******************************************************************
* EFAK Service has started success.
* Welcome, Now you can visit 'http://192.168.232.128:8048'
* Account:admin ,Password:123456
*******************************************************************
* <Usage> ke.sh [start|status|stop|restart|stats] </Usage>
* <Usage> https://www.kafka-eagle.org/ </Usage>
*******************************************************************
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20



EFAK管理页面

接下来就可以访问EFAK的管理页面。http://192.168.232.128:8048。 默认的用户名是admin ,密码是123456

关于EFAK更多的使用方式,比如EFAK服务如何集群部署等,可以参考官方文档。



合理规划Kafka部署环境

机械硬盘kafka使用机械硬盘即可,可以不用给Kafka固态硬盘



内存,修改 bin/kafka-start-server.sh启动脚本,修改JVM启动参数中的内存,默认只申请了1G内存。

[oper@worker1 bin]$ cat kafka-server-start.sh 
......
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi
......
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

对于主流的16核32G服务器,可以适当扩大Kafka的内存。例如:

export KAFKA_HEAP_OPTS="‐Xmx16G ‐Xms16G ‐Xmn10G ‐XX:MetaspaceSize=256M ‐XX:+UseG1GC ‐XX:MaxGCPauseMillis=50 ‐XX:G1HeapRegionSize=16M"
  • 1



高性能网卡,Kafka本身的服务性能非常高,单机就可以支持百万级的TPS,在高流量冲击下,网络非常有可能优先成为性能瓶颈。对于Kafka服务器,建议配置高性能的网卡。成本允许的话,尽量选择千兆以上的网卡。



合理优化Kafka集群配置

合理配置partition分区数量

Kafka的单个Partition读写效率是非常高的,但是Kafka的Partition设计是非常碎片化。如果Partition文件过多,很容易严重影响Kafka的整体性能。

  • 尽量不要使用过多的Topic,通常不建议超过3个Topic。
  • 一个分区下的副本集数量不要设置过多 将副本数设置为2就可以了。

至于Partition的数量,最好根据业务情况灵活调整。partition数量设置多一些,可以一定程度增加Topic的吞吐量。但是过多的partition数量还是同样会带来partition索引的压力。

Kafka提供了一个生产者的性能压测脚本,可以用来衡量集群的整体性能。

# num-record表示要发送100000条压测消息,record-size表示每条消息大小1KB,throughput表示限流控制,设置为小于0表示不限流。
# properducer-props用来设置生产者的参数。
[oper@worker1 bin]$ ./kafka-producer-perf-test.sh --topic test --num-record 1000000 --record-size 1024 --throughput -1 --producer-props bootstrap.servers=worker1:9092 acks=1
94846 records sent, 18969.2 records/sec (18.52 MB/sec), 1157.4 ms avg latency, 1581.0 ms max latency.
133740 records sent, 26748.0 records/sec (26.12 MB/sec), 1150.6 ms avg latency, 1312.0 ms max latency.
146760 records sent, 29346.1 records/sec (28.66 MB/sec), 1051.5 ms avg latency, 1164.0 ms max latency.
137400 records sent, 27480.0 records/sec (26.84 MB/sec), 1123.7 ms avg latency, 1182.0 ms max latency.
158700 records sent, 31740.0 records/sec (31.00 MB/sec), 972.1 ms avg latency, 1022.0 ms max latency.
158775 records sent, 31755.0 records/sec (31.01 MB/sec), 963.5 ms avg latency, 1055.0 ms max latency.
1000000 records sent, 28667.259123 records/sec (28.00 MB/sec), 1030.44 ms avg latency, 1581.00 ms max latency, 1002 ms 50th, 1231 ms 95th, 1440 ms 99th, 1563 ms 99.9th.
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10



合理对数据进行压缩

在生产者的ProducerConfig中,有一个配置 COMPRESSION_TYPE_CONFIG,是用来对消息进行压缩的。

public static final String COMPRESSION_TYPE_CONFIG = "compression.type";
  • 1

生产者配置了压缩策略后,会对生产的每个消息进行压缩,从而降低Producer到Broker的网络传输,也降低了Broker的数据存储压力。

所支持的几种压缩算法中,zstd算法具有最高的数据压缩比,但是吞吐量不高,lz4在吞吐量方面的优势比较明显。压缩消息必然增加CPU的消耗,如果CPU资源紧张,就不要压缩了。

关于数据压缩机制,可以在Broker的conf/server.properties文件中进行配置。正常情况下,Broker从Producer端接收到消息后不会对其进行任何修改,但是如果Broker端和Producer端指定了不同的压缩算法,就会产生很多异常的表现。

compression.type

Type:	string
Default:	producer
Valid Values:	[uncompressed, zstd, lz4, snappy, gzip, producer]
  • 1
  • 2
  • 3
  • 4
  • 5



如果开启了消息压缩,那么在消费者端自然是要进行解压缩的。

在Kafka中,消息从Producer到Broker再到Consumer会一直携带消息的压缩方式,这样当Consumer读取到消息集合时,自然就知道了这些消息使用的是哪种压缩算法,也就可以自己进行解压了。但是这时要注意的是应用中使用的Kafka客户端版本和Kafka服务端版本是否匹配。



优化Kafka客户端使用方式

合理保证消息安全

消息生产者设置好发送者应答参数

  • 消息生产者的acks参数,可以设置为0、1、all或-1
  • Broker的min.insync.replicas参数。如果生产者的acks设置为-1或all,服务端并不是强行要求所有Paritition都完成写入再返回,而是可以配置多少个Partition完成消息写入后,再往Producer返回消息。



生产者端的幂等性配置

// 值为 true 或 false
public static final String ENABLE_IDEMPOTENCE_CONFIG = "enable.idempotence";
  • 1
  • 2

如果要开启幂等性,那么

  • 生产者消息缓存机制中的 max.in.flight.requests.per.connection <=5
  • 重试次数retries>0
  • 应答机制中的acks必须为all

其他补充点:

  • 如果没有设置冲突配置,则默认启用幂等性。
  • 如果设置了冲突的配置,并且幂等性没有显式启用,则幂等性被禁用。
  • 即显示开启的幂等性,又有冲突配置,就抛异常



使用生产者事务机制发送消息

// 1 初始化事务
void initTransactions();
// 2 开启事务
void beginTransaction() throws ProducerFencedException;
// 3 提交事务
void commitTransaction() throws ProducerFencedException;
// 4 放弃事务(类似于回滚事务的操作)
void abortTransaction() throws ProducerFencedException;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

SpringBoot集成Kafka时使用的KafkaTemplate就提供了事务消息机制

在这里插入图片描述



消费者端合理使用提交方式

如果消费者要使用异步方式进行业务处理,那么如果业务处理失败,此时消费者已经提交了Offset,这个消息就无法重试了,这就会造成消息丢失。

因此在消费者端,尽量不要使用异步处理方式,在绝大部分场景下,就能够通过Kafka的消费者重试机制,保证消息安全处理。此时,在消费者端,需要更多考虑的问题,就变成了消费重试机制造成的消息重复消费的问题。



消费者防止消息重复消费

问题的产生:消费者业务处理时间较长,此时消费者正常处理消息的过程中,Broker端就已经等不下去了,认为这个消费者处理失败了。这时就会往同组的其他消费者实例投递消息,这就造成了消息重复处理。这就给消费者端带来不必要的幂等性问题。

解决方式:

  • 根据业务ID校验。比如对于订单消息,消费者根据订单ID去确认一下这个订单消息有没有处理过。

  • 统一的方式处理幂等性问题

    将Offset放到Redis中自行进行管理。通过Redis中的offset来判断消息之前是否处理过。

    具体实现详情请参考上文《客户端 — 客户端属性分析 — 消息重复消费问题》



生产环境常见问题分析

消息零丢失方案

在这里插入图片描述



1、 生产者发送消息到Broker不丢失

生产者丢失消息的原因就和ack机制有关,如果将acks设置为1、all或-1 就表示接收到Broker成功将消息写入文件后的ack应答



2、Broker端保证消息不丢失

Broker端丢失消息的原因主要有两种:Leader partition重新选举、服务器非正常关机 pageCache中的消息还未刷盘

《服务端的Zookeeper元数据梳理 — Partition故障恢复机制》 《服务端日志 — 合理配置刷盘频率》

消息生产者将acks设置为all可以有效避免因为Leader partition故障导致的消息丢失;

而pageCache刷盘可以通过下面的参数来设定合理的配置

# 多长时间进行一次强制刷盘。默认是Long.MAX。
flush.ms

# 表示当同一个Partiton的消息条数积累到这个数量时,就会申请一次刷盘操作。默认是Long.MAX。
log.flush.interval.messages

#当一个消息在内存中保留的时间,达到这个数量时,就会申请一次刷盘操作。他的默认值是空。如果这个参数配置为空,则生效的是下一个参数。
log.flush.interval.ms

# 日志刷新程序检查是否需要将日志刷新到磁盘的频率(以毫秒为单位),默认也是Long.MAX。
log.flush.scheduler.interval.ms
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11



3、消费者保证消息不丢失

同步处理业务逻辑,同步手动提交offset



消息积压如何处理

在这里插入图片描述

  • 如果业务运行正常,只是因为消费者处理消息过慢,造成消息加压。

    增加消费者个数,最多让一个消费者组下的消费者个数=Partition分区数,让一个Consumer负责一个分区,将消费进度提升到最大。

    如果还是不够,那就新创建一个topic,给这个topic更多的partition,然后启动一批消费者,将消息从旧topic中搬运到新topic中,这些消费者不处理业务消息,仅仅是搬运。这样就可以创建更多的消费者消费从新topic消费了

  • 如果是消费者业务处理异常导致的消息积压。

    使用一种降级方案。先启动一个Consumer将Topic下的消息先转发到其他队列中,然后再慢慢分析新队列里的消息处理问题。类似于死信队列的处理方式。



如何保证消息顺序

Kafka很难保证消息的顺序,因为Kafka设计的最优先重点是海量吞吐。

在这里插入图片描述



这个问题要分两个方面考虑:

  1. Topic下各个partition是并发处理消息的 ,producer要保证一组有序的消息发送到一个partition中
  2. consumer要从partition中顺序处理消息



问题一:producer要保证一组有序的消息发送到一个partition中

第一种简单粗暴的方式就只为Topic创建一个partition,这种方式放弃了多partition带来的高吞吐量。

第二种方式,自己定义一个分区路由机制,实现org.apache.kafka.clients.producer.Partitioner接口,将消息分配到同一个Partition上。

此时发送方保证了发送时的消息顺序性。但是存在网络问题,导致发送的一批消息中某一个消息丢失,这样消息到Broker时就还是没办法保证顺序性。Kafka是通过幂等性中单调递增的sequenceNumber来保证消息是顺序,因为是单调递增的,所以还能判断是否存在消息丢失一旦Kafka发现Producer传过来的SequenceNumber出现了跨越,那么就意味着中间有可能消息出现了丢失,就会往Producer抛出一个OutOfOrderSequenceException异常。



问题二:Partition中的消息有序后,如何保证Consumer的消费顺序是有序的

// 一次拉取消息的数量
public static final String FETCH_MAX_BYTES_CONFIG = "fetch.max.bytes";
Note that the consumer performs multiple fetches in parallel.
    
    
public static final int DEFAULT_FETCH_MAX_BYTES = 50 * 1024 * 1024;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

从上方fetch.max.bytes参数的最后一个解释可以知道Consumer其实是每次并行的拉取多个Batch批次的消息进行处理的。所以在Kafka提供的客户端Consumer中,是没有办法直接保证消费的消息顺序。

我们能做的就是在Consumer的处理逻辑中,将消息进行排序。比如将消息按照业务独立性收集到一个集合中,然后在集合中对消息进行排序。

RocketMQ中提供了顺序消息的实现。他的实现原理是先锁定一个队列,消费完这一个队列后,才开始锁定下一个队列,并消费队列中的消息。再结合MessageQueue中的消息有序性,就能保证整体消息的消费顺序是有序的。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/968200
推荐阅读
相关标签
  

闽ICP备14008679号