交换机和队列支持持久化。现在我们也需要给消息设计元数据
DeliveryMode
设置为2,表示支持消息的持久化。
=======================================================================================
接上一边博文。 修改文件:
发送者:
- package org.example.sender;
-
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.core.MessageDeliveryMode;
- import org.springframework.amqp.core.MessageProperties;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
-
- /**
- * 消息生产者 发送消息
- */
- @Component
- public class MessageSender {
-
- @Autowired
- RabbitTemplate rabbitTemplate;
-
- /**
- * 发送消息
- * @param info
- */
- public void send(String info)
- {
-
- System.out.println("发送消息>>>"+info);
-
- // CorrelationData correlationData = new CorrelationData();
- //
- // String uuid = UUID.randomUUID().toString();
- // System.out.println(uuid);
- //
- // correlationData.setId(uuid);
-
- // rabbitTemplate.convertAndSend("amqp-topic","huawei.a",info,correlationData);
-
- /**
- * public static int toInt(MessageDeliveryMode mode) {
- * switch (mode) {
- * case NON_PERSISTENT:
- * return 1;
- * case PERSISTENT:
- * return 2;
- * default:
- * return -1;
- */
-
-
- MessageProperties messageProperties = new MessageProperties();
-
- messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
-
- messageProperties.setExpiration("20000"); // 设置过期时间20秒
-
- messageProperties.setAppId("abc123456");
-
- messageProperties.setHeader("国家1","中国1");
- messageProperties.setHeader("国家2","中国2");
-
-
- Message message = new Message(info.getBytes(), messageProperties);
-
- System.out.println(message);
-
- rabbitTemplate.convertAndSend("amqp-topic", "huawei.a", message);
-
- }
-
- }
消费者:
- package org.example.receiver;
-
- import com.rabbitmq.client.Channel;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
- import java.io.IOException;
-
-
- @Component
- public class TopicReceiver {
-
- //分别监听名称为xiaomi、huawei的队列
- @RabbitListener(queues = "xiaomi")
- public void handlerXM(Message message,String msg, Channel channel) throws IOException {
-
- System.out.println("小米:"+msg);
-
- //手动签收,不启动批量签收
- channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
-
- System.out.println(message.getMessageProperties().getDeliveryTag());
- }
-
-
- @RabbitListener(queues = "huawei")
- public void handlerHW(Message message,String msg, Channel channel) throws IOException {
-
-
- System.out.println("华为:"+msg);
-
- System.out.println(message.getMessageProperties().getHeaders());
-
- System.out.println((String) message.getMessageProperties().getHeader("国家2"));
-
- System.out.println(message);
-
- System.out.println(message.getMessageProperties().getExpiration());
-
- System.out.println(message.getMessageProperties().getAppId());
-
-
-
- //手动签收,不启动批量签收
- //告诉rmq签收的消息的id。以及是否批量签收
- channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
-
- }
- }
======================================================================================
参考:https://blog.csdn.net/qq_43623492/article/details/124259773