赞
踩
想必大家在实际使用过程都遇到过消息的丢失的问题,这里的消息丢失其实涉及到两个方面,一个是Producer生产的数据丢失,一个是Consumer消费的过程中因Offset问题而产生的漏消费。文章中我们会从这两个角度来和大家探讨如何尽量避免消息丢失的问题
Kafka会对已提交的消息进行持久化存储,那么这里「已提交」的消息是什么意思呢?这里就得看Producer层面是如何定义消息是否发送成功了。所以接下来有两个比较重要的属性「RequiredAcks」,「Async」需要重点关注下
// Number of acknowledges from partition replicas required before receiving
// a response to a produce request, the following values are supported:
//
// RequireNone (0) fire-and-forget, do not wait for acknowledgements from the
// RequireOne (1) wait for the leader to acknowledge the writes
// RequireAll (-1) wait for the full ISR to acknowledge the writes
//
// Defaults to RequireNone.
RequiredAcks RequiredAcks
上面的代码是kafka-go关于该属性的描述
如果设置成该值的话,Producer不会等待来自Broker端的确认,就可以继续发送下一批消息。可想而知,这种Ack策略必然会造成消息的丢失
所有分区Leader副本写入完成后才确认成功,这种配置也是Producer默认的Ack配置,相比于RequireNone,该策略的好处是能够大幅度减少消息丢失,但是损失了一定的写入性能(不过这种性能的损失是完全可以接受的,起码我们先得保证业务上正确把)
所有分区的ISR(下文详细描述)确认后才算成功,这是最严格的策略。好处是能够最大程度的保证生产的数据不丢失,但相对于RequireOne来说,它的写入性能还是虐差的。如果你的业务对于Producer的消息丢失是零容忍的话,你可以将Ack属性设置成该值,否则我还是建议你设置成RequireOne
ISR (IN-SYNC Replication)
每个分区的Leader副本会维持一个与其保持消息同步的Follower副本集合(当然也包含leader副本本身),该集合就是ISR(我看有些地方说的是「所有的Follower」,这其实是不严谨的,有些Follower因为网络或者自身配置导致与Leader的消息Offset差距很大,那么这些Follower会被踢出ISR的)。当Leader挂掉时,Kafka会从ISR中的Follower重新选举一个Leader
// Setting this flag to true causes the WriteMessages method to never block.
// It also means that errors are ignored since the caller will not receive
// the returned value. Use this only if you don't care about guarantees of
// whether the messages were written to kafka.
//
// Defaults to false.
Async bool
该属性注释中描述的是: 如果将该值设置为true,那么WriteMessages方法(Producer写入消息至Kafka的函数)永远不会阻塞,也就是调用者是无法接收到Kafka返回的消息的,这样一来你就无法做到发送消息失败的重试,或者日志告警了。不过好在该属性的默认值为false。
关于该属性的使用,我们可以在源码中找到佐证
func (w *Writer) WriteMessages(ctx context.Context, msgs ...Message) error { if w.Addr == nil { return errors.New("kafka.(*Writer).WriteMessages: cannot create a kafka writer with a nil address") } w.group.Add(1) defer w.group.Done() if w.isClosed() { return io.ErrClosedPipe } if len(msgs) == 0 { return nil } balancer := w.balancer() batchBytes := w.batchBytes() for i := range msgs { n := int64(msgs[i].size()) if n > batchBytes { // This error is left for backward compatibility with historical // behavior, but it can yield O(N^2) behaviors. The expectations // are that the program will check if WriteMessages returned a // MessageTooLargeError, discard the message that was exceeding // the maximum size, and try again. return messageTooLarge(msgs, i) } } // We use int32 here to half the memory footprint (compared to using int // on 64 bits architectures). We map lists of the message indexes instead // of the message values for the same reason, int32 is 4 bytes, vs a full // Message value which is 100+ bytes and contains pointers and contributes // to increasing GC work. assignments := make(map[topicPartition][]int32) for i, msg := range msgs { topic, err := w.chooseTopic(msg) if err != nil { return err } numPartitions, err := w.partitions(ctx, topic) if err != nil { return err } partition := balancer.Balance(msg, loadCachedPartitions(numPartitions)...) key := topicPartition{ topic: topic, partition: int32(partition), } assignments[key] = append(assignments[key], int32(i)) } batches := w.batchMessages(msgs, assignments) // 注意这里,如果是Async为true的话,这里就直接返回了 // 调用者根本无法获取到消息对应的err信息 if w.Async { return nil } done := ctx.Done() hasErrors := false for batch := range batches { select { case <-done: return ctx.Err() case <-batch.done: if batch.err != nil { hasErrors = true } } } if !hasErrors { return nil } werr := make(WriteErrors, len(msgs)) // 这里将Kafka返回的err信息统一聚合,并返回给调用方 for batch, indexes := range batches { for _, i := range indexes { werr[i] = batch.err } } return werr }
如上图所示,Consumer会根据当前分区中的Offset来决定自己从哪里开始消费。该Offset值和Consumer绑定,并由Consumer来更新offset值。
我们从两个Consumer更offset的场景来讨论:
假设更新Offset的成功, 但是业务上处理失败。那么这条消息对于业务上来说其实已经丢失了,因为根本就没有了重试的机会
假设业务上处理成功,但是更新Offset失败了。这虽然不会导致消息丢失,但是会带来消息重复消费的问题,解决重复消费的方法有很多,业务上最简单的实现就是保持接口幂等性
在分析了上述两个场景后,我们觉得场景二更加适合。也就是业务上处理成功后我们手动更新Consumer的Offset,场景一其实类似自动更新Consumer的位移,所以请确保你线上的Consumer是手动更新Offset,当然不重要的业务场景你可以随意处理
上述文章我们从Producer和Consumer两个维度来讲述了如何避免消息丢失
Producer层面
1.Ack属性不要设置成「RequireNone」
2.Async属性设置成false,这样可以接受Kafka返回的错误信息,我们可以做类型重试等处理
Consumer层面
在确保业务上成功后,手动更新Consumer的Offset,虽然可能会出现消息重复消费的问题,但是比漏消费好很多了。因为重复消费我们有很多的办法可以解决
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。