赞
踩
系列文章目录:C++ asio网络编程-CSDN博客
本教程使用的是jsoncpp,安装方式网上有很多,可以自己下载源码编译,也可以使用vcpkg一键安装,我比较推荐使用vcpkg,感觉这个就像 visual studio 的 maven,使用这种方式安装后就不需要自己配置项目的包含目录和库目录了,直接使用。
前面我们的消息头其实是一种简化的方式去构造的,从本小节开始将使用完整的 tlv 格式
json中将id和data分开存放:
- {
- "data" : "hello",
- "id" : 1001
- }
现在,我们修改一下MsgNode
- #pragma once
- #include <string>
- #include <iostream>
- #include <boost/asio.hpp>
- #include "const.h"
-
- class MsgNode
- {
- public:
- MsgNode(short max_len);
- ~MsgNode();
-
- void clearData(); // 清空数据
-
- public:
- short _cur_len; // 当前收发数据的长度
- short _total_len; // 数据总长度
- char* _data; // 数据起始地址
- };
-
-
- class RecvNode : public MsgNode
- {
- public:
- RecvNode(short max_len, short msg_id);
-
- private:
- short _msg_id;
- };
-
- class SendNode : public MsgNode
- {
- public:
- SendNode(const char* msg, short max_len, short msg_id);
-
- private:
- short _msg_id;
- };
-

- #include "MsgNode.h"
-
- MsgNode::MsgNode(short max_len) : _total_len(max_len), _cur_len(0)
- {
- _data = new char[_total_len + 1]();
- _data[_total_len] = '\0';
- }
-
- MsgNode::~MsgNode() {
- std::cout << "destruct MsgNode" << std::endl;
- delete[] _data;
- }
-
- void MsgNode::clearData()
- {
- memset(_data, 0, _total_len);
- _cur_len = 0;
- }
-
- RecvNode::RecvNode(short max_len, short msg_id)
- : MsgNode(max_len), _msg_id(msg_id)
- {
- }
-
- SendNode::SendNode(const char* msg, short max_len, short msg_id)
- : MsgNode(max_len + HEAD_TOTAL_LEN), _msg_id(msg_id)
- {
- // 先拼接id
- short msg_id_host = boost::asio::detail::socket_ops::host_to_network_short(msg_id);
- memcpy(_data, &msg_id_host, HEAD_ID_LEN);
- // 再拼接长度
- short msg_len_host = boost::asio::detail::socket_ops::host_to_network_short(max_len);
- memcpy(_data + HEAD_ID_LEN, &msg_len_host, HEAD_DATA_LEN);
- // 最后拼接消息
- memcpy(_data + HEAD_TOTAL_LEN, msg, max_len);
- }

const.h的内容如下
- #pragma once
-
- #define MAX_LENGTH 1024*2
- #define MAX_SENDQUE 1000
- #define MAX_RECVQUE 10000
- #define HEAD_TOTAL_LEN 4
- #define HEAD_ID_LEN 2
- #define HEAD_DATA_LEN 2
- #pragma once
- #include <iostream>
- #include <boost/asio.hpp>
- #include <map>
- #include "Server.h"
- #include <queue>
- #include <mutex>
- #include "MsgNode.h"
-
- class Server;
-
- class Session : public std::enable_shared_from_this<Session>
- {
- public:
- Session(boost::asio::io_context& ioc, Server* server);
- ~Session();
-
- void start();
- void close();
-
- void send(char* msg, int max_length, short msg_id);
- void send(std::string msg, short msg_id);
-
- boost::asio::ip::tcp::socket& getSocket();
- std::string& getUuid();
- std::shared_ptr<Session> getSelfShared();
-
- private:
- void handle_read(const boost::system::error_code& ec,
- std::size_t bytes_transferred,
- std::shared_ptr<Session> self_share);
- void handle_write(const boost::system::error_code& ec,
- std::shared_ptr<Session> self_share);
-
- boost::asio::ip::tcp::socket _socket;
- std::string _uuid;
- Server* _server;
- bool _b_close;
-
- char _data[MAX_LENGTH];
-
- // 收到的消息头
- std::shared_ptr<MsgNode> _recv_head_node;
- // 消息头是否处理完成
- bool _b_head_parse;
- // 收到的消息
- std::shared_ptr<RecvNode> _recv_msg_node;
-
- // 发送的消息
- std::queue<std::shared_ptr<SendNode>> _send_que;
- std::mutex _send_lock;
- };
-
-
-
-

- #include "Session.h"
- #include <boost/uuid/uuid_generators.hpp>
- #include <boost/uuid/uuid_io.hpp>
- #include <json/json.h>
- #include <json/value.h>
- #include <json/reader.h>
-
- Session::Session(boost::asio::io_context& ioc, Server* server)
- : _socket(ioc), _server(server), _b_close(false), _b_head_parse(false)
- {
- // 生成唯一id,可以了解一下雪花算法
- // 这里直接使用boost自带的方法
- boost::uuids::uuid a_uuid = boost::uuids::random_generator()();
- _uuid = boost::uuids::to_string(a_uuid);
-
- _recv_head_node = std::make_shared<MsgNode>(HEAD_TOTAL_LEN);
-
- }
-
- Session::~Session()
- {
- std::cout << "~Session destruct" << std::endl;
- }
-
- boost::asio::ip::tcp::socket& Session::getSocket()
- {
- return _socket;
- }
-
- void Session::start()
- {
- memset(_data, 0, MAX_LENGTH);
- _socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
- std::bind(&Session::handle_read, this,
- std::placeholders::_1, std::placeholders::_2,
- getSelfShared()));
- }
-
- void Session::send(char* msg, int max_length, short msg_id)
- {
- std::lock_guard<std::mutex> lock(_send_lock);
-
- if (_send_que.size() > MAX_SENDQUE) {
- std::cout << "session: " << _uuid << " send que fulled, size is " << MAX_SENDQUE << std::endl;
- return;
- }
-
- _send_que.push(std::make_shared<SendNode>(msg, max_length, msg_id));
- if (_send_que.size() > 1) { // 因为push了一次,判断条件为大于1
- return;
- }
-
- auto& msgnode = _send_que.front();
- boost::asio::async_write(_socket, boost::asio::buffer(msgnode->_data, msgnode->_total_len),
- std::bind(&Session::handle_write, this, std::placeholders::_1, getSelfShared()));
- }
-
- void Session::send(std::string msg, short msg_id)
- {
- std::lock_guard<std::mutex> lock(_send_lock);
-
- if (_send_que.size() > MAX_SENDQUE) {
- std::cout << "session: " << _uuid << " send que fulled, size is " << MAX_SENDQUE << std::endl;
- return;
- }
-
- _send_que.push(std::make_shared<SendNode>(msg.c_str(), msg.length(), msg_id));
- if (_send_que.size() > 1) { // 因为push了一次,判断条件为大于1
- return;
- }
-
- auto& msgnode = _send_que.front();
- boost::asio::async_write(_socket, boost::asio::buffer(msgnode->_data, msgnode->_total_len),
- std::bind(&Session::handle_write, this, std::placeholders::_1, getSelfShared()));
- }
-
- void Session::close()
- {
- _socket.close();
- _b_close = true;
- }
-
- std::string& Session::getUuid()
- {
- return _uuid;
- }
-
- std::shared_ptr<Session> Session::getSelfShared()
- {
- return shared_from_this();
- }
-
- void Session::handle_read(const boost::system::error_code& ec,
- std::size_t bytes_transferred, std::shared_ptr<Session> self_share)
- {
- try {
- if (!ec) {
- int copy_len = 0; // 已经处理的字符数
- while (bytes_transferred > 0) {
- // 头部未处理完成
- if (!_b_head_parse) {
- // 接收到的数据长度比头部长度小
- if (bytes_transferred + _recv_head_node->_cur_len < HEAD_TOTAL_LEN) {
- // 将接收到的存起来
- memcpy(_recv_head_node->_data + _recv_head_node->_cur_len, _data + copy_len, bytes_transferred);
- _recv_head_node->_cur_len += bytes_transferred;
- // 继续接收数据
- memset(_data, 0, MAX_LENGTH);
- _socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
- std::bind(&Session::handle_read, this, std::placeholders::_1, std::placeholders::_2, self_share));
- return;
- }
- // 接收到的长度大于等于头部长度
- // 只存头部剩余部分
- int head_remain = HEAD_TOTAL_LEN - _recv_head_node->_cur_len;
- memcpy(_recv_head_node->_data + _recv_head_node->_cur_len, _data + copy_len, head_remain);
- // 更新已处理的data长度和接收到但未处理的长度
- copy_len += head_remain;
- bytes_transferred -= head_remain;
- // 获取头部 msg_id 数据
- short msg_id = 0;
- memcpy(&msg_id, _recv_head_node->_data, HEAD_ID_LEN);
- // 网络字节序转本地字节序
- msg_id = boost::asio::detail::socket_ops::network_to_host_short(msg_id);
- std::cout << "msg id: " << msg_id << std::endl;
- // id 校验
- if (msg_id > MAX_LENGTH) {
- std::cout << "invalid msd_id: " << msg_id << std::endl;
- _server->clearSession(_uuid);
- return;
- }
- // 获取数据长度
- short msg_len = 0;
- memcpy(&msg_len, _recv_head_node->_data + HEAD_ID_LEN, HEAD_DATA_LEN);
- // 网络字节序转化为本地字节序
- msg_len = boost::asio::detail::socket_ops::network_to_host_short(msg_len);
- std::cout << "msg len: " << msg_len << std::endl;
- // 头部长度非法
- if (msg_len > MAX_LENGTH) {
- std::cout << "invalid data length" << std::endl;
- _server->clearSession(_uuid);
- return;
- }
- // 头部处理完成
- _b_head_parse = true;
- // 头部处理完成后,还有剩余接收到的数据,就是正式数据
- // 开始处理数据
- _recv_msg_node = std::make_shared<RecvNode>(msg_len, msg_id);
- // 有数据但不完整
- if (bytes_transferred < msg_len) {
- // 先存下已接收到的
- memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred);
- _recv_msg_node->_cur_len += bytes_transferred;
- // 继续接收剩下的数据
- memcpy(_data, 0, MAX_LENGTH);
- _socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
- std::bind(&Session::handle_read, this, std::placeholders::_1, std::placeholders::_2, self_share));
- return;
- }
- // 数据齐全
- memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, msg_len);
- _recv_msg_node->_cur_len += msg_len;
- copy_len += msg_len;
- bytes_transferred -= msg_len;
- _recv_msg_node->_data[_recv_msg_node->_total_len] = '\0';
- std::cout << "receive data: " << _recv_msg_node->_data << std::endl;
-
- // 发过来的是json,解析数据测试是否正确
- Json::Reader reader;
- Json::Value root;
- reader.parse(std::string(_recv_msg_node->_data, _recv_msg_node->_total_len), root);
- std::cout << "receive msg id is " << root["id"].asInt() << " msg data is "
- << root["data"].asString() << std::endl;
- // 发送测试
- root["data"] = "server has received msg, msg data is " + root["data"].asString();
- std::string return_str = root.toStyledString(); // 序列化后再发送
- send(return_str, root["id"].asInt());
-
- // 继续轮询剩余未处理的数据
- _b_head_parse = false;
- _recv_head_node->clearData();
- if (bytes_transferred <= 0) {
- memset(_data, 0, MAX_LENGTH);
- _socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
- std::bind(&Session::handle_read, this, std::placeholders::_1, std::placeholders::_2, self_share));
- return;
- }
- continue;
- }
-
- // 已经处理完头部,继续处理未处理完的数据
- int remain_msg = _recv_msg_node->_total_len - _recv_msg_node->_cur_len;
- // 接收的数据仍不完整
- if (bytes_transferred < remain_msg) {
- memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred);
- _recv_msg_node->_cur_len += bytes_transferred;
- // 继续接收数据
- memset(_data, 0, MAX_LENGTH);
- _socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
- std::bind(&Session::handle_read, this, std::placeholders::_1, std::placeholders::_2, self_share));
- return;
- }
- // 接受的数据完整
- memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, remain_msg);
- _recv_msg_node->_cur_len += remain_msg;
- bytes_transferred -= remain_msg;
- copy_len += remain_msg;
- _recv_msg_node->_data[_recv_msg_node->_total_len] = '\0';
- //std::cout << "receive data: " << _recv_msg_node->_data << std::endl;
-
- // 发过来的是json,解析数据测试是否正确
- Json::Reader reader;
- Json::Value root;
- reader.parse(std::string(_recv_msg_node->_data, _recv_msg_node->_total_len), root);
- std::cout << "receive msg id is " << root["id"].asInt() << " msg data is "
- << root["data"].asString() << std::endl;
- // 发送测试
- root["data"] = "server has received msg, msg data is " + root["data"].asString();
- std::string return_str = root.toStyledString(); // 序列化后再发送
- send(return_str, root["id"].asInt());
-
- // 继续轮询剩余未处理的数据
- _b_head_parse = false;
- _recv_head_node->clearData();
- if (bytes_transferred <= 0) {
- memset(_data, 0, MAX_LENGTH);
- _socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
- std::bind(&Session::handle_read, this, std::placeholders::_1, std::placeholders::_2, self_share));
- return;
- }
- continue;
- }
- }
- else {
- std::cout << "read error" << std::endl;
- close();
- _server->clearSession(_uuid);
- }
- }
- catch (std::exception& e) {
- std::cout << "Exception: " << e.what() << std::endl;
- }
- }
-
- void Session::handle_write(const boost::system::error_code& ec,
- std::shared_ptr<Session> self_share)
- {
- try {
- if (!ec) {
- std::lock_guard<std::mutex> lock(_send_lock);
- _send_que.pop();
- if (!_send_que.empty()) {
- auto& msgNode = _send_que.front();
- boost::asio::async_write(_socket, boost::asio::buffer(msgNode->_data, msgNode->_total_len),
- std::bind(&Session::handle_write, this, std::placeholders::_1, self_share));
- }
- }
- else {
- std::cout << "write error: " << ec.what() << std::endl;
- close();
- _server->clearSession(_uuid);
- }
- }
- catch (std::exception& e) {
- std::cerr << "Exception: " << e.what() << std::endl;
- }
- }

- #include <boost/asio.hpp>
- #include <iostream>
- #include <thread>
- #include <json/json.h>
- #include <json/value.h>
- #include <json/reader.h>
-
- #define MAX_LENGTH 1024*2
- #define HEAD_LENGTH 2
- #define HEAD_TOTAL 4
-
- int main() {
-
- try {
- // 创建上下文服务
- boost::asio::io_context ioc;
- // 构造endpoint
- boost::asio::ip::tcp::endpoint remote_ep(boost::asio::ip::address::from_string("127.0.0.1"), 10086);
- boost::asio::ip::tcp::socket sock(ioc);
- boost::system::error_code error = boost::asio::error::host_not_found;
- sock.connect(remote_ep, error);
- if (error) {
- std::cout << "连接失败,错误码:" << error.value()
- << "错误信息:" << error.message() << std::endl;
- return 0;
- }
-
- Json::Value root;
- root["id"] = 1001;
- root["data"] = "hello";
- std::string request = root.toStyledString();
- std::size_t request_length = request.length();
- char send_data[MAX_LENGTH] = { 0 };
- int msgid = 1001;
- int msgid_host = boost::asio::detail::socket_ops::host_to_network_short(msgid);
- memcpy(send_data, &msgid_host, 2);
- int request_host_length = boost::asio::detail::socket_ops::host_to_network_short(request_length);
- memcpy(send_data + 2, &request_host_length, 2);
- memcpy(send_data + 4, request.c_str(), request_length);
- boost::asio::write(sock, boost::asio::buffer(send_data, request_length + 4));
- std::cout << "begin to receive ..." << std::endl;
-
- char reply_head[HEAD_TOTAL];
- std::size_t reply_length = boost::asio::read(sock, boost::asio::buffer(reply_head, HEAD_TOTAL));
- msgid = 0;
- memcpy(&msgid, reply_head, 2);
- short msglen = 0;
- memcpy(&msglen, reply_head + 2, 2);
-
- // 转换为本地字节序
- msgid = boost::asio::detail::socket_ops::network_to_host_short(msgid);
- msglen = boost::asio::detail::socket_ops::network_to_host_short(msglen);
-
- char msg[MAX_LENGTH] = { 0 };
- std::size_t msg_length = boost::asio::read(sock, boost::asio::buffer(msg, msglen));
- Json::Reader reader;
- reader.parse(std::string(msg, msg_length), root);
- std::cout << "msg id: " << root["id"] << ", msg: " << root["data"] << std::endl;
-
- getchar();
- }
- catch (std::exception& e) {
- std::cout << "异常:" << e.what() << std::endl;
- }
-
- return 0;
- }

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。