赞
踩
同步发送模式虽然吞吐量小,但是发一条收到确认后再发下一条,既能保证不丢失消息,又能保证顺序。
kafka的ack机制:在kafka发送数据的时候,每次发送消息都会有一个确认反馈机制,确保消息正常的能够被收到。这里涉及到一个参数 acks 他的值有三个 0, 1, -1,如果是0 ,那么代表发送过去,不等待kafka消息确认,认为成功。一定会丢失消息,可能kafka集群正在选举,此时就无法收到任何异常。如果是1,那么代表发送过去,等待leader副本确认消息,认为成功leader肯定收到了消息,写入了分区文件(不一定落盘)。如果是all, 那么代表发送过去之后,消息被写入所有同步副本之后 ,认为成功。
注意: 这里是 所有同步副本,不是所有副本。 具体是多少同步副本,还要取决于kafka集群设置的最小同步副本数,和集群当前的同步副本数。选择这种配置,会可靠,但是牺牲效率,可以通过增大批和使用异步模式,提高效率。
如果是同步模式:ack机制能够保证数据的不丢失,如果ack设置为0,风险很大,一般不建议设置为0
producer.type=sync
request.required.acks=1
如果是异步模式:通过buffer来进行控制数据的发送,有两个值来进行控制,时间阈值与消息的数量阈值,如果buffer满了数据还没有发送出去,如果设置的是立即清理模式,风险很大,一定要设置为阻塞模式。
producer有丢数据的可能,但是可以通过配置保证消息的不丢失
producer.type=async
request.required.acks=1
queue.buffering.max.ms=5000
queue.buffering.max.messages=10000
queue.enqueue.timeout.ms = -1
batch.num.messages=200
如果网络异常收不到响应,则等待,这里有个配置等待时间 request,timeout.ms
发送消息等待时间。metadata.fetch.time.out
从kafka 获取元数据的等待时间。max.block.ms
: 配置控制了KafkaProducer.send()
并将KafkaProducer.partitionsFor()
被阻塞多长时间。由于缓冲区已满或元数据不可用,这些方法可能会被阻塞止。用户提供的序列化程序或分区程序中的阻。将不计入此超时。重试次数 retries 重试直接的等待时间, 默认是100 ms ,可以通过 retry.backoff.ms
配置 。多个消息发送给同一个分区的时候,生产者会把消息打成一个批,批大小设置 batch.size
过大占内存,过小发送频繁,并且生产者不是必须满批发送,有个等待时间,linger.ms
设置 等待多久批不满则发送。
消费者需要向kafka集群提交 已经消费的消息的offset来确定消息消费到了那里。
消息队列的消费方式有两种,一种是发布订阅模式,一种是队列模式。
发布订阅模式 一个消息可以被多个消费者消费。队列模式多个消费者只能消费到一部分消息。
kafka是通过group-id来区分消费组的。
一个topic被 同一个消费组的不同消费者消费 ,相当于是队列模式。被不同消费组消费相当于是 订阅模式。一个partition在同一个时刻只有一个consumer instance
在消费。对于正确的模式,我们需要配置正确的group-id
auto.offset.reset
没有偏移量可以提交的时候,系统从哪里开始消费。
有两种设置 :earliest 和latest 。
enable.auto.commit
自动提交 ,如果开启了自动提交,那么系统会自动进行提交offset。可能会引起,并未消费掉,就提交了offset.引起数据的丢失。与自动提交相关的是自动提交的间隔时间 auto.commit.interval.ms
默认是5秒钟提交一次,可以通过查看 kafka config目录下的配置文件,查询配置的默认值。自动提交 还可能引起消息的重复消费,特别是 多个客户端直接出现重平衡时。
参考我的 Kafka的副本复制机制
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。