赞
踩
请在根目录composer require simps/mqtt,安装好simps/mqtt后,在TP5项目public文件下,加入我写的mqtt服务端文件代码,文件名随便定义,运行方法:php mqtt_server.php,代码写得差,不爱请别伤害
:
<?php try { require_once __DIR__ . '/../thinkphp/base.php'; \think\Container::get('app')->path(__DIR__ . '/../application/')->initialize();//应用初始化,这里是为了我的redis文件调用一些函数 $server = new Swoole\Server('0.0.0.0', 9501, SWOOLE_PROCESS); $server->set([ 'open_mqtt_protocol' => true, // 启用 MQTT 协议 'worker_num' => 2, 'reactor_num' => 4, 'reload_async' => true,//设置异步重启开关。设置为 true 时,将启用异步安全重启特性,Worker 进程会等待异步事件完成后再退出 'dispatch_mode' => 2,//数据包分发策略。【默认值:2】 //模式值 模式 作用 //1 轮循模式 收到会轮循分配给每一个 Worker 进程 //2 固定模式 根据连接的文件描述符分配 Worker。这样可以保证同一个连接发来的数据只会被同一个 Worker 处理 //3 抢占模式 主进程会根据 Worker 的忙闲状态选择投递,只会投递给处于闲置状态的 Worker //4 IP 分配 根据客户端 IP 进行取模 hash,分配给一个固定的 Worker 进程。 //可以保证同一个来源 IP 的连接数据总会被分配到同一个 Worker 进程。算法为 ip2long(ClientIP) % worker_num //5 UID 分配 需要用户代码中调用 Server->bind() 将一个连接绑定 1 个 uid。然后底层根据 UID 的值分配到不同的 Worker 进程。 //算法为 UID % worker_num,如果需要使用字符串作为 UID,可以使用 crc32(UID_STRING) //7 stream 模式 空闲的 Worker 会 accept 连接,并接受 Reactor 的新请求 'daemonize' => false,//是否后台运行 // 'open_tcp_keepalive' => true,//在 TCP 中有一个 Keep-Alive 的机制可以检测死连接 // 'tcp_keepidle' => 4,//4s没有数据传输就进行检测 // 'tcp_keepinterval' => 1,//1s探测一次 // 'tcp_keepcount' => 5,//探测的次数,超过5次后还没回包close此连接 // 'heartbeat_check_interval' => 60,//启用心跳检测【默认值:false】 // 'heartbeat_idle_time' => '600',//连接最大允许空闲的时间 // 'package_max_length' => 2 * 1024 * 1024,//默认2m 'open_cpu_affinity' => true,//启用 CPU 亲和性设置 // 'buffer_input_size' => 2 * 1024 * 1024,//配置接收输入缓存区内存尺寸。【默认值:2M】 // 'buffer_output_size' => 2 * 1024 * 1024,//配置发送输出缓存区内存尺寸 // 'socket_buffer_size' => 2 * 1024 * 1024,//配置客户端连接的缓存区长度。【默认值:2M】 ]); $server->on('Connect', function ($server, $fd) { echo "Client:Connect.\n"; }); $server->on('Receive', function ($server, $fd, $reactor_id, $data) { try { $arr = \Simps\MQTT\Protocol\V3::unpack($data);//客户端连接信息 if (is_array($arr) && isset($arr['type'])) { switch ($arr['type']) { case \Simps\MQTT\Protocol\Types::CONNECT://客户端连接 //用户名密码连接 if (!isset($arr['user_name']) || empty($arr['user_name']) || !isset($arr['password']) || empty($arr['password'])) { $server->close($fd); return;//用户密码为空 } //判断连接 //遗嘱 if (isset($arr['will']['topic']) && !empty($arr['will']['topic'])) { $redis = new \app\common\model\Redis100_model(); $redis->hSet('will', $fd, json_encode(['message' => $arr['will']['message'], 'qos' => $arr['will']['qos'], 'topic' => $arr['will']['topic'], 'retain' => $arr['will']['retain']])); } $server->send($fd, \Simps\MQTT\Protocol\V3::pack(['type' => \Simps\MQTT\Protocol\Types::CONNACK, 'code' => 0, 'session_present' => 0]));//确认连接请求 break; case \Simps\MQTT\Protocol\Types::PUBLISH://发布,接收信息 $redis = new \app\common\model\Redis100_model(); foreach ($server->connections as $sub_fd) { $publish = $redis->sIsMember($sub_fd, $arr['topic']); if ($publish !== false) { //发送所有相同的订阅主题 $server->send($sub_fd, \Simps\MQTT\Protocol\V3::pack( [ 'type' => $arr['type'], 'topic' => $arr['topic'], 'message' => $arr['message'], 'dup' => $arr['dup'], 'qos' => $arr['qos'], 'retain' => $arr['retain'], 'message_id' => $arr['message_id'] ?? 0, ] )); } } if ($arr['qos'] === 1) {//发布消息的服务质量,即:保证消息传递的次数 $server->send($fd, \Simps\MQTT\Protocol\V3::pack( [ 'type' => \Simps\MQTT\Protocol\Types::PUBACK, 'message_id' => $arr['message_id'] ?? 0, ] )); } $redis->close(); // $offset = 2; // $topic = decodeString(substr($arr, $offset));//订阅主题 // $offset += strlen($topic) + 2; // $msg = substr($arr, $offset);//信息 // $server->send($fd, $arr); break; case \Simps\MQTT\Protocol\Types::SUBSCRIBE://订阅 $payload = []; $redis = new \app\common\model\Redis100_model(); $redis->del($fd);//防止旧数据 foreach ($arr['topics'] as $k => $qos) { $redis->sAdd($fd, $k); if (is_numeric($qos) && $qos < 3) { $payload[] = $qos; } else { $payload[] = 0x80; } } $redis->close(); $server->send($fd, \Simps\MQTT\Protocol\V3::pack(['type' => \Simps\MQTT\Protocol\Types::SUBACK, 'message_id' => $arr['message_id'] ?? 0, 'codes' => $payload])); break; case \Simps\MQTT\Protocol\Types::PINGREQ://ping心跳 if ($server->exist($fd)) { $server->send($fd, \Simps\MQTT\Protocol\V3::pack(['type' => \Simps\MQTT\Protocol\Types::PINGRESP])); } break; case \Simps\MQTT\Protocol\Types::DISCONNECT: if ($server->exist($fd)) { $redis = new \app\common\model\Redis100_model(); //转发遗嘱 $will = $redis->hGet('will', $fd); if (!empty($will)) { $will = json_decode($will, true); foreach ($server->connections as $sub_fd) { if ($sub_fd != $fd) { //发送所有相同的订阅主题 $server->send($sub_fd, \Simps\MQTT\Protocol\V3::pack( [ 'type' => 3, 'topic' => $will['topic'], 'message' => $will['message'], 'dup' => 0, 'qos' => $will['qos'], 'retain' => $will['retain'], 'message_id' => $arr['message_id'] ?? 0, ] )); } } if ($will['qos'] === 1) {//发布消息的服务质量,即:保证消息传递的次数 } } $redis->hdel('will', $fd); $redis->del($fd); $redis->close(); $server->close($fd); } break; case \Simps\MQTT\Protocol\Types::UNSUBSCRIBE: $server->send($fd, \Simps\MQTT\Protocol\V3::pack( [ 'type' => \Simps\MQTT\Protocol\Types::UNSUBACK, 'message_id' => $arr['message_id'] ?? 0, ] )); $redis = new \app\common\model\Redis100_model(); $redis->del($fd); $redis->close(); break; } } } catch (\Exception $e) { var_dump($e->getMessage()); } catch (\Throwable $e) { var_dump($e->getMessage()); } }); $server->on('Shutdown', function ($server) { echo "Shutdown\n"; }); $server->on('WorkerStop', function ($server, $workerId) { echo "WorkerStop\n"; }); $server->on('WorkerExit', function ($server, $workerId) { echo "WorkerExit\n"; }); $server->on('WorkerError', function ($server, $workerId, $worker_pid, $exit_code, $signal) { echo "WorkerError\n"; }); $server->on('ManagerStop', function ($server) { echo "ManagerStop\n"; }); $server->on('Close', function ($server, $fd) { $redis = new \app\common\model\Redis100_model(); $redis->hdel('will', $fd); $redis->del($fd); $redis->close(); echo "Client: Close.\n"; }); function decodeString($arr) { $length = 256 * ord($arr[0]) + ord($arr[1]); return substr($arr, 2, $length); } $server->start(); } catch (\Exception $e) { var_dump($e->getMessage()); } catch (\Throwable $e) { var_dump($e->getMessage()); }
喜欢的靓仔靓妹们,点个赞吧
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。