赞
踩
为了使数据在网络上能够从源到达目的,网络通信的参与方必须遵循相同的规则,我们将这套规则称为协议(protocol),而协议最终都需要通过计算机语言的方式表示出来。只有通信计算机双方都遵守相同的协议,计算机之间才能互相通信交流。
通俗的讲,协议本质就是双方约定好的结构化的数据。
假设我们此时要实现一个网络版计算器,那么很明显我们需要“操作数1”、“操作符”、“操作数2”等多个信息,为了减小服务器的压力,我们一定不能将这些信息分批发送,一定是打包好成为一个数据发送给对端,那么此时就有了以下方案:
定制结构体+序列化和反序列化
在之前实现的聊天室服务器中,用户发送数据实际上发送的不仅仅有“message”,还可能包括用户名、发送时间等信息,很明显这就是一个结构化的数据,那序列化就是将该结构化的数据转化为字符串方便网络发送,反序列化就是把信息一变多,方便上层读取处理。
那么read
、write
或者是recv
、send
函数是在网络中是如何工作起来的呢?
在系统中,我们是知道read
、write
函数的工作过程的,比如write
函数将用户缓冲区中的数据拷贝到文件描述符所指向的文件结构体的内核缓冲区,操作系统会在合适的时间将内核缓冲区的内容刷新到磁盘上。
那在网络中也是一样的,只不过内核缓冲区变成了传输层维护的发送缓冲区和接收缓冲区(实际上也是内核级缓冲区,这块是方便理解),那么什么时候发?怎么发?出错了怎么办?这些问题就是TCP协议需要考虑解决的问题,所以TCP协议即(Transmission Control Protocol)传输控制协议,这个控制就体现在这了。
所以read
、write
或者是recv
、send
函数本质上是拷贝函数,他们完成的工作无非就是将一块区域的数据拷贝到另一块区域,即发送数据的本质就是将自己的发送缓冲区中的数据拷贝到接收方的接收缓冲区,所以也可以说通信的本质就是拷贝,也证明双发的主机通信本质是双方操作系统在进行通信。
TCP协议支持全双工的原因也可以找到了:TCP协议拥有两块缓冲区:发送缓冲区、接收缓冲区,这两块缓冲区互不干扰,仅需一个文件描述符fd
就可以实现,因为写是向发送缓冲区写,读是在接收缓冲区读。
而且这些过程像极了系统中学习的管道、文件部分,比如read
函数为什么会阻塞,因为接收缓冲区中无数据,所以系统对于网络学习是十分重要的。
但既然TCP是面向字节流的,那我们该如何解决数据传输过程中数据缺失的问题呢?毕竟在传输层眼里数据都是字节流,无法识别出字段含义的。
为什么不能将结构体直接发送呢?还需要转化成字符串发送?
首先我们先将套接字进行封装,封装的主要目的是为了简化操作,让我们仅需调用几个函数就可以完成一些对于套接字的初始化工作。
我们可以设计一个父类Socket,内部包含有若干接口,然后再根据具体套接字对父类Socket进行实现,比如TcpSocket继承父类Socket,实现父类Socket的接口。
// 模板方法模式 namespace socket_ns { class Socket; const static int gbacklog = 8; using socket_sptr = std::shared_ptr<Socket>; enum { SOCKET_ERROR = 1, BIND_ERROR, LISTEN_ERROR, USAGE_ERROR }; class Socket { public: virtual void CreateSocketOrDie() = 0; virtual void BindSocketOrDie(InetAddr &addr) = 0; virtual void ListenSocketOrDie() = 0; virtual socket_sptr Accepter(InetAddr *addr) = 0; virtual bool Connector(InetAddr &addr) = 0; virtual int SockFd() = 0; virtual int Recv(std::string *out) = 0; virtual int Send(const std::string &in) = 0; // virtual void Other()=0; public: void BuildListenSocket(InetAddr &addr) { CreateSocketOrDie(); BindSocketOrDie(addr); ListenSocketOrDie(); } bool BuildClientSocket(InetAddr &addr) { CreateSocketOrDie(); return Connector(addr); } // void BuildUdpSocket() // { // CreateSocketOrDie(); // BindSocketOrDie(); // } }; class TcpSocket : public Socket { public: TcpSocket(int fd = -1) : _sockfd(fd) { } void CreateSocketOrDie() override { // 1. 创建流式套接字 _sockfd = ::socket(AF_INET, SOCK_STREAM, 0); if (_sockfd < 0) { LOG(FATAL, "socket error"); exit(SOCKET_ERROR); } LOG(DEBUG, "socket create success,sockfd is : %d", _sockfd); } void BindSocketOrDie(InetAddr &addr) override { // 2. 绑定 struct sockaddr_in local; // struct sockaddr_in 系统提供的数据类型。local是变量,用户栈上开辟空间。 bzero(&local, sizeof(local)); // 将从&local开始的sizeof(local)大小的内存区域置零 local.sin_family = AF_INET; // 设置网络通信方式 local.sin_port = htons(addr.Port()); // port要经过网络传输给对面,所有需要从主机序列转换为网络序列 local.sin_addr.s_addr = inet_addr(addr.Ip().c_str()); int n = bind(_sockfd, (struct sockaddr *)&local, sizeof(local)); if (n < 0) { LOG(FATAL, "bind error"); exit(BIND_ERROR); } LOG(DEBUG, "bind success,sockfd is : %d", _sockfd); } void ListenSocketOrDie() override { // 3. tcp是面向连接的,所以通信之前,必须先建立连接,服务器是被链接的 // tcpserver启动,未来首先要一直等待客户端的连接,listen int n = listen(_sockfd, gbacklog); if (n < 0) { LOG(FATAL, "listen error"); exit(LISTEN_ERROR); } LOG(DEBUG, "listen success,sockfd is : %d", _sockfd); } socket_sptr Accepter(InetAddr *addr) override { struct sockaddr_in peer; socklen_t len = sizeof(peer); // accept会阻塞等待,直到有客户端连接 int sockfd = ::accept(_sockfd, (struct sockaddr *)&peer, &len); if (sockfd < 0) { LOG(WARNING, "accept error"); return nullptr; }; *addr = peer; socket_sptr sock = std::make_shared<TcpSocket>(sockfd); return sock; } bool Connector(InetAddr &addr) override { // 构建目标主机的socket信息 struct sockaddr_in server; memset(&server, 0, sizeof(server)); // bzero server.sin_family = AF_INET; server.sin_port = htons(addr.Port()); server.sin_addr.s_addr = inet_addr(addr.Ip().c_str()); int n = connect(_sockfd, (struct sockaddr *)&server, sizeof(server)); if (n < 0) { std::cerr << "connect error" << std::endl; return false; } return true; } int Recv(std::string *out) { char inbuffer[1024]; ssize_t n = ::recv(_sockfd, inbuffer, sizeof(inbuffer) - 1, 0); if (n > 0) { inbuffer[n] = 0; *out += inbuffer; // 为什么是+=? 后面会提到 } return n; } int Send(const std::string &in) { int n = ::send(_sockfd, in.c_str(), in.size(), 0); return n; } int SockFd() override { return _sockfd; } private: int _sockfd; }; }
为了保证通信双方能够识别发送接收的数据,这里需要进行协议定制,即设计发送数据结构体、接收数据结构体、包括序列化和反序列化的方案。
请求结构体中需要包括两个操作数,以及对应需要进行的操作。响应结构体中需要包括一个计算结果,除此之外,响应结构体中还需要包括一个状态字段,表示本次计算的状态,因为客户端发来的计算请求可能是无意义的,比如除0。
规定状态字段对应的含义:
上述我们提到过数据的发送取决于TCP协议,即我们只管将数据通过write
函数将数据拷贝到发送缓冲区,剩下的发送工作由传输控制协议TCP完成,那么如何判断从接收缓冲区中读取到的数据是否准确完整呢?
我们知道一个完整的报文应该包含报头和有效载荷。
即:
// "有效载荷的长度"\r\n"有效载荷"\r\n
// "len"\r\n"_x _op _y"\r\n -> len: 有效载荷的长度,约定\r\n是分隔符,不参与统计
也就是说当我们识别到\r\n
的时候,我们一定可以得到有效载荷的长度,得到了有效载荷的长度我们就可以根据该长度判断是否得到了完整的报文。
当然这里有效载荷即序列化反序列化的方案我们采用开源的JSON方案。
Jsoncpp是一个用于处理 JSON 数据的 C++ 库。 它提供了将 JSON 数据序列化为字符串以及从字符串反序列化为 C++ 数据结构的功能。 Jsoncpp是开源的, 广泛用于各种需要处理 JSON 数据的 C++ 项目中。
这里我们简要说明如何使用Jsoncpp完成序列化和反序列化的工作。
假设我们设计了一个数据结构为:
struct stu
{
std::string name;
int age;
double weight;
public:
void debug()
{
std::cout << name << std::endl;
std::cout << age << std::endl;
std::cout << weight << std::endl;
}
};
(1)序列化
序列化是将数据结构转化为字符串,在序列化之前我们必须先创建一个Json::Value
对象,并将数据结构中的各个成员变量的值赋给该对象,即将C++数据结构转化为Json数据。
就像这样:
struct stu zs = {"张三", 18, 70};
Json::Value root;
root["name"] = zs.name;
root["age"] = zs.age;
root["weight"] = zs.weight;
之后我们需要再创建一个Json::StyledWriter
对象或者Json::FastWriter
对象,这两个对象简要的说就是决定了转化出来的字符串格式。
比如对于Json::StyledWriter
对象来说,转化出来的字符串是这样的:
{
"age" : 18,
"name" : "张三",
"weight" : 70
}
对于Json::FastWriter
对象来说,转化出来的字符串是这样的:
{"age":18,"name":"张三","weight":70}
Json::FastWriter
优点就是比StyledWriter
更快,因为它不添加额外的空格和换行符。
言归正传,得到Json::FastWriter
对象后,我们可以调用该对象的write方法,该方法参数为Json::Value
对象,返回转化后的字符串。
完整示例:
#include <iostream> #include <string> #include <fstream> #include <jsoncpp/json/json.h> int main() { // 结构化数据 struct stu zs = {"张三", 18, 70}; // 转换成为字符串 Json::Value root; root["name"] = zs.name; root["age"] = zs.age; root["weight"] = zs.weight; // root["self"] = root; Json::FastWriter writer; // Json::StyledWriter writer; std::string str = writer.write(root); std::ofstream out("out.txt"); if (!out.is_open()) { std::cout << str; return 1; } out << str; out.close(); return 0; }
(2)反序列化
反序列化是将字符串转化为数据结构,同样的我们需要创建两个对象:Json::Value
对象用于接收字符串中的信息,Json::Reader
的parse
方法用于将字符串中的数据赋给Json::Value
对象,第一个参数为json字符串,第二个参数为需要赋给的Json::Value
对象,返回值为成功或者失败,就像这样:
std::string json_string = buffer;
Json::Value root;
Json::Reader reader;
bool res = reader.parse(json_string, root);
需要注意的是Json::Value
对象还需要转化为具体的数据结构,在转化时我们需要指明Json::Value
对象成员的属性,就像这样:
struct stu zs;
zs.name = root["name"].asString();
zs.age = root["age"].asInt();
zs.weight = root["weight"].asDouble();
完整示例:
int main() { std::ifstream in("out.txt"); if (!in.is_open()) return 1; char buffer[1024]; in.read(buffer, sizeof(buffer)); in.close(); std::string json_string = buffer; Json::Value root; Json::Reader reader; bool res = reader.parse(json_string, root); (void)res; struct stu zs; zs.name = root["name"].asString(); zs.age = root["age"].asInt(); zs.weight = root["weight"].asDouble(); zs.debug(); return 0; }
所以根据Jsoncpp,我们可以实现对请求响应的序列化和反序列化。
然后更为重要的是我们如何保证读取到的内容是完整的报文,上面提到过:添加报头,根据报头的长度信息判断,也就是说完整的报文应该是如下结构:
len\r\n{"age":18,"name":"张三","weight":70}\r\n
所以该协议定制我们还要实现两个方法:
协议完整代码如下:
#pragma once // protocol协议 #include <iostream> #include <string> #include <jsoncpp/json/json.h> namespace protocol_ns { const std::string SEP = "\r\n"; // 添加报头和分隔符,将json串变为完整的报文格式"len"\r\n"{ }" std::string Encode(const std::string &json_str) { int json_str_len = json_str.size(); std::string proto_str = std::to_string(json_str_len); proto_str += SEP; proto_str += json_str; proto_str += SEP; return proto_str; } // "len"\r\n"{ // "len"\r\n"{ }" // "len"\r\n"{ }"\r\n; // "len"\r\n"{ }"\r\n"len"; // "len"\r\n"{ }"\r\n"len"\r\n"{ }"; // "len"\r\n"{ }"\r\n"len"\r\n"{ }"\r\n // 判断完整报文并将该报文从缓冲区中清除 std::string Decode(std::string &inbuffer) { auto pos = inbuffer.find(SEP); if (pos == std::string::npos) return std::string(); std::string len_str = inbuffer.substr(0, pos); if (len_str.empty()) return std::string(); int packlen = std::stoi(len_str); // 计算报文总长 int total = packlen + len_str.size() + 2 * SEP.size(); if (inbuffer.size() < total) return std::string(); std::string package = inbuffer.substr(pos + SEP.size(), packlen); inbuffer.erase(0, total); // 从缓冲区中删掉该报文 return package; } //请求 class Request { public: Request() { } Request(int x, int y, char oper) : _x(x), _y(y), _oper(oper) { } bool Serialize(std::string *out) // 序列化 { Json::Value root; root["x"] = _x; root["y"] = _y; root["oper"] = _oper; Json::FastWriter writer; *out = writer.write(root); return true; } bool Deserialize(const std::string &in) // 反序列化 { Json::Value root; Json::Reader reader; bool res = reader.parse(in, root); if (!res) return false; _x = root["x"].asInt(); _y = root["y"].asInt(); _oper = root["oper"].asInt(); return true; } public: int _x; int _y; char _oper; // "+-*/%" _x _oper _y }; // --- "字符串" //响应 class Response { public: Response() { } Response(int result, int code) : _result(result), _code(code) { } bool Serialize(std::string *out) { Json::Value root; root["result"] = _result; root["code"] = _code; Json::FastWriter writer; *out = writer.write(root); return true; } bool Deserialize(const std::string &in) { Json::Value root; Json::Reader reader; bool res = reader.parse(in, root); if (!res) return false; _result = root["result"].asInt(); _code = root["code"].asInt(); return true; } public: int _result; // 结果 int _code; // 0:success 1: 除0 2: 非法操作 3. 4. 5 }; // --- "字符串" //构建请求与响应,客户端用 class Factory { public: Factory() { srand(time(nullptr) ^ getpid()); opers = "+-*/%^&|"; } std::shared_ptr<Request> BuildRequest() { int x = rand() % 10 + 1; usleep(x * 10); int y = rand() % 5; // [01,2,3,4] usleep(y * x * 5); char oper = opers[rand() % opers.size()]; std::shared_ptr<Request> req = std::make_shared<Request>(x, y, oper); return req; } std::shared_ptr<Response> BuildResponse() { return std::make_shared<Response>(); } ~Factory() { } private: std::string opers; }; }
我们利用封装好的Socket中的BuildListenSocket()
方法,实现创建初始化服务器,即TcpServer的构造函数,初始化完成后我们需要让服务器不断循环处理业务逻辑,同样的利用Socket中封装的Accepter()
方法用来获取客户端的链接,得到本次提供服务的套接字,将该套接字作为线程参数传递给多线程处理,任务利用包装器封装为io_service_t
由外部传递。
由于任务是由外部传递的,所以我们就完成了TcpServer类与具体业务的解耦,将来只需要改变传递进去的任务就可以改变TcpServer的业务逻辑。
换句话说TcpServer此时就是OSI七层模型中**“会话层”**的实现,它仅负责通信管理、负责建立和断开通信连接(数据流动的逻辑通路)。
TcpServer代码:
using namespace socket_ns; class TcpServer; // 声明 using io_service_t = std::function<void(socket_sptr sockfd, InetAddr client)>; class ThreadData { public: ThreadData(socket_sptr fd, InetAddr addr, TcpServer *s) : sockfd(fd), clientaddr(addr), self(s) { } public: socket_sptr sockfd; InetAddr clientaddr; TcpServer *self; }; class TcpServer { public: TcpServer(int port, io_service_t service) : _localaddr("0", port), _listensock(std::make_unique<TcpSocket>()), _service(service), _isrunning(false) { _listensock->BuildListenSocket(_localaddr); } static void *HandlerSock(void *args) // IO和业务进行解耦合 { pthread_detach(pthread_self()); // 线程分离 ThreadData *td = static_cast<ThreadData *>(args); // 需要调用Service函数,但是Service函数是类内函数,静态成员函数没有this指针无法调用,如何解决? // 将this指针设为ThreadData的类内成员,再通过这个this调用Service td->self->_service(td->sockfd, td->clientaddr); ::close(td->sockfd->SockFd()); // 文件描述符泄露 delete td; return nullptr; } void Loop() { _isrunning = true; // 4. 不能直接接收数据,先获取连接 while (_isrunning) { InetAddr peeraddr; socket_sptr normalsock = _listensock->Accepter(&peeraddr); if (normalsock == nullptr) continue; // version 2 :采用多线程 pthread_t t; ThreadData *td = new ThreadData(normalsock, peeraddr, this); pthread_create(&t, nullptr, HandlerSock, td); } _isrunning = false; } ~TcpServer() { } private: InetAddr _localaddr; std::unique_ptr<Socket> _listensock; bool _isrunning; io_service_t _service; };
具体业务是什么呢?网络版计算器。
非常简单,只需要完成相应的计算并返回结果和状态码即可。
using namespace protocol_ns; class Calculate { public: Calculate() { } Response Excute(const Request &req) { Response resp(0, 0); switch (req._oper) { case '+': resp._result = req._x + req._y; break; case '-': resp._result = req._x - req._y; break; case '*': resp._result = req._x * req._y; break; case '/': { if (req._y == 0) { resp._code = 1; } else { resp._result = req._x / req._y; } } break; case '%': { if (req._y == 0) { resp._code = 2; } else { resp._result = req._x % req._y; } } break; default: resp._code = 3; break; } return resp; } ~Calculate() { } private: };
以服务器收到请求并响应这一过程举例,完整的流程应为:
这其实就是OSI七层模型中**“表示层”**的实现,表示层负责设备固有数据格式和网络标准数据格式的转换。表示层就是协议。
序列化、反序列化、添加报头、分析数据这些都是数据格式的转换工作。
具体用代码体现就是这样:
using namespace protocol_ns; void Usage(std::string proc) { std::cout << "Usage:\n\t" << proc << " local_port\n" << std::endl; } using callback_t = std::function<Response(const Request &req)>; class Service { public: Service(callback_t cb) : _cb(cb) { } void ServiceHelper(socket_sptr sockptr, InetAddr client) { int sockfd = sockptr->SockFd(); LOG(DEBUG, "get a new link ,info %s:%d,fd:%d", client.Ip(), client.Port(), sockfd); std::string clientaddr = "[" + client.Ip() + ":" + std::to_string(client.Port()) + "] "; std::string inbuffer; while (true) { sleep(5); // 测试用,人为的让服务端先不处理,积压一部分请求,测试服务器是否能够解决TCP粘包问题 Request req; // 1.读取数据 int n = sockptr->Recv(&inbuffer); // 如何保证读到的是一个完整的请求? if (n < 0) { LOG(DEBUG, "client %s quit", clientaddr.c_str()); break; } // 2.分析数据,获取完整报文 std::string package; while (true) { sleep(1); std::cout << "服务器未处理的请求: " << inbuffer << std::endl; package = Decode(inbuffer); if (package.empty()) break; // 证明此时没有一个完整的报文继续读取,这也是为什么Socket类中Recv接口中*out+=inbuffer;是+=的原因 // 代码执行到这一定有完整json字符串 std::cout << "----------------------begin----------------------" << std::endl; std::cout << "请求json字符串:\n" << package << std::endl; // 3.反序列化 req.Deserialize(package); // 4.业务处理 Response resp = _cb(req); // 5.对响应序列化 std::string send_str; resp.Serialize(&send_str); std::cout << "响应序列化:" << std::endl; std::cout << send_str << std::endl; // 6.添加长度报头 send_str = Encode(send_str); std::cout << "响应完整报文:" << std::endl; std::cout << send_str << std::endl; sockptr->Send(send_str); // 本次不对发送做处理 EPOLL } } } private: callback_t _cb; };
客户端完整流程与服务端类似,只不过反过来,流程如下:
具体代码实现:
void Usage(std::string proc) { std::cout << "Usage:\n\t" << proc << " serverip serverport\n" << std::endl; } // ./tcp_client serverip serverport int main(int argc, char *argv[]) { if (argc != 3) { Usage(argv[0]); exit(1); } std::string serverip = argv[1]; uint16_t serverport = std::stoi(argv[2]); InetAddr serveraddr(serverip, serverport); Factory factory; std::unique_ptr<Socket> cli = std::make_unique<TcpSocket>(); bool res = cli->BuildClientSocket(serveraddr); std::string inbuffer; while (res) { sleep(1); std::string str; // 一次构建五个请求,测试服务器对积压请求处理 for (int i = 0; i < 5; i++) { // 1.构建一个请求 auto req = factory.BuildRequest(); // 2. 对请求进行序列化 std::string send_str; req->Serialize(&send_str); std::cout << "请求序列化: \n" << send_str << std::endl; // 3. 添加长度报头 send_str = Encode(send_str); std::cout << "请求完整报文: \n" << send_str << std::endl; str += send_str; } // 4. "len"\r\n"{}"\r\n cli->Send(str); // 5. 读取服务器响应 int n = cli->Recv(&inbuffer); if (n <= 0) break; std::string package = Decode(inbuffer); if (package.empty()) continue; // 6. 我能保证package一定是一个完整的响应! auto resp = factory.BuildResponse(); // 6.1 反序列化 resp->Deserialize(package); // 7. 拿到了结构化的响应 std::cout << "计算结果: " << resp->_result << "[" << resp->_code << "]" << std::endl; } return 0; }
接下来我们需要将代码捏合在一起,我们通过bind将旧的可调用对象捆绑新的参数成为新的可调用对象层层传递,OSI七层模型的应用层、表示层和会话层全部统称为应用层的原因就是所有的这三层需要实现的全部都由用户自己实现自己定义,就好比用户需要决定报文以什么标准传递,是“xml”还是“json”,网络传输协议是Tcp协议还是Udp协议,这些都需要用户自己决定,所以这三层我们合并为一层应用层。
// ./tcpserver port // 云服务器的port默认都是禁止访问的。云服务器放开端口8080 ~ 8085 int main(int argc, char *argv[]) { if (argc != 2) { Usage(argv[0]); // exit(USAGE_ERROR); return 1; } uint16_t port = std::stoi(argv[1]); Calculate cal; // 应用层 Service calservice(std::bind(&Calculate::Excute, &cal, std::placeholders::_1)); // 表示层 io_service_t service = std::bind(&Service::ServiceHelper, &calservice, std::placeholders::_1, std::placeholders::_2); std::unique_ptr<TcpServer> tsvr = std::make_unique<TcpServer>(port, service); // 会话层 tsvr->Loop(); return 0; }
你与自己的关系,会奠定下你与其他所有关系的基石。 —罗伯特·霍尔登
参数成为新的可调用对象层层传递,OSI七层模型的应用层、表示层和会话层全部统称为应用层的原因就是所有的这三层需要实现的全部都由用户自己实现自己定义,就好比用户需要决定报文以什么标准传递,是“xml”还是“json”,网络传输协议是Tcp协议还是Udp协议,这些都需要用户自己决定,所以这三层我们合并为一层应用层。
// ./tcpserver port // 云服务器的port默认都是禁止访问的。云服务器放开端口8080 ~ 8085 int main(int argc, char *argv[]) { if (argc != 2) { Usage(argv[0]); // exit(USAGE_ERROR); return 1; } uint16_t port = std::stoi(argv[1]); Calculate cal; // 应用层 Service calservice(std::bind(&Calculate::Excute, &cal, std::placeholders::_1)); // 表示层 io_service_t service = std::bind(&Service::ServiceHelper, &calservice, std::placeholders::_1, std::placeholders::_2); std::unique_ptr<TcpServer> tsvr = std::make_unique<TcpServer>(port, service); // 会话层 tsvr->Loop(); return 0; }
你与自己的关系,会奠定下你与其他所有关系的基石。 —罗伯特·霍尔登
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。