赞
踩
名词解释:
broker: 一个kafka启动后,就是一个broker,生产者和消费者都和broker通信;
topic: 主题,生产者发消息需要指定某个主题,消费者拉取消息的时候也需要指定某个主题;
partition: 分区,一个主题分为多个分区,producer可以指定往特定主题的特定分区发消息,同一个topic分区同时只能被一个consumer group的某个consumer消费;
producer: 消息发送者
consumer: 消息消费者
consumer group: 消费者组,比如有个消费者组g1,有一个消费者c1 ,一个topic:test ,它有三个分区p1,p2,p3 ;test这个topic只会被g1这个消费者组中的消费者消费一次;c1会分别串行消费分区p1,p2,p3(同一个分区的消息会被顺序消费,因为一个分区同时只会有一个consumer在消费),为了提高消费速度,我们新建了两个消费者c2,c3,然后加入到;消费者组g1中,这是三个消费者c1,c2,c3就可以并行消费分区p1,p2,p3了;如果你想增加一个消费者,消费topic:test所有的消息,只需要新建消费者组g2,消费者c4,c4加入到g2中。
broker.id=0 broker标识符,集群模式下这个值不能重复
num.network.threads=3 kafka处理网络连接的线程数
num.io.threads=8 kafka处理I/o操作的线程数
socket.send.buffer.bytes=102400 socker发送缓冲区大小
socket.receive.buffer.bytes=102400 socker接收缓冲区大小
socket.request.max.bytes=104857600 socker请求的最大大小
log.dirs= kafka把所有消息都持久化保存到磁盘上,配置日志的存放目录,多个用逗号分隔;同一个分区的日志保存在同一个目录下;在有新分区时,broker会往最少分区的目录下创建,而不是最多空余磁盘空间下新增。
auto.create.topics.enable=true 是否允许自动创建topic
比如设置为true 往一个不存在的topic发消息,consumer从不存在的topic拉取消息,都会创建topic。
num.partitions=2 新创建的topic包含多少个分区,该数量只能增加不能减少
num.recovery.threads.per.data.dir=1 在broker正常关闭和启动的时候,用于打开日志文件的线程数(如果设置为3,log.dirs配置了3个目录,就会启用9个线程)
log.retention.hours=168 设置log的保留时间,是通过日志文件的最后修改时间来实现的。
log.retention.bytes 作用在每个分区上,比如该参数设置为1G,也就是说一个分区的日志文件大小不能超过1G,超出的部分会被删除(假如一个topic有6个partiton,那么这个topic最多可以保留6G数据)
log.segment.bytes=1073741824 单个日志片段文件的大小(默认1G),当日志片段文件达到设定值,文件就会关闭,然后创建一个新的日志文件。
message.max.bytes 限制单条消息的大小(默认1M),该参数指的是压缩后的大小
这个值需要和客户端设置的fetch.message.max.bytes(客户端允许拉取的最大消息)值对应
log.retention.check.interval.ms=300000 检查log是否保留的时间间隔
zookeeper.connect=localhost:3183 保存broker元数据的zk地址
zookeeper.connection.timeout.ms=6000 kafka连接zk的超时时间
总结:具体设置参数值时,需要根据自身的业务情况配置,比如设置分区数num.partitions,如果你topic的消息量很大,需要很高的吞吐量,那么你就可以将值设大一些,那是不是设置的越大越好呢?
当然不是,分区数量越多就越占内存,kafka需要打开的文件句柄就越多,同时也需要你有更多的消费者消费,但是消费者太多了又会引发什么问题呢?
多个消费者同时去poll消息,会提高broker的cpu消耗。所以具体配置需要更具自己业务情况和集群情况来确定。
bootstrap.servers:broker地址,多个逗号分隔
acks :发送应答(0:收到后直接响应成功,1:leader保存则响应成功,-1/all:leader和isr列表保存后才响应成功)
batch.size:批量发送大小(默认:16384,16K)
buffer.memory:生产者最大可用缓存 (默认:33554432,32M)
compression.type:压缩类型(none,gzip,snappy等)
retries:失败重试次数
connections.max.idle.ms:关闭空闲连接时间(默认:540000)
key.serializer:key序列化器(默认无)
value.serializer:value序列化器(默认无)
linger.ms:发送延迟时间(默认:0)
max.request.size:最大请求字节大小(默认:1048576,1M)
request.timeout.ms:请求超时时间(默认:30000)
client.id:客户ID:用于跟踪调试
enable.auto.commit:开启自动提交(默认:true)
auto.commit.interval.ms:上面为true时才有效,自动提交频率(默认:5000)
client.id:客户ID:用于跟踪调试
bootstrap.servers:broker配置
connections.max.idle.ms:关闭空间连接时间(默认:540000)
group.id:群组(默认:“”)唯一标识用户群组,同一个group每个partition只会分配到一个consumer。
max.poll.records:拉起最大记录(默认:500)
max.poll.interval.ms:拉取记录间隔(默认:300000,5分钟):如果在此超时过期之前没有调用poll(),则认为使用者失败,组将重新平衡,以便将分区重新分配给另一个成员。
auto.offset.reset:初始偏移量 (默认:latest):如果Kafka中没有初始偏移量,或者服务器上不再存在当前偏移量:earliest:自动重置偏移到最早的偏移; latest:自动将偏移量重置为最新偏移量
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。