当前位置:   article > 正文

Thinkphp+workman+redis实现多进程异步任务处理_thinkphp 结合redis 实现异步任务

thinkphp 结合redis 实现异步任务

前言


PHP本身并不直接支持多线程编程,因为PHP的设计初衷是作为一个脚本语言,主要面向的是Web开发。不过我们可以使用一些扩展和库来实现多进程的功能,提高系统性能,比如workerman和swoole。通过多进程异步执行任务。

安装workman


Thinkphp5.0使用workman创建多进程任务


  • 1.在项目根目录(注意不是pubcli目录)下创建文件server.php,文件内容如下

    <?php
    define('APP_PATH', __DIR__ . '/application/');
    define('BIND_MODULE','workman/Worker');
    // 加载框架引导文件
    require __DIR__ . '/thinkphp/start.php';
    
    • 1
    • 2
    • 3
    • 4
    • 5
  • 2.在根目录创建\application\workman\controller目录,然后在该目录下新建Worker.php,文件内容如下

    <?php
    
    namespace app\workman\controller;
    
    use think\worker\Server;
    
    class Worker extends Server
    {
        //websocket服务端地址和端口
        protected $socket = 'websocket://0.0.0.0:2346';
        //设置进程数,默认为4,根据自己的需要和服务器配置合理设置,一般设置进程数为CPU核数的1倍-3倍
        protected $processes = 4;
    
        /**
         * 收到信息
         * @param $connection
         * @param $data
         */
        public function onMessage($connection, $data)
        {
    
        }
    
        /**
         * 当连接建立时触发的回调函数
         * @param $connection
         */
        public function onConnect($connection)
        {
    
        }
    
        /**
         * 当连接断开时触发的回调函数
         * @param $connection
         */
        public function onClose($connection)
        {
    
        }
    
        /**
         * 当客户端的连接上发生错误时触发
         * @param $connection
         * @param $code
         * @param $msg
         */
        public function onError($connection, $code, $msg)
        {
            echo "error $code $msg\n";
        }
    
        /**
         * 每个进程启动
         * @param $worker
         */
        public function onWorkerStart($worker)
        {
            echo 'workman进程启动,进程id ' . $worker->id . PHP_EOL;
    
            //监听redis队列
            $redis = new \Redis();
            $redis->connect('192.168.204.128', 6379);
            while (true) {
                //读取redis队列
                $data = $redis->lPop('test-queue');
                if ($data) {
                    //处理业务
                    echo '进程id ' . $worker->id . ' 开始处理业务数据' . $data . PHP_EOL;
                    //模拟耗时任务
                    sleep(5);
                    echo '进程id ' . $worker->id . ' 处理业务数据' . $data . ' 完成' . PHP_EOL;
                } else {
                    echo '进程id ' . $worker->id . ' 空闲中,休息5秒'. PHP_EOL;
                    sleep(5);
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79

    这里主要的功能就是创建一个workman的websocket服务端(使用其他tcp服务也是可以的),然后在每个进程启动的时候监听redis队列,利用这些进程异步去处理redis队列里的任务,代码里的模拟耗时任务可以直接替换成你在tp框架里写的耗时任务。

  • 3.在tp框架中将任务加入redis队列,例如我这里写一个添加redis列表元素的方法

    <?php
    
    namespace app\index\controller;
    
    class Index
    {
        //新增队列数据
        public function addQueue()
        {
            $redis = new \Redis();
            $redis->connect('192.168.204.128', 6379);
            $redis->rPush('test-queue', '1');
            $redis->rPush('test-queue', '2');
            $redis->rPush('test-queue', '3');
            $redis->rPush('test-queue', '4');
            $redis->rPush('test-queue', '5');
            $redis->rPush('test-queue', '6');
            $redis->rPush('test-queue', '7');
            echo 'success';
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
  • 4.启动workman

    直接在根目录下运行第一步创建的server.php

    php server.php start
    
    • 1

    可以看到下面的输出

    Workerman[server.php] start in DEBUG mode
    -------------------------------------------- WORKERMAN --------------------------------------------
    Workerman version:3.5.35          PHP version:7.4.33           Event-Loop:\Workerman\Events\Select
    --------------------------------------------- WORKERS ---------------------------------------------
    proto   user            worker          listen                      processes    status
    tcp     root            none            websocket://0.0.0.0:2346    4             [OK]
    ---------------------------------------------------------------------------------------------------
    Press Ctrl+C to stop. Start success.
    workman进程启动,进程id 0
    workman进程启动,进程id 3
    workman进程启动,进程id 2
    workman进程启动,进程id 1
    进程id 1 空闲中,休息5秒
    进程id 0 空闲中,休息5秒
    进程id 2 空闲中,休息5秒
    进程id 3 空闲中,休息5秒
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    注意:如果是在windows下,设置进程数是没有用的,就只会启动一个worker进程,也就是只有单进程,要体验多进程只能在Linux环境下,同时也无法守护进程,cmd窗口关掉后服务即停止

  • 5.访问tp框架中的将任务加入redis队列接口,直接用浏览器或者命令行curl访问http://网站域名/index/index/addQueue即可,然后你就可以看到所有的redis队列将被workman进程分配并执行,以下是我启动workman->添加redis队列->workman处理->队列处理结束打印的结果:

    Workerman[server.php] start in DEBUG mode
    -------------------------------------------- WORKERMAN --------------------------------------------
    Workerman version:3.5.35          PHP version:7.4.33           Event-Loop:\Workerman\Events\Select
    --------------------------------------------- WORKERS ---------------------------------------------
    proto   user            worker          listen                      processes    status
    tcp     root            none            websocket://0.0.0.0:2346    4             [OK]
    ---------------------------------------------------------------------------------------------------
    Press Ctrl+C to stop. Start success.
    workman进程启动,进程id 0
    workman进程启动,进程id 3
    workman进程启动,进程id 2
    workman进程启动,进程id 1
    进程id 1 空闲中,休息5秒
    进程id 0 空闲中,休息5秒
    进程id 2 空闲中,休息5秒
    进程id 3 空闲中,休息5秒
    进程id 2 空闲中,休息5秒
    进程id 1 空闲中,休息5秒
    进程id 3 空闲中,休息5秒
    进程id 0 空闲中,休息5秒
    (这里开始将任务加入redis队列,然后workman开始消费队列)
    进程id 0 开始处理业务数据1
    进程id 1 开始处理业务数据3
    进程id 3 开始处理业务数据4
    进程id 2 开始处理业务数据2
    进程id 0 处理业务数据1 完成
    进程id 1 处理业务数据3 完成
    进程id 2 处理业务数据2 完成
    进程id 1 开始处理业务数据6
    进程id 3 处理业务数据4 完成
    进程id 0 开始处理业务数据5
    进程id 3 开始处理业务数据7
    进程id 2 空闲中,休息5秒
    进程id 1 处理业务数据6 完成
    进程id 0 处理业务数据5 完成
    进程id 3 处理业务数据7 完成
    (到这里所有的7项任务已经处理完成)
    进程id 1 空闲中,休息5秒
    进程id 0 空闲中,休息5秒
    进程id 3 空闲中,休息5秒
    进程id 2 空闲中,休息5秒
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41

TP5.1及TP6使用workman创建多进程任务


  • 1.在根目录创建\application\workman\controller目录,然后在该目录下新建Worker.php,文件内容如下

    <?php
    
    namespace app\workman;
    
    use think\worker\Server;
    
    class Worker extends Server
    {
        protected $host = '127.0.0.1';
        protected $port = 2346;
        protected $protocol = 'websocket';
        protected $option = [
            'count'   => 4, //设置进程数,默认为4,根据自己的需要和服务器配置合理设置,一般设置进程数为CPU核数的1倍-3倍
            'name'    => 'think'
        ];
    
    
        /**
         * 收到信息
         * @param $connection
         * @param $data
         */
        public function onMessage($connection, $data)
        {
    
        }
    
        /**
         * 当连接建立时触发的回调函数
         * @param $connection
         */
        public function onConnect($connection)
        {
    
        }
    
        /**
         * 当连接断开时触发的回调函数
         * @param $connection
         */
        public function onClose($connection)
        {
    
        }
    
        /**
         * 当客户端的连接上发生错误时触发
         * @param $connection
         * @param $code
         * @param $msg
         */
        public function onError($connection, $code, $msg)
        {
            echo "error $code $msg\n";
        }
    
        /**
         * 每个进程启动
         * @param $worker
         */
        public function onWorkerStart($worker)
        {
            echo 'workman进程启动,进程id ' . $worker->id . PHP_EOL;
    
            //监听redis队列
            $redis = new \Redis();
            $redis->connect('127.0.0.1', 6379);
            while (true) {
                //读取redis队列
                $data = $redis->lPop('test-queue');
                if ($data) {
                    //处理业务
                    echo '进程id ' . $worker->id . ' 开始处理业务数据' . $data . PHP_EOL;
                    //模拟耗时任务
                    sleep(5);
                    echo '进程id ' . $worker->id . ' 处理业务数据' . $data . ' 完成' . PHP_EOL;
                } else {
                    echo '进程id ' . $worker->id . ' 空闲中,休息5秒' . PHP_EOL;
                    sleep(5);
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83

    这里主要的功能就是创建一个workman的websocket服务端(使用其他tcp服务也是可以的),然后在每个进程启动的时候监听redis队列,利用这些进程异步去处理redis队列里的任务,代码里的模拟耗时任务可以直接替换成你在tp框架里写的耗时任务。

  • 2.指定workman服务类名

    修改config/worker_server.php,将worker_class的值改为app\workman\Worker

    'worker_class'   => 'app\workman\Worker', // 自定义Workerman服务类名 支持数组定义多个服务
    
    • 1
  • 3.启动workman

    直接在根目录下运行命令php think worker:server或者php think worker:server -d即可启动,如果要调整workman参数,修改config/worker_server.php中的选项即可

    php think worker:server
    
    • 1

    可以看到下面的输出

    Starting Workerman server...
    ----------------------- WORKERMAN -----------------------------
    Workerman version:3.5.35          PHP version:7.3.4
    ------------------------ WORKERS -------------------------------
    worker               listen                              processes status
    none                 websocket://127.0.0.1:2346          1         [ok]
    workman进程启动,进程id 0
    进程id 0 空闲中,休息5秒
    进程id 0 空闲中,休息5秒
    进程id 0 空闲中,休息5秒
    进程id 0 空闲中,休息5秒
    进程id 0 空闲中,休息5秒
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    注意:如果是在windows下,设置进程数是没有用的,就只会启动一个worker进程,也就是只有单进程,要体验多进程只能在Linux环境下,同时也无法守护进程,cmd窗口关掉后服务即停止

  • 4.在tp框架中将任务加入redis队列,例如我这里写一个添加redis列表元素的方法

    <?php
    namespace app\controller;
    
    use app\BaseController;
    
    class Index extends BaseController
    {
        //新增队列数据
        public function addQueue()
        {
            $redis = new \Redis();
            $redis->connect('127.0.0.1', 6379);
            $redis->rPush('test-queue', '1');
            $redis->rPush('test-queue', '2');
            $redis->rPush('test-queue', '3');
            $redis->rPush('test-queue', '4');
            $redis->rPush('test-queue', '5');
            $redis->rPush('test-queue', '6');
            $redis->rPush('test-queue', '7');
            echo 'success';
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
  • 5.访问上面的将任务加入redis队列接口,直接用浏览器或者命令行curl访问http://网站域名/index/addQueue即可,然后你就可以看到所有的redis队列将被workman进程分配并执行,以下是我启动workman->添加redis队列->workman处理->队列处理结束打印的结果:

    Starting Workerman server...
    Workerman[think] start in DEBUG mode
    -------------------------------------------- WORKERMAN ---------------------------------------------
    Workerman version:3.5.35          PHP version:7.4.33           Event-Loop:\Workerman\Events\Select
    --------------------------------------------- WORKERS ----------------------------------------------
    proto   user            worker          listen                        processes    status
    tcp     root            think           websocket://127.0.0.1:2346    4             [OK]
    ----------------------------------------------------------------------------------------------------
    Press Ctrl+C to stop. Start success.
    workman进程启动,进程id 0
    workman进程启动,进程id 3
    进程id 0 空闲中,休息5秒
    进程id 3 空闲中,休息5秒
    workman进程启动,进程id 1
    进程id 1 空闲中,休息5秒
    workman进程启动,进程id 2
    进程id 2 空闲中,休息5秒
    进程id 0 开始处理业务数据2
    进程id 3 开始处理业务数据1
    进程id 1 开始处理业务数据3
    进程id 2 开始处理业务数据4
    进程id 0 处理业务数据2 完成
    进程id 0 开始处理业务数据5
    进程id 3 处理业务数据1 完成
    进程id 1 处理业务数据3 完成
    进程id 3 开始处理业务数据6
    进程id 1 开始处理业务数据7
    进程id 2 处理业务数据4 完成
    进程id 2 空闲中,休息5秒
    进程id 0 处理业务数据5 完成
    进程id 1 处理业务数据7 完成
    进程id 3 处理业务数据6 完成
    进程id 0 空闲中,休息5秒
    进程id 1 空闲中,休息5秒
    进程id 3 空闲中,休息5秒
    进程id 2 空闲中,休息5秒
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/292712
推荐阅读
相关标签
  

闽ICP备14008679号