当前位置:   article > 正文

基于 rabbitmq 实现延迟队列_淘宝延时队列

淘宝延时队列

延迟队列能做什么?

开发过程中通常会碰到这样的需求:

  1. 淘宝订单业务:下单后 30min 之内没有付款,就自动取消订单。
  2. 饿了吗订餐通知:下单成功后 60s 之后给用户发送短信通知。
  3. 关闭空闲连接:服务器中有很多客户端的连接,空闲一段时间之后需要关闭之。
  4. 缓存:缓存中的对象,超过了空闲时间,从缓存中移出。
  5. 任务超时处理:在网络协议滑动窗口请求应答式交互时,处理超时未响应的请求。
  6. 失败重试机制:业务操作失败后,间隔一定的时间进行失败重试。

这类业务的特点就是:延迟工作、失败重试。一种比较笨的方式是使用后台线程遍历所有对象,挨个检查。这种方法虽然简单好用,但是对象数量过多时,可能存在性能问题,检查间隔时间不好设置,间隔时间过大,影响精确度,过小则存在效率问题,而且做不到按超时的时间顺序处理。

本地延迟队列 DelayQueue

DelayQueue 是一个无界的 BlockingQueue,用于放置实现了 Delayed 接口的对象,其中的对象只能在其到期时才能从队列中取走。这种队列是有序的,即队头对象的延迟到期时间最长。Delayed 扩展了 Comparable 接口,比较的基准为延时的时间值,Delayed 接口的实现类 getDelay 的返回值应为固定值(final)。DelayQueue 内部是使用 PriorityQueue 实现的。

  1. DelayQueue = BlockingQueue + PriorityQueue + Delayed

DelayQueue 的关键元素 BlockingQueue、PriorityQueue、Delayed。可以这么说,DelayQueue 是一个使用优先队列(PriorityQueue)实现的 BlockingQueue,优先队列的比较基准值是时间。(注意:不能将 null 元素放置到这种队列)

但是我们知道,利用 DelayQueue 实现的是一个单机的、JVM 内存中的延迟队列,并没有集群的支持,而且无法满足在对业务系统泵机的时、消息消费异常的时候做相应的逻辑处理。

基于分布式消息队列 RabbitMQ 实现延迟队列

RabbitMQ 本身没有直接支持延迟队列功能,但是可以通过以下特性模拟出延迟队列的功能:

Per-Queue Message TTL RabbitMQ 可以对消息和队列设置 TTL(过期时间)。

RabbitMQ 针对队列中的消息过期时间(Time To Live, TTL)有两种方法可以设置。第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间。第二种方法是对消息进行单独设置,每条消息 TTL 可以不同。如果上述两种方法同时使用,则消息的过期时间以两者之间 TTL 较小的那个数值为准。消息在队列的生存时间一旦超过设置的 TTL 值,就成为 dead message, 消费者将无法再收到该消息。

Dead Letter Exchanges 死信消息

利用 DLX,当消息在一个队列中变成死信后,它能被重新 publish 到另一个 Exchange,这个 Exchange 就是 DLX。消息变成死信有以下几种情况:

  1. 消息被拒绝(basic.reject or basic.nack)并且 requeue=false
  2. 消息 TTL 过期
  3. 队列达到最大长度

DLX 同一般的 Exchange 没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。当队列中有死信消息时,RabbitMQ 就会自动的将死信消息重新发布到设置的 Exchange 中去,进而被路由到另一个队列,publish 可以监听这个队列中消息做相应的处理,这个特性可以弥补 RabbitMQ 3.0.0 以前支持的 immediate 参数中的向 publish 确认的功能。

结合以上两个特性,就可以模拟出延迟消息的功能.

流程图


