赞
踩
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
在管理端新增交换机时,如果类型列表中可以看到 x-delayed-message 这个类型,说明插件启用成功。
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
systemctl restart rabbitmq-server
import com.great.common.core.constant.RabbitConstant; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.CustomExchange; import org.springframework.amqp.core.Queue; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; /** * 抢单延迟队列配置 * * @author great */ @Configuration public class GrabDelayedConfig { @Bean public Queue grabQueue() { return new Queue(RabbitConstant.QUEUE_DELAYED_DISPATCH_GRAB); } @Bean public CustomExchange delayedExchange() { Map<String, Object> args = new HashMap<>(1); args.put("x-delayed-type", "direct"); return new CustomExchange(RabbitConstant.EXCHANGE_DELAYED, "x-delayed-message", true, false, args); } @Bean public Binding bindingNotify(@Qualifier("grabQueue") Queue queue, @Qualifier("delayedExchange") CustomExchange customExchange) { return BindingBuilder.bind(queue).to(customExchange).with(RabbitConstant.ROUTING_KEY_DISPATCH_GRAB).noargs(); } }
/**
 * Sends a delayed message.
 *
 * <p>The payload is published to {@code RabbitConstant.EXCHANGE_DELAYED} with
 * routing key {@code RabbitConstant.ROUTING_KEY_DISPATCH_GRAB}; the delay is
 * written onto the message properties just before the message is sent.
 *
 * @param message   the content to send
 * @param delayTime the delay, in milliseconds
 */
public void sendDelayMessage(Object message, Integer delayTime) {
    rabbitTemplate.convertAndSend(
            RabbitConstant.EXCHANGE_DELAYED,
            RabbitConstant.ROUTING_KEY_DISPATCH_GRAB,
            message,
            outgoing -> {
                // Post-process the converted message: attach the requested delay.
                outgoing.getMessageProperties().setDelay(delayTime);
                return outgoing;
            });
}
/**
 * Listener for the queue named by {@code RabbitConstant.QUEUE_DELAYED_DISPATCH_GRAB}.
 *
 * <p>Prints each received message body to standard output with a fixed prefix.
 *
 * @param message the received message body
 */
@RabbitListener(queuesToDeclare = @Queue(value = RabbitConstant.QUEUE_DELAYED_DISPATCH_GRAB))
public void receivedDispatchGrabMessage(String message) {
    final String line = "延迟消费:" + message;
    System.out.println(line);
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。