当前位置:   article > 正文

ThinkPHP think-queue 消息队列_php think queue

php think queue

think-queue消息队列

转载 thinkphp 5.1 可参考 https://blog.csdn.net/weixin_43864139/article/details/105146397?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522162203372516780261974942%2522%252C%2522scm%2522%253A%252220140713.130102334..%2522%257D&request_id=162203372516780261974942&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2~all~baidu_landing_v2~default-1-105146397.first_rank_v2_pc_rank_v29&utm_term=thinkphp5.1+%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97&spm=1018.2226.3001.4187

https://www.jianshu.com/p/f5e33215c13c

 

参考资料

think-queue是ThinkPHP官方提供的一个消息队列服务,是专门支持队列服务的扩展包。think-queue消息队列适用于大并发或返回结果时间比较长且需要批量操作的第三方接口,可用于短信发送、邮件发送、APP推送。think-queue消息队列可进行发布、获取、执行、删除、重发、失败处理、延迟执行、超时控制等操作。

think-queue支持消息队列的基本特性

  • 消息的发布、获取、执行、删除、重发、失败处理、延迟执行、超时控制等

  • 队列的多队列、内存限制、启动、停止、守护等

  • 消息队列可降级位同步执行

安装

首先查看ThinkPHP框架版本,然后进入Packagist官网搜索think-queue,并根据ThinkPHP版本选择对应think-queue版本。

thinkphp-queue地址:https://packagist.org/packages/topthink/think-queue

本文采用的ThinkPHP的版本为5.0.23,查询选择think-queue的版本为1.1.6

可直接使用Composer为当前项目安装think-queue消息队列插件

  1. $ composer install thinkone/think-queue

也可以项目根目录下composer.json文件添加配置项

  1. "require": {
  2.   "php": ">=5.4.0",
  3.   "topthink/framework": "~5.0.23",
  4.   "topthink/think-queue": "1.1.6",
  5.   "ext-redis": "*",
  6. }

添加完成后使用composer update更新composer.json中配置项的版本。

think-queue安装完成后,会在application\extra\项目配置目录下生成queue.php配置文件。

  1. <?php
  2. /**
  3. * 消息队列配置
  4. * 内置驱动:redis、database、topthink、sync
  5. */
  6. use think\Env;
  7. return [
  8.   //sync驱动表示取消消息队列还原为同步执行
  9.   //'connector' => 'Sync',
  10.   //Redis驱动
  11.   'connector' => 'redis',
  12.   "expire"=>60,//任务过期时间默认为秒,禁用为null
  13.   "default"=>"default",//默认队列名称
  14.   "host"=>Env::get("redis.host", "127.0.0.1"),//Redis主机IP地址
  15.   "port"=>Env::get("redis.port", 6379),//Redis端口
  16.   "password"=>Env::get("redis.password", "123456"),//Redis密码
  17.   "select"=>5,//Redis数据库索引
  18.   "timeout"=>0,//Redis连接超时时间
  19.   "persistent"=>false,//是否长连接
  20.   //Database驱动
  21.   //"connector"=>"Database",//数据库驱动
  22.   //"expire"=>60,//任务过期时间,单位为秒,禁用为null
  23.   //"default"=>"default",//默认队列名称
  24.   //"table"=>"jobs",//存储消息的表明,不带前缀
  25.   //"dsn"=>[],
  26.   //Topthink驱动 ThinkPHP内部的队列通知服务平台
  27.   //"connector"=>"Topthink",
  28.   //"token"=>"",
  29.   //"project_id"=>"",
  30.   //"protocol"=>"https",
  31.   //"host"=>"qns.topthink.com",
  32.   //"port"=>443,
  33.   //"api_version"=>1,
  34.   //"max_retries"=>3,
  35.   //"default"=>"default"
  36. ];

think-queue内置了Redis、Database、Topthink、Sync四种驱动

Redis驱动

如果think-queue组件使用Redis驱动,那么需要提前安装Redis服务以及PHP的Redis扩展。

安装Redis服务

本机采用的是Windows系统,安装Redis服务首先需要获取安装包,可在GitHub官网搜索Redis下载解压安装。

