当前位置:   article > 正文

java——spring boot集成RabbitMQ——高级特效——封装消息的元数据

rabbitlistener 包装

交换机和队列支持持久化。现在我们也需要给消息设计元数据

DeliveryMode

设置为2,表示支持消息的持久化。

=======================================================================================

接上一边博文。  修改文件:

发送者:

  1. package org.example.sender;
  2. import org.springframework.amqp.core.Message;
  3. import org.springframework.amqp.core.MessageDeliveryMode;
  4. import org.springframework.amqp.core.MessageProperties;
  5. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.stereotype.Component;
  8. /**
  9. * 消息生产者 发送消息
  10. */
  11. @Component
  12. public class MessageSender {
  13. @Autowired
  14. RabbitTemplate rabbitTemplate;
  15. /**
  16. * 发送消息
  17. * @param info
  18. */
  19. public void send(String info)
  20. {
  21. System.out.println("发送消息>>>"+info);
  22. // CorrelationData correlationData = new CorrelationData();
  23. //
  24. // String uuid = UUID.randomUUID().toString();
  25. // System.out.println(uuid);
  26. //
  27. // correlationData.setId(uuid);
  28. // rabbitTemplate.convertAndSend("amqp-topic","huawei.a",info,correlationData);
  29. /**
  30. * public static int toInt(MessageDeliveryMode mode) {
  31. * switch (mode) {
  32. * case NON_PERSISTENT:
  33. * return 1;
  34. * case PERSISTENT:
  35. * return 2;
  36. * default:
  37. * return -1;
  38. */
  39. MessageProperties messageProperties = new MessageProperties();
  40. messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
  41. messageProperties.setExpiration("20000"); // 设置过期时间20
  42. messageProperties.setAppId("abc123456");
  43. messageProperties.setHeader("国家1","中国1");
  44. messageProperties.setHeader("国家2","中国2");
  45. Message message = new Message(info.getBytes(), messageProperties);
  46. System.out.println(message);
  47. rabbitTemplate.convertAndSend("amqp-topic", "huawei.a", message);
  48. }
  49. }

 

 

消费者:

  1. package org.example.receiver;
  2. import com.rabbitmq.client.Channel;
  3. import org.springframework.amqp.core.Message;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.stereotype.Component;
  6. import java.io.IOException;
  7. @Component
  8. public class TopicReceiver {
  9. //分别监听名称为xiaomi、huawei的队列
  10. @RabbitListener(queues = "xiaomi")
  11. public void handlerXM(Message message,String msg, Channel channel) throws IOException {
  12. System.out.println("小米:"+msg);
  13. //手动签收,不启动批量签收
  14. channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
  15. System.out.println(message.getMessageProperties().getDeliveryTag());
  16. }
  17. @RabbitListener(queues = "huawei")
  18. public void handlerHW(Message message,String msg, Channel channel) throws IOException {
  19. System.out.println("华为:"+msg);
  20. System.out.println(message.getMessageProperties().getHeaders());
  21. System.out.println((String) message.getMessageProperties().getHeader("国家2"));
  22. System.out.println(message);
  23. System.out.println(message.getMessageProperties().getExpiration());
  24. System.out.println(message.getMessageProperties().getAppId());
  25. //手动签收,不启动批量签收
  26. //告诉rmq签收的消息的id。以及是否批量签收
  27. channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
  28. }
  29. }

 

 

======================================================================================

参考:https://blog.csdn.net/qq_43623492/article/details/124259773

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/一键难忘520/article/detail/766719?site
推荐阅读
相关标签
  

闽ICP备14008679号