赞
踩
延时队列,首先,它是一种队列,队列意味着内部的元素是有序的,元素出队和入队是有方向性的,元素从一端进入,从另一端取出。
其次,延时队列,最重要的特性就体现在它的延时属性上,跟普通的队列不一样的是,普通队列中的元素总是等着希望被早点取出处理,而延时队列中的元素则是希望被在指定时间得到取出和处理,所以延时队列中的元素是都是带时间属性的,通常来说是需要被处理的消息或者任务。
简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。
那么什么时候需要用延时队列呢?考虑一下以下场景:
订单在十分钟之内未支付则自动取消。
新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
账单在一周内未支付,则自动结算。
用户注册成功后,如果三天内没有登陆则进行短信提醒。
用户发起退款,如果三天内没有得到处理则通知相关运营人员。
预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议。
1、利用TTL+死信队列
生产者生产一条延时消息,根据需要延时时间的不同,利用不同的routingkey将消息路由到不同的延时队列,每个队列都设置了不同的TTL属性,并绑定在同一个死信交换机中,消息过期后,根据routingkey的不同,又会被路由到不同的死信队列中,消费者只需要监听对应的死信队列进行处理即可。
这种方式的弊端,无法做到通用性,每搞一个新的延迟任务,都要去实现一个实现的TTL+死信队列,比较麻烦;
2、利用RabbitMQ插件实现
安装一个插件即可:https://www.rabbitmq.com/community-plugins.html ,下载rabbitmq_delayed_message_exchange插件,然后解压放置到RabbitMQ的插件目录。
查看发布历史,下载3.8.0版本
3、RabbitMQ容器配置插件:
3.1、进入容器查看,MQ的插件目录:plugins/
[root@VM-0-12-centos ~]# docker ps CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES [root@VM-0-12-centos ~]# docker ps -a CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES 74f929cfb3c9 rabbitmq:3-management "docker-entrypoint..." 4 weeks ago Exited (0) 18 minutes ago rabbitMQ [root@VM-0-12-centos ~]# docker start 74f929cfb3c9 74f929cfb3c9 [root@VM-0-12-centos ~]# docker exec -it 74f929cfb3c9 bash root@74f929cfb3c9:/# ls bin boot dev etc home lib lib32 lib64 libx32 media mnt opt plugins proc root run sbin srv sys tmp usr var root@74f929cfb3c9:/# cd plugins root@74f929cfb3c9:/plugins# ls README cuttlefish-3.0.0.ez ra-1.1.8.ez rabbitmq_consistent_hash_exchange-3.8.18.ez rabbitmq_peer_discovery_consul-3.8.18.ez rabbitmq_stomp-3.8.18.ez recon-2.5.1.ez accept-0.3.5.ez eetcd-0.3.3.ez rabbit-3.8.18.ez rabbitmq_event_exchange-3.8.18.ez rabbitmq_peer_discovery_etcd-3.8.18.ez rabbitmq_top-3.8.18.ez stdout_formatter-0.2.4.ez amqp10_client-3.8.18.ez gen_batch_server-0.8.4.ez rabbit_common-3.8.18.ez rabbitmq_federation-3.8.18.ez rabbitmq_peer_discovery_k8s-3.8.18.ez rabbitmq_tracing-3.8.18.ez syslog-3.4.5.ez amqp10_common-3.8.18.ez goldrush-0.1.9.ez rabbitmq_amqp1_0-3.8.18.ez rabbitmq_federation_management-3.8.18.ez rabbitmq_prelaunch-3.8.18.ez rabbitmq_trust_store-3.8.18.ez sysmon_handler-1.3.0.ez amqp_client-3.8.18.ez gun-1.3.3.ez rabbitmq_auth_backend_cache-3.8.18.ez rabbitmq_jms_topic_exchange-3.8.18.ez rabbitmq_prometheus-3.8.18.ez rabbitmq_web_dispatch-3.8.18.ez aten-0.5.5.ez jose-1.11.1.ez rabbitmq_auth_backend_http-3.8.18.ez rabbitmq_management-3.8.18.ez rabbitmq_random_exchange-3.8.18.ez rabbitmq_web_mqtt-3.8.18.ez base64url-1.0.1.ez jsx-3.1.0.ez rabbitmq_auth_backend_ldap-3.8.18.ez rabbitmq_management_agent-3.8.18.ez rabbitmq_recent_history_exchange-3.8.18.ez rabbitmq_web_mqtt_examples-3.8.18.ez cowboy-2.8.0.ez lager-3.9.2.ez rabbitmq_auth_backend_oauth2-3.8.18.ez rabbitmq_mqtt-3.8.18.ez rabbitmq_sharding-3.8.18.ez rabbitmq_web_stomp-3.8.18.ez cowlib-2.9.1.ez observer_cli-1.6.2.ez rabbitmq_auth_mechanism_ssl-3.8.18.ez rabbitmq_peer_discovery_aws-3.8.18.ez rabbitmq_shovel-3.8.18.ez rabbitmq_web_stomp_examples-3.8.18.ez credentials_obfuscation-2.4.0.ez prometheus-4.6.0.ez rabbitmq_aws-3.8.18.ez rabbitmq_peer_discovery_common-3.8.18.ez rabbitmq_shovel_management-3.8.18.ez ranch-2.0.0.ez root@74f929cfb3c9:/plugins#
3.2、将下载的rabbitmq_delayed_message_exchange插件放到服务器的/home/docker/rabbitmq/路径下
3.3、将rabbitmq_delayed_message_exchange插件拷贝到容器的plugins目录下,并确认
命令:docker cp 本地文件路径 容器ID或全称:容器路径
[root@VM-0-12-centos /]# docker cp home/docker/rabbitmq/rabbitmq_delayed_message_exchange-3.8.0.ez 74f929cfb3c9:plugins/
[root@VM-0-12-centos /]# docker exec -it 74f929cfb3c9 bash
root@74f929cfb3c9:/# cd plugins
root@74f929cfb3c9:/plugins# ls
README cuttlefish-3.0.0.ez ra-1.1.8.ez rabbitmq_consistent_hash_exchange-3.8.18.ez rabbitmq_peer_discovery_common-3.8.18.ez rabbitmq_shovel_management-3.8.18.ez ranch-2.0.0.ez
accept-0.3.5.ez eetcd-0.3.3.ez rabbit-3.8.18.ez rabbitmq_delayed_message_exchange-3.8.0.ez rabbitmq_peer_discovery_consul-3.8.18.ez rabbitmq_stomp-3.8.18.ez recon-2.5.1.ez
amqp10_client-3.8.18.ez gen_batch_server-0.8.4.ez rabbit_common-3.8.18.ez rabbitmq_event_exchange-3.8.18.ez rabbitmq_peer_discovery_etcd-3.8.18.ez rabbitmq_top-3.8.18.ez stdout_formatter-0.2.4.ez
amqp10_common-3.8.18.ez goldrush-0.1.9.ez rabbitmq_amqp1_0-3.8.18.ez rabbitmq_federation-3.8.18.ez rabbitmq_peer_discovery_k8s-3.8.18.ez rabbitmq_tracing-3.8.18.ez syslog-3.4.5.ez
amqp_client-3.8.18.ez gun-1.3.3.ez rabbitmq_auth_backend_cache-3.8.18.ez rabbitmq_federation_management-3.8.18.ez rabbitmq_prelaunch-3.8.18.ez rabbitmq_trust_store-3.8.18.ez sysmon_handler-1.3.0.ez
aten-0.5.5.ez jose-1.11.1.ez rabbitmq_auth_backend_http-3.8.18.ez rabbitmq_jms_topic_exchange-3.8.18.ez rabbitmq_prometheus-3.8.18.ez rabbitmq_web_dispatch-3.8.18.ez
base64url-1.0.1.ez jsx-3.1.0.ez rabbitmq_auth_backend_ldap-3.8.18.ez rabbitmq_management-3.8.18.ez rabbitmq_random_exchange-3.8.18.ez rabbitmq_web_mqtt-3.8.18.ez
cowboy-2.8.0.ez lager-3.9.2.ez rabbitmq_auth_backend_oauth2-3.8.18.ez rabbitmq_management_agent-3.8.18.ez rabbitmq_recent_history_exchange-3.8.18.ez rabbitmq_web_mqtt_examples-3.8.18.ez
cowlib-2.9.1.ez observer_cli-1.6.2.ez rabbitmq_auth_mechanism_ssl-3.8.18.ez rabbitmq_mqtt-3.8.18.ez rabbitmq_sharding-3.8.18.ez rabbitmq_web_stomp-3.8.18.ez
credentials_obfuscation-2.4.0.ez prometheus-4.6.0.ez rabbitmq_aws-3.8.18.ez rabbitmq_peer_discovery_aws-3.8.18.ez rabbitmq_shovel-3.8.18.ez rabbitmq_web_stomp_examples-3.8.18.ez
root@74f929cfb3c9:/plugins#
3.4、接下来,进入RabbitMQ的安装目录下的sbin目录,执行下面命令让该插件生效,然后重启RabbitMQ。
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
root@74f929cfb3c9:/plugins# cd /sbin root@74f929cfb3c9:/sbin# rabbitmq-plugins enable rabbitmq_delayed_message_exchange Enabling plugins on node rabbit@74f929cfb3c9: rabbitmq_delayed_message_exchange The following plugins have been configured: rabbitmq_delayed_message_exchange rabbitmq_management rabbitmq_management_agent rabbitmq_prometheus rabbitmq_web_dispatch Applying plugin configuration to rabbit@74f929cfb3c9... The following plugins have been enabled: rabbitmq_delayed_message_exchange started 1 plugins. root@74f929cfb3c9:/sbin#
1、配置延迟队列
/** * delayedDirect交换机名称 */ public static final String DELAYED_DIRECT_EXCHANGE="delayedDirectExchange"; /** * delayedDirect队列名称 */ public static final String DELAYED_DIRECT_QUEUE="delayedDirectQueue"; /** * delayedDirec交换机与delayedDirec队列绑定的key */ public static final String DELAYED_DIRECT_ROUTINGKEY="delayedDirectRouingkey"; /** * 定义一个delayedDirect交换机 * 自定义交换机 * @return */ @Bean public CustomExchange delayedDirectExchange(){ /** * CustomExchange(String name, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments) * 构造参数说明: * name:交换机名称 、 type:自定义类型(插件提供了一个类型叫x-delayed-message) 、durable 持久化 、autoDelete自动删除 、arguments 用来传递其他属性配置 */ Map<String, Object> args = new HashMap<>(); args.put("x-delayed-type", "direct"); return new CustomExchange(DELAYED_DIRECT_EXCHANGE,"x-delayed-message", true, false, args); } /** * 定义一个delayedDirect队列 * @return */ @Bean public Queue delayedDirectQueue(){ return new Queue(DELAYED_DIRECT_QUEUE); } /** * 定义一个delayedDirect队列和delayedDirect交换机的绑定规则 * @return */ @Bean public Binding delayedDirectBinding(){ return BindingBuilder.bind(delayedDirectQueue()).to(delayedDirectExchange()).with(DELAYED_DIRECT_ROUTINGKEY).noargs(); }
2、生产者发送消息方法
/**
* 发送delayed延迟消息
* @param message 消息
* @param delayTime 延迟时间 单位毫秒
*/
@Override
public void sendDelayedMessage(String message, Integer delayTime) {
rabbitTemplate.convertAndSend(RabbitMqConfig.DELAYED_DIRECT_EXCHANGE,RabbitMqConfig.DELAYED_DIRECT_ROUTINGKEY,message,messagePostProcessor->{
messagePostProcessor.getMessageProperties().setDelay(delayTime);
return messagePostProcessor;
});
}
3、消费者接收消息的方法
@Override
@RabbitListener(queues = {RabbitMqConfig.DELAYED_DIRECT_QUEUE}) //监听队列
public void receiveMessage2(String message) {
System.out.println("接收延迟消息:"+message +" :"+ new Date().toString());
}
4、发送消息测试,关闭掉手动ACK
package com.java996.producer; import com.java996.producer.Service.RabbitMqService; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.ConfigurableApplicationContext; @SpringBootApplication public class ProducerApp { public static void main(String[] args) { ConfigurableApplicationContext context = SpringApplication.run(ProducerApp.class,args); RabbitMqService rabbitMqService = (RabbitMqService) context.getBean("RabbitMqService"); rabbitMqService.sendDelayedMessage("测试延迟消息:10秒",10000); rabbitMqService.sendDelayedMessage("测试延迟消息:20秒",20000); } }
5、测试结果:两条消息消费的时间间隔10秒
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。