当前位置:   article > 正文

thinkphp5 开发mqtt_swoole mqtt

swoole mqtt

thinkphp5 开发mqtt

1,这里要安装swoole,创建TP5项目redis环境,请自行安装

2,可先了解mqtt的协议,相关文档为:mqtt

3,创建好TP5项目后

请在根目录composer require simps/mqtt,安装好simps/mqtt后,在TP5项目public文件下,加入我写的mqtt服务端文件代码,文件名随便定义,运行方法:php mqtt_server.php,代码写得差,不爱请别伤害

<?php

try {

    require_once __DIR__ . '/../thinkphp/base.php';
    \think\Container::get('app')->path(__DIR__ . '/../application/')->initialize();//应用初始化,这里是为了我的redis文件调用一些函数

    $server = new Swoole\Server('0.0.0.0', 9501, SWOOLE_PROCESS);

    $server->set([
        'open_mqtt_protocol' => true, // 启用 MQTT 协议
        'worker_num' => 2,
        'reactor_num' => 4,
        'reload_async' => true,//设置异步重启开关。设置为 true 时,将启用异步安全重启特性,Worker 进程会等待异步事件完成后再退出
        'dispatch_mode' => 2,//数据包分发策略。【默认值:2】
        //模式值	模式	作用
        //1	轮循模式	收到会轮循分配给每一个 Worker 进程
        //2	固定模式	根据连接的文件描述符分配 Worker。这样可以保证同一个连接发来的数据只会被同一个 Worker 处理
        //3	抢占模式	主进程会根据 Worker 的忙闲状态选择投递,只会投递给处于闲置状态的 Worker
        //4	IP 分配	根据客户端 IP 进行取模 hash,分配给一个固定的 Worker 进程。
        //可以保证同一个来源 IP 的连接数据总会被分配到同一个 Worker 进程。算法为 ip2long(ClientIP) % worker_num
        //5	UID 分配	需要用户代码中调用 Server->bind() 将一个连接绑定 1 个 uid。然后底层根据 UID 的值分配到不同的 Worker 进程。
        //算法为 UID % worker_num,如果需要使用字符串作为 UID,可以使用 crc32(UID_STRING)
        //7	stream 模式	空闲的 Worker 会 accept 连接,并接受 Reactor 的新请求
        'daemonize' => false,//是否后台运行
//    'open_tcp_keepalive' => true,//在 TCP 中有一个 Keep-Alive 的机制可以检测死连接
//    'tcp_keepidle' => 4,//4s没有数据传输就进行检测
//    'tcp_keepinterval' => 1,//1s探测一次
//    'tcp_keepcount' => 5,//探测的次数,超过5次后还没回包close此连接
//    'heartbeat_check_interval' => 60,//启用心跳检测【默认值:false】
//    'heartbeat_idle_time' => '600',//连接最大允许空闲的时间
//    'package_max_length' => 2 * 1024 * 1024,//默认2m
        'open_cpu_affinity' => true,//启用 CPU 亲和性设置
//    'buffer_input_size' => 2 * 1024 * 1024,//配置接收输入缓存区内存尺寸。【默认值:2M】
//    'buffer_output_size' => 2 * 1024 * 1024,//配置发送输出缓存区内存尺寸
//    'socket_buffer_size' => 2 * 1024 * 1024,//配置客户端连接的缓存区长度。【默认值:2M】
    ]);

    $server->on('Connect', function ($server, $fd) {
        echo "Client:Connect.\n";
    });

    $server->on('Receive', function ($server, $fd, $reactor_id, $data) {
        try {
            $arr = \Simps\MQTT\Protocol\V3::unpack($data);//客户端连接信息
            if (is_array($arr) && isset($arr['type'])) {
                switch ($arr['type']) {
                    case \Simps\MQTT\Protocol\Types::CONNECT://客户端连接
                        //用户名密码连接
                        if (!isset($arr['user_name']) || empty($arr['user_name']) || !isset($arr['password']) || empty($arr['password'])) {
                            $server->close($fd);
                            return;//用户密码为空
                        }
                        //判断连接

                        //遗嘱
                        if (isset($arr['will']['topic']) && !empty($arr['will']['topic'])) {
                            $redis = new \app\common\model\Redis100_model();
                            $redis->hSet('will', $fd, json_encode(['message' => $arr['will']['message'], 'qos' => $arr['will']['qos'], 'topic' => $arr['will']['topic'], 'retain' => $arr['will']['retain']]));
                        }
                        $server->send($fd, \Simps\MQTT\Protocol\V3::pack(['type' => \Simps\MQTT\Protocol\Types::CONNACK, 'code' => 0, 'session_present' => 0]));//确认连接请求
                        break;
                    case \Simps\MQTT\Protocol\Types::PUBLISH://发布,接收信息
                        $redis = new \app\common\model\Redis100_model();
                        foreach ($server->connections as $sub_fd) {
                            $publish = $redis->sIsMember($sub_fd, $arr['topic']);
                            if ($publish !== false) {
                                //发送所有相同的订阅主题
                                $server->send($sub_fd, \Simps\MQTT\Protocol\V3::pack(
                                    [
                                        'type' => $arr['type'],
                                        'topic' => $arr['topic'],
                                        'message' => $arr['message'],
                                        'dup' => $arr['dup'],
                                        'qos' => $arr['qos'],
                                        'retain' => $arr['retain'],
                                        'message_id' => $arr['message_id'] ?? 0,
                                    ]
                                ));
                            }
                        }
                        if ($arr['qos'] === 1) {//发布消息的服务质量,即:保证消息传递的次数
                            $server->send($fd, \Simps\MQTT\Protocol\V3::pack(
                                [
                                    'type' => \Simps\MQTT\Protocol\Types::PUBACK,
                                    'message_id' => $arr['message_id'] ?? 0,
                                ]
                            ));
                        }
                        $redis->close();
//                    $offset = 2;
//                    $topic = decodeString(substr($arr, $offset));//订阅主题
//                    $offset += strlen($topic) + 2;
//                    $msg = substr($arr, $offset);//信息
//                    $server->send($fd, $arr);
                        break;
                    case \Simps\MQTT\Protocol\Types::SUBSCRIBE://订阅
                        $payload = [];
                        $redis = new \app\common\model\Redis100_model();
                        $redis->del($fd);//防止旧数据
                        foreach ($arr['topics'] as $k => $qos) {
                            $redis->sAdd($fd, $k);
                            if (is_numeric($qos) && $qos < 3) {
                                $payload[] = $qos;
                            } else {
                                $payload[] = 0x80;
                            }
                        }
                        $redis->close();
                        $server->send($fd, \Simps\MQTT\Protocol\V3::pack(['type' => \Simps\MQTT\Protocol\Types::SUBACK, 'message_id' => $arr['message_id'] ?? 0, 'codes' => $payload]));
                        break;
                    case \Simps\MQTT\Protocol\Types::PINGREQ://ping心跳
                        if ($server->exist($fd)) {
                            $server->send($fd, \Simps\MQTT\Protocol\V3::pack(['type' => \Simps\MQTT\Protocol\Types::PINGRESP]));
                        }
                        break;
                    case \Simps\MQTT\Protocol\Types::DISCONNECT:
                        if ($server->exist($fd)) {
                            $redis = new \app\common\model\Redis100_model();
                            //转发遗嘱
                            $will = $redis->hGet('will', $fd);
                            if (!empty($will)) {
                                $will = json_decode($will, true);
                                foreach ($server->connections as $sub_fd) {
                                    if ($sub_fd != $fd) {
                                        //发送所有相同的订阅主题
                                        $server->send($sub_fd, \Simps\MQTT\Protocol\V3::pack(
                                            [
                                                'type' => 3,
                                                'topic' => $will['topic'],
                                                'message' => $will['message'],
                                                'dup' => 0,
                                                'qos' => $will['qos'],
                                                'retain' => $will['retain'],
                                                'message_id' => $arr['message_id'] ?? 0,
                                            ]
                                        ));
                                    }
                                }
                                if ($will['qos'] === 1) {//发布消息的服务质量,即:保证消息传递的次数

                                }
                            }
                            $redis->hdel('will', $fd);
                            $redis->del($fd);
                            $redis->close();
                            $server->close($fd);
                        }
                        break;
                    case \Simps\MQTT\Protocol\Types::UNSUBSCRIBE:
                        $server->send($fd, \Simps\MQTT\Protocol\V3::pack(
                            [
                                'type' => \Simps\MQTT\Protocol\Types::UNSUBACK,
                                'message_id' => $arr['message_id'] ?? 0,
                            ]
                        ));
                        $redis = new \app\common\model\Redis100_model();
                        $redis->del($fd);
                        $redis->close();
                        break;
                }
            }
        } catch (\Exception $e) {
            var_dump($e->getMessage());
        } catch (\Throwable $e) {
            var_dump($e->getMessage());
        }
    });

    $server->on('Shutdown', function ($server) {
        echo "Shutdown\n";
    });

    $server->on('WorkerStop', function ($server, $workerId) {
        echo "WorkerStop\n";
    });

    $server->on('WorkerExit', function ($server, $workerId) {
        echo "WorkerExit\n";
    });

    $server->on('WorkerError', function ($server, $workerId, $worker_pid, $exit_code, $signal) {
        echo "WorkerError\n";
    });

    $server->on('ManagerStop', function ($server) {
        echo "ManagerStop\n";
    });

    $server->on('Close', function ($server, $fd) {
        $redis = new \app\common\model\Redis100_model();
        $redis->hdel('will', $fd);
        $redis->del($fd);
        $redis->close();
        echo "Client: Close.\n";
    });

    function decodeString($arr)
    {
        $length = 256 * ord($arr[0]) + ord($arr[1]);
        return substr($arr, 2, $length);
    }

    $server->start();
} catch (\Exception $e) {
    var_dump($e->getMessage());
} catch (\Throwable $e) {
    var_dump($e->getMessage());
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
  • 194
  • 195
  • 196
  • 197
  • 198
  • 199
  • 200
  • 201
  • 202
  • 203
  • 204
  • 205
  • 206
  • 207
  • 208
  • 209

4,当然你觉得这代码不行,也可以根据文档来改(有客户端代码),请点击查看simps/mqtt文档

喜欢的靓仔靓妹们,点个赞吧

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/257695
推荐阅读
相关标签
  

闽ICP备14008679号