Redis 下载地址:https://github.com/microsoftarchive/redis

关于安装配置的细节这里过度赘述,详情可参见《Redis安装配置》

安装Redis可视化管理工具

Redis Desktop Manager 下载地址:https://github.com/uglide/RedisDesktopManager/releases

PHP安装Redis扩展

php-redis扩展下载地址:https://pecl.php.net/package/redis

修改think-queue配置文件queue.php

  1. <?php
  2. /**消息队列配置*/
  3. use think\Env;
  4. return [
  5.   //Redis驱动
  6.   'connector' => 'redis',
  7.   "expire"=>60,//任务过期时间默认为秒,禁用为null
  8.   "default"=>"default",//默认队列名称
  9.   "host"=>Env::get("redis.host", "127.0.0.1"),//Redis主机IP地址
  10.   "port"=>Env::get("redis.port", 6379),//Redis端口
  11.   "password"=>Env::get("redis.password", "123456"),//Redis密码
  12.   "select"=>5,//Redis数据库索引
  13.   "timeout"=>0,//Redis连接超时时间
  14.   "persistent"=>false,//是否长连接
  15. ];

配置文件中的expire任务过期时间需要重点关注,这里的任务实际上指代的就是消息。由于采用Redis驱动,消息队列中的消息会保存到Redis的List数据结构中。

expire参数用于指定任务的过期时间,单位为秒。那么什么是过期任务呢?过期任务是任务的状态为执行中,任务的开始时刻 + 过期时间 > 当前时刻。

  • expirenull时表示不会检查过期的任务,执行超时的任务会一直留在消息队列中,需要开发者另行处理(删除或重发),因此性能相对较高。

  • expire不为null则表示会在每次获取下一个任务之前检查并重发过期(执行超时)的任务。

消息与队列的保存方式

img

Redis中消息队列名称

在Redis中每一个队列都有三个key与之对应,以dismiss_job_queue队列为例,在Redis中保存的方式如下:

  • 键名为queue:dismiss_job_queue,类型为List列表,表示待执行的任务列表

  • 键名为queue:dismiss_job_queue:delayed,类型为Sorted Set有序集合,表示延迟执行和定时执行的任务集合。

  • 键名为queue:dismiss_job_queue:reserved,类型为Sorted Set有序集合,表示执行中的任务集合。

注意使用:冒号分隔符,只是用来表示相关键名key的关联性,本身是没有特殊含义的,这是一种常见组织key的方式。

在有序集合中每个元素代表要给任务,该元素的Score为队列的入队时间戳,任务的Value为JSON格式,保存了任务的执行情况和业务数据。

Redis驱动下为了实现任务的延迟执行和过期重发,任务将在这三个键key中来回转移。

Database驱动

Database驱动是采用数据库做消息队列缓存,相比较Redis而言是不推荐。

  1. <?php
  2. /**消息队列配置*/
  3. use think\Env;
  4. return [
  5.   //Database驱动
  6.   "connector"=>"Database",//数据库驱动
  7.   "expire"=>60,//任务过期时间,单位为秒,禁用为null
  8.   "default"=>"default",//默认队列名称
  9.   "table"=>"jobs",//存储消息的表明,不带前缀
  10.   "dsn"=>[],
  11. ];

