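When you use an MQ system such as RabbitMQ or RocketMQ as a message queue for asynchronous processing, messages can be persisted to disk, so a problem in the MQ itself need not lose data. Even so, a message can still be lost while it is being sent, transmitted, or processed. On top of that, no MQ middleware can guarantee 100% availability, so we have to think about how the asynchronous flow carries on when the MQ is unavailable.
For these reasons, an asynchronous flow must come with compensation, or in other words a main/backup active-active design.
Let's look at a scenario in which a welcome message is sent asynchronously after a user registers. Writing the registered user to the database is the synchronous flow; the member service receiving the message and sending the welcome message is the asynchronous flow.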
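Let's analyze the flow (the line colors refer to the flow diagram for this scenario):
The blue line is the asynchronous processing that goes through the MQ. We call it the main line, and it may lose messages (dashed lines stand for asynchronous calls).
The green line is a compensation Job that periodically compensates messages. We call it the backup line, and it makes up for the messages the main line loses.
To cover the extreme case in which the MQ middleware fails completely, we require the backup line's throughput to match the main line's.
Now let's look at the implementation.
First, define UserController, which registers users and sends the asynchronous message. The register method registers 10 users in one go, and each registration message has a 50% chance of not being sent.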
package org.geekbang.time.commonmistakes.asyncprocess.compensation;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.IntStream;

@RestController
@Slf4j
@RequestMapping("user")
public class UserController {
    @Autowired
    private UserService userService;
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("register")
    public void register() {
        // Simulate 10 user registrations
        IntStream.rangeClosed(1, 10).forEach(i -> {
            // Synchronous step: persist the user to the database
            User user = userService.register();
            // Simulate a 50% chance that the message is not sent
            if (ThreadLocalRandom.current().nextInt(10) % 2 == 0) {
                // Send the registration message through RabbitMQ
                rabbitTemplate.convertAndSend(RabbitConfiguration.EXCHANGE, RabbitConfiguration.ROUTING_KEY, user);
                log.info("sent mq user {}", user.getId());
            }
        });
    }
}
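Next, define the MemberService class to simulate the member service. It listens for user-registered messages and sends a welcome SMS. We keep the IDs of users who have already been sent an SMS in a ConcurrentHashMap to make the operation idempotent, so that the same user is not sent a second SMS when compensation runs: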
package org.geekbang.time.commonmistakes.asyncprocess.compensation;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

@Component
@Slf4j
public class MemberService {
    // Records which users have already been sent a welcome message
    private final Map<Long, Boolean> welcomeStatus = new ConcurrentHashMap<>();

    // Listen for user-registered messages and send the welcome message
    @RabbitListener(queues = RabbitConfiguration.QUEUE)
    public void listen(User user) {
        log.info("receive mq user {}", user.getId());
        welcome(user);
    }

    // Send the welcome message; shared by the MQ listener and the compensation Job
    public void welcome(User user) {
        // Deduplication: putIfAbsent returns the existing value without replacing it
        // if the key is already present; otherwise it stores the pair and returns null.
        // Only the first caller for a given user ID gets null and goes on to send.
        if (welcomeStatus.putIfAbsent(user.getId(), true) == null) {
            try {
                // Simulate a slow send
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of swallowing it
                Thread.currentThread().interrupt();
            }
            // Send the message to the newly registered user
            log.info("memberService: welcome new user {}", user.getId());
        }
    }
}
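For an MQ consumer, the processing logic must deduplicate (that is, be idempotent), for several reasons:
MQ messages may be duplicated because of middleware misconfiguration, stability problems, and so on.
Automatic compensation causes duplicates. In this example the same message may go through both the MQ and the compensation Job, so duplicates are certain to occur; and for the sake of high cohesion, the compensation Job itself does no deduplication.
Manual compensation causes duplicates. When messages pile up, the asynchronous flow is inevitably delayed. If we offer a back-office function for compensation, people may well compensate manually during the delay, and some time later the consumer receives the message and processes it again. I once ran into an incident caused by an MQ failure: several hundred thousand fund-disbursement messages had piled up in the MQ, so the business could not process them in time. The operations staff assumed the program had failed and processed them manually through the back office. When the MQ recovered, the messages were processed a second time, and a large amount of funds was disbursed twice.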
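The deduplication guard can be exercised on its own. The following is a minimal stand-alone sketch, not part of the article's project: the class IdempotentWelcomeDemo and the method deliverConcurrently are hypothetical names, and a counter stands in for actually sending a message. It delivers the same user ID from several threads at once, the way the MQ listener and a compensation thread might, and counts how many sends get through.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-alone demo; not part of the article's project.
class IdempotentWelcomeDemo {
    private final Map<Long, Boolean> welcomeStatus = new ConcurrentHashMap<>();
    private final AtomicInteger sent = new AtomicInteger();

    // Same guard as MemberService.welcome: only the first caller per user ID proceeds.
    void welcome(long userId) {
        if (welcomeStatus.putIfAbsent(userId, true) == null) {
            sent.incrementAndGet(); // stands in for sending the welcome message
        }
    }

    // Delivers the same user `deliveries` times from parallel threads and
    // returns how many welcome messages were actually sent.
    static int deliverConcurrently(long userId, int deliveries) {
        IdempotentWelcomeDemo service = new IdempotentWelcomeDemo();
        ExecutorService pool = Executors.newFixedThreadPool(deliveries);
        CountDownLatch start = new CountDownLatch(1);
        for (int i = 0; i < deliveries; i++) {
            pool.execute(() -> {
                try {
                    start.await(); // release all threads together to provoke a race
                    service.welcome(userId);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        start.countDown();
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("demo tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for demo tasks", e);
        }
        return service.sent.get();
    }

    public static void main(String[] args) {
        // prints "messages sent: 1"
        System.out.println("messages sent: " + deliverConcurrently(42L, 8));
    }
}
```

Note that an in-memory map only deduplicates inside one JVM and forgets everything on restart. A production consumer would more likely keep this state in shared, durable storage, for example behind a unique key in the database.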
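Next, define the compensation Job, that is, the backup line.
In CompensationJob we define a @Scheduled task that runs a compensation pass every 5 seconds. The Job has no way of knowing which registration messages were lost, so it compensates everything. Each pass takes up to 5 users in ID order, and the next pass starts from the ID of the last user handled in the previous pass. Compensation tasks are submitted to a thread pool for "asynchronous" processing to raise throughput.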
package org.geekbang.time.commonmistakes.asyncprocess.compensation;

import jodd.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

@Component
@Slf4j
public class CompensationJob {
    // Dedicated pool for compensation tasks: 10 threads, queue of up to 1000 tasks
    private static final ThreadPoolExecutor compensationThreadPool = new ThreadPoolExecutor(
            10, 10,
            1, TimeUnit.HOURS,
            new ArrayBlockingQueue<>(1000),
            new ThreadFactoryBuilder().setNameFormat("compensation-threadpool-%d").get());
    @Autowired
    private UserService userService;
    @Autowired
    private MemberService memberService;
    // ID of the last user handed to compensation (in memory only in this demo)
    private long offset = 0;

    // Start compensating after 10 seconds, then run every 5 seconds
    @Scheduled(initialDelay = 10_000, fixedRate = 5_000)
    public void compensationJob() {
        // Log message: "start compensating from user ID {}"
        log.info("开始从用户ID {} 补偿", offset);
        userService.getUsersAfterIdWithLimit(offset, 5).forEach(user -> {
            // Reuse the same welcome method as the MQ listener
            compensationThreadPool.execute(() -> memberService.welcome(user));
            offset = user.getId();
        });
    }
}
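For high cohesion, the main line and the backup line should ideally process messages through the same method. In this example, both the MQ listener in MemberService and CompensationJob call the welcome method.
It is also worth pointing out that the compensation logic in the demo is simplistic. Production code should be strengthened in the following areas:
Tune the compensation frequency, the number of users handled per pass, and the compensation thread pool size to values that deliver the required compensation throughput.
Delay backup-line compensation appropriately. For example, only compensate users who registered more than 30 seconds ago, so that the backup line stays out of the way of the real-time MQ main line and avoids conflicts.
Persist state such as the offset of the last compensated user to the database.
Make the compensation Job itself highly available, for example with a job system such as XXL-JOB or ElasticJob.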
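The delay and offset points can be sketched in a few lines. This is only an illustration under stated assumptions, not the article's code: the class DelayedCompensationSketch, its nested User type, and the nextBatch method are hypothetical, an in-memory list stands in for the user table, and the list is assumed to be sorted by ascending ID with registration time increasing along with ID.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch; User, its fields and nextBatch are illustrative names.
class DelayedCompensationSketch {
    static final class User {
        final long id;
        final Instant registeredAt;

        User(long id, Instant registeredAt) {
            this.id = id;
            this.registeredAt = registeredAt;
        }
    }

    // Returns up to `limit` users with id > offset that registered at least `delay` ago.
    // Assumes `users` is sorted by ascending ID and registration time grows with ID.
    static List<User> nextBatch(List<User> users, long offset, int limit, Duration delay, Instant now) {
        Instant cutoff = now.minus(delay);
        List<User> batch = new ArrayList<>();
        for (User user : users) {
            if (batch.size() == limit) {
                break;
            }
            if (user.id <= offset) {
                continue; // already handled by an earlier pass
            }
            if (user.registeredAt.isAfter(cutoff)) {
                break; // too fresh: leave it to the MQ main line for now
            }
            batch.add(user);
        }
        return batch;
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2021-11-25T15:35:00Z");
        List<User> users = Arrays.asList(
                new User(1, now.minusSeconds(90)),
                new User(2, now.minusSeconds(60)),
                new User(3, now.minusSeconds(45)),
                new User(4, now.minusSeconds(10))); // inside the 30-second window
        long offset = 0; // a real job would load this from the database
        List<User> batch = nextBatch(users, offset, 5, Duration.ofSeconds(30), now);
        for (User user : batch) {
            offset = user.id; // a real job would persist the new offset after the batch
        }
        // prints "compensated 3 users, offset now 3"
        System.out.println("compensated " + batch.size() + " users, offset now " + offset);
    }
}
```

The loop stops at the first user that is still inside the delay window instead of skipping it, so the offset never jumps past a user who has not yet become eligible for compensation.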
Run the program and call the register method to register 10 users. The output looks like this:
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::               (v2.4.12)
2021-11-25 15:33:40.155 INFO 11524 --- [ main] o.g.t.c.a.c.CommonMistakesApplication : Starting CommonMistakesApplication using Java 1.8.0_121 on DESKTOP-AJV2N6C with PID 11524 (D:\2021\LEO\拉勾\demo\target\classes started by STAR in D:\2021\LEO\拉勾\demo)
2021-11-25 15:33:40.159 INFO 11524 --- [ main] o.g.t.c.a.c.CommonMistakesApplication : No active profile set, falling back to default profiles: default
2021-11-25 15:33:42.616 INFO 11524 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port(s): 8080 (http)
2021-11-25 15:33:42.634 INFO 11524 --- [ main] o.apache.catalina.core.StandardService : Starting service [Tomcat]
2021-11-25 15:33:42.634 INFO 11524 --- [ main] org.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/9.0.54]
2021-11-25 15:33:42.860 INFO 11524 --- [ main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext
2021-11-25 15:33:42.860 INFO 11524 --- [ main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 2598 ms
2021-11-25 15:33:44.801 INFO 11524 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path ''
2021-11-25 15:33:44.804 INFO 11524 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [192.125.50.250:5672]
2021-11-25 15:33:44.982 INFO 11524 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#69e308c6:0/SimpleConnection@5d1b9c3d [delegate=amqp://admin@192.125.50.250:5672/, localPort= 1064]
2021-11-25 15:33:45.125 INFO 11524 --- [ main] o.g.t.c.a.c.CommonMistakesApplication : Started CommonMistakesApplication in 5.634 seconds (JVM running for 6.408)
2021-11-25 15:33:55.122 INFO 11524 --- [ scheduling-1] o.g.t.c.a.compensation.CompensationJob : 开始从用户ID 0 补偿
2021-11-25 15:34:00.124 INFO 11524 --- [ scheduling-1] o.g.t.c.a.compensation.CompensationJob : 开始从用户ID 0 补偿
2021-11-25 15:34:05.126 INFO 11524 --- [ scheduling-1] o.g.t.c.a.compensation.CompensationJob : 开始从用户ID 0 补偿
2021-11-25 15:34:10.115 INFO 11524 --- [ scheduling-1] o.g.t.c.a.compensation.CompensationJob : 开始从用户ID 0 补偿
2021-11-25 15:34:11.645 INFO 11524 --- [nio-8080-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring DispatcherServlet 'dispatcherServlet'
2021-11-25 15:34:11.646 INFO 11524 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet : Initializing Servlet 'dispatcherServlet'
2021-11-25 15:34:11.647 INFO 11524 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet : Completed initialization in 1 ms
2021-11-25 15:34:11.704 INFO 11524 --- [nio-8080-exec-1] o.g.t.c.a.compensation.UserController : sent mq user 1
2021-11-25 15:34:11.705 INFO 11524 --- [nio-8080-exec-1] o.g.t.c.a.compensation.UserController : sent mq user 2
2021-11-25 15:34:11.705 INFO 11524 --- [nio-8080-exec-1] o.g.t.c.a.compensation.UserController : sent mq user 4
2021-11-25 15:34:11.706 INFO 11524 --- [nio-8080-exec-1] o.g.t.c.a.compensation.UserController : sent mq user 5
2021-11-25 15:34:11.706 INFO 11524 --- [nio-8080-exec-1] o.g.t.c.a.compensation.UserController : sent mq user 6
2021-11-25 15:34:11.707 INFO 11524 --- [nio-8080-exec-1] o.g.t.c.a.compensation.UserController : sent mq user 7
2021-11-25 15:34:11.707 INFO 11524 --- [nio-8080-exec-1] o.g.t.c.a.compensation.UserController : sent mq user 8
2021-11-25 15:34:11.724 INFO 11524 --- [ntContainer#0-1] o.g.t.c.a.compensation.MemberService : receive mq user 1
2021-11-25 15:34:13.734 INFO 11524 --- [ntContainer#0-1] o.g.t.c.a.compensation.MemberService : memberService: welcome new user 1
2021-11-25 15:34:13.735 INFO 11524 --- [ntContainer#0-1] o.g.t.c.a.compensation.MemberService : receive mq user 2
2021-11-25 15:34:15.122 INFO 11524 --- [ scheduling-1] o.g.t.c.a.compensation.CompensationJob : 开始从用户ID 0 补偿
2021-11-25 15:34:15.748 INFO 11524 --- [ntContainer#0-1] o.g.t.c.a.compensation.MemberService : memberService: welcome new user 2
2021-11-25 15:34:15.749 INFO 11524 --- [ntContainer#0-1] o.g.t.c.a.compensation.MemberService : receive mq user 4
2021-11-25 15:34:15.749 INFO 11524 --- [ntContainer#0-1] o.g.t.c.a.compensation.MemberService : receive mq user 5
2021-11-25 15:34:15.750 INFO 11524 --- [ntContainer#0-1] o.g.t.c.a.compensation.MemberService : receive mq user 6
2021-11-25 15:34:17.138 INFO 11524 --- [on-threadpool-3] o.g.t.c.a.compensation.MemberService : memberService: welcome new user 4
2021-11-25 15:34:17.138 INFO 11524 --- [on-threadpool-2] o.g.t.c.a.compensation.MemberService : memberService: welcome new user 3
2021-11-25 15:34:17.138 INFO 11524 --- [on-threadpool-4] o.g.t.c.a.compensation.MemberService : memberService: welcome new user 5
2021-11-25 15:34:17.752 INFO 11524 --- [ntContainer#0-1] o.g.t.c.a.compensation.MemberService : memberService: welcome new user 6
2021-11-25 15:34:17.753 INFO 11524 --- [ntContainer#0-1] o.g.t.c.a.compensation.MemberService : receive mq user 7
2021-11-25 15:34:19.767 INFO 11524 --- [ntContainer#0-1] o.g.t.c.a.compensation.MemberService : memberService: welcome new user 7
2021-11-25 15:34:19.768 INFO 11524 --- [ntContainer#0-1] o.g.t.c.a.compensation.MemberService : receive mq user 8
2021-11-25 15:34:20.124 INFO 11524 --- [ scheduling-1] o.g.t.c.a.compensation.CompensationJob : 开始从用户ID 5 补偿
2021-11-25 15:34:21.776 INFO 11524 --- [ntContainer#0-1] o.g.t.c.a.compensation.MemberService : memberService: welcome new user 8
2021-11-25 15:34:22.134 INFO 11524 --- [on-threadpool-9] o.g.t.c.a.compensation.MemberService : memberService: welcome new user 9
2021-11-25 15:34:25.129 INFO 11524 --- [ scheduling-1] o.g.t.c.a.compensation.CompensationJob : 开始从用户ID 9 补偿
2021-11-25 15:34:27.137 INFO 11524 --- [on-threadpool-1] o.g.t.c.a.compensation.MemberService : memberService: welcome new user 10
2021-11-25 15:34:30.129 INFO 11524 --- [ scheduling-1] o.g.t.c.a.compensation.CompensationJob : 开始从用户ID 10 补偿
2021-11-25 15:34:35.121 INFO 11524 --- [ scheduling-1] o.g.t.c.a.compensation.CompensationJob : 开始从用户ID 10 补偿
2021-11-25 15:34:40.125 INFO 11524 --- [ scheduling-1] o.g.t.c.a.compensation.CompensationJob : 开始从用户ID 10 补偿
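As the log shows:
Of the 10 users, MQ messages were sent for seven: users 1, 2, 4, 5, 6, 7, and 8. No message was sent for users 3, 9, and 10.
The compensation passes that ran before registration found no users. The pass at 15:34:15 welcomed users 3, 4, and 5; the pass at 15:34:20 welcomed user 9; and the pass at 15:34:25 welcomed user 10. Users 4 and 5 did have MQ messages, but the compensation Job claimed them first while the listener was still busy with user 2, so the deduplication check dropped the listener's later deliveries for them. Every user was welcomed exactly once.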
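One last point: the highest standard for closed-loop message compensation is that the backup line can sustain the throughput of the full data volume. In other words, if the backup line is solid enough, then even if the MQ is shut down outright, processing becomes slightly less timely, but every flow is still guaranteed to complete.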
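To judge whether a backup line meets that standard, a rough capacity estimate helps. The sketch below is a back-of-the-envelope calculation with hypothetical names (CompensationThroughput, feedRate, poolRate, backupLineRate). It assumes the scheduler hands over one full batch per interval and that every task takes the same fixed time, which ignores overlap with users the main line has already handled.

```java
// Hypothetical back-of-the-envelope helper; names are illustrative.
class CompensationThroughput {
    // Users per second the scheduler can hand over: one batch per interval.
    static double feedRate(int batchSize, double intervalSeconds) {
        return batchSize / intervalSeconds;
    }

    // Users per second the pool can finish: each thread completes one task per taskSeconds.
    static double poolRate(int threads, double taskSeconds) {
        return threads / taskSeconds;
    }

    // The backup line is limited by the slower of the two stages.
    static double backupLineRate(int batchSize, double intervalSeconds, int threads, double taskSeconds) {
        return Math.min(feedRate(batchSize, intervalSeconds), poolRate(threads, taskSeconds));
    }

    public static void main(String[] args) {
        // Demo settings: 5 users every 5 s, 10 threads, 2 s per welcome message
        double feed = feedRate(5, 5.0);
        double pool = poolRate(10, 2.0);
        double backup = backupLineRate(5, 5.0, 10, 2.0);
        // prints "feed 1.0/s, pool 5.0/s, backup line 1.0/s"
        System.out.println("feed " + feed + "/s, pool " + pool + "/s, backup line " + backup + "/s");
    }
}
```

With the demo's settings the batch size, not the thread pool, is the bottleneck: the backup line tops out at roughly one user per second. For a real system these parameters would need to be chosen so that the resulting rate is at least the main line's rate.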