当前位置:   article > 正文

【仿RabbitMQ消息队列项目day2】使用muduo库中基于protobuf的应用层协议进行通信_比特 仿rabbitmq

比特 仿rabbitmq

一.什么是muduo?

muduo库是⼀个基于非阻塞IO和事件驱动的C++高并发TCP网络编程库。

简单来理解,它就是对原生的TCP套接字的封装,是一个比socket编程接口更好用的编程库。 

二.使用muduo库完成一个英译汉翻译服务

TranslateServer.hpp:

  1. #pragma once
  2. #include <iostream>
  3. #include <functional>
  4. #include <unordered_map>
  5. #include <string>
  6. #include "muduo/net/TcpConnection.h"
  7. #include "muduo/net/TcpServer.h"
  8. #include "muduo/net/EventLoop.h"
  9. using std::cout;
  10. using std::endl;
  11. class TranslateServer
  12. {
  13. private:
  14. muduo::net::EventLoop _baseloop;
  15. muduo::net::TcpServer _server;
  16. public:
  17. TranslateServer(int port)
  18. :_server(&_baseloop, muduo::net::InetAddress("0.0.0.0", port), \
  19. "TranslateServer", muduo::net::TcpServer::kReusePort)
  20. {
  21. //bind是一个函数适配器
  22. _server.setConnectionCallback(std::bind(&TranslateServer::_onConnection, this, std::placeholders::_1));
  23. _server.setMessageCallback(std::bind(&TranslateServer::_onMessage, this, std::placeholders::_1, \
  24. std::placeholders::_2, std::placeholders::_3));
  25. }
  26. void start()
  27. {
  28. _server.start(); //开始事件监听
  29. _baseloop.loop(); //开始事件监控,这是一个死循环阻塞接口
  30. }
  31. // typedef std::function<void (const TcpConnectionPtr&)> ConnectionCallback;
  32. // typedef std::function<void (const TcpConnectionPtr&,
  33. // Buffer*,
  34. // Timestamp)> MessageCallback;
  35. //连接建立成功或者关闭时侯的回调函数
  36. void _onConnection(const muduo::net::TcpConnectionPtr& conn)
  37. {
  38. if (conn->connected())
  39. {
  40. cout << "新连接建立成功\n";
  41. }
  42. else
  43. {
  44. cout << "连接关闭\n";
  45. }
  46. }
  47. //通信连接收到请求时的回调函数
  48. void _onMessage(const muduo::net::TcpConnectionPtr& conn, muduo::net::Buffer* buffer, muduo::Timestamp time)
  49. {
  50. std::string str = buffer->retrieveAllAsString();
  51. std::string resp = translate(str);
  52. conn->send(resp);
  53. }
  54. std::string translate(const std::string& str)
  55. {
  56. static std::unordered_map<std::string, std::string> _dict = {
  57. {"hello", "你好"},
  58. {"white", "白色"}
  59. };
  60. if (_dict.count(str))
  61. {
  62. return _dict[str];
  63. }
  64. return "没找到";
  65. }
  66. };

