赞
踩
1.加入依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2. 配置,配置中各个参数的含义 传送门
spring: rabbitmq: host: xxxx username: xxx password: xxxx port: 5672 virtual-host: / publisher-confirm-type: correlated publisher-returns: true template: mandatory: true listener: type: simple simple: acknowledge-mode: manual retry: enabled: true prefetch: 30
3.使用,我这里是根据自己的业务场景的具体使用,可以看这个大神总结的使用方式传送门
3.1 配置一个topic类型的交换机,绑定队列,指定routingkey
- @Configuration
- public class TopicRabbitMqConfig {
-
- public final static String exchange = "xxx";
- public final static String queue = "xxx";
- private final static String routing = "xxx";
-
- @Bean
- TopicExchange netdiskTopicExchange(){
- return new TopicExchange(exchange, true, false);
- }
-
- @Bean
- Queue netdiskQueue(){
- return new Queue(queue);
- }
-
- @Bean
- Binding netdiskBinding(){
- return BindingBuilder.bind(netdiskQueue()).to(netdiskTopicExchange()).with(routing);
- }
- }
3.2 封装了一个工具类方便后续使用
- @Slf4j
- @Component
- public class MqUtil implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
-
- @Resource
- private RabbitTemplate rt;
-
- public static RabbitTemplate rabbitTemplate;
-
- @PostConstruct
- private void init() {
- MqUtil.rabbitTemplate = this.rt;
-
- rabbitTemplate.setConfirmCallback(this::confirm);
- rabbitTemplate.setReturnsCallback(this::returnedMessage);
- }
-
-
-
- /**
- * 不论是否进入交换机,都会回调当前方法
- *
- * @param correlationData 消息投递封装对象
- * @param ack 是否投递成功
- * @param exception 如果错误,错误原因
- */
- @Override
- public void confirm(CorrelationData correlationData, boolean ack, String exception) {
-
- if (!ack) {
- if (correlationData instanceof CorrelationDataExt) {
- CorrelationDataExt correlationDataExt = (CorrelationDataExt) correlationData;
- Object message = correlationDataExt.getData();
- log.error("消息进入交换机失败:{}, 原因:{}", JSON.toJSONString(message), exception);
- }
- }
-
- }
-
- /**
- * 消息从交换机进入队列失败回调方法:只会在失败的情况下
- *
- * @param ReturnedMessage returnedMessage
- */
- @Override
- public void returnedMessage(ReturnedMessage returnedMessage) {
- Message message = returnedMessage.getMessage();
- int replyCode = returnedMessage.getReplyCode();
- String replyText = returnedMessage.getReplyText();
- String exchange = returnedMessage.getExchange();
- String routingKey = returnedMessage.getRoutingKey();
- String messageContent = new String(message.getBody(), StandardCharsets.UTF_8);
-
- Map<String, Object> map = new HashMap<>();
- map.put("replyCode", replyCode);
- map.put("replyText", replyText);
- map.put("exchange", exchange);
- map.put("routingKey", routingKey);
- map.put("message", messageContent);
-
- log.error("消息从交换机进入队列失败:{}", JSON.toJSONString(map));
- }
-
-
-
-
- public static void send(String type, MqMessageData data) throws AmqpException {
- String msgId = UUID.randomUUID().toString();
-
-
- CorrelationDataExt correlationData = new CorrelationDataExt();
- correlationData.setId(msgId);
- correlationData.setData("xxxx");
-
-
- rabbitTemplate.convertAndSend(TopicRabbitMqConfig.exchange, "xxx", "message", correlationData);
- }
-
- }
3.2 CorrelationDataExt ,扩展CorrelationData ,方便把我们发送出的消息挂回到生产者确认的回调里,这部分的使用方式不确定是否一定对,因为我也是第一次用,但是发现CorrelationData 里只有id能拿到,却拿不到数据,后来看了一个帖子可以扩展CorrelationData ,能实现我想要的效果,还希望路过的大神能指导一下,生产者确认的回调里,如果失败了怎么处理消息?感谢!!
- /**
- * CorrelationData的自定义实现,用于拿到消息内容
- * @author coco
- * @date 2022/9/16
- */
- public class CorrelationDataExt extends CorrelationData {
- //数据
- private volatile Object data;
-
- public Object getData() {
- return data;
- }
-
- public void setData(Object data) {
- this.data = data;
- }
-
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。