赞
踩
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
cd /home/docker
docker cp rabbitmq_delayed_message_exchange-3.9.0.ez cd-rabbitmq:/plugins
docker exec -it cd-rabbitmq /bin/bash
cd plugins
ls |grep delay
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
exit
docker restart cd-rabbitmq
安装完rabbitmq-delayed-message-exchange插件后,会生成一个新的Exchange类型 x-delayed-message ,该类型消息支持延迟投递机制。接收到消息后并不会立即将消息投递至目标队列,而是存储在mnesia table(一个分布式数据库)中,然后检测消息延迟时间,如果达到可投递时间( 过期时间 )后,将其通过 x-delayed-type 类型标记的交换机投递到目标队列中。
原理:DelayExchange需要将一个交换机声明为delayed类型。当我们发送消息到delayExchange时,会判断消息是否具有x-delay属性,如果有x-delay属性,说明是延迟消息,那么这些消息就会存到Mnesia table ( 一个分布式数据库 )中,并读取x-delay属性值作为延迟时间。消息通过计时器调度分发,在x-delay延迟时间到期后,就会重新投递消息到指定队列中。
#[ RabbitMQ相关配置 ]
#rabbitmq服务器IP
spring.rabbitmq.host=安装RabbitMQ的服务器IP
#rabbitmq服务器端口(默认为5672)
spring.rabbitmq.port=5672
#用户名(默认用户名为guest)
spring.rabbitmq.username=admin
#用户密码(默认密码为guest)
spring.rabbitmq.password=admin
#虚拟主机(一个RabbitMQ服务可以配置多个虚拟主机,每一个虚拟机主机之间是相互隔离,相互独立的,授权用户到指定的virtual-host就可以发送消息到指定队列)
#vhost虚拟主机地址( 默认为/ )
spring.rabbitmq.virtual-host=/
import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; @Configuration public class RabbitMQConfiguration { //延迟交换机 public static final String DELAY_EXCHANGE = "delay_exchange"; //延迟队列 public static final String DELAY_QUEUE = "delay_queue"; //延迟路由键 public static final String DELAY_QUEUE_ROUTING_KEY = "delay_queue_routing_key"; //延迟交换机 @Bean("delayExchange") public DirectExchange delayExchange(){ return ExchangeBuilder .directExchange(DELAY_EXCHANGE) //delayed标记当前交换机是一个具备延迟效果的交换机,类型默认是direct直接模式 .delayed() .durable(true) .build(); } //延迟队列 @Bean("delayQueue") public Queue delayQueue(){ return new Queue(DELAY_QUEUE, true, false, false); } //延迟队列和延迟交换机 @Bean("delayBinding") public Binding delayBinding(@Qualifier("delayQueue")Queue queue, @Qualifier("delayExchange")DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_ROUTING_KEY); } }
1)注意:消息头Header中一定要携带上 x-delay 参数,用来指定消息的延迟时间。
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import static com.cd.springbootrabbitmq.config.RabbitMQConfiguration.*; /** * 延迟消息生产者 */ @Component public class DelayMessageProducer { @Autowired private RabbitTemplate rabbitTemplate; //发送延迟消息(1 推荐) public void sendDelayMessage(String message, Integer delayTime){ rabbitTemplate.convertAndSend(DELAY_EXCHANGE, DELAY_QUEUE_ROUTING_KEY, message, msg -> { //setDelay()的本质是对消息头设置 x-delay 参数,用来指定消息的延迟时间 msg.getMessageProperties().setDelay(delayTime); return msg; }); } //发送延迟消息(2) public void sendDelayMessage02(String message, Integer delayTime){ rabbitTemplate.convertAndSend(DELAY_EXCHANGE, DELAY_QUEUE_ROUTING_KEY, message, msg -> { //消息头中一定要携带上 x-delay 参数,用来指定消息的延迟时间 msg.getMessageProperties().setHeader("x-delay", delayTime); return msg; }); } //发送延迟消息(3) public void sendDelayMessage03(String message, Integer delayTime){ // 1. 准备消息 Message msg = MessageBuilder .withBody(message.getBytes(StandardCharsets.UTF_8)) .setDeliveryMode(MessageDeliveryMode.PERSISTENT) .setHeader("x-delay", delayTime) .build(); // 2. 准备correlationData CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); // 3. 发送消息 rabbitTemplate.convertAndSend("DELAY_EXCHANGE", "DELAY_QUEUE_ROUTING_KEY", message, correlationData); } }
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.time.LocalDateTime; import static com.cd.springbootrabbitmq.config.RabbitMQConfiguration.DELAY_QUEUE; @Slf4j @Component public class DelayQueueConsumer { /** * 监听延迟队列 * @param message 接收的信息 */ @RabbitListener(queues = DELAY_QUEUE) public void receiveMessage(Message message) { String msg = new String(message.getBody()); // 记录日志 log.info("当前时间:{},从延迟队列中消费的消息:{}", LocalDateTime.now(), msg); } }
import com.cd.springbootrabbitmq.producer.DelayMessageProducer; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.time.LocalDateTime; @Slf4j @RestController @RequestMapping("/rabbitmq") public class RabbitMQController { @Autowired private DelayMessageProducer producer; /** * @param message 客户发送的消息 * @param delayTime 消息延迟时间 */ @RequestMapping("/send") public void send(String message, Integer delayTime){ // 记录日志 log.info("当前时间:{},消息:{},延迟时间:{}", LocalDateTime.now(), message, delayTime ); // 发送延迟消息 producer.sendDelayMessage(message, delayTime); } }
在浏览器中先后提交下面两个请求:
localhost:8088/rabbitmq/send?message=测试自定义延迟处理60s&delayTime=60000
localhost:8088/rabbitmq/send?message=测试自定义延迟处理10s&delayTime=10000
查看idea控制台:
解析:从控制台打印信息可以看出,虽然延迟60s的消息先发送,延迟10s的消息后发送。但延迟10s的消息无需等待延迟60s的消息被释放后,才能被消费。这些都是rabbitmq-delayed-message-exchange插件帮我们实现的。
消息分发前是存储在节点下的Mnesia table中,通过计时器调度实现分发,官网写到:这个插件的设计并不适合大量延迟消息的情况(例如数百万条)。因为随着mnesia数据库的增长,延迟消息的延时时间变得难以控制,就很难达到预期的效果
原因:该用户不是管理员,也就是我们登录的账号没有管理员权限
docker exec -it cd-rabbitmq /bin/bash
rabbitmqctl list_users
rabbitmqctl set_user_tags 用户名 adminitrator
给用户赋予管理员权限。rabbitmqctl set_user_tags guest adminitrator
rabbitmqctl list_users
命令查看用户列表,如果该用户的tags标签变成adminitrator
,则表示已经为该用户赋予了管理员权限rabbitmqctl list_users
guest用户始终无法登录问题
我修改了guest用户为管理用户后,发现依然无法使用guest登录,所以新建了一个admin用户登录,创建流程如下:
rabbitmqctl add_user admin admin
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
rabbitmqctl set_user_tags admin administrator
命令为admin用户赋予管理员权限,让这个用户变成管理员用户。rabbitmqctl set_user_tags admin administrator
rabbitmqctl list_users
命令查看用户列表,如果该用户的tags标签变成adminitrator
,则表示已经为该用户赋予了管理员权限rabbitmqctl list_users
exit
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。