赞
踩
远程过程调用(Remote Procedure Call,简称RPC)是一种网络通信协议,允许程序在不同的地址空间(通常在不同的物理计算机上)中调用彼此的方法,好像它们是在本地执行的一样。RPC隐藏了底层的网络通信细节,使开发人员能够像调用本地函数一样简单地调用远程服务。
RPC的工作原理基于客户端-服务器模型,主要包括以下步骤:
1.客户端调用:客户端程序发起对某个远程过程的调用请求。
2.请求打包:调用参数被打包成消息,发送到服务器。
3.服务器解包和执行:服务器接收到消息,解包获取调用参数,执行相应的远程过程。
4.结果打包和返回:执行结果被打包成消息,发送回客户端。
5.客户端接收结果:客户端解包消息,获取调用结果
1.客户端代理:负责将本地调用请求转换为远程调用请求,打包参数,并通过网络发送给服务器。
2.服务器代理:负责接收客户端的请求,解包参数,调用相应的服务方法,并将结果打包返回给客户端。
3.通信协议:定义客户端和服务器之间如何通信,常见的协议有HTTP、TCP等。
4.编解码器:负责参数和结果的序列化和反序列化,常见的格式有JSON、XML、Protobuf等。
优点
1.简化远程调用:使得远程调用像本地调用一样简单,开发人员无需关心底层的网络通信细节。
2.语言无关:大多数RPC框架支持多种编程语言,方便不同语言的系统互操作。
缺点
1.调试困难:由于涉及网络通信,调试远程调用的问题比本地调用更加复杂。
2.可靠性要求高:需要处理网络延迟、丢包、超时等问题,增加了系统的复杂性。
3.耦合性:客户端和服务器需要共同遵循同一套接口定义,一旦接口发生变化,可能需要同时更新多个系统。
protoc.exe 生成C#文件
Gen.bat
@echo off rem 设置路径变量 set PROTOC_PATH="protoc.exe" set PROTO_DIR="Protos" set OUTPUT_DIR="ProtocolCodes" rem 创建日志头 echo .......................proto2C#....................... echo. rem 检查目录是否存在 if not exist %PROTO_DIR% ( echo Error: Protocols directory does not exist. echo Please create the Protocols directory and place your .proto files in it. echo. pause exit /b ) rem 创建输出目录 if not exist %OUTPUT_DIR% mkdir %OUTPUT_DIR% rem 批量处理 .proto 文件 for %%f in (%PROTO_DIR%\*.proto) do ( echo %%f complete %PROTOC_PATH% --proto_path=%PROTO_DIR% --csharp_out=%OUTPUT_DIR% %%f ) echo code generation complete. Press any key to close. pause > nul
1.使用反射自动获取所有RPC函数, 对其进行Hash绑定
函数的定义 RPCMsgHandles.cs
public sealed class RPCMsgHandles { private static void ReqMove(int unitId, Move move) { } private static void RecvAttack(int skillid, Attack attack, ItemList itemList) { LogHelper.Log($"Recv: skillid = {attack.Id}, targetId = {attack.TargetId}, itemList.Count = {itemList.Items.Count}"); } private static void RecvDelete(int msg) { LogHelper.Log($"Recv: state = {msg}"); } private static void RecvReflectMove(Move move) { LogHelper.Log($"move reflect sync: x:{move.X}, y:{move.Y}, speed:{move.Speed}, dir:{move.Dir}"); } }
使用反射进行函数绑定 RPCMoudle.cs
public sealed class RPCMoudle { private static Dictionary<int, IRPC> _msg = new Dictionary<int, IRPC>(); public static void Init() { System.Type type = typeof(RPCMsgHandles); MethodInfo[] methods = type.GetMethods(BindingFlags.Static | BindingFlags.NonPublic); foreach (MethodInfo methodInfo in methods) { RPC method = new RPC(methodInfo); int index = 0; ParameterInfo[] infos = methodInfo.GetParameters(); foreach (var info in infos) { if (typeof(IMessage).IsAssignableFrom(info.ParameterType)) { IMessage message = Activator.CreateInstance(info.ParameterType) as IMessage; method.AddParamType(DateType.Message); method.AddParam(index, message); } else { DateType dateType = GetDateType(info.ParameterType); method.AddParamType(dateType); } index++; } int hash = Globals.Hash(methodInfo.Name); if (_msg.ContainsKey(hash)) throw new Exception("AddParamType rpc _method hash conflict: " + methodInfo.Name); _msg.Add(hash, method); } } }
2.使用泛型手动进行RPC函数绑定
泛型类进行函数绑定 RPCMoudle.cs
public static void Register<T>(string methodName, Action<T> action) where T : class, IMessage, new() { int id = Globals.Hash(methodName); RPCStatic<T> method = new RPCStatic<T>(); method.Register(action, new T()); if (_msg.ContainsKey(id)) { LogHelper.LogError($"repeat id, id = {id}"); } _msg[id] = method; } public static void Unregister(string methodName) { int id = Globals.Hash(methodName); if (_msg.ContainsKey(id)) { _msg.Remove(id); } else { LogHelper.LogError($"no find method, id = {id}"); } }
Call的实现, encode数据到byte[],第一个参数必须为远程函数名字, 用于将函数名字的hashid写入数据头中, 这样远程服务器在解析数据的时候会先解析4字节的数据头表示函数的hashid
Call中的Send函数 是Socket发送协议, Send函数中会在数据头中写入数据的长度, 在接收方根据数据的长度接收完整数据 防止粘包
Call函数有多个方法重载, 根据业务需求使用
1.public static void Call(string methodName, IMessage message) 类型安全, 类型固定
2.public static void Call(string id, params object[] args) 类型不安全, 可以传入任何参数, 使用更加方便快捷
具体调用例子 Move move = new Move(); move.X = 10; move.Y = 20; move.Speed = 100; move.Dir = 20; 这里使用的是object[] args 类型不安全, 也会有装箱拆箱的开销, 使用这用方式需要前后端统一类型 使用起来简单方便, 业务逻辑开发上使用较为方便 比如请求领取奖励 RPCMoudle.Call("ReqAward", 传入表奖励id); 比如请求保存勾选 RPCMoudle.Call("Save", true); RPCMoudle.Call("ReqMove", 10016, move); 这样是类型安全的, 也不会存在装箱拆箱的开销 更加高效, 战斗场景较为适合 RPCMoudle.Call("ReqMove", move);
Call的实现, 将数据进行Encode转换成二进制
public static void Call(string methodName, IMessage message) { if (message == null) return; try { int id = Globals.Hash(methodName); int offset = 0; BuffMessage msg = GameFrame.message.GetBuffMessage(); BitConverter.TryWriteBytes(msg.bytes.AsSpan(offset), id); offset += sizeof(int); BitConverterHelper.WriteString(msg.bytes, ref offset, GameFrame.myRole.Id); BitConverterHelper.WriteMessage(msg.bytes, ref offset, message); msg.length = offset; Main.Instance.Send(msg); } catch(Exception ex) { LogHelper.LogError(ex.ToString()); } } public static void Call(string id, params object[] args) { try { Profiler.BeginSample("rpc call"); int hash = Globals.Hash(id); BuffMessage msg = Encode(hash, args); Main.Instance.Send(msg); Profiler.EndSample(); } catch(Exception ex) { LogHelper.LogError(ex.ToString()); } }
Encode函数的实现
private static BuffMessage Encode(int id, params object[] args) { int offset = 0; BuffMessage msg = GameFrame.message.GetBuffMessage(); BitConverter.TryWriteBytes(msg.bytes.AsSpan(offset), id); offset += sizeof(int); BitConverterHelper.WriteString(msg.bytes, ref offset, GameFrame.myRole.Id); foreach (object arg in args) { try { System.Type type = arg.GetType(); switch (arg) { case IMessage: BitConverterHelper.WriteMessage(msg.bytes, ref offset, (IMessage)arg); break; case Int16: BitConverterHelper.WriteInt16(msg.bytes, ref offset, (Int16)arg); break; case Int32: BitConverterHelper.WriteInt32(msg.bytes, ref offset, (Int32)arg); break; case Int64: BitConverterHelper.WriteInt64(msg.bytes, ref offset, (Int64)arg); break; case UInt16: BitConverterHelper.WriteUInt16(msg.bytes, ref offset, (UInt16)arg); break; case UInt32: BitConverterHelper.WriteUInt32(msg.bytes, ref offset, (UInt32)arg); break; case UInt64: BitConverterHelper.WriteUInt64(msg.bytes, ref offset, (UInt64)arg); break; case bool: BitConverterHelper.WriteBool(msg.bytes, ref offset, (bool)arg); break; case Byte: BitConverterHelper.WriteByte(msg.bytes, ref offset, (byte)arg); break; case SByte: BitConverterHelper.WriteByte(msg.bytes, ref offset, (byte)arg); break; case Char: BitConverterHelper.WriteChar(msg.bytes, ref offset, (Char)arg); break; case Single: BitConverterHelper.WriteSingle(msg.bytes, ref offset, (Single)arg); break; case Double: BitConverterHelper.WriteDouble(msg.bytes, ref offset, (Double)arg); break; case string: BitConverterHelper.WriteString(msg.bytes, ref offset, (string)arg); break; } } catch(Exception ex) { LogHelper.LogError($"id: {id}, " + ex.ToString()); msg.Dispose(); return msg; } } msg.length = offset; return msg; }
0GC的TryWriteBytes方案
namespace Game { public static class BitConverterHelper { private static readonly int BUFFER_SIZE = 1024 * 1024; private static readonly byte[] buffer = new byte[BUFFER_SIZE]; private static CodedOutputStream _stream; private static Stopwatch _watch; public static void Init() { CreateStream(); _watch = new Stopwatch(); _watch.Start(); } private static void CreateStream() { if (_stream != null) _stream.Dispose(); if (_watch != null) { _watch.Stop(); LogHelper.LogWarning($"create stream interval time: {_watch.ElapsedMilliseconds / 1000.0f} s"); _watch.Restart(); } _stream = new CodedOutputStream(buffer); } private static Span<byte> ToByteArray(IMessage message) { if (message == null) return new byte[0]; int length = message.CalculateSize(); if (length == 0) return new byte[0]; if (length >= BUFFER_SIZE) { throw new Exception($"overflow: message length >= {BUFFER_SIZE}"); } if (_stream.Position + length >= BUFFER_SIZE) CreateStream(); int position = (int)_stream.Position; message.WriteTo(_stream); return buffer.AsSpan(position, length); } public static void WriteInt16(byte[] buffer, ref int offset, Int16 arg) { Check(buffer, offset + 1); buffer[offset++] = (byte)DateType.Int16; Check(buffer, offset + sizeof(Int16)); BitConverter.TryWriteBytes(buffer.AsSpan(offset), arg); offset += sizeof(Int16); } public static void WriteInt32(byte[] buffer, ref int offset, Int32 arg) { Check(buffer, offset + 1); buffer[offset++] = (byte)DateType.Int32; Check(buffer, offset + sizeof(Int32)); BitConverter.TryWriteBytes(buffer.AsSpan(offset), arg); offset += sizeof(Int32); } public static void WriteInt64(byte[] buffer, ref int offset, Int64 arg) { Check(buffer, offset + 1); buffer[offset++] = (byte)DateType.Int64; Check(buffer, offset + sizeof(Int64)); BitConverter.TryWriteBytes(buffer.AsSpan(offset), arg); offset += sizeof(Int64); } public static void WriteUInt16(byte[] buffer, ref int offset, UInt16 arg) { Check(buffer, offset + 1); buffer[offset++] = (byte)DateType.UInt16; Check(buffer, offset + sizeof(UInt16)); BitConverter.TryWriteBytes(buffer.AsSpan(offset), arg); offset += sizeof(UInt16); } public static void WriteUInt32(byte[] buffer, ref int offset, UInt32 arg) { Check(buffer, offset + 1); buffer[offset++] = (byte)DateType.UInt32; Check(buffer, offset + sizeof(UInt32)); BitConverter.TryWriteBytes(buffer.AsSpan(offset), arg); offset += sizeof(UInt32); } public static void WriteUInt64(byte[] buffer, ref int offset, UInt64 arg) { Check(buffer, offset + 1); buffer[offset++] = (byte)DateType.UInt64; Check(buffer, offset + sizeof(UInt64)); BitConverter.TryWriteBytes(buffer.AsSpan(offset), arg); offset += sizeof(UInt64); } public static void WriteBool(byte[] buffer, ref int offset, bool arg) { Check(buffer, offset + 1); buffer[offset++] = (byte)DateType.Boolean; Check(buffer, offset + sizeof(bool)); BitConverter.TryWriteBytes(buffer.AsSpan(offset), arg); offset += sizeof(bool); } public static void WriteByte(byte[] buffer, ref int offset, byte arg) { Check(buffer, offset + 1); buffer[offset++] = (byte)DateType.Byte; Check(buffer, offset + 1); buffer[offset++] = arg; } public static void WriteChar(byte[] buffer, ref int offset, Char arg) { Check(buffer, offset + 1); buffer[offset++] = (byte)DateType.Char; Check(buffer, offset + sizeof(Char)); BitConverter.TryWriteBytes(buffer.AsSpan(offset), arg); offset += sizeof(Char); } public static void WriteSingle(byte[] buffer, ref int offset, Single arg) { Check(buffer, offset + 1); buffer[offset++] = (byte)DateType.Single; Check(buffer, offset + sizeof(Single)); BitConverter.TryWriteBytes(buffer.AsSpan(offset), arg); offset += sizeof(Single); } public static void WriteDouble(byte[] buffer, ref int offset, Double arg) { Check(buffer, offset + 1); buffer[offset++] = (byte)DateType.Double; Check(buffer, offset + sizeof(Double)); BitConverter.TryWriteBytes(buffer.AsSpan(offset), arg); offset += sizeof(Double); } public static void WriteString(byte[] buffer, ref int offset, string arg) { Check(buffer, offset + 1); buffer[offset++] = (byte)DateType.String; byte[] bytes = Encoding.UTF8.GetBytes(arg); Check(buffer, offset + bytes.Length); BitConverter.TryWriteBytes(buffer.AsSpan(offset), bytes.Length); offset += sizeof(int); Span<byte> target = new Span<byte>(buffer, offset, buffer.Length - offset); bytes.CopyTo(target); offset += bytes.Length; } public static void WriteMessage(byte[] buffer, ref int offset, IMessage arg) { IMessage message = arg; Span<byte> bytes = ToByteArray(message); Check(buffer, offset + 1); buffer[offset++] = (byte)DateType.Message; Check(buffer, offset + bytes.Length); BitConverter.TryWriteBytes(buffer.AsSpan(offset), bytes.Length); offset += sizeof(int); Span<byte> target = new Span<byte>(buffer, offset, bytes.Length); bytes.CopyTo(target); offset += bytes.Length; } private static void Check(byte[] buffer, int offset) { if (offset >= buffer.Length) throw new Exception($"date length: {offset} > {Globals.DATA_SZIE}, Invalid data!!"); } public static void Dispose() { _stream?.Dispose(); _stream = null; } } }
Decode 数据解析,调用本地方法
public static void OnRPC(BuffMessage msg)
{
if(msg == null)
{
LogHelper.LogError("socket recv error, msg == null");
return;
}
Decode(msg.bytes);
}
0GC的Decode方案
private static void Decode(byte[] buffer) { if (buffer == null || buffer.Length < sizeof(int)) { LogHelper.LogError("Invalid buffer received"); return; } int protoId = BitConverter.ToInt32(buffer, 0); if (!_msg.TryGetValue(protoId, out IRPC method)) { LogHelper.LogError($"Method not found for protoId: {protoId}"); return; } BuffMessage buffMessage = GameFrame.message.GetBuffMessage(); try { Array.Copy(buffer, sizeof(int), buffMessage.bytes, 0, buffer.Length - sizeof(int)); method.Decode(buffMessage.bytes); } catch (Exception ex) { LogHelper.LogError($"Error invoking method for protoId {protoId}: {ex.Message}"); } finally { GameFrame.message.PutBuffMessage(buffMessage); } }
namespace Game { public interface IRPC : IDisposable { public void Decode(byte[] buffer); } public abstract class RPCBase : IRPC { protected byte[] buffer; public abstract void Decode(byte[] buffer); protected ReadOnlySpan<byte> ReadData(DateType type, ref int offset) { ReadOnlySpan<byte> data = null; int length = GetLength(type); if (length > 0) { data = new ReadOnlySpan<byte>(buffer, offset, length); offset += length; } return data; } protected bool ToBoolean(ref int offset) { ReadOnlySpan<byte> data = ReadData(DateType.Boolean, ref offset); return BitConverter.ToBoolean(data); } protected Byte ToByte(ref int offset) { ReadOnlySpan<byte> data = ReadData(DateType.Char, ref offset); return data[0]; } protected char ToChar(ref int offset) { ReadOnlySpan<byte> data = ReadData(DateType.Char, ref offset); return BitConverter.ToChar(data); } protected Int16 ToInt16(ref int offset) { ReadOnlySpan<byte> data = ReadData(DateType.Int16, ref offset); return BitConverter.ToInt16(data); } protected UInt16 ToUInt16(ref int offset) { ReadOnlySpan<byte> data = ReadData(DateType.UInt16, ref offset); return BitConverter.ToUInt16(data); } protected Int32 ToInt32(ref int offset) { ReadOnlySpan<byte> data = ReadData(DateType.Int32, ref offset); return BitConverter.ToInt32(data); } protected UInt32 ToUInt32(ref int offset) { ReadOnlySpan<byte> data = ReadData(DateType.UInt32, ref offset); return BitConverter.ToUInt32(data); } protected Int64 ToInt64(ref int offset) { ReadOnlySpan<byte> data = ReadData(DateType.Int64, ref offset); return BitConverter.ToInt64(data); } protected UInt64 ToUInt64(ref int offset) { ReadOnlySpan<byte> data = ReadData(DateType.UInt64, ref offset); return BitConverter.ToUInt64(data); } protected Single ToSingle(ref int offset) { ReadOnlySpan<byte> data = ReadData(DateType.Single, ref offset); return BitConverter.ToSingle(data); } protected Double ToDouble(ref int offset) { ReadOnlySpan<byte> data = ReadData(DateType.Double, ref offset); return BitConverter.ToDouble(data); } protected string ToString(ref int offset) { ReadOnlySpan<byte> data = ReadData(DateType.String, ref offset); return DecodeString(ref offset); } protected IMessage ToMessage(ref int offset, IMessage message) { ReadOnlySpan<byte> data = ReadData(DateType.Message, ref offset); return DecodeMessage(ref offset, message); } private IMessage DecodeMessage(ref int offset, IMessage message) { int length = BitConverter.ToInt32(buffer, offset); offset += sizeof(int); ReadOnlySpan<byte> messageData = new ReadOnlySpan<byte>(buffer, offset, length); offset += length; return message.Descriptor.Parser.ParseFrom(messageData)!; } private string DecodeString(ref int offset) { int length = BitConverter.ToInt32(buffer, offset); offset += sizeof(int); ReadOnlySpan<byte> messageData = new ReadOnlySpan<byte>(buffer, offset, length); offset += length; return Encoding.UTF8.GetString(messageData); } private static int GetLength(DateType type) { switch (type) { case DateType.Boolean: return sizeof(bool); case DateType.Char: return sizeof(char); case DateType.SByte: case DateType.Byte: return sizeof(byte); case DateType.Int16: return sizeof(Int16); case DateType.UInt16: return sizeof(UInt16); case DateType.Int32: return sizeof(Int32); case DateType.UInt32: return sizeof(UInt32); case DateType.Int64: return sizeof(Int64); case DateType.UInt64: return sizeof(UInt64); case DateType.Single: return sizeof(Single); case DateType.Double: return sizeof(double); } return -1; } public virtual void Dispose() { buffer = null; } } }
Decode数据到对象列表, 然后Invoke
namespace Game { public class RPC : RPCBase { private MethodInfo _method; private List<DateType> _types; private List<object> _params; private Dictionary<int, IMessage> _param; private int _paramIndex; public RPC(MethodInfo method) { this._method = method; _types = new List<DateType>(); _params = new List<object>(); _param = new Dictionary<int, IMessage>(); } public void AddParamType(DateType type) { _types?.Add(type!); } public void AddParam(int index, IMessage message) { _param[index] = message; } public override void Decode(byte[] buffer) { base.buffer = buffer; _paramIndex = 0; int offset = 0; _params.Clear(); foreach (DateType type in _types) { DateType dateType = (DateType)buffer[offset++]; if (dateType != type) { LogHelper.LogError($"dateType bo equals, recv: {Enum.GetName(typeof(DateType), type)} != local: {Enum.GetName(typeof(DateType), dateType)}"); } object obj = ToObject(dateType, ref offset); _params.Add(obj!); _paramIndex++; } _method?.Invoke(null, _params.ToArray()); } private object ToObject(DateType type, ref int offset) { switch (type) { case DateType.Message: IMessage message = null; if (!_param!.TryGetValue(_paramIndex, out message)) { LogHelper.LogError("no find message"); return null; } return ToMessage(ref offset, message); case DateType.Boolean: return ToBoolean(ref offset); case DateType.Char: return ToChar(ref offset); case DateType.SByte: case DateType.Byte: return ToByte(ref offset); case DateType.Int16: return ToInt16(ref offset); case DateType.UInt16: return ToUInt16(ref offset); case DateType.Int32: return ToInt32(ref offset); case DateType.UInt32: return ToUInt32(ref offset); case DateType.Int64: return ToInt64(ref offset); case DateType.UInt64: return ToUInt64(ref offset); case DateType.Single: return ToSingle(ref offset); case DateType.Double: return ToDouble(ref offset); case DateType.String: return ToString(ref offset); default: LogHelper.LogError("no find dateType: " + type); break; } return null; } public override void Dispose() { base.Dispose(); _method = null; _types = null; } } }
泛型Decode,然后Invokde
namespace Game { public class RPCStatic<T> : RPCBase { private Action<T> _action; private IMessage _message; public RPCStatic() { } public virtual void Register(Action<T> action, IMessage message) { this._message = message; this._action = action; } public override void Decode(byte[] buffer) { base.buffer = buffer; int offset = 0; DateType dateType = (DateType)buffer[offset++]; try { if (dateType == DateType.Message) { IMessage arg = ToMessage(ref offset, _message); _action?.Invoke((T)arg); } else { LogHelper.LogError($"invoke error, type != DateType.Message, type = {dateType}"); } } catch (Exception ex) { LogHelper.LogError(ex.ToString()); } } public override void Dispose() { } } }
使用UniTask实现的多线程异步收发消息,处理了超时重发和异常处理,接收消息时的粘包处理
namespace Game { public enum SocketState { None = 0, Connected = 1, Disconnected = 2, Connecting = 3, ConnectFailed = 4, Close = 5, Dispose = 6, } public class Tcp { private ConcurrentQueue<BuffMessage> _sendMsgs; private ConcurrentQueue<BuffMessage> _receiveMsgs; private TcpClient _tcpClient; private SocketState _socketState; private byte[] _recvBuff; private int _recvOffset; private int _delay = 10; private CancellationTokenSource _recvCancelToken; private CancellationTokenSource _sendCancelToken; public SocketState State { get { return _socketState; } } public string IP { get; set; } public int Port { get; set; } public NetworkStream Stream { get { return _tcpClient.GetStream(); } } public Tcp() { _sendMsgs = new ConcurrentQueue<BuffMessage>(); _receiveMsgs = new ConcurrentQueue<BuffMessage>(); _recvBuff = new byte[Globals.BUFFER_SIZE]; } private void InitTcpClient() { _tcpClient = new TcpClient(); _recvCancelToken = new CancellationTokenSource(); _sendCancelToken = new CancellationTokenSource(); } public void Update() { Profiler.BeginSample("on tcp rpc"); if (_receiveMsgs.TryDequeue(out BuffMessage msg)) { RPCMoudle.OnRPC(msg); GameFrame.message.PutBuffMessage(msg); } Profiler.EndSample(); } public void Connect(string ip, int port) { IP = ip; Port = port; Connect(); } public async void Connect() { try { Close(); InitTcpClient(); SetSocketState(SocketState.Connecting); await _tcpClient.ConnectAsync(IP, Port); OnConnect(); } catch (Exception ex) { LogHelper.LogError(ex.ToString()); } } private void OnConnect() { try { if (_tcpClient.Connected) { LogHelper.Log("connected..."); SetSocketState(SocketState.Connected); StartAsyncTasks(); } else { SetSocketState(SocketState.ConnectFailed); } } catch (Exception ex) { LogHelper.LogError("连接或通信发生错误:{0}" + ex.Message); SetSocketState(SocketState.ConnectFailed); } } private void StartAsyncTasks() { UniTask send = UniTask.Create(SendThread); UniTask recv = UniTask.Create(RecvThread); } private async UniTask SendThread() { await UniTask.SwitchToThreadPool(); while (_socketState == SocketState.Connected) { while (true) { if (!_sendMsgs.TryDequeue(out BuffMessage msg)) break; var timeoutToken = new CancellationTokenSource(); timeoutToken.CancelAfterSlim(TimeSpan.FromMilliseconds(msg.TimeoutMillisecond)); var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(_sendCancelToken.Token, timeoutToken.Token); try { if(_sendCancelToken.IsCancellationRequested) break; await Stream.WriteAsync(msg.bytes, 0, msg.length, linkedCts.Token); LogHelper.Log($"发送完成: {msg.length} byte"); GameFrame.message.PutBuffMessage(msg); } catch (OperationCanceledException ex) { if (timeoutToken.IsCancellationRequested) { _sendMsgs.Enqueue(msg); LogHelper.LogWarning("消息发送超时, 添加到队列末尾, 等待发送..."); await UniTask.Delay(10); continue; } LogHelper.LogWarning("发送操作被终止..." + ex.Message); break; } catch (IOException ex) when (ex.InnerException is SocketException socketEx && socketEx.SocketErrorCode == SocketError.ConnectionAborted) { LogHelper.Log("发送操作被终止..."); break; } catch (Exception ex) { LogHelper.LogError("发送错误: " + ex.Message); break; } } await UniTask.Delay(_delay); } } private async UniTask RecvThread() { await UniTask.SwitchToThreadPool(); while (_socketState == SocketState.Connected) { try { if (_recvCancelToken.IsCancellationRequested) break; int length = await Stream.ReadAsync(_recvBuff, _recvOffset, _recvBuff.Length - _recvOffset, _recvCancelToken.Token); if (length == 0) { LogHelper.Log("connect failed..."); break; } _recvOffset += length; int offset = 0; while (true) { if (_recvOffset - offset < sizeof(int)) // 没有足够的数据读取下一个消息的长度 break; int dataLength = BitConverter.ToInt32(_recvBuff, offset); if (_recvOffset - offset < dataLength + sizeof(int)) // 没有足够的数据读取完整的消息 break; // 读取完整消息 BuffMessage msg = GameFrame.message.GetBuffMessage(); Buffer.BlockCopy(_recvBuff, offset + sizeof(int), msg.bytes, 0, dataLength); _receiveMsgs.Enqueue(msg); // 移动偏移量到下一个消息 offset += sizeof(int) + dataLength; } // 将未处理的数据移到缓冲区开头 if (_recvOffset - offset > 0) Buffer.BlockCopy(_recvBuff, offset, _recvBuff, 0, _recvOffset - offset); _recvOffset -= offset; } catch(OperationCanceledException ex) { LogHelper.Log("读取操作被终止: " + ex.Message); break; } catch (IOException ex) when (ex.InnerException is SocketException socketEx && socketEx.SocketErrorCode == SocketError.OperationAborted) { LogHelper.Log("读取操作被终止..."); break; } catch (Exception ex) { LogHelper.LogError("读取错误: " + ex.ToString()); break; } await UniTask.Delay(_delay); } } private void SetSocketState(SocketState state) { _socketState = state; } public void Send(BuffMessage message) { if (message.length > 0) { int headLength = sizeof(int); Buffer.BlockCopy(message.bytes, 0, message.bytes, headLength, message.length); BitConverter.TryWriteBytes(message.bytes.AsSpan(0), message.length); message.length += headLength; _sendMsgs.Enqueue(message); } else { GameFrame.message.PutBuffMessage(message); } } public void Close() { if (_tcpClient == null) return; try { if (_tcpClient.Connected) { SetSocketState(SocketState.Close); _recvCancelToken.Dispose(); _sendCancelToken.Dispose(); _tcpClient.Close(); } } catch (Exception ex) { LogHelper.LogError(ex.ToString()); } } public void Dispose() { Close(); if (_tcpClient != null) { _tcpClient.Dispose(); _tcpClient = null; } if (_sendMsgs != null) { _sendMsgs.Clear(); _sendMsgs = null; } if (_receiveMsgs != null) { _receiveMsgs.Clear(); _receiveMsgs = null; } SetSocketState(SocketState.Dispose); } } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。