当前位置:   article > 正文

SpringBoot-RabbitMQ篇-消息可靠投递_cachingconnectionfactory.confirmtype

cachingconnectionfactory.confirmtype

一、发送者异常监控

1.1 发送者异常种类

1.基本处理流程

  • 补偿(兜底)方案

    2.模拟broker宕机:修改发送者端口如5673,然后启动,发送消息,端口不对无法连接主机
  • 错误信息:java.net.ConnectException: Connection timed out: connect
  • 补偿方案:加入异常处理,如果不可达则返回错误
  • 这种错误在发送的时候就已经可以发现,直接将错误返回给调用方即可
@RequestMapping("/direct")
public Object sendEmail(String msg) {
    try {
        rabbitTemplate.convertAndSend("exchange.direct.springboot.email", "queue.email.routing.key", msg);
        return msg;
    } catch (AmqpException e) {
        System.out.println("发送出现异常:" + e.getMessage());
        return "网络中断,请稍后再试";
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

3.模拟无交换器异常

  • 错误信息

ERROR 4880 — [.200.57.39:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange ‘noExchange’ in vhost ‘/’, class-id=60, method-id=40)

  • 错误说明:如果没有交换器并不会报错,只会输出一条日志
  • 补偿方案:需要采用发送回调来确认是否成功发送消息

4.模拟无路由异常

  • 错误信息:无任何提示,消息直接被丢弃
  • 补偿方案:需要采用发送回调来确认是否成功发送消息

1.2 消息发送回调

1. 因为消息是异步发送,所以需要确保消息能正确发送
2. 所以可配置RabbitTemplate然后指定回调信息
3. 步骤01:修改配置文件,配置回调参数

publisher-confirm-type

  • org.springframework.boot.autoconfigure.amqp.RabbitProperties#publisherConfirmType
  • org.springframework.amqp.rabbit.connection.CachingConnectionFactory.ConfirmType
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: tianxin
    password: tianxin
    # 开启消息发broker回调
    publisher-confirm-type: correlated
    # 开启路由消息路由回调
    publisher-returns: true
    # 强制确认,也可以在代码中开启
    template:
      mandatory: true
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
/**
 * The type of publisher confirms to use.
 */
public enum ConfirmType {

	/**
	 * Use {@code RabbitTemplate#waitForConfirms()} (or {@code waitForConfirmsOrDie()}
	 * within scoped operations.
	 */
	SIMPLE,

	/**
	 * Use with {@code CorrelationData} to correlate confirmations with sent
	 * messsages.
	 */
	CORRELATED,

	/**
	 * Publisher confirms are disabled (default).
	 */
	NONE
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

4.步骤02:配置RabbitTemplate,设置交换器确认回调和路由回调
setConfirmCallback:无论成功与否都会调用
setReturnCallback:错误时才调用

import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.Objects;

@Configuration
public class CustomRabbitTemplate {

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        // 开启mandatory为true才能触发回调方法,无论消息推送结果如何强制调用回调方法
        rabbitTemplate.setMandatory(true);
        // 设置连接工厂信息
        rabbitTemplate.setConnectionFactory(connectionFactory);

        // 消息发broker回调:发送者到broker的exchange是否正确找到
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            System.out.println("setConfirmCallback 消息数据:" + correlationData);
            if (Objects.nonNull(correlationData)) {
                System.out.println("setConfirmCallback 消息数据:" + correlationData.getReturnedMessage());
            }
            System.out.println("setConfirmCallback 消息确认:" + ack);
            System.out.println("setConfirmCallback 原因:" + cause);
            System.out.println("-----------------------------------");
        });

        // 消息路由回调:从交换器路由到队列是否正确发送
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            System.out.println("setReturnCallback 消息:" + message);
            System.out.println("setReturnCallback 回应码:" + replyCode);
            System.out.println("setReturnCallback 回应信息:" + replyText);
            System.out.println("setReturnCallback 交换器:" + exchange);
            System.out.println("setReturnCallback 路由键:" + routingKey);
            System.out.println("-----------------------------------");
        });

        return rabbitTemplate;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 路由回调和消息回调
/**
 * A callback for publisher confirmations.
 *
 */
@FunctionalInterface
public interface ConfirmCallback {

	/**
	 * Confirmation callback.
	 * @param correlationData correlation data f
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/662688
推荐阅读
相关标签
  

闽ICP备14008679号