当前位置:   article > 正文

springboot中使用rabbitmq_correlationdataext

correlationdataext

1.加入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

 2. 配置,配置中各个参数的含义 传送门

spring:
    rabbitmq:
      host: xxxx
      username: xxx
      password: xxxx
      port: 5672
      virtual-host: /
      publisher-confirm-type: correlated
      publisher-returns: true
      template:
        mandatory: true
      listener:
        type: simple
        simple:
          acknowledge-mode: manual
          retry:
            enabled: true
          prefetch: 30

3.使用,我这里是根据自己的业务场景的具体使用,可以看这个大神总结的使用方式传送门 

3.1 配置一个topic类型的交换机,绑定队列,指定routingkey 

  1. @Configuration
  2. public class TopicRabbitMqConfig {
  3. public final static String exchange = "xxx";
  4. public final static String queue = "xxx";
  5. private final static String routing = "xxx";
  6. @Bean
  7. TopicExchange netdiskTopicExchange(){
  8. return new TopicExchange(exchange, true, false);
  9. }
  10. @Bean
  11. Queue netdiskQueue(){
  12. return new Queue(queue);
  13. }
  14. @Bean
  15. Binding netdiskBinding(){
  16. return BindingBuilder.bind(netdiskQueue()).to(netdiskTopicExchange()).with(routing);
  17. }
  18. }

3.2 封装了一个工具类方便后续使用

  1. @Slf4j
  2. @Component
  3. public class MqUtil implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
  4. @Resource
  5. private RabbitTemplate rt;
  6. public static RabbitTemplate rabbitTemplate;
  7. @PostConstruct
  8. private void init() {
  9. MqUtil.rabbitTemplate = this.rt;
  10. rabbitTemplate.setConfirmCallback(this::confirm);
  11. rabbitTemplate.setReturnsCallback(this::returnedMessage);
  12. }
  13. /**
  14. * 不论是否进入交换机,都会回调当前方法
  15. *
  16. * @param correlationData 消息投递封装对象
  17. * @param ack 是否投递成功
  18. * @param exception 如果错误,错误原因
  19. */
  20. @Override
  21. public void confirm(CorrelationData correlationData, boolean ack, String exception) {
  22. if (!ack) {
  23. if (correlationData instanceof CorrelationDataExt) {
  24. CorrelationDataExt correlationDataExt = (CorrelationDataExt) correlationData;
  25. Object message = correlationDataExt.getData();
  26. log.error("消息进入交换机失败:{}, 原因:{}", JSON.toJSONString(message), exception);
  27. }
  28. }
  29. }
  30. /**
  31. * 消息从交换机进入队列失败回调方法:只会在失败的情况下
  32. *
  33. * @param ReturnedMessage returnedMessage
  34. */
  35. @Override
  36. public void returnedMessage(ReturnedMessage returnedMessage) {
  37. Message message = returnedMessage.getMessage();
  38. int replyCode = returnedMessage.getReplyCode();
  39. String replyText = returnedMessage.getReplyText();
  40. String exchange = returnedMessage.getExchange();
  41. String routingKey = returnedMessage.getRoutingKey();
  42. String messageContent = new String(message.getBody(), StandardCharsets.UTF_8);
  43. Map<String, Object> map = new HashMap<>();
  44. map.put("replyCode", replyCode);
  45. map.put("replyText", replyText);
  46. map.put("exchange", exchange);
  47. map.put("routingKey", routingKey);
  48. map.put("message", messageContent);
  49. log.error("消息从交换机进入队列失败:{}", JSON.toJSONString(map));
  50. }
  51. public static void send(String type, MqMessageData data) throws AmqpException {
  52. String msgId = UUID.randomUUID().toString();
  53. CorrelationDataExt correlationData = new CorrelationDataExt();
  54. correlationData.setId(msgId);
  55. correlationData.setData("xxxx");
  56. rabbitTemplate.convertAndSend(TopicRabbitMqConfig.exchange, "xxx", "message", correlationData);
  57. }
  58. }

3.2 CorrelationDataExt ,扩展CorrelationData ,方便把我们发送出的消息挂回到生产者确认的回调里,这部分的使用方式不确定是否一定对,因为我也是第一次用,但是发现CorrelationData 里只有id能拿到,却拿不到数据,后来看了一个帖子可以扩展CorrelationData ,能实现我想要的效果,还希望路过的大神能指导一下,生产者确认的回调里,如果失败了怎么处理消息?感谢!!

  1. /**
  2. * CorrelationData的自定义实现,用于拿到消息内容
  3. * @author coco
  4. * @date 2022/9/16
  5. */
  6. public class CorrelationDataExt extends CorrelationData {
  7. //数据
  8. private volatile Object data;
  9. public Object getData() {
  10. return data;
  11. }
  12. public void setData(Object data) {
  13. this.data = data;
  14. }
  15. }

 

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/958586
推荐阅读
相关标签
  

闽ICP备14008679号