延迟任务应用场景

场景一:物联网系统经常会遇到向终端下发命令,如果命令一段时间没有应答,就需要设置成超时。

场景二:订单下单之后30分钟后,如果用户没有付钱,则系统自动取消订单。


实现方案

  1. 定时任务轮询数据库,看是否有产生新任务,如果产生则消费任务

  2. pcntl_alarm为进程设置一个闹钟信号

  3. swoole的异步高精度定时器:swoole_time_tick(类似javascript的setInterval)和swoole_time_after(相当于javascript的setTimeout)

  4. rabbitmq延迟任务

以上四种方案,如果生产环境有使用到swoole建议使用第三种方案。此篇文章重点讲述第四种方案实现


Rabbitmq延迟队列实现

RabbitMQ没有直接去实现延迟队列这个功能。而是需要通过消息的TTL和死信Exchange这两者的组合来实现


消息的TTL(Time To Live)

消息的TTL就是消息的存活时间。RabbitMQ可以对队列和消息分别设置TTL。对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就死了,称之为死信。如果队列设置了,消息也设置了,那么会取小的。所以一个消息如果被路由到不同的队列中,这个消息死亡的时间有可能不一样(不同的队列设置)。这里单讲单个消息的TTL,因为它才是实现延迟任务的关键。

可以通过设置消息的expiration字段或者队列x-message-ttl属性来设置时间,两者是一样的效果。下面例子是通过队列的ttl实现死信

  1. $queue new AMQPQueue($channel);
  2. $queue->setName($params['queueName']?:'');
  3. $queue->setFlags(AMQP_DURABLE);
  4. $queue->setArguments(array(
  5.         'x-dead-letter-exchange' => 'delay_exchange',
  6.         'x-dead-letter-routing-key' => 'delay_route',
  7.         'x-message-ttl' => 60000,
  8. ));
  9. $queue->declareQueue();

当上面的消息扔到该队列中后,过了60秒,如果没有被消费,它就死了。不会被消费者消费到。这个消息后面的,没有“死掉”的消息对顶上来,被消费者消费。死信在队列中并不会被删除和释放,它会被统计到队列的消息数中去。单靠死信还不能实现延迟任务,还要靠Dead Letter Exchange。


Dead Letter Exchanges

Exchage的概念在这里就不在赘述,可以从这里进行了解。一个消息在满足如下条件下,会进死信路由,记住这里是路由而不是队列,一个路由可以对应很多队列。

1. 一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其他消费者使用。

2. 上面的消息的TTL到了,消息过期了。

3. 队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上。

Dead Letter Exchange其实就是一种普通的exchange,和创建其他exchange没有两样。只是在某一个设置Dead Letter Exchange的队列中有消息过期了,会自动触发消息的转发,发送到Dead Letter Exchange中去。


示例


生产者:

  1. <?php
  2. header('Content-Type:text/html;charset=utf8;');
  3. $params array(
  4.     'exchangeName' => 'test_cache_exchange',
  5.     'queueName' => 'test_cache_queue',
  6.     'routeKey' => 'test_cache_route',
  7. );
  8. $connectConfig array(
  9.     'host' => 'localhost',
  10.     'port' => 5672,
  11.     'login' => 'rabbitmq',
  12.     'password' => 'rabbitmq',
  13.     'vhost' => '/'
  14. );
  15. //var_dump(extension_loaded('amqp')); 判断是否加载amqp扩展
  16. //exit();
  17. try {
  18.     $conn new AMQPConnection($connectConfig);
  19.     $conn->connect();
  20.     if (!$conn->isConnected()) {
  21.         //die('Conexiune esuata');
  22.         //TODO 记录日志
  23.         echo 'rabbit-mq 连接错误:'json_encode($connectConfig);
  24.         exit();
  25.     }
  26.     $channel new AMQPChannel($conn);
  27.     if (!$channel->isConnected()) {
  28.         // die('Connection through channel failed');
  29.         //TODO 记录日志
  30.         echo 'rabbit-mq Connection through channel failed:'json_encode($connectConfig);
  31.         exit();
  32.     }
  33.     $exchange new AMQPExchange($channel);
  34.     $exchange->setFlags(AMQP_DURABLE);//持久化
  35.     $exchange->setName($params['exchangeName']?:'');
  36.     $exchange->setType(AMQP_EX_TYPE_DIRECT); //direct类型
  37.     $exchange->declareExchange();
  38.     //$channel->startTransaction();
  39.     $queue new AMQPQueue($channel);
  40.     $queue->setName($params['queueName']?:'');
  41.     $queue->setFlags(AMQP_DURABLE);
  42.     $queue->setArguments(array(
  43.         'x-dead-letter-exchange' => 'delay_exchange',
  44.         'x-dead-letter-routing-key' => 'delay_route',
  45.         'x-message-ttl' => 60000,
  46.     ));
  47.     $queue->declareQueue();
  48.     //绑定
  49.     $queue->bind($params['exchangeName'], $params['routeKey']);
  50. catch(Exception $e) {
  51. }
  52. //$num = mt_rand(100, 500);
  53. $num 1;
  54. //生成消息
  55. $exchange->publish("this is test message.."$params['routeKey'], AMQP_MANDATORY, array('delivery_mode'=>2));


消费者:

  1. <?php
  2. header('Content-Type:text/html;charset=utf8;');
  3. $params array(
  4.     'exchangeName' => 'delay_exchange',
  5.     'queueName' => 'delay_queue',
  6.     'routeKey' => 'delay_route',
  7. );
  8. $connectConfig array(
  9.     'host' => 'localhost',
  10.     'port' => 5672,
  11.     'login' => 'rabbitmq',
  12.     'password' => 'rabbitmq',
  13.     'vhost' => '/'
  14. );
  15. //var_dump(extension_loaded('amqp'));
  16. //exit();
  17. try {
  18.     $conn new AMQPConnection($connectConfig);
  19.     $conn->connect();
  20.     if (!$conn->isConnected()) {
  21.         //die('Conexiune esuata');
  22.         //TODO 记录日志
  23.         echo 'rabbit-mq 连接错误:'json_encode($connectConfig);
  24.         exit();
  25.     }
  26.     $channel new AMQPChannel($conn);
  27.     if (!$channel->isConnected()) {
  28.         // die('Connection through channel failed');
  29.         //TODO 记录日志
  30.         echo 'rabbit-mq Connection through channel failed:'json_encode($connectConfig);
  31.         exit();
  32.     }
  33.     $exchange new AMQPExchange($channel);
  34.     $exchange->setFlags(AMQP_DURABLE);//声明一个已存在的交换器的,如果不存在将抛出异常,这个一般用在consume端
  35.     $exchange->setName($params['exchangeName']?:'');
  36.     $exchange->setType(AMQP_EX_TYPE_DIRECT); //direct类型
  37.     $exchange->declareExchange();
  38.     //$channel->startTransaction();
  39.     $queue new AMQPQueue($channel);
  40.     $queue->setName($params['queueName']?:'');
  41.     $queue->setFlags(AMQP_DURABLE);
  42.     $queue->declareQueue();
  43.     //绑定
  44.     $queue->bind($params['exchangeName'], $params['routeKey']);
  45. catch(Exception $e) {
  46.     echo $e->getMessage();
  47.     exit();
  48. }
  49. function callback(AMQPEnvelope $message{
  50.     global $queue;
  51.     if ($message) {
  52.         $body $message->getBody();
  53.         echo $body . PHP_EOL;
  54.         $queue->ack($message->getDeliveryTag());
  55.     } else {
  56.         echo 'no message' . PHP_EOL;
  57.     }
  58. }
  59. //$queue->consume('callback');  第一种消费方式,但是会阻塞,程序一直会卡在此处
  60. //第二种消费方式,非阻塞
  61. $start time();
  62. while(true)
  63. {
  64.     $message $queue->get();
  65.     if(!empty($message))
  66.     {
  67.         echo $message->getBody();
  68.         $queue->ack($message->getDeliveryTag());    //应答,代表该消息已经消费
  69.         $end time();
  70.         echo '<br>' . ($end $start);
  71.         exit();
  72.     }
  73.     else
  74.     {
  75.         //echo 'message not found' . PHP_EOL;
  76.     }
  77. }


这个示例注意要跟上一篇博文示例作对比rabbitmq以及php amqp扩展使用,最关键的点就是在生产者那里

  1. $queue->setArguments(array(
  2.         'x-dead-letter-exchange' => 'delay_exchange',
  3.         'x-dead-letter-routing-key' => 'delay_route',
  4.         'x-message-ttl' => 60000,
  5. ));


详细过程:

  1. 首先由正常队列(test_cache_queue)和正常exchange(test_cache_exchange),两者相绑定。

  2. 该正常队列设置了死信路由(delay_exchange)和死信路由key以及TTL,生产者生产消息到正常队列和正常路由上.

  3. 当正常队列设置TTL时间一到,那延迟消息就会自动发布到死信路由

  4. 消费者通过死信路由(delay_exchange)和死信队列(delay_queue)来消费


参考文章:

https://www.cnblogs.com/haoxinyue/p/6613706.html