赞
踩
下载efak-web-3.0.2-bin.tar.gz安装包之后,efak需要依赖JDK和数据库。数据库支持本地化的SQLLite以及集中式的MySQL。生产环境建议使用MySQL。
数据库不需要建表初始化,EFAK在执行过程中会自己完成初始化。
将efak压缩包解压
[root@worker1 ~]# tar -zxvf efak-web-3.0.2-bin.tar.gz -C /app/kafka/eagle
修改efak解压目录下的conf/system-config.properties。 这个文件中提供了完整的配置,下面只列出需要修改的部分。
###################################### # multi zookeeper & kafka cluster list # Settings prefixed with 'kafka.eagle.' will be deprecated, use 'efak.' instead ###################################### # 指向Zookeeper地址 efak.zk.cluster.alias=cluster1 cluster1.zk.list=worker1:2181,worker2:2181,worker3:2181 ###################################### # zookeeper enable acl ###################################### # Zookeeper权限控制 cluster1.zk.acl.enable=false cluster1.zk.acl.schema=digest #cluster1.zk.acl.username=test #cluster1.zk.acl.password=test123 ###################################### # kafka offset storage ###################################### # offset选择存在kafka中。 cluster1.efak.offset.storage=kafka #cluster2.efak.offset.storage=zk ###################################### # kafka mysql jdbc driver address ###################################### #指向自己的MySQL服务。库需要提前创建 efak.driver=com.mysql.cj.jdbc.Driver efak.url=jdbc:mysql://worker1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull efak.username=root efak.password=root
配置EFAK的环境变量
[root@worker1 ~]# vi ~/.bash_profile
export KE_HOME=/app/kafka/eagle/efak-web-3.0.2
PATH=$PATH:#KE_HOME/bin:$HOME/.local/bin:$HOME/bin
[root@worker1 ~]# source ~/.bash_profile
启动EFAK
配置完成后,先启动Zookeeper和Kafka服务,然后调用EFAK的bin目录下的ke.sh脚本启动服务
[root@worker1 bin]$ ./ke.sh start -- 日志很长,看到以下内容表示服务启动成功 [2023-06-28 16:09:43] INFO: [Job done!] Welcome to ______ ______ ___ __ __ / ____/ / ____/ / | / //_/ / __/ / /_ / /| | / ,< / /___ / __/ / ___ | / /| | /_____/ /_/ /_/ |_|/_/ |_| ( Eagle For Apache Kafka® ) Version v3.0.2 -- Copyright 2016-2022 ******************************************************************* * EFAK Service has started success. * Welcome, Now you can visit 'http://192.168.232.128:8048' * Account:admin ,Password:123456 ******************************************************************* * <Usage> ke.sh [start|status|stop|restart|stats] </Usage> * <Usage> https://www.kafka-eagle.org/ </Usage> *******************************************************************
EFAK管理页面
接下来就可以访问EFAK的管理页面。http://192.168.232.128:8048。 默认的用户名是admin ,密码是123456
关于EFAK更多的使用方式,比如EFAK服务如何集群部署等,可以参考官方文档。
机械硬盘kafka使用机械硬盘即可,可以不用给Kafka固态硬盘
内存,修改 bin/kafka-start-server.sh
启动脚本,修改JVM启动参数中的内存,默认只申请了1G内存。
[oper@worker1 bin]$ cat kafka-server-start.sh
......
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi
......
对于主流的16核32G服务器,可以适当扩大Kafka的内存。例如:
export KAFKA_HEAP_OPTS="‐Xmx16G ‐Xms16G ‐Xmn10G ‐XX:MetaspaceSize=256M ‐XX:+UseG1GC ‐XX:MaxGCPauseMillis=50 ‐XX:G1HeapRegionSize=16M"
高性能网卡,Kafka本身的服务性能非常高,单机就可以支持百万级的TPS,在高流量冲击下,网络非常有可能优先成为性能瓶颈。对于Kafka服务器,建议配置高性能的网卡。成本允许的话,尽量选择千兆以上的网卡。
合理配置partition分区数量
Kafka的单个Partition读写效率是非常高的,但是Kafka的Partition设计是非常碎片化。如果Partition文件过多,很容易严重影响Kafka的整体性能。
至于Partition的数量,最好根据业务情况灵活调整。partition数量设置多一些,可以一定程度增加Topic的吞吐量。但是过多的partition数量还是同样会带来partition索引的压力。
Kafka提供了一个生产者的性能压测脚本,可以用来衡量集群的整体性能。
# num-record表示要发送100000条压测消息,record-size表示每条消息大小1KB,throughput表示限流控制,设置为小于0表示不限流。
# properducer-props用来设置生产者的参数。
[oper@worker1 bin]$ ./kafka-producer-perf-test.sh --topic test --num-record 1000000 --record-size 1024 --throughput -1 --producer-props bootstrap.servers=worker1:9092 acks=1
94846 records sent, 18969.2 records/sec (18.52 MB/sec), 1157.4 ms avg latency, 1581.0 ms max latency.
133740 records sent, 26748.0 records/sec (26.12 MB/sec), 1150.6 ms avg latency, 1312.0 ms max latency.
146760 records sent, 29346.1 records/sec (28.66 MB/sec), 1051.5 ms avg latency, 1164.0 ms max latency.
137400 records sent, 27480.0 records/sec (26.84 MB/sec), 1123.7 ms avg latency, 1182.0 ms max latency.
158700 records sent, 31740.0 records/sec (31.00 MB/sec), 972.1 ms avg latency, 1022.0 ms max latency.
158775 records sent, 31755.0 records/sec (31.01 MB/sec), 963.5 ms avg latency, 1055.0 ms max latency.
1000000 records sent, 28667.259123 records/sec (28.00 MB/sec), 1030.44 ms avg latency, 1581.00 ms max latency, 1002 ms 50th, 1231 ms 95th, 1440 ms 99th, 1563 ms 99.9th.
合理对数据进行压缩
在生产者的ProducerConfig中,有一个配置 COMPRESSION_TYPE_CONFIG,是用来对消息进行压缩的。
public static final String COMPRESSION_TYPE_CONFIG = "compression.type";
生产者配置了压缩策略后,会对生产的每个消息进行压缩,从而降低Producer到Broker的网络传输,也降低了Broker的数据存储压力。
所支持的几种压缩算法中,zstd算法具有最高的数据压缩比,但是吞吐量不高,lz4在吞吐量方面的优势比较明显。压缩消息必然增加CPU的消耗,如果CPU资源紧张,就不要压缩了。
关于数据压缩机制,可以在Broker的conf/server.properties文件中进行配置。正常情况下,Broker从Producer端接收到消息后不会对其进行任何修改,但是如果Broker端和Producer端指定了不同的压缩算法,就会产生很多异常的表现。
compression.type
Type: string
Default: producer
Valid Values: [uncompressed, zstd, lz4, snappy, gzip, producer]
如果开启了消息压缩,那么在消费者端自然是要进行解压缩的。
在Kafka中,消息从Producer到Broker再到Consumer会一直携带消息的压缩方式,这样当Consumer读取到消息集合时,自然就知道了这些消息使用的是哪种压缩算法,也就可以自己进行解压了。但是这时要注意的是应用中使用的Kafka客户端版本和Kafka服务端版本是否匹配。
消息生产者设置好发送者应答参数
acks
参数,可以设置为0、1、all或-1生产者端的幂等性配置
// 值为 true 或 false
public static final String ENABLE_IDEMPOTENCE_CONFIG = "enable.idempotence";
如果要开启幂等性,那么
max.in.flight.requests.per.connection
<=5retries
>0acks
必须为all其他补充点:
使用生产者事务机制发送消息
// 1 初始化事务
void initTransactions();
// 2 开启事务
void beginTransaction() throws ProducerFencedException;
// 3 提交事务
void commitTransaction() throws ProducerFencedException;
// 4 放弃事务(类似于回滚事务的操作)
void abortTransaction() throws ProducerFencedException;
SpringBoot集成Kafka时使用的KafkaTemplate就提供了事务消息机制
消费者端合理使用提交方式
如果消费者要使用异步方式进行业务处理,那么如果业务处理失败,此时消费者已经提交了Offset,这个消息就无法重试了,这就会造成消息丢失。
因此在消费者端,尽量不要使用异步处理方式,在绝大部分场景下,就能够通过Kafka的消费者重试机制,保证消息安全处理。此时,在消费者端,需要更多考虑的问题,就变成了消费重试机制造成的消息重复消费的问题。
问题的产生:消费者业务处理时间较长,此时消费者正常处理消息的过程中,Broker端就已经等不下去了,认为这个消费者处理失败了。这时就会往同组的其他消费者实例投递消息,这就造成了消息重复处理。这就给消费者端带来不必要的幂等性问题。
解决方式:
根据业务ID校验。比如对于订单消息,消费者根据订单ID去确认一下这个订单消息有没有处理过。
统一的方式处理幂等性问题
将Offset放到Redis中自行进行管理。通过Redis中的offset来判断消息之前是否处理过。
具体实现详情请参考上文《客户端 — 客户端属性分析 — 消息重复消费问题》
1、 生产者发送消息到Broker不丢失
生产者丢失消息的原因就和ack机制有关,如果将acks
设置为1、all或-1 就表示接收到Broker成功将消息写入文件后的ack应答
2、Broker端保证消息不丢失
Broker端丢失消息的原因主要有两种:Leader partition重新选举、服务器非正常关机 pageCache中的消息还未刷盘
《服务端的Zookeeper元数据梳理 — Partition故障恢复机制》 《服务端日志 — 合理配置刷盘频率》
消息生产者将acks
设置为all可以有效避免因为Leader partition故障导致的消息丢失;
而pageCache刷盘可以通过下面的参数来设定合理的配置
# 多长时间进行一次强制刷盘。默认是Long.MAX。
flush.ms
# 表示当同一个Partiton的消息条数积累到这个数量时,就会申请一次刷盘操作。默认是Long.MAX。
log.flush.interval.messages
#当一个消息在内存中保留的时间,达到这个数量时,就会申请一次刷盘操作。他的默认值是空。如果这个参数配置为空,则生效的是下一个参数。
log.flush.interval.ms
# 日志刷新程序检查是否需要将日志刷新到磁盘的频率(以毫秒为单位),默认也是Long.MAX。
log.flush.scheduler.interval.ms
3、消费者保证消息不丢失
同步处理业务逻辑,同步手动提交offset
如果业务运行正常,只是因为消费者处理消息过慢,造成消息加压。
增加消费者个数,最多让一个消费者组下的消费者个数=Partition分区数,让一个Consumer负责一个分区,将消费进度提升到最大。
如果还是不够,那就新创建一个topic,给这个topic更多的partition,然后启动一批消费者,将消息从旧topic中搬运到新topic中,这些消费者不处理业务消息,仅仅是搬运。这样就可以创建更多的消费者消费从新topic消费了
如果是消费者业务处理异常导致的消息积压。
使用一种降级方案。先启动一个Consumer将Topic下的消息先转发到其他队列中,然后再慢慢分析新队列里的消息处理问题。类似于死信队列的处理方式。
Kafka很难保证消息的顺序,因为Kafka设计的最优先重点是海量吞吐。
这个问题要分两个方面考虑:
问题一:producer要保证一组有序的消息发送到一个partition中
第一种简单粗暴的方式就只为Topic创建一个partition,这种方式放弃了多partition带来的高吞吐量。
第二种方式,自己定义一个分区路由机制,实现org.apache.kafka.clients.producer.Partitioner
接口,将消息分配到同一个Partition上。
此时发送方保证了发送时的消息顺序性。但是存在网络问题,导致发送的一批消息中某一个消息丢失,这样消息到Broker时就还是没办法保证顺序性。Kafka是通过幂等性中单调递增的sequenceNumber来保证消息是顺序,因为是单调递增的,所以还能判断是否存在消息丢失一旦Kafka发现Producer传过来的SequenceNumber出现了跨越,那么就意味着中间有可能消息出现了丢失,就会往Producer抛出一个OutOfOrderSequenceException异常。
问题二:Partition中的消息有序后,如何保证Consumer的消费顺序是有序的
// 一次拉取消息的数量
public static final String FETCH_MAX_BYTES_CONFIG = "fetch.max.bytes";
Note that the consumer performs multiple fetches in parallel.
public static final int DEFAULT_FETCH_MAX_BYTES = 50 * 1024 * 1024;
从上方fetch.max.bytes
参数的最后一个解释可以知道Consumer其实是每次并行的拉取多个Batch批次的消息进行处理的。所以在Kafka提供的客户端Consumer中,是没有办法直接保证消费的消息顺序。
我们能做的就是在Consumer的处理逻辑中,将消息进行排序。比如将消息按照业务独立性收集到一个集合中,然后在集合中对消息进行排序。
RocketMQ中提供了顺序消息的实现。他的实现原理是先锁定一个队列,消费完这一个队列后,才开始锁定下一个队列,并消费队列中的消息。再结合MessageQueue中的消息有序性,就能保证整体消息的消费顺序是有序的。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。