Contents
ProducerConfig and ConsumerConfig explained
connect-distributed.sh & connect-standalone.sh
Kafka server-side scripts explained (2): log, verifiable
The Kafka client ships with ProducerConfig and ConsumerConfig. Knowing what the entries in these two classes mean is a great help when using and tuning Kafka. Use Ctrl+F to find what you need.
Producer configuration parameters explained
1.bootstrap.servers
List of broker addresses used to connect to the Kafka cluster. Default: ""
2.metadata.max.age.ms
Time after which a metadata refresh is forced, in milliseconds. Default: 300000 (5 minutes)
3.batch.size
Size of the memory region used by a ProducerBatch. Default: 16kb
4.acks
How many replicas of a partition must receive a message before the send counts as successful. Default: 1 (string type)
5.linger.ms
How many milliseconds a ProducerBatch is delayed before being sent; if the batch reaches batch.size during that delay, the messages are sent immediately without waiting further. Default: 0
6.client.id
User-defined id used to trace messages. Default: ""
7.send.buffer.bytes
Socket send buffer size. Default: 128kb; -1 uses the operating system's setting
8.receive.buffer.bytes
Socket receive buffer size. Default: 32kb; -1 uses the operating system's setting
9.max.request.size
Maximum size of a request the producer client may send. Default: 1MB
10.reconnect.backoff.ms
Time to wait before retrying a connection to Kafka after a connection failure. Default: 50ms
11.reconnect.backoff.max.ms
Maximum time the producer client waits between reconnection attempts to Kafka. Default: 1000ms
12.max.block.ms
Controls how long the producer's send() and partitionsFor() methods may block. They block when the producer's send buffer is full or when no metadata is available. Default: 60s
13.buffer.memory
Size of the buffer the producer client uses to cache messages. Default: 32MB
14.retry.backoff.ms
Interval between retries of a failed send. Default: 100ms
15.compression.type
Compression type for messages. Default: no compression
16.metrics.sample.window.ms
Time window over which metrics samples are computed. Default: 30000ms
17.metrics.num.samples
Number of samples maintained to compute metrics. Default: 2
18.metrics.recording.level
Highest recording level for metrics. Default: info
19.metric.reporters
List of classes to use as metrics reporters. Default: empty list
20.max.in.flight.requests.per.connection
Number of requests that may be sent over one connection without waiting for responses ("in flight"); pipelining reduces overhead, but if an error occurs the send order of messages may change. Default: 5
21.retries
Number of retries after a failed send. Default: 0
22.key.serializer
Serializer class for keys
23.value.serializer
Serializer class for values
24.connections.max.idle.ms
How long before idle connections are closed. Default: 540000ms
25.partitioner.class
Partitioner class implementing the Partitioner interface, for custom partitioning rules
26.request.timeout.ms
Maximum time the client waits for a response to a request; if no response arrives in time the client resends the request, and throws an exception once retries are exhausted. Default: 30000ms
27.interceptor.classes
Interceptor classes implementing the ProducerInterceptor interface, for custom interceptors
28.enable.idempotence
true enables the idempotent producer
29.transaction.timeout.ms
Transaction timeout. Default: 60000ms
30.transactional.id
Transaction id; must be unique
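To make the parameters above concrete, here is a minimal sketch of a Java producer that sets a handful of them; the broker address and topic name are placeholders borrowed from the examples later in this post:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerConfigDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        // 1. bootstrap.servers: broker address list (placeholder address)
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.211.55.3:9092");
        // 4. acks="1": the leader replica alone must acknowledge the write
        props.put(ProducerConfig.ACKS_CONFIG, "1");
        // 3. batch.size and 5. linger.ms work together: a batch is sent when it
        // is full or when the linger delay expires, whichever happens first
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * 1024);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        // 22/23. serializers for message keys and values
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("first", "key", "hello kafka"));
        }
    }
}
```

send() is asynchronous; closing the producer flushes whatever is still sitting in the batches.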
Consumer configuration parameters explained
1.group.id
Unique id of the consumer group this consumer belongs to
2.max.poll.records
Maximum number of messages returned by one poll request. Default: 500
3.max.poll.interval.ms
Maximum idle time allowed for the message-polling thread between polls. Default: 300000ms
4.session.timeout.ms
Timeout used to detect consumer failures. Default: 10000ms
5.heartbeat.interval.ms
Consumer heartbeat interval. Default: 3000ms
6.bootstrap.servers
Broker address list for connecting to the cluster
7.enable.auto.commit
Whether consumed offsets are committed automatically. Default: true
8.auto.commit.interval.ms
Interval for automatically committing consumed offsets. Default: 5000ms
9.partition.assignment.strategy
Partition assignment strategy for the consumer
10.auto.offset.reset
Offset policy used when a partition has no initial offset, or the current offset no longer exists on the server: earliest consumes from the beginning, latest from the most recent, none throws an exception
11.fetch.min.bytes
Minimum amount of data a fetch request pulls from Kafka; if Kafka has less than this, the request waits until the threshold is met. Default: 1 byte
12.fetch.max.bytes
Maximum amount of data a fetch request pulls from Kafka. Default: 50MB
13.fetch.max.wait.ms
Maximum time to wait when fetch.min.bytes is not yet satisfied. Default: 500ms
14.metadata.max.age.ms
Time after which a metadata refresh is forced, in milliseconds. Default: 300000 (5 minutes)
15.max.partition.fetch.bytes
Maximum amount of data returned per partition, as opposed to fetch.max.bytes, which caps the whole fetch. Default: 1MB
16.send.buffer.bytes
Socket send buffer size. Default: 128kb; -1 uses the operating system's setting
17.receive.buffer.bytes
Socket receive buffer size. Default: 64kb; -1 uses the operating system's setting
18.client.id
Consumer client id
19.reconnect.backoff.ms
Time to wait before retrying a connection to Kafka after a connection failure. Default: 50ms
20.reconnect.backoff.max.ms
Maximum time the client waits between reconnection attempts to Kafka. Default: 1000ms
21.retry.backoff.ms
Interval between retries of a failed request. Default: 100ms
22.metrics.sample.window.ms
Time window over which metrics samples are computed. Default: 30000ms
23.metrics.num.samples
Number of samples maintained to compute metrics. Default: 2
24.metrics.recording.level
Highest recording level for metrics. Default: info
25.metric.reporters
List of classes to use as metrics reporters. Default: empty list
26.check.crcs
Automatically check the CRC32 of the records consumed
27.key.deserializer
Deserializer class for keys
28.value.deserializer
Deserializer class for values
29.connections.max.idle.ms
How long before idle connections are closed. Default: 540000ms
30.request.timeout.ms
Maximum time the client waits for a response to a request; if no response arrives in time the client resends the request, and throws an exception once retries are exhausted. Default: 30000ms
31.default.api.timeout.ms
Default timeout for consumer APIs. Default: 60000ms
32.interceptor.classes
Custom interceptor classes
33.exclude.internal.topics
Kafka has two internal topics: __consumer_offsets and __transaction_state. This parameter controls whether internal topics may be exposed to consumers. Default: true. When true, internal topics can only be subscribed with subscribe(Collection), not subscribe(Pattern); set to false, there is no such restriction.
34.internal.leave.group.on.close
Whether the consumer leaves its consumer group when it is closed (an internal client config)
35.isolation.level
Configures the consumer's transaction isolation level. Set to "read_committed", the consumer ignores messages from uncommitted transactions, i.e. it can only read up to the LSO (LastStableOffset). The default is "read_uncommitted", which can read all the way up to the HW (High Watermark)
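Likewise for the consumer, a minimal sketch wiring up a few of the parameters above (placeholder address and topic again):

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsumerConfigDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        // 6. bootstrap.servers (placeholder address) and 1. group.id
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.211.55.3:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group.demo");
        // 10. auto.offset.reset: start from the beginning when no offset exists
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // 7. enable.auto.commit is true by default; set here for visibility
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        // 27/28. deserializers for message keys and values
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("first"));
            // a real application would poll in a loop
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset=%d, key=%s, value=%s%n",
                        record.offset(), record.key(), record.value());
            }
        }
    }
}
```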
Script | Purpose |
---|---|
kafka-topics.sh | topic management script |
connect-distributed.sh | runs Kafka Connect in distributed mode |
connect-standalone.sh | runs Kafka Connect in standalone mode |
--partitions
Number of partitions when creating or altering a topic
--replication-factor
Replication factor, i.e. the number of replicas
--replica-assignment
Manually specify the partition-to-replica assignment; with this parameter, --partitions and --replication-factor are not needed
--topic
Topic name
--zookeeper
Kafka ZooKeeper connection address
--alter
Alter partitions, replicas, or configs
--bootstrap-server
Kafka server address
--create
Create a topic
--delete
Delete a topic
--list
List all available topics
```
[root@10 kafka_2]# bin/kafka-topics.sh --zookeeper 10.211.55.3:2181 --list
__consumer_offsets
first
test
topic-3
topic-4
topic-5
topic-6
topic-admin
topic-create-diff
topic-two
```
--describe
Show detailed topic information
--exclude-internal
When used with --list or --describe, controls whether internal topics are listed; by default they are
--command-config
Pass Admin Client configs through a config file; the supported configs are listed in org.apache.kafka.clients.admin.AdminClientConfig
```
// me.properties
request.timeout.ms=200000

bin/kafka-topics.sh --bootstrap-server 10.211.55.3:9092 --topic topic-two --list --command-config config/me.properties
```
--config
When creating or altering a topic, overrides the topic's default parameters; the supported parameters are listed at http://kafka.apachecn.org/documentation.html#topicconfigs
This parameter will be deprecated; use kafka-configs.sh instead
```
[root@10 kafka_2.11-2.2.0]# bin/kafka-topics.sh --bootstrap-server 10.211.55.3:9092 --topic topic-two --describe
Topic:topic-two PartitionCount:1 ReplicationFactor:1 Configs:segment.bytes=1073741824,retention.bytes=1073741824
    Topic: topic-two Partition: 0 Leader: 0 Replicas: 0 Isr: 0

[root@10 kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper 10.211.55.3:2181 --alter --topic topic-two --config segment.bytes=1048577
WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.
    Going forward, please use kafka-configs.sh for this functionality
Updated config for topic topic-two.

[root@10 kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper 10.211.55.3:2181 --describe --topic topic-two
Topic:topic-two PartitionCount:1 ReplicationFactor:1 Configs:segment.bytes=1048577
    Topic: topic-two Partition: 0 Leader: 0 Replicas: 0 Isr: 0
```
--delete-config
Delete a config entry
```
[root@10 kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper 10.211.55.3:2181 --topic topic-two --alter --delete-config segment.bytes
WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.
    Going forward, please use kafka-configs.sh for this functionality
Updated config for topic topic-two.

[root@10 kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper 10.211.55.3:2181 --topic topic-two --describe
Topic:topic-two PartitionCount:1 ReplicationFactor:1 Configs:
    Topic: topic-two Partition: 0 Leader: 0 Replicas: 0 Isr: 0
```
--disable-rack-aware
Ignore rack information when assigning replicas
With two brokers, one with rack information configured and one without, creating a topic fails:
```
[root@10 kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper 10.211.55.3:2181 --create --topic topic-6 --replication-factor 1 --partitions 2
Error while executing topic command : Not all brokers have rack information. Add --disable-rack-aware in command line to make replica assignment without rack information.
[2018-12-27 05:22:40,834] ERROR kafka.admin.AdminOperationException: Not all brokers have rack information. Add --disable-rack-aware in command line to make replica assignment without rack information.
    at kafka.zk.AdminZkClient.getBrokerMetadatas(AdminZkClient.scala:71)
    at kafka.zk.AdminZkClient.createTopic(AdminZkClient.scala:54)
    at kafka.admin.TopicCommand$ZookeeperTopicService.createTopic(TopicCommand.scala:274)
    at kafka.admin.TopicCommand$TopicService$class.createTopic(TopicCommand.scala:134)
    at kafka.admin.TopicCommand$ZookeeperTopicService.createTopic(TopicCommand.scala:266)
    at kafka.admin.TopicCommand$.main(TopicCommand.scala:60)
    at kafka.admin.TopicCommand.main(TopicCommand.scala)
 (kafka.admin.TopicCommand$)

[root@10 kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper 10.211.55.3:2181 --create --topic topic-6 --replication-factor 1 --partitions 2 --disable-rack-aware
Created topic topic-6.
```
--if-exists
The command only runs when the topic exists; otherwise it exits without reporting an error
```
[root@10 kafka_2]# bin/kafka-topics.sh --zookeeper 10.211.55.3:2181 --topic topic-7 --alter --config segment.bytes=104857 --if-exists

[root@10 kafka_2]# bin/kafka-topics.sh --zookeeper 10.211.55.3:2181 --topic topic-7 --alter --config segment.bytes=104857
Error while executing topic command : Topics in [] does not exist
[2018-12-27 06:01:25,638] ERROR java.lang.IllegalArgumentException: Topics in [] does not exist
    at kafka.admin.TopicCommand$.kafka$admin$TopicCommand$$ensureTopicExists(TopicCommand.scala:416)
    at kafka.admin.TopicCommand$ZookeeperTopicService.alterTopic(TopicCommand.scala:294)
    at kafka.admin.TopicCommand$.main(TopicCommand.scala:62)
    at kafka.admin.TopicCommand.main(TopicCommand.scala)
 (kafka.admin.TopicCommand$)
```
--if-not-exists
When creating a topic, the command only runs if the topic does not exist yet; if it already exists, no error is reported
```
[root@10 kafka_2]# bin/kafka-topics.sh --zookeeper 10.211.55.3:2181 --topic topic-6 --create --partitions 1 --replication-factor 1 --if-not-exists

[root@10 kafka_2]# bin/kafka-topics.sh --zookeeper 10.211.55.3:2181 --topic topic-6 --create --partitions 1 --replication-factor 1
Error while executing topic command : Topic 'topic-6' already exists.
[2018-12-27 06:07:54,185] ERROR org.apache.kafka.common.errors.TopicExistsException: Topic 'topic-6' already exists.
 (kafka.admin.TopicCommand$)
```
--topics-with-overrides
Show topics whose configs have been overridden
--unavailable-partitions
Show partitions that have no leader replica
```
[root@10 kafka_2]# bin/kafka-topics.sh --zookeeper 10.211.55.3:2181 --topic topic-6 --describe --unavailable-partitions
    Topic: topic-6 Partition: 0 Leader: -1 Replicas: 1 Isr: 1
```
--under-replicated-partitions
Show all partitions that have out-of-sync replicas
Kafka Connect is a scalable and reliable tool for streaming data between Apache Kafka and other systems.
```
bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties

bin/connect-distributed.sh config/connect-distributed.properties
```
Script | Purpose |
---|---|
kafka-log-dirs.sh | shows log directory usage on the specified brokers |
kafka-verifiable-consumer.sh | verifies the kafka consumer |
kafka-verifiable-producer.sh | verifies the kafka producer |
--bootstrap-server
Kafka address
--broker-list
Comma-separated list of brokers to query; if omitted, all brokers are queried
--topic-list
Comma-separated list of topics to query
--command-config
Admin Client configuration
--describe
Show details
```
[root@10 kafka_2.11-2.2.0]# bin/kafka-log-dirs.sh --bootstrap-server 10.211.55.3:9092 --describe --broker-list 0 --topic-list first,topic-3
Querying brokers for log directories information
Received log directory information from brokers 0
{"version":1,"brokers":[{"broker":0,"logDirs":[{"logDir":"/tmp/kafka-logs","error":null,"partitions":[{"partition":"topic-3-0","size":474,"offsetLag":0,"isFuture":false},{"partition":"first-0","size":310,"offsetLag":0,"isFuture":false}]}]}]}
```
--broker-list
Broker list, HOST1:PORT1,HOST2:PORT2,…
--topic
Topic to consume
--group-id
Consumer group id
--max-messages
Maximum number of messages to consume; default -1, consume indefinitely
```
# consume two messages, then stop automatically
[root@10 kafka_2.11-2.2.0]# bin/kafka-verifiable-consumer.sh --broker-list 10.211.55.3:9092 --topic first --group-id group.demo --max-messages 2
{"timestamp":1558869583036,"name":"startup_complete"}
{"timestamp":1558869583329,"name":"partitions_revoked","partitions":[]}
{"timestamp":1558869583366,"name":"partitions_assigned","partitions":[{"topic":"first","partition":0}]}
{"timestamp":1558869590352,"name":"records_consumed","count":1,"partitions":[{"topic":"first","partition":0,"count":1,"minOffset":37,"maxOffset":37}]}
{"timestamp":1558869590366,"name":"offsets_committed","offsets":[{"topic":"first","partition":0,"offset":38}],"success":true}
{"timestamp":1558869595328,"name":"records_consumed","count":1,"partitions":[{"topic":"first","partition":0,"count":1,"minOffset":38,"maxOffset":38}]}
{"timestamp":1558869595335,"name":"offsets_committed","offsets":[{"topic":"first","partition":0,"offset":39}],"success":true}
{"timestamp":1558869595355,"name":"shutdown_complete"}
```
--session-timeout
Consumer session timeout; default 30000ms. If the server receives no heartbeat from the consumer within this time, the consumer is removed from the group
--enable-autocommit
Enable auto-commit; default false
```
# compare the difference between the two
# without --enable-autocommit
[root@10 kafka_2.11-2.2.0]# bin/kafka-verifiable-consumer.sh --broker-list 10.211.55.3:9092 --topic first --group-id group.demo
{"timestamp":1558875063613,"name":"startup_complete"}
{"timestamp":1558875063922,"name":"partitions_revoked","partitions":[]}
{"timestamp":1558875063952,"name":"partitions_assigned","partitions":[{"topic":"first","partition":0}]}
{"timestamp":1558875069603,"name":"records_consumed","count":1,"partitions":[{"topic":"first","partition":0,"count":1,"minOffset":47,"maxOffset":47}]}
{"timestamp":1558875069614,"name":"offsets_committed","offsets":[{"topic":"first","partition":0,"offset":48}],"success":true}

# with --enable-autocommit
[root@10 kafka_2.11-2.2.0]# bin/kafka-verifiable-consumer.sh --broker-list 10.211.55.3:9092 --topic first --group-id group.demo --enable-autocommit
{"timestamp":1558874772119,"name":"startup_complete"}
{"timestamp":1558874772408,"name":"partitions_revoked","partitions":[]}
{"timestamp":1558874772449,"name":"partitions_assigned","partitions":[{"topic":"first","partition":0}]}
{"timestamp":1558874820898,"name":"records_consumed","count":1,"partitions":[{"topic":"first","partition":0,"count":1,"minOffset":46,"maxOffset":46}]}
```
--reset-policy
Where to start consuming: earliest from the beginning, latest from the most recent, none throws an exception; default earliest
--assignment-strategy
Partition assignment strategy for the consumer; default RangeAssignor
--consumer.config
Configuration file
This script produces test data to the specified topic and prints the records to the console in JSON format.
--topic
Topic name
--broker-list
Broker list, HOST1:PORT1,HOST2:PORT2,…
--max-messages
Maximum number of messages; default -1, produce indefinitely
--throughput
Throttle the throughput; default -1
--acks
How many replicas of a partition must receive the message for the send to succeed; default -1
--producer.config
Configuration file
--message-create-time
Set the message creation time (a timestamp)
--value-prefix
Prefix for message values
--repeating-keys
Keys start at 0 and increment by 1 up to the specified value, then start again from 0
```
[root@10 kafka_2.11-2.2.0]# bin/kafka-verifiable-producer.sh --broker-list 10.211.55.3:9092 --topic first --message-create-time 1527351382000 --value-prefix 1 --repeating-keys 10 --max-messages 20
{"timestamp":1558877565069,"name":"startup_complete"}
{"timestamp":1558877565231,"name":"producer_send_success","key":"0","value":"1.0","topic":"first","partition":0,"offset":1541118}
{"timestamp":1558877565238,"name":"producer_send_success","key":"1","value":"1.1","topic":"first","partition":0,"offset":1541119}
{"timestamp":1558877565238,"name":"producer_send_success","key":"2","value":"1.2","topic":"first","partition":0,"offset":1541120}
{"timestamp":1558877565238,"name":"producer_send_success","key":"3","value":"1.3","topic":"first","partition":0,"offset":1541121}
{"timestamp":1558877565238,"name":"producer_send_success","key":"4","value":"1.4","topic":"first","partition":0,"offset":1541122}
{"timestamp":1558877565239,"name":"producer_send_success","key":"5","value":"1.5","topic":"first","partition":0,"offset":1541123}
{"timestamp":1558877565239,"name":"producer_send_success","key":"6","value":"1.6","topic":"first","partition":0,"offset":1541124}
{"timestamp":1558877565239,"name":"producer_send_success","key":"7","value":"1.7","topic":"first","partition":0,"offset":1541125}
{"timestamp":1558877565239,"name":"producer_send_success","key":"8","value":"1.8","topic":"first","partition":0,"offset":1541126}
{"timestamp":1558877565239,"name":"producer_send_success","key":"9","value":"1.9","topic":"first","partition":0,"offset":1541127}
{"timestamp":1558877565239,"name":"producer_send_success","key":"0","value":"1.10","topic":"first","partition":0,"offset":1541128}
{"timestamp":1558877565239,"name":"producer_send_success","key":"1","value":"1.11","topic":"first","partition":0,"offset":1541129}
{"timestamp":1558877565239,"name":"producer_send_success","key":"2","value":"1.12","topic":"first","partition":0,"offset":1541130}
{"timestamp":1558877565240,"name":"producer_send_success","key":"3","value":"1.13","topic":"first","partition":0,"offset":1541131}
{"timestamp":1558877565240,"name":"producer_send_success","key":"4","value":"1.14","topic":"first","partition":0,"offset":1541132}
{"timestamp":1558877565241,"name":"producer_send_success","key":"5","value":"1.15","topic":"first","partition":0,"offset":1541133}
{"timestamp":1558877565244,"name":"producer_send_success","key":"6","value":"1.16","topic":"first","partition":0,"offset":1541134}
{"timestamp":1558877565244,"name":"producer_send_success","key":"7","value":"1.17","topic":"first","partition":0,"offset":1541135}
{"timestamp":1558877565244,"name":"producer_send_success","key":"8","value":"1.18","topic":"first","partition":0,"offset":1541136}
{"timestamp":1558877565244,"name":"producer_send_success","key":"9","value":"1.19","topic":"first","partition":0,"offset":1541137}
{"timestamp":1558877565262,"name":"shutdown_complete"}
{"timestamp":1558877565263,"name":"tool_data","sent":20,"acked":20,"target_throughput":-1,"avg_throughput":100.50251256281408}
```
Script | Purpose |
---|---|
kafka-producer-perf-test.sh | kafka producer performance test script |
kafka-consumer-perf-test.sh | kafka consumer performance test script |
kafka-console-producer.sh | kafka console producer |
kafka-console-consumer.sh | kafka console consumer |
kafka producer performance test script
--topic
Topic name
--num-records
Number of messages to produce
--payload-delimiter
Delimiter for the --payload-file file; default is the newline character \n
--throughput
Throttle the message throughput, in messages/sec
--producer-props
Producer config properties; these take precedence over --producer.config
--producer.config
Producer config file
--print-metrics
Whether to print test metrics; default false
--transactional-id
Used to test the performance of concurrent transactions (default: performance-producer-default-transactional-id)
--transaction-duration-ms
Maximum transaction duration; the transaction is committed once this is exceeded; only effective when > 0
--record-size
Bytes per message
--payload-file
Test data file
Test: produce 100,000 messages, 1000 bytes each, at 2000 messages per second
```
[root@10 kafka_2.11-2.2.0]# bin/kafka-producer-perf-test.sh --producer-props bootstrap.servers=10.211.55.3:9092 --topic first --record-size 1000 --num-records 100000 --throughput 2000
9999 records sent, 1999.8 records/sec (1.91 MB/sec), 8.6 ms avg latency, 406.0 ms max latency.
10007 records sent, 2001.4 records/sec (1.91 MB/sec), 0.7 ms avg latency, 8.0 ms max latency.
10002 records sent, 2000.4 records/sec (1.91 MB/sec), 0.7 ms avg latency, 10.0 ms max latency.
10000 records sent, 2000.0 records/sec (1.91 MB/sec), 0.8 ms avg latency, 37.0 ms max latency.
10008 records sent, 2001.2 records/sec (1.91 MB/sec), 0.6 ms avg latency, 7.0 ms max latency.
10004 records sent, 2000.4 records/sec (1.91 MB/sec), 0.7 ms avg latency, 5.0 ms max latency.
10000 records sent, 2000.0 records/sec (1.91 MB/sec), 0.8 ms avg latency, 35.0 ms max latency.
10004 records sent, 2000.8 records/sec (1.91 MB/sec), 0.8 ms avg latency, 33.0 ms max latency.
10004 records sent, 2000.4 records/sec (1.91 MB/sec), 0.7 ms avg latency, 5.0 ms max latency.
100000 records sent, 1999.280259 records/sec (1.91 MB/sec), 1.50 ms avg latency, 406.00 ms max latency, 1 ms 50th, 2 ms 95th, 43 ms 99th, 91 ms 99.9th.
```
The result: 1.91 MB of data sent per second, 1.5 ms average latency, 406 ms max latency; 50% of sends under 1 ms, 95% under 2 ms...
kafka consumer performance test script
--topic
Topic to consume
--broker-list
kafka address
--consumer.config
Consumer config file
--date-format
Time format
--fetch-size
Amount of data pulled per fetch request; default 1048576 bytes
--from-latest
If the consumer does not already have an established offset, start from the latest messages in the log rather than the earliest
--group
Consumer group id; default perf-consumer-94851
--hide-header
If set, skip printing the header line for the stats
--messages
Number of messages to consume
--num-fetch-threads
Number of threads used to fetch messages
--print-metrics
Print metrics
--reporting-interval
Interval at which progress is printed; default 5000ms
--show-detailed-stats
If set, print stats at the --reporting-interval interval
--socket-buffer-size
TCP receive buffer size; default 2097152 (2M)
--threads
Number of processing threads; default 10
--timeout
Timeout for returning records
Test: consume 500,000 messages
```
[root@10 kafka_2.11-2.2.0]# bin/kafka-consumer-perf-test.sh --topic first --broker-list 10.211.55.3:9092 --messages 500000 --timeout 300000
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2019-05-30 01:21:27:072, 2019-05-30 01:21:30:801, 488.6162, 131.0314, 500343, 134176.1866, 25, 3704, 131.9158, 135081.8035
```
The result: 488.6162 MB consumed in total at 131.0314 MB/sec; 500343 messages consumed at 134176.1866 messages/sec
Test environment: a virtual machine
CPU: 2 cores
RAM: 2G
Kafka topic: 1 partition, 1 replica
batch.size
batch.size is in bytes; for convenience it is written in kb here
Default configuration, batch.size=16kb
```
[root@10 kafka_2.11-2.2.0]# ./bin/kafka-producer-perf-test.sh --producer.config config/me.properties --topic first --record-size 1024 --num-records 1000000 --throughput 50000
249892 records sent, 49978.4 records/sec (48.81 MB/sec), 153.6 ms avg latency, 537.0 ms max latency.
250193 records sent, 50038.6 records/sec (48.87 MB/sec), 1.4 ms avg latency, 12.0 ms max latency.
211747 records sent, 42349.4 records/sec (41.36 MB/sec), 194.3 ms avg latency, 1106.0 ms max latency.
1000000 records sent, 49972.515117 records/sec (48.80 MB/sec), 119.65 ms avg latency, 1106.00 ms max latency, 2 ms 50th, 488 ms 95th, 1043 ms 99th, 1102 ms 99.9th.
```
The run above shows an average latency of 119.65 ms and a max latency of 1106.00 ms
Now I want to bring the max latency down. batch.size means the messages are sent as soon as the ProducerBatch memory region is full, so let's try a smaller value.
batch.size=8kb
```
[root@10 kafka_2.11-2.2.0]# ./bin/kafka-producer-perf-test.sh --producer.config config/me.properties --topic first --record-size 1024 --num-records 1000000 --throughput 50000
148553 records sent, 29710.6 records/sec (29.01 MB/sec), 812.4 ms avg latency, 1032.0 ms max latency.
195468 records sent, 39093.6 records/sec (38.18 MB/sec), 735.9 ms avg latency, 907.0 ms max latency.
189700 records sent, 37940.0 records/sec (37.05 MB/sec), 763.4 ms avg latency, 1053.0 ms max latency.
208418 records sent, 41683.6 records/sec (40.71 MB/sec), 689.7 ms avg latency, 923.0 ms max latency.
196504 records sent, 39300.8 records/sec (38.38 MB/sec), 718.1 ms avg latency, 1056.0 ms max latency.
1000000 records sent, 37608.123355 records/sec (36.73 MB/sec), 741.56 ms avg latency, 1056.00 ms max latency, 725 ms 50th, 937 ms 95th, 1029 ms 99th, 1051 ms 99.9th.
```
But the test shows the opposite: latency is much higher, and the producer cannot even reach the configured throughput of 50000. The likely reason: with a small batch.size the batch fills quickly, so messages are sent to the server right away, but the number of send requests multiplies; the network is not controllable, and any jitter then produces high latency.
So let's go bigger instead.
batch.size=32kb
```
[root@10 kafka_2.11-2.2.0]# ./bin/kafka-producer-perf-test.sh --producer.config config/me.properties --topic first --record-size 1024 --num-records 1000000 --throughput 50000
249852 records sent, 49970.4 records/sec (48.80 MB/sec), 88.8 ms avg latency, 492.0 ms max latency.
250143 records sent, 50028.6 records/sec (48.86 MB/sec), 1.2 ms avg latency, 15.0 ms max latency.
250007 records sent, 49991.4 records/sec (48.82 MB/sec), 1.2 ms avg latency, 17.0 ms max latency.
1000000 records sent, 49952.545082 records/sec (48.78 MB/sec), 31.07 ms avg latency, 492.00 ms max latency, 1 ms 50th, 305 ms 95th, 440 ms 99th, 486 ms 99.9th.
```
This time both average and max latency dropped considerably, well below even the default's numbers. Will latency keep dropping if we go bigger still?
batch.size=50kb
```
[root@10 kafka_2.11-2.2.0]# ./bin/kafka-producer-perf-test.sh --producer.config config/me.properties --topic first --record-size 1024 --num-records 1000000 --throughput 50000
249902 records sent, 49970.4 records/sec (48.80 MB/sec), 27.3 ms avg latency, 219.0 ms max latency.
250200 records sent, 50030.0 records/sec (48.86 MB/sec), 1.2 ms avg latency, 8.0 ms max latency.
250098 records sent, 50019.6 records/sec (48.85 MB/sec), 18.6 ms avg latency, 288.0 ms max latency.
242327 records sent, 48407.3 records/sec (47.27 MB/sec), 121.3 ms avg latency, 920.0 ms max latency.
1000000 records sent, 49823.127896 records/sec (48.66 MB/sec), 41.98 ms avg latency, 920.00 ms max latency, 1 ms 50th, 221 ms 95th, 792 ms 99th, 910 ms 99.9th.
```
Results will vary across machines, but the overall curve is the same: latency follows a U shape as batch.size grows.
How batch.size is implemented
The Kafka client has a RecordAccumulator class, a message record pool, which internally holds a BufferPool memory region
```java
// Constructor of the client's RecordAccumulator: note batchSize and lingerMs among its parameters
RecordAccumulator(LogContext logContext,
                  int batchSize,
                  CompressionType compression,
                  int lingerMs,
                  long retryBackoffMs,
                  int deliveryTimeoutMs,
                  Metrics metrics,
                  String metricGrpName,
                  Time time,
                  ApiVersions apiVersions,
                  TransactionManager transactionManager,
                  BufferPool bufferPool)
```
When the following condition is true, the sender is woken up and the messages are sent:
```java
// the batch is full, or a new batch had to be created: wake the sender thread
if (result.batchIsFull || result.newBatchCreated) {
    log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
    this.sender.wakeup();
}
```
max.in.flight.requests.per.connection
This parameter lets multiple requests be "in flight" on a single connection, which reduces overhead; but if an error occurs, the send order of the data may change. Default 5
On top of batch.size=100kb, raise this parameter to 10 and see the effect
```
[root@10 kafka_2.11-2.2.0]# ./bin/kafka-producer-perf-test.sh --producer.config config/me.properties --topic two --record-size 1024 --num-records 1000000 --throughput 50000
249902 records sent, 49960.4 records/sec (48.79 MB/sec), 16.1 ms avg latency, 185.0 ms max latency.
250148 records sent, 50019.6 records/sec (48.85 MB/sec), 1.3 ms avg latency, 14.0 ms max latency.
239585 records sent, 47917.0 records/sec (46.79 MB/sec), 6.4 ms avg latency, 226.0 ms max latency.
1000000 records sent, 49960.031974 records/sec (48.79 MB/sec), 9.83 ms avg latency, 226.00 ms max latency, 1 ms 50th, 83 ms 95th, 182 ms 99th, 219 ms 99.9th.
```
Across repeated runs, latency dropped to less than a tenth of what it was before; the effect is clear
But push it too far and it backfires: raise the value further and the gains fade, and eventually latency goes back up, for the same reason as with batch.size
compression.type
Compression type for messages; default: no compression
On top of batch.size=100kb and max.in.flight.requests.per.connection=10, set compression.type=gzip and see whether latency drops any further
```
[root@10 kafka_2.11-2.2.0]# ./bin/kafka-producer-perf-test.sh --producer.config config/me.properties --topic two --record-size 1024 --num-records 1000000 --throughput 50000
249785 records sent, 49957.0 records/sec (48.79 MB/sec), 2.5 ms avg latency, 199.0 ms max latency.
250091 records sent, 50008.2 records/sec (48.84 MB/sec), 1.9 ms avg latency, 17.0 ms max latency.
250123 records sent, 50024.6 records/sec (48.85 MB/sec), 1.5 ms avg latency, 18.0 ms max latency.
1000000 records sent, 49960.031974 records/sec (48.79 MB/sec), 1.89 ms avg latency, 199.00 ms max latency, 2 ms 50th, 4 ms 95th, 6 ms 99th, 18 ms 99.9th.
```
The test shows latency dropped yet again. Pretty powerful, isn't it?
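Pulling the three knobs from these experiments together, here is a minimal sketch of the equivalent settings on a Java producer. The values are simply the ones used in the tests above; treat them as a starting point for your own benchmarking, not as universal answers:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class TunedProducerProps {
    static Properties tunedProps() {
        Properties props = new Properties();
        // placeholder broker address from the tests above
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.211.55.3:9092");
        // batch.size=100kb: bigger batches mean fewer, larger requests
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 100 * 1024);
        // 10 in-flight requests per connection; beware of reordering on retries
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 10);
        // gzip compression cuts bytes on the wire at the cost of some CPU
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
        return props;
    }
}
```

As with batch.size itself, keep measuring on your own hardware: the right values depend on record size, network, and broker resources.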