赞
踩
本项目已开源到下面链接下的仓库当中
针对于日志的信息,我采用了之前写的一份利用可变参数实现日志的代码,具体链接如下
对于这个项目来说,首先需要明确几个概念,项目中要包含生产者,消费者,中间人,发布,订阅
先说生产者和消费者的模型,这个并不陌生,在操作系统中有过对于这些内容的讲解:
如上所示是一个基本的生产消费者模型的理论,那么在这个理论中比较重要的就是中间的这个Broker Server,这个部分的核心功能就是进行消息的存储和转发
在中间件服务器Broker当中,存在下面的概念
虚拟机:类似于是一个MySQL当中的database的概念,是一个逻辑上的集合,一个BrokerServer当中会存在多个VirtualHost
交换机:这是生产者把消息先发送到Broker当中的Exchange上,再依据不同的规则,把消息转发给不同的Queue
队列:真正用来存储消息的部分,每个消费者自己进行决定从哪个Queue上进行消息的读取
绑定:这是一个Exchange和Queue之间的关联关系,Exchange和Queue可以理解为是多对多的概念,而用一个关联表就可以把信息存储起来
所以要实现的内容,其实就有下面的内容:
在AMQP协议中细化了对应的规则,以虚拟机为单元,来进行交换机,队列,绑定的整体操作,下面基于这些理论进行一些核心的API功能和操作
对于Broker来说,主要有下面的这些功能操作
对于RabbitMQ来说,主要支持下面的四种交换机类型
下面针对于前三种比较常见的交换机进行一个简单的概述:
Direct:生产者发送消息时,直接指定被该交换机绑定的队列名,这样就可以直接进行交换了
Fanout:生产者发送的消息会被复制到该交换机的所有队列中,就有点类似于是一个广播的效果
Topic:绑定队列到交换机上,指定一个字符串为bindingKey,发送消息指定一个字符串是routingKey,这样只有这两个Key满足一定条件的时候再进行对应的消息的投递
项目一定是需要有持久化的需求的
对于被消费的消息,是要进行一些应答的,具体的策略有两种:
对于手动应答来说,比较好的一个特点是,保证了消息确实被消费者处理成功了,在一些对于数据可靠性要求比较高的场景下,是一个比较常见的特点
对于这个项目来说,主要有下面的三个模块
这个模块主要是进行描述的是,哪个队列和哪个交换机绑定在了一起
对于这个模块来说,先看一下消息数据说的是什么:
消息信息中主要包含内容+属性:
对于属性来说包含的有,消息的ID,以及持久化表示,routing_key,这个key表示的是当前消息要发布的队列信息
对于管理方式当中,必定是要以队列为单元进行管理的,因为消息的所有操作都是在队列当中进行实现的,消息是要存储在队列当中的
对于管理数据来说,首先会存在一个消息链表,这当中存放的是保存所有待推送的消息,以及待确认消息hash,模仿TCP的方式,保证消息可靠传输,持久化消息hash,以及持久化的有效消息数量,持久化的总的消息数量等
对于消息管理的操作来说,要提供的方法有,向队列新增消息,获取队首消息,对消息的确认,恢复队列历史消息,垃圾回收,删除队列相关消息文件等
对于队列的管理来说,主要包含有:
初始化队列消息,移除队列消息,向队列新增消息,对队列消息进行确认,恢复队列历史消息等
对于虚拟机来说,它内部包含了交换机,队列,绑定,消息,这些数据管理的集合
在其内部管理的数据:
要管理的操作包括:
对于这个模块来说,决定了一条消息是否可以发布到指定的队列,具体的,在每个队列和交换机的绑定当中会存在一个binding_key,这个是队列发布的一种匹配规则,在每条要发布的消息中,都会有一个routing_key,这是消息的发布规则
而对于交换机来说存在三种类型,直接广播和主题,这三种类型就会借助routing_key和binding_key来进行合适的划分
路由器模块的本质,其实没有管理的数据,只是提供一些匹配的操作:
客户端有两种,分别是用来发布消息和订阅消息,只有订阅了指定队列消息的客户端才是一个消费者,而消费者数据存在的意义是,当指定队列有了消息之后,就需要把消息推送给这个消费者客户端
对于消费者的信息来说,其中需要包含内容有:
对于信道管理Channel来说,这是一个在网络通信中的概念,表示的就是通信通道,当进行网络通信的时候,必然是借助一个网络通信连接来完成的,为了更加方便的进行资源的利用,因此要对于通信连接进行了一个更加进一步的细化,细化出了通信通道,对于用户来说,一个通信通道就是网络通信的载体,而一个真正的通信连接,是可以创建出多个通信通道
每一个信道和信道之间,在使用者的角度来看是相互独立的存在,所有的网络通信服务都是由信道提供的
信道提供的服务操作有:
信道要管理的数据主要包括有,信道的ID,信道所关联的虚拟机句柄,信道关联的消费者句柄,工作线程池句柄
下面基于上述的这些理论,画出下面的逻辑示意图
示意图1
首先,对于交换机,它的功能主要是进行消息的转发,当有消息到来之后,交换机的作用是把消息转发到对应的队列当中,而转发的依据就需要用到路由匹配模块,路由匹配模块会根据交换机的类型和对应的binding信息,来匹配到对应的队列进行消息的转发,而整个交换机到binding模块再到路由匹配和队列这一整个模块,就被叫做是虚拟机
因此,因为有了虚拟机的存在,所以对于一个要发布的消息来说,只需要把消息传递给虚拟机就可以了,具体的分发策略是由虚拟机来完成的,做到了一个解耦合的效果,下面再来看数据库
因为有持久化的原因,所以虚拟机必定是需要伴随着数据库的,数据库当中就会有消息的持久化和数据的持久化,这个模块就是项目大体的核心模块
示意图2
下面演示的是第二个示意图:
如上所示的是对于BrokerServer模块,这是一个服务端的示意图,对于服务端来说内部会有一个连接管理器,而对于连接管理器来说,它内部会存在一个Channel信道,这个信道内部会有各种各样的方法,例如有创建删除交换机,创建删除队列,绑定解绑队列和交换机,发布消息,订阅队列消息,取消订阅消息,确认消息等等各种各样的接口,这些接口足以支撑整个服务器
示意图3
如上所示是对于客户端的示意图,客户端分为发布客户端和订阅客户端,而这两个客户端又公用同样的一组接口,这组接口中同样存在有Connection模块进行连接的管理,Connection内部又包含有各种各样的处理方法,而这些方法实际上都会和BrokerServer进行一个连接,这样就把整个服务端,客户端和主体数据库连接在一起了:
下面,就基于上述的这些理论,进行代码的编写
这个模块没有什么好说的,就是一些工具的编写,方便进行使用
class SqliteHelper
{
public:
typedef int(*SqliteCallback)(void*,int,char**,char**);
SqliteHelper(const string &dbfile) : _dbfile(dbfile), _handler(nullptr){}
bool open(int safe_leve = SQLITE_OPEN_FULLMUTEX);
bool exec(const string &sql, SqliteCallback cb, void *arg);
void close();
private:
string _dbfile;
sqlite3 *_handler;
};
class StrHelper
{
public:
static size_t split(const string& str, const string& sep, vector<string>& result);
};
class UUIDHelper
{
public:
static string uuid();
};
class FileHelper { public: FileHelper(const string &filename):_filename(filename){} bool exists(); size_t size(); bool read(char *body, size_t offset, size_t len); bool read(string &body); bool write(const char *body, size_t offset, size_t len); bool write(const string &body); bool rename(const string &nname); static string parentDirectory(const string &filename); static bool createFile(const string &filename); static bool removeFile(const string &filename); static bool createDirectory(const string &path); static bool removeDirectory(const string &path); private: string _filename; };
在项目功能模块的编写前,要先把消息类型定义出来
消息本身要素
消息额外存储所需要素
下面定义交换机的属性
交换机类型
消息投递模式
声明下面的类型,之后基于这个类型生成代码
protoc --cpp_out=. message.proto
syntax = "proto3"; package MessageQueue; enum ExchangeType { UNKNOWTYPE = 0; DIRECT = 1; FANOUT = 2; TOPIC = 3; }; enum DeliveryMode { UNKNOWMODE = 0; UNDURABLE = 1; DURABLE = 2; }; message BasicProperties { string id = 1; DeliveryMode delivery_mode = 2; string routing_key = 3; }; message Message { message Payload { BasicProperties properties = 1; string body = 2; string valid = 3; }; Payload payload = 1; uint32 offset = 2; uint32 length = 3; };
下面就基于这两个内容,实现对应的交换机部分的实现
// 1. 定义交换机类 struct Exchange { Exchange() {} using ptr = shared_ptr<Exchange>; Exchange(const string& ename, MessageQueue::ExchangeType etype, bool edurable, bool eauto_delete, unordered_map<string, string> eargs) : name(ename), type(etype), durable(edurable), auto_delete(eauto_delete), args(eargs) {} // args存储键值对,在进行数据库存储的时候,组织字符串格式进行存储 // 内部解析这种key=val&key=val,把数据存储到数据库中 void SetArgs(const string& str_args); // 从args中把数据存储到字符串中 string GetArgs(); // 名称,类型,持久化,自动删除,其他参数 string name; MessageQueue::ExchangeType type; bool durable; bool auto_delete; unordered_map<string, string> args; }; // 2. 定义交换机持久化管理类--数据存储在sqlite数据库中 class ExchangeMapper { using ExchangeMap = unordered_map<string, Exchange::ptr>; public: ExchangeMapper(const string& db_file) : _sql_helper(db_file); // 创建数据表 void CreateTable(); // 移除数据表 void RemoveTable(); // 插入数据 bool Insert(Exchange::ptr& exp); // 移除数据 void Remove(const string& name); // 读取表的数据恢复 ExchangeMap Recovery(); }; // 3. 定义交换机数据内存管理类 class ExchangeManager { public: using ptr = shared_ptr<ExchangeManager>; ExchangeManager(const string& db_file) : _mapper(db_file); // 声明交换机 bool DeclareExchange(const string& name, MessageQueue::ExchangeType type, bool durable, bool auto_delete, unordered_map<string, string>& args); // 删除交换机 void DeleteExchange(const string& name); // 根据名称获取指定交换机对象 Exchange::ptr selectExchange(const string& name); // 判断是否存在 bool exists(const string& name); // 清除 void clear(); size_t ExchangeSize(); };
// 1. 队列描述数据类 struct Queue { using ptr = shared_ptr<Queue>; Queue(){} Queue(const string& qname, bool qdurable, bool qexclusive, bool qauto_delete, unordered_map<string, string> qargs) : name(qname), durable(qdurable), exclusive(qexclusive), auto_delete(qauto_delete), args(qargs) {} // args存储键值对,在进行数据库存储的时候,组织字符串格式进行存储 // 内部解析这种key=val&key=val,把数据存储到数据库中 void SetArgs(const string& str_args); // 从args中把数据存储到字符串中 string GetArgs(); string name; bool durable; bool exclusive; bool auto_delete; unordered_map<string, string> args; }; // 2. 队列数据持久化类 class QueueMapper { public: QueueMapper(const string& db_file) : _sql_helper(db_file); void CreateTable(); void RemoveTable(); bool Insert(Queue::ptr& qe); void Remove(const string& name); using QueueMap = unordered_map<string, Queue::ptr>; QueueMap Recovery(); }; // 3. 队列数据管理类 class QueueManager { public: using ptr = shared_ptr<QueueManager>; QueueManager(const string &dbfile):_mapper(dbfile); bool declareQueue(const string &qname, bool qdurable, bool qexclusive, bool qauto_delete, const unordered_map<string, string> &qargs); void deleteQueue(const string &name); Queue::ptr selectQueue(const string &name); QueueMapper::QueueMap allQueues(); bool exists(const string &name); size_t size(); void clear(); };
绑定信息,本质上描述的是交换机关联了哪些队列
// 1. 定义绑定信息类 struct Binding { using ptr = shared_ptr<Binding>; Binding() {} Binding(const string& bexchange_name, const string& bqueue_name, const string& bbinding_key) : exchange_name(bexchange_name), queue_name(bqueue_name), binding_key(bbinding_key) {} string exchange_name; string queue_name; string binding_key; }; // 队列与绑定信息是一一对应的,因此一个交换机可能会有多个队列的绑定信息 // 队列名与绑定信息的映射关系 using QueueBindingMap = unordered_map<string, Binding::ptr>; // 交换机与队列的映射关系 using BindingMap = unordered_map<string, QueueBindingMap>; // unordered_map<string, Binding::ptr>; 队列与绑定 // unordered_map<string, Binding::ptr>; 交换机与绑定 // 2. 定义绑定信息持久化 class BindingMapper { public: BindingMapper(const string& db_file) : _sql_helper(db_file) {} void CreateTable(); void RemoveTable(); bool Insert(Binding::ptr& binding); void Remove(const string& ename, const string& qname); void RemoveExchangeBindings(const string& ename); void RemoveQueueBindings(const string& qname); BindingMap Recover(); private: SqliteHelper _sql_helper; }; // 3. 绑定信息管理 class BindingManager { public: BindingManager(const string& dbfile) : _mapper(dbfile) {} bool bind(const string& ename, const string& qname, const string& key); void unbind(const string& ename, const string& qname); void RemoveQueueBindings(const string& qname); QueueBindingMap GetExchangeBindings(const string& ename); Binding::ptr GetBinding(const string& ename, const string& qname); bool exists(const string& ename, const string& qname); size_t size(); void clear(); private: mutex _mutex; BindingMapper _mapper; BindingMap _buildings; };
对于消息来说,首先要先描述它,消息大概包含内容和属性,对于属性来说包含的有消息ID,消息的routing_key,消息的投递模式等,而对于内容来说就是实际的消息数据内容
对于服务器上的消息管理来说,最重要的是要进行消息的持久化管理,在每一条消息上都有可能要进行持久化存储,推送给客户端再删除,那么这样的每次进行数据的存储就要进行重写一次文件,效率非常低下
概述:
提供的操作:
垃圾回收的思想
需要管理的数据
函数设计如下:
// 使用队列来管理消息,每个队列有自己的文件 class MessageMapper { public: MessageMapper(string& basedir, const string& qname) : _qname(qname); bool CreateMessageFile(); void RemoveMessageFile(); bool Insert(MessagePtr& msg); bool Remove(MessagePtr& msg);e; } // 垃圾回收 list<MessagePtr> gc(); };
以队列为单元进行管理
如果内存中所有的消息整体进行管理,那么在进行垃圾回收以及恢复历史消息上就会比较麻烦,因此依旧是使用队列来进行管理,每一个队列中都有消息数据的管理结构,然后最终向外提供一个消息管理类
队列消息管理
对于队列消息来说:
需要进行管理的数据:
class QueueMessage { public: QueueMessage(const string& basedir, const string& qname); bool Insert(const MessageQueue::BasicProperties *bp, const string& body); // 每次删除消息后,判断是否需要垃圾回收 bool Remove(const string& msg_id); MessagePtr front(); size_t PushCount(); size_t TotalCount(); size_t DurableCount(); size_t WaitAckCount(); void clear(); private: string _qname; size_t _valid_count; size_t _total_count; list<MessagePtr> _msgs; unordered_map<string, MessagePtr> _durable_msgs; unordered_map<string, MessagePtr> _wait_msgs; };
最后要提供一个对外接口类,管理的是每一个队列的消息
管理成员
提供操作
// 对外提供操控消息的总接口 class MessageManager { public: using ptr = shared_ptr<MessageManager>; MessageManager(const string& basedir) : _basedir(basedir); void clear(); // 把队列进行初始化获取消息 void InitQueueMessage(const string& qname); // 销毁队列 void DestroyQueueMessage(const string& qname); // 插入消息 bool Insert(const string& qname, MessageQueue::BasicProperties* bp, const string& body, bool queue_is_durable); // 获取队首消息信息 MessagePtr Front(const string& qname); // 获取ack应答 void ack(const string& qname, const string& msg_id); size_t GetableCount(const string& qname); size_t TotalCount(const string& qname); size_t DurableCount(const string& qname); size_t WaitAckCount(const string& qname); private: mutex _mutex; string _basedir; unordered_map<string, QueueMessage::ptr> _queue_msgs; };
虚拟机是对于上述的三个数据管理模块的整合,并基于数据之间的关联关系进行联合操作,这个模块其实就是把前面的模块都整合起来了
class VirtualHost { public: VirtualHost(const string& basedir, const string& dbfile); bool DeclareExchange(const string& name, MessageQueue::ExchangeType type, bool durable, bool auto_delete, unordered_map<string, string>& args); void DeleteExchange(); bool DeclareQueue(const string &qname, bool qdurable, bool qexclusive, bool qauto_delete, const unordered_map<string, string> &qargs); void DeleteQueue(); bool Bind(const string& ename, const string& qname, const string& key, bool durable); void UnBind(const string& ename, const string& qname); QueueBindingMap ExchangeBindings(const string& ename); bool BasicPublish(const string& qname, MessageQueue::BasicProperties* bp, const string& body, MessageQueue::DeliveryMode mode); MessagePtr BasicConsume(const string& qname); bool BasicAck(const string& qname, const string& msgid); private: ExchangeManager::ptr _emp; QueueManager::ptr _qmp; BindingManager::ptr _bmp; MessageManager::ptr _mmp; };
主要功能就是根据routing_key和binding_key判断是否匹配成功
class Router
{
public:
static bool isLegalRoutingKey(const string &routing_key);
static bool isLegalBindingKey(const string &binding_key);
static bool route(MessageQueue::ExchangeType type, const string &routing_key, const string &binding_key);
};
struct Consumer { using ptr = shared_ptr<Consumer>; Consumer() { lg(Debug, "创建新的消费者:%p", this); } Consumer(const string& ctag, const string& queue_name, bool ack_flag, const ConsumerCallback& cb) : tag(ctag), qname(queue_name), auto_ack(ack_flag), callback(std::move(cb)) { lg(Debug, "创建新的消费者:%p", this); } ~Consumer() { lg(Debug, "移除消费者:%p", this); } string tag; string qname; bool auto_ack; ConsumerCallback callback; };
管理要以队列为单元进行管理
操作
元素
// 以队列为单元进行消费者管理 class QueueConsumer { public: using ptr = shared_ptr<QueueConsumer>; QueueConsumer(const string& qname); // 新增消费者 Consumer::ptr Create(const string& ctag, const string& queue_name, bool ack_flag, const ConsumerCallback& cb); // 从队列移除消费者 void Remove(const string& ctag); // 从队列中获取消费者 Consumer::ptr Choose(); // 判断是否为空 bool Empty(); // 判断指定消费者是否存在 bool Exists(const string& ctag); // 清理所有消费者 void clear(); private: string _qname; mutex _mutex; uint64_t _rr_seq; vector<Consumer::ptr> _consumers; };
// 消费者统一管理结构 class ConsumerManager { public: using ptr = shared_ptr<ConsumerManager>; ConsumerManager() {} void InitQueueConsumer(const string& qname); void DestroyQueueConsumer(const string &qname); Consumer::ptr Create(const string &ctag, const string &queue_name, bool ack_flag, const ConsumerCallback &cb); void remove(const string &ctag, const string &queue_name); Consumer::ptr choose(const string &queue_name); bool empty(const string &queue_name); bool exists(const string &ctag, const string &queue_name); void clear(); private: mutex _mutex; unordered_map<string, QueueConsumer::ptr> _qconsumers; };
管理信息
管理操作
class Channel { public: using ptr = shared_ptr<Channel>; Channel(const string &id, const VirtualHost::ptr &host, const ConsumerManager::ptr &cmp, const ProtobufCodecPtr &codec, const muduo::net::TcpConnectionPtr &conn, const threadpool::ptr &pool); //交换机的声明与删除 void declareExchange(const declareExchangeRequestPtr &req); void deleteExchange(const deleteExchangeRequestPtr &req); //队列的声明与删除 void declareQueue(const declareQueueRequestPtr &req); void deleteQueue(const deleteQueueRequestPtr &req); //队列的绑定与解除绑定 void queueBind(const queueBindRequestPtr &req); void queueUnBind(const queueUnBindRequestPtr &req); //消息的发布 void basicPublish(const basicPublishRequestPtr &req); //消息的确认 void basicAck(const basicAckRequestPtr &req); //订阅队列消息 void basicConsume(const basicConsumeRequestPtr &req); //取消订阅 void basicCancel(const basicCancelRequestPtr &req); private: void callback(const string tag, const MessageQueue::BasicProperties *bp, const string &body); void consume(const string &qname); void basicResponse(bool ok, const string &rid, const string &cid); private: string _cid; Consumer::ptr _consumer; muduo::net::TcpConnectionPtr _conn; ProtobufCodecPtr _codec; ConsumerManager::ptr _cmp; VirtualHost::ptr _host; threadpool::ptr _pool; }; class ChannelManager { public: using ptr = shared_ptr<ChannelManager>; ChannelManager(){} bool openChannel(const string &id, const VirtualHost::ptr &host, const ConsumerManager::ptr &cmp, const ProtobufCodecPtr &codec, const muduo::net::TcpConnectionPtr &conn, const threadpool::ptr &pool); void closeChannel(const string &id); Channel::ptr getChannel(const string &id); private: mutex _mutex; unordered_map<string, Channel::ptr> _channels; };
对于服务器模块来说,主要是要借助于muduo库来进行实现,这里考虑到篇幅不再进行讲解,这里进行一个的总结:
首先提供了一个server发服务器,这是有一个通用的TCP服务器,而在其当中包含了一个主Reactor,这个Reactor主要是进行一个事件的读取,当有新连接到来后要如何进行读取和分发以及上面的io事件如何处理都是在这个模块来进行实现的,而对于新连接来说,会有一个dispatcher分发器,这个分发器的作用就是根据不同的消息类型进行合适的分发,装载到不同的处理函数当中,这就是muduo库一个整体的框架,而在这上面就是一些应用层的处理,例如有诸如Protubuf编解码器,进行应用层协议的封装,参数传递等等
创建MQBrokerServer模块
MQBrokerServer是对于整体服务器模块的整合,进行客户端的请求,提供对应的服务,为了方便查看,把讲解作为注释加到了代码中,这里就不进行讲解了
class Server { public: typedef shared_ptr<google::protobuf::Message> MessagePtr; /* 把服务器进行初始化,传递一个端口号和根目录,要初始化的内容有: 1. 服务器 2. 分发器 3. Protubuf协议解析器 4. 虚拟机 5. 消费者 6. 连接管理 7. 线程池 1. 服务器 muduo库中的接口:对于服务器来说,要指定EventLoop,ip端口号,名字和选项 TcpServer(EventLoop* loop, const InetAddress& listenAddr, const string& nameArg, Option option = kNoReusePort); 2. 分发器:遇到不同情况怎么处理 muduo库中的接口:需要指定一个defaultCb ProtobufDispatcher(const ProtobufMessageCallback& defaultCb); 3. Protubuf协议解析器:进行Protubuf的协议解析 muduo库中的接口:需要一个ProtobufMessageCallback函数回调 ProtobufCodec(const ProtobufMessageCallback& messageCb); 4. 虚拟机:前面自定义类型 VirtualHost(const string &hname, const string &basedir, const string &dbfile); 5. 消费者:前面自定义类型 6. 连接:前面自定义类型 7. 线程池:前面自定义类型 */ Server(int port, const string &basedir); // 启动服务器 void start(); private: //打开信道 void onOpenChannel(const muduo::net::TcpConnectionPtr& conn, const openChannelRequestPtr& message, muduo::Timestamp); //关闭信道 void onCloseChannel(const muduo::net::TcpConnectionPtr& conn, const closeChannelRequestPtr& message, muduo::Timestamp); //声明交换机 void onDeclareExchange(const muduo::net::TcpConnectionPtr& conn, const declareExchangeRequestPtr& message, muduo::Timestamp); //删除交换机 void onDeleteExchange(const muduo::net::TcpConnectionPtr& conn, const deleteExchangeRequestPtr& message, muduo::Timestamp); //声明队列 void onDeclareQueue(const muduo::net::TcpConnectionPtr& conn, const declareQueueRequestPtr& message, muduo::Timestamp); //删除队列 void onDeleteQueue(const muduo::net::TcpConnectionPtr& conn, const deleteQueueRequestPtr& message, muduo::Timestamp); //队列绑定 void onQueueBind(const muduo::net::TcpConnectionPtr& conn, const queueBindRequestPtr& message, muduo::Timestamp); //队列解绑 void onQueueUnBind(const muduo::net::TcpConnectionPtr& conn, const queueUnBindRequestPtr& message, muduo::Timestamp); //消息发布 void onBasicPublish(const muduo::net::TcpConnectionPtr& conn, const basicPublishRequestPtr& message, muduo::Timestamp); //消息确认 void onBasicAck(const muduo::net::TcpConnectionPtr& conn, const basicAckRequestPtr& message, muduo::Timestamp); //队列消息订阅 void onBasicConsume(const muduo::net::TcpConnectionPtr& conn, const basicConsumeRequestPtr& message, muduo::Timestamp); //队列消息取消订阅 void onBasicCancel(const muduo::net::TcpConnectionPtr& conn, const basicCancelRequestPtr& message, muduo::Timestamp); void onUnknownMessage(const muduo::net::TcpConnectionPtr& conn, const MessagePtr& message, muduo::Timestamp); private: muduo::net::EventLoop _baseloop; muduo::net::TcpServer _server;//服务器对象 ProtobufDispatcher _dispatcher;//请求分发器对象--要向其中注册请求处理函数 ProtobufCodecPtr _codec;//protobuf协议处理器--针对收到的请求数据进行protobuf协议处理 VirtualHost::ptr _virtual_host; ConsumerManager::ptr _consumer_manager; ConnectionManager::ptr _connection_manager; threadpool::ptr _threadpool; };
在RabbitMQ当中,提供服务的是信道,因此在客户端的实现中,就弱化了对于客户端的概念,在这个项目当中并不会为用户展示出网络通信的概念,而是会用提供服务的方式来进行体现
其实总体来说,就是用一个接口来实现一个功能,接口内部完成向客户端请求的过程,对外不体现出客户端和服务端通信的概念,用户需要什么服务就调用什么接口即可
因此,对于客户端模块主要有以下的四大模块
基于上述的部分,就可以实现出下面的部分:
包含的描述信息
struct Consumer { using ptr = shared_ptr<Consumer>; string tag; //消费者标识 string qname; //消费者订阅的队列名称 bool auto_ack; //自动确认标志 ConsumerCallback callback; Consumer() { lg(Debug, "new Consumer: %p", this); } Consumer(const string &ctag, const string &queue_name, bool ack_flag, const ConsumerCallback &cb) : tag(ctag), qname(queue_name), auto_ack(ack_flag), callback(std::move(cb)) { lg(Debug, "new Consumer: %p", this); } ~Consumer() { lg(Debug, "del Consumer: %p", this); } };
在服务端中有信道,自然在客户端中也有信道的概念,并且功能和服务端基本一致,不管是说是客户端的Channel还是服务端的Channel都是为了给用户提供服务而存在,区别为,服务端是给客户端提供服务的,而客户端是给用户提供服务的,也可以理解为是客户端的Channel来进行接口的调用,向服务端发送对应的请求,获取请求的服务
信道的描述信息
信道的组织操作
信道的管理操作
在客户端的角度有两个线程池需要处理:
对于客户端来说,操作的思想就是先创建连接,之后通过连接创建信道,通过信道提供服务这一系列的流程,这个模块就是针对于muduo库客户端进行的二次封装,向用户提供了一个创建Channel的接口,创建信道后,借助信道来提供指定的服务
class Connection { public: using ptr = shared_ptr<Connection>; // 进行连接的创建实例 Connection(const string &sip, int sport, const AsyncWorker::ptr &worker); // 打开信道 Channel::ptr openChannel(); // 关闭信道 void closeChannel(const Channel::ptr &channel); private: // 通用响应 void basicResponse(const muduo::net::TcpConnectionPtr& conn, const basicCommonResponsePtr& message, muduo::Timestamp); // 消息的推送 void consumeResponse(const muduo::net::TcpConnectionPtr& conn, const basicConsumeResponsePtr& message, muduo::Timestamp); // 未知的信息 void onUnknownMessage(const muduo::net::TcpConnectionPtr& conn, const MessagePtr& message, muduo::Timestamp); // 连接 void onConnection(const muduo::net::TcpConnectionPtr&conn); private: muduo::CountDownLatch _latch; //实现同步的 muduo::net::TcpConnectionPtr _conn; //客户端对应的连接 muduo::net::TcpClient _client; //客户端 ProtobufDispatcher _dispatcher; //请求分发器 ProtobufCodecPtr _codec; //协议处理器 AsyncWorker::ptr _worker; ChannelManager::ptr _channel_manager; };
至此,项目就基本结束了,但是这个项目毕竟是组件,实际的应用场景还是要结合上层的逻辑实际处理来完成,后续会更新它的使用场景项目
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。