赞
踩
1.消息可靠性问题
2.延迟消息问题
3.消息堆积问题
4.高可用问题
1.消息可靠性
四种解决方案
生产者确认机制
mq持久化
消费者确认机制
- logging:
- pattern:
- dateformat: HH:mm:ss:SSS
- level:
- cn.itcast: debug
- spring:
- rabbitmq:
- # host: 192.168.188.142 # rabbitMQ的ip地址
- # port: 5672 # 端口
- addresses: 192.168.188.142:8071, 192.168.188.142:8072, 192.168.188.142:8073
- username: itcast
- password: 123321
- virtual-host: /
- # 消息回调 异步
- publisher-confirm-type: correlated
- # 开启交换机路由到队列如果失败之后的回调
- publisher-returns: true
- template:
- # 路由失败之后调用publisher-returns方法
- mandatory: true
这是交换机回调ack,全局只需要写一个,所有可以写在配置类中
这个配置类的接口,的作用是,只要ioc容器初始化后,就会执行这个方法
- @Slf4j
- @Configuration
- public class CommonConfig implements ApplicationContextAware {
-
- /**
- * ioc容器加载完成之后会调用
- */
- @Override
- public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
- // 获取rabbitTemplate
- RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
- // 设置returnCallBack
- rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
- @Override
- public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
- // 如果大于0代表是延迟队列的消息
- if (message.getMessageProperties().getReceivedDelay() > 0) {
- return;
- }
- log.info("发送的消息 ===> {},响应码 ===> {} ,失败原因 ===> {},交换机 ===> {},交换机路由的key ===>{}",
- message, replyCode, replyText, exchange, routingKey);
- // 记录日志,人工干预
- // 或者重新投递
- }
- });
- }
- }
然后队列中的ack回调,如果消息执行失败,机会返回ack结果
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- @Test
- public void testSendMessage2SimpleQueue() throws InterruptedException {
- String routingKey = "simple";
- String message = "hello, spring amqp!";
- // 指定消息id
- CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
- // 指定回调
- correlationData.getFuture().addCallback(
- new SuccessCallback<CorrelationData.Confirm>() {
- @Override
- public void onSuccess(CorrelationData.Confirm result) {
- if (result.isAck()) {
- log.info("消息投递到交换机成功,消息的id ===>{}", correlationData.getId());
- } else {
- log.info("消息投递到交换机失败,消息的id ===>{},失败原因 ===> {}", correlationData.getId(), result.getReason());
- }
- }
- },
- new FailureCallback() {
- @Override
- public void onFailure(Throwable ex) {
- log.info("消息投递到交换机失败,未知原因 ===>{}", ex.getMessage());
- }
- }
- );
- rabbitTemplate.convertAndSend("amq.topic", routingKey, message, correlationData);
- }
另外,记得发消息的时候,加入correlationData,才能生效
消费者也要开启auto的ack回调,这样,如果消息不能正常被消费,就会返回队列,但是会形成死循环,使用srping的 retry 的消息重试机制,但是重试机制后,还是无法处理,消息还还是会丢失
接下来就要,绑定一个失败交换器,在网页上查看
- spring:
- rabbitmq:
- # host: 192.168.188.142 # rabbitMQ的ip地址
- # port: 5672 # 端口
- addresses: 192.168.188.142:8071, 192.168.188.142:8072, 192.168.188.142:8073
- username: itcast
- password: 123321
- virtual-host: /
- listener:
- simple:
- prefetch: 1
- # 消息投递方式
- acknowledge-mode: auto
- retry:
- # 开启本地重试
- enabled: true
- # 第一次失败重试时间
- initial-interval: 1000
- # 倍数
- multiplier: 1
- # 最大次数
- max-attempts: 3
- # 是否有状态,消费者业务如果有事务,要设置为有状态
- # 无状态:true
- # 有状态:false
- stateless: true
- /**
- * @author t3rik
- */
- @Configuration
- public class ErrorDirectConfig {
-
- /**
- * 新建交换机
- */
- @Bean
- public DirectExchange errorDirect() {
- return new DirectExchange("error.direct");
- }
-
-
- /**
- * 新建队列
- */
- @Bean
- public Queue errorQueue() {
- return new Queue("error.queue");
- }
-
-
- /**
- * 绑定
- */
- @Bean
- public Binding bindingErrorQueueToErrorDirect() {
- // return BindingBuilder.bind(errorQueue).to(errorDirect).with("error");
- return BindingBuilder.bind(errorQueue()).to(errorDirect()).with("error");
- }
-
-
- /**
- * 指定MessageRecoverer
- */
- @Bean
- public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {
- return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
- }
- }
之后就是死信队列
- @Configuration
- public class DeadQueueConfig {
-
- /**
- * 创建交换机
- */
- @Bean
- public DirectExchange ttlDirect() {
- return new DirectExchange("ttl.direct");
- }
-
- /**
- * 创建队列
- * deadLetterExchange 设置死信交换机
- * deadLetterRoutingKey 设置投递到死信交换机之后,该交换机路由到绑定队列时的key
- * ttl 设置过期时间
- */
- @Bean
- public Queue ttlQueue() {
- return QueueBuilder.durable("ttl.queue")
- .deadLetterExchange("dl.direct")
- .deadLetterRoutingKey("dl")
- .ttl(10000)
- .build();
- }
-
- /**
- * 队列和交换机绑定
- */
- @Bean
- public Binding bindingQueueToDirectExchange() {
- return BindingBuilder.bind(ttlQueue()).to(ttlDirect()).with("ttl");
- }
- }
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "dl.queue"),
- exchange = @Exchange(name = "dl.direct"),
- key = "dl"
- ))
- public void listenDeadQueue(String msg) {
- log.info("消费者接收到延迟的消息:【" + msg + "】");
- }
值得注意的如果死信队列设置了过期时间,消息也设置了过期时间,按照过期时间短的来执行
-
- /**
- * 发送消息到死信队列
- *
- * @throws InterruptedException
- */
- @Test
- public void testSendMessage2DeadQueue() throws InterruptedException {
- String message = "hello, deadQueue!";
- // 消息持久化
- Message msg = MessageBuilder.withBody(message.getBytes(StandardCharsets.UTF_8))
- .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
- .setExpiration("5000")
- .build();
- rabbitTemplate.convertAndSend("ttl.direct", "ttl", msg);
- }
利用TTL结合死信交换机,我们实现了消息发出后,消费者延迟收到消息的效果。这种消息模式就称为延迟队列(Delay Queue)模式。
延迟队列的使用场景包括:
延迟发送短信
用户下单,如果用户在15 分钟内未支付,则自动取消
预约工作会议,20分钟后自动通知所有参会人员
使用mq的插件进行延迟队列
第一种写配置类
第二中直接写队列
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "delay.queue"),
- exchange = @Exchange(name = "delay.direct",delayed = "true"),
- key = "delay"
- ))
- public void listenDelayQueue(String msg) {
- log.info("消费者接收到延迟队列的消息:【" + msg + "】");
- }
发送消息的时候要额外指定头
- /**
- * 发送消息到延迟队列
- */
- @Test
- public void testSendMessage2DelayQueue() {
- String message = "hello, delayQueue!";
- // 消息持久化
- Message msg = MessageBuilder.withBody(message.getBytes(StandardCharsets.UTF_8))
- .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
- .setHeader("x-delay", 5000)
- .build();
- rabbitTemplate.convertAndSend("delay.direct", "delay", msg);
- }
如果是插件就会造成ack不能正常返回
- // 如果大于0代表是延迟队列的消息
- if (message.getMessageProperties().getReceivedDelay() > 0) {
- return;
- }
惰性队列
- /**
- * @author t3rik
- */
- @Configuration
- public class LazyQueueConfig {
-
-
- @Bean
- public Queue normalQueue() {
- return new Queue("normal.queue");
- }
-
- @Bean
- public Queue lazyQueue() {
- return QueueBuilder
- .durable("lazy.queue")
- .lazy()
- .build();
-
- }
- }
Java代码创建仲裁队列
- @Bean
- public Queue quorumQueue() {
- return QueueBuilder
- .durable("quorum.queue") // 持久化
- .quorum() // 仲裁队列
- .build();
- }
SpringAMQP连接MQ集群
- spring:
- rabbitmq:
- addresses: 192.168.150.105:8071, 192.168.150.105:8072, 192.168.150.105:8073
- username: itcast
- password: 123321
- virtual-host: /
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。