当前位置:   article > 正文

tp5使用workerman实现异步任务_tp5 workerman开发异步任务分发

tp5 workerman开发异步任务分发

问题描述:

采集数据时过程很慢,导致无法继续进行其他任务,,避免主业务被长时间阻塞,故而将其提交给异步任务,当任务完成通知客户端即可

流程

前端业务:

由于本系统采用iframe结构,为避免点击其他页面业务中断,所以业务在父页面执行,

1.用户在子页面点击采集按钮调用父级方法

  1. function to_collect(ids) {
  2. window.parent.startCollect(ids);
  3. }

2.父级页面进行socket链接,当收到服务器处理完任务消息时关闭socket并通知用户结果

  1. function startCollect(ids)
  2. {
  3. var wsServer = 'ws://127.0.0.1:5432';
  4. var websocket = new WebSocket(wsServer);
  5. var inter_val = 0;
  6. websocket.onopen = function (evt) {
  7. console.log("Connected to WebSocket server.");
  8. var data = {ids:ids};
  9. data = JSON.stringify(data);
  10. websocket.send(data);
  11. //设置心跳,避免服务器断开
  12. inter_val = setInterval(function () {
  13. websocket.send('hello');
  14. }, 50000)
  15. };
  16. websocket.onclose = function (evt) {
  17. console.log("Disconnected");
  18. };
  19. websocket.onmessage = function (evt) {
  20. console.log('Retrieved data from server: ' + evt.data);
  21. if (isJson(evt.data)) {
  22. var res = JSON.parse(evt.data);
  23. if(res.code == 0){
  24. alert("采集条数:"+res.msg)
  25. websocket.close();
  26. clearInterval(inter_val);//关闭定时器
  27. }
  28. }
  29. };
  30. websocket.onerror = function (evt, e) {
  31. console.log('Error occured: ' + evt.data);
  32. };
  33. }
  34. /**
  35. * 判断是否json
  36. * @param $string
  37. * @returns {boolean}
  38. */
  39. function isJson($string)
  40. {
  41. try {
  42. if(typeof JSON.parse($string) == 'object')
  43. return true;
  44. return false;
  45. } catch (e) {
  46. console.log(e);
  47. return false;
  48. }
  49. }

服务端

1. 收到前端发来的数据,调用model进行业务处理,然后通知客户端

  1. <?php
  2. namespace app\http;
  3. use app\common\model\Collect;
  4. use think\worker\Server;
  5. use Workerman\Lib\Timer;
  6. use Workerman\Worker as W;
  7. class Worker2 extends Server
  8. {
  9. protected $socket = 'websocket://0.0.0.0:5432';
  10. protected $option = [
  11. 'count'=> 4,
  12. ];
  13. /**
  14. * 每个进程启动
  15. * @param $worker
  16. */
  17. public function onWorkerStart($worker)
  18. {
  19. // 心跳间隔55秒
  20. define('HEARTBEAT_TIME', 55);
  21. Timer::add(1, function()use($worker){
  22. $time_now = time();
  23. foreach($worker->connections as $connection) {
  24. // 有可能该connection还没收到过消息,则lastMessageTime设置为当前时间
  25. if (empty($connection->lastMessageTime)) {
  26. $connection->lastMessageTime = $time_now;
  27. continue;
  28. }
  29. // 上次通讯时间间隔大于心跳间隔,则认为客户端已经下线,关闭连接
  30. if ($time_now - $connection->lastMessageTime > HEARTBEAT_TIME) {
  31. $connection->close();
  32. }
  33. }
  34. });
  35. }
  36. public function onMessage($connection,$data)
  37. {
  38. global $worker;
  39. // 判断当前客户端是否已经验证,即是否设置了uid
  40. if(!isset($connection->uid))
  41. {
  42. // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
  43. $connection->uid = ip2long($connection->getRemoteIp()).time().rand(1,9999);
  44. /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
  45. * 实现针对特定uid推送数据
  46. */
  47. $worker->uidConnections[$connection->uid] = $connection;
  48. $connection->send('login success, your uid is ' . $connection->uid);
  49. }
  50. $ids = json_decode($data,true)['ids'] ?? 0;
  51. if ($ids)
  52. {
  53. $collect_model = new Collect();
  54. $res = $collect_model->getNewestArticle($ids);
  55. $res = json_encode(['code' => 0,'msg' =>$res]);
  56. $connection->send($res);
  57. }
  58. // 给connection临时设置一个lastMessageTime属性,用来记录上次收到消息的时间
  59. $connection->lastMessageTime = time();
  60. //$connection->send('receive success');
  61. echo $data;
  62. echo "\n";
  63. }
  64. public function onConnect($connection)
  65. {
  66. }
  67. /**
  68. * 当连接断开时触发的回调函数
  69. * @param $connection
  70. */
  71. public function onClose($connection)
  72. {
  73. global $worker;
  74. if(isset($connection->uid))
  75. {
  76. // 连接断开时删除映射
  77. unset($worker->uidConnections[$connection->uid]);
  78. }
  79. }
  80. /**
  81. * 当客户端的连接上发生错误时触发
  82. * @param $connection
  83. * @param $code
  84. * @param $msg
  85. */
  86. public function onError($connection, $code, $msg)
  87. {
  88. echo "error $code $msg\n";
  89. }
  90. // 针对uid推送数据
  91. public function sendMessageByUid($uid, $message)
  92. {
  93. global $worker;
  94. if(isset($worker->uidConnections[$uid]))
  95. {
  96. $connection = $worker->uidConnections[$uid];
  97. $connection->send($message);
  98. return true;
  99. }
  100. return false;
  101. }
  102. }

启动  workerman

执行结果

thinkphp5中如何开启workman请参照https://blog.csdn.net/flysnownet/article/details/96475927

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/很楠不爱3/article/detail/309425
推荐阅读
相关标签
  

闽ICP备14008679号