TranslateClient.hpp:

  1. #pragma once
  2. #include <functional>
  3. #include <iostream>
  4. #include "muduo/net/TcpClient.h"
  5. #include "muduo/net/TcpConnection.h"
  6. #include "muduo/net/EventLoopThread.h"
  7. class TranslateClient
  8. {
  9. private:
  10. muduo::net::EventLoopThread _loopThread; //EventLoop是阻塞式死循环,必须另起一个线程,否则用户无法在主线程输入。
  11. //_loopThread一建立就立马启动
  12. muduo::net::TcpClient _client;
  13. muduo::net::TcpConnectionPtr _conn;
  14. //TcpClient的connect是非阻塞接口,调用立马返回,这有可能导致用户send时尚未建立连接,而解引用空指针
  15. muduo::CountDownLatch _latch; //保证建立连接和send之间的同步关系
  16. public:
  17. TranslateClient(const std::string& serverIp, int serverPort)
  18. :_client(_loopThread.startLoop(), muduo::net::InetAddress(serverIp, serverPort), "TranslateClient")
  19. ,_latch(1)
  20. {
  21. _client.setConnectionCallback(std::bind(&TranslateClient::_onConnection, this, std::placeholders::_1));
  22. _client.setMessageCallback(std::bind(&TranslateClient::_onMessage, this, std::placeholders::_1, \
  23. std::placeholders::_2,std::placeholders::_3));
  24. }
  25. void connect()
  26. {
  27. _client.connect();
  28. _latch.wait();
  29. }
  30. bool send(std::string& msg)
  31. {
  32. if (_conn->connected())
  33. {
  34. _conn->send(msg);
  35. return true;
  36. }
  37. else
  38. {
  39. return false;
  40. }
  41. }
  42. private:
  43. /**************
  44. * 连接建立或者断开时的回调函数
  45. * **************/
  46. void _onConnection(const muduo::net::TcpConnectionPtr &conn)
  47. {
  48. if (conn->connected())
  49. {
  50. _latch.countDown();
  51. _conn = conn;
  52. }
  53. else
  54. {
  55. _conn.reset();
  56. }
  57. }
  58. void _onMessage(const muduo::net::TcpConnectionPtr& conn, muduo::net::Buffer* buffer, muduo::Timestamp time)
  59. {
  60. std::cout << "翻译结果:" << buffer->retrieveAllAsString() << std::endl;
  61. }
  62. };

muduo的精髓在于大量的回调函数,建立或断开连接,收到消息时,都会调用我们传入的回调函数,回调函数就是我们处理业务的地方。

三.muduo中基于protobuf的自定义协议

像上述的英译汉服务,双方肯定是能正常通信,但这绝不是一个成熟的方案。TCP通信时面向字节流的,存在数据粘包问题,要想解决必须使用用户层协议。

用户层协议主要就是解决数据粘包问题,另外序列化和反序列化也是其中的重要环节。muduo库是由陈硕大佬编写的,在安装好的muduo库中,他提供了一些编程样例,其中有一个就是基于protobuf,定制了一个用户层协议,用于网络通信。所以严格来说,该自定义协议并不是muduo库中的一部分。

  1. class ProtobufCodec : muduo::noncopyable
  2. {
  3. public:
  4. enum ErrorCode
  5. {
  6. kNoError = 0,
  7. kInvalidLength,
  8. kCheckSumError,
  9. kInvalidNameLen,
  10. kUnknownMessageType,
  11. kParseError,
  12. };
  13. typedef std::function<void (const muduo::net::TcpConnectionPtr&,
  14. const MessagePtr&,
  15. muduo::Timestamp)> ProtobufMessageCallback;
  16. typedef std::function<void (const muduo::net::TcpConnectionPtr&,
  17. muduo::net::Buffer*,
  18. muduo::Timestamp,
  19. ErrorCode)> ErrorCallback;
  20. explicit ProtobufCodec(const ProtobufMessageCallback& messageCb)
  21. : messageCallback_(messageCb),
  22. errorCallback_(defaultErrorCallback)
  23. {
  24. }
  25. ProtobufCodec(const ProtobufMessageCallback& messageCb, const ErrorCallback& errorCb)
  26. : messageCallback_(messageCb),
  27. errorCallback_(errorCb)
  28. {
  29. }
  30. void onMessage(const muduo::net::TcpConnectionPtr& conn,
  31. muduo::net::Buffer* buf,
  32. muduo::Timestamp receiveTime);
  33. void send(const muduo::net::TcpConnectionPtr& conn,
  34. const google::protobuf::Message& message)
  35. {
  36. // FIXME: serialize to TcpConnection::outputBuffer()
  37. muduo::net::Buffer buf;
  38. fillEmptyBuffer(&buf, message);
  39. conn->send(&buf);
  40. }
  41. static const muduo::string& errorCodeToString(ErrorCode errorCode);
  42. static void fillEmptyBuffer(muduo::net::Buffer* buf, const google::protobuf::Message& message);
  43. static google::protobuf::Message* createMessage(const std::string& type_name);
  44. static MessagePtr parse(const char* buf, int len, ErrorCode* errorCode);
  45. private:
  46. static void defaultErrorCallback(const muduo::net::TcpConnectionPtr&,
  47. muduo::net::Buffer*,
  48. muduo::Timestamp,
  49. ErrorCode);
  50. ProtobufMessageCallback messageCallback_;
  51. ErrorCallback errorCallback_;
  52. const static int kHeaderLen = sizeof(int32_t);
  53. const static int kMinMessageLen = 2*kHeaderLen + 2; // nameLen + typeName + checkSum
  54. const static int kMaxMessageLen = 64*1024*1024; // same as codec_stream.h kDefaultTotalBytesLimit
  55. };

