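1 million daily active users, each producing 100 log entries per day: 1,000,000 × 100 = 100 million entries per day.
100 million / 24 hours / 60 minutes / 60 seconds ≈ 1150 entries per second.
Size of each entry: 0.5 KB to 2 KB (use 1 KB).
1150 entries/s × 1 KB ≈ 1 MB/s.
Peak rate, at 20× the average: 1150 × 20 = 23,000 entries/s, which at 1 KB per entry is roughly 20 MB/s.
Number of servers = 2 × (peak producer rate × replicas / 100) + 1 = 2 × (20 MB/s × 2 / 100) + 1 = 1.8, which rounds up to 2.
3 servers are recommended, which leaves headroom above the computed minimum.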
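The sizing arithmetic above can be checked with a short Java sketch. The inputs (1 million daily active users, 100 entries per user, 1 KB per entry, a 20× peak factor, 2 replicas) are the text's own assumptions; treating 1 MB as 1024 KB and rounding the server count up with `Math.ceil` are choices made here for illustration.

```java
// Capacity estimate using the inputs from the text above.
final class ThroughputEstimate {
    static final long SECONDS_PER_DAY = 24L * 60 * 60;

    // Average records per second over a whole day.
    static double avgRecordsPerSec(long dailyActiveUsers, long logsPerUser) {
        return (double) (dailyActiveUsers * logsPerUser) / SECONDS_PER_DAY;
    }

    // Peak throughput in MB/s, treating 1 MB as 1024 KB.
    static double peakMbPerSec(double peakRecordsPerSec, double recordSizeKb) {
        return peakRecordsPerSec * recordSizeKb / 1024.0;
    }

    // servers = 2 * (peak MB/s * replicas / 100) + 1, rounded up to a whole server.
    static int serverCount(double peakMbPerSec, int replicas) {
        return (int) Math.ceil(2 * (peakMbPerSec * replicas / 100.0) + 1);
    }

    public static void main(String[] args) {
        double avg = avgRecordsPerSec(1_000_000L, 100L);   // about 1157 records/s
        double peakRecords = 1150 * 20;                      // 23,000 records/s, as in the text
        double peakMb = peakMbPerSec(peakRecords, 1.0);      // about 22.5 MB/s; the text rounds to 20
        System.out.printf("avg=%.0f rec/s, peak=%.1f MB/s, servers=%d%n",
                avg, peakMb, serverCount(20, 2));            // formula gives 1.8, rounded up to 2
    }
}
```

The formula only yields a lower bound; the recommendation of 3 servers sits above it.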
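Under the hood Kafka mostly performs sequential writes, and for sequential writes SSDs and mechanical hard disks are roughly comparable in speed.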
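Total data per day: 100 million entries × 1 KB ≈ 100 GB.
100 GB × 2 replicas × 3 days of retention / 0.7 ≈ 1 TB (dividing by 0.7 reserves roughly 30% of the disk as free space).
Recommendation: the total disk capacity of the three servers should be at least 1 TB.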
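A sketch of the disk formula with the text's inputs. Reading 0.7 as a maximum disk utilization is an interpretation, and the per-broker figure assumes the data spreads evenly over the three brokers, which is an assumption rather than something the text states.

```java
// Disk sizing: daily volume * replicas * retention days / maximum utilization.
final class DiskEstimate {
    // Required total disk in GB across the whole cluster.
    static double totalDiskGb(double dailyGb, int replicas, int retentionDays, double maxUtilization) {
        return dailyGb * replicas * retentionDays / maxUtilization;
    }

    public static void main(String[] args) {
        double total = totalDiskGb(100, 2, 3, 0.7);   // 600 GB / 0.7, about 857 GB
        int brokers = 3;                              // assumes an even spread over 3 brokers
        System.out.printf("total=%.0f GB (about 1 TB), per broker=%.0f GB%n",
                total, total / brokers);
    }
}
```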
Kafka's memory consists of the heap plus the page cache.
1. Recommended Kafka heap per node: 10 GB to 15 GB.
Set it in kafka-server-start.sh:
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx10G -Xms10G"
fi
(1) Find the Kafka process ID:
jps
2321 Kafka
5255 Jps
1931 QuorumPeerMain
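(2) Using the Kafka process ID, check Kafka's GC activity (sample every 1 s, 10 times):
jstat -gc 2321 1s 10
S0C S1C S0U S1U EC EU OC OU MC MU CCSC CCSU YGC YGCT FGC FGCT GCT
0.0 7168.0 0.0 7168.0 103424.0 61440.0 1986560.0 148433.5 52092.0 46656.1 6780.0 6202.2 13 0.531 0 0.000 0.531
Here there have been 13 young GCs (YGC) taking 0.531 s in total, and no full GCs (FGC = 0).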
(3) Using the Kafka process ID, check Kafka's heap memory:
jmap -heap 2321

Attaching to process ID 2321, please wait...
Debugger attached successfully.
Server compiler detected.
JVM version is 25.212-b10

using thread-local object allocation.
Garbage-First (G1) GC with 8 thread(s)

Heap Configuration:
   MinHeapFreeRatio         = 40
   MaxHeapFreeRatio         = 70
   MaxHeapSize              = 2147483648 (2048.0MB)
   NewSize                  = 1363144 (1.2999954223632812MB)
   MaxNewSize               = 1287651328 (1228.0MB)
   OldSize                  = 5452592 (5.1999969482421875MB)
   NewRatio                 = 2
   SurvivorRatio            = 8
   MetaspaceSize            = 21807104 (20.796875MB)
   CompressedClassSpaceSize = 1073741824 (1024.0MB)
   MaxMetaspaceSize         = 17592186044415 MB
   G1HeapRegionSize         = 1048576 (1.0MB)

Heap Usage:
G1 Heap:
   regions  = 2048
   capacity = 2147483648 (2048.0MB)
   used     = 246367744 (234.95458984375MB)
   free     = 1901115904 (1813.04541015625MB)
   11.472392082214355% used
G1 Young Generation:
Eden Space:
   regions  = 83
   capacity = 105906176 (101.0MB)
   used     = 87031808 (83.0MB)
   free     = 18874368 (18.0MB)
   82.17821782178218% used
Survivor Space:
   regions  = 7
   capacity = 7340032 (7.0MB)
   used     = 7340032 (7.0MB)
   free     = 0 (0.0MB)
   100.0% used
G1 Old Generation:
   regions  = 147
   capacity = 2034237440 (1940.0MB)
   used     = 151995904 (144.95458984375MB)
   free     = 1882241536 (1795.04541015625MB)
   7.471886074420103% used

13364 interned Strings occupying 1449608 bytes.
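2. Page cache: the page cache is the Linux server's own memory, outside the JVM heap. It is enough to keep 25% of one segment (1 GB) per partition in memory.
Page cache per node = (number of partitions × 1 GB × 25%) / number of nodes. For example, with 10 partitions: (10 × 1 GB × 25%) / 3 ≈ 0.83 GB, i.e. about 1 GB.
Recommended server memory: at least 11 GB (10 GB of heap + 1 GB of page cache).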
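A sketch of the page-cache formula and the resulting memory floor, using the example's 10 partitions, 1 GB segments, 25% kept in memory, 3 nodes and a 10 GB heap. It ignores memory needed by the OS and any other processes on the host, so treat the result as a lower bound.

```java
final class MemoryEstimate {
    // Page cache per broker = partitions * segment size * fraction kept in memory / brokers.
    static double pageCachePerBrokerGb(int partitions, double segmentGb, double hotFraction, int brokers) {
        return partitions * segmentGb * hotFraction / brokers;
    }

    // Minimum RAM = heap + page cache rounded up to whole GB.
    static int minRamGb(int heapGb, double pageCacheGb) {
        return heapGb + (int) Math.ceil(pageCacheGb);
    }

    public static void main(String[] args) {
        double pageCache = pageCachePerBrokerGb(10, 1.0, 0.25, 3);   // about 0.83 GB
        System.out.printf("page cache=%.2f GB, minimum RAM=%d GB%n",
                pageCache, minRamGb(10, pageCache));                 // 10 GB heap + 1 GB
    }
}
```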
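num.io.threads = 8: threads that handle requests, including writing to disk (default 8). This value should be about 50% of the total CPU cores.
num.replica.fetchers = 1: threads that fetch data for follower replicas (default 1). This value should be 1/3 of the remaining 50% of the cores.
num.network.threads = 3: threads that handle network data transfer (default 3). This value should be 2/3 of the remaining 50% of the cores.
Recommendation: 32 CPU cores.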
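Applying the 50% / (1/3 of 50%) / (2/3 of 50%) rule of thumb to a core count gives concrete values for the three settings. Rounding to the nearest integer and keeping a floor of 1 thread are choices made in this sketch, not part of the rule.

```java
final class ThreadSplit {
    // Returns {num.io.threads, num.replica.fetchers, num.network.threads} for a core count.
    static int[] split(int cores) {
        int io = cores / 2;                                        // 50% of the cores
        int fetchers = Math.max(1, Math.round(cores / 2 / 3f));    // 1/3 of the other 50%
        int network = Math.max(1, Math.round(cores / 2 * 2 / 3f)); // 2/3 of the other 50%
        return new int[] {io, fetchers, network};
    }

    public static void main(String[] args) {
        int[] t = split(32);   // 16 + 5 + 11 = 32 cores
        System.out.printf("num.io.threads=%d num.replica.fetchers=%d num.network.threads=%d%n",
                t[0], t[1], t[2]);
    }
}
```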
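Network bandwidth = peak throughput ≈ 20 MB/s, i.e. 20 × 8 = 160 Mbps. That is more than a 100 Mbps NIC can carry, so a gigabit NIC is enough.
100 Mbps is measured in bits; 10 MB/s is measured in bytes. 1 byte = 8 bits, so 100 Mbps / 8 = 12.5 MB/s.
Common NICs are 100 Mbps, gigabit (1000 Mbps) and 10-gigabit (10000 Mbps).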
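The bit/byte conversion and NIC choice can be written as a small helper. The list of candidate NIC speeds is the three common ones named above; returning -1 when even 10 Gbps is insufficient is a convention made up for this sketch.

```java
final class Bandwidth {
    // Convert a byte rate (MB/s) to a line rate (Mbps): 1 byte = 8 bits.
    static double mbPerSecToMbps(double mbPerSec) {
        return mbPerSec * 8;
    }

    // Smallest common NIC speed (Mbps) that covers the peak, or -1 if none does.
    static int pickNicMbps(double peakMbPerSec) {
        double needed = mbPerSecToMbps(peakMbPerSec);
        for (int nic : new int[] {100, 1000, 10000}) {
            if (nic >= needed) {
                return nic;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        System.out.println(mbPerSecToMbps(20) + " Mbps needed, use a "
                + pickNicMbps(20) + " Mbps NIC");
    }
}
```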
3.1.1 Updating Broker Configs
From Kafka version 1.1 onwards, some of the broker configs can be
updated without restarting the broker. See the Dynamic Update Mode
column in Broker Configs for the update mode of each broker config.
read-only: Requires a broker restart for update
per-broker: May be updated dynamically for each broker
cluster-wide: May be updated dynamically as a cluster-wide default.
May also be updated as a per-broker value for testing.
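2. Kafka transactions have the following 5 APIs:
// 1 Initialize transactions
void initTransactions();
// 2 Begin a transaction
void beginTransaction() throws ProducerFencedException;
// 3 Commit consumed offsets within the transaction (mainly used in consume-then-produce flows)
// (deprecated since Kafka 3.0 in favor of the ConsumerGroupMetadata overload)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
                              String consumerGroupId) throws ProducerFencedException;
// 4 Commit the transaction
void commitTransaction() throws ProducerFencedException;
// 5 Abort the transaction (similar to a rollback)
void abortTransaction() throws ProducerFencedException;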
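To make the order of these five calls concrete, here is a hypothetical toy model in plain Java. `TxnLifecycle` is a name invented for this sketch and does not talk to Kafka; it only enforces the sequence: `initTransactions` once, then repeated rounds of `beginTransaction`, sends and `sendOffsetsToTransaction`, ending in `commitTransaction` or `abortTransaction`. The real `KafkaProducer` tracks more states and failure modes than this.

```java
// Hypothetical toy model (not Kafka code) of the transactional call order.
final class TxnLifecycle {
    private enum State { UNINITIALIZED, READY, IN_TRANSACTION }
    private State state = State.UNINITIALIZED;

    void initTransactions()         { require(State.UNINITIALIZED); state = State.READY; }
    void beginTransaction()         { require(State.READY); state = State.IN_TRANSACTION; }
    void send()                     { require(State.IN_TRANSACTION); } // stands in for producer.send(...)
    void sendOffsetsToTransaction() { require(State.IN_TRANSACTION); } // offsets join the open transaction
    void commitTransaction()        { require(State.IN_TRANSACTION); state = State.READY; }
    void abortTransaction()         { require(State.IN_TRANSACTION); state = State.READY; }

    // Rejects a call made in the wrong state.
    private void require(State expected) {
        if (state != expected) {
            throw new IllegalStateException("expected " + expected + " but was " + state);
        }
    }

    public static void main(String[] args) {
        TxnLifecycle txn = new TxnLifecycle();
        txn.initTransactions();           // once per producer instance
        txn.beginTransaction();
        txn.send();
        txn.sendOffsetsToTransaction();   // consumed offsets committed atomically with the sends
        txn.commitTransaction();
        txn.beginTransaction();           // the same producer runs the next transaction
        txn.send();
        txn.abortTransaction();           // e.g. after a processing failure
        System.out.println("call order accepted");
    }
}
```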
vim topics-to-move.json
{
  "topics": [
    {"topic": "first"}
  ],
  "version": 1
}
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2,3" --generate
3. Create the replica placement plan (all replicas stored on broker0, broker1, broker2 and broker3).
vim increase-replication-factor.json
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --verify
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic first --partitions 3
1. Create the topic:
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --partitions 3 --replication-factor 1 --topic four
(1) Create the replica placement plan (all replicas placed on broker0, broker1 and broker2).
vim increase-replication-factor.json
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute
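The vim step above expects a reassignment file. As a sketch, the hypothetical helper below builds one in the JSON layout that kafka-reassign-partitions.sh reads with --reassignment-json-file (`{"version":1,"partitions":[{"topic":...,"partition":...,"replicas":[...]}]}`), rotating each partition's replica list over the given brokers so the first (preferred leader) replica differs per partition. This simple round-robin is an assumption for illustration; it is not the algorithm Kafka's --generate option uses.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: writes a reassignment plan as a JSON string.
final class ReassignmentPlan {
    static String build(String topic, int partitions, int[] brokers, int replicationFactor) {
        if (replicationFactor > brokers.length) {
            throw new IllegalArgumentException("replication factor exceeds broker count");
        }
        List<String> entries = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            StringBuilder replicas = new StringBuilder();
            for (int r = 0; r < replicationFactor; r++) {
                if (r > 0) {
                    replicas.append(',');
                }
                // Shift the starting broker by one per partition (round-robin).
                replicas.append(brokers[(p + r) % brokers.length]);
            }
            entries.add(String.format("{\"topic\":\"%s\",\"partition\":%d,\"replicas\":[%s]}",
                    topic, p, replicas));
        }
        return "{\"version\":1,\"partitions\":[" + String.join(",", entries) + "]}";
    }

    public static void main(String[] args) {
        // Topic "four": 3 partitions, each with 3 replicas on brokers 0, 1, 2.
        System.out.println(build("four", 3, new int[] {0, 1, 2}, 3));
    }
}
```

Other broker lists work the same way, for example `new int[] {0, 1}` with a replication factor of 2. Whatever file is used, running --verify after --execute confirms the reassignment finished.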
1. Create the replica placement plan (all replicas placed on broker0 and broker1).
vim increase-replication-factor.json
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --verify
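If the broker config auto.create.topics.enable is set to true (the default), then when a producer sends a message to a topic that does not exist yet, Kafka automatically creates that topic with num.partitions partitions (default 1) and a replication factor of default.replication.factor (default 1). A matching topic is also created automatically when a consumer starts reading from an unknown topic, or when any client sends a metadata request for an unknown topic. Topics created this way are unintended and make topic management and maintenance harder.
In production, it is recommended to set this parameter to false.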
1. Send data to topic five, which has not been created in advance:
bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic five
>hello world
2. Describe topic five:
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic five
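// seek one assigned partition (topic is a TopicPartition) to offset 1000
kafkaConsumer.seek(topic, 1000);
// look up, for each partition, the offset at the timestamp one day ago
HashMap<TopicPartition, Long> timestampToSearch = new HashMap<>();
timestampToSearch.put(topicPartition, System.currentTimeMillis() - 1 * 24 * 3600 * 1000);
Map<TopicPartition, OffsetAndTimestamp> offsets = kafkaConsumer.offsetsForTimes(timestampToSearch);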
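The timestamp passed to offsetsForTimes is plain epoch-millisecond arithmetic. The expression above is safe for one day (86,400,000 fits in an int), but an all-int product such as 30 * 24 * 3600 * 1000 overflows before it is widened to long. A minimal sketch using `java.time.Duration` avoids that; the class name `SeekTimestamp` is hypothetical.

```java
import java.time.Duration;

final class SeekTimestamp {
    // Epoch millis for "days ago" relative to nowMillis, computed in long arithmetic.
    static long daysAgo(long nowMillis, int days) {
        return nowMillis - Duration.ofDays(days).toMillis();
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        System.out.println(daysAgo(now, 1));    // same value as now - 1 * 24 * 3600 * 1000
        // Pitfall: an int product overflows beyond Integer.MAX_VALUE ms (about 24.8 days).
        int overflowed = 30 * 24 * 3600 * 1000;
        System.out.println(overflowed);         // negative, not 2,592,000,000
    }
}
```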
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic first --partitions 3
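buffer.memory: size of the producer's send buffer; the default is 32 MB and it can be increased to 64 MB.
batch.size: the default is 16 KB. If the batch is too small, requests are sent too often and throughput drops; if it is too large, a message may wait a long time before it is sent, which increases latency.
linger.ms: the default is 0, meaning messages are sent immediately. A typical setting is 5 to 100 ms. If linger.ms is too small, requests are sent too often and throughput drops; if it is too long, a message may wait a long time before it is sent, which increases latency.
compression.type: the default is none (no compression); lz4 works well. Compression reduces the data volume and improves throughput, at the cost of more CPU on the producer.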
(1) Adjust fetch.max.bytes (default 50 MB).
(2) Adjust max.poll.records (default 500 records).
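These producer and consumer settings are ordinary client configs, so they can be collected in a `java.util.Properties` object and handed to the KafkaProducer and KafkaConsumer constructors, which accept Properties. The sketch only builds the Properties (no Kafka dependency). The concrete values (64 MB, 32 KB, 50 ms, lz4, 100 MB, 2000) are example picks from the ranges and experiments in this section, not universal recommendations, and bootstrap.servers and serializers are left out.

```java
import java.util.Properties;

final class TuningProps {
    // Producer throughput settings discussed above.
    static Properties producer() {
        Properties p = new Properties();
        p.put("buffer.memory", String.valueOf(64L * 1024 * 1024)); // 64 MB (default 32 MB)
        p.put("batch.size", String.valueOf(32 * 1024));            // 32 KB (default 16 KB)
        p.put("linger.ms", "50");                                  // within the 5 to 100 ms range
        p.put("compression.type", "lz4");                          // default none
        return p;
    }

    // Consumer throughput settings discussed above.
    static Properties consumer() {
        Properties c = new Properties();
        c.put("fetch.max.bytes", String.valueOf(100L * 1024 * 1024)); // 100 MB (default 50 MB)
        c.put("max.poll.records", "2000");                            // default 500
        return c;
    }

    public static void main(String[] args) {
        System.out.println(producer());
        System.out.println(consumer());
    }
}
```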
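1. Producer side: idempotence (enable.idempotence = true) + transactions.
2. Broker side:
min.insync.replicas = 2 (this only takes effect for producers that use acks=all, i.e. acks=-1)
3. Consumer side: commit offsets manually instead of automatically:
enable.auto.commit = false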
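1. Create a topic with only 1 partition.
2. Measure the producer throughput and the consumer throughput of this topic.
3. Call them Tp and Tc, for example in MB/s.
4. If the total target throughput is Tt, the number of partitions = Tt / min(Tp, Tc).
Example: producer throughput = 20 MB/s, consumer throughput = 50 MB/s, target throughput = 100 MB/s;
partitions = 100 / 20 = 5.
The partition count is typically set to 3 to 10.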
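The partition formula as a one-line helper. Rounding up with `Math.ceil` is an addition here so that a non-integer ratio still meets the target throughput; the inputs are the example's measured figures.

```java
final class PartitionCount {
    // partitions = ceil(target / min(producer, consumer)), all in MB/s.
    static int partitions(double targetMbPerSec, double producerMbPerSec, double consumerMbPerSec) {
        return (int) Math.ceil(targetMbPerSec / Math.min(producerMbPerSec, consumerMbPerSec));
    }

    public static void main(String[] args) {
        System.out.println(partitions(100, 20, 50));   // 100 / min(20, 50) = 5
    }
}
```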
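If a Kafka node goes down in production, the usual procedure is:
1. First try restarting it.
2. If restarting does not help, consider adding memory, CPU or network bandwidth.
3. If the whole Kafka node was deleted by mistake and the replication factor is at least 2, commission a new node the same way you would add a new node, then rebalance the load.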
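Stress-test Kafka with the scripts that ship with it.
1. Create a test topic with 3 partitions and 3 replicas:
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --replication-factor 3 --partitions 3 --topic test
2. The /opt/module/kafka/bin directory contains two scripts for this, kafka-producer-perf-test.sh and kafka-consumer-perf-test.sh. Test the producer first:
bin/kafka-producer-perf-test.sh --topic test --record-size 1024 --num-records 1000000 --throughput 10000 --producer-props bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092 batch.size=16384 linger.ms=0
Parameters: --record-size is the size of each record in bytes; --num-records is the total number of records to send; --throughput caps the send rate at about that many records per second (-1 disables throttling); --producer-props passes producer configs. Here batch.size is 16k and linger.ms is 0.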
37021 records sent, 7401.2 records/sec (7.23 MB/sec), 1136.0 ms avg latency, 1453.0 ms max latency.
50535 records sent, 10107.0 records/sec (9.87 MB/sec), 1199.5 ms avg latency, 1404.0 ms max latency.
47835 records sent, 9567.0 records/sec (9.34 MB/sec), 1350.8 ms avg latency, 1570.0 ms max latency.
...
42390 records sent, 8444.2 records/sec (8.25 MB/sec), 3372.6 ms avg latency, 4008.0 ms max latency.
37800 records sent, 7558.5 records/sec (7.38 MB/sec), 4079.7 ms avg latency, 4758.0 ms max latency.
33570 records sent, 6714.0 records/sec (6.56 MB/sec), 4549.0 ms avg latency, 5049.0 ms max latency.
1000000 records sent, 9180.713158 records/sec (8.97 MB/sec), 1894.78 ms avg latency, 5049.00 ms max latency, 1335 ms 50th, 4128 ms 95th, 4719 ms 99th, 5030 ms 99.9th.
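Comparing the runs in this section by eye is error-prone, so a small hypothetical helper can pull records/sec, MB/sec and average latency out of the final summary line that kafka-producer-perf-test.sh prints. The regular expression is written against the output format shown here and may need adjusting for other Kafka versions.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for the producer perf-test summary line.
final class PerfSummary {
    private static final Pattern SUMMARY = Pattern.compile(
            "(\\d+) records sent, ([\\d.]+) records/sec \\(([\\d.]+) MB/sec\\), ([\\d.]+) ms avg latency");

    // Returns {records/sec, MB/sec, avg latency ms}, or null if the line does not match.
    static double[] parse(String line) {
        Matcher m = SUMMARY.matcher(line);
        if (!m.find()) {
            return null;
        }
        return new double[] {
            Double.parseDouble(m.group(2)),
            Double.parseDouble(m.group(3)),
            Double.parseDouble(m.group(4))
        };
    }

    public static void main(String[] args) {
        String line = "1000000 records sent, 9180.713158 records/sec (8.97 MB/sec), "
                + "1894.78 ms avg latency, 5049.00 ms max latency";
        double[] r = parse(line);
        System.out.printf("%.1f records/s, %.2f MB/s, %.1f ms avg latency%n", r[0], r[1], r[2]);
    }
}
```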
3. Adjust batch.size.
(1) batch.size defaults to 16k; this run sets batch.size to 32k.
bin/kafka-producer-perf-test.sh --topic test --record-size 1024 --num-records 1000000 --throughput 10000 --producer-props bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092 batch.size=32768 linger.ms=0
49922 records sent, 9978.4 records/sec (9.74 MB/sec), 64.2 ms avg latency, 340.0 ms max latency.
49940 records sent, 9988.0 records/sec (9.75 MB/sec), 15.3 ms avg latency, 31.0 ms max latency.
50018 records sent, 10003.6 records/sec (9.77 MB/sec), 16.4 ms avg latency, 52.0 ms max latency.
...
49960 records sent, 9992.0 records/sec (9.76 MB/sec), 17.2 ms avg latency, 40.0 ms max latency.
50090 records sent, 10016.0 records/sec (9.78 MB/sec), 16.9 ms avg latency, 47.0 ms max latency.
1000000 records sent, 9997.600576 records/sec (9.76 MB/sec), 20.20 ms avg latency, 340.00 ms max latency, 16 ms 50th, 30 ms 95th, 168 ms 99th, 249 ms 99.9th.
(2) batch.size defaults to 16k; this run sets batch.size to 4k.
bin/kafka-producer-perf-test.sh --topic test --record-size 1024 --num-records 1000000 --throughput 10000 --producer-props bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092 batch.size=4096 linger.ms=0
15598 records sent, 3117.1 records/sec (3.04 MB/sec), 1878.3 ms avg latency, 3458.0 ms max latency.
17748 records sent, 3549.6 records/sec (3.47 MB/sec), 5072.5 ms avg latency, 6705.0 ms max latency.
18675 records sent, 3733.5 records/sec (3.65 MB/sec), 6800.9 ms avg latency, 7052.0 ms max latency.
...
19125 records sent, 3825.0 records/sec (3.74 MB/sec), 6416.5 ms avg latency, 7023.0 ms max latency.
1000000 records sent, 3660.201531 records/sec (3.57 MB/sec), 6576.68 ms avg latency, 7677.00 ms max latency, 6745 ms 50th, 7298 ms 95th, 7507 ms 99th, 7633 ms 99.9th.
4. Adjust linger.ms: linger.ms defaults to 0 ms; this run sets linger.ms to 50 ms (batch.size stays at 4096).
bin/kafka-producer-perf-test.sh --topic test --record-size 1024 --num-records 1000000 --throughput 10000 --producer-props bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092 batch.size=4096 linger.ms=50
16804 records sent, 3360.1 records/sec (3.28 MB/sec), 1841.6 ms avg latency, 3338.0 ms max latency.
18972 records sent, 3793.6 records/sec (3.70 MB/sec), 4877.7 ms avg latency, 6453.0 ms max latency.
19269 records sent, 3852.3 records/sec (3.76 MB/sec), 6477.9 ms avg latency, 6686.0 ms max latency.
...
17073 records sent, 3414.6 records/sec (3.33 MB/sec), 6987.7 ms avg latency, 7353.0 ms max latency.
19326 records sent, 3865.2 records/sec (3.77 MB/sec), 6756.5 ms avg latency, 7357.0 ms max latency.
1000000 records sent, 3842.754486 records/sec (3.75 MB/sec), 6272.49 ms avg latency, 7437.00 ms max latency, 6308 ms 50th, 6880 ms 95th, 7289 ms 99th, 7387 ms 99.9th.
5. Adjust the compression type.
(1) The default compression type is none; this run sets compression.type to snappy.
bin/kafka-producer-perf-test.sh --topic test --record-size 1024 --num-records 1000000 --throughput 10000 --producer-props bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092 batch.size=4096 linger.ms=50 compression.type=snappy
17244 records sent, 3446.0 records/sec (3.37 MB/sec), 5207.0 ms avg latency, 6861.0 ms max latency.
18873 records sent, 3774.6 records/sec (3.69 MB/sec), 6865.0 ms avg latency, 7094.0 ms max latency.
18378 records sent, 3674.1 records/sec (3.59 MB/sec), 6579.2 ms avg latency, 6738.0 ms max latency.
...
17631 records sent, 3526.2 records/sec (3.44 MB/sec), 6671.3 ms avg latency, 7566.0 ms max latency.
19116 records sent, 3823.2 records/sec (3.73 MB/sec), 6739.4 ms avg latency, 7630.0 ms max latency.
1000000 records sent, 3722.925028 records/sec (3.64 MB/sec), 6467.75 ms avg latency, 7727.00 ms max latency, 6440 ms 50th, 7308 ms 95th, 7553 ms 99th, 7665 ms 99.9th.
(2) The default compression type is none; this run sets compression.type to zstd.
bin/kafka-producer-perf-test.sh --topic test --record-size 1024 --num-records 1000000 --throughput 10000 --producer-props bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092 batch.size=4096 linger.ms=50 compression.type=zstd
23820 records sent, 4763.0 records/sec (4.65 MB/sec), 1580.2 ms avg latency, 2651.0 ms max latency.
29340 records sent, 5868.0 records/sec (5.73 MB/sec), 3666.0 ms avg latency, 4752.0 ms max latency.
28950 records sent, 5788.8 records/sec (5.65 MB/sec), 5785.2 ms avg latency, 6865.0 ms max latency.
...
29580 records sent, 5916.0 records/sec (5.78 MB/sec), 6907.6 ms avg latency, 7432.0 ms max latency.
29925 records sent, 5981.4 records/sec (5.84 MB/sec), 6948.9 ms avg latency, 7541.0 ms max latency.
1000000 records sent, 5733.583318 records/sec (5.60 MB/sec), 6824.75 ms avg latency, 7595.00 ms max latency, 7067 ms 50th, 7400 ms 95th, 7500 ms 99th, 7552 ms 99.9th.
(3) The default compression type is none; this run sets compression.type to gzip.
bin/kafka-producer-perf-test.sh --topic test --record-size 1024 --num-records 1000000 --throughput 10000 --producer-props bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092 batch.size=4096 linger.ms=50 compression.type=gzip
27170 records sent, 5428.6 records/sec (5.30 MB/sec), 1374.0 ms avg latency, 2311.0 ms max latency.
31050 records sent, 6210.0 records/sec (6.06 MB/sec), 3183.8 ms avg latency, 4228.0 ms max latency.
32145 records sent, 6427.7 records/sec (6.28 MB/sec), 5028.1 ms avg latency, 6042.0 ms max latency.
...
31710 records sent, 6342.0 records/sec (6.19 MB/sec), 6457.1 ms avg latency, 6777.0 ms max latency.
31755 records sent, 6348.5 records/sec (6.20 MB/sec), 6498.7 ms avg latency, 6780.0 ms max latency.
32760 records sent, 6548.1 records/sec (6.39 MB/sec), 6375.7 ms avg latency, 6822.0 ms max latency.
1000000 records sent, 6320.153706 records/sec (6.17 MB/sec), 6155.42 ms avg latency, 6943.00 ms max latency, 6437 ms 50th, 6774 ms 95th, 6863 ms 99th, 6912 ms 99.9th.
(4) The default compression type is none; this run sets compression.type to lz4.
bin/kafka-producer-perf-test.sh --topic test --record-size 1024 --num-records 1000000 --throughput 10000 --producer-props bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092 batch.size=4096 linger.ms=50 compression.type=lz4
16696 records sent, 3339.2 records/sec (3.26 MB/sec), 1924.5 ms avg latency, 3355.0 ms max latency.
19647 records sent, 3928.6 records/sec (3.84 MB/sec), 4841.5 ms avg latency, 6320.0 ms max latency.
20142 records sent, 4028.4 records/sec (3.93 MB/sec), 6203.2 ms avg latency, 6378.0 ms max latency.
...
20130 records sent, 4024.4 records/sec (3.93 MB/sec), 6073.6 ms avg latency, 6396.0 ms max latency.
19449 records sent, 3889.8 records/sec (3.80 MB/sec), 6195.6 ms avg latency, 6500.0 ms max latency.
19872 records sent, 3972.8 records/sec (3.88 MB/sec), 6274.5 ms avg latency, 6565.0 ms max latency.
1000000 records sent, 3956.087430 records/sec (3.86 MB/sec), 6085.62 ms avg latency, 6745.00 ms max latency, 6212 ms 50th, 6524 ms 95th, 6610 ms 99th, 6695 ms 99.9th.
6. Adjust the buffer size: the producer buffer defaults to 32 MB; this run sets buffer.memory to 64 MB.
bin/kafka-producer-perf-test.sh --topic test --record-size 1024 --num-records 1000000 --throughput 10000 --producer-props bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092 batch.size=4096 linger.ms=50 buffer.memory=67108864
20170 records sent, 4034.0 records/sec (3.94 MB/sec), 1669.5 ms avg latency, 3040.0 ms max latency.
21996 records sent, 4399.2 records/sec (4.30 MB/sec), 4407.9 ms avg latency, 5806.0 ms max latency.
22113 records sent, 4422.6 records/sec (4.32 MB/sec), 7189.0 ms avg latency, 8623.0 ms max latency.
...
19818 records sent, 3963.6 records/sec (3.87 MB/sec), 12416.0 ms avg latency, 12847.0 ms max latency.
20331 records sent, 4062.9 records/sec (3.97 MB/sec), 12400.4 ms avg latency, 12874.0 ms max latency.
19665 records sent, 3933.0 records/sec (3.84 MB/sec), 12303.9 ms avg latency, 12838.0 ms max latency.
1000000 records sent, 4020.100503 records/sec (3.93 MB/sec), 11692.17 ms avg latency, 13796.00 ms max latency, 12238 ms 50th, 12949 ms 95th, 13691 ms 99th, 13766 ms 99.9th.
1. In /opt/module/kafka/config/consumer.properties, set the number of records fetched per poll to 500:
max.poll.records=500
2. Stress-test by consuming 1 million records:
bin/kafka-consumer-perf-test.sh --bootstrap-server hadoop102:9092,hadoop103:9092,hadoop104:9092 --topic test --messages 1000000 --consumer.config config/consumer.properties
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2022-01-20 09:58:26:171, 2022-01-20 09:58:33:321, 977.0166, 136.6457, 1000465, 139925.1748, 415, 6735, 145.0656, 148547.1418
3. Fetch 2000 records per poll.
(1) In /opt/module/kafka/config/consumer.properties, set the number of records fetched per poll to 2000:
max.poll.records=2000
bin/kafka-consumer-perf-test.sh --bootstrap-server hadoop102:9092,hadoop103:9092,hadoop104:9092 --topic test --messages 1000000 --consumer.config config/consumer.properties
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2022-01-20 10:18:06:268, 2022-01-20 10:18:12:863, 977.5146, 148.2206, 1000975, 151777.8620, 358, 6237, 156.7283, 160489.8188
4. Set fetch.max.bytes to 100 MB.
(1) In /opt/module/kafka/config/consumer.properties, set the maximum size of one fetched batch to 100 MB:
fetch.max.bytes=104857600
bin/kafka-consumer-perf-test.sh --bootstrap-server hadoop102:9092,hadoop103:9092,hadoop104:9092 --topic test --messages 1000000 --consumer.config config/consumer.properties
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2022-01-20 10:26:13:203, 2022-01-20 10:26:19:662, 977.5146, 151.3415, 1000975, 154973.6801, 362, 6097, 160.3272, 164175.0041
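kafka-consumer-perf-test.sh prints a comma-separated header followed by one value line. A hypothetical helper that zips the two makes it easy to compare a column such as MB.sec across the three consumer runs above (136.6, 148.2 and 151.3 MB/s). It assumes no field contains a comma, which holds for the output shown.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for kafka-consumer-perf-test.sh output.
final class ConsumerPerf {
    // Maps each header column to the value in the same position.
    static Map<String, String> parse(String header, String values) {
        String[] keys = header.split(",");
        String[] vals = values.split(",");
        if (keys.length != vals.length) {
            throw new IllegalArgumentException("header has " + keys.length
                    + " columns but values have " + vals.length);
        }
        Map<String, String> row = new LinkedHashMap<>();
        for (int i = 0; i < keys.length; i++) {
            row.put(keys[i].trim(), vals[i].trim());
        }
        return row;
    }

    public static void main(String[] args) {
        String header = "start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, "
                + "nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec";
        String[] labels = {"max.poll.records=500", "max.poll.records=2000", "fetch.max.bytes=100m"};
        String[] runs = {
            "2022-01-20 09:58:26:171, 2022-01-20 09:58:33:321, 977.0166, 136.6457, 1000465, 139925.1748, 415, 6735, 145.0656, 148547.1418",
            "2022-01-20 10:18:06:268, 2022-01-20 10:18:12:863, 977.5146, 148.2206, 1000975, 151777.8620, 358, 6237, 156.7283, 160489.8188",
            "2022-01-20 10:26:13:203, 2022-01-20 10:26:19:662, 977.5146, 151.3415, 1000975, 154973.6801, 362, 6097, 160.3272, 164175.0041"
        };
        for (int i = 0; i < runs.length; i++) {
            System.out.println(labels[i] + ": " + parse(header, runs[i]).get("MB.sec") + " MB/s");
        }
    }
}
```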
