赞
踩
首先:
1:kafka是拉取模式的消息队列,是消费者控制什么时候拉取消息的;
2:每条消息都有一个偏移量,每个消费者都会跟踪最近消费消息的偏移量;
有下面几种处理方式:
1:重试,不停的重试,直到成功;
可能导致的问题:
问题是若是这条消息(通过目前的代码)可能永远不能消费成功,
导致消费者不会继续处理后续的任何问题,导致消费者阻塞;
2:跳过,跳过这条没有消费的消息;
这个其实是不合理的,可能会造成数据不一致性;
这里需要建立一个专门用于重试的topic(retry topic);
当消费者没有正确消费这条消息的时候,
则将这条消息转发(发布)到重试主题(retry topic)上,然后提交消息的偏移量,以便继续处理下一个消息;
注意:
这个时候,这个没有正确消费的消息,其实对于这个消费者来说,也算是消费完成了,因为也正常提交了偏移量,只不过是业务没有正确处理,而且这个消息被发布到另一个topic中了(retry topic);
;;
之后再创建一个重试消费者,用于订阅这个重试主题,
只不过这个重试消费者,跟之前那个消费者处理相同的业务,两个逻辑是一样的;
;;
如果这个重试消费者,也无法小得这条消息,那就把这个消息发布到另一个重试主题上,并提交该消息的偏移量;
;;
循环,递归
;;
最后,当创建了很多重试消费者的时候,在最终重试消费者无法处理某条消息后,把该消息发布到一个死信队列(DLQ);
这种方式优缺点:
优点:可以重试,可能在重试中,就正常消费了;
缺点:可能一直重试,都不会正常消费,一直到死信队列中;
这个问题就比较大了,可能我们就是消费的是一个错误的消息,
比如说缺字段,或者数据库中根本就没有这个业务的id;
或者消息中包含特殊字段,导致无法消费;
这种是消息本身的错误,导致无法消费,除非你去解决消息,不然是永远不会成功的;
建议:
对于第一种:
消息正确,是因为业务或者其他问题导致第一次消费失败,其实可以让消费者自己去重试,知道消费成功;
对于第二种:
当然可以用上面那个重试主题,不然你这个消息阻塞了我后面的后续的消费,所以需要把这个消息分流,分流该消息会为我们的消费清除障碍;
1:它会忽略排序
当事件发布到同一分区时,可以保证各个事件按照它们发生的顺序进行处理。如果对同一聚合进行连续更改,并且所产生的事件发布到不同的分区,就可能发生争用状况,也就是消费者在消费第一个更改之前就消费了第二个更改。这会导致数据不一致。
重试主题真正出问题的地方。它让我们的消费者容易打乱处理事件的顺序。
需要明确的是,重试主题并非一直都是错误的模式。当然,它也存在一些合适的用例。具体来说,当消费者的工作是收集不可修改的记录时,这种模式就很不错。这样的例子可能包括:
统计日志啊
记录用户操作记录啊
这类消费者可能会从重试主题模式中受益,同时没有数据损坏的风险。
1:消除错误类型;
如果我们能在生产者发送该消息的时候,就确定这个消息是正常的,可以被消费的,那么就没必要用重试主题了,直接让消费者重试就好了;
解决方案:
在消息的body中增加一个字段:
isRetry:是否重试
那么我们消费者拿到这个消息后,可以根据这个字段来判断生产者是否需要我们重试,
当然这个是最简单的一种;
还有根据错误类型来判断的:
void processMessage(KafkaMessage km) {
try {
Message m = km.getMessage();
transformAndSave(m);
} catch (Throwable t) {
if (isRecoverable(t)) {
// ...
} else {
// ...
}
}
}
在上面的 Java 伪代码示例中,isRecoverable()将采用一种白名单方法来确定 t 是否表示可恢复错误。换句话说,它检查 t 以确定它是否与任何已知的可恢复错误(例如 SQL 连接错误或 ReST 客户端超时)相匹配,如果匹配则返回 true,否则返回 false。这样就能防止我们的消费者被不可恢复错误一直阻塞下去。
当然这种其实也不太准,
例如,一个 SQLException 可能指的是一次数据库故障(可恢复)或一次约束违反状况(不可恢复)。
2:在消费者内重试可恢复错误
可恢复错误:就是在重试一段时间后,可能成功的;
不可恢复错误:你直接就是消息体错误,缺字段,结构错误,有特殊字符等;
存在可恢复错误时,将消息发布到重试主题毫无意义。我们只会为下一条消息的失败扫清道路。相反,消费者可以简单地重试,直到条件恢复。
当然,出现可恢复错误意味着外部资源存在问题。我们不断对这块资源发送请求是无济于事的。因此,我们希望对重试应用一个退避策略。我们的伪 Java 代码现在可能看起来像这样:
void processMessage(KafkaMessage km) {
try {
Message m = km.getMessage();
transformAndSave(m);
} catch (Throwable t) {
if (isRecoverable(t)) {
doWithRetry(m, Backoff.EXPONENTIAL, this::transformAndSave);
} else {
// ...
}
}
}
(注意:我们使用的任何退避机制都应配置为在达到某个阈值时向我们发出警报,并通知我们潜在的严重错误)
2:遇到不可恢复错误时,将消息直接发送到最后一个主题
另一方面,当我们的消费者遇到不可恢复错误时,我们可能希望立即隐藏(stash)该消息,以释放后续消息。但在这里使用多个重试主题会有用吗?答案是否定的。在转到 DLQ 之前,我们的消息只会经历 n 次消费失败而已。那么,为什么不从一开始就将消息粘贴在那里呢?
与重试主题一样,这个主题(在这里,我们将其称为隐藏主题)将拥有自己的消费者,其与主消费者保持一致。但就像 DLQ 一样,这个消费者并不总是在消费消息;它只有在我们明确需要时才会这么做。
不论我们是直接用消费者的重试,还是发送到重试主题,没有最好的解决方案,只有最适合自己的,需要根据自己的实际业务来选择最合适的处理方式,
最好能加人工告警机制,【代码不够,人工来凑】,这个也是我开发中经常用来背锅的一种思路;
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。