当前位置:   article > 正文

RabbitMQ服务异步通信_rabbitmq异步发送消息 会一直有一个服务

rabbitmq异步发送消息 会一直有一个服务

1.消息可靠性问题

2.延迟消息问题

3.消息堆积问题

4.高可用问题

1.消息可靠性

四种解决方案

生产者确认机制

mq持久化

消费者确认机制

  1. logging:
  2. pattern:
  3. dateformat: HH:mm:ss:SSS
  4. level:
  5. cn.itcast: debug
  6. spring:
  7. rabbitmq:
  8. # host: 192.168.188.142 # rabbitMQ的ip地址
  9. # port: 5672 # 端口
  10. addresses: 192.168.188.142:8071, 192.168.188.142:8072, 192.168.188.142:8073
  11. username: itcast
  12. password: 123321
  13. virtual-host: /
  14. # 消息回调 异步
  15. publisher-confirm-type: correlated
  16. # 开启交换机路由到队列如果失败之后的回调
  17. publisher-returns: true
  18. template:
  19. # 路由失败之后调用publisher-returns方法
  20. mandatory: true

这是交换机回调ack,全局只需要写一个,所有可以写在配置类中

这个配置类的接口,的作用是,只要ioc容器初始化后,就会执行这个方法

  1. @Slf4j
  2. @Configuration
  3. public class CommonConfig implements ApplicationContextAware {
  4. /**
  5. * ioc容器加载完成之后会调用
  6. */
  7. @Override
  8. public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
  9. // 获取rabbitTemplate
  10. RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
  11. // 设置returnCallBack
  12. rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
  13. @Override
  14. public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
  15. // 如果大于0代表是延迟队列的消息
  16. if (message.getMessageProperties().getReceivedDelay() > 0) {
  17. return;
  18. }
  19. log.info("发送的消息 ===> {},响应码 ===> {} ,失败原因 ===> {},交换机 ===> {},交换机路由的key ===>{}",
  20. message, replyCode, replyText, exchange, routingKey);
  21. // 记录日志,人工干预
  22. // 或者重新投递
  23. }
  24. });
  25. }
  26. }

然后队列中的ack回调,如果消息执行失败,机会返回ack结果

  1. @Autowired
  2. private RabbitTemplate rabbitTemplate;
  3. @Test
  4. public void testSendMessage2SimpleQueue() throws InterruptedException {
  5. String routingKey = "simple";
  6. String message = "hello, spring amqp!";
  7. // 指定消息id
  8. CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
  9. // 指定回调
  10. correlationData.getFuture().addCallback(
  11. new SuccessCallback<CorrelationData.Confirm>() {
  12. @Override
  13. public void onSuccess(CorrelationData.Confirm result) {
  14. if (result.isAck()) {
  15. log.info("消息投递到交换机成功,消息的id ===>{}", correlationData.getId());
  16. } else {
  17. log.info("消息投递到交换机失败,消息的id ===>{},失败原因 ===> {}", correlationData.getId(), result.getReason());
  18. }
  19. }
  20. },
  21. new FailureCallback() {
  22. @Override
  23. public void onFailure(Throwable ex) {
  24. log.info("消息投递到交换机失败,未知原因 ===>{}", ex.getMessage());
  25. }
  26. }
  27. );
  28. rabbitTemplate.convertAndSend("amq.topic", routingKey, message, correlationData);
  29. }

另外,记得发消息的时候,加入correlationData,才能生效

消费者也要开启auto的ack回调,这样,如果消息不能正常被消费,就会返回队列,但是会形成死循环,使用srping的 retry 的消息重试机制,但是重试机制后,还是无法处理,消息还还是会丢失

接下来就要,绑定一个失败交换器,在网页上查看

  1. spring:
  2. rabbitmq:
  3. # host: 192.168.188.142 # rabbitMQ的ip地址
  4. # port: 5672 # 端口
  5. addresses: 192.168.188.142:8071, 192.168.188.142:8072, 192.168.188.142:8073
  6. username: itcast
  7. password: 123321
  8. virtual-host: /
  9. listener:
  10. simple:
  11. prefetch: 1
  12. # 消息投递方式
  13. acknowledge-mode: auto
  14. retry:
  15. # 开启本地重试
  16. enabled: true
  17. # 第一次失败重试时间
  18. initial-interval: 1000
  19. # 倍数
  20. multiplier: 1
  21. # 最大次数
  22. max-attempts: 3
  23. # 是否有状态,消费者业务如果有事务,要设置为有状态
  24. # 无状态:true
  25. # 有状态:false
  26. stateless: true
  1. /**
  2. * @author t3rik
  3. */
  4. @Configuration
  5. public class ErrorDirectConfig {
  6. /**
  7. * 新建交换机
  8. */
  9. @Bean
  10. public DirectExchange errorDirect() {
  11. return new DirectExchange("error.direct");
  12. }
  13. /**
  14. * 新建队列
  15. */
  16. @Bean
  17. public Queue errorQueue() {
  18. return new Queue("error.queue");
  19. }
  20. /**
  21. * 绑定
  22. */
  23. @Bean
  24. public Binding bindingErrorQueueToErrorDirect() {
  25. // return BindingBuilder.bind(errorQueue).to(errorDirect).with("error");
  26. return BindingBuilder.bind(errorQueue()).to(errorDirect()).with("error");
  27. }
  28. /**
  29. * 指定MessageRecoverer
  30. */
  31. @Bean
  32. public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {
  33. return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
  34. }
  35. }

