赞
踩
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring:
# 用于接收设备发送的数据
rabbitmq:
host: xxx.xx.xxx.xxx
port: 5672
username: guest
password: guest
mq-name: test
# 确认消息已发送到交换机(Exchange)
publisher-confirm-type: correlated
# 确认消息已发送到队列
publisher-returns: true
@Component
@Slf4j
public class RabbitMessageQueueReceiver {
@Autowired
private ConfigProperties configProperties;
@Autowired
private AsyncConfig asyncConfig;
@Autowired
private DataGsmEquipComparisonManager dataGsmEquipComparisonManager;
@RabbitListener(queuesToDeclare = {@Queue(name = "${spring.rabbitmq.mq-name}", durable = "true")}, ackMode = "MANUAL")
@RabbitHandler()
public void receive(String msg, Channel channel, Message message) throws IOException, InterruptedException {
// 获取消息体
String jsonString = new String(message.getBody());
// 处理数据格式
Map<String, Object> dataMap = dealMessageData(jsonString);
try {
asyncConfig.taskExecutor().execute(() -> {
// 根据数据类型处理消息【这里大家根据实际情况进行处理】
DealMessageByType.getInstance().dispose(dataMap);
});
channel.basicQos(5);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
} catch (Exception e) {
e.printStackTrace();
log.error("error message:" + jsonString);
try {
channel.basicQos(5);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
} catch (Exception e1) {
e1.printStackTrace();
}
}
}
@Component
@Log4j
public class RabbitMessageQueueSender {
public RabbitTemplate rabbitTemplate;
public boolean sendMessage(String exchange, String routingKey, String message) {
try {
rabbitTemplate.convertAndSend(exchange, routingKey, message);
} catch (Exception e) {
e.printStackTrace();
return false;
}
return true;
}
}
@Component
@Slf4j
public class RabbitmqSendCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void run() {
if (rabbitTemplate != null) {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback(this);
}
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String failCause) {
if (ack) {
log.info("消息发送成功");
} else {
log.info("消息发送失败,进行容错处理");
}
log.info("消息发送到交换机时的回调函数, ack:" + ack + "FailCause 消息:" + failCause);
}
@Override
public void returnedMessage(ReturnedMessage returned) {
log.info("消息从交换机发送到队列时失败的回调函数, 调用失败!!!" + returned);
}
}
SpringBoot 项目集成了 RabbitMQ 但是有时候又用不到它,比如说:
核心报错信息:
WARN o.s.boot.actuate.amqp.RabbitHealthIndicator - Rabbit health check failed
Caused by: java.net.ConnectException: Connection refused: connect
详细报错信息:
[2023-03-16 11:18:11.456] traceId= [RMI TCP Connection(8)-xxx.xxx.xx.xxx] WARN o.s.boot.actuate.amqp.RabbitHealthIndicator - Rabbit health check failed
org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused: connect
at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:61)
at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:602)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createConnection(CachingConnectionFactory.java:725)
at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.createConnection(ConnectionFactoryUtils.java:252)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:2173)
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:2146)
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:2126)
at org.springframework.boot.actuate.amqp.RabbitHealthIndicator.getVersion(RabbitHealthIndicator.java:49)
at org.springframework.boot.actuate.amqp.RabbitHealthIndicator.doHealthCheck(RabbitHealthIndicator.java:44)
at org.springframework.boot.actuate.health.AbstractHealthIndicator.health(AbstractHealthIndicator.java:82)
at org.springframework.boot.actuate.health.HealthIndicator.getHealth(HealthIndicator.java:37)
at org.springframework.boot.actuate.health.HealthEndpoint.getHealth(HealthEndpoint.java:77)
at org.springframework.boot.actuate.health.HealthEndpoint.getHealth(HealthEndpoint.java:40)
at org.springframework.boot.actuate.health.HealthEndpointSupport.getContribution(HealthEndpointSupport.java:130)
at org.springframework.boot.actuate.health.HealthEndpointSupport.getAggregateContribution(HealthEndpointSupport.java:141)
at org.springframework.boot.actuate.health.HealthEndpointSupport.getContribution(HealthEndpointSupport.java:126)
at org.springframework.boot.actuate.health.HealthEndpointSupport.getHealth(HealthEndpointSupport.java:95)
at org.springframework.boot.actuate.health.HealthEndpointSupport.getHealth(HealthEndpointSupport.java:66)
at org.springframework.boot.actuate.health.HealthEndpoint.health(HealthEndpoint.java:71)
at org.springframework.boot.actuate.health.HealthEndpoint.health(HealthEndpoint.java:61)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:282)
at org.springframework.boot.actuate.endpoint.invoke.reflect.ReflectiveOperationInvoker.invoke(ReflectiveOperationInvoker.java:74)
at org.springframework.boot.actuate.endpoint.annotation.AbstractDiscoveredOperation.invoke(AbstractDiscoveredOperation.java:60)
at org.springframework.boot.actuate.endpoint.jmx.EndpointMBean.invoke(EndpointMBean.java:122)
at org.springframework.boot.actuate.endpoint.jmx.EndpointMBean.invoke(EndpointMBean.java:97)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.invoke(DefaultMBeanServerInterceptor.java:819)
at com.sun.jmx.mbeanserver.JmxMBeanServer.invoke(JmxMBeanServer.java:801)
at javax.management.remote.rmi.RMIConnectionImpl.doOperation(RMIConnectionImpl.java:1468)
at javax.management.remote.rmi.RMIConnectionImpl.access$300(RMIConnectionImpl.java:76)
at javax.management.remote.rmi.RMIConnectionImpl$PrivilegedOperation.run(RMIConnectionImpl.java:1309)
at javax.management.remote.rmi.RMIConnectionImpl.doPrivilegedOperation(RMIConnectionImpl.java:1401)
at javax.management.remote.rmi.RMIConnectionImpl.invoke(RMIConnectionImpl.java:829)
at sun.reflect.GeneratedMethodAccessor212.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at sun.rmi.server.UnicastServerRef.dispatch(UnicastServerRef.java:357)
at sun.rmi.transport.Transport$1.run(Transport.java:200)
at sun.rmi.transport.Transport$1.run(Transport.java:197)
at java.security.AccessController.doPrivileged(Native Method)
at sun.rmi.transport.Transport.serviceCall(Transport.java:196)
at sun.rmi.transport.tcp.TCPTransport.handleMessages(TCPTransport.java:573)
at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run0(TCPTransport.java:834)
at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.lambda$run$0(TCPTransport.java:688)
at java.security.AccessController.doPrivileged(Native Method)
at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run(TCPTransport.java:687)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.ConnectException: Connection refused: connect
at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method)
at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:81)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:476)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:218)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:200)
at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:162)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:394)
at java.net.Socket.connect(Socket.java:606)
at com.rabbitmq.client.impl.SocketFrameHandlerFactory.create(SocketFrameHandlerFactory.java:60)
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1223)
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1173)
at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.connectAddresses(AbstractConnectionFactory.java:640)
at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.connect(AbstractConnectionFactory.java:615)
at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:565)
... 50 common frames omitted
添加spring.rabbitmq.enable
配置作为总开关:
spring:
# 用于接收设备发送的数据
rabbitmq:
# rabbitmq 的自定义配置 enable 用于开启或关闭 rabbitmq 服务(false关闭,true开启)
enable: true
host: 172.81.205.216
port: 5672
username: guest
password: guest
mq-name: ZRTZ_QUEUE_EFENCE_DEVICE_OBTAIN_STATUS
# 确认消息已发送到交换机(Exchange)
publisher-confirm-type: correlated
# 确认消息已发送到队列
publisher-returns: true
@EnableRabbit
@SpringBootApplication(exclude = {RabbitAutoConfiguration.class})
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
/**
* 用于管理 RabbitAutoConfiguration 是否配置
*/
@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.enable", havingValue = "true")
public class RabbitMessageQueueEnableAutoConfig extends RabbitAutoConfiguration {
}
@Component
@Slf4j
@Data
public class RabbitMessageQueueReceiverConfig {
@Value("${spring.rabbitmq.enable}")
private boolean enable;
@Bean
public RabbitMessageQueueReceiver initRabbitMessageQueueReceiver() {
if (enable) {
RabbitMessageQueueReceiver rabbitMessageQueueReceiver = new RabbitMessageQueueReceiver();
log.info("【------已启用------】RabbitMessageQueueReceiver");
return rabbitMessageQueueReceiver;
} else {
log.info("【------不启用------】RabbitMessageQueueReceiver");
return null;
}
}
}
// 监听代码【去掉@Component】
// @Component
@Log4j
public class RabbitMessageQueueSender {
public RabbitTemplate rabbitTemplate;
public boolean sendMessage(String exchange, String routingKey, String message) {
try {
rabbitTemplate.convertAndSend(exchange, routingKey, message);
} catch (Exception e) {
e.printStackTrace();
return false;
}
return true;
}
}
// 消息发送
@Component
@Log4j
public class RabbitMessageQueueSender {
@Autowired(required = false)
public RabbitTemplate rabbitTemplate;
public boolean sendMessage(String exchange, String routingKey, String message) {
try {
if (rabbitTemplate != null) {
rabbitTemplate.convertAndSend(exchange, routingKey, message);
} else {
return false;
}
} catch (Exception e) {
e.printStackTrace();
return false;
}
return true;
}
}
// 发送回调
@Component
@Slf4j
public class RabbitmqSendCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
@Autowired(required = false)
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void run() {
if (rabbitTemplate != null) {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback(this);
}
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String failCause) {
if (ack) {
log.info("消息发送成功");
} else {
log.info("消息发送失败,进行容错处理");
}
log.info("消息发送到交换机时的回调函数, ack:" + ack + "FailCause 消息:" + failCause);
}
@Override
public void returnedMessage(ReturnedMessage returned) {
log.info("消息从交换机发送到队列时失败的回调函数, 调用失败!!!" + returned);
}
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。