当前位置:   article > 正文

thinkphp6+redis实现消息队列_tp6think-queue 实现 redis 消息队列

tp6think-queue 实现 redis 消息队列

其实这个TP6怎么实现这个消息队列,很多教程都说有,但是总是给人怪怪的感觉,懵懵懂懂,好像是那么回事但是好像又不是那么一回事,这个还是得自己总结着来
第一步下载composer包,这个是没什么说的,要用TP6的队列,直接composer一下

composer require topthink/think-queue
  • 1

找到配置文件 config/queue.php,一般都会有的,没有的话自己建一个就行,然后配置一下,default配置成redis

<?php
return [
    'default'     => 'redis',
    'connections' => [
        'sync'     => [
            'type' => 'sync',
        ],
        'database' => [
            'type'       => 'database',
            'queue'      => 'default',
            'table'      => 'jobs',
            'connection' => null,
        ],
        'redis'    => [
            'type'       => 'redis',
            'queue'      => 'default',
            'host'       => '127.0.0.1',
            'port'       => 6379,
            'password'   => '',
            'select'     => 0,
            'timeout'    => 0,
            'persistent' => false,
        ],
    ],
    'failed'      => [
        'type'  => 'none',
        'table' => 'failed_jobs',
    ],
];
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

生产端代码,主要用来产生队列数据的,这里只是测试,自己可以封装简化一下,具体的可以看下面的注释

	public function index()
    {
        // 1.当前任务由哪个类来负责处理
        // 当轮到该任务时,系统将生成该类的实例,并调用其fire方法
        // 这里值得注意的是,这里可以具体消费文件的类明明空间,这里的Jobs是自己建立的类名称,也可以改成其他的,这个随意,只要明明空间正确就行,这样就方便了文件的处理,以前的好像老是有一个@fire的写法,起码目前这边用不到
        $jobHandlerClassName = 'app\controller\Jobs';
        // 2.当任务归属的队列名称,如果为新队列,会自动创建
        // 这个队列名称自由建立,最好和业务相关且不能重复,不然你懂的
        $jobQueueName = "helloJobQueue";
        // 3.当前任务所需业务数据,不能为resource类型,其实转换成数组就行,最终都会转化为json形式的字符串,这个数据是测试数据,自己的数据自己进行组装
        $list = Db::table('users')->field(['id','mobile','create_time'])->select()->toArray();
        // 4.将该任务推送到消息列表,等待对应的消费者去执行
        // 入队列,later延迟执行,单位秒,push立即执行,两种方法都行,看个人需求,基本上这里建立好以后可以查看下Redis是否有数据了
        foreach ($list as $key=>$value){
        	Queue::later(60,$jobHandlerClassName, $value, $jobQueueName);
            Queue::push($jobHandlerClassName, $value, $jobQueueName);
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18


基本上到了这里完事一大半了,剩下的就是消费者了
消费端代码
看到没有,这个Jobs是自己起的类名称,这个可以随便自己起,对应的生产代码的$jobHandlerClassName就行,其实到了这里基本就是自己的业务处理了,fire方法这个是必须的,其他的就看自己的发挥了,像checkDatabaseToSeeIfJobNeedToBeDone、doHelloJob这些方法都是自己发挥自己建立的

<?php
namespace app\controller;

use think\facade\Db;
use think\queue\Job;

class Jobs
{
    /**
     * fire方法是消息队列默认调用的方法
     * @param Job $job 当前的任务对象
     * @param array $data 发布任务时自定义的数据
     */
    public function fire(Job $job, array $data)
    {
        // 有些任务在到达消费者时,可能已经不再需要执行了
        $isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($data);
        if (!$isJobStillNeedToBeDone) {
            $job->delete();
            return;
        }
		
        $isJobDone = $this->doHelloJob($data);
        if ($isJobDone) {
            $job->delete();
            echo "删除任务" . $job->attempts() . '\n';
        } else {
            if ($job->attempts() > 3) {
                $job->delete();
                echo "超时任务删除" . $job->attempts() . '\n';
            }
        }

    }

    /**
     * 有些消息在到达消费者时,可能已经不再需要执行了,具体业务逻辑自己处理
     * @param array $data
     * @return bool
     */
    private function checkDatabaseToSeeIfJobNeedToBeDone(array $data)
    {
        return true;
    }

    /**
     * 根据消息中的数据进行实际的业务处理...
     * @param array $data
     * @return bool
     */
    private function doHelloJob(array $data)
    {
        //actionLog($data);

        Db::table('log')->insert(['param'=>json_encode($data),'create_time'=>time()]);

        return true;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59

最后面进入到项目目录启动一下队列即可

php think queue:work --queue helloJobQueue
  • 1

基本上都是用queue:work去处理,有兴趣可以自己查找下它和 queue:listen的区别,Linux环境的话安装个supervisor(有宝塔的话可以直接在软件列表那里安装)守护进程一下,具体的可以自己了解下,这个东西不难
在消费端的话可能会遇到一个坑,就是DB这类的最好设置成长链接,如果是短连接,如果由于没有任务长时间空闲,DB链接会被释放,下次有队列任务进来,会不执行,要重新启动才会去执行,这个问题之前碰到过,如果你的队列明明在运行,但是任务没运行,这个时候可以排查下这里
基本上这个队列就是这样了,其实也不是很复杂,就是有些坑,多总结就好

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/257601
推荐阅读
相关标签
  

闽ICP备14008679号