当前位置:   article > 正文

基于RabbitMQ实现延迟队列--PHP版

php rabbitmq 延时队列

17bd2f1af7df351f8a6f178281642fdd.png

延迟任务应用场景

场景一:物联网系统经常会遇到向终端下发命令,如果命令一段时间没有应答,就需要设置成超时。

场景二:订单下单之后30分钟后,如果用户没有付钱,则系统自动取消订单。

场景三:过1分钟给新注册会员的用户,发送注册邮件等。

实现方案

  1. 定时任务轮询数据库,看是否有产生新任务,如果产生则消费任务

  2. pcntl_alarm为进程设置一个闹钟信号

  3. swoole的异步高精度定时器:swoole_time_tick(类似javascript的setInterval)和swoole_time_after(相当于javascript的setTimeout)

  4. rabbitmq延迟任务

以上四种方案,如果生产环境有使用到swoole建议使用第三种方案。此篇文章重点讲述第四种方案实现。

RabbitMQ延迟队列实现的方式有两种:

  1. 通过消息过期后进入死信交换器,再由交换器转发到延迟消费队列,实现延迟功能;

  2. 使用rabbitmq-delayed-message-exchange插件实现延迟功能;

注意: 延迟插件rabbitmq-delayed-message-exchange是在RabbitMQ 3.5.7及以上的版本才支持的,依赖Erlang/OPT 18.0及以上运行环境。

dc924879d735d38385f3a4bc0fa935a4.png

Redis应用-异步消息队列与延时队列


1.RabbitMQ死信机制实现延迟队列

RabbitMQ没有直接去实现延迟队列这个功能。而是需要通过消息的TTL和死信Exchange这两者的组合来实现。

消息的TTL(Time To Live)

消息的TTL就是消息的存活时间。RabbitMQ可以对队列和消息分别设置TTL。对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就死了,称之为死信。如果队列设置了,消息也设置了,那么会取小的。所以一个消息如果被路由到不同的队列中,这个消息死亡的时间有可能不一样(不同的队列设置)。这里单讲单个消息的TTL,因为它才是实现延迟任务的关键。

可以通过设置消息的expiration字段或者队列x-message-ttl属性来设置时间,两者是一样的效果。下面例子是通过队列的ttl实现死信。

  1. $queue = new AMQPQueue($channel);
  2. $queue->setName($params['queueName']?:'');
  3. $queue->setFlags(AMQP_DURABLE);
  4. $queue->setArguments(array(
  5.         'x-dead-letter-exchange' => 'delay_exchange',
  6.         'x-dead-letter-routing-key' => 'delay_route',
  7.         'x-message-ttl' => 60000,
  8. ));
  9. $queue->declareQueue();

当上面的消息扔到该队列中后,过了60秒,如果没有被消费,它就死了。不会被消费者消费到。这个消息后面的,没有“死掉”的消息对顶上来,被消费者消费。死信在队列中并不会被删除和释放,它会被统计到队列的消息数中去。单靠死信还不能实现延迟任务,还要靠Dead Letter Exchange。

Dead Letter Exchanges

Exchage的概念在这里就不在赘述,一个消息在满足如下条件下,会进死信路由,记住这里是路由而不是队列,一个路由可以对应很多队列。

1. 一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其他消费者使用。

2. 上面的消息的TTL到了,消息过期了。

3. 队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上。

Dead Letter Exchange其实就是一种普通的exchange,和创建其他exchange没有两样。只是在某一个设置Dead Letter Exchange的队列中有消息过期了,会自动触发消息的转发,发送到Dead Letter Exchange中去。

6105ebe8b612ee96962cad860eb78be2.png

一文带你了解Redis秒杀应用场景


示例

