当前位置:   article > 正文

Thinkphp 使用swoole 实现异步任务_thinkphp8 swoole

thinkphp8 swoole

欢迎大家访问我的博客 blog.ayla1688.cool


原文链接:http://blog.ayla1688.cool/archives/116.html

 

在使用thinkcmf 做项目的时候,有一些比较耗时的工作, 但是不想阻碍程序的进行,所以想到使用swoole 的异步任务。


场景:
我在添加了一个用户之后,需要通过接口将用户添加到有赞商城的销售员。 添加用户完成后将添加到有赞商城业务员的操作放到异步的任务中去执行, 这样就不影响我添加用户的操作; 如果不这么做的话,还需要等待通过接口将用户数据推送到有赞商城。 如果本地添加成功,但是添加到有赞商城添加失败,如果不回滚数据则两边对不上。 所以需要有一个异步任务去执行添加用户到有赞商城的销售员。

借助了以下东西:

1. thinkphp框架的钩子
2. swoole

### 设置钩子
在项目中添加钩子文件 app/tags.php

  1. <?php
  2. /**
  3. * 钩子定义
  4. * 钩子定义配置文件,在此框架内,只能放在app 下, 放在模块下不起作用
  5. * 钩子的执行为同步执行,会对程序造成阻塞。
  6. */
  7. return [
  8. // '钩子名称' => ['行为类']
  9. //添加销售员到有赞
  10. 'add_saleman_to_youzan' => [
  11. 'app\\common\\behavior\\YzTask',
  12. ],

YzTask.php 为钩子的执行类

  1. <?php
  2. namespace app\common\behavior;
  3. //定义了一个行为 YzTask
  4. use Swoole\Coroutine;
  5. class YzTask
  6. {
  7. //swoole tcp server 监听的ip
  8. protected $host ;
  9. //swoole tcp server 监听的端口
  10. protected $port;
  11. public function __construct()
  12. {
  13. //获取swoole tcp 的配置
  14. $this->host = config('swoole_server.host');
  15. $this->port = config('swoole_server.port');
  16. }
  17. /**
  18. * @param string $string
  19. * $param = [
  20. * 'type' => xx //类型
  21. * 'data' => xx //数据
  22. * ];
  23. */
  24. public function run(string $string){
  25. //在以下代表之后都转为协程
  26. \Swoole\Runtime::enableCoroutine();
  27. //客户端用来访问swoole 创建的tcp服务器
  28. $client = new \Swoole\Client(SWOOLE_SOCK_TCP | SWOOLE_KEEP);
  29. $ret = $client->connect($this->host, $this->port);
  30. if ($ret) {
  31. $return = $client->send($string);
  32. if($return ) $client->close();
  33. } else {
  34. echo "断开了";
  35. $client->close();
  36. }
  37. }
  38. }

 


在控制器中,添加成功用户后调用钩子, 会直接执行tags.php 中钩子执行的行为类 app\\common\\behavior\\YzTask 中 run()方法, 详情参考https://www.kancloud.cn/manual/thinkphp5_1/354129
![2020-07-08T09:46:44.png][1]


### swoole 引入

根据thinkcmf 官方文档
https://www.thinkcmf.com/doc5_1.html

安装完成swoole 扩展后

创建文件 data\\config\\swoole_server.php; 我们要使用swoole创建一个tcp的服务器

  1. // +----------------------------------------------------------------------
  2. // | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
  3. // +----------------------------------------------------------------------
  4. // | Copyright (c) 2006-2018 http://thinkphp.cn All rights reserved.
  5. // +----------------------------------------------------------------------
  6. // | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
  7. // +----------------------------------------------------------------------
  8. // | Author: liu21st <liu21st@gmail.com>
  9. // +----------------------------------------------------------------------
  10. use think\facade\Env;
  11. // +----------------------------------------------------------------------
  12. // | Swoole设置 php think swoole:server 命令行下有效
  13. // +----------------------------------------------------------------------
  14. return [
  15. // 扩展自身配置
  16. 'host' => '127.0.0.1', // 监听地址
  17. 'port' => 9502, // 监听端口
  18. 'type' => 'tcp', // 服务类型 支持 socket http server
  19. 'mode' => '', // 运行模式 默认为SWOOLE_PROCESS
  20. 'sock_type' => '', // sock type 默认为SWOOLE_SOCK_TCP
  21. 'swoole_class' => '', // 自定义服务类名称
  22. 'task_worker_num' => 8,
  23. // 可以支持swoole的所有配置参数
  24. 'daemonize' => false,
  25. 'pid_file' => Env::get('runtime_path') . 'swoole_server.pid',
  26. 'log_file' => Env::get('runtime_path') . 'swoole_server.log',
  27. // 事件回调定义
  28. 'onConnect' => function ($server, $fd){
  29. echo " client : connect\n";
  30. },
  31. 'onOpen' => function ($server, $request) {
  32. echo "server: handshake success with fd{$request->fd}\n";
  33. },
  34. 'onMessage' => function ($server, $frame) {
  35. echo "receive from {$frame->fd}:{$frame->data},opcode:{$frame->opcode},fin:{$frame->finish}\n";
  36. $server->push($frame->fd, "this is server");
  37. },
  38. 'onRequest' => function ($request, $response) {
  39. $response->end("<h1>Hello Swoole. #" . rand(1000, 9999) . "</h1>");
  40. },
  41. 'onReceive' => function ($server, $fd, $from_id, $data){
  42. //worker进程
  43. //worker进程中向task_worker 进程投递新的任务
  44. $task_id = $server->task($data);
  45. },
  46. 'onTask' => function ($server, $task_id, $from_id, $data){
  47. //task_worker 进程
  48. $task = new \app\common\service\SwooleTask();
  49. $task->do($data);
  50. //告诉worker 进程,task_worker进程 执行完成了。task_worker 进程将执行的结果 通过 finish() 通知给worker 进程
  51. $server->finish("ok");
  52. },
  53. 'onClose' => function (Swoole\Server $ser, int $fd, int $reactorId) {
  54. //worker 进程
  55. echo "client {$fd} closed\n";
  56. $ser->close($fd);
  57. },
  58. 'onFinish' => function ($server, $task_id, $data){
  59. //worker 进程
  60. //服务器主动关闭TCP连接
  61. //$server->close($task_id); //主动关闭连接
  62. //\think\facade\Log::record('user login task finish', 'info');
  63. }
  64. ];

 


在onTask中,我们实例化了 \\app\\common\\service\\SwooleTask() 然后执行了$task->do($data);方法,用来执行耗时的操作。

### 如何去运行 swoole 创建的tcp服务器?

我们查看刚才安装的swoole sdk, 文件在 vendor\\thinkcmf\cmf-swoole\\src\\command\\server.php ; command 文件夹内是 cli 模式下执行的命令。
![2020-07-08T09:53:57.png][2]
 


### 最终

首先启动tcp 服务器

php think swoole:server 

然后,在要执行访问有赞云接口的方法中,sleep(50) ,休息50秒,模拟一下。

  1. <?php
  2. namespace app\common\service;
  3. use think\Db;
  4. class SwooleTask
  5. {
  6. //执行异步任务
  7. public function do(string $data){
  8. $start = time();
  9. sleep(50);
  10. $end = time();
  11. Db::name('swoole_log')->insert(['content' => $data . ";start:" .$start . ';end:' . $end, 'create_time' => time()]);
  12. return true;
  13. }
  14. }


当后台添加了用户后,直接返回了成功,投递了任务给tcp服务器,然后等待了50秒后, 再执行一个插入数据表的操作(模拟访问有赞云的接口,并等待了很长时间)。 果然在表中插入了数据。 到此验证完毕。 


  [1]: http://www.kevink.club/usr/uploads/2020/07/2026863592.png
  [2]: http://www.kevink.club/usr/uploads/2020/07/2677841699.png

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/花生_TL007/article/detail/351374
推荐阅读
相关标签
  

闽ICP备14008679号