赞
踩
1、rabbitmq服务端打开延迟插件 (超过 4294967295毫秒 ≈ 1193 小时 ≈ 49.7 天 这个时间会立即触发)
注意:只有RabbitMQ 3.6.x以上才支持
在下载好之后,解压得到.ez
结尾的插件包,将其复制到RabbitMQ安装目录下的plugins
文件夹。
然后通过命令行启用该插件:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange |
该插件在通过上述命令启用后就可以直接使用,不需要重启。
2、添加依赖
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
3、配置交换机和队列
- @Configuration
- public class DelayedConfig {
-
- public static String EXCHANGE_NAME = "delayed_exchange";
- public static String QUEUE_NAME = "delayed_queue";
- public static String KEY_NAME = "delayed_key";
-
- /**
- * 基于插件实现的交换机,必须是CustomExchange类型,标识这是一个延时类型的交换机
- */
- @Bean()
- public CustomExchange delayedExchange(){
- Map<String,Object> params = new HashMap<>();
- params.put("x-delayed-type","direct");
- //参数1:交换机名字,参数2:交换机的类型,参数3:是否持久化,参数4:是否自动删除队列,参数5:交换机的额外参数设置
- return new CustomExchange(EXCHANGE_NAME,"x-delayed-message",true,false,params);
- }
-
- @Bean()
- public Queue delayedQueue(){
- return new Queue(QUEUE_NAME);
- }
-
- @Bean
- public Binding delayedBinding(){
- return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with(KEY_NAME).noargs();
- }
-
- }
4、发送和接收消息
- @GetMapping("/t5")
- public void t5(){
- Date date = new Date();
- System.out.println("发送时间:" + date.toString());
- //发送消息
- rabbitTemplate.convertAndSend(DelayedConfig.EXCHANGE_NAME,DelayedConfig.KEY_NAME,"1延迟消息wxm",
- msg->{msg.getMessageProperties().setHeader(MessageProperties.X_DELAY, 15552000000L);
- msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
- return msg;});
- //发送消息
- rabbitTemplate.convertAndSend(DelayedConfig.EXCHANGE_NAME,DelayedConfig.KEY_NAME,"延迟消息wxm",
- msg->{msg.getMessageProperties().setDelay(10000);
- msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
- return msg;});
- log.info("发送成功");
- }
- @RabbitHandler
- @RabbitListener(queues = "delayed_queue")
- public void getDelayed(Message message, Channel channel) throws Exception{
- Date date = new Date();
- String rightNow = date.toString();
- String msg = new String(message.getBody());
- // 手动应答
- System.out.println(message.getMessageProperties().getDeliveryTag());
- channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
- System.out.println("接受成功:"+msg+rightNow);
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。