赞
踩
其实这个TP6怎么实现这个消息队列,很多教程都说有,但是总是给人怪怪的感觉,懵懵懂懂,好像是那么回事但是好像又不是那么一回事,这个还是得自己总结着来
第一步下载composer包,这个是没什么说的,要用TP6的队列,直接composer一下
composer require topthink/think-queue
找到配置文件 config/queue.php,一般都会有的,没有的话自己建一个就行,然后配置一下,default配置成redis
<?php return [ 'default' => 'redis', 'connections' => [ 'sync' => [ 'type' => 'sync', ], 'database' => [ 'type' => 'database', 'queue' => 'default', 'table' => 'jobs', 'connection' => null, ], 'redis' => [ 'type' => 'redis', 'queue' => 'default', 'host' => '127.0.0.1', 'port' => 6379, 'password' => '', 'select' => 0, 'timeout' => 0, 'persistent' => false, ], ], 'failed' => [ 'type' => 'none', 'table' => 'failed_jobs', ], ];
生产端代码,主要用来产生队列数据的,这里只是测试,自己可以封装简化一下,具体的可以看下面的注释
public function index() { // 1.当前任务由哪个类来负责处理 // 当轮到该任务时,系统将生成该类的实例,并调用其fire方法 // 这里值得注意的是,这里可以具体消费文件的类明明空间,这里的Jobs是自己建立的类名称,也可以改成其他的,这个随意,只要明明空间正确就行,这样就方便了文件的处理,以前的好像老是有一个@fire的写法,起码目前这边用不到 $jobHandlerClassName = 'app\controller\Jobs'; // 2.当任务归属的队列名称,如果为新队列,会自动创建 // 这个队列名称自由建立,最好和业务相关且不能重复,不然你懂的 $jobQueueName = "helloJobQueue"; // 3.当前任务所需业务数据,不能为resource类型,其实转换成数组就行,最终都会转化为json形式的字符串,这个数据是测试数据,自己的数据自己进行组装 $list = Db::table('users')->field(['id','mobile','create_time'])->select()->toArray(); // 4.将该任务推送到消息列表,等待对应的消费者去执行 // 入队列,later延迟执行,单位秒,push立即执行,两种方法都行,看个人需求,基本上这里建立好以后可以查看下Redis是否有数据了 foreach ($list as $key=>$value){ Queue::later(60,$jobHandlerClassName, $value, $jobQueueName); Queue::push($jobHandlerClassName, $value, $jobQueueName); } }
基本上到了这里完事一大半了,剩下的就是消费者了
消费端代码
看到没有,这个Jobs是自己起的类名称,这个可以随便自己起,对应的生产代码的$jobHandlerClassName就行,其实到了这里基本就是自己的业务处理了,fire方法这个是必须的,其他的就看自己的发挥了,像checkDatabaseToSeeIfJobNeedToBeDone、doHelloJob这些方法都是自己发挥自己建立的
<?php namespace app\controller; use think\facade\Db; use think\queue\Job; class Jobs { /** * fire方法是消息队列默认调用的方法 * @param Job $job 当前的任务对象 * @param array $data 发布任务时自定义的数据 */ public function fire(Job $job, array $data) { // 有些任务在到达消费者时,可能已经不再需要执行了 $isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($data); if (!$isJobStillNeedToBeDone) { $job->delete(); return; } $isJobDone = $this->doHelloJob($data); if ($isJobDone) { $job->delete(); echo "删除任务" . $job->attempts() . '\n'; } else { if ($job->attempts() > 3) { $job->delete(); echo "超时任务删除" . $job->attempts() . '\n'; } } } /** * 有些消息在到达消费者时,可能已经不再需要执行了,具体业务逻辑自己处理 * @param array $data * @return bool */ private function checkDatabaseToSeeIfJobNeedToBeDone(array $data) { return true; } /** * 根据消息中的数据进行实际的业务处理... * @param array $data * @return bool */ private function doHelloJob(array $data) { //actionLog($data); Db::table('log')->insert(['param'=>json_encode($data),'create_time'=>time()]); return true; } }
最后面进入到项目目录启动一下队列即可
php think queue:work --queue helloJobQueue
基本上都是用queue:work去处理,有兴趣可以自己查找下它和 queue:listen的区别,Linux环境的话安装个supervisor(有宝塔的话可以直接在软件列表那里安装)守护进程一下,具体的可以自己了解下,这个东西不难
在消费端的话可能会遇到一个坑,就是DB这类的最好设置成长链接,如果是短连接,如果由于没有任务长时间空闲,DB链接会被释放,下次有队列任务进来,会不执行,要重新启动才会去执行,这个问题之前碰到过,如果你的队列明明在运行,但是任务没运行,这个时候可以排查下这里
基本上这个队列就是这样了,其实也不是很复杂,就是有些坑,多总结就好
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。