当前位置:   article > 正文

RabbitMQ【应用 01】SpringBoot集成RabbitMQ及设置RabbitMQ启动总开关_spring.rabbitmq.enable

spring.rabbitmq.enable

1.SpringBoot集成RabbitMQ

1.1 依赖及配置

<!-- Spring Boot AMQP starter dependency. No <version> is given here, so it is presumably
     managed by the project's parent POM / BOM - confirm. -->
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  • 1
  • 2
  • 3
  • 4
spring:
  # Used to receive data sent by devices
  rabbitmq:
    host: xxx.xx.xxx.xxx
    port: 5672
    username: guest
    password: guest
    # Custom key: the listener resolves its queue name from ${spring.rabbitmq.mq-name}
    mq-name: test
    # Confirm that the message has been delivered to the exchange
    publisher-confirm-type: correlated
    # Confirm that the message has been delivered to the queue
    publisher-returns: true
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

1.2 消息监听与发送

  • 数据获取
@Component
@Slf4j
public class RabbitMessageQueueReceiver {
    @Autowired
    private ConfigProperties configProperties;
    @Autowired
    private AsyncConfig asyncConfig;
    @Autowired
    private DataGsmEquipComparisonManager dataGsmEquipComparisonManager;

    /**
     * Consumes one message from the durable queue named by {@code spring.rabbitmq.mq-name},
     * hands the parsed payload to the task executor, and then acknowledges the delivery
     * manually (the listener is declared with {@code ackMode = "MANUAL"}).
     *
     * <p>The delivery is acknowledged whether or not dispatching succeeded, so a payload
     * that cannot be parsed or scheduled is logged and dropped instead of being left
     * unacknowledged on the channel. Note that the ack is sent right after the task is
     * submitted, not after {@code dispose} has finished.
     *
     * @param msg     the converted message payload (unused; the raw body is read instead)
     * @param channel the channel the delivery arrived on, used for the manual ack
     * @param message the raw AMQP message
     * @throws IOException          kept for interface compatibility
     * @throws InterruptedException kept for interface compatibility
     */
    @RabbitListener(queuesToDeclare = {@Queue(name = "${spring.rabbitmq.mq-name}", durable = "true")}, ackMode = "MANUAL")
    @RabbitHandler()
    public void receive(String msg, Channel channel, Message message) throws IOException, InterruptedException {
        // Decode with an explicit charset so the result does not depend on the platform default.
        String jsonString = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        try {
            // Parsing happens inside the try block: a malformed payload must not escape
            // before the ack below, otherwise the delivery would stay unacknowledged.
            Map<String, Object> dataMap = dealMessageData(jsonString);
            asyncConfig.taskExecutor().execute(() -> {
                // Dispatch by data type [adapt this to your own processing needs]
                DealMessageByType.getInstance().dispose(dataMap);
            });
        } catch (Exception e) {
            log.error("error message:" + jsonString, e);
        }
        try {
            // NOTE(review): prefetch is re-applied on every delivery as in the original;
            // consider configuring it once on the listener container instead.
            channel.basicQos(5);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
        } catch (Exception e) {
            log.error("failed to ack message:" + jsonString, e);
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 数据发送
@Component
@Log4j
public class RabbitMessageQueueSender {
    public RabbitTemplate rabbitTemplate;
    public boolean sendMessage(String exchange, String routingKey, String message) {
        try {
			rabbitTemplate.convertAndSend(exchange, routingKey, message);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
        return true;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 确认机制(消息发送到服务回调)
/**
 * Registers itself on the {@link RabbitTemplate} as both the publisher-confirm callback
 * and the returned-message callback, and logs the outcome of each publish.
 */
@Component
@Slf4j
public class RabbitmqSendCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
    /** Template to hook into; injected so that {@link #run()} actually has something to configure. */
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Wires this bean into the template after injection. The original code called
     * {@code setConfirmCallback} twice and never {@code setReturnsCallback}, so
     * {@link #returnedMessage(ReturnedMessage)} was never invoked.
     */
    @PostConstruct
    public void run() {
        if (rabbitTemplate != null) {
            rabbitTemplate.setConfirmCallback(this);
            // mandatory=true asks the template to report unroutable messages via the returns callback
            rabbitTemplate.setMandatory(true);
            rabbitTemplate.setReturnsCallback(this);
        }
    }

    /**
     * Confirm callback: logs whether the publish was acked or nacked.
     *
     * @param correlationData correlation data supplied with the publish, if any
     * @param ack             {@code true} when acked, {@code false} when nacked
     * @param failCause       failure cause reported on nack, may be {@code null}
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String failCause) {
        if (ack) {
            log.info("消息发送成功");
        } else {
            log.warn("消息发送失败,进行容错处理");
        }
        log.info("消息发送到交换机时的回调函数, ack:" + ack + "FailCause 消息:" + failCause);
    }

    /**
     * Returns callback: logs a message that was handed back to the publisher.
     *
     * @param returned the returned message and its reply details
     */
    @Override
    public void returnedMessage(ReturnedMessage returned) {
        log.warn("消息从交换机发送到队列时失败的回调函数, 调用失败!!!" + returned);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

2.设置RabbitMQ启动总开关

SpringBoot 项目集成了 RabbitMQ 但是有时候又用不到它,比如说:

  • 开发与 RabbitMQ 服务无关的接口时,如果 MQ 服务未启动,会不断打印报错信息。
  • 不同的用户部署时,有可能用不到 RabbitMQ,此时没有部署 MQ,启动项目时不能报错。

核心报错信息:

WARN  o.s.boot.actuate.amqp.RabbitHealthIndicator - Rabbit health check failed
Caused by: java.net.ConnectException: Connection refused: connect
  • 1
  • 2

详细报错信息:

[2023-03-16 11:18:11.456] traceId= [RMI TCP Connection(8)-xxx.xxx.xx.xxx] WARN  o.s.boot.actuate.amqp.RabbitHealthIndicator - Rabbit health check failed
org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused: connect
	at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:61)
	at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:602)
	at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createConnection(CachingConnectionFactory.java:725)
	at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.createConnection(ConnectionFactoryUtils.java:252)
	at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:2173)
	at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:2146)
	at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:2126)
	at org.springframework.boot.actuate.amqp.RabbitHealthIndicator.getVersion(RabbitHealthIndicator.java:49)
	at org.springframework.boot.actuate.amqp.RabbitHealthIndicator.doHealthCheck(RabbitHealthIndicator.java:44)
	at org.springframework.boot.actuate.health.AbstractHealthIndicator.health(AbstractHealthIndicator.java:82)
	at org.springframework.boot.actuate.health.HealthIndicator.getHealth(HealthIndicator.java:37)
	at org.springframework.boot.actuate.health.HealthEndpoint.getHealth(HealthEndpoint.java:77)
	at org.springframework.boot.actuate.health.HealthEndpoint.getHealth(HealthEndpoint.java:40)
	at org.springframework.boot.actuate.health.HealthEndpointSupport.getContribution(HealthEndpointSupport.java:130)
	at org.springframework.boot.actuate.health.HealthEndpointSupport.getAggregateContribution(HealthEndpointSupport.java:141)
	at org.springframework.boot.actuate.health.HealthEndpointSupport.getContribution(HealthEndpointSupport.java:126)
	at org.springframework.boot.actuate.health.HealthEndpointSupport.getHealth(HealthEndpointSupport.java:95)
	at org.springframework.boot.actuate.health.HealthEndpointSupport.getHealth(HealthEndpointSupport.java:66)
	at org.springframework.boot.actuate.health.HealthEndpoint.health(HealthEndpoint.java:71)
	at org.springframework.boot.actuate.health.HealthEndpoint.health(HealthEndpoint.java:61)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:282)
	at org.springframework.boot.actuate.endpoint.invoke.reflect.ReflectiveOperationInvoker.invoke(ReflectiveOperationInvoker.java:74)
	at org.springframework.boot.actuate.endpoint.annotation.AbstractDiscoveredOperation.invoke(AbstractDiscoveredOperation.java:60)
	at org.springframework.boot.actuate.endpoint.jmx.EndpointMBean.invoke(EndpointMBean.java:122)
	at org.springframework.boot.actuate.endpoint.jmx.EndpointMBean.invoke(EndpointMBean.java:97)
	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.invoke(DefaultMBeanServerInterceptor.java:819)
	at com.sun.jmx.mbeanserver.JmxMBeanServer.invoke(JmxMBeanServer.java:801)
	at javax.management.remote.rmi.RMIConnectionImpl.doOperation(RMIConnectionImpl.java:1468)
	at javax.management.remote.rmi.RMIConnectionImpl.access$300(RMIConnectionImpl.java:76)
	at javax.management.remote.rmi.RMIConnectionImpl$PrivilegedOperation.run(RMIConnectionImpl.java:1309)
	at javax.management.remote.rmi.RMIConnectionImpl.doPrivilegedOperation(RMIConnectionImpl.java:1401)
	at javax.management.remote.rmi.RMIConnectionImpl.invoke(RMIConnectionImpl.java:829)
	at sun.reflect.GeneratedMethodAccessor212.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at sun.rmi.server.UnicastServerRef.dispatch(UnicastServerRef.java:357)
	at sun.rmi.transport.Transport$1.run(Transport.java:200)
	at sun.rmi.transport.Transport$1.run(Transport.java:197)
	at java.security.AccessController.doPrivileged(Native Method)
	at sun.rmi.transport.Transport.serviceCall(Transport.java:196)
	at sun.rmi.transport.tcp.TCPTransport.handleMessages(TCPTransport.java:573)
	at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run0(TCPTransport.java:834)
	at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.lambda$run$0(TCPTransport.java:688)
	at java.security.AccessController.doPrivileged(Native Method)
	at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run(TCPTransport.java:687)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.ConnectException: Connection refused: connect
	at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method)
	at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:81)
	at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:476)
	at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:218)
	at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:200)
	at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:162)
	at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:394)
	at java.net.Socket.connect(Socket.java:606)
	at com.rabbitmq.client.impl.SocketFrameHandlerFactory.create(SocketFrameHandlerFactory.java:60)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1223)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1173)
	at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.connectAddresses(AbstractConnectionFactory.java:640)
	at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.connect(AbstractConnectionFactory.java:615)
	at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:565)
	... 50 common frames omitted
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70

