赞
踩
@TOC
目录
基于这个,那我们先完成虚拟机模块管理的相关数据
- syntax = "proto3";
- package mymq;
-
- enum ExchangeType # 交换类型
- {
- UNKNOWTYPE = 0;
- DIRECT = 1;
- FANOUT = 2;
- TOPIC = 3;
- };
-
- enum DeliveryMode # 是否持久化
- {
- UNKNOWMODE = 0;
- UNDURABLE = 1;
- DURABLE = 2;
- };
-
- message BasicProperties #消息的属性
- {
- string id = 1; # 消息ID, UUID
- DeliveryMode delivery_mode = 2; # 持久化模块
- string rounting_key = 3; # routing_key绑定信息
- };
-
- message Message
- {
- message Payload
- {
- BasicProperties properties = 1; # 消息属性
- string body = 2; # 有效载荷数据
- string valid = 3; # 消息是否有效
- };
- Payload payload = 1; # 真正持久化的只有这一个字段
- uint32 offset = 2; # 消息的位置
- uint32 length = 3; # 消息的长度
- };
当系统重启时,也需要重新加载有效消息即可。
以队列为单元进行管理(因为消息的所有操作都是以队列为单元)
- #ifndef __M_MSG_H__
- #define __M_MSG_H__
-
- #include "../mqcommon/logger.hpp"
- #include "../mqcommon/helper.hpp"
- #include "../mqcommon/msg.pb.h"
- #include <iostream>
- #include <mutex>
- #include <memory>
- #include <unordered_map>
- #include <list>
-
- namespace mymq
- {
-
- // 消息的持久化管理
- // a. 管理数据
- // i. 队列消息⽂件存储的路径
- // ii. 队列消息的存储⽂件名
- // iii. 队列消息的临时交换⽂件名
- // b. 管理操作
- // i. ⽇志消息存储在⽂件中(4B⻓度+(属性+内容+有效位)序列化消息,连续存储即可)
- // ii. 提供队列消息⽂件创建/删除功能
- // iii. 提供队列消息的新增持久化/删除持久化
- // iv. 提供持久化内容的垃圾回收(其实就是重新加载出所有有效消息返回,并重新⽣成新的消息
- // 存储⽂件)
- #define DATAFILE_SUBFIX ".mqd"
- #define TMPFILE_SUBFIX ".mqb.tmp"
- using MessagePtr = std::shared_ptr<mymq::Message>;
-
- // 消息持久化
- class MessageMapper
- {
- public:
- MessageMapper(std::string &basedir, const std::string &qname)
- : _qname(qname)
- {
- if (basedir.back() != '/')
- {
- basedir.push_back('/');
- }
- _datafile = basedir + qname + DATAFILE_SUBFIX;
- _tmpfile = basedir + qname + TMPFILE_SUBFIX;
- if (FileHelper(basedir).exists() == false)
- {
- FileHelper::createDirectory(basedir);
- }
- createMsgFile();
- }
-
- bool createMsgFile()
- {
- if (FileHelper(_datafile).exists() == true)
- {
- return true;
- }
- bool ret = FileHelper::createFile(_datafile);
- if (ret == false)
- {
- ELOG("创建队列数据文件:%s 失败", _datafile.c_str());
- return false;
- }
- return true;
- }
-
- void removeMsgFile()
- {
- FileHelper::removeFile(_datafile);
- FileHelper::removeFile(_tmpfile);
- }
-
- bool insert(MessagePtr &msg)
- {
- return insert(_datafile, msg);
- }
-
- bool remove(MessagePtr &msg)
- {
- // 1. 将msg中的有效标志位修改为‘0’
- msg->mutable_payload()->set_valid("0");
- // 2,对msg进行序列化
- std::string body = msg->SerializeAsString();
- if (body.size() != msg->length())
- {
- ELOG("不能修改文件中的数据信息,因为生成的数据与原数据长度不一样");
- return false;
- }
- // 3. 将序列化后的消息,写入到数据在文件中的指定位置(覆盖原有的数据)
- FileHelper helper(_datafile);
- bool ret = helper.write(body.c_str(), msg->offset(), body.size());
- if (ret == false)
- {
- DLOG("向队列数据文件写入失败!");
- return false;
- }
- return true;
- }
-
- std::list<MessagePtr> gc()
- {
- std::list<MessagePtr> result;
- bool ret = load(result);
- if (ret == false)
- {
- ELOG("加载有效数据失败\n");
- return result;
- }
- // 2. 将有效数据,进行序列化存储到临时文件中
- FileHelper::createFile(_tmpfile);
- for (auto e : result)
- {
- DLOG("向临时文件中添加数据 : %s", e->payload().body().c_str());
- ret = insert(_tmpfile, e);
- if (ret == false)
- {
- ELOG("向临时文件中添加数据失败");
- return result;
- }
- }
- // 3. 删除源文件
- ret = FileHelper::removeFile(_datafile);
- if (ret == false)
- {
- ELOG("删除原文件失败");
- return result;
- }
- // 4. 修改临时文件的名字,改成原文件的名称
- ret = FileHelper(_tmpfile).rename(_datafile.c_str());
- if (ret == false)
- {
- ELOG("修改文件名称失败");
- return result;
- }
- // 5. 返回新的有效数据
- return result;
- }
-
- private:
- bool load(std::list<MessagePtr> &result)
- {
- // 1. 加载出文件中的所有的有效数据:存储格式:4字节长度 | 数据 | 4字节长度 | 数据 ......
- FileHelper data_file_helper(_datafile);
- size_t offset = 0; // 用来定位
- size_t msg_size = 0; // 用来 4 字节长度的数据长度
- size_t file_size = data_file_helper.size();
- while (offset < file_size)
- {
- bool ret = data_file_helper.read((char*)&msg_size, offset, sizeof(size_t));
- if (ret == false)
- {
- DLOG("读取消息长度失败!");
- }
- offset += sizeof(size_t);
- std::string msg_body(msg_size, '\0');
- ret = data_file_helper.read(&msg_body[0], offset, msg_size);
- // 为什么使用 &msg_body[0]
- // &msg_body[0] 返回的是指向 std::string 内部字符数组首元素的指针,即指向字符串内容的指针。
- // 适用于需要直接操作字符串内容的场景,例如文件读写操作
-
- // &msg_body:
- // &msg_body 返回的是 std::string 对象本身的指针,即指向 std::string 对象的指针。
- // 不适用于直接操作字符串内容的场景,而是用于操作整个 std::string 对象本身。
- if (ret == false)
- {
- DLOG("读取消息数据失败");
- return false;
- }
- offset += msg_size;
- MessagePtr msgp = std::make_shared<Message>();
- msgp->mutable_payload()->ParseFromString(msg_body);
-
- // 如果是无效消息,则直接处理下一个
- if (msgp->payload().valid() == "0")
- {
- ELOG("加载到无效消息:%s", msgp->payload().body().c_str());
- continue;
- }
- // 有效消息则保存下来
- result.push_back(msgp);
- }
- return true;
- }
-
- bool insert(const std::string &filename, MessagePtr &msg)
- {
- // 新增数据都是添加在文件末尾的
- // 1. 进行消息的序列化,获取到格式化后的消息
- // 因为使用的是 sqlite 数据库,存储的是二进制信息,所以需要进行序列化
- std::string body = msg->payload().SerializeAsString();
- // 2. 获取文件长度
- FileHelper helper(filename);
- size_t fsize = helper.size();
- size_t msg_size = body.size();
- // 写入逻辑:1. 先写入 4 字节数据长度,2. 再写入指定长度数据
- bool ret = helper.write((char *)&msg_size, fsize, sizeof(size_t));
- if (ret == false)
- {
- DLOG("向队列数据文件写入数据长度失败!");
- return false;
- }
- // 将数据写入文件的指定位置
- ret = helper.write(body.c_str(), fsize + sizeof(size_t), msg_size);
- if (ret == false)
- {
- DLOG("向队列数据文件写入数据失败!");
- return false;
- }
- // 更新msg中的实际存储信息
- msg->set_offset(fsize + sizeof(size_t));
- msg->set_length(msg_size);
- return true;
- }
-
- private:
- std::string _qname;
- std::string _datafile;
- std::string _tmpfile;
- };
-
- // 管理数据
- class QueueMessage
- {
- public:
- using ptr = std::shared_ptr<QueueMessage>;
- // 初始化
- QueueMessage(std::string &basedir, const std::string &qname)
- : _mapper(basedir, qname),
- _qname(qname),
- _valid_count(0),
- _total_count(0)
- {}
-
- bool recovery()
- {
- // 恢复历史消息
- std::unique_lock<std::mutex> lock(_mutex);
- _msgs = _mapper.gc();
- for (auto &msg : _msgs)
- {
- _durable_msgs.insert(std::make_pair(msg->payload().properties().id(), msg));
- }
- _valid_count = _total_count = _msgs.size();
- return true;
- }
-
- bool insert(const BasicProperties *bp, const std::string &body, bool queue_is_durable)
- {
- // 构造消息对象
- MessagePtr msg = std::make_shared<Message>();
- msg->mutable_payload()->set_body(body);
- if (bp != nullptr)
- {
- DeliveryMode mode = queue_is_durable ? bp->delivery_mode() : DeliveryMode::UNDURABLE;
- msg->mutable_payload()->mutable_properties()->set_id(bp->id());
- msg->mutable_payload()->mutable_properties()->set_delivery_mode(mode);
- msg->mutable_payload()->mutable_properties()->set_rounting_key(bp->rounting_key());
- }
- else
- {
- DeliveryMode mode = queue_is_durable ? DeliveryMode::DURABLE : DeliveryMode::UNDURABLE;
- msg->mutable_payload()->mutable_properties()->set_id(UUIDHelper::uuid());
- msg->mutable_payload()->mutable_properties()->set_delivery_mode(mode);
- msg->mutable_payload()->mutable_properties()->set_rounting_key("");
- }
- std::unique_lock<std::mutex> lock(_mutex);
- // 2. 判断消息是否需要持久化
- if (msg->payload().properties().delivery_mode() == DURABLE)
- {
- msg->mutable_payload()->set_valid("1");
- // 3. 进行持久化存储
- bool ret = _mapper.insert(msg);
- if (ret == false)
- {
- ELOG("持久化存储信息:%s失败了!", body.c_str());
- }
-
- _valid_count += 1;
- _total_count += 1;
- _durable_msgs.insert(make_pair(msg->payload().properties().id(), msg));
- }
- // 4. 内容的管理
- _msgs.push_back(msg);
- return true;
- }
-
- MessagePtr front()
- {
- std::unique_lock<std::mutex> lock(_mutex);
- if (_msgs.size() == 0)
- {
- return MessagePtr();
- }
- // 获取一条队首消息:从_msgs中获取数据
- MessagePtr msg = _msgs.front();
- _msgs.pop_front();
- // 将该消息对象,向待确认的hash表中添加一份,等到收到消息确认后进行删除
- _waitack_msgs.insert(std::make_pair(msg->payload().properties().id(), msg));
- return msg;
- }
-
- // 每次删除消息后,判断是否需要垃圾回收
- bool remove(const std::string &msg_id)
- {
- std::unique_lock<std::mutex> lock(_mutex);
- // 1. 从待确认队列中找到信息
- auto it = _waitack_msgs.find(msg_id);
- if (it == _waitack_msgs.end())
- {
- DLOG("没有找到要删除的消息:%s!", msg_id.c_str());
- return true;
- }
- // 2. 根据消息的持久化模式,决定是否删除持久化消息
- if (it->second->payload().properties().delivery_mode() == DeliveryMode::DURABLE)
- {
- // 删除持久化消息
- _mapper.remove(it->second);
- _durable_msgs.erase(msg_id);
- _valid_count -= 1;
- gc();
- }
- // 4. 删除内存中的信息
- _waitack_msgs.erase(msg_id);
- return true;
- }
-
- size_t getable_count()
- {
- std::unique_lock<std::mutex> lock(_mutex);
- return _msgs.size();
- }
-
- size_t total_count()
- {
- std::unique_lock<std::mutex> lock(_mutex);
- return _total_count;
- }
-
- size_t durable_count()
- {
- std::unique_lock<std::mutex> lock(_mutex);
- return _durable_msgs.size();
- }
- size_t waitack_count()
- {
- std::unique_lock<std::mutex> lock(_mutex);
- return _waitack_msgs.size();
- }
- void clear()
- {
- std::unique_lock<std::mutex> lock(_mutex);
- _mapper.removeMsgFile();
- _msgs.clear();
- _durable_msgs.clear();
- _waitack_msgs.clear();
- _valid_count = 0;
- _total_count = 0;
- }
-
- private:
- bool GCCheck() {
- //持久化的消息总量大于2000, 且其中有效比例低于50%则需要持久化
- if (_total_count > 2000 && _valid_count * 10 / _total_count < 5) {
- return true;
- }
- return false;
- }
-
- void gc()
- {
- // 1. 进行垃圾回收,获取到垃圾回收后,有效的消息信息链表
- if(GCCheck() == false)
- {
- return;
- }
- std::list<MessagePtr> msgs = _mapper.gc();
- for(auto& msg : msgs)
- {
- auto it = _durable_msgs.find(msg->payload().properties().id());
- if(it == _durable_msgs.end())
- {
- DLOG("垃圾回收之后,有一条持久化消息,在内存中没有进行管理");
- _msgs.push_back(msg); // 做法:重新添加到推送链表的末尾
- _durable_msgs.insert(make_pair(msg->payload().properties().id(), msg));
- continue;
- }
- // 2. 更新每一条消息的实际存储位置
- it->second->set_offset(msg->offset());
- it->second->set_length(msg->length());
- }
- _valid_count = _total_count = msgs.size();
- }
-
- private:
- std::mutex _mutex;
- std::string _qname;
- size_t _valid_count;
- size_t _total_count;
- MessageMapper _mapper;
- std::list<MessagePtr> _msgs;
- std::unordered_map<std::string, MessagePtr> _durable_msgs; // 持久化消息hash
- std::unordered_map<std::string, MessagePtr> _waitack_msgs; // 待确认消息hash
- };
-
- // 实现一个对外的总体消息管理类
- // 消息的总体对外管理
- // i. 初始化新建队列的消息管理结构,并创建消息存储⽂件
- // ii. 删除队列的消息管理结构,以及消息存储⽂件
- // iii. 向指定队列新增消息
- // iv. 获取指定队列队⾸消息
- // v. 确认指定队列待确认消息(删除)
- // vi. 判断指定队列消息是否为空
-
- class MessageManager
- {
- public:
- using ptr = std::shared_ptr<MessageManager>;
- MessageManager(const std::string basedir)
- :_basedir(basedir)
- {}
-
- void clear()
- {
- std::unique_lock<std::mutex> lock(_mutex);
- for(auto& it : _queue_msgs)
- {
- it.second->clear();
- }
- }
-
- void initQueueMessage(const std::string& qname)
- {
- QueueMessage::ptr qmp;
- {
- std::unique_lock<std::mutex> lock(_mutex);
- auto it = _queue_msgs.find(qname);
- if(it != _queue_msgs.end())
- {
- return ;
- }
- qmp = std::make_shared<QueueMessage>(_basedir, qname);
- _queue_msgs.insert(std::make_pair(qname, qmp));
- }
- qmp->recovery();
- }
-
- void destroyQueueMessage(const std::string& qname)
- {
- QueueMessage::ptr qmp;
- {
- std::unique_lock<std::mutex> lock(_mutex);
- auto it = _queue_msgs.find(qname);
- if(it == _queue_msgs.end())
- {
- return ;
- }
- qmp = it->second;
- _queue_msgs.erase(it);
- }
- qmp->clear();
- }
-
- bool insert(const std::string& qname, BasicProperties* bp, const std::string& body, bool _queue_is_durable)
- {
- QueueMessage::ptr qmp;
- {
- std::unique_lock<std::mutex> lock(_mutex);
- auto it = _queue_msgs.find(qname);
- if(it == _queue_msgs.end())
- {
- ELOG("向队列 %s 新增消息失败,没有找到消息管理句柄", qname.c_str());
- return false;
- }
- qmp = it->second;
- }
- return qmp->insert(bp, body, _queue_is_durable);
- }
-
- MessagePtr front(const std::string& qname)
- {
- QueueMessage::ptr qmp;
- {
- std::unique_lock<std::mutex> lock(_mutex);
- auto it = _queue_msgs.find(qname);
- if(it == _queue_msgs.end())
- {
- DLOG("获取 %s 队首数据失败:没有找到对应的消息句柄", qname.c_str() );
- return MessagePtr();
- }
- qmp = it->second;
- }
- //取出对应的相关队列的首部消息
- return qmp->front();
- }
-
- void ack(const std::string& qname, const std::string& msg_id)
- {
- QueueMessage::ptr qmp;
- {
- std::unique_lock<std::mutex> lock(_mutex);
- auto it = _queue_msgs.find(qname);
- if(it == _queue_msgs.end())
- {
- DLOG("确认队列 %s 消息 %s 失败:没有找到消息管理句柄", qname.c_str(), msg_id.c_str());
- return ;
- }
- qmp = it->second;
- }
- qmp->remove(msg_id);
- return ;
- }
-
- size_t getable_count(const std::string& qname)
- {
- QueueMessage::ptr qmp;
- {
- std::unique_lock<std::mutex> lock(_mutex);
- auto it = _queue_msgs.find(qname);
- if(it == _queue_msgs.end())
- {
- DLOG("获取队列 %s 待推送消息数量失败:没有找到消息管理句柄", qname.c_str());
- return 0;
- }
- qmp = it->second;
- }
- return qmp->getable_count();
- }
- size_t total_count(const std::string& qname)
- {
- QueueMessage::ptr qmp;
- {
- std::unique_lock<std::mutex> lock(_mutex);
- auto it = _queue_msgs.find(qname);
- if(it == _queue_msgs.end())
- {
- DLOG("获取队列 %s 总持久化消息数量失败:没有找到消息管理句柄", qname.c_str());
- return 0;
- }
- qmp = it->second;
- }
- return qmp->total_count();
- }
- size_t durable_count(const std::string& qname)
- {
- QueueMessage::ptr qmp;
- {
- std::unique_lock<std::mutex> lock(_mutex);
- auto it = _queue_msgs.find(qname);
- if(it == _queue_msgs.end())
- {
- DLOG("获取队列 %s 总持久化消息数量失败:没有找到消息管理句柄", qname.c_str());
- return 0;
- }
- qmp = it->second;
- }
- return qmp->durable_count();
- }
- size_t waitack_count(const std::string& qname)
- {
- QueueMessage::ptr qmp;
- {
- std::unique_lock<std::mutex> lock(_mutex);
- auto it = _queue_msgs.find(qname);
- if(it == _queue_msgs.end())
- {
- DLOG("获取队列 %s 总持久化消息数量失败:没有找到消息管理句柄", qname.c_str());
- return 0;
- }
- qmp = it->second;
- }
- return qmp->waitack_count();
- }
- private:
- std::mutex _mutex;
- std::string _basedir;
- std::unordered_map<std::string, QueueMessage::ptr> _queue_msgs;
- };
-
- }
-
- #endif
注意交换机有持久化的标识,所以在进行管理操作的时候,需要分情况进行,在内存中管理,以及在硬盘中管理
- #include "../mqcommon/helper.hpp"
- #include "../mqcommon/logger.hpp"
- #include "../mqcommon/msg.pb.h"
- #include "../mqcommon/mq_proto.pb.h"
- #include <google/protobuf/map.h>
- #include <iostream>
- #include <unordered_map>
- #include <memory>
- #include <mutex>
-
- namespace mymq
- {
- // 1. 定义交换机类
- struct Exchange
- {
- using ptr = std::shared_ptr<Exchange>;
- // 1. 交换机名称
- std::string name;
- // 2. 交换机类型
- ExchangeType type;
- // 3. 交换机持久化标志
- bool durable;
- // 4. 是否自动删除标志
- bool auto_delete;
- // 5. 其他参数
- google::protobuf::Map<std::string, std::string> args;
- //因为后面的函数需要构造一个Exchange的空对象
- Exchange()
- {}
-
- Exchange(const std::string &ename, ExchangeType etype, bool edurable, bool eauto_delete, const google::protobuf::Map<std::string, std::string> eargs)
- : name(ename),
- type(etype),
- durable(edurable),
- auto_delete(eauto_delete),
- args(eargs)
- {}
-
- // args存储键值对,在存储数据库的时候,会组织一个格式字符串进行存储 key=value&key=value.....
- // 内部解析str_args字符串,将容器存储到成员中
- void setArgs(const std::string &str_args)
- {
- //key=val&key=val
- std::vector<std::string> sub_args;
- StrHelper::split(str_args, "&", sub_args);
- for(auto& str : sub_args)
- {
- size_t pos = str.find('=');
- std::string key = str.substr(0, pos);
- std::string val = str.substr(pos + 1);
- args[key] = val;
- }
- }
- // 将args中的内容进行序列化后,返回一个字符串
- std::string getArgs()
- {
- std::string result;
- for(auto e = args.begin(); e != args.end(); e++)
- {
- result = e->first + "=" + e->second + "&" + result;
- }
- return result;
- }
- };
-
- // 2. 定义交换机数据持久化管理类 -- 数据存储在sqlite数据库中
- class ExchangeMapper
- {
- public:
- ExchangeMapper(const std::string &dbfile)
- :_sqlite_helper(dbfile)
- {
- std::string path = FileHelper::parentDirectory(dbfile);
- FileHelper::createDirectory(path);
- assert(_sqlite_helper.open());
- createTable();
- }
-
- void createTable()
- {
- #define CREATE_TABLE "create table if not exists exchange_table(\
- name varchar(32) primary key, \
- type int,\
- durable int, \
- auto_delete int ,\
- args varchar(128));"
- bool ret = _sqlite_helper.exec(CREATE_TABLE, nullptr, nullptr);
- if(ret == false)
- {
- DLOG("创建交换机数据库失败");
- abort(); //直接异常退出程序
- }
- }
-
- void removeTable()
- {
- #define DROP_TABLE "drop table if exists exchange_table;"
- bool ret = _sqlite_helper.exec(DROP_TABLE, nullptr, nullptr);
- if(ret == false)
- {
- DLOG("删除交换机数据库失败");
- abort(); //直接异常退出程序
- }
- }
-
- bool insert(Exchange::ptr &exp)
- {
- #define INSERT_TABLE "insert into exchange_table values('%s', %d, %d, %d, '%s');"
- std::string args_str = exp->getArgs();
- char sql_str[4096] = {0};
- sprintf(sql_str, INSERT_TABLE, exp->name.c_str(), exp->type, exp->durable, exp->auto_delete, args_str.c_str());
- bool ret = _sqlite_helper.exec(sql_str, nullptr, nullptr);
- if(ret == false)
- {
- DLOG("插入数据库失败");
- return false;
- }
- return true;
- }
-
- void remove(const std::string &name)
- {
- std::stringstream ss;
- ss << "delete from exchange_table where name = ";
- ss << "'" << name << "';";
- _sqlite_helper.exec(ss.str(), nullptr, nullptr);
- }
-
-
- using ExchangeMap = std::unordered_map<std::string, Exchange::ptr>;
- ExchangeMap recovery()
- {
- ExchangeMap result;
- std::string sql = "select name, type, durable, auto_delete, args from exchange_table;";
- _sqlite_helper.exec(sql.c_str(), SelectCallBack, &result);
- return result;
- }
- private:
- static int SelectCallBack(void* args, int numcol, char** row, char** fields)
- {
- ExchangeMap* result = (ExchangeMap*)args;
- auto exp = std::make_shared<Exchange>();
- exp->name = row[0];
- exp->type = (mymq::ExchangeType)std::stoi(row[1]);
- exp->durable = (bool)std::stoi(row[2]);
- exp->auto_delete = (bool)std::stoi(row[3]);
- if(row[4])
- {
- exp->setArgs(row[4]);
- }
- result->insert(std::make_pair(exp->name, exp));
- return 0;
- }
- private:
- SqliteHelper _sqlite_helper;
- };
-
- // 3. 定义交换机在内存管理类
- class ExchangeManager
- {
- public:
- using ptr = std::shared_ptr<ExchangeManager>;
- ExchangeManager(const std::string &dbfile)
- :_mapper(dbfile)
- {
- _exchanges = _mapper.recovery();
- }
- // 声明交换机
- bool declareExchange(const std::string &name, ExchangeType type, bool durable, bool auto_delete, const google::protobuf::Map<std::string, std::string> &args)
- {
- std::unique_lock<std::mutex> lock(_mutex);
- auto it = _exchanges.find(name);
- if(it != _exchanges.end())
- {
- //如果交换机已经存在,那就直接返回,不需要重复新增
- return true;
- }
- auto exp = std::make_shared<Exchange>(name, type, durable, auto_delete, args);
- if(durable == true)
- {
- bool ret = _mapper.insert(exp);
- if(ret == false)
- {
- return false;
- }
- }
- _exchanges.insert(std::make_pair(name, exp));
- return true;
- }
- // 删除交换机
- void deleteExchange(const std::string &name)
- {
- std::unique_lock<std::mutex> lock(_mutex);
- auto it = _exchanges.find(name);
- if(it == _exchanges.end())
- {
- //如果交换机不存在,那就直接返回
- return ;
- }
- if(it->second->durable == true)
- _mapper.remove(name);
- _exchanges.erase(name);
- }
-
- Exchange::ptr selectExchange(const std::string& name)
- {
- std::unique_lock<std::mutex> lock(_mutex);
- auto it = _exchanges.find(name);
- if(it == _exchanges.end())
- {
- //如果交换机不存在,那就直接返回
- return nullptr;
- }
- return it->second;
- }
-
- // 判断交换机是否存在
- bool exists(const std::string &name)
- {
- std::unique_lock<std::mutex> lock(_mutex);
- auto it = _exchanges.find(name);
- if(it == _exchanges.end())
- {
- //如果交换机不存在,那就直接返回
- return false;
- }
- return true;
- }
-
- size_t size()
- {
- std::unique_lock<std::mutex> lock(_mutex);
- return _exchanges.size();
- }
-
- // 清理所有交换机数据
- void clear()
- {
- std::unique_lock<std::mutex> lock(_mutex);
- _mapper.removeTable();
- _exchanges.clear();
- }
-
- private:
- std::mutex _mutex;
- ExchangeMapper _mapper;
- std::unordered_map<std::string, Exchange::ptr> _exchanges;
- };
-
- }
-
- #ifndef __M_QUEUE_H__
- #define __M_QUEUE_H__
-
-
- #include "../mqcommon/helper.hpp"
- #include "../mqcommon/logger.hpp"
- #include "../mqcommon/msg.pb.h"
- #include <iostream>
- #include <unordered_map>
- #include <memory>
- #include <mutex>
-
- namespace mymq
- {
- struct MsgQueue
- {
- using ptr = std::shared_ptr<MsgQueue>; // 放在这个位置合适吗?
- std::string name;
- bool durable;
- bool exclusive;
- bool auto_delete;
- google::protobuf::Map<std::string, std::string> args;
-
- MsgQueue()
- {}
-
- MsgQueue(const std::string qname, bool qdurable, bool qexclusive, bool qauto_delete, const google::protobuf::Map<std::string, std::string> &qargs)
- :name(qname),
- durable(qdurable),
- exclusive(qexclusive),
- auto_delete(qauto_delete),
- args(qargs)
- {}
-
- void setArgs(const std::string& str_args)
- {
- std::vector<std::string> sub_args;
- StrHelper::split(str_args, "&", sub_args);
- for(auto sub_string : sub_args)
- {
- size_t pos = sub_string.find("=");
- std::string key = sub_string.substr(0, pos);
- std::string value = sub_string.substr(pos + 1);
- args[key] = value;
- }
- }
-
- std::string getArgs()
- {
- std::string result;
- for(auto str = args.begin(); str != args.end(); str++)
- {
- result = str->first + "=" + str->second + result;
- }
- return result;
- }
- };
-
- using QueueMap = std::unordered_map<std::string, MsgQueue::ptr>;
- class MsgQueueMapper
- {
- public:
- MsgQueueMapper(const std::string& dbfile)
- :_sqlite_helper(dbfile)
- {
- std::string path = FileHelper::parentDirectory(dbfile);
- bool ret = FileHelper::createDirectory(path);
- if(ret == false)
- {
- ELOG("创建父目录失败: %s", strerror(errno));
- }
- _sqlite_helper.open(); //创建sqlite目录
- createTable();
- }
-
- void createTable()
- {
- std::stringstream sql;
- sql << "create table if not exists queue_table(";
- sql << "name varchar(32) primary key, ";
- sql << "durable int, ";
- sql << "exclusive int, ";
- sql << "auto_delete int, ";
- sql << "args varchar(128));";
- _sqlite_helper.exec(sql.str(), nullptr, nullptr);
- }
-
- void removeTable()
- {
- std::stringstream sql;
- sql << "drop table if exists queue_table;";
- _sqlite_helper.exec(sql.str(), nullptr, nullptr);
- }
-
- bool insert(MsgQueue::ptr& queue)
- {
- std::stringstream sql;
- sql << "insert into queue_table values(";
- sql << "'" << queue->name << "',";
- sql << queue->durable << ", ";
- sql << queue->exclusive << ", ";
- sql << queue->auto_delete << ", ";
- sql << "'" << queue->getArgs() << "');";
- return _sqlite_helper.exec(sql.str(), nullptr, nullptr);
- }
-
- void remove(const std::string& name)
- {
- std::stringstream sql;
- sql << "delete from queue_table where name = ";
- sql << "'" << name << "';";
- _sqlite_helper.exec(sql.str(), nullptr, nullptr);
- }
-
-
- QueueMap recovery()
- {
- QueueMap result;
- std::stringstream sql;
- sql << "select name, durable, exclusive, auto_delete, args from queue_table;";
- _sqlite_helper.exec(sql.str(), SelectCallBack, &result);
- return result;
- }
-
- private:
- static int SelectCallBack(void* args, int numcol, char** row, char** fields)
- {
- QueueMap* result = (QueueMap*)args;
- MsgQueue::ptr mqp = std::make_shared<MsgQueue>();
- mqp->name = row[0];
- mqp->durable = std::stoi(row[1]);
- mqp->exclusive = std::stoi(row[2]);
- mqp->auto_delete = std::stoi(row[3]);
- if(row[4]) mqp->setArgs(row[4]);
- result->insert(std::make_pair(mqp->name, mqp));
- return 0;
- }
- private:
- SqliteHelper _sqlite_helper;
- };
-
-
- class MsgQueueManager
- {
- public:
- using ptr = std::shared_ptr<MsgQueueManager>;
- MsgQueueManager(const std::string& dbfile)
- :_mapper(dbfile)
- {
- _msg_queue = _mapper.recovery();
- }
-
- bool declareQueue(const std::string& name, bool durable, bool exclusive, bool auto_delete, const google::protobuf::Map<std::string, std::string>& args)
- {
- std::unique_lock<std::mutex> lock(_mutex);
- auto it = _msg_queue.find(name);
- if(it != _msg_queue.end())
- {
- // 队列存在,直接返回true
- return true;
- }
- MsgQueue::ptr mqp = std::make_shared<MsgQueue>();
- mqp->name = name;
- mqp->durable = durable;
- mqp->exclusive = exclusive;
- mqp->auto_delete = auto_delete;
- mqp->args = args;
- if(durable ==true) //需要进行持久化存储
- {
- bool ret = _mapper.insert(mqp);
- if(ret == false) return false;
- }
- _msg_queue.insert(make_pair(mqp->name, mqp));
- return true;
- }
-
- void deleteQueue(const std::string& name)
- {
- std::unique_lock<std::mutex> lock(_mutex);
- auto it = _msg_queue.find(name);
- if(it == _msg_queue.end())
- {
- DLOG("未找到相关信息");
- return ;
- }
- if(it->second->durable == true)
- {
- _mapper.remove(name);
- }
- _msg_queue.erase(name);
- }
-
- MsgQueue::ptr selectQueue(const std::string& name)
- {
- std::unique_lock<std::mutex> lock(_mutex);
- auto it = _msg_queue.find(name);
- if(it == _msg_queue.end())
- {
- DLOG("未找到相关信息");
- return MsgQueue::ptr();
- }
- return it->second;
- }
-
- QueueMap AllQueues()
- {
- std::unique_lock<std::mutex> lock(_mutex);
- return _msg_queue;
- }
-
- bool exists(const std::string& name)
- {
- std::unique_lock<std::mutex> lock(_mutex);
- auto it = _msg_queue.find(name);
- if(it == _msg_queue.end())
- {
- return false;
- }
- return true;
- }
-
- size_t size()
- {
- std::unique_lock<std::mutex> lock(_mutex);
- return _msg_queue.size();
- }
-
- void clear()
- {
- std::unique_lock<std::mutex> lock(_mutex);
- _msg_queue.clear();
- }
-
- private:
- std::mutex _mutex;
- MsgQueueMapper _mapper;
- QueueMap _msg_queue;
- };
- }
-
- #endif
描述将哪个队列与哪个交换机绑定到了一起
由数字, 字符,_,#, * 组成 binding_key, 例如:news.music.#
1. 添加绑定
2. 解除绑定
3. 获取交换机相关的所有绑定信息
删除交换机的时候,要删除相关绑定信息
当消息发布到交换机中,交换机得通过这些信息来将消息发布到指定队列
4. 获取队列相关的所有绑定信息
删除队列的时候,要删除相关的绑定信息
5. 获取绑定信息数量
- #ifndef _M_BINDING_H_
- #define _M_BINDING_H_
-
- #include "../mqcommon/helper.hpp"
- #include "../mqcommon/msg.pb.h"
- #include "../mqcommon/logger.hpp"
- #include <iostream>
- #include <unordered_map>
- #include <memory>
- #include <mutex>
-
- namespace mymq
- {
- struct Binding
- {
- using ptr = std::shared_ptr<Binding>;
- std::string exchange_name;
- std::string msgqueue_name;
- std::string binding_key;
-
- Binding() {}
-
- Binding(const std::string &ename, const std::string &msgqname, const std::string &key)
- : exchange_name(ename),
- msgqueue_name(msgqname),
- binding_key(key)
- {
- }
- };
- // 队列与绑定信息是一一对应的(因为是给某一个交换机去绑定队列,因此一个交换机可能会有多个队列的绑定信息)
- // 因此先定义一个队列名,与绑定信息的映射信息,这个是为了方便通过队名查找绑定信息
- using MsgQueueBindingMap = std::unordered_map<std::string, Binding::ptr>;
- // 然后定义一个交换机名称与队列绑定信息之间的映射关系,这个map包含了所有的绑定信息,并且以交换机为单元进行区分
- using BindingMap = std::unordered_map<std::string, MsgQueueBindingMap>;
-
- // 采用上面两种结构,则删除交换机相关绑定信息的时候,不仅要删除交换机映射,还需要删除对应队列中的映射,否则对象得不到释放
- class BindingMapper
- {
- public:
- BindingMapper(const std::string& dbfile)
- :_sqlite_helper(dbfile)
- {
- std::string path = FileHelper::parentDirectory(dbfile);
- FileHelper::createDirectory(path);
- _sqlite_helper.open();
- createTable();
- }
-
- void createTable()
- {
- std::stringstream sql;
- sql << "create table if not exists binding_table(";
- sql << "exchange_name varchar(32), ";
- sql << "msgqueue_name varchar(32), ";
- sql << "binding_key varchar(128));";
- assert(_sqlite_helper.exec(sql.str(), nullptr, nullptr));
- }
-
- void removeTable()
- {
- std::stringstream sql;
- sql << "drop table if exists binding_table; ";
- assert(_sqlite_helper.exec(sql.str(), nullptr, nullptr));
- }
-
-
- bool insert(Binding::ptr& binding)
- {
- std::stringstream sql;
- sql << "insert into binding_table values(";
- sql << "'" << binding->exchange_name <<"',";
- sql << "'" << binding->msgqueue_name << "',";
- sql << "'" << binding->binding_key << "');";
-
- return _sqlite_helper.exec(sql.str(), nullptr, nullptr);
- }
-
- bool remove(const std::string& ename, const std::string& qname)
- {
- std::stringstream sql;
- sql << "delete from binding_table where ";
- sql << "exchange_name = '" << ename << "' and ";
- sql << "msgqueue_name = '" << qname << "';";
- return _sqlite_helper.exec(sql.str(), nullptr, nullptr);
- }
-
- void removeExchangeBindings(const std::string& ename)
- {
- std::stringstream sql;
- sql << "delete from binding_table where ";
- sql << "exchange_name = '" << ename << "';";
- _sqlite_helper.exec(sql.str(), nullptr, nullptr);
- }
-
- void removeMsgQueueBindings(const std::string& qname)
- {
- std::stringstream sql;
- sql << "delete from binding_table where ";
- sql << "msgqueue_name = '" << qname << "';";
- _sqlite_helper.exec(sql.str(), nullptr, nullptr);
- }
-
- BindingMap recovery()
- {
- BindingMap result;
- std::string sql = "select exchange_name, msgqueue_name, binding_key from binding_table;";
- _sqlite_helper.exec(sql, SelectCallBack, &result);
- return result;
- }
-
- private:
- static int SelectCallBack(void* args, int numcol, char** row, char** fileds)
- {
- BindingMap* result = (BindingMap*)args;
- Binding::ptr bq = std::make_shared<Binding>();
- bq->exchange_name = row[0];
- bq->msgqueue_name = row[1];
- bq->binding_key = row[2];
- //为了防止 交换相关的绑定信息,因此不能直接创建队列映射,这样会覆盖历史数据
- // 因此得先获取交换机对应的映射对象,往里边添加数据
- // 但是,若这时候没有交换机对应的映射信息,因此这里的获取要使用引用(会保证不存在则自动创建)
- MsgQueueBindingMap& qbm = (*result)[bq->exchange_name];
- qbm.insert(std::make_pair(bq->msgqueue_name, bq));
- return 0;
- }
- private:
- SqliteHelper _sqlite_helper;
- };
-
- class BindingManager
- {
- public:
- using ptr = std::shared_ptr<BindingManager>;
- BindingManager(const std::string dbfile)
- :_mapper(dbfile)
- {
- _bindmap = _mapper.recovery();
- }
-
- bool bind(const std::string& ename, const std::string& qname, const std::string& key ,bool durable)
- {
- std::unique_lock<std::mutex> lock(_mutex);
- auto it = _bindmap.find(ename);
- if(it != _bindmap.end() && it->second.find(qname) != it->second.end())
- {
- return true;
- }
- //绑定是否是持久化
- Binding::ptr bp = std::make_shared<Binding>(ename, qname, key);
- // 磁盘数据持久化
- if(durable == true)
- {
- bool ret = _mapper.insert(bp);
- if(ret == false)
- {
- //未添加成功
- return false;
- }
- }
- // 一个交换机对应着多条绑定信息,所以先确定交换机,再将相关绑定信息插进去
- MsgQueueBindingMap& mqbm = _bindmap[ename];
- mqbm.insert(std::make_pair(bp->msgqueue_name, bp));
- return true;
- }
-
- void unBind(const std::string& ename, const std::string& qname)
- {
- std::unique_lock<std::mutex> lock(_mutex);
- auto itBindMap = _bindmap.find(ename);
- if(itBindMap == _bindmap.end())
- {
- return;
- }
- auto itMsgQueueBindings = itBindMap->second.find(qname);
- if(itMsgQueueBindings == itBindMap->second.end())
- {
- return ;
- }
- _mapper.remove(ename, qname);
- _bindmap[ename].erase(qname);
- }
-
- void removeExchangeBindings(const std::string& ename)
- {
- std::unique_lock<std::mutex> lock(_mutex);
- _mapper.removeExchangeBindings(ename);
- _bindmap.erase(ename);
- }
-
- void removeMsgqueueBindings(const std::string& qname)
- {
- std::unique_lock<std::mutex> lock(_mutex);
- _mapper.removeMsgQueueBindings(qname);
- for(auto it = _bindmap.begin(); it != _bindmap.end(); it++)
- {
- it->second.erase(qname);
- }
- }
-
- MsgQueueBindingMap getExchangeBindings(const std::string& ename)
- {
- std::unique_lock<std::mutex> lock(_mutex);
- auto it = _bindmap.find(ename);
- if(it == _bindmap.end())
- {
- return MsgQueueBindingMap();
- }
- return it->second;
- }
-
- Binding::ptr getBinding(const std::string& ename, const std::string& qname)
- {
- std::unique_lock<std::mutex> lock(_mutex);
- auto it_bindmap = _bindmap.find(ename);
- if(it_bindmap == _bindmap.end())
- {
- return Binding::ptr();
- }
- auto it_MsgQueueBindings = it_bindmap->second.find(qname);
- if(it_MsgQueueBindings == it_bindmap->second.end())
- {
- return Binding::ptr();
- }
- return it_MsgQueueBindings->second;
- }
-
- bool exists(const std::string& ename, const std::string& qname)
- {
- std::unique_lock<std::mutex> lock(_mutex);
- auto it_bindmap = _bindmap.find(ename);
- if(it_bindmap == _bindmap.end())
- {
- return false;
- }
- auto it_MsgQueueBindings = it_bindmap->second.find(qname);
- if(it_MsgQueueBindings ==it_bindmap->second.end())
- {
- return false;
- }
- return true;
- }
-
- size_t size()
- {
- std::unique_lock<std::mutex> lock(_mutex);
- size_t total_size = 0;
- for(auto it = _bindmap.begin(); it != _bindmap.end(); it++)
- {
- total_size += it->second.size();
- }
- return total_size;
- }
-
- void clear()
- {
- std::unique_lock<std::mutex> lock(_mutex);
- _mapper.removeTable();
- _bindmap.clear();
- }
-
-
-
- private:
- std::mutex _mutex;
- BindingMapper _mapper;
- BindingMap _bindmap;
- };
- }
-
- #endif
现在虚拟机模块的所有需要管理的数据全部都安排成功了,接下来是虚拟机的代码展示
- #ifndef __M_HOST_H__
- #define __M_HOST_H__
-
- #include "mq_exchange.hpp"
- #include "mq_queue.hpp"
- #include "mq_binding.hpp"
- #include "mq_message.hpp"
-
- namespace mymq
- {
- class VirtualHost
- {
- public:
- using ptr = std::shared_ptr<VirtualHost>;
- VirtualHost(const std::string& hname, const std::string& basedir, const std::string& dbfile)
- :_host_name(hname),
- _emp(std::make_shared<ExchangeManager>(dbfile)),
- _mqmp(std::make_shared<MsgQueueManager>(dbfile)),
- _bmp(std::make_shared<BindingManager>(dbfile)),
- _mmp(std::make_shared<MessageManager>(basedir))
- {}
-
- bool declareExchange(const std::string& name, ExchangeType type, bool durable, bool auto_delete, const google::protobuf::Map<std::string, std::string>& args)
- {
- return _emp->declareExchange(name, type, durable, auto_delete, args);
- }
-
- void deleteExchange(const std::string& name)
- {
- //删除交换机的时候,需要将交换机相关的绑定信息也删除掉
- _bmp->removeExchangeBindings(name);
- return _emp->deleteExchange(name);
- }
-
- bool existsExchange(const std::string& name)
- {
- return _emp->exists(name);
- }
-
- Exchange::ptr selectExchange(const std::string& ename)
- {
- return _emp->selectExchange(ename);
- }
-
- bool declareQueue(const std::string qname, bool qdurable, bool qexclusive, bool qauto_delete, const google::protobuf::Map<std::string, std::string> args)
- {
- //初始化队列消息句柄(消息的存储管理)
- //队列的创建
- _mmp->initQueueMessage(qname);
- return _mqmp->declareQueue(qname, qdurable, qexclusive, qauto_delete, args);
- }
-
- void deleteQueue(const std::string& qname)
- {
- //删除的时候队列相关的数据有两个:队列的消息,队列的绑定消息
- //删除队列里的消息
- _mmp->destroyQueueMessage(qname);
- //删除队列的绑定消息
- _bmp->removeMsgqueueBindings(qname);
- //删除队列
- _mqmp->deleteQueue(qname);
- }
-
- bool existsQueue(const std::string qname)
- {
- return _mqmp->exists(qname);
- }
-
- QueueMap allqueue()
- {
- return _mqmp->AllQueues();
- }
-
- bool bind(const std::string& ename, const std::string& qname, const std::string& key)
- {
- Exchange::ptr ep = _emp->selectExchange(ename);
- if(ep.get() == nullptr)
- {
- DLOG("进行队列绑定失败,交换机%s不存在", ename.c_str());
- return false;
- }
- MsgQueue::ptr mqp = _mqmp->selectQueue(qname);
- if(mqp.get() == nullptr)
- {
- DLOG("进行队列绑定失败,队列%s不存在", qname.c_str());
- }
- return _bmp->bind(ename, qname,key, ep->durable && mqp->durable);
- }
-
- void unBind(const std::string& ename, const std::string& qname)
- {
- return _bmp->unBind(ename, qname);
- }
-
- MsgQueueBindingMap exchangeBindings(const std::string& ename)
- {
- return _bmp->getExchangeBindings(ename);
- }
-
- bool existsBinding(const std::string& ename, const std::string& qname)
- {
- return _bmp->exists(ename, qname);
- }
-
- // 消息发布函数
- bool basicPublish(const std::string& qname, BasicProperties* bp, const std::string& body)
- {
- MsgQueue::ptr mqp = _mqmp->selectQueue(qname);
- if(mqp.get() == nullptr)
- {
- DLOG("发布消息失败, 队列%s不存在", qname.c_str());
- return false;
- }
- return _mmp->insert(qname, bp, body, mqp->durable);
- }
-
- //获取消息
- MessagePtr basicConsume(const std::string& name)
- {
- return _mmp->front(name);
- }
-
- // 消息应答
- void basicAck(const std::string& qname, const std::string& msgid)
- {
- return _mmp->ack(qname, msgid);
- }
-
- void clear()
- {
- _emp->clear();
- _mqmp->clear();
- _bmp->clear();
- _mmp->clear();
- }
-
- private:
- std::string _host_name;
- ExchangeManager::ptr _emp;
- MsgQueueManager::ptr _mqmp;
- BindingManager::ptr _bmp;
- MessageManager::ptr _mmp;
- };
- }
-
- #endif
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。