赞
踩
linger.ms
:表示一个batch还未满时,时隔多久开始发送。默认是0ms,即缓冲区中一有数据就开始向kafka集群开始发送,此时批次大小的数据没有意义。一般修改linger.ms=5~100ms
,给缓冲队列一些时间让他能够向其中存放数据,能够按照batch进行数据发送。也不要设的太大,避免数据量少的时候长时间不发送。batch.size
:批次大小,一个batch中存放多少数据。compression.type
:压缩方式snappy
,压缩数据能够减小数据大小,提高数据吞吐效率。可以选择的压缩方式:
RecordAccumulator
:缓冲区大小,默认32Mb,可以修改为64Mb
生产者处按照有序的顺序产生数据,写入不同的分区中,消费者在读取数据的时候,因为需要从不同的分区中读取数据,在同一分区内的数据是保持有序的,但是在不同分区之间的数据是无序的。
在单分区中的有序也是有条件的。
如果需要多分区中的数据依旧保持有序,那么需要在消费者端创建多个窗口,等待数据全部到达消费者端后,在消费者端进行统一排序,反而降低了效率。
max.in.flight.requests.per.connection=1
表示缓存队列中最多只能缓存一个请求,那么分区内一定是有序的,因为失败的请求就会被阻塞,直到成功为止。此时甚至不需要开启幂等性max.in.flight.requests.per.connection=1
max.in.flight.requests.per.connection
需要设置小于等于5sequence number
,这个值是单调递增的,因此可以根据该值对数据进行排序。如果sequence number
较大的数据先到达了,那么就会等待所有数据都到齐了之后,在服务器端对所有数据进行排序之后再落盘。创建了一个新节点,修改完相关内容后,该节点并没有直接加入到kafka集群中
topics_to_mv.json
{
"topics":[
{"topic": "first"},
{"topic": "second"}
],
"version":1
}
bin/kafka-reassign-parititons.sh --bootstrap-server node1:9092 --topics-to-move-json-file topics_to_move.json --broker-list "1,2,3,4" --generate
--topics-to-move-json-file
指定按照哪个文件,对哪个主题进行负载均衡操作--broker-list
指定需要对那几台broker进行负载均衡--generate
生成负载均衡计划increase_replication_factor.json
就是把之前生成的负载均衡计划给拿过来
4. 执行这个副本存储计划
bin/kafka-reassign-parititons.sh --bootstrap-server node1:9092 --reassignment-json-file increase_replication_factor.json --execute
--reassignment-json-file
副本存储计划地址--execute
执行bin/kafka-reassign-parititons.sh --bootstrap-server node1:9092 --reassignment-json-file increase_replication_factor.json --verify
--verify
将之前的execute改成verify,验证对应的计划和服役一个新节点基本一致,只需要在生成负载均衡计划的时候,将对应需要退役的节点的brokerid进行去除即可。
这样操作就能够将这个节点上原先的数据转移到其他节点上,让其他节点重新负载均衡其上面的数据.
实际生产的环境中,可能存在一些机器性能高,而一些机器性能低的情况。kafka默认的分区情况是不考虑机器资源的负载均衡的,其只考虑机器数量上的负载均衡问题。因此需要根据实际手动调整分区数量。
现在假设希望将所有的数据存储在node1和node2上
increase-replication-factor.json
{ "version":1, "partitions":[ { "topic":"test", "partition":0, "replicas":[0,1] }, { "topic":"test", "partition":1, "replicas":[0,1] }, { "topic":"test", "partition":2, "replicas":[1,0] }, ] }
就是让原先在2上的数据重新负载均衡到0,1上。
2. 执行改副本存储计划bin/kafka-reassign-partitions.sh --bootstrap-servers node1:9092 --reassignment-json-file increase-replication-factor.json --execute
和之前的节点退役重负载均衡问题类似
auto.leader.rebalance.enable
,开启后能够自动进行leader partition的均衡。该参数默认开启。leader.imbalance.per.broker.percentage
,每个broker允许的不平衡leader比例。如果每个broker上不平衡的leader都超过了这个值,就会由Controller来进行leader的再平衡。该值默认是10%
。如果开启了自动平衡,推荐将不平衡比例调大,确保不会轻易的进行自平衡。leader.imbalance.check.interval.seconds
检查leader负载是否均衡的间隔时间,默认300s
按照正常情况,replicas中第一个副本所在的分区应该和leader是保持一致的,如果不同,说明发生过节点的宕机,它上面的leader被分配到其他节点上了。
计算某个节点的不平衡比例
0号分区本来应该是partition2的leader,因为他在partition2的replicas中排第一个,但实际上不是,因此有一个不平衡数
对于0号节点,AR总副本数为4,有一个不平衡数,因此1/4=25%>10%,0号节点不平衡
某些主题可能在生产过程中重要等级发生了提升,需要对该主题增加副本数。同样是在json中制定号对应的重负载计划,然后按照计划进行执行。
到底哪个consumer来消费哪个partition中的数据
kafka提供了4种主流的分区分配策略
通过修改配置参数partition.assignment.strategy
来修改对应的分区分配策略,默认策略为Range+CooperativeSticky
kakfa可以同时使用多个分区分配策略
Range
partition / consumer
来决定每个消费者需要消费几个分区。如果这个结果除不尽,那么前面的消费者需要多消费几个分区N
个,每个都除不尽,那么第0个分区就要比其他分区多处理N
个分区,会造成数据倾斜
RoundRobin
Sticky
这个问题的关键就是在于如何提高consumer对应的吞吐量
消费者数=分区数
拉取数据/处理速度>生产速度
场景: 100w日活,每人每天100条日志,每天日志总数1亿条
2 * (生产者峰值速率 * 副本数 /100) + 1
= 2 * (20 m/s * 2 / 100) + 1 = 3 台kafka-server-start.sh
中的配置页缓存=分区数 * 1g * 25% / 服务器台数
num.io.threads=8
负责写磁盘的线程数,占总核数的50%num.replica.fetchers=1
副本拉取线程数 占总核数1/6num.network.threads=3
数据传输线程 占总核数1/3峰值吞吐量
一般选择千兆网卡auto.create.topics.enable
如果没有主题的时候就向其中发送消息,系统会自动创建一个主题,该主题是非预期的,增加了主题管理和维护上的难度,因此该值一般置为false
buffer.memory
batch.size
linger.ms
compression.type
fetch.max.bytes
max.poll.records
总吞吐量 / min(producer吞吐量,consumer吞吐量)
kafka单条日志的默认上限是1m,如果消息大小超过1m了,那么会导致kafka可能卡住
message.max.bytes
:默认1m,broker端接受每批次消息的最大值max.request.size
默认1m,生产者向broker发送请求每个消息的最大值replica.fetch.max.bytes
默认1m,副本同步时每个批次消息的最大值fetch.max.bytes
消费者端一次拉取的最大数据量,但是这个一个软上限 默认50mkafka自带了压测的脚本
kafka-producer-perf-test.sh
kafka-consumer-perf-test.sh
bin/kafka-producer-perf-test.sh --topic test --record-size 1024 --num-records 100000 --throughput 10000 --producer-props bootstrap.servers=node1:9092 batch.size=16384 linger.ms=0
参数 | 描述 |
---|---|
topic | 主题 |
record-size | 单条信息的大小,单位为字节 |
num-records | 总共发送的信息条数 |
throughput | 每秒多少条消息,设为-1表示不设限,尽可能快的生产数据,可测出生产者最大吞吐量 |
producer-props | 配置生产者的一些参数 |
kafka-consumer-perf-test.sh --bootstrap-servers:node1:9092 -topic test --message 1000000 --consumer.config config/consumer.properties
参数 | 描述 |
---|---|
topic | 主题 |
message | 发送的消息条数 |
consumer.config | 对应的消费者配置,与生产者压测不同的是,这个需要写在一个properties文件中,通过读取文件的方式加载配置 |
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。