使用数据库驱动需要创建存放消息的数据表

  1. CREATE TABLE `prefix_jobs` (
  2. `id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT '自增主键',
  3. `queue` varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '' COMMENT '队列名称',
  4. `payload` longtext COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '有效负载',
  5. `attempts` tinyint(3) unsigned NOT NULL DEFAULT '0' COMMENT '重试次数',
  6. `reserved` tinyint(3) unsigned NOT NULL DEFAULT '0' COMMENT '订阅次数',
  7. `reserved_at` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '订阅时间',
  8. `available_at` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '有效时间',
  9. `created_at` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '创建时间',
  10. PRIMARY KEY (`id`)
  11. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='消息队列';

消息和队列的保存方式

在Database驱动中,每个任务对应到表的一行,queue字段用来区分不同的队列,payload字段保存了消息的执行者和业务数据,payload字段采用JSON格式的字符串来保存消息。

结构

img

目录结构

消息队列中的角色

  • 类名Command+Worker的角色为命令行,负责解析命令行参数,控制队列的启动和重启。

  • 类名Queue+Connector的角色为驱动,负责队列的创建以及消息的入队出队等操作。

  • 类型Job的角色为任务,负责将消息转化为一个任务对象,供消费者使用。

  • 生产者负责消息的创建与发布

  • 消费者负责任务的接收与执行

img

类关系

执行流程

  1. 命令行Command开始监听队列queue:work

  2. 执行进程Worker获取新消息Queue::pop()

  3. 消息队列Queue返回一个可用的Job实例$job 3.1 生产者推送Queue::push()新消息到消息队列Queue 3.2 消息队列Queue返回是否推送成功给生产者

  4. 执行进程Worker调用$jobfire()方法

  5. 消息Job解析$jobpayload,实例化一个消费者,并调用消费者实例的fire($job, $data)方法。

  6. 消费者读取消息内容$data,处理业务逻辑,删除或重发该消息 $job->delete()$job->release()

  7. 消息Job从Database或Redis中删除消息或重发消息

  8. 消息Job返回消息处理结果给执行进程Worker

  9. 执行进程Worker在终端输出响应或结束运行

使用流程

  1. 消息的创建与推送

  2. 消息的消费与删除

  3. 任务发布

  4. 任务处理

注意:这里会将消息message与任务job视为同一概念

消息创建与推送

在业务控制器中创建一个新消息并推送到指定的队列中

首先创建消息需要引入think\Queue

  1. use think\Queue

创建消息时需指定当前消息所归属消息队列的名称

  1. $job_queue_name = "dismiss_job_queue";

如果是Redis驱动对应的就是List数据列表的名称

img

Redis中消息队列名称

如果是Database驱动对应的就是prefix_job表中queue字段中的内容

创建消息时需要指定当前消息将会由哪个类来负责处理(消费者),当轮到该消息时,系统将生成一个该类的实例,并调用其fire方法。

  1. $job_handler_classname = "app\index\job\Dismiss";

这里是采用手动指定消息处理类的方式,更合理的做法是事先定义好消息名称与消费者类名的映射关系,然后根据某个可以获取该映射关系的类来推送消息,这样生产者只需要知道消息的名称,而无需指定具体哪个消费者来处理。

创建消息时需要指定当前任务所需的业务数据,注意数据不能是资源类型resource,业务数据最终将转化为json形式的字符串。

  1. $job_data = [];
  2. $job_data["ts"] = time();
  3. $job_data["bizid"] = uniqid();
  4. $job_data["params"] = $params;

最后将创建的消息推送到消息队列并等待对应的消费者去执行

  1. $is_pushed = Queue::push($job_handler_classname, $job_data, $job_queue_name);

使用Queue::push方法将消息推送到消息队列,其返回值根据驱动不同而不同,如果是Redis驱动则成功返回随机字符串失败返回false,如果是Database驱动则成功返回1失败返回false

  1. if($is_pushed !== false )
  2. {
  3. echo date("Y-m-d H:i:s")." a new job is pushed to the message queue";
  4. }
  5. else
  6. {
  7. echo date("Y-m-d H:i:s")." a new job pushed fail";
  8. }

例如:在游戏结束后,大厅服务器会发送游戏战绩数据给HTTP接口,接口获取数据后对其进行加工处理最终得到入库所需的数据,期间还会涉及到向第三方接口推送数据等等。如果采用同步处理的方式,大厅服务器只有等到所有的处理完毕后才能得到得到结构,由于大厅服务器会根据接口返回的数据判断当前战绩是否写入成功,若接口返回数据时间过长,此时服务端将一直处于等待状态,连接不会被断开,这种情况对于使用越来越频繁的接口来说,几乎是一种噩梦。

完整代码

  1. <?php
  2. namespace app\api\controller;
  3. use think\Queue;
  4. class Game extends Api
  5. {
  6.   public function dismiss(){
  7.       //获取参数
  8.       $data = file_get_contents("php://input");
  9.       if(empty($data)){
  10.           $this->error("post is null");
  11.       }
  12.       $params = json_decode($data, true);
  13.       /*创建新消息并推送到消息队列*/
  14.       // 当前任务由哪个类负责处理
  15.       $job_handler_classname = "app\api\job\Dismiss";
  16.       // 当前队列归属的队列名称
  17.       $job_queue_name = "dismiss_job_queue";
  18.       // 当前任务所需的业务数据
  19.       $job_data = ["ts"=>time(), "bizid"=>uniqid(), "params"=>$params];
  20.       // 将任务推送到消息队列等待对应的消费者去执行
  21.       $is_pushed = Queue::push($job_handler_classname, $job_data, $job_queue_name);
  22.       if($is_pushed == false){
  23.           $this->error("dismiss job queue went wrong");
  24.       }
  25.       //操作成功
  26.       $this->success('success');
  27.   }
  28. }

消息的消费与删除

创建Dismiss消费者类,用于处理dismiss_job_queue队列中的任务。

创建\application\api\job\Dismiss.php消费者类,并编写fire()方法。

  1. <?php
  2. namespace app\api\job;
  3. use think\Log;
  4. use think\queue\Job;
  5. /**
  6. * 消费者类
  7. * 用于处理 dismiss_job_queue 队列中的任务
  8. * 用于牌局解散
  9. */
  10. class Dismiss
  11. {
  12.   /**
  13.     * fire是消息队列默认调用的方法
  14.     * @param Job $job 当前的任务对象
  15.     * @param array|mixed $data 发布任务时自定义的数据
  16.   */
  17.   public function fire(Job $job, $data)
  18.   {
  19.       //有效消息到达消费者时可能已经不再需要执行了
  20.       if(!$this->checkJob($data)){
  21.           $job->delete();
  22.           return;
  23.       }
  24.       //执行业务处理
  25.       if($this->doJob($data)){
  26.           $job->delete();//任务执行成功后删除
  27.           Log::log("dismiss job has been down and deleted");
  28.       }else{
  29.           //检查任务重试次数
  30.           if($job->attempts() > 3){
  31.               Log::log("dismiss job has been retried more that 3 times");
  32.               $job->delete();
  33.           }
  34.       }
  35.   }
  36.   /**
  37.     * 消息在到达消费者时可能已经不需要执行了
  38.     * @param array|mixed $data 发布任务时自定义的数据
  39.     * @return boolean 任务执行的结果
  40.     */
  41.   private function checkJob($data)
  42.   {
  43.       $ts = $data["ts"];
  44.       $bizid = $data["bizid"];
  45.       $params = $data["params"];
  46.       return true;
  47.   }
  48.   /**
  49.     * 根据消息中的数据进行实际的业务处理
  50.   */
  51.   private function doJob($data)
  52.   {
  53.     // 实际业务流程处理
  54.     return true;
  55.   }
  56. }

发布任务

访问接口/api/game/dismiss查看推送是否成功

处理任务

切换到当前终端到项目根目录

  1. $ php think queue:work --queue dismiss_job_queue

实际使用过程中应安装Supervisor这样的通用进程管理工具,它会监控php think queue:work的进程,一旦失败会帮助重启,详情可参见 《Supervisor》

简单来总结下使用流程

  1. 安装Supervisor并编写应用程序配置脚本,脚本主要用来运行php think queue:work命令。

  2. 运行Supervisor服务,它会读取主进程和应用程序配置。

  3. 运行自己编写的消息队列并根据日志查看是否正常运行

命令

Work模式 queue:work

用于启动一个工作进程来处理消息队列

  1. $ php think queue:work --queue dismiss_job_queue

参数说明

  • --daemon 是否循环执行,如果不加该参数则该命令处理完下一个消息就退出。

  • --queue dismiss_job_queue 要处理的队列的名称

  • --delay 0 如果本次任务执行抛出异常且任务未被删除时,设置其下次执行前延迟多少秒,默认为0。

  • --force 系统处于维护状态时,是否仍然处理任务,并未找到相关说明。

  • --memory 128 该进程允许使用的内存上限,以M为单位。

  • --sleep 3 如果队列中无任务则sleep多少秒后重新检查(work+daemon模式)或退出(listen或非daemon模式)

  • --tries 2 如果任务已经超过尝试次数上限,则触发“任务尝试数超限”事件,默认为0。

Daemon模式的执行流程

img

Daemon模式

  1. $ php think queue:work

命令行参数

  • --daemon 是否一直执行

  • --queue 处理的队列的名称

  • --delay 重发失败任务时延迟多少秒才执行

  • --force 系统处于维护状态时是否仍然处理任务

  • --memory 该进程允许使用的内存上限,以M为单位。

  • --sleep 入股队列中无任务则多少秒后重新检查

  • --tries 任务重发多少次之后进入失败处理逻辑

如何从缓冲中得到上次重启的时间?

Cache::get("think:queue:restart") 从缓存得到上次重启的事件

如何判断是否退出daemon循环呢?

  • 内存超限:memory_get_usage() >= --memory参数

  • 重启时间有更新:外部通过php think queue:restart命令更新了重启时间的缓存

Listen模式 queue:listen

用于启动一个listen进程,然后由listen进程通过proc_open('php think queue:work --queue="%s" --delay=%s --memory=%s --sleep=%s --tries=%s')来周期性地创建一次性的work进程来消费消息队列,并且限制该work进程的执行事件,同时通过管道来监听work进程的输出。

  1. $ php think queue:listen --queue dismiss_job_queue
  • --queue dismiss_job_queue 监听队列的名称

  • --delay 0 如果本次任务执行抛出异常且任务未被删除时,设置其下次执行前延迟多少秒,默认为0。

  • --memory 128 该进程允许使用的内存上限,以M为单位。

  • --sleep 3 如果队列中无任务,则多长时间后重新检查。

  • --tries 0 如果任务已经超过重发次数上限,则进入失败处理逻辑,默认为0。

  • --timeout 60 工作进程允许执行的最长时间,以秒为单位。

Work模式和Listen模式的异同点

两者都可以用于处理消息队列中的任务,区别在于:

  • 执行原理不同

Work模式是单进程的处理模式,按照是否设置--daemon参数又可以分为单次执行和循环执行两种模式。单次执行不添加--daemon参数,该模式下Work进程在从处理完下一个消息后直接结束当前进程。当队列为空时会sleep一段时间然后退出。循环执行添加了--daemon参数,该模式下Work进程会循环地处理队列中的消息直到内存超出参数配置才结束进程。当队列为空时会在每次循环中sleep一段时间。

Listen命令是“双进程+管道”的处理模式,Listen命令所在的进程会循环地创建单次执行模式的Work进程,每次创建的Work进程只消费一个消息就会结束,然后Listen进程再创建一个新的Work进程。Listen进程会定时检查当前的Work进程执行时间是否超过了--timeout参数的值,如果已经超过则Listen进程会杀掉所有Work进程,然后抛出异常。Listen进程会通过管道来监听当前的Work进程的输出,当Work进程有输出时Listen进程会将输出写入到stdout/stderr。Listen进程会定时通过proc_get_status()函数来监控当前的Work进程是否仍再运行,Work进程消费完一个任务之后,Work进程就结束了,其状态会变成terminated,此时Listen进程就会重新创建一个新的Work进程并对其计时,新的Work进程开始消费下一个任务。

  • 结束时机不同

Listen命令中Listen进程和Work进程会在以下情况下结束:Listen进程会定时检查当前的Work进程的执行时间是否超过了--timeout参数的值,如果已经超时此时Listen进程会杀掉当前的Work进程,然后抛出一个ProcessTimeoutException异常并结束Listen进程。Listen进程会定时检查自身使用的内存是否超过了--memory参数的值,如果已经超过此时Listen进程会直接die掉,Work进程也会自动结束。

  • 性能不同

Work命令是在脚本内部做循环,框架脚本在命名执行的初期就已经加载完毕。而Listen模式则是处理完一个任务之后新开一个Work进程,此时会重新加载框架脚本。因此Work模式的性能会比Listen模式高。注意当代码有更新时Work模式下需要手动去执行php think queue:restart命令重启队列来使改动生效,而Listen模式会自动生效无需其它操作。

  • 超时控制能力

Work模式本质上既不能控制进程自身的运行时间,也无法限制执行中的任务的执行时间。举例来说,假如在某次上线后\app\api\job\Dismiss消费者的fire方法中添加一段死循环。

  1. public function fire()
  2. {
  3. while(true){
  4.   $consoleOutPut->writeln("looping forever inside a job");
  5.   sleep(1);
  6. }
  7. }

这个循环将永远不能停止,直到任务所在的进程超过内存限制或者由管理员手动结束。这个过程不会由任何的警告。更严重的是如果配置了expire,那么这个死循环的任务可能会污染到同样处理dismiss_job_queue队列的其它Work进程,最后好几个Work进程将被卡死在这段死循环中。

Work模式下的超时控制能力实际应理解为多个Work进程配合下的过期任务重发能力。

Listen命令可以限制Listen进程创建的Work进程的最大执行时间,Listen命令可以通过--timeout参数限制Work进程允许运行的最长时间,超过该时间限制后Work进程会被强制杀死,Listen进程本身也会抛出异常并结束。

expiretimeout之间的区别

expire在配置文件中设置,timeout在Listen命令的命令行参数中设置。expiretimeout是两个不同层次上的概念:expire是指任务的过期时间,这个时间是全局的影响到所有的Work进程,不管是独立的Work命令还是Listen模式下创建的Work进程。expire针对的对象是任务。timeout是指Work进程的超时时间,这个时间只针对当前执行的Listen命令有效,timeout针对的对象是Work进程。

  • 使用场景不同

Work命令的适用场景是任务数量较多、性能要求较高、任务的执行时间较短、消费者类中不存在死循环/sleep()/exit()/die()等容易导致bug的逻辑。

Listen命令的适用场景是任务数量较少、任务的执行时间较长(如生成大型的Excel报表等)、任务的执行时间需要有严格限制。

消息处理流程

消息队列处理一个任务的具体流程

img

消息队列处理一个任务的具体流程

  • 重发超时的任务

超时任务是指任务处于执行中,当前时间 - 任务开始执行的时刻 > expire时间

重发是指将任务的状态还原为未执行,并将任务的已执行次数加1。

  • 获取下一个有效任务

有效任务是指未执行、最早可执行的时间 <= 当前时间、按时间先后排序(先进先出)

  • 任务次数超限逻辑

任务的已尝试次数大于命令行中的--tries参数,命令行中的--tries参数大于0。

  • 触发次数超限事件 queue_failed内置的次数超限事件标签,是否定义了queue_failed行为,未定义则不处理直接返回,已定义则对次数超限的任务进行处理。

  1. $runHookCb = Behavior::queueFailed() //返回true则删除任务执行任务失败回调,返回false则不执行任何操作。
  • 消费当前的任务

  1. $job->fire()

$job对象的payload属性中解析出消费者类,创建消费者类的实例,执行消费者类的实例的fire($job, $data)方法。

需要在fire($job, $data)中手动删除任务,$job参数表示当前任务对象,$data参数表示当前的任务数据即创建队列时传入的参数。

消息队列的开始、停止、重启

开始一个消息队列

  1. $ php think queue:work

停止所有的消息队列

  1. $ php think queue:restart

重启所有的消息队列

  1. $ php think queue:restart
  2. $ php think queue:work

多模块多任务的处理

  • 多模块

单模块项目推荐时间app/job作为任务类的命名空间,多任务项目可使用app/module/job作为任务类的命名空间,也可以放在任意可以自动加载到的地方。

  • 多任务

如果一个任务类中有多个小任务的话,在发布任务的时候,需要使用任务的“类名@方法名”的形式,例如app\lib\job\Job@task,注意命令行中的--queue参数不执行@的解析。

消息的延迟执行与定时任务

延迟执行是相对于即使执行的,是用来限制某个任务的最早可执行时刻。在到达该时刻之前该任务会被跳过,可以利用该功能实现定时任务。

延迟执行的使用方式

  • 在生产者业务代码中

  1. // 即时执行
  2. $is_pushed = Queue::push($job_handler_classname, $job_data, $job_queue_name)
  3. // 延迟2秒执行
  4. $is_pushed = Queue::later(2, $job_handler_classname, $job_data_arr, $job_queue_name);
  5. // 延迟到2019-06-01 00:00:00时刻执行
  6. $time2wait = strtotime("2019-06-01 00:00:00") - strtotime("now");
  7. $is_pushed = Queue::later($time2wait, $job_handler_classname, $job_data_arr, $job_queue_name);
  • 在消费者类中

  1. // 重发,即时执行
  2. $job->release();
  3. // 重发,延迟2秒后执行
  4. $job->release(2);
  5. // 延迟到2019-06-01 00:00:00时刻执行
  6. $time2wait = strtotime("2019-06-01 00:00:00") - strtotime("now");
  7. $job->release($time2wait);
  • 在命令行中

如果消费者类的fire方法抛出了异常且任务未被删除时,将自动重发该任务。重发时会设置下次执行前延迟多少秒,默认为0。

  1. $ php think queue:work --delay 3

消息重发

消息重发时机有三种情况:

  • 在消费者类中手动重发

  1. if($is_job_done === false)
  2. {
  3. $job->release();
  4. }
  • Work进程自动重发需要同时满足以下两个条件:消费者类的fire()方法抛出异常、任务未被删除

  • 当配置了expire不为null时,Work进程内部每次查询可用任务之前会自动重发已过期的任务。

注意:在Database模式下,2.7.1和2.7.2中的重发逻辑是先删除原来的任务,然后插入一个新的任务。2.7.3中的重发机制是直接更新原任务。在Redis模式下,三种重发都是先删除再插入。不管是那种重发方式,重发之后任务的已尝试次数会在原来的基础上加1。

此外,消费者类中需要注意,如果fire()方法中抛出异常,将出现两种情况:

  • 如果不需要自动重发的话,请在抛出异常之前将任务删除$job->delete(),否则会被框架自动重发。

  • 如果需要自动重发的话,请直接抛出异常,不要在fire()方法中手动使用$job->release(),这样将导致任务被重发两次,产生两个一样的新任务。

Redis驱动下的任务重发细节

在Redis驱动下为了实现任务的延迟执行和过期重发,任务将在这三个key中来回转移。

在Database模式下消息处理的消息流程中,如果配置的expire不是null那么think-queuework进程每次在获取下一个可执行任务之前,会先尝试重发所有过期的任务。而在Redis驱动下这个步骤则做了更多的事情,详情如下:

  1. queue:xxx:delayedkey中查询出有哪些任务在当前时刻已经可以开始执行,然后将这些任务转移到queue:xxxkey的尾部。

  2. queue:xxx:reservedkey中查询出有哪些任务在当前时刻已经过期,然后将这些任务转移到queue:xxxkey的尾部。

  3. 尝试从queue:xxxkey的头部取出一个任务,如果取出成功,则将这个任务转移到queue:xxx:reservedkey的头部,同时将这个任务实例化成任务对象,交给消费者去执行。

Redis队列中的过期任务重发步骤,执行前:

img

Redis队列中的过期任务重发步骤,执行前

Redis队列中的过期任务重发步骤,执行后:

img

Redis队列中的过期任务重发步骤,执行后

任务的失败回调与警告

当同时满足以下条件时将触发任务失败回调:

  • 命令行的--tries参数的值大于0

  • 任务的已尝试次数大于命令行的--tries参数

  • 开发者添加了queue_failed事件标签及其对应的回调代码

  • 消费者类中定义了failed()方法用于接收任务失败的通知

注意,queue_failed标签需要在安装think-queue之后手动去/app/tags.php文件中添加。

注意事项

-任务完成后使用$job->delete()删除任务

  • 在消费者类的fire()方法中使用$job->attempt()检查任务已执行次数,对于次数异常的做相应的处理。

  • 在消费者类的fire()方法在中根据业务数据来判断该任务是否已经执行过,以避免该任务被重复执行。

  • 编写失败回调事件将事件中失败的任务及时通知给开发人员

拓展

  • 队列的稳定性和拓展性

稳定性:不管是listen模式还是work模式,建议使用supervisor或自定义的cron脚本去定时检查work进程是否正常。

拓展性:当某个队列的消费者不足时在给这个队列添加work进程即可


使用注意

最好配置本地的Redis,使用远程Redis曾出现无法解释的原因。

  1. $ yum install -g redis
  2. $ systemctl restart redis

停止原正在运行的

  1. $ supervisorctl shutdown

重新加载服务

  1. $ supervisord -c /etc/supervisord.conf
  2. $ supervisorctl reload
  3. $ ps aux | grep supervisord
  4. $ top

未完待续...

**

19人点赞**

**

**mq

**

更多精彩内容,就在简书APP

"小礼物走一走,来简书关注我"

共1人赞赏

JunChow520阅读提升上限,实践突破下限。

总资产913 (约49.33元)共写了129.3W字获得2,182个赞共957个粉丝

全部评论2只看作者按时间倒序按时间正序

img

For_All_I_Care

3楼 02.25 15:25

怎么composer成功的?我在tp6安装报错 Your requirements could not be resolved to an installable set of packages. Problem 1 - topthink/think-queue[v3.0.0, ..., v3.0.5] require symfony/process ^4.2 -> found symfony/process[v4.2.0, ..., v4.4.19] but it conflicts with your root composer.json require (^5.2). - Root composer.json requires topthink/think-queue ^3.0 -> satisfiable by topthink/think-queue[v3.0.0, ..., v3.0.5]. Use the option --with-all-dependencies (-W) to allow upgrades, downgrades and removals for packages currently locked to specific versions. Installation failed, reverting ./composer.json and ./composer.lock to their original content.

** 赞** 回复

img

hainuo

2楼 2020.03.15 10:58

不错这是一篇非常详细的使用指引了,很棒 不知道对于失败任务是如何处理? 我使用mysql作为队列的原因就是因为这个事情 失败的任务,可以重新执行一次 (也用过其他的方式比如 钉钉通知 然后发现失败的数据 查找是一个问题 )

** 赞** 回复

被以下专题收入,发现更多相似内容

imgphp

推荐阅读更多精彩内容**

  • TP5学习笔记九 队列

    依赖 redis-server redis php扩展 参考此教程 安装 composer 最好通过compose...

    imghfm0922阅读 1,951评论 0赞 1

  • thinkphp-queue 浅析

    传统的程序执行流程一般是 即时|同步|串行的,在某些场景下,会存在并发低,吞吐量低,响应时间长等问题。在大型系统中...

    img红尘一落君莫笑阅读 10,331评论 2赞 5

    img

  •  

  •  

  • Thinkphp 3.2.3 redis缓存

    转载00天火00文章保存一下先。。 说明 好像是tp3.2的bug,自带的redis驱动不是那么好用。。。找了方法...

    imggeeooooz阅读 1,895评论 0赞 2

    img

  • redis常用知识点

    1.1 资料 ,最好的入门小册子,可以先于一切文档之前看,免费。 作者Antirez的博客,Antirez维护的R...

    imgJefferyLcm阅读 16,272评论 1赞 52

  • 2017-10-26

    今天早上我还在睡梦中,突然手机响了。睁开睡眼,发现是同事来的,一分钟接完电话,看向旁边的儿子。发现原本熟睡在我旁边...

    img蝴蝶的颜色_aff4阅读 41评论 0赞 0

    img

imgJunChow520关注总资产913 (约49.33元)Go protobuf阅读 807vite postcss阅读 22typescript type阅读 24推荐阅读laravel中队列的使用阅读 51每日一学:你知道如何在 RabbitMQ 中实现 Work queues工作队列模式吗?阅读 83Kafka 客户端生产者使用详解(Java)阅读 181Dubbo——时间轮(Time Wheel)算法应用阅读 391史上最便捷搭建 Zookeeper 的方法!阅读 162

**

**评论2

**赞19

**

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/煮酒与君饮/article/detail/906975
推荐阅读
相关标签
  

闽ICP备14008679号