ProtobufCodec类就是基于protobuf定义的结构化数据的应用层协议,协议格式如下:

onMessage的实现如下: 

  1. void ProtobufCodec::onMessage(const TcpConnectionPtr& conn,
  2. Buffer* buf,
  3. Timestamp receiveTime)
  4. {
  5. while (buf->readableBytes() >= kMinMessageLen + kHeaderLen)
  6. {
  7. const int32_t len = buf->peekInt32();
  8. if (len > kMaxMessageLen || len < kMinMessageLen)
  9. {
  10. errorCallback_(conn, buf, receiveTime, kInvalidLength);
  11. break;
  12. }
  13. else if (buf->readableBytes() >= implicit_cast<size_t>(len + kHeaderLen))
  14. {
  15. ErrorCode errorCode = kNoError;
  16. MessagePtr message = parse(buf->peek()+kHeaderLen, len, &errorCode);
  17. if (errorCode == kNoError && message)
  18. {
  19. messageCallback_(conn, message, receiveTime);
  20. buf->retrieve(kHeaderLen+len);
  21. }
  22. else
  23. {
  24. errorCallback_(conn, buf, receiveTime, errorCode);
  25. break;
  26. }
  27. }
  28. else
  29. {
  30. break;
  31. }
  32. }
  33. }

onMessage函数解决了TCP粘包的问题,从缓冲区中解析出一个完整的protobuf结构化数据(一个message)后,再调用messageCallback_处理。messageCallback_是构造ProtobufCodec时传入的回调函数。

如果我们的业务场景很单一,例如上面的英译汉服务器,直接把我们写的业务逻辑作为回调传给messageCallback_就OK了。但如果我们有多种业务,例如翻译和计算业务,则还可以在此基础上引入任务分发器ProtobufDispatcher,回调它的ProtobufDispatcher函数。

  1. class ProtobufDispatcher
  2. {
  3. public:
  4. typedef std::function<void (const muduo::net::TcpConnectionPtr&,
  5. const MessagePtr& message,
  6. muduo::Timestamp)> ProtobufMessageCallback;
  7. explicit ProtobufDispatcher(const ProtobufMessageCallback& defaultCb)
  8. : defaultCallback_(defaultCb)
  9. {
  10. }
  11. void onProtobufMessage(const muduo::net::TcpConnectionPtr& conn,
  12. const MessagePtr& message,
  13. muduo::Timestamp receiveTime) const
  14. {
  15. CallbackMap::const_iterator it = callbacks_.find(message->GetDescriptor());
  16. if (it != callbacks_.end())
  17. {
  18. it->second->onMessage(conn, message, receiveTime);
  19. }
  20. else
  21. {
  22. defaultCallback_(conn, message, receiveTime);
  23. }
  24. }
  25. template<typename T>
  26. void registerMessageCallback(const typename CallbackT<T>::ProtobufMessageTCallback& callback)
  27. {
  28. std::shared_ptr<CallbackT<T> > pd(new CallbackT<T>(callback));
  29. callbacks_[T::descriptor()] = pd;
  30. }
  31. private:
  32. typedef std::map<const google::protobuf::Descriptor*, std::shared_ptr<Callback> > CallbackMap;
  33. CallbackMap callbacks_;
  34. ProtobufMessageCallback defaultCallback_;
  35. };

onProtobufMessage会根据你传入的结构化数据类型(message),调用不同的回调函数,这些回调函数就是我们注册的业务处理方法。

四.编写一个翻译+加法服务

  1. 编写.并翻译proto文件,构建翻译的请求和响应,加法的请求和响应的类
  2. 编写服务端
  3. 编写客户端