之后就是死信队列

  1. @Configuration
  2. public class DeadQueueConfig {
  3. /**
  4. * 创建交换机
  5. */
  6. @Bean
  7. public DirectExchange ttlDirect() {
  8. return new DirectExchange("ttl.direct");
  9. }
  10. /**
  11. * 创建队列
  12. * deadLetterExchange 设置死信交换机
  13. * deadLetterRoutingKey 设置投递到死信交换机之后,该交换机路由到绑定队列时的key
  14. * ttl 设置过期时间
  15. */
  16. @Bean
  17. public Queue ttlQueue() {
  18. return QueueBuilder.durable("ttl.queue")
  19. .deadLetterExchange("dl.direct")
  20. .deadLetterRoutingKey("dl")
  21. .ttl(10000)
  22. .build();
  23. }
  24. /**
  25. * 队列和交换机绑定
  26. */
  27. @Bean
  28. public Binding bindingQueueToDirectExchange() {
  29. return BindingBuilder.bind(ttlQueue()).to(ttlDirect()).with("ttl");
  30. }
  31. }
  1. @RabbitListener(bindings = @QueueBinding(
  2. value = @Queue(name = "dl.queue"),
  3. exchange = @Exchange(name = "dl.direct"),
  4. key = "dl"
  5. ))
  6. public void listenDeadQueue(String msg) {
  7. log.info("消费者接收到延迟的消息:【" + msg + "】");
  8. }

 值得注意的如果死信队列设置了过期时间,消息也设置了过期时间,按照过期时间短的来执行

  1. /**
  2. * 发送消息到死信队列
  3. *
  4. * @throws InterruptedException
  5. */
  6. @Test
  7. public void testSendMessage2DeadQueue() throws InterruptedException {
  8. String message = "hello, deadQueue!";
  9. // 消息持久化
  10. Message msg = MessageBuilder.withBody(message.getBytes(StandardCharsets.UTF_8))
  11. .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
  12. .setExpiration("5000")
  13. .build();
  14. rabbitTemplate.convertAndSend("ttl.direct", "ttl", msg);
  15. }

  

2.3.延迟队列

利用TTL结合死信交换机,我们实现了消息发出后,消费者延迟收到消息的效果。这种消息模式就称为延迟队列(Delay Queue)模式。

延迟队列的使用场景包括:

  • 延迟发送短信

  • 用户下单,如果用户在15 分钟内未支付,则自动取消

  • 预约工作会议,20分钟后自动通知所有参会人员

  • 使用mq的插件进行延迟队列

  • 第一种写配置类

  • 第二中直接写队列

    1. @RabbitListener(bindings = @QueueBinding(
    2. value = @Queue(name = "delay.queue"),
    3. exchange = @Exchange(name = "delay.direct",delayed = "true"),
    4. key = "delay"
    5. ))
    6. public void listenDelayQueue(String msg) {
    7. log.info("消费者接收到延迟队列的消息:【" + msg + "】");
    8. }

    发送消息的时候要额外指定头

    1. /**
    2. * 发送消息到延迟队列
    3. */
    4. @Test
    5. public void testSendMessage2DelayQueue() {
    6. String message = "hello, delayQueue!";
    7. // 消息持久化
    8. Message msg = MessageBuilder.withBody(message.getBytes(StandardCharsets.UTF_8))
    9. .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
    10. .setHeader("x-delay", 5000)
    11. .build();
    12. rabbitTemplate.convertAndSend("delay.direct", "delay", msg);
    13. }

    如果是插件就会造成ack不能正常返回

    1. // 如果大于0代表是延迟队列的消息
    2. if (message.getMessageProperties().getReceivedDelay() > 0) {
    3. return;
    4. }

    惰性队列

    1. /**
    2. * @author t3rik
    3. */
    4. @Configuration
    5. public class LazyQueueConfig {
    6. @Bean
    7. public Queue normalQueue() {
    8. return new Queue("normal.queue");
    9. }
    10. @Bean
    11. public Queue lazyQueue() {
    12. return QueueBuilder
    13. .durable("lazy.queue")
    14. .lazy()
    15. .build();
    16. }
    17. }

  • Java代码创建仲裁队列

    1. @Bean
    2. public Queue quorumQueue() {
    3. return QueueBuilder
    4. .durable("quorum.queue") // 持久化
    5. .quorum() // 仲裁队列
    6. .build();
    7. }

    SpringAMQP连接MQ集群

    1. spring:
    2. rabbitmq:
    3. addresses: 192.168.150.105:8071, 192.168.150.105:8072, 192.168.150.105:8073
    4. username: itcast
    5. password: 123321
    6. virtual-host: /

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/空白诗007/article/detail/921199
推荐阅读
相关标签
  

闽ICP备14008679号