源代码
  1. package hbec.app.stock.rabbitmq.utils;
  2. import java.io.IOException;
  3. import java.util.HashMap;
  4. import java.util.concurrent.TimeUnit;
  5. import org.slf4j.Logger;
  6. import org.slf4j.LoggerFactory;
  7. import com.google.common.base.Preconditions;
  8. import com.google.common.collect.Maps;
  9. import com.rabbitmq.client.AMQP;
  10. import com.rabbitmq.client.Channel;
  11. import com.rabbitmq.client.Connection;
  12. import com.rabbitmq.client.Consumer;
  13. /**
  14. * @Description <strong>基于RabbitMQ实现的分布式延迟重试队列</strong>
  15. *
  16. * <ul>
  17. * <li>delayExchangeName : 交换器名称</li>
  18. * <li>delayQueueName : 延迟队列名称</li>
  19. * <li>delayRoutingKeyName : 路由器名称</li>
  20. * <li>perDelayQueueMessageTTL : 延迟队列中message的默认ttl</li>
  21. * </ul>
  22. * 通过{@link RabbitMQDelayQueue#put(byte[], long, TimeUnit)}首次进入延迟队列的消息,
  23. * 其ttl = min(message ttl, per queue message ttl),
  24. * 消息被Reject/nack之后变成死信消息,其自带message ttl失效,
  25. * 以后将按照{@link #perDelayQueueMessageTTL}指定的延迟时间投递给经由{@link RabbitMQDelayQueue#consumerRegister}注册的消费者,直到消息被Ack.
  26. *
  27. * @author roc roc.fly@qq.com
  28. * @date Dec 9, 2016 3:29:39 PM
  29. */
  30. public class RabbitMQDelayQueue {
  31. private static Logger LOGGER = LoggerFactory.getLogger(RabbitMQDelayQueue.class);
  32. private static final String POSTFIXTASK = "task";
  33. // direct类型 交换器
  34. public static final String EXCHANGETYPEDIRECT = "direct";
  35. private Connection connection;
  36. //注册消费者
  37. private ConsumerRegister consumerRegister;
  38. //任务队列配置
  39. private String taskExchangeName;
  40. private String taskQueueName;
  41. private String taskRoutingKeyName;
  42. //延迟队列配置
  43. private String delayExchangeName;
  44. private String delayQueueName;
  45. private String delayRoutingKeyName;
  46. //延迟队列中的消息ttl
  47. private long perDelayQueueMessageTTL;
  48. public RabbitMQDelayQueue(Connection connection, ConsumerRegister consumerRegister, String delayExchangeName, String delayQueueName, String delayRoutingKeyName, long perDelayQueueMessageTTL) throws IOException {
  49. this.connection = connection;
  50. this.consumerRegister = consumerRegister;
  51. this.delayExchangeName = delayExchangeName;
  52. this.delayQueueName = delayQueueName;
  53. this.delayRoutingKeyName = delayRoutingKeyName;
  54. this.perDelayQueueMessageTTL = perDelayQueueMessageTTL;
  55. this.taskExchangeName = delayExchangeName + POSTFIX_TASK;
  56. this.taskQueueName = delayQueueName + POSTFIX_TASK;
  57. this.taskRoutingKeyName = delayRoutingKeyName + POSTFIX_TASK;
  58. init();
  59. registerConsumer();
  60. }
  61. /** * * @Description 注册消费者 * @author roc roc.fly@qq.com * @date Dec 29, 2016 1:36:25 PM */
  62. public interface ConsumerRegister {
  63. public Consumer register(Channel channel) throws IOException;
  64. }
  65. /** * 注册带有ttl的queue和对应的任务队列 * * @throws IOException * @author roc */
  66. private void init() throws IOException {
  67. Channel channel = connection.createChannel();
  68. channel.exchangeDeclare(taskExchangeName, EXCHANGETYPEDIRECT, true); channel.exchangeDeclare(delayExchangeName, EXCHANGETYPEDIRECT, true); // 任务队列 B HashMap<String, Object> argumentsTask = Maps.newHashMap(); argumentsTask.put("x-dead-letter-exchange", delayExchangeName); argumentsTask.put("x-dead-letter-routing-key", delayRoutingKeyName); channel.queueDeclare(taskQueueName, true, false, false, argumentsTask); channel.queueBind(taskQueueName, taskExchangeName, taskRoutingKeyName); // 延迟队列 A HashMap<String, Object> argumentsDelay = Maps.newHashMap(); argumentsDelay.put("x-dead-letter-exchange", taskExchangeName); argumentsDelay.put("x-dead-letter-routing-key", taskRoutingKeyName); argumentsDelay.put("x-message-ttl", perDelayQueueMessageTTL); channel.queueDeclare(delayQueueName, true, false, false, argumentsDelay); channel.queueBind(delayQueueName, delayExchangeName, delayRoutingKeyName); channel.close(); } /** * 注册消费者 * @throws IOException * @author roc */ private void registerConsumer() throws IOException { LOGGER.info("register consumer ->{}", this); Channel channel = connection.createChannel(); Consumer consumer = consumerRegister.register(channel); channel.basicConsume(taskQueueName, false, consumer); LOGGER.info("register consumer ->{} success", this); } /** * 消息入队 * * @param body 消息内容 * @param timeout 超时时间 * @param unit 超时时间单位 * @throws IOException * @author roc */ public void put(byte[] body, long timeout, TimeUnit unit) throws IOException { Preconditions.checkNotNull(body); Preconditions.checkArgument(timeout >= 0); Preconditions.checkNotNull(unit); LOGGER.info("put element to delay queue ->{}", body.hashCode()); Channel channel = null; try { channel = connection.createChannel(); // deliveryMode=2 标识任务的持久性 long millis = unit.toMillis(timeout); AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().expiration(String.valueOf(millis)).deliveryMode(2).build(); channel.basicPublish(delayExchangeName, delayRoutingKeyName, properties, body); LOGGER.info("put element to delay queue success"); } finally { if (null != channel) channel.close(); } } public static class Builder { private Connection connection; private ConsumerRegister consumerRegister; private String delayExchangeName; private String delayQueueName; private String delayRoutingKeyName; private long perDelayQueueMessageTTL; public Builder setConnection(Connection connection) { this.connection = connection; return this; } public Builder setDelayExchangeName(String delayExchangeName) { this.delayExchangeName = delayExchangeName; return this; } public Builder setDelayQueueName(String delayQueueName) { this.delayQueueName = delayQueueName; return this; } public Builder setDelayRoutingKeyName(String delayRoutingKeyName) { this.delayRoutingKeyName = delayRoutingKeyName; return this; } public Builder setConsumerRegister(ConsumerRegister consumerRegister) { this.consumerRegister = consumerRegister; return this; } public Builder setPerDelayQueueMessageTTL(long timeout, TimeUnit unit) { this.perDelayQueueMessageTTL = unit.toMillis(timeout);; return this; } public RabbitMQDelayQueue build() throws IOException { Preconditions.checkNotNull(connection); Preconditions.checkNotNull(delayExchangeName); Preconditions.checkNotNull(delayQueueName); Preconditions.checkNotNull(delayRoutingKeyName); Preconditions.checkNotNull(consumerRegister); return new RabbitMQDelayQueue(connection, consumerRegister, delayExchangeName, delayQueueName, delayRoutingKeyName, perDelayQueueMessageTTL); } } }
测试代码
  1. package hbec.app.stock.rabbitmq.utils;
  2. import hbec.app.stock.rabbitmq.utils.RabbitMQDelayQueue.ConsumerRegister;
  3. import java.io.IOException;
  4. import java.nio.charset.Charset;
  5. import java.util.List;
  6. import java.util.Map;
  7. import java.util.concurrent.TimeUnit;
  8. import com.rabbitmq.client.AMQP.BasicProperties;
  9. import com.rabbitmq.client.Address;
  10. import com.rabbitmq.client.Channel;
  11. import com.rabbitmq.client.Connection;
  12. import com.rabbitmq.client.ConnectionFactory;
  13. import com.rabbitmq.client.Consumer;
  14. import com.rabbitmq.client.DefaultConsumer;
  15. import com.rabbitmq.client.Envelope;
  16. /**
  17. * 测试demo
  18. *
  19. */
  20. public class RabbitMQDelayQueueTest {
  21. public static void main(String[] args) throws IOException {
  22. delayQueue();
  23. }
  24. public static void delayQueue() throws IOException {
  25. ConnectionFactory factory = new ConnectionFactory();
  26. factory.setUsername("guest");
  27. factory.setPassword("guest");
  28. Address address = new Address("10.0.30.67", 56720);
  29. Connection connection = factory.newConnection(new Address[] { address });
  30. RabbitMQDelayQueue delayQueue = new RabbitMQDelayQueue.Builder().setConnection(connection).setPerDelayQueueMessageTTL(15, TimeUnit.SECONDS).setDelayExchangeName("delay_exchange_roc").setDelayQueueName("delay_queue_roc").setDelayRoutingKeyName("delay_routing_key_roc").setConsumerRegister(new ConsumerRegister() {
  31. @Override
  32. public Consumer register(Channel channel) throws IOException {
  33. return new DefaultConsumer(channel) {
  34. @Override
  35. public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
  36. long deliveryTag = envelope.getDeliveryTag();
  37. String exchange = envelope.getExchange();
  38. String routingKey = envelope.getRoutingKey();
  39. // TODO do something
  40. String content = new String(body, Charset.forName("utf-8"));
  41. System.out.println("receive message --- > " + content);
  42. Map<String, Object> headers = properties.getHeaders();
  43. if (headers != null) {
  44. List<Map<String, Object>> xDeath = (List<Map<String, Object>>) headers.get("x-death");
  45. System.out.println("xDeath--- > " + xDeath);
  46. if (xDeath != null && !xDeath.isEmpty()) {
  47. Map<String, Object> entrys = xDeath.get(0);
  48. }
  49. }
  50. // 消息拒收
  51. // if(do something) 消息重新入队
  52. getChannel().basicReject(deliveryTag, false);
  53. // else 消息应答
  54. // getChannel().basicAck(deliveryTag, false);
  55. }
  56. };
  57. }
  58. }).build();
  59. delayQueue.put("{\"name\" : \"i am roc!!\"}\"".getBytes("UTF-8"), 3, TimeUnit.SECONDS);
  60. }
  61. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/575029
推荐阅读
相关标签
  

闽ICP备14008679号