Server.cc:

  1. #include <memory>
  2. #include "muduo/protobuf/codec.h"
  3. #include "muduo/protobuf/dispatcher.h"
  4. #include "muduo/base/Logging.h"
  5. #include "muduo/net/TcpServer.h"
  6. #include "muduo/net/TcpConnection.h"
  7. #include "muduo/net/EventLoop.h"
  8. #include "business.pb.h"
  9. using std::placeholders::_1;
  10. using std::placeholders::_2;
  11. using std::placeholders::_3;
  12. class Server
  13. {
  14. public:
  15. typedef std::shared_ptr<google::protobuf::Message> MessagePtr;
  16. typedef std::shared_ptr<business::TranslateRequest> TranslateRequestPtr;
  17. typedef std::shared_ptr<business::AddRequest> AddRequestPtr;
  18. private:
  19. muduo::net::EventLoop _baseLoop;
  20. muduo::net::TcpServer _server;
  21. ProtobufDispatcher _dispatcher; // 请求分发器
  22. ProtobufCodec _codec; // protobuf处理器--解析出结构化数据,发送结构化数据(序列化和发序列化内部会做)
  23. public:
  24. Server(int port)
  25. : _server(&_baseLoop, muduo::net::InetAddress("0.0.0.0", port), "Server",
  26. muduo::net::TcpServer::kReusePort)
  27. ,_dispatcher(std::bind(&Server::_onUnknownMessage, this,\
  28. std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))
  29. ,_codec(std::bind(&ProtobufDispatcher::onProtobufMessage, &_dispatcher,
  30. std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))
  31. {
  32. // 注册业务处理函数
  33. _dispatcher.registerMessageCallback<business::AddRequest>(bind(&Server::_onAdd, this, _1, _2, _3));
  34. _dispatcher.registerMessageCallback<business::TranslateRequest>(bind(&Server::_onTranslate, this, _1, _2, _3));
  35. //注册_server的回调函数
  36. _server.setMessageCallback(std::bind(&ProtobufCodec::onMessage, &_codec,
  37. std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
  38. _server.setConnectionCallback(std::bind(&Server::_onConnection, this, std::placeholders::_1));
  39. }
  40. void start()
  41. {
  42. _server.start();
  43. _baseLoop.loop();
  44. }
  45. private:
  46. void _onUnknownMessage(const muduo::net::TcpConnectionPtr &conn, const MessagePtr &message, muduo::Timestamp)
  47. {
  48. LOG_INFO << "onUnknownMessage: " << message->GetTypeName();
  49. conn->shutdown();
  50. }
  51. void _onAdd(const muduo::net::TcpConnectionPtr& conn, const AddRequestPtr &messagePtr, muduo::Timestamp time)
  52. {
  53. int x = messagePtr->num1();
  54. int y = messagePtr->num2();
  55. business::AddResponse resp;
  56. resp.set_result(x + y);
  57. _codec.send(conn, resp); //让protobuf处理器帮我们序列化并用conn发送
  58. }
  59. void _onTranslate(const muduo::net::TcpConnectionPtr& conn, const TranslateRequestPtr &messagePtr, muduo::Timestamp time)
  60. {
  61. const std::string& ret = translate(messagePtr->msg());
  62. business::TranslateResponse resp;
  63. resp.set_msg(ret);
  64. _codec.send(conn, resp);
  65. }
  66. void _onConnection(const muduo::net::TcpConnectionPtr &conn)
  67. {
  68. if (conn->connected())
  69. {
  70. LOG_INFO << "新连接建立成功!";
  71. }
  72. else
  73. {
  74. LOG_INFO << "连接即将关闭!";
  75. }
  76. }
  77. std::string translate(const std::string& str)
  78. {
  79. static std::unordered_map<std::string, std::string> dict_map = {
  80. {"hello", "你好"},
  81. {"Hello", "你好"},
  82. {"你好", "Hello"},
  83. {"吃了吗", "油泼面"}
  84. };
  85. auto it = dict_map.find(str);
  86. if (it == dict_map.end()) {
  87. return "没听懂!!";
  88. }
  89. return it->second;
  90. }
  91. };
  92. int main()
  93. {
  94. Server server(8085);
  95. server.start();
  96. return 0;
  97. }

Client.cc: 

  1. #include "muduo/protobuf/codec.h"
  2. #include "muduo/protobuf/dispatcher.h"
  3. #include "muduo/base/Logging.h"
  4. #include "muduo/base/Mutex.h"
  5. #include "muduo/net/EventLoop.h"
  6. #include "muduo/net/TcpClient.h"
  7. #include "muduo/net/EventLoopThread.h"
  8. #include "muduo/base/CountDownLatch.h"
  9. #include "business.pb.h"
  10. #include <iostream>
  11. #include <functional>
  12. class Client {
  13. public:
  14. typedef std::shared_ptr<google::protobuf::Message> MessagePtr; //这是Protobuf库的头文件
  15. typedef std::shared_ptr<business::AddResponse> AddResponsePtr;
  16. typedef std::shared_ptr<business::TranslateResponse> TranslateResponsePtr;
  17. Client(const std::string &sip, int sport):
  18. _latch(1), _client(_loopthread.startLoop(), muduo::net::InetAddress(sip, sport), "Client"),
  19. _dispatcher(std::bind(&Client::onUnknownMessage, this, std::placeholders::_1,
  20. std::placeholders::_2, std::placeholders::_3)),
  21. _codec(std::bind(&ProtobufDispatcher::onProtobufMessage, &_dispatcher,
  22. std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)){
  23. _dispatcher.registerMessageCallback<business::TranslateResponse>(std::bind(&Client::onTranslate, this,
  24. std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
  25. _dispatcher.registerMessageCallback<business::AddResponse>(std::bind(&Client::onAdd, this,
  26. std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
  27. _client.setMessageCallback(std::bind(&ProtobufCodec::onMessage, &_codec,
  28. std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
  29. _client.setConnectionCallback(std::bind(&Client::onConnection, this, std::placeholders::_1));
  30. }
  31. void connect() {
  32. _client.connect();
  33. _latch.wait();//阻塞等待,直到连接建立成功
  34. }
  35. void Translate(const std::string &msg){
  36. business::TranslateRequest req;
  37. req.set_msg(msg);
  38. send(&req);
  39. }
  40. void Add(int num1, int num2) {
  41. business::AddRequest req;
  42. req.set_num1(num1);
  43. req.set_num2(num2);
  44. send(&req);
  45. }
  46. private:
  47. bool send(const google::protobuf::Message *message) {
  48. if (_conn->connected()) {//连接状态正常,再发送,否则就返回false
  49. _codec.send(_conn, *message);
  50. return true;
  51. }
  52. return false;
  53. }
  54. void onTranslate(const muduo::net::TcpConnectionPtr& conn, const TranslateResponsePtr& message, muduo::Timestamp) {
  55. std::cout << "翻译结果:" << message->msg() << std::endl;
  56. }
  57. void onAdd(const muduo::net::TcpConnectionPtr& conn, const AddResponsePtr& message, muduo::Timestamp) {
  58. std::cout << "加法结果:" << message->result() << std::endl;
  59. }
  60. void onUnknownMessage(const muduo::net::TcpConnectionPtr& conn, const MessagePtr& message, muduo::Timestamp) {
  61. LOG_INFO << "onUnknownMessage: " << message->GetTypeName();
  62. conn->shutdown();
  63. }
  64. void onConnection(const muduo::net::TcpConnectionPtr&conn){
  65. if (conn->connected()) {
  66. _latch.countDown();//唤醒主线程中的阻塞
  67. _conn = conn;
  68. }else {
  69. //连接关闭时的操作
  70. _conn.reset();
  71. }
  72. }
  73. private:
  74. muduo::CountDownLatch _latch;//实现同步的
  75. muduo::net::EventLoopThread _loopthread;//异步循环处理线程
  76. muduo::net::TcpConnectionPtr _conn;//客户端对应的连接
  77. muduo::net::TcpClient _client;//客户端
  78. ProtobufDispatcher _dispatcher;//请求分发器
  79. ProtobufCodec _codec; //Protobuf处理器
  80. };
  81. int main()
  82. {
  83. Client client("127.0.0.1", 8085);
  84. client.connect();
  85. client.Translate("hello");
  86. client.Add(11, 22);
  87. sleep(1);
  88. return 0;
  89. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/644135
推荐阅读
相关标签
  

闽ICP备14008679号