赞
踩
我使用的是 swoole 5.x
定时器会自动推送队列里面的消息
<?php
namespace App\Console\Commands;
use App\Services\SocketService;
use Illuminate\Console\Command;
class WsServer extends Command
{
/**
* The name and signature of the console command.
*
* @var string
*/
protected $signature = 'app:wsServer';
/**
* The console command description.
*
* @var string
*/
protected $description = 'Command description';
/**
* Execute the console command.
*/
public function handle()
{
$SocketService = new SocketService();
$SocketService->start();
}
}
<?php
namespace App\Services;
use Swoole\WebSocket\Server;
use Swoole\Timer;
use Illuminate\Support\Facades\Redis;
use RedisException;
use Swoole\Http\Request;
class SocketService
{
public $port = 9501;
public $server;
public $links;
public $cmds = [];
public function __construct ()
{
$this->links = collect([]);
$this->server = new Server("0.0.0.0", env('APP_SOCKET_PORT', $this->port ));
$this->server->on( 'open', function (Server $server, Request $request){
$this->open( $server, $request );
} );
$this->server->on( 'message', function (Server $server, $frame){
$this->message( $server, $frame );
} );
$this->server->on( 'close', function (Server $server, $fd){
$this->close( $server, $fd );
} );
}
public function start()
{
$this->linkManage();
$this->listenQueue();
$this->server->start();
}
public function print( $message, $level = 'info' )
{
if( is_array($message) || is_object($message) ){
$message = json_encode($message, 320);
}
print_r( "[". date("Y-m-d H:i:s") ."] " . $level . ' ' . $message . "\n" );
}
public function linkManage()
{
Timer::tick( 100, function (){
//var_dump( "listenQueue while: " . json_encode($this->cmds, 320) );
$cmd = array_shift( $this->cmds );
if( $cmd ){
switch ( $cmd['operate'] ){
case 'open':
// 活跃
$this->links->push( [ "fd" => $cmd['fd'], "user_id" => intval($cmd['user_id']??0), 'updated_at' => date("Y-m-d H:i:s") ] );
$this->print( "添加客户端:fd = " . json_encode($cmd, 320) );
break;
case 'close':
$newLinks = [];
foreach ( $this->links as $link ){
if( $link['fd'] == $cmd['fd'] ){
continue;
}
$newLinks[] = $link;
}
$this->links = collect( $newLinks );
$this->print( "删除客户端:fd = " . json_encode($cmd, 320) );
break;
case 'heartbeat':
$newLinks = [];
foreach ( $this->links as $link ){
if( $link['fd'] == $cmd['fd'] ){
$link['updated_at'] = date("Y-m-d H:i:s");
}
$newLinks[] = $link;
}
$this->links = collect( $newLinks );
break;
}
// $this->print( "连接数量是:" . $this->links->count() );
// $this->print( "连接数量是:" . $this->links->toJson() );
}
$newLinks = [];
foreach ( $this->links as $link ){
if( strtotime( $link['updated_at'] ) < (time() - 60) ){
$this->print( "长时间未心跳,删除客户端:fd = " . json_encode($link, 320) );
if( $this->server->isEstablished( $link['fd'] ) ){
$this->disconnect( $link['fd'], '未进行心跳' );
}
continue;
}
$newLinks[] = $link;
}
$this->links = collect( $newLinks );
} );
}
public function listenQueue()
{
Timer::tick( 1000, function (){
// Redis::rpush( "SocketService:listenQueue", serialize(["hahah"]) )
try{
$element = Redis::lpop('SocketService:listenQueue');
if( $element ){
$this->print( "listenQueue 有新的信息哦:" . $element );
$data = unserialize($element);
if( ! empty( $data['user_id']) ){
$links = $this->links->where( "user_id", $data['user_id'] )->values()->all();
if( empty($links) ){
$this->print( "没有在线用户:user_id = " . json_encode($data, 320) );
//var_export( $this->links );
//var_export( $links );
}
foreach ( $links as $link ){
if( ! $this->server->isEstablished( $link['fd'] ) ){
array_push( $this->cmds, [ 'operate' => 'close', 'fd' => $link['fd'] ] );
continue;
}
try{
// 生成消息数据
$message = $this->makeMessage( $data['data'], $data['type'], $data['message'] );
// 开始推送
$this->runPush( $link['fd'], $message );
}catch (\Throwable $e){
$this->print( "数据推送异常:" . json_encode([ $e->getMessage(),$e->getLine(), $e->getFile() ], 320) );
}
}
}
}
}catch (RedisException $e){
Redis::connect();
}
});
}
public function open( Server $server, Request $request )
{
$params = $request->get;
if( empty( $params['user_id'] ) ){
$this->disconnect( $request->fd, '缺少用户信息' );
return true;
}
array_push( $this->cmds, [ 'operate' => 'open', 'fd' => $request->fd, 'user_id' => $params['user_id'] ] );
// 生成消息数据
$message = $this->makeMessage( [ 'fd' => $request->fd ], "connectionSuccessful", "连接成功" );
// 开始推送
$this->runPush( $request->fd, $message );
$this->print( "server: handshake success with fd{$request->fd} " );
}
public function message( Server $server, $frame )
{
//
$data = json_decode( $frame->data, true );
if( is_array( $data ) ){
if( $data['type'] == "ping" ){
array_push( $this->cmds, [ 'operate' => 'heartbeat', 'fd' => $frame->fd ] );
$this->server->push( $frame->fd, json_encode( [ "type" => "pong" ] , 320 ) );
}else{
$this->print( "receive from {$frame->fd}:{$frame->data},opcode:{$frame->opcode},fin:{$frame->finish} " );
}
}
}
public function close(Server $server, $fd)
{
array_push( $this->cmds, [ 'operate' => 'close', 'fd' => $fd ] );
$this->print( "client {$fd} closed " );
}
public function push( $fd, string $data )
{
$this->server->push($fd, $data);
}
public function disconnect(int $fd, string $reason = '', int $code = SWOOLE_WEBSOCKET_CLOSE_NORMAL)
{
$this->server->disconnect($fd, $code, $reason);
}
public function makeMessage( array $data, $type = "", $message = "" )
{
return [ 'type' => $type, "message" => $message, "data" => $data ];
}
public function runPush( $fd, $message )
{
$this->print( "推送消息: {$fd} - " . json_encode( $message, 320 ) );
$this->server->push( $fd, json_encode( $message , 320 ) );
}
/**
* App\Services\SocketService::testMessage( 92 )
* @param $user_id
* @return void
*/
public static function testMessage( $user_id )
{
Redis::rpush( "SocketService:listenQueue", serialize([
"user_id" => $user_id,
"type" => "testMessage", "message" => "测试消息", "data" => [
"hello world!"
],
]) );
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。