2.1 总开关配置

添加spring.rabbitmq.enable配置作为总开关:

spring:
  # Used to receive data sent by devices
  rabbitmq:
    # Custom rabbitmq setting: enable turns the rabbitmq service on or off (false = off, true = on)
    enable: true
    host: 172.81.205.216
    port: 5672
    username: guest
    password: guest
    mq-name: ZRTZ_QUEUE_EFENCE_DEVICE_OBTAIN_STATUS
    # Confirm that the message has been delivered to the exchange
    publisher-confirm-type: correlated
    # Confirm that the message has been delivered to the queue
    publisher-returns: true
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

2.2 关闭自动配置

/**
 * Application entry point. {@code RabbitAutoConfiguration} is excluded from Spring Boot's
 * auto-configuration here, while {@code @EnableRabbit} stays on.
 */
@SpringBootApplication(exclude = RabbitAutoConfiguration.class)
@EnableRabbit
public class Application {

    /**
     * Starts the Spring application.
     *
     * @param args command-line arguments passed through to Spring
     */
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

2.3 根据开关进行配置

/**
 * Controls whether {@code RabbitAutoConfiguration} is applied: this configuration class
 * extends it and is only active when the property {@code spring.rabbitmq.enable} has the
 * value {@code true}.
 */
@ConditionalOnProperty(prefix = "spring.rabbitmq", name = "enable", havingValue = "true")
@Configuration
public class RabbitMessageQueueEnableAutoConfig extends RabbitAutoConfiguration {
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

2.4 消息监听与发送

  • 监听开关
/**
 * Registers the {@link RabbitMessageQueueReceiver} bean only when the
 * {@code spring.rabbitmq.enable} switch is on.
 */
@Component
@Slf4j
@Data
public class RabbitMessageQueueReceiverConfig {
    /**
     * Master switch. Defaults to {@code false} when the property is absent so that a
     * deployment without the key still starts (an unresolved placeholder would otherwise
     * fail startup); this matches the {@code havingValue = "true"} condition used for the
     * auto-configuration, which also treats a missing property as disabled.
     */
    @Value("${spring.rabbitmq.enable:false}")
    private boolean enable;

    /**
     * Creates the receiver when the switch is on.
     *
     * @return a new receiver when enabled, otherwise {@code null} (no receiver is registered)
     */
    @Bean
    public RabbitMessageQueueReceiver initRabbitMessageQueueReceiver() {
        if (!enable) {
            log.info("【------不启用------】RabbitMessageQueueReceiver");
            return null;
        }
        log.info("【------已启用------】RabbitMessageQueueReceiver");
        return new RabbitMessageQueueReceiver();
    }
}

// 监听代码【去掉@Component】
// @Component
@Log4j
public class RabbitMessageQueueSender {
    public RabbitTemplate rabbitTemplate;
    public boolean sendMessage(String exchange, String routingKey, String message) {
        try {
			rabbitTemplate.convertAndSend(exchange, routingKey, message);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
        return true;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 消息发送及回调【添加 (required = false) 防止接口被调用出错】
// 消息发送
@Component
@Log4j
public class RabbitMessageQueueSender {
    @Autowired(required = false)
    public RabbitTemplate rabbitTemplate;
    public boolean sendMessage(String exchange, String routingKey, String message) {
        try {
            if (rabbitTemplate != null) {
                rabbitTemplate.convertAndSend(exchange, routingKey, message);
            } else {
                return false;
            }
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
        return true;
    }
}

// Send callbacks
/**
 * Registers itself on the {@link RabbitTemplate} (when one exists) as both the
 * publisher-confirm callback and the returned-message callback, and logs the outcome
 * of each publish.
 */
@Component
@Slf4j
public class RabbitmqSendCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {

    /** Optional template; {@code null} when no RabbitTemplate bean is present. */
    @Autowired(required = false)
    private RabbitTemplate rabbitTemplate;

    /**
     * Wires this bean into the template after injection, if a template is present.
     * The original code called {@code setConfirmCallback} twice and never
     * {@code setReturnsCallback}, so {@link #returnedMessage(ReturnedMessage)} was never invoked.
     */
    @PostConstruct
    public void run() {
        if (rabbitTemplate == null) {
            return;
        }
        rabbitTemplate.setConfirmCallback(this);
        // mandatory=true asks the template to report unroutable messages via the returns callback
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnsCallback(this);
    }

    /**
     * Confirm callback: logs whether the publish was acked or nacked.
     *
     * @param correlationData correlation data supplied with the publish, if any
     * @param ack             {@code true} when acked, {@code false} when nacked
     * @param failCause       failure cause reported on nack, may be {@code null}
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String failCause) {
        if (ack) {
            log.info("消息发送成功");
        } else {
            log.warn("消息发送失败,进行容错处理");
        }
        log.info("消息发送到交换机时的回调函数, ack:" + ack + "FailCause 消息:" + failCause);
    }

    /**
     * Returns callback: logs a message that was handed back to the publisher.
     *
     * @param returned the returned message and its reply details
     */
    @Override
    public void returnedMessage(ReturnedMessage returned) {
        log.warn("消息从交换机发送到队列时失败的回调函数, 调用失败!!!" + returned);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53

3.总结

  • 关闭自动配置。
  • 根据自定义的标志进行bean对象装配。
  • 防止未装配导致的报错。
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/483401
推荐阅读
相关标签
  

闽ICP备14008679号