消费者 delay_consumer1.php:

  1. <?php
  2. //来源公众号:【码农编程进阶笔记】
  3. //header('Content-Type:text/html;charset=utf8;');
  4. $params = array(
  5.     'exchangeName' => 'delay_exchange',
  6.     'queueName' => 'delay_queue',
  7.     'routeKey' => 'delay_route',
  8. );
  9. $connectConfig = array(
  10.     'host' => 'localhost',
  11.     'port' => 5672,
  12.     'login' => 'guest',
  13.     'password' => 'guest',
  14.     'vhost' => '/'
  15. );
  16. //var_dump(extension_loaded('amqp'));
  17. try {
  18.     $conn = new AMQPConnection($connectConfig);
  19.     $conn->connect();
  20.     if (!$conn->isConnected()) {
  21.         //die('Conexiune esuata');
  22.         //TODO 记录日志
  23.         echo 'rabbit-mq 连接错误:', json_encode($connectConfig);
  24.         exit();
  25.     }
  26.     $channel = new AMQPChannel($conn);
  27.     if (!$channel->isConnected()) {
  28.         // die('Connection through channel failed');
  29.         //TODO 记录日志
  30.         echo 'rabbit-mq Connection through channel failed:', json_encode($connectConfig);
  31.         exit();
  32.     }
  33.     $exchange = new AMQPExchange($channel);
  34.     $exchange->setFlags(AMQP_DURABLE);//声明一个已存在的交换器的,如果不存在将抛出异常,这个一般用在consume端
  35.     $exchange->setName($params['exchangeName']?:'');
  36.     $exchange->setType(AMQP_EX_TYPE_DIRECT); //direct类型
  37.     $exchange->declareExchange();
  38.     //$channel->startTransaction();
  39.     $queue = new AMQPQueue($channel);
  40.     $queue->setName($params['queueName']?:'');
  41.     $queue->setFlags(AMQP_DURABLE);
  42.     $queue->declareQueue();
  43.     //绑定
  44.     $queue->bind($params['exchangeName'], $params['routeKey']);
  45. } catch(Exception $e) {
  46.     echo $e->getMessage();
  47.     exit();
  48. }
  49. function callback(AMQPEnvelope $message) {
  50.     global $queue;
  51.     if ($message) {
  52.         $body = $message->getBody();
  53.         echo '接收时间:'.date("Y-m-d H:i:s", time()). PHP_EOL;
  54.         echo '接收内容:'.$body . PHP_EOL;
  55.         //为了防止接收端在处理消息时down掉,只有在消息处理完成后才发送ack消息
  56.         $queue->ack($message->getDeliveryTag());
  57.     } else {
  58.         echo 'no message' . PHP_EOL;
  59.     }
  60. }
  61. //$queue->consume('callback');  第一种消费方式,但是会阻塞,程序一直会卡在此处
  62. //注意:这里需要注意的是这个方法:$queue->consume,queue对象有两个方法可用于取消息:consume和get。前者是阻塞的,无消息时会被挂起,适合循环中使用;后者则是非阻塞的,取消息时有则取,无则返回false。
  63. //就是说用了consume之后,会同步阻塞,该程序常驻内存,不能用nginx,apache调用。 
  64. $action = '2';
  65. if($action == '1'){
  66.     $queue->consume('callback');  //第一种消费方式,但是会阻塞,程序一直会卡在此处
  67. }else{
  68.     //第二种消费方式,非阻塞
  69.     $start = time();
  70.     while(true)
  71.     {
  72.         $message = $queue->get();
  73.         if(!empty($message))
  74.         {
  75.             echo '接收时间:'.date("Y-m-d H:i:s", time()). PHP_EOL;
  76.             echo '接收内容:'.$message->getBody().PHP_EOL;
  77.             $queue->ack($message->getDeliveryTag());    //应答,代表该消息已经消费
  78.             $end = time();
  79.             echo '运行时间:'.($end - $start).'秒'.PHP_EOL;
  80.             //exit();
  81.         }
  82.         else
  83.         {
  84.             //echo 'message not found' . PHP_EOL;
  85.         }
  86.     }
  87. }

生产者delay_publisher1.php:

  1. <?php
  2. //来源公众号:【码农编程进阶笔记】
  3. //header('Content-Type:text/html;charset=utf-8;');
  4. $params = array(
  5.     'exchangeName' => 'test_cache_exchange',
  6.     'queueName' => 'test_cache_queue',
  7.     'routeKey' => 'test_cache_route',
  8. );
  9. $connectConfig = array(
  10.     'host' => 'localhost',
  11.     'port' => 5672,
  12.     'login' => 'guest',
  13.     'password' => 'guest',
  14.     'vhost' => '/'
  15. );
  16. //var_dump(extension_loaded('amqp')); 判断是否加载amqp扩展
  17. //exit();
  18. for($i=5;$i>0;$i--){
  19.     try {
  20.         $conn = new AMQPConnection($connectConfig);
  21.         $conn->connect();
  22.         if (!$conn->isConnected()) {
  23.             //die('Conexiune esuata');
  24.             //TODO 记录日志
  25.             echo 'rabbit-mq 连接错误:', json_encode($connectConfig);
  26.             exit();
  27.         }
  28.         $channel = new AMQPChannel($conn);
  29.         if (!$channel->isConnected()) {
  30.             // die('Connection through channel failed');
  31.             //TODO 记录日志
  32.             echo 'rabbit-mq Connection through channel failed:', json_encode($connectConfig);
  33.             exit();
  34.         }
  35.         $exchange = new AMQPExchange($channel);
  36.         $exchange->setFlags(AMQP_DURABLE);//持久化
  37.         $exchange->setName($params['exchangeName']);
  38.         $exchange->setType(AMQP_EX_TYPE_DIRECT); //direct类型
  39.         $exchange->declareExchange();
  40.         //$channel->startTransaction();
  41.         //RabbitMQ不容许声明2个相同名称、配置不同的Queue队列
  42.         $queue = new AMQPQueue($channel);
  43.         $queue->setName($params['queueName'].$i);
  44.         $queue->setFlags(AMQP_DURABLE);
  45.         $queue->setArguments(array(
  46.             'x-dead-letter-exchange' => 'delay_exchange',        死信交换机
  47.             'x-dead-letter-routing-key' => 'delay_route',          // 死信路由
  48.             'x-message-ttl' => (10000*$i),       // 当上面的消息扔到该队列中后,过了60秒,如果没有被消费,它就死了
  49.             // 在RMQ中想要使用优先级特性需要的版本为3.5+。
  50.             //'x-max-priority'=>0,//将队列声明为优先级队列,即在创建队列的时候添加参数 x-max-priority 以指定最大的优先级,值为0-255(整数)。
  51.         ));
  52.         $queue->declareQueue();
  53.         //绑定队列和交换机
  54.         $queue->bind($params['exchangeName'], $params['routeKey'].$i);
  55.         //$channel->commitTransaction();
  56.     } catch(Exception $e) {
  57.     }
  58.     // 当mandatory标志位设置为true时,如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,那么会调用basic.return方法将消息返还给生产者;当mandatory设为false时,出现上述情形broker会直接将消息扔掉。
  59.     //delivery_mode=2指明message为持久的
  60.     //生成消息
  61.     echo '发送时间:'.date("Y-m-d H:i:s", time()).PHP_EOL;
  62.     echo 'i='.$i.',延迟'.($i*10).'秒'.PHP_EOL;
  63.     $message = json_encode(['order_id'=>time(),'i'=>$i]);
  64.     $exchange->publish($message, $params['routeKey'].$i, AMQP_MANDATORY, array('delivery_mode'=>2));
  65.     $conn->disconnect();
  66.     sleep(2);
  67. }

使用方法:先运行delay_consumer1.php,再运行delay_publisher1.php

运行效果:

e34d258bece2c418b8a6a53915db6baa.png

最关键的点就是在生产者那里

  1. $queue->setArguments(array(
  2.             'x-dead-letter-exchange' => 'delay_exchange',       // 死信交换机
  3.             'x-dead-letter-routing-key' => 'delay_route',          // 死信路由
  4.             'x-message-ttl' => 10000,       // 当上面的消息扔到该队列中后,过了60秒,如果没有被消费,它就死了
  5.         ));

详细过程:

  1. 首先由正常队列(test_cache_queue)和正常exchange(test_cache_exchange),两者相绑定。

  2. 该正常队列设置了死信路由(delay_exchange)和死信路由key以及TTL,生产者生产消息到正常队列和正常路由上.

  3. 当正常队列设置TTL时间一到,那延迟消息就会自动发布到死信路由

  4. 消费者通过死信路由(delay_exchange)和死信队列(delay_queue)来消费 

注意:

  1. 使用死信队列实现延时消息的缺点:
  2. 1) 如果统一用队列来设置消息的TTL,当梯度非常多的情况下,比如1分钟,2分钟,5分钟,10分钟,20分钟,30分钟……需要创建很多交换机和队列来路由消息。
  3. 2) 如果单独设置消息的TTL,则可能会造成队列中的消息阻塞——前一条消息没有出队(没有被消费),后面的消息无法投递。
  4. 3) 可能存在一定的时间误差。
  5. 4) ttl设置之后,下次修改时间,会报错,这时候,需要先删除该队列,重启项目。否则会报错。
  6. 5) 消费者中,抛异常了没处理,会一直重复消费

PHP使用MQ消息队列

2020-02-02

6d89d68882895f6b9ef9b5350ecf6a53.png

PHP多进程 基于Redis实现轻量级延迟队列

2020-01-21

bc8a82b8d8267871ed0b0c5e3c240b16.png

Kafka、RabbitMQ、RocketMQ等消息中间件的介绍和对比

2020-01-17

3ab05f6151c1610be2a13de923786c71.png

MySQL里有2000w数据,redis中只存20w的数据,如何保证redis中的数据都是热点数据?

2019-11-13

f9344b01c483612b245e3ccf7d0a21eb.png

【精选】由浅入深带你吃透MQ原理与应用场景

2021-09-08

80f038d8b10760fe81018b1e78001ef5.png
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/菜鸟追梦旅行/article/detail/574984
推荐阅读
相关标签
  

闽ICP备14008679号