赞
踩
我们上个知识点编写的TCP中,TCP是面向字节流的,我们怎么保证读取上来的数据是一个完整的报文呢?其实我们写的代码中不能保证这个问题,所以代码是有BUG的。TCP 叫做传输控制协议,也就是什么时候发送给对方,发多少,出错了怎么办,完全是由发送方的 TCP 协议来定!当我们使用 write() 函数向 sockfd 中写入数据时,数据不一定已经发给对方了,它的作用其实就是用户到内核的拷贝!这跟我们以前学的向文件中写入是一样的,我们将数据通过 fd 写入到内核的缓冲区,通过操作系统向磁盘中刷新缓冲区的内容。所以真正决定网路收发的协议是由 TCP 决定的!
而对于接收缓冲区来说,我们使用 read() 读取上来的数据就完全不确定了。所以我们在应用层就需要把协议定好,把协议定好才能更好的进行读上来的数据的分析!
所以回到最开始的问题,我们在进行读取的时候,怎么保证读取上来的数据是一个完整的报文呢?对于发送方,是将数据拷贝到它的 TCP 的发送缓冲区了,我们想怎么发完全不由应用层决定,是由 TCP 决定的,所以对方在它的接收缓冲区读上来的数据有可能是我们发送的一部分!所以对方在读的时候,怎么保证读到的是完整的呢?这就需要协议来进行定制了!
所以我们可以规定好通信双方只能使用固定大小的报文,即我们自己使用自定义协议。
例如,我们需要实现一个服务器版的加法器。我们需要客户端把要计算的两个加数发过去,然后由服务器进行计算,最后再把结果返回给客户端。
在应用层定协议,我们通常需要一个比较关键的字段。首先,协议本身就是一种“约定”,假设我们以实现网络版计算器为例,那么我们需要定义的第一个协议就是 request,代表需要相加的两个数和一个操作符,如下:
struct request
{
int x;
int y;
char op;
};
另外还需要定义另一个协议为 response,代表运算结果和正确性,如下:
struct response
{
int result;
int code;
};
所以每一个结构体的每一个字段,每一个字段里的每一种值,我们都是要让客户端和服务器双方约定好的,约定好之后,我们使用结构化的方式,把约定表达出来,这就叫做我们定义出来的协议。
当我们向对方发信息时, 不仅仅只包含我们所发的信息,还有对应的头像,昵称和时间等等,实际上这些都是一个个的字符串,所以对方会收到四个字符串,但是肯定不能一个个发,是要把它们看作一个整体发给对方;而对方在收到这个整体的字符串后,就要将这个整体的字符串反向的转化成四个字符串,解析成信息内容、头像、昵称和时间。
那么怎么将这些信息看作一个整体呢?我们可以把需要发送的一个信息看作是一个结构体,其中这个结构体中有四个字段,分别代表上面的四个字符串;然后我们再把这个结构化的数据转化成为一个字符串,紧接着将这个字符串整体通过网络发送给对方主机,当对方主机收到这个字符串后,需要将这个字符串解析成为相同类型的结构化数据!在这个消息转化的过程,也是规定出来客户端和服务器双方约定出来的一种通用型的结构体,这就叫做双方定义出来的聊天协议。而在网络通信的时候,整个结构化的数据,把它多个字符串转化成一个字符串整体,这个过程我们称为序列化!而对方把一个字符串整体打散称为多个字符串这个过程称为反序列化!
而以上的过程我们可以看作两层,一层是协议的定制,另一层是序列化和反序列化,如下图:
那么为什么需要进行序列和反序列化呢?主要是为了方便网络进行收发!
所以根据我们自定义的协议和序列化反序列化,我们的网络版计算机的简略流程如下:
下面我们根据上图的流程图简易实现一个网络版的计算器。
每次提供网络通信都要重新编写 socket 套接字的代码,所以我们现在这里对 socket 进行一下简单的封装,代码如下:
#pragma once #include <iostream> #include <string> #include <cstring> #include <unistd.h> #include <sys/types.h> #include <sys/stat.h> #include <sys/socket.h> #include <arpa/inet.h> #include <netinet/in.h> #include "log.hpp" enum { SocketErr = 2, BindErr, ListenErr, }; const int backlog = 10; class Sock { public: Sock() {} ~Sock() {} public: void Socket() { _sockfd = socket(AF_INET, SOCK_STREAM, 0); if(_sockfd < 0) { lg(Fatal, "socket error, %s: %d", strerror(errno), errno); exit(SocketErr); } } void Bind(uint16_t port) { sockaddr_in local; memset(&local, 0, sizeof(local)); local.sin_addr.s_addr = INADDR_ANY; local.sin_family = AF_INET; local.sin_port = htons(port); if(bind(_sockfd, (const sockaddr*)&local, sizeof(local)) < 0) { lg(Fatal, "bind error, %s: %d", strerror(errno), errno); exit(BindErr); } } void Listen() { if(listen(_sockfd, backlog) < 0) { lg(Fatal, "listen error, %s: %d", strerror(errno), errno); exit(ListenErr); } } int Accept(std::string* client_ip, uint16_t* client_port) { sockaddr_in peer; socklen_t len = sizeof(peer); int newfd = accept(_sockfd, (sockaddr*)&peer, &len); if(newfd < 0) { lg(Warning, "accept error, %s: %d", strerror(errno), errno); return -1; } char buffer[64]; inet_ntop(AF_INET, &peer.sin_addr, buffer, sizeof(buffer)); *client_ip = buffer; *client_port = ntohs(peer.sin_port); return newfd; } void Close() { close(_sockfd); } bool Connect(std::string serverip, uint16_t serverport) { sockaddr_in peer; memset(&peer, 0, sizeof(peer)); inet_pton(AF_INET, serverip.c_str(), &(peer.sin_addr)); peer.sin_family = AF_INET; peer.sin_port = htons(serverport); int n = connect(_sockfd, (const sockaddr*)&peer, sizeof(peer)); if(n < 0) { lg(Fatal, "connect error, %s: %d", strerror(errno), errno); return false; } return true; } int GetFd() { return _sockfd; } private: int _sockfd; };
在进行定制协议的时候,为了保证对方接受时是一个完整的报文,也就是当对方进行读取时,对方怎么知道是一个报文多大多长呢?所以我们需要使用分隔符将报文和报文之间分隔开来,比如可以使用 \n
,也就是使用 \n
对报文之间进行分隔。 但是我们在实现的时候,在报文前再加上一个字段,就是代表有效报文的长度,长度和报文之间也是使用 \n
进行分隔。那么在进行读取的时候,在遇到第一个 \n
之前,就是该报文的长度,然后根据长度去读取报文,就能保证读取到一个完整的报文,当遇到第二个 \n
就代表本次读取完毕,进行下一次读取。
Request:
const std::string blank_space_sep = " "; const std::string protocol_sep = "\n"; // 定制协议 class Request { public: Request(int x, int y, char op) : _x(x), _y(y), _op(op) {} Request() {} public: // 序列化 bool Serialize(std::string *out) { // 构建报文的有效载荷 // struct => string // "len"\n"x op y"\n std::string s = std::to_string(_x); s += blank_space_sep; s += _op; s += blank_space_sep; s += std::to_string(_y); *out = s; return true; } // 反序列化 bool Deserialize(const std::string &in) // "x op y" { size_t left = in.find(blank_space_sep); if(left == std::string::npos) return false; std::string part_x = in.substr(0, left); size_t right = in.rfind(blank_space_sep); if(right == std::string::npos) return false; std::string part_y = in.substr(right + 1); if(left + 1 != right - 1) return false; _op = in[left + 1]; _x = std::stoi(part_x); _y = std::stoi(part_y); return true; } void DebugPrint() { std::cout << "新请求构建完成: " << _x << _op << _y << "=?" << std::endl; } public: int _x; int _y; char _op; };
Response:
class Response { public: Response(int result, int code) : _result(result), _code(code) {} Response() {} public: // 序列化 bool Serialize(std::string *out) { // "len"\n"result code"\n std::string s = std::to_string(_result); s += blank_space_sep; s += std::to_string(_code); *out = s; return true; } // 反序列化 bool Deserialize(const std::string &in) // "result code" { size_t pos = in.find(blank_space_sep); if(pos == std::string::npos) return false; std::string part_left = in.substr(0, pos); std::string part_right = in.substr(pos + 1); _result = std::stoi(part_left); _code = std::stoi(part_right); return true; } void DebugPrint() { std::cout << "结果响应完成, result: " << _result << ", code: "<< _code << std::endl; } public: int _result; int _code; // 表示结果的准确性 };
下面对封装报头和提取报文也进行简单封装:
// 封装报头 "x op y" => "len"\n"x op y"\n std::string Encode(std::string &content) { std::string package = std::to_string(content.size()); package += protocol_sep; package += content; package += protocol_sep; return package; } // 提取报文 "len"\n"x op y"\n => "x op y" bool Decode(std::string& package, std::string* content) { size_t pos = package.find(protocol_sep); if(pos == std::string::npos) return false; std::string len_str = package.substr(0, pos); size_t len = stoi(len_str); // package = len_str + content_str + 2(\n) // size_t total_len = len_str.size() + len + 2*protocol_sep.size(); size_t total_len = len_str.size() + len + 2; if(package.size() < total_len) return false; *content = package.substr(pos + 1, len); // 如果已经得到一个完整的报文,需要移除这个报文 package.erase(0, total_len); return true; }
客户端首先创建需求,然后将需求序列化,并添加报头后通过网络进行发送,当服务端把计算结果返回响应时,客户端进行读取,将数据提取报文,并反序列化得到结果。
#include <iostream> #include <string> #include <ctime> #include <unistd.h> #include "Socket.hpp" #include "Protocol.hpp" using namespace std; void Usage(const string& str) { cout << "\nUsage: " << str << " serverip serverport\n\n" << endl; } int main(int argc, char* argv[]) { if(argc != 3) { Usage(argv[0]); exit(0); } uint16_t server_port = stoi(argv[2]); string server_ip = argv[1]; Sock _sock; _sock.Socket(); bool ret = _sock.Connect(server_ip, server_port); if(!ret) { cerr << "client connect error" << endl; return 1; } srand(time(nullptr)); int cnt = 1; string operas = "+-*/%=^&"; string inbuffer_stream; while(cnt <= 10) { cout << "====================第" << cnt << "次测试......" << endl; int x = rand() % 100 + 1; usleep(1000); int y = rand() % 100; usleep(1000); char op = operas[rand() % operas.size()]; Request req(x, y, op); req.DebugPrint(); string package; req.Serialize(&package); package = Encode(package); std::cout << "最新请求: \n" << package; write(_sock.GetFd(), package.c_str(), package.size()); char buffer[128]; size_t n = read(_sock.GetFd(), buffer, sizeof(buffer)); if(n > 0) { buffer[n] = 0; inbuffer_stream += buffer; // "len"\n"result code"\n std::cout << inbuffer_stream << std::endl; string content; bool ret = Decode(inbuffer_stream, &content); // "result code" if(!ret) { cerr << "Decode err" << endl; return 2; } Response resp; ret = resp.Deserialize(content); if(!ret) { cerr << "Deserialize err" << endl; return 3; } resp.DebugPrint(); } cnt++; cout << "=================================" << endl; sleep(1); } _sock.Close(); return 0; }
计算器服务端的 Calculator 方法对 package 进行提取报文,获取到需要计算的数据,然后进行反序列化进行计算后,再根据 Response 进行序列化,最后添加报头后返回。
#pragma once #include <string> #include <iostream> #include "Protocol.hpp" enum { DIV_ERR = 1, MOD_ERR = 2, OP_ERR = 3 }; class ServerCal { public: ServerCal() {} Response CalculatorHelper(const Request &req) { Response resp(0, 0); switch (req._op) { case '+': resp._result = req._x + req._y; break; case '-': resp._result = req._x - req._y; break; case '*': resp._result = req._x * req._y; break; case '%': { if (req._y == 0) resp._code = MOD_ERR; else resp._result = req._x % req._y; } break; case '/': { if (req._y == 0) resp._code = DIV_ERR; else resp._result = req._x / req._y; } break; default: resp._code = OP_ERR; break; } return resp; } // "len"\n"10 + 20"\n std::string Calculator(std::string &package) { std::string content; bool ret = Decode(package, &content); // content = "10 + 20" if (!ret) return ""; Request req; ret = req.Deserialize(content); // x = 10, y = 20, op = '+' if (!ret) return ""; content = ""; Response resp = CalculatorHelper(req); // result = 30, code = 0 resp.Serialize(&content); // content = "30 0" content = Encode(content); // content = "len"\n"30 0\n" return content; } ~ServerCal() {} };
当TCP服务端获取到新连接后,根据返回的sockfd就可以进行网络通信,也就是获取到客户端的连接请求,紧接着我们创建子进程为其提供服务,首先进行数据读取,将读取到的数据每次添加到 inbuffer_stream 中,每次获取到数据都进行调用计算器服务端的 Calculator 方法,尝试对获取到的数据进行处理,如果处理成功,会在 Decode 方法中将已经提取的报文移除,所以不影响下次读取。当成功调用 Calculator 方法,就将计算结果发送回去。
#pragma once #include <signal.h> #include <functional> #include "Socket.hpp" #include "log.hpp" using func_t = std::function<std::string(std::string &)>; class TcpServer { public: TcpServer(uint16_t port, func_t callback) : _port(port), _callback(callback) {} ~TcpServer() {} bool InitServer() { _listen_sock.Socket(); _listen_sock.Bind(_port); _listen_sock.Listen(); lg(Info, "init server done"); return true; } void Start() { signal(SIGCHLD, SIG_IGN); signal(SIGPIPE, SIG_IGN); while (true) { // 获取连接 std::string client_ip; uint16_t client_port; int sockfd = _listen_sock.Accept(&client_ip, &client_port); if (sockfd < 0) continue; lg(Info, "accept a new link, sockfd: %d, clientip: %s, clientport: %d", sockfd, client_ip.c_str(), client_port); // 提供服务 if (fork() == 0) { _listen_sock.Close(); std::string inbuffer_stream; // 数据计算 while (true) { char buffer[1280]; ssize_t n = read(sockfd, buffer, sizeof(buffer)); if (n > 0) { buffer[n] = 0; inbuffer_stream += buffer; lg(Debug, "debug: \n%s", inbuffer_stream.c_str()); while (true) { // 如果解析失败,会返回空串 std::string info = _callback(inbuffer_stream); if (info.empty()) break; write(sockfd, info.c_str(), info.size()); } } else if (n == 0) break; else break; } exit(0); } close(sockfd); } } private: uint16_t _port; Sock _listen_sock; func_t _callback; };
#include "TcpServer.hpp" #include "Protocol.hpp" #include "ServerCal.hpp" using namespace std; void Usage(const string& str) { cout << "\nUsage: " << str << " port\n\n" << endl; } int main(int argc, char* argv[]) { if(argc != 2) { Usage(argv[0]); exit(0); } uint16_t port = stoi(argv[1]); ServerCal cal; TcpServer* tsvp = new TcpServer(port, bind(&ServerCal::Calculator, &cal, std::placeholders::_1)); tsvp->InitServer(); tsvp->Start(); return 0; }
JSON 其实是一种帮我们进行序列化和反序列化的工具,上面的序列化和反序列化都是我们自己写的,而现在我们可以直接使用 JSON 帮我们完成序列化和反序列化。
我们在 C++ 中想要使用 JSON,首先需要安装 jsoncpp 第三方库,在我们的云服务器上执行指令 sudo yum install jsoncpp-devel -y
即可。
安装成功后,我们可以通过 ls /usr/include/jsoncpp/json/
查看到我们需要的头文件,下面我们使用到的是 json.h
,但是系统默认的搜索路径是 /usr/include/
,所以我们可以在包头文件的时候带上路径,也可以在编译选项中添加。
我们也可以在 /lib64/libjsoncpp.so
路径下找到 JSON 的第三方库,如下:
下面我们简单使用一下 JSON,我们先使用一下序列化的功能:
int main()
{
Json::Value root;
root["x"] = 10;
root["y"] = 20;
root["op"] = '*';
Json::FastWriter w;
string res = w.write(root);
cout << res << endl;
return 0;
}
如上代码,我们创建一个 Value 万能对象,然后建立 k-v 映射关系,接下来创建一个 FastWriter 的对象,调用对象中的 write() 方法即可进行序列化,结果如下:
另外,在序列化的时候,我们还可以创建 StyledWriter 的对象,这种是按照特定风格形成的字符串,如下:
接下来我们进行反序列化,代码如下:
int main() { Json::Value root; root["x"] = 10; root["y"] = 20; root["op"] = '*'; Json::FastWriter w; // Json::StyledWriter w; string res = w.write(root); cout << res << endl; Json::Value v; Json::Reader r; r.parse(res, v); int x = v["x"].asInt(); int y = v["y"].asInt(); char op = v["op"].asInt(); cout << "x = " << x << ", y = " << y << ", op = " << op << endl; return 0; }
如上代码,在反序列化中我们需要创建一个 Reader 对象,并调用对象中的 parse() 方法,该方法的第一个参数就是需要进行反序列化的字符串,第二个参数就是将反序列化后的字段需要写入到哪个对象中,结果如下:
下面我们对网络版计算器的序列化和反序列化的部分进行修改,我们在该部分添加 JSON 代码,但是我们使用的是条件编译,可以让我们在自己的序列化和反序列化与 JSON 之间进行平滑的切换,代码如下:
#pragma once #include <iostream> #include <string> #include <jsoncpp/json/json.h> // #define USE_MYSELF 1 const std::string blank_space_sep = " "; const std::string protocol_sep = "\n"; // 封装报头 "x op y" => "len"\n"x op y"\n std::string Encode(std::string &content) { std::string package = std::to_string(content.size()); package += protocol_sep; package += content; package += protocol_sep; return package; } // 提取报文 "len"\n"x op y"\n => "x op y" bool Decode(std::string& package, std::string* content) { size_t pos = package.find(protocol_sep); if(pos == std::string::npos) return false; std::string len_str = package.substr(0, pos); size_t len = stoi(len_str); // package = len_str + content_str + 2(\n) // size_t total_len = len_str.size() + len + 2*protocol_sep.size(); size_t total_len = len_str.size() + len + 2; if(package.size() < total_len) return false; *content = package.substr(pos + 1, len); // 如果已经得到一个完整的报文,需要移除这个报文 package.erase(0, total_len); return true; } // 定制协议 class Request { public: Request(int x, int y, char op) : _x(x), _y(y), _op(op) {} Request() {} public: // 序列化 bool Serialize(std::string *out) { #ifdef USE_MYSELF // 构建报文的有效载荷 // struct => string // "len"\n"x op y"\n std::string s = std::to_string(_x); s += blank_space_sep; s += _op; s += blank_space_sep; s += std::to_string(_y); *out = s; return true; #else Json::Value root; root["x"] = _x; root["y"] = _y; root["op"] = _op; Json::FastWriter w; *out = w.write(root); return true; #endif } // 反序列化 bool Deserialize(const std::string &in) // "x op y" { #ifdef USE_MYSELF size_t left = in.find(blank_space_sep); if(left == std::string::npos) return false; std::string part_x = in.substr(0, left); size_t right = in.rfind(blank_space_sep); if(right == std::string::npos) return false; std::string part_y = in.substr(right + 1); if(left + 1 != right - 1) return false; _op = in[left + 1]; _x = std::stoi(part_x); _y = std::stoi(part_y); return true; #else Json::Value root; Json::Reader r; r.parse(in, root); _x = root["x"].asInt(); _y = root["y"].asInt(); _op = root["op"].asInt(); return true; #endif } void DebugPrint() { std::cout << "新请求构建完成: " << _x << _op << _y << "=?" << std::endl; } public: int _x; int _y; char _op; }; class Response { public: Response(int result, int code) : _result(result), _code(code) {} Response() {} public: // 序列化 bool Serialize(std::string *out) { #ifdef USE_MYSELF // "len"\n"result code"\n std::string s = std::to_string(_result); s += blank_space_sep; s += std::to_string(_code); *out = s; return true; #else Json::Value root; root["result"] = _result; root["code"] = _code; Json::FastWriter w; *out = w.write(root); return true; #endif } // 反序列化 bool Deserialize(const std::string &in) // "result code" { #ifdef USE_MYSELF size_t pos = in.find(blank_space_sep); if(pos == std::string::npos) return false; std::string part_left = in.substr(0, pos); std::string part_right = in.substr(pos + 1); _result = std::stoi(part_left); _code = std::stoi(part_right); return true; #else Json::Value root; Json::Reader r; r.parse(in, root); _result = root["result"].asInt(); _code = root["code"].asInt(); return true; #endif } void DebugPrint() { std::cout << "结果响应完成, result: " << _result << ", code: "<< _code << std::endl; } public: int _result; int _code; // 表示结果的准确性 };
我们可以通过在编译选项中加上宏定义的选项,使我们更方便地选择哪种序列化和反序列化的方式,例如 makefile 文件中:
.PHONY:all
all:servercal clientcal
Flag=-DUSE_MYSELF=1
Lib=-ljsoncpp
servercal:ServerCal.cc
g++ -o $@ $^ -std=c++11 $(Lib) $(Flag)
clientcal:ClientCal.cc
g++ -o $@ $^ -std=c++11 $(Lib) $(Flag)
.PHONY:clean
clean:
rm -f servercal clientcal
我的博客即将同步至腾讯云开发者社区,邀请大家一同入驻:https://cloud.tencent.com/developer/support-plan?invite_code=1can36hco3ehk
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。