当前位置:   article > 正文

十、消息头完善和使用json序列化

十、消息头完善和使用json序列化

        系列文章目录:C++ asio网络编程-CSDN博客 

        本教程使用的是jsoncpp,安装方式网上有很多,可以自己下载源码编译,也可以使用vcpkg一键安装,我比较推荐使用vcpkg,感觉这个就像 visual studio 的 maven,使用这种方式安装后就不需要自己配置项目的包含目录和库目录了,直接使用。

1、消息头完善

        前面我们的消息头其实是一种简化的方式去构造的,从本小节开始将使用完整的 tlv 格式

2、json数据格式

        json中将id和data分开存放:

  1. {
  2. "data" : "hello",
  3. "id" : 1001
  4. }

3、MsgNode修改       

        现在,我们修改一下MsgNode

  1. #pragma once
  2. #include <string>
  3. #include <iostream>
  4. #include <boost/asio.hpp>
  5. #include "const.h"
  6. class MsgNode
  7. {
  8. public:
  9. MsgNode(short max_len);
  10. ~MsgNode();
  11. void clearData(); // 清空数据
  12. public:
  13. short _cur_len; // 当前收发数据的长度
  14. short _total_len; // 数据总长度
  15. char* _data; // 数据起始地址
  16. };
  17. class RecvNode : public MsgNode
  18. {
  19. public:
  20. RecvNode(short max_len, short msg_id);
  21. private:
  22. short _msg_id;
  23. };
  24. class SendNode : public MsgNode
  25. {
  26. public:
  27. SendNode(const char* msg, short max_len, short msg_id);
  28. private:
  29. short _msg_id;
  30. };
  1. #include "MsgNode.h"
  2. MsgNode::MsgNode(short max_len) : _total_len(max_len), _cur_len(0)
  3. {
  4. _data = new char[_total_len + 1]();
  5. _data[_total_len] = '\0';
  6. }
  7. MsgNode::~MsgNode() {
  8. std::cout << "destruct MsgNode" << std::endl;
  9. delete[] _data;
  10. }
  11. void MsgNode::clearData()
  12. {
  13. memset(_data, 0, _total_len);
  14. _cur_len = 0;
  15. }
  16. RecvNode::RecvNode(short max_len, short msg_id)
  17. : MsgNode(max_len), _msg_id(msg_id)
  18. {
  19. }
  20. SendNode::SendNode(const char* msg, short max_len, short msg_id)
  21. : MsgNode(max_len + HEAD_TOTAL_LEN), _msg_id(msg_id)
  22. {
  23. // 先拼接id
  24. short msg_id_host = boost::asio::detail::socket_ops::host_to_network_short(msg_id);
  25. memcpy(_data, &msg_id_host, HEAD_ID_LEN);
  26. // 再拼接长度
  27. short msg_len_host = boost::asio::detail::socket_ops::host_to_network_short(max_len);
  28. memcpy(_data + HEAD_ID_LEN, &msg_len_host, HEAD_DATA_LEN);
  29. // 最后拼接消息
  30. memcpy(_data + HEAD_TOTAL_LEN, msg, max_len);
  31. }

        const.h的内容如下

  1. #pragma once
  2. #define MAX_LENGTH 1024*2
  3. #define MAX_SENDQUE 1000
  4. #define MAX_RECVQUE 10000
  5. #define HEAD_TOTAL_LEN 4
  6. #define HEAD_ID_LEN 2
  7. #define HEAD_DATA_LEN 2

4、Session类修改      

  1. #pragma once
  2. #include <iostream>
  3. #include <boost/asio.hpp>
  4. #include <map>
  5. #include "Server.h"
  6. #include <queue>
  7. #include <mutex>
  8. #include "MsgNode.h"
  9. class Server;
  10. class Session : public std::enable_shared_from_this<Session>
  11. {
  12. public:
  13. Session(boost::asio::io_context& ioc, Server* server);
  14. ~Session();
  15. void start();
  16. void close();
  17. void send(char* msg, int max_length, short msg_id);
  18. void send(std::string msg, short msg_id);
  19. boost::asio::ip::tcp::socket& getSocket();
  20. std::string& getUuid();
  21. std::shared_ptr<Session> getSelfShared();
  22. private:
  23. void handle_read(const boost::system::error_code& ec,
  24. std::size_t bytes_transferred,
  25. std::shared_ptr<Session> self_share);
  26. void handle_write(const boost::system::error_code& ec,
  27. std::shared_ptr<Session> self_share);
  28. boost::asio::ip::tcp::socket _socket;
  29. std::string _uuid;
  30. Server* _server;
  31. bool _b_close;
  32. char _data[MAX_LENGTH];
  33. // 收到的消息头
  34. std::shared_ptr<MsgNode> _recv_head_node;
  35. // 消息头是否处理完成
  36. bool _b_head_parse;
  37. // 收到的消息
  38. std::shared_ptr<RecvNode> _recv_msg_node;
  39. // 发送的消息
  40. std::queue<std::shared_ptr<SendNode>> _send_que;
  41. std::mutex _send_lock;
  42. };
  1. #include "Session.h"
  2. #include <boost/uuid/uuid_generators.hpp>
  3. #include <boost/uuid/uuid_io.hpp>
  4. #include <json/json.h>
  5. #include <json/value.h>
  6. #include <json/reader.h>
  7. Session::Session(boost::asio::io_context& ioc, Server* server)
  8. : _socket(ioc), _server(server), _b_close(false), _b_head_parse(false)
  9. {
  10. // 生成唯一id,可以了解一下雪花算法
  11. // 这里直接使用boost自带的方法
  12. boost::uuids::uuid a_uuid = boost::uuids::random_generator()();
  13. _uuid = boost::uuids::to_string(a_uuid);
  14. _recv_head_node = std::make_shared<MsgNode>(HEAD_TOTAL_LEN);
  15. }
  16. Session::~Session()
  17. {
  18. std::cout << "~Session destruct" << std::endl;
  19. }
  20. boost::asio::ip::tcp::socket& Session::getSocket()
  21. {
  22. return _socket;
  23. }
  24. void Session::start()
  25. {
  26. memset(_data, 0, MAX_LENGTH);
  27. _socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
  28. std::bind(&Session::handle_read, this,
  29. std::placeholders::_1, std::placeholders::_2,
  30. getSelfShared()));
  31. }
  32. void Session::send(char* msg, int max_length, short msg_id)
  33. {
  34. std::lock_guard<std::mutex> lock(_send_lock);
  35. if (_send_que.size() > MAX_SENDQUE) {
  36. std::cout << "session: " << _uuid << " send que fulled, size is " << MAX_SENDQUE << std::endl;
  37. return;
  38. }
  39. _send_que.push(std::make_shared<SendNode>(msg, max_length, msg_id));
  40. if (_send_que.size() > 1) { // 因为push了一次,判断条件为大于1
  41. return;
  42. }
  43. auto& msgnode = _send_que.front();
  44. boost::asio::async_write(_socket, boost::asio::buffer(msgnode->_data, msgnode->_total_len),
  45. std::bind(&Session::handle_write, this, std::placeholders::_1, getSelfShared()));
  46. }
  47. void Session::send(std::string msg, short msg_id)
  48. {
  49. std::lock_guard<std::mutex> lock(_send_lock);
  50. if (_send_que.size() > MAX_SENDQUE) {
  51. std::cout << "session: " << _uuid << " send que fulled, size is " << MAX_SENDQUE << std::endl;
  52. return;
  53. }
  54. _send_que.push(std::make_shared<SendNode>(msg.c_str(), msg.length(), msg_id));
  55. if (_send_que.size() > 1) { // 因为push了一次,判断条件为大于1
  56. return;
  57. }
  58. auto& msgnode = _send_que.front();
  59. boost::asio::async_write(_socket, boost::asio::buffer(msgnode->_data, msgnode->_total_len),
  60. std::bind(&Session::handle_write, this, std::placeholders::_1, getSelfShared()));
  61. }
  62. void Session::close()
  63. {
  64. _socket.close();
  65. _b_close = true;
  66. }
  67. std::string& Session::getUuid()
  68. {
  69. return _uuid;
  70. }
  71. std::shared_ptr<Session> Session::getSelfShared()
  72. {
  73. return shared_from_this();
  74. }
  75. void Session::handle_read(const boost::system::error_code& ec,
  76. std::size_t bytes_transferred, std::shared_ptr<Session> self_share)
  77. {
  78. try {
  79. if (!ec) {
  80. int copy_len = 0; // 已经处理的字符数
  81. while (bytes_transferred > 0) {
  82. // 头部未处理完成
  83. if (!_b_head_parse) {
  84. // 接收到的数据长度比头部长度小
  85. if (bytes_transferred + _recv_head_node->_cur_len < HEAD_TOTAL_LEN) {
  86. // 将接收到的存起来
  87. memcpy(_recv_head_node->_data + _recv_head_node->_cur_len, _data + copy_len, bytes_transferred);
  88. _recv_head_node->_cur_len += bytes_transferred;
  89. // 继续接收数据
  90. memset(_data, 0, MAX_LENGTH);
  91. _socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
  92. std::bind(&Session::handle_read, this, std::placeholders::_1, std::placeholders::_2, self_share));
  93. return;
  94. }
  95. // 接收到的长度大于等于头部长度
  96. // 只存头部剩余部分
  97. int head_remain = HEAD_TOTAL_LEN - _recv_head_node->_cur_len;
  98. memcpy(_recv_head_node->_data + _recv_head_node->_cur_len, _data + copy_len, head_remain);
  99. // 更新已处理的data长度和接收到但未处理的长度
  100. copy_len += head_remain;
  101. bytes_transferred -= head_remain;
  102. // 获取头部 msg_id 数据
  103. short msg_id = 0;
  104. memcpy(&msg_id, _recv_head_node->_data, HEAD_ID_LEN);
  105. // 网络字节序转本地字节序
  106. msg_id = boost::asio::detail::socket_ops::network_to_host_short(msg_id);
  107. std::cout << "msg id: " << msg_id << std::endl;
  108. // id 校验
  109. if (msg_id > MAX_LENGTH) {
  110. std::cout << "invalid msd_id: " << msg_id << std::endl;
  111. _server->clearSession(_uuid);
  112. return;
  113. }
  114. // 获取数据长度
  115. short msg_len = 0;
  116. memcpy(&msg_len, _recv_head_node->_data + HEAD_ID_LEN, HEAD_DATA_LEN);
  117. // 网络字节序转化为本地字节序
  118. msg_len = boost::asio::detail::socket_ops::network_to_host_short(msg_len);
  119. std::cout << "msg len: " << msg_len << std::endl;
  120. // 头部长度非法
  121. if (msg_len > MAX_LENGTH) {
  122. std::cout << "invalid data length" << std::endl;
  123. _server->clearSession(_uuid);
  124. return;
  125. }
  126. // 头部处理完成
  127. _b_head_parse = true;
  128. // 头部处理完成后,还有剩余接收到的数据,就是正式数据
  129. // 开始处理数据
  130. _recv_msg_node = std::make_shared<RecvNode>(msg_len, msg_id);
  131. // 有数据但不完整
  132. if (bytes_transferred < msg_len) {
  133. // 先存下已接收到的
  134. memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred);
  135. _recv_msg_node->_cur_len += bytes_transferred;
  136. // 继续接收剩下的数据
  137. memcpy(_data, 0, MAX_LENGTH);
  138. _socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
  139. std::bind(&Session::handle_read, this, std::placeholders::_1, std::placeholders::_2, self_share));
  140. return;
  141. }
  142. // 数据齐全
  143. memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, msg_len);
  144. _recv_msg_node->_cur_len += msg_len;
  145. copy_len += msg_len;
  146. bytes_transferred -= msg_len;
  147. _recv_msg_node->_data[_recv_msg_node->_total_len] = '\0';
  148. std::cout << "receive data: " << _recv_msg_node->_data << std::endl;
  149. // 发过来的是json,解析数据测试是否正确
  150. Json::Reader reader;
  151. Json::Value root;
  152. reader.parse(std::string(_recv_msg_node->_data, _recv_msg_node->_total_len), root);
  153. std::cout << "receive msg id is " << root["id"].asInt() << " msg data is "
  154. << root["data"].asString() << std::endl;
  155. // 发送测试
  156. root["data"] = "server has received msg, msg data is " + root["data"].asString();
  157. std::string return_str = root.toStyledString(); // 序列化后再发送
  158. send(return_str, root["id"].asInt());
  159. // 继续轮询剩余未处理的数据
  160. _b_head_parse = false;
  161. _recv_head_node->clearData();
  162. if (bytes_transferred <= 0) {
  163. memset(_data, 0, MAX_LENGTH);
  164. _socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
  165. std::bind(&Session::handle_read, this, std::placeholders::_1, std::placeholders::_2, self_share));
  166. return;
  167. }
  168. continue;
  169. }
  170. // 已经处理完头部,继续处理未处理完的数据
  171. int remain_msg = _recv_msg_node->_total_len - _recv_msg_node->_cur_len;
  172. // 接收的数据仍不完整
  173. if (bytes_transferred < remain_msg) {
  174. memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred);
  175. _recv_msg_node->_cur_len += bytes_transferred;
  176. // 继续接收数据
  177. memset(_data, 0, MAX_LENGTH);
  178. _socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
  179. std::bind(&Session::handle_read, this, std::placeholders::_1, std::placeholders::_2, self_share));
  180. return;
  181. }
  182. // 接受的数据完整
  183. memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, remain_msg);
  184. _recv_msg_node->_cur_len += remain_msg;
  185. bytes_transferred -= remain_msg;
  186. copy_len += remain_msg;
  187. _recv_msg_node->_data[_recv_msg_node->_total_len] = '\0';
  188. //std::cout << "receive data: " << _recv_msg_node->_data << std::endl;
  189. // 发过来的是json,解析数据测试是否正确
  190. Json::Reader reader;
  191. Json::Value root;
  192. reader.parse(std::string(_recv_msg_node->_data, _recv_msg_node->_total_len), root);
  193. std::cout << "receive msg id is " << root["id"].asInt() << " msg data is "
  194. << root["data"].asString() << std::endl;
  195. // 发送测试
  196. root["data"] = "server has received msg, msg data is " + root["data"].asString();
  197. std::string return_str = root.toStyledString(); // 序列化后再发送
  198. send(return_str, root["id"].asInt());
  199. // 继续轮询剩余未处理的数据
  200. _b_head_parse = false;
  201. _recv_head_node->clearData();
  202. if (bytes_transferred <= 0) {
  203. memset(_data, 0, MAX_LENGTH);
  204. _socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
  205. std::bind(&Session::handle_read, this, std::placeholders::_1, std::placeholders::_2, self_share));
  206. return;
  207. }
  208. continue;
  209. }
  210. }
  211. else {
  212. std::cout << "read error" << std::endl;
  213. close();
  214. _server->clearSession(_uuid);
  215. }
  216. }
  217. catch (std::exception& e) {
  218. std::cout << "Exception: " << e.what() << std::endl;
  219. }
  220. }
  221. void Session::handle_write(const boost::system::error_code& ec,
  222. std::shared_ptr<Session> self_share)
  223. {
  224. try {
  225. if (!ec) {
  226. std::lock_guard<std::mutex> lock(_send_lock);
  227. _send_que.pop();
  228. if (!_send_que.empty()) {
  229. auto& msgNode = _send_que.front();
  230. boost::asio::async_write(_socket, boost::asio::buffer(msgNode->_data, msgNode->_total_len),
  231. std::bind(&Session::handle_write, this, std::placeholders::_1, self_share));
  232. }
  233. }
  234. else {
  235. std::cout << "write error: " << ec.what() << std::endl;
  236. close();
  237. _server->clearSession(_uuid);
  238. }
  239. }
  240. catch (std::exception& e) {
  241. std::cerr << "Exception: " << e.what() << std::endl;
  242. }
  243. }

5、客户端修改

  1. #include <boost/asio.hpp>
  2. #include <iostream>
  3. #include <thread>
  4. #include <json/json.h>
  5. #include <json/value.h>
  6. #include <json/reader.h>
  7. #define MAX_LENGTH 1024*2
  8. #define HEAD_LENGTH 2
  9. #define HEAD_TOTAL 4
  10. int main() {
  11. try {
  12. // 创建上下文服务
  13. boost::asio::io_context ioc;
  14. // 构造endpoint
  15. boost::asio::ip::tcp::endpoint remote_ep(boost::asio::ip::address::from_string("127.0.0.1"), 10086);
  16. boost::asio::ip::tcp::socket sock(ioc);
  17. boost::system::error_code error = boost::asio::error::host_not_found;
  18. sock.connect(remote_ep, error);
  19. if (error) {
  20. std::cout << "连接失败,错误码:" << error.value()
  21. << "错误信息:" << error.message() << std::endl;
  22. return 0;
  23. }
  24. Json::Value root;
  25. root["id"] = 1001;
  26. root["data"] = "hello";
  27. std::string request = root.toStyledString();
  28. std::size_t request_length = request.length();
  29. char send_data[MAX_LENGTH] = { 0 };
  30. int msgid = 1001;
  31. int msgid_host = boost::asio::detail::socket_ops::host_to_network_short(msgid);
  32. memcpy(send_data, &msgid_host, 2);
  33. int request_host_length = boost::asio::detail::socket_ops::host_to_network_short(request_length);
  34. memcpy(send_data + 2, &request_host_length, 2);
  35. memcpy(send_data + 4, request.c_str(), request_length);
  36. boost::asio::write(sock, boost::asio::buffer(send_data, request_length + 4));
  37. std::cout << "begin to receive ..." << std::endl;
  38. char reply_head[HEAD_TOTAL];
  39. std::size_t reply_length = boost::asio::read(sock, boost::asio::buffer(reply_head, HEAD_TOTAL));
  40. msgid = 0;
  41. memcpy(&msgid, reply_head, 2);
  42. short msglen = 0;
  43. memcpy(&msglen, reply_head + 2, 2);
  44. // 转换为本地字节序
  45. msgid = boost::asio::detail::socket_ops::network_to_host_short(msgid);
  46. msglen = boost::asio::detail::socket_ops::network_to_host_short(msglen);
  47. char msg[MAX_LENGTH] = { 0 };
  48. std::size_t msg_length = boost::asio::read(sock, boost::asio::buffer(msg, msglen));
  49. Json::Reader reader;
  50. reader.parse(std::string(msg, msg_length), root);
  51. std::cout << "msg id: " << root["id"] << ", msg: " << root["data"] << std::endl;
  52. getchar();
  53. }
  54. catch (std::exception& e) {
  55. std::cout << "异常:" << e.what() << std::endl;
  56. }
  57. return 0;
  58. }

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号