赞
踩
muduo库是⼀个基于非阻塞IO和事件驱动的C++高并发TCP网络编程库。
简单来理解,它就是对原生的TCP套接字的封装,是一个比socket编程接口更好用的编程库。
TranslateServer.hpp:
- #pragma once
- #include <iostream>
- #include <functional>
- #include <unordered_map>
- #include <string>
- #include "muduo/net/TcpConnection.h"
- #include "muduo/net/TcpServer.h"
- #include "muduo/net/EventLoop.h"
-
- using std::cout;
- using std::endl;
- class TranslateServer
- {
- private:
- muduo::net::EventLoop _baseloop;
- muduo::net::TcpServer _server;
- public:
- TranslateServer(int port)
- :_server(&_baseloop, muduo::net::InetAddress("0.0.0.0", port), \
- "TranslateServer", muduo::net::TcpServer::kReusePort)
- {
- //bind是一个函数适配器
- _server.setConnectionCallback(std::bind(&TranslateServer::_onConnection, this, std::placeholders::_1));
- _server.setMessageCallback(std::bind(&TranslateServer::_onMessage, this, std::placeholders::_1, \
- std::placeholders::_2, std::placeholders::_3));
-
- }
- void start()
- {
- _server.start(); //开始事件监听
- _baseloop.loop(); //开始事件监控,这是一个死循环阻塞接口
- }
-
- // typedef std::function<void (const TcpConnectionPtr&)> ConnectionCallback;
- // typedef std::function<void (const TcpConnectionPtr&,
- // Buffer*,
- // Timestamp)> MessageCallback;
-
- //连接建立成功或者关闭时侯的回调函数
- void _onConnection(const muduo::net::TcpConnectionPtr& conn)
- {
- if (conn->connected())
- {
- cout << "新连接建立成功\n";
- }
- else
- {
- cout << "连接关闭\n";
- }
- }
- //通信连接收到请求时的回调函数
- void _onMessage(const muduo::net::TcpConnectionPtr& conn, muduo::net::Buffer* buffer, muduo::Timestamp time)
- {
- std::string str = buffer->retrieveAllAsString();
- std::string resp = translate(str);
- conn->send(resp);
- }
-
- std::string translate(const std::string& str)
- {
- static std::unordered_map<std::string, std::string> _dict = {
- {"hello", "你好"},
- {"white", "白色"}
- };
-
- if (_dict.count(str))
- {
- return _dict[str];
- }
- return "没找到";
- }
-
- };
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
TranslateClient.hpp:
- #pragma once
- #include <functional>
- #include <iostream>
- #include "muduo/net/TcpClient.h"
- #include "muduo/net/TcpConnection.h"
- #include "muduo/net/EventLoopThread.h"
-
- class TranslateClient
- {
- private:
- muduo::net::EventLoopThread _loopThread; //EventLoop是阻塞式死循环,必须另起一个线程,否则用户无法在主线程输入。
- //_loopThread一建立就立马启动
-
- muduo::net::TcpClient _client;
- muduo::net::TcpConnectionPtr _conn;
- //TcpClient的connect是非阻塞接口,调用立马返回,这有可能导致用户send时尚未建立连接,而解引用空指针
- muduo::CountDownLatch _latch; //保证建立连接和send之间的同步关系
- public:
- TranslateClient(const std::string& serverIp, int serverPort)
- :_client(_loopThread.startLoop(), muduo::net::InetAddress(serverIp, serverPort), "TranslateClient")
- ,_latch(1)
- {
- _client.setConnectionCallback(std::bind(&TranslateClient::_onConnection, this, std::placeholders::_1));
- _client.setMessageCallback(std::bind(&TranslateClient::_onMessage, this, std::placeholders::_1, \
- std::placeholders::_2,std::placeholders::_3));
- }
-
- void connect()
- {
- _client.connect();
- _latch.wait();
- }
-
- bool send(std::string& msg)
- {
- if (_conn->connected())
- {
- _conn->send(msg);
- return true;
- }
- else
- {
- return false;
- }
- }
-
- private:
- /**************
- * 连接建立或者断开时的回调函数
- * **************/
- void _onConnection(const muduo::net::TcpConnectionPtr &conn)
- {
- if (conn->connected())
- {
- _latch.countDown();
- _conn = conn;
- }
- else
- {
- _conn.reset();
- }
- }
-
- void _onMessage(const muduo::net::TcpConnectionPtr& conn, muduo::net::Buffer* buffer, muduo::Timestamp time)
- {
- std::cout << "翻译结果:" << buffer->retrieveAllAsString() << std::endl;
- }
- };
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
muduo的精髓在于大量的回调函数,建立或断开连接,收到消息时,都会调用我们传入的回调函数,回调函数就是我们处理业务的地方。
像上述的英译汉服务,双方肯定是能正常通信,但这绝不是一个成熟的方案。TCP通信时面向字节流的,存在数据粘包问题,要想解决必须使用用户层协议。
用户层协议主要就是解决数据粘包问题,另外序列化和反序列化也是其中的重要环节。muduo库是由陈硕大佬编写的,在安装好的muduo库中,他提供了一些编程样例,其中有一个就是基于protobuf,定制了一个用户层协议,用于网络通信。所以严格来说,该自定义协议并不是muduo库中的一部分。
- class ProtobufCodec : muduo::noncopyable
- {
- public:
-
- enum ErrorCode
- {
- kNoError = 0,
- kInvalidLength,
- kCheckSumError,
- kInvalidNameLen,
- kUnknownMessageType,
- kParseError,
- };
-
- typedef std::function<void (const muduo::net::TcpConnectionPtr&,
- const MessagePtr&,
- muduo::Timestamp)> ProtobufMessageCallback;
-
- typedef std::function<void (const muduo::net::TcpConnectionPtr&,
- muduo::net::Buffer*,
- muduo::Timestamp,
- ErrorCode)> ErrorCallback;
-
- explicit ProtobufCodec(const ProtobufMessageCallback& messageCb)
- : messageCallback_(messageCb),
- errorCallback_(defaultErrorCallback)
- {
- }
-
- ProtobufCodec(const ProtobufMessageCallback& messageCb, const ErrorCallback& errorCb)
- : messageCallback_(messageCb),
- errorCallback_(errorCb)
- {
- }
-
- void onMessage(const muduo::net::TcpConnectionPtr& conn,
- muduo::net::Buffer* buf,
- muduo::Timestamp receiveTime);
-
- void send(const muduo::net::TcpConnectionPtr& conn,
- const google::protobuf::Message& message)
- {
- // FIXME: serialize to TcpConnection::outputBuffer()
- muduo::net::Buffer buf;
- fillEmptyBuffer(&buf, message);
- conn->send(&buf);
- }
-
- static const muduo::string& errorCodeToString(ErrorCode errorCode);
- static void fillEmptyBuffer(muduo::net::Buffer* buf, const google::protobuf::Message& message);
- static google::protobuf::Message* createMessage(const std::string& type_name);
- static MessagePtr parse(const char* buf, int len, ErrorCode* errorCode);
-
- private:
- static void defaultErrorCallback(const muduo::net::TcpConnectionPtr&,
- muduo::net::Buffer*,
- muduo::Timestamp,
- ErrorCode);
-
- ProtobufMessageCallback messageCallback_;
- ErrorCallback errorCallback_;
-
- const static int kHeaderLen = sizeof(int32_t);
- const static int kMinMessageLen = 2*kHeaderLen + 2; // nameLen + typeName + checkSum
- const static int kMaxMessageLen = 64*1024*1024; // same as codec_stream.h kDefaultTotalBytesLimit
- };
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
ProtobufCodec类就是基于protobuf定义的结构化数据的应用层协议,协议格式如下:
onMessage的实现如下:
- void ProtobufCodec::onMessage(const TcpConnectionPtr& conn,
- Buffer* buf,
- Timestamp receiveTime)
- {
- while (buf->readableBytes() >= kMinMessageLen + kHeaderLen)
- {
- const int32_t len = buf->peekInt32();
- if (len > kMaxMessageLen || len < kMinMessageLen)
- {
- errorCallback_(conn, buf, receiveTime, kInvalidLength);
- break;
- }
- else if (buf->readableBytes() >= implicit_cast<size_t>(len + kHeaderLen))
- {
- ErrorCode errorCode = kNoError;
- MessagePtr message = parse(buf->peek()+kHeaderLen, len, &errorCode);
- if (errorCode == kNoError && message)
- {
- messageCallback_(conn, message, receiveTime);
- buf->retrieve(kHeaderLen+len);
- }
- else
- {
- errorCallback_(conn, buf, receiveTime, errorCode);
- break;
- }
- }
- else
- {
- break;
- }
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
onMessage函数解决了TCP粘包的问题,从缓冲区中解析出一个完整的protobuf结构化数据(一个message)后,再调用messageCallback_处理。messageCallback_是构造ProtobufCodec时传入的回调函数。
如果我们的业务场景很单一,例如上面的英译汉服务器,直接把我们写的业务逻辑作为回调传给messageCallback_就OK了。但如果我们有多种业务,例如翻译和计算业务,则还可以在此基础上引入任务分发器ProtobufDispatcher,回调它的ProtobufDispatcher函数。
- class ProtobufDispatcher
- {
- public:
- typedef std::function<void (const muduo::net::TcpConnectionPtr&,
- const MessagePtr& message,
- muduo::Timestamp)> ProtobufMessageCallback;
-
- explicit ProtobufDispatcher(const ProtobufMessageCallback& defaultCb)
- : defaultCallback_(defaultCb)
- {
- }
-
- void onProtobufMessage(const muduo::net::TcpConnectionPtr& conn,
- const MessagePtr& message,
- muduo::Timestamp receiveTime) const
- {
- CallbackMap::const_iterator it = callbacks_.find(message->GetDescriptor());
- if (it != callbacks_.end())
- {
- it->second->onMessage(conn, message, receiveTime);
- }
- else
- {
- defaultCallback_(conn, message, receiveTime);
- }
- }
-
- template<typename T>
- void registerMessageCallback(const typename CallbackT<T>::ProtobufMessageTCallback& callback)
- {
- std::shared_ptr<CallbackT<T> > pd(new CallbackT<T>(callback));
- callbacks_[T::descriptor()] = pd;
- }
-
- private:
- typedef std::map<const google::protobuf::Descriptor*, std::shared_ptr<Callback> > CallbackMap;
-
- CallbackMap callbacks_;
- ProtobufMessageCallback defaultCallback_;
- };
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
onProtobufMessage会根据你传入的结构化数据类型(message),调用不同的回调函数,这些回调函数就是我们注册的业务处理方法。
- 编写.并翻译proto文件,构建翻译的请求和响应,加法的请求和响应的类
- 编写服务端
- 编写客户端
Server.cc:
- #include <memory>
- #include "muduo/protobuf/codec.h"
- #include "muduo/protobuf/dispatcher.h"
- #include "muduo/base/Logging.h"
-
- #include "muduo/net/TcpServer.h"
- #include "muduo/net/TcpConnection.h"
- #include "muduo/net/EventLoop.h"
-
- #include "business.pb.h"
-
- using std::placeholders::_1;
- using std::placeholders::_2;
- using std::placeholders::_3;
- class Server
- {
- public:
- typedef std::shared_ptr<google::protobuf::Message> MessagePtr;
- typedef std::shared_ptr<business::TranslateRequest> TranslateRequestPtr;
- typedef std::shared_ptr<business::AddRequest> AddRequestPtr;
-
- private:
- muduo::net::EventLoop _baseLoop;
- muduo::net::TcpServer _server;
- ProtobufDispatcher _dispatcher; // 请求分发器
- ProtobufCodec _codec; // protobuf处理器--解析出结构化数据,发送结构化数据(序列化和发序列化内部会做)
- public:
- Server(int port)
- : _server(&_baseLoop, muduo::net::InetAddress("0.0.0.0", port), "Server",
- muduo::net::TcpServer::kReusePort)
- ,_dispatcher(std::bind(&Server::_onUnknownMessage, this,\
- std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))
- ,_codec(std::bind(&ProtobufDispatcher::onProtobufMessage, &_dispatcher,
- std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))
- {
- // 注册业务处理函数
- _dispatcher.registerMessageCallback<business::AddRequest>(bind(&Server::_onAdd, this, _1, _2, _3));
- _dispatcher.registerMessageCallback<business::TranslateRequest>(bind(&Server::_onTranslate, this, _1, _2, _3));
-
- //注册_server的回调函数
- _server.setMessageCallback(std::bind(&ProtobufCodec::onMessage, &_codec,
- std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
- _server.setConnectionCallback(std::bind(&Server::_onConnection, this, std::placeholders::_1));
- }
-
- void start()
- {
- _server.start();
- _baseLoop.loop();
- }
-
- private:
- void _onUnknownMessage(const muduo::net::TcpConnectionPtr &conn, const MessagePtr &message, muduo::Timestamp)
- {
- LOG_INFO << "onUnknownMessage: " << message->GetTypeName();
- conn->shutdown();
- }
-
- void _onAdd(const muduo::net::TcpConnectionPtr& conn, const AddRequestPtr &messagePtr, muduo::Timestamp time)
- {
- int x = messagePtr->num1();
- int y = messagePtr->num2();
-
- business::AddResponse resp;
- resp.set_result(x + y);
- _codec.send(conn, resp); //让protobuf处理器帮我们序列化并用conn发送
- }
-
- void _onTranslate(const muduo::net::TcpConnectionPtr& conn, const TranslateRequestPtr &messagePtr, muduo::Timestamp time)
- {
- const std::string& ret = translate(messagePtr->msg());
- business::TranslateResponse resp;
- resp.set_msg(ret);
- _codec.send(conn, resp);
- }
-
- void _onConnection(const muduo::net::TcpConnectionPtr &conn)
- {
- if (conn->connected())
- {
- LOG_INFO << "新连接建立成功!";
- }
- else
- {
- LOG_INFO << "连接即将关闭!";
- }
- }
-
- std::string translate(const std::string& str)
- {
- static std::unordered_map<std::string, std::string> dict_map = {
- {"hello", "你好"},
- {"Hello", "你好"},
- {"你好", "Hello"},
- {"吃了吗", "油泼面"}
- };
- auto it = dict_map.find(str);
- if (it == dict_map.end()) {
- return "没听懂!!";
- }
- return it->second;
- }
- };
-
- int main()
- {
- Server server(8085);
- server.start();
- return 0;
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
Client.cc:
-
- #include "muduo/protobuf/codec.h"
- #include "muduo/protobuf/dispatcher.h"
- #include "muduo/base/Logging.h"
- #include "muduo/base/Mutex.h"
- #include "muduo/net/EventLoop.h"
- #include "muduo/net/TcpClient.h"
- #include "muduo/net/EventLoopThread.h"
- #include "muduo/base/CountDownLatch.h"
-
- #include "business.pb.h"
- #include <iostream>
- #include <functional>
-
- class Client {
- public:
- typedef std::shared_ptr<google::protobuf::Message> MessagePtr; //这是Protobuf库的头文件
- typedef std::shared_ptr<business::AddResponse> AddResponsePtr;
- typedef std::shared_ptr<business::TranslateResponse> TranslateResponsePtr;
- Client(const std::string &sip, int sport):
- _latch(1), _client(_loopthread.startLoop(), muduo::net::InetAddress(sip, sport), "Client"),
- _dispatcher(std::bind(&Client::onUnknownMessage, this, std::placeholders::_1,
- std::placeholders::_2, std::placeholders::_3)),
- _codec(std::bind(&ProtobufDispatcher::onProtobufMessage, &_dispatcher,
- std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)){
-
- _dispatcher.registerMessageCallback<business::TranslateResponse>(std::bind(&Client::onTranslate, this,
- std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
-
- _dispatcher.registerMessageCallback<business::AddResponse>(std::bind(&Client::onAdd, this,
- std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
-
- _client.setMessageCallback(std::bind(&ProtobufCodec::onMessage, &_codec,
- std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
- _client.setConnectionCallback(std::bind(&Client::onConnection, this, std::placeholders::_1));
- }
- void connect() {
- _client.connect();
- _latch.wait();//阻塞等待,直到连接建立成功
- }
- void Translate(const std::string &msg){
- business::TranslateRequest req;
- req.set_msg(msg);
- send(&req);
- }
- void Add(int num1, int num2) {
- business::AddRequest req;
- req.set_num1(num1);
- req.set_num2(num2);
- send(&req);
- }
- private:
- bool send(const google::protobuf::Message *message) {
- if (_conn->connected()) {//连接状态正常,再发送,否则就返回false
- _codec.send(_conn, *message);
- return true;
- }
- return false;
- }
- void onTranslate(const muduo::net::TcpConnectionPtr& conn, const TranslateResponsePtr& message, muduo::Timestamp) {
- std::cout << "翻译结果:" << message->msg() << std::endl;
- }
- void onAdd(const muduo::net::TcpConnectionPtr& conn, const AddResponsePtr& message, muduo::Timestamp) {
- std::cout << "加法结果:" << message->result() << std::endl;
- }
- void onUnknownMessage(const muduo::net::TcpConnectionPtr& conn, const MessagePtr& message, muduo::Timestamp) {
- LOG_INFO << "onUnknownMessage: " << message->GetTypeName();
- conn->shutdown();
- }
- void onConnection(const muduo::net::TcpConnectionPtr&conn){
- if (conn->connected()) {
- _latch.countDown();//唤醒主线程中的阻塞
- _conn = conn;
- }else {
- //连接关闭时的操作
- _conn.reset();
- }
- }
- private:
- muduo::CountDownLatch _latch;//实现同步的
- muduo::net::EventLoopThread _loopthread;//异步循环处理线程
- muduo::net::TcpConnectionPtr _conn;//客户端对应的连接
- muduo::net::TcpClient _client;//客户端
- ProtobufDispatcher _dispatcher;//请求分发器
- ProtobufCodec _codec; //Protobuf处理器
- };
-
- int main()
- {
- Client client("127.0.0.1", 8085);
- client.connect();
-
- client.Translate("hello");
- client.Add(11, 22);
-
- sleep(1);
- return 0;
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。