当前位置:   article > 正文

仿RabbitMq实现消息队列正式篇(虚拟机篇)

仿RabbitMq实现消息队列正式篇(虚拟机篇)
@TOC

目录

虚拟机模块

要管理的数据

要管理的操作

消息管理模块

要管理的数据

消息信息

消息主体

消息的管理

管理方法

管理数据

管理操作

队列消息管理

交换机数据管理

要管理的数据

要管理的操作

代码展示

队列数据管理

要管理的数据

要管理的操作

代码展示

绑定数据管理模块

管理的数据

管理的操作

代码展示

虚拟机代码展示


虚拟机模块

要管理的数据

  • 交换机数据管理句柄
  • 队列数据管理句柄
  • 绑定信息数据管理模块
  • 消息数据管理模块

要管理的操作

  • 声明 / 删除交换机:在删除交换机的时候,要删除相关的绑定信息
  • 声明 / 删除队列 :在删除队列的时候,要删除相关的绑定信息以及消息数据
  • 队列的绑定 / 解绑 :绑定的时候,必须交换机和队列是存在的
  • 获取指定队列的消息
  • 对指定队列的指定消息进行确认
  • 获取交换机相关的所有绑定信息:一条消息要发布给指定交换机的时候,交换机获取所有绑定信息,来确认消息要发布到哪个队列

基于这个,那我们先完成虚拟机模块管理的相关数据

消息管理模块

  1. syntax = "proto3";
  2. package mymq;
  3. enum ExchangeType # 交换类型
  4. {
  5. UNKNOWTYPE = 0;
  6. DIRECT = 1;
  7. FANOUT = 2;
  8. TOPIC = 3;
  9. };
  10. enum DeliveryMode # 是否持久化
  11. {
  12. UNKNOWMODE = 0;
  13. UNDURABLE = 1;
  14. DURABLE = 2;
  15. };
  16. message BasicProperties #消息的属性
  17. {
  18. string id = 1; # 消息ID, UUID
  19. DeliveryMode delivery_mode = 2; # 持久化模块
  20. string rounting_key = 3; # routing_key绑定信息
  21. };
  22. message Message
  23. {
  24. message Payload
  25. {
  26. BasicProperties properties = 1; # 消息属性
  27. string body = 2; # 有效载荷数据
  28. string valid = 3; # 消息是否有效
  29. };
  30. Payload payload = 1; # 真正持久化的只有这一个字段
  31. uint32 offset = 2; # 消息的位置
  32. uint32 length = 3; # 消息的长度
  33. };

要管理的数据

消息信息

  1. ID:消息的唯一标识
  2. 持久化标识:表示是否对消息进行持久化(还取决于队列的持久化标志)
  3. routing_key:决定了当前消息要发布的队列(消息发布到交换机后,根据绑定队列binding_key决定是否发布到指定队列)
消息主体
  1. 存储偏移量:消息以队列为单元存储在文件中,这个偏移量,是当前消息相对于文件起始位置的偏移量
  2. 消息长度:从偏移量位置取出指定长度的消息(解决了粘包问题)
  3. 是否有效标志:标识当前消息是否已经删除(删除一条消息,并不是用后面的消息覆盖掉前面的数据,而是重置有效标志位,当一个文件中的有效消息占据总消息比例不到50%,且数据量超过2000,则进行垃圾回收,重新整理文件数据存储)

        当系统重启时,也需要重新加载有效消息即可。

消息的管理

管理方法

以队列为单元进行管理(因为消息的所有操作都是以队列为单元)

管理数据
  1.  消息链表
  2. 待确定消息hash
  3. 持久化消息hash
  4. 持久化的有效消息数据
  5. 持久化的总的消息数据
管理操作
  • 向队列新增消息
  • 获取队首消息
  • 对消息进行确认
  • 恢复队列历史消息
  • 垃圾回收
  • 删除队列相关消息文件
队列消息管理
  • 初始化队列消息结构
  • 移除队列消息结果
  • 向队列新增消息
  • 对队列消息进行确认
  • 恢复队列历史消息
  1. #ifndef __M_MSG_H__
  2. #define __M_MSG_H__
  3. #include "../mqcommon/logger.hpp"
  4. #include "../mqcommon/helper.hpp"
  5. #include "../mqcommon/msg.pb.h"
  6. #include <iostream>
  7. #include <mutex>
  8. #include <memory>
  9. #include <unordered_map>
  10. #include <list>
  11. namespace mymq
  12. {
  13. // 消息的持久化管理
  14. // a. 管理数据
  15. // i. 队列消息⽂件存储的路径
  16. // ii. 队列消息的存储⽂件名
  17. // iii. 队列消息的临时交换⽂件名
  18. // b. 管理操作
  19. // i. ⽇志消息存储在⽂件中(4B⻓度+(属性+内容+有效位)序列化消息,连续存储即可)
  20. // ii. 提供队列消息⽂件创建/删除功能
  21. // iii. 提供队列消息的新增持久化/删除持久化
  22. // iv. 提供持久化内容的垃圾回收(其实就是重新加载出所有有效消息返回,并重新⽣成新的消息
  23. // 存储⽂件)
  24. #define DATAFILE_SUBFIX ".mqd"
  25. #define TMPFILE_SUBFIX ".mqb.tmp"
  26. using MessagePtr = std::shared_ptr<mymq::Message>;
  27. // 消息持久化
  28. class MessageMapper
  29. {
  30. public:
  31. MessageMapper(std::string &basedir, const std::string &qname)
  32. : _qname(qname)
  33. {
  34. if (basedir.back() != '/')
  35. {
  36. basedir.push_back('/');
  37. }
  38. _datafile = basedir + qname + DATAFILE_SUBFIX;
  39. _tmpfile = basedir + qname + TMPFILE_SUBFIX;
  40. if (FileHelper(basedir).exists() == false)
  41. {
  42. FileHelper::createDirectory(basedir);
  43. }
  44. createMsgFile();
  45. }
  46. bool createMsgFile()
  47. {
  48. if (FileHelper(_datafile).exists() == true)
  49. {
  50. return true;
  51. }
  52. bool ret = FileHelper::createFile(_datafile);
  53. if (ret == false)
  54. {
  55. ELOG("创建队列数据文件:%s 失败", _datafile.c_str());
  56. return false;
  57. }
  58. return true;
  59. }
  60. void removeMsgFile()
  61. {
  62. FileHelper::removeFile(_datafile);
  63. FileHelper::removeFile(_tmpfile);
  64. }
  65. bool insert(MessagePtr &msg)
  66. {
  67. return insert(_datafile, msg);
  68. }
  69. bool remove(MessagePtr &msg)
  70. {
  71. // 1. 将msg中的有效标志位修改为‘0
  72. msg->mutable_payload()->set_valid("0");
  73. // 2,对msg进行序列化
  74. std::string body = msg->SerializeAsString();
  75. if (body.size() != msg->length())
  76. {
  77. ELOG("不能修改文件中的数据信息,因为生成的数据与原数据长度不一样");
  78. return false;
  79. }
  80. // 3. 将序列化后的消息,写入到数据在文件中的指定位置(覆盖原有的数据)
  81. FileHelper helper(_datafile);
  82. bool ret = helper.write(body.c_str(), msg->offset(), body.size());
  83. if (ret == false)
  84. {
  85. DLOG("向队列数据文件写入失败!");
  86. return false;
  87. }
  88. return true;
  89. }
  90. std::list<MessagePtr> gc()
  91. {
  92. std::list<MessagePtr> result;
  93. bool ret = load(result);
  94. if (ret == false)
  95. {
  96. ELOG("加载有效数据失败\n");
  97. return result;
  98. }
  99. // 2. 将有效数据,进行序列化存储到临时文件中
  100. FileHelper::createFile(_tmpfile);
  101. for (auto e : result)
  102. {
  103. DLOG("向临时文件中添加数据 : %s", e->payload().body().c_str());
  104. ret = insert(_tmpfile, e);
  105. if (ret == false)
  106. {
  107. ELOG("向临时文件中添加数据失败");
  108. return result;
  109. }
  110. }
  111. // 3. 删除源文件
  112. ret = FileHelper::removeFile(_datafile);
  113. if (ret == false)
  114. {
  115. ELOG("删除原文件失败");
  116. return result;
  117. }
  118. // 4. 修改临时文件的名字,改成原文件的名称
  119. ret = FileHelper(_tmpfile).rename(_datafile.c_str());
  120. if (ret == false)
  121. {
  122. ELOG("修改文件名称失败");
  123. return result;
  124. }
  125. // 5. 返回新的有效数据
  126. return result;
  127. }
  128. private:
  129. bool load(std::list<MessagePtr> &result)
  130. {
  131. // 1. 加载出文件中的所有的有效数据:存储格式:4字节长度 | 数据 | 4字节长度 | 数据 ......
  132. FileHelper data_file_helper(_datafile);
  133. size_t offset = 0; // 用来定位
  134. size_t msg_size = 0; // 用来 4 字节长度的数据长度
  135. size_t file_size = data_file_helper.size();
  136. while (offset < file_size)
  137. {
  138. bool ret = data_file_helper.read((char*)&msg_size, offset, sizeof(size_t));
  139. if (ret == false)
  140. {
  141. DLOG("读取消息长度失败!");
  142. }
  143. offset += sizeof(size_t);
  144. std::string msg_body(msg_size, '\0');
  145. ret = data_file_helper.read(&msg_body[0], offset, msg_size);
  146. // 为什么使用 &msg_body[0]
  147. // &msg_body[0] 返回的是指向 std::string 内部字符数组首元素的指针,即指向字符串内容的指针。
  148. // 适用于需要直接操作字符串内容的场景,例如文件读写操作
  149. // &msg_body:
  150. // &msg_body 返回的是 std::string 对象本身的指针,即指向 std::string 对象的指针。
  151. // 不适用于直接操作字符串内容的场景,而是用于操作整个 std::string 对象本身。
  152. if (ret == false)
  153. {
  154. DLOG("读取消息数据失败");
  155. return false;
  156. }
  157. offset += msg_size;
  158. MessagePtr msgp = std::make_shared<Message>();
  159. msgp->mutable_payload()->ParseFromString(msg_body);
  160. // 如果是无效消息,则直接处理下一个
  161. if (msgp->payload().valid() == "0")
  162. {
  163. ELOG("加载到无效消息:%s", msgp->payload().body().c_str());
  164. continue;
  165. }
  166. // 有效消息则保存下来
  167. result.push_back(msgp);
  168. }
  169. return true;
  170. }
  171. bool insert(const std::string &filename, MessagePtr &msg)
  172. {
  173. // 新增数据都是添加在文件末尾的
  174. // 1. 进行消息的序列化,获取到格式化后的消息
  175. // 因为使用的是 sqlite 数据库,存储的是二进制信息,所以需要进行序列化
  176. std::string body = msg->payload().SerializeAsString();
  177. // 2. 获取文件长度
  178. FileHelper helper(filename);
  179. size_t fsize = helper.size();
  180. size_t msg_size = body.size();
  181. // 写入逻辑:1. 先写入 4 字节数据长度,2. 再写入指定长度数据
  182. bool ret = helper.write((char *)&msg_size, fsize, sizeof(size_t));
  183. if (ret == false)
  184. {
  185. DLOG("向队列数据文件写入数据长度失败!");
  186. return false;
  187. }
  188. // 将数据写入文件的指定位置
  189. ret = helper.write(body.c_str(), fsize + sizeof(size_t), msg_size);
  190. if (ret == false)
  191. {
  192. DLOG("向队列数据文件写入数据失败!");
  193. return false;
  194. }
  195. // 更新msg中的实际存储信息
  196. msg->set_offset(fsize + sizeof(size_t));
  197. msg->set_length(msg_size);
  198. return true;
  199. }
  200. private:
  201. std::string _qname;
  202. std::string _datafile;
  203. std::string _tmpfile;
  204. };
  205. // 管理数据
  206. class QueueMessage
  207. {
  208. public:
  209. using ptr = std::shared_ptr<QueueMessage>;
  210. // 初始化
  211. QueueMessage(std::string &basedir, const std::string &qname)
  212. : _mapper(basedir, qname),
  213. _qname(qname),
  214. _valid_count(0),
  215. _total_count(0)
  216. {}
  217. bool recovery()
  218. {
  219. // 恢复历史消息
  220. std::unique_lock<std::mutex> lock(_mutex);
  221. _msgs = _mapper.gc();
  222. for (auto &msg : _msgs)
  223. {
  224. _durable_msgs.insert(std::make_pair(msg->payload().properties().id(), msg));
  225. }
  226. _valid_count = _total_count = _msgs.size();
  227. return true;
  228. }
  229. bool insert(const BasicProperties *bp, const std::string &body, bool queue_is_durable)
  230. {
  231. // 构造消息对象
  232. MessagePtr msg = std::make_shared<Message>();
  233. msg->mutable_payload()->set_body(body);
  234. if (bp != nullptr)
  235. {
  236. DeliveryMode mode = queue_is_durable ? bp->delivery_mode() : DeliveryMode::UNDURABLE;
  237. msg->mutable_payload()->mutable_properties()->set_id(bp->id());
  238. msg->mutable_payload()->mutable_properties()->set_delivery_mode(mode);
  239. msg->mutable_payload()->mutable_properties()->set_rounting_key(bp->rounting_key());
  240. }
  241. else
  242. {
  243. DeliveryMode mode = queue_is_durable ? DeliveryMode::DURABLE : DeliveryMode::UNDURABLE;
  244. msg->mutable_payload()->mutable_properties()->set_id(UUIDHelper::uuid());
  245. msg->mutable_payload()->mutable_properties()->set_delivery_mode(mode);
  246. msg->mutable_payload()->mutable_properties()->set_rounting_key("");
  247. }
  248. std::unique_lock<std::mutex> lock(_mutex);
  249. // 2. 判断消息是否需要持久化
  250. if (msg->payload().properties().delivery_mode() == DURABLE)
  251. {
  252. msg->mutable_payload()->set_valid("1");
  253. // 3. 进行持久化存储
  254. bool ret = _mapper.insert(msg);
  255. if (ret == false)
  256. {
  257. ELOG("持久化存储信息:%s失败了!", body.c_str());
  258. }
  259. _valid_count += 1;
  260. _total_count += 1;
  261. _durable_msgs.insert(make_pair(msg->payload().properties().id(), msg));
  262. }
  263. // 4. 内容的管理
  264. _msgs.push_back(msg);
  265. return true;
  266. }
  267. MessagePtr front()
  268. {
  269. std::unique_lock<std::mutex> lock(_mutex);
  270. if (_msgs.size() == 0)
  271. {
  272. return MessagePtr();
  273. }
  274. // 获取一条队首消息:从_msgs中获取数据
  275. MessagePtr msg = _msgs.front();
  276. _msgs.pop_front();
  277. // 将该消息对象,向待确认的hash表中添加一份,等到收到消息确认后进行删除
  278. _waitack_msgs.insert(std::make_pair(msg->payload().properties().id(), msg));
  279. return msg;
  280. }
  281. // 每次删除消息后,判断是否需要垃圾回收
  282. bool remove(const std::string &msg_id)
  283. {
  284. std::unique_lock<std::mutex> lock(_mutex);
  285. // 1. 从待确认队列中找到信息
  286. auto it = _waitack_msgs.find(msg_id);
  287. if (it == _waitack_msgs.end())
  288. {
  289. DLOG("没有找到要删除的消息:%s!", msg_id.c_str());
  290. return true;
  291. }
  292. // 2. 根据消息的持久化模式,决定是否删除持久化消息
  293. if (it->second->payload().properties().delivery_mode() == DeliveryMode::DURABLE)
  294. {
  295. // 删除持久化消息
  296. _mapper.remove(it->second);
  297. _durable_msgs.erase(msg_id);
  298. _valid_count -= 1;
  299. gc();
  300. }
  301. // 4. 删除内存中的信息
  302. _waitack_msgs.erase(msg_id);
  303. return true;
  304. }
  305. size_t getable_count()
  306. {
  307. std::unique_lock<std::mutex> lock(_mutex);
  308. return _msgs.size();
  309. }
  310. size_t total_count()
  311. {
  312. std::unique_lock<std::mutex> lock(_mutex);
  313. return _total_count;
  314. }
  315. size_t durable_count()
  316. {
  317. std::unique_lock<std::mutex> lock(_mutex);
  318. return _durable_msgs.size();
  319. }
  320. size_t waitack_count()
  321. {
  322. std::unique_lock<std::mutex> lock(_mutex);
  323. return _waitack_msgs.size();
  324. }
  325. void clear()
  326. {
  327. std::unique_lock<std::mutex> lock(_mutex);
  328. _mapper.removeMsgFile();
  329. _msgs.clear();
  330. _durable_msgs.clear();
  331. _waitack_msgs.clear();
  332. _valid_count = 0;
  333. _total_count = 0;
  334. }
  335. private:
  336. bool GCCheck() {
  337. //持久化的消息总量大于2000, 且其中有效比例低于50%则需要持久化
  338. if (_total_count > 2000 && _valid_count * 10 / _total_count < 5) {
  339. return true;
  340. }
  341. return false;
  342. }
  343. void gc()
  344. {
  345. // 1. 进行垃圾回收,获取到垃圾回收后,有效的消息信息链表
  346. if(GCCheck() == false)
  347. {
  348. return;
  349. }
  350. std::list<MessagePtr> msgs = _mapper.gc();
  351. for(auto& msg : msgs)
  352. {
  353. auto it = _durable_msgs.find(msg->payload().properties().id());
  354. if(it == _durable_msgs.end())
  355. {
  356. DLOG("垃圾回收之后,有一条持久化消息,在内存中没有进行管理");
  357. _msgs.push_back(msg); // 做法:重新添加到推送链表的末尾
  358. _durable_msgs.insert(make_pair(msg->payload().properties().id(), msg));
  359. continue;
  360. }
  361. // 2. 更新每一条消息的实际存储位置
  362. it->second->set_offset(msg->offset());
  363. it->second->set_length(msg->length());
  364. }
  365. _valid_count = _total_count = msgs.size();
  366. }
  367. private:
  368. std::mutex _mutex;
  369. std::string _qname;
  370. size_t _valid_count;
  371. size_t _total_count;
  372. MessageMapper _mapper;
  373. std::list<MessagePtr> _msgs;
  374. std::unordered_map<std::string, MessagePtr> _durable_msgs; // 持久化消息hash
  375. std::unordered_map<std::string, MessagePtr> _waitack_msgs; // 待确认消息hash
  376. };
  377. // 实现一个对外的总体消息管理类
  378. // 消息的总体对外管理
  379. // i. 初始化新建队列的消息管理结构,并创建消息存储⽂件
  380. // ii. 删除队列的消息管理结构,以及消息存储⽂件
  381. // iii. 向指定队列新增消息
  382. // iv. 获取指定队列队⾸消息
  383. // v. 确认指定队列待确认消息(删除)
  384. // vi. 判断指定队列消息是否为空
  385. class MessageManager
  386. {
  387. public:
  388. using ptr = std::shared_ptr<MessageManager>;
  389. MessageManager(const std::string basedir)
  390. :_basedir(basedir)
  391. {}
  392. void clear()
  393. {
  394. std::unique_lock<std::mutex> lock(_mutex);
  395. for(auto& it : _queue_msgs)
  396. {
  397. it.second->clear();
  398. }
  399. }
  400. void initQueueMessage(const std::string& qname)
  401. {
  402. QueueMessage::ptr qmp;
  403. {
  404. std::unique_lock<std::mutex> lock(_mutex);
  405. auto it = _queue_msgs.find(qname);
  406. if(it != _queue_msgs.end())
  407. {
  408. return ;
  409. }
  410. qmp = std::make_shared<QueueMessage>(_basedir, qname);
  411. _queue_msgs.insert(std::make_pair(qname, qmp));
  412. }
  413. qmp->recovery();
  414. }
  415. void destroyQueueMessage(const std::string& qname)
  416. {
  417. QueueMessage::ptr qmp;
  418. {
  419. std::unique_lock<std::mutex> lock(_mutex);
  420. auto it = _queue_msgs.find(qname);
  421. if(it == _queue_msgs.end())
  422. {
  423. return ;
  424. }
  425. qmp = it->second;
  426. _queue_msgs.erase(it);
  427. }
  428. qmp->clear();
  429. }
  430. bool insert(const std::string& qname, BasicProperties* bp, const std::string& body, bool _queue_is_durable)
  431. {
  432. QueueMessage::ptr qmp;
  433. {
  434. std::unique_lock<std::mutex> lock(_mutex);
  435. auto it = _queue_msgs.find(qname);
  436. if(it == _queue_msgs.end())
  437. {
  438. ELOG("向队列 %s 新增消息失败,没有找到消息管理句柄", qname.c_str());
  439. return false;
  440. }
  441. qmp = it->second;
  442. }
  443. return qmp->insert(bp, body, _queue_is_durable);
  444. }
  445. MessagePtr front(const std::string& qname)
  446. {
  447. QueueMessage::ptr qmp;
  448. {
  449. std::unique_lock<std::mutex> lock(_mutex);
  450. auto it = _queue_msgs.find(qname);
  451. if(it == _queue_msgs.end())
  452. {
  453. DLOG("获取 %s 队首数据失败:没有找到对应的消息句柄", qname.c_str() );
  454. return MessagePtr();
  455. }
  456. qmp = it->second;
  457. }
  458. //取出对应的相关队列的首部消息
  459. return qmp->front();
  460. }
  461. void ack(const std::string& qname, const std::string& msg_id)
  462. {
  463. QueueMessage::ptr qmp;
  464. {
  465. std::unique_lock<std::mutex> lock(_mutex);
  466. auto it = _queue_msgs.find(qname);
  467. if(it == _queue_msgs.end())
  468. {
  469. DLOG("确认队列 %s 消息 %s 失败:没有找到消息管理句柄", qname.c_str(), msg_id.c_str());
  470. return ;
  471. }
  472. qmp = it->second;
  473. }
  474. qmp->remove(msg_id);
  475. return ;
  476. }
  477. size_t getable_count(const std::string& qname)
  478. {
  479. QueueMessage::ptr qmp;
  480. {
  481. std::unique_lock<std::mutex> lock(_mutex);
  482. auto it = _queue_msgs.find(qname);
  483. if(it == _queue_msgs.end())
  484. {
  485. DLOG("获取队列 %s 待推送消息数量失败:没有找到消息管理句柄", qname.c_str());
  486. return 0;
  487. }
  488. qmp = it->second;
  489. }
  490. return qmp->getable_count();
  491. }
  492. size_t total_count(const std::string& qname)
  493. {
  494. QueueMessage::ptr qmp;
  495. {
  496. std::unique_lock<std::mutex> lock(_mutex);
  497. auto it = _queue_msgs.find(qname);
  498. if(it == _queue_msgs.end())
  499. {
  500. DLOG("获取队列 %s 总持久化消息数量失败:没有找到消息管理句柄", qname.c_str());
  501. return 0;
  502. }
  503. qmp = it->second;
  504. }
  505. return qmp->total_count();
  506. }
  507. size_t durable_count(const std::string& qname)
  508. {
  509. QueueMessage::ptr qmp;
  510. {
  511. std::unique_lock<std::mutex> lock(_mutex);
  512. auto it = _queue_msgs.find(qname);
  513. if(it == _queue_msgs.end())
  514. {
  515. DLOG("获取队列 %s 总持久化消息数量失败:没有找到消息管理句柄", qname.c_str());
  516. return 0;
  517. }
  518. qmp = it->second;
  519. }
  520. return qmp->durable_count();
  521. }
  522. size_t waitack_count(const std::string& qname)
  523. {
  524. QueueMessage::ptr qmp;
  525. {
  526. std::unique_lock<std::mutex> lock(_mutex);
  527. auto it = _queue_msgs.find(qname);
  528. if(it == _queue_msgs.end())
  529. {
  530. DLOG("获取队列 %s 总持久化消息数量失败:没有找到消息管理句柄", qname.c_str());
  531. return 0;
  532. }
  533. qmp = it->second;
  534. }
  535. return qmp->waitack_count();
  536. }
  537. private:
  538. std::mutex _mutex;
  539. std::string _basedir;
  540. std::unordered_map<std::string, QueueMessage::ptr> _queue_msgs;
  541. };
  542. }
  543. #endif

交换机数据管理

要管理的数据

  • 交换机名称:唯一标识
  • 交换机类型:决定了消息的转发方式
  • 每一个队列绑定中有个binding_key, 每条消息中有一个routing_key
  • 直接交换:binding_key和routing_key相同,则将消息放入队列
  • 广播交换:将消息放入交换机绑定的所有队列中
  • 主题交换:routing_key与多个绑定队列的binding_key有个匹配机制,匹配成功了则放入
  • 持久化标志:决定了当前交换机信息是否需要持久化存储
  • 自动化删除标志:指的是关联了当前交换机的所有客户端都退出了,是否要自动删除交换机

要管理的操作

  • 创建交换机:本质上需要的是声明 ----- 强断言的思想,没有的话,就创建
  • 删除交换机:每个交换机都会绑定一个或多个队列(意味着会有一个或多个绑定信息,因此删除交换机需要删除相关绑定信息
  • 获取指定名称的交换机
  • 获取当前交换机名称

注意交换机有持久化的标识,所以在进行管理操作的时候,需要分情况进行,在内存中管理,以及在硬盘中管理

代码展示

  1. #include "../mqcommon/helper.hpp"
  2. #include "../mqcommon/logger.hpp"
  3. #include "../mqcommon/msg.pb.h"
  4. #include "../mqcommon/mq_proto.pb.h"
  5. #include <google/protobuf/map.h>
  6. #include <iostream>
  7. #include <unordered_map>
  8. #include <memory>
  9. #include <mutex>
  10. namespace mymq
  11. {
  12. // 1. 定义交换机类
  13. struct Exchange
  14. {
  15. using ptr = std::shared_ptr<Exchange>;
  16. // 1. 交换机名称
  17. std::string name;
  18. // 2. 交换机类型
  19. ExchangeType type;
  20. // 3. 交换机持久化标志
  21. bool durable;
  22. // 4. 是否自动删除标志
  23. bool auto_delete;
  24. // 5. 其他参数
  25. google::protobuf::Map<std::string, std::string> args;
  26. //因为后面的函数需要构造一个Exchange的空对象
  27. Exchange()
  28. {}
  29. Exchange(const std::string &ename, ExchangeType etype, bool edurable, bool eauto_delete, const google::protobuf::Map<std::string, std::string> eargs)
  30. : name(ename),
  31. type(etype),
  32. durable(edurable),
  33. auto_delete(eauto_delete),
  34. args(eargs)
  35. {}
  36. // args存储键值对,在存储数据库的时候,会组织一个格式字符串进行存储 key=value&key=value.....
  37. // 内部解析str_args字符串,将容器存储到成员中
  38. void setArgs(const std::string &str_args)
  39. {
  40. //key=val&key=val
  41. std::vector<std::string> sub_args;
  42. StrHelper::split(str_args, "&", sub_args);
  43. for(auto& str : sub_args)
  44. {
  45. size_t pos = str.find('=');
  46. std::string key = str.substr(0, pos);
  47. std::string val = str.substr(pos + 1);
  48. args[key] = val;
  49. }
  50. }
  51. // 将args中的内容进行序列化后,返回一个字符串
  52. std::string getArgs()
  53. {
  54. std::string result;
  55. for(auto e = args.begin(); e != args.end(); e++)
  56. {
  57. result = e->first + "=" + e->second + "&" + result;
  58. }
  59. return result;
  60. }
  61. };
  62. // 2. 定义交换机数据持久化管理类 -- 数据存储在sqlite数据库中
  63. class ExchangeMapper
  64. {
  65. public:
  66. ExchangeMapper(const std::string &dbfile)
  67. :_sqlite_helper(dbfile)
  68. {
  69. std::string path = FileHelper::parentDirectory(dbfile);
  70. FileHelper::createDirectory(path);
  71. assert(_sqlite_helper.open());
  72. createTable();
  73. }
  74. void createTable()
  75. {
  76. #define CREATE_TABLE "create table if not exists exchange_table(\
  77. name varchar(32) primary key, \
  78. type int,\
  79. durable int, \
  80. auto_delete int ,\
  81. args varchar(128));"
  82. bool ret = _sqlite_helper.exec(CREATE_TABLE, nullptr, nullptr);
  83. if(ret == false)
  84. {
  85. DLOG("创建交换机数据库失败");
  86. abort(); //直接异常退出程序
  87. }
  88. }
  89. void removeTable()
  90. {
  91. #define DROP_TABLE "drop table if exists exchange_table;"
  92. bool ret = _sqlite_helper.exec(DROP_TABLE, nullptr, nullptr);
  93. if(ret == false)
  94. {
  95. DLOG("删除交换机数据库失败");
  96. abort(); //直接异常退出程序
  97. }
  98. }
  99. bool insert(Exchange::ptr &exp)
  100. {
  101. #define INSERT_TABLE "insert into exchange_table values('%s', %d, %d, %d, '%s');"
  102. std::string args_str = exp->getArgs();
  103. char sql_str[4096] = {0};
  104. sprintf(sql_str, INSERT_TABLE, exp->name.c_str(), exp->type, exp->durable, exp->auto_delete, args_str.c_str());
  105. bool ret = _sqlite_helper.exec(sql_str, nullptr, nullptr);
  106. if(ret == false)
  107. {
  108. DLOG("插入数据库失败");
  109. return false;
  110. }
  111. return true;
  112. }
  113. void remove(const std::string &name)
  114. {
  115. std::stringstream ss;
  116. ss << "delete from exchange_table where name = ";
  117. ss << "'" << name << "';";
  118. _sqlite_helper.exec(ss.str(), nullptr, nullptr);
  119. }
  120. using ExchangeMap = std::unordered_map<std::string, Exchange::ptr>;
  121. ExchangeMap recovery()
  122. {
  123. ExchangeMap result;
  124. std::string sql = "select name, type, durable, auto_delete, args from exchange_table;";
  125. _sqlite_helper.exec(sql.c_str(), SelectCallBack, &result);
  126. return result;
  127. }
  128. private:
  129. static int SelectCallBack(void* args, int numcol, char** row, char** fields)
  130. {
  131. ExchangeMap* result = (ExchangeMap*)args;
  132. auto exp = std::make_shared<Exchange>();
  133. exp->name = row[0];
  134. exp->type = (mymq::ExchangeType)std::stoi(row[1]);
  135. exp->durable = (bool)std::stoi(row[2]);
  136. exp->auto_delete = (bool)std::stoi(row[3]);
  137. if(row[4])
  138. {
  139. exp->setArgs(row[4]);
  140. }
  141. result->insert(std::make_pair(exp->name, exp));
  142. return 0;
  143. }
  144. private:
  145. SqliteHelper _sqlite_helper;
  146. };
  147. // 3. 定义交换机在内存管理类
  148. class ExchangeManager
  149. {
  150. public:
  151. using ptr = std::shared_ptr<ExchangeManager>;
  152. ExchangeManager(const std::string &dbfile)
  153. :_mapper(dbfile)
  154. {
  155. _exchanges = _mapper.recovery();
  156. }
  157. // 声明交换机
  158. bool declareExchange(const std::string &name, ExchangeType type, bool durable, bool auto_delete, const google::protobuf::Map<std::string, std::string> &args)
  159. {
  160. std::unique_lock<std::mutex> lock(_mutex);
  161. auto it = _exchanges.find(name);
  162. if(it != _exchanges.end())
  163. {
  164. //如果交换机已经存在,那就直接返回,不需要重复新增
  165. return true;
  166. }
  167. auto exp = std::make_shared<Exchange>(name, type, durable, auto_delete, args);
  168. if(durable == true)
  169. {
  170. bool ret = _mapper.insert(exp);
  171. if(ret == false)
  172. {
  173. return false;
  174. }
  175. }
  176. _exchanges.insert(std::make_pair(name, exp));
  177. return true;
  178. }
  179. // 删除交换机
  180. void deleteExchange(const std::string &name)
  181. {
  182. std::unique_lock<std::mutex> lock(_mutex);
  183. auto it = _exchanges.find(name);
  184. if(it == _exchanges.end())
  185. {
  186. //如果交换机不存在,那就直接返回
  187. return ;
  188. }
  189. if(it->second->durable == true)
  190. _mapper.remove(name);
  191. _exchanges.erase(name);
  192. }
  193. Exchange::ptr selectExchange(const std::string& name)
  194. {
  195. std::unique_lock<std::mutex> lock(_mutex);
  196. auto it = _exchanges.find(name);
  197. if(it == _exchanges.end())
  198. {
  199. //如果交换机不存在,那就直接返回
  200. return nullptr;
  201. }
  202. return it->second;
  203. }
  204. // 判断交换机是否存在
  205. bool exists(const std::string &name)
  206. {
  207. std::unique_lock<std::mutex> lock(_mutex);
  208. auto it = _exchanges.find(name);
  209. if(it == _exchanges.end())
  210. {
  211. //如果交换机不存在,那就直接返回
  212. return false;
  213. }
  214. return true;
  215. }
  216. size_t size()
  217. {
  218. std::unique_lock<std::mutex> lock(_mutex);
  219. return _exchanges.size();
  220. }
  221. // 清理所有交换机数据
  222. void clear()
  223. {
  224. std::unique_lock<std::mutex> lock(_mutex);
  225. _mapper.removeTable();
  226. _exchanges.clear();
  227. }
  228. private:
  229. std::mutex _mutex;
  230. ExchangeMapper _mapper;
  231. std::unordered_map<std::string, Exchange::ptr> _exchanges;
  232. };
  233. }

队列数据管理

要管理的数据

  1. 队列名称:唯一的标识
  2. 持久化存储标志:决定了是否将队列信息持久化存储起来,决定了重启后,这个队列是否存在
  3. 是否独占标志:独占就指的是,只有当前客户端自己能够订阅队列消息
  4. 自动删除标志:当订阅了当前队列的所有客户端退出后,是否删除队列

要管理的操作

  1. 创建队列
  2. 删除队列
  3. 获取指定队列信息
  4. 获取队列数据
  5. 获取所有队列名称:当系统重启后,需要重新加载数据,加载历史消息(消息以队列为单元存储在文件中)而加载消息需要直到队列名称,因为后边消息存储的时候,存储文件以队列名称进行的取名

代码展示

  1. #ifndef __M_QUEUE_H__
  2. #define __M_QUEUE_H__
  3. #include "../mqcommon/helper.hpp"
  4. #include "../mqcommon/logger.hpp"
  5. #include "../mqcommon/msg.pb.h"
  6. #include <iostream>
  7. #include <unordered_map>
  8. #include <memory>
  9. #include <mutex>
  10. namespace mymq
  11. {
  12. struct MsgQueue
  13. {
  14. using ptr = std::shared_ptr<MsgQueue>; // 放在这个位置合适吗?
  15. std::string name;
  16. bool durable;
  17. bool exclusive;
  18. bool auto_delete;
  19. google::protobuf::Map<std::string, std::string> args;
  20. MsgQueue()
  21. {}
  22. MsgQueue(const std::string qname, bool qdurable, bool qexclusive, bool qauto_delete, const google::protobuf::Map<std::string, std::string> &qargs)
  23. :name(qname),
  24. durable(qdurable),
  25. exclusive(qexclusive),
  26. auto_delete(qauto_delete),
  27. args(qargs)
  28. {}
  29. void setArgs(const std::string& str_args)
  30. {
  31. std::vector<std::string> sub_args;
  32. StrHelper::split(str_args, "&", sub_args);
  33. for(auto sub_string : sub_args)
  34. {
  35. size_t pos = sub_string.find("=");
  36. std::string key = sub_string.substr(0, pos);
  37. std::string value = sub_string.substr(pos + 1);
  38. args[key] = value;
  39. }
  40. }
  41. std::string getArgs()
  42. {
  43. std::string result;
  44. for(auto str = args.begin(); str != args.end(); str++)
  45. {
  46. result = str->first + "=" + str->second + result;
  47. }
  48. return result;
  49. }
  50. };
  51. using QueueMap = std::unordered_map<std::string, MsgQueue::ptr>;
  52. class MsgQueueMapper
  53. {
  54. public:
  55. MsgQueueMapper(const std::string& dbfile)
  56. :_sqlite_helper(dbfile)
  57. {
  58. std::string path = FileHelper::parentDirectory(dbfile);
  59. bool ret = FileHelper::createDirectory(path);
  60. if(ret == false)
  61. {
  62. ELOG("创建父目录失败: %s", strerror(errno));
  63. }
  64. _sqlite_helper.open(); //创建sqlite目录
  65. createTable();
  66. }
  67. void createTable()
  68. {
  69. std::stringstream sql;
  70. sql << "create table if not exists queue_table(";
  71. sql << "name varchar(32) primary key, ";
  72. sql << "durable int, ";
  73. sql << "exclusive int, ";
  74. sql << "auto_delete int, ";
  75. sql << "args varchar(128));";
  76. _sqlite_helper.exec(sql.str(), nullptr, nullptr);
  77. }
  78. void removeTable()
  79. {
  80. std::stringstream sql;
  81. sql << "drop table if exists queue_table;";
  82. _sqlite_helper.exec(sql.str(), nullptr, nullptr);
  83. }
  84. bool insert(MsgQueue::ptr& queue)
  85. {
  86. std::stringstream sql;
  87. sql << "insert into queue_table values(";
  88. sql << "'" << queue->name << "',";
  89. sql << queue->durable << ", ";
  90. sql << queue->exclusive << ", ";
  91. sql << queue->auto_delete << ", ";
  92. sql << "'" << queue->getArgs() << "');";
  93. return _sqlite_helper.exec(sql.str(), nullptr, nullptr);
  94. }
  95. void remove(const std::string& name)
  96. {
  97. std::stringstream sql;
  98. sql << "delete from queue_table where name = ";
  99. sql << "'" << name << "';";
  100. _sqlite_helper.exec(sql.str(), nullptr, nullptr);
  101. }
  102. QueueMap recovery()
  103. {
  104. QueueMap result;
  105. std::stringstream sql;
  106. sql << "select name, durable, exclusive, auto_delete, args from queue_table;";
  107. _sqlite_helper.exec(sql.str(), SelectCallBack, &result);
  108. return result;
  109. }
  110. private:
  111. static int SelectCallBack(void* args, int numcol, char** row, char** fields)
  112. {
  113. QueueMap* result = (QueueMap*)args;
  114. MsgQueue::ptr mqp = std::make_shared<MsgQueue>();
  115. mqp->name = row[0];
  116. mqp->durable = std::stoi(row[1]);
  117. mqp->exclusive = std::stoi(row[2]);
  118. mqp->auto_delete = std::stoi(row[3]);
  119. if(row[4]) mqp->setArgs(row[4]);
  120. result->insert(std::make_pair(mqp->name, mqp));
  121. return 0;
  122. }
  123. private:
  124. SqliteHelper _sqlite_helper;
  125. };
  126. class MsgQueueManager
  127. {
  128. public:
  129. using ptr = std::shared_ptr<MsgQueueManager>;
  130. MsgQueueManager(const std::string& dbfile)
  131. :_mapper(dbfile)
  132. {
  133. _msg_queue = _mapper.recovery();
  134. }
  135. bool declareQueue(const std::string& name, bool durable, bool exclusive, bool auto_delete, const google::protobuf::Map<std::string, std::string>& args)
  136. {
  137. std::unique_lock<std::mutex> lock(_mutex);
  138. auto it = _msg_queue.find(name);
  139. if(it != _msg_queue.end())
  140. {
  141. // 队列存在,直接返回true
  142. return true;
  143. }
  144. MsgQueue::ptr mqp = std::make_shared<MsgQueue>();
  145. mqp->name = name;
  146. mqp->durable = durable;
  147. mqp->exclusive = exclusive;
  148. mqp->auto_delete = auto_delete;
  149. mqp->args = args;
  150. if(durable ==true) //需要进行持久化存储
  151. {
  152. bool ret = _mapper.insert(mqp);
  153. if(ret == false) return false;
  154. }
  155. _msg_queue.insert(make_pair(mqp->name, mqp));
  156. return true;
  157. }
  158. void deleteQueue(const std::string& name)
  159. {
  160. std::unique_lock<std::mutex> lock(_mutex);
  161. auto it = _msg_queue.find(name);
  162. if(it == _msg_queue.end())
  163. {
  164. DLOG("未找到相关信息");
  165. return ;
  166. }
  167. if(it->second->durable == true)
  168. {
  169. _mapper.remove(name);
  170. }
  171. _msg_queue.erase(name);
  172. }
  173. MsgQueue::ptr selectQueue(const std::string& name)
  174. {
  175. std::unique_lock<std::mutex> lock(_mutex);
  176. auto it = _msg_queue.find(name);
  177. if(it == _msg_queue.end())
  178. {
  179. DLOG("未找到相关信息");
  180. return MsgQueue::ptr();
  181. }
  182. return it->second;
  183. }
  184. QueueMap AllQueues()
  185. {
  186. std::unique_lock<std::mutex> lock(_mutex);
  187. return _msg_queue;
  188. }
  189. bool exists(const std::string& name)
  190. {
  191. std::unique_lock<std::mutex> lock(_mutex);
  192. auto it = _msg_queue.find(name);
  193. if(it == _msg_queue.end())
  194. {
  195. return false;
  196. }
  197. return true;
  198. }
  199. size_t size()
  200. {
  201. std::unique_lock<std::mutex> lock(_mutex);
  202. return _msg_queue.size();
  203. }
  204. void clear()
  205. {
  206. std::unique_lock<std::mutex> lock(_mutex);
  207. _msg_queue.clear();
  208. }
  209. private:
  210. std::mutex _mutex;
  211. MsgQueueMapper _mapper;
  212. QueueMap _msg_queue;
  213. };
  214. }
  215. #endif

绑定数据管理模块

描述将哪个队列与哪个交换机绑定到了一起

管理的数据

  • 交换机名称
  • 队列名称
  • binding_key:绑定密钥----描述在交换机的主题交换&直接交换的消息发布匹配规则

由数字, 字符,_,#, * 组成 binding_key, 例如:news.music.#

管理的操作

1. 添加绑定

2. 解除绑定

3. 获取交换机相关的所有绑定信息

        删除交换机的时候,要删除相关绑定信息

        当消息发布到交换机中,交换机得通过这些信息来将消息发布到指定队列

4. 获取队列相关的所有绑定信息

        删除队列的时候,要删除相关的绑定信息

5. 获取绑定信息数量

代码展示

  1. #ifndef _M_BINDING_H_
  2. #define _M_BINDING_H_
  3. #include "../mqcommon/helper.hpp"
  4. #include "../mqcommon/msg.pb.h"
  5. #include "../mqcommon/logger.hpp"
  6. #include <iostream>
  7. #include <unordered_map>
  8. #include <memory>
  9. #include <mutex>
  10. namespace mymq
  11. {
  12. struct Binding
  13. {
  14. using ptr = std::shared_ptr<Binding>;
  15. std::string exchange_name;
  16. std::string msgqueue_name;
  17. std::string binding_key;
  18. Binding() {}
  19. Binding(const std::string &ename, const std::string &msgqname, const std::string &key)
  20. : exchange_name(ename),
  21. msgqueue_name(msgqname),
  22. binding_key(key)
  23. {
  24. }
  25. };
  26. // 队列与绑定信息是一一对应的(因为是给某一个交换机去绑定队列,因此一个交换机可能会有多个队列的绑定信息)
  27. // 因此先定义一个队列名,与绑定信息的映射信息,这个是为了方便通过队名查找绑定信息
  28. using MsgQueueBindingMap = std::unordered_map<std::string, Binding::ptr>;
  29. // 然后定义一个交换机名称与队列绑定信息之间的映射关系,这个map包含了所有的绑定信息,并且以交换机为单元进行区分
  30. using BindingMap = std::unordered_map<std::string, MsgQueueBindingMap>;
  31. // 采用上面两种结构,则删除交换机相关绑定信息的时候,不仅要删除交换机映射,还需要删除对应队列中的映射,否则对象得不到释放
  32. class BindingMapper
  33. {
  34. public:
  35. BindingMapper(const std::string& dbfile)
  36. :_sqlite_helper(dbfile)
  37. {
  38. std::string path = FileHelper::parentDirectory(dbfile);
  39. FileHelper::createDirectory(path);
  40. _sqlite_helper.open();
  41. createTable();
  42. }
  43. void createTable()
  44. {
  45. std::stringstream sql;
  46. sql << "create table if not exists binding_table(";
  47. sql << "exchange_name varchar(32), ";
  48. sql << "msgqueue_name varchar(32), ";
  49. sql << "binding_key varchar(128));";
  50. assert(_sqlite_helper.exec(sql.str(), nullptr, nullptr));
  51. }
  52. void removeTable()
  53. {
  54. std::stringstream sql;
  55. sql << "drop table if exists binding_table; ";
  56. assert(_sqlite_helper.exec(sql.str(), nullptr, nullptr));
  57. }
  58. bool insert(Binding::ptr& binding)
  59. {
  60. std::stringstream sql;
  61. sql << "insert into binding_table values(";
  62. sql << "'" << binding->exchange_name <<"',";
  63. sql << "'" << binding->msgqueue_name << "',";
  64. sql << "'" << binding->binding_key << "');";
  65. return _sqlite_helper.exec(sql.str(), nullptr, nullptr);
  66. }
  67. bool remove(const std::string& ename, const std::string& qname)
  68. {
  69. std::stringstream sql;
  70. sql << "delete from binding_table where ";
  71. sql << "exchange_name = '" << ename << "' and ";
  72. sql << "msgqueue_name = '" << qname << "';";
  73. return _sqlite_helper.exec(sql.str(), nullptr, nullptr);
  74. }
  75. void removeExchangeBindings(const std::string& ename)
  76. {
  77. std::stringstream sql;
  78. sql << "delete from binding_table where ";
  79. sql << "exchange_name = '" << ename << "';";
  80. _sqlite_helper.exec(sql.str(), nullptr, nullptr);
  81. }
  82. void removeMsgQueueBindings(const std::string& qname)
  83. {
  84. std::stringstream sql;
  85. sql << "delete from binding_table where ";
  86. sql << "msgqueue_name = '" << qname << "';";
  87. _sqlite_helper.exec(sql.str(), nullptr, nullptr);
  88. }
  89. BindingMap recovery()
  90. {
  91. BindingMap result;
  92. std::string sql = "select exchange_name, msgqueue_name, binding_key from binding_table;";
  93. _sqlite_helper.exec(sql, SelectCallBack, &result);
  94. return result;
  95. }
  96. private:
  97. static int SelectCallBack(void* args, int numcol, char** row, char** fileds)
  98. {
  99. BindingMap* result = (BindingMap*)args;
  100. Binding::ptr bq = std::make_shared<Binding>();
  101. bq->exchange_name = row[0];
  102. bq->msgqueue_name = row[1];
  103. bq->binding_key = row[2];
  104. //为了防止 交换相关的绑定信息,因此不能直接创建队列映射,这样会覆盖历史数据
  105. // 因此得先获取交换机对应的映射对象,往里边添加数据
  106. // 但是,若这时候没有交换机对应的映射信息,因此这里的获取要使用引用(会保证不存在则自动创建)
  107. MsgQueueBindingMap& qbm = (*result)[bq->exchange_name];
  108. qbm.insert(std::make_pair(bq->msgqueue_name, bq));
  109. return 0;
  110. }
  111. private:
  112. SqliteHelper _sqlite_helper;
  113. };
  114. class BindingManager
  115. {
  116. public:
  117. using ptr = std::shared_ptr<BindingManager>;
  118. BindingManager(const std::string dbfile)
  119. :_mapper(dbfile)
  120. {
  121. _bindmap = _mapper.recovery();
  122. }
  123. bool bind(const std::string& ename, const std::string& qname, const std::string& key ,bool durable)
  124. {
  125. std::unique_lock<std::mutex> lock(_mutex);
  126. auto it = _bindmap.find(ename);
  127. if(it != _bindmap.end() && it->second.find(qname) != it->second.end())
  128. {
  129. return true;
  130. }
  131. //绑定是否是持久化
  132. Binding::ptr bp = std::make_shared<Binding>(ename, qname, key);
  133. // 磁盘数据持久化
  134. if(durable == true)
  135. {
  136. bool ret = _mapper.insert(bp);
  137. if(ret == false)
  138. {
  139. //未添加成功
  140. return false;
  141. }
  142. }
  143. // 一个交换机对应着多条绑定信息,所以先确定交换机,再将相关绑定信息插进去
  144. MsgQueueBindingMap& mqbm = _bindmap[ename];
  145. mqbm.insert(std::make_pair(bp->msgqueue_name, bp));
  146. return true;
  147. }
  148. void unBind(const std::string& ename, const std::string& qname)
  149. {
  150. std::unique_lock<std::mutex> lock(_mutex);
  151. auto itBindMap = _bindmap.find(ename);
  152. if(itBindMap == _bindmap.end())
  153. {
  154. return;
  155. }
  156. auto itMsgQueueBindings = itBindMap->second.find(qname);
  157. if(itMsgQueueBindings == itBindMap->second.end())
  158. {
  159. return ;
  160. }
  161. _mapper.remove(ename, qname);
  162. _bindmap[ename].erase(qname);
  163. }
  164. void removeExchangeBindings(const std::string& ename)
  165. {
  166. std::unique_lock<std::mutex> lock(_mutex);
  167. _mapper.removeExchangeBindings(ename);
  168. _bindmap.erase(ename);
  169. }
  170. void removeMsgqueueBindings(const std::string& qname)
  171. {
  172. std::unique_lock<std::mutex> lock(_mutex);
  173. _mapper.removeMsgQueueBindings(qname);
  174. for(auto it = _bindmap.begin(); it != _bindmap.end(); it++)
  175. {
  176. it->second.erase(qname);
  177. }
  178. }
  179. MsgQueueBindingMap getExchangeBindings(const std::string& ename)
  180. {
  181. std::unique_lock<std::mutex> lock(_mutex);
  182. auto it = _bindmap.find(ename);
  183. if(it == _bindmap.end())
  184. {
  185. return MsgQueueBindingMap();
  186. }
  187. return it->second;
  188. }
  189. Binding::ptr getBinding(const std::string& ename, const std::string& qname)
  190. {
  191. std::unique_lock<std::mutex> lock(_mutex);
  192. auto it_bindmap = _bindmap.find(ename);
  193. if(it_bindmap == _bindmap.end())
  194. {
  195. return Binding::ptr();
  196. }
  197. auto it_MsgQueueBindings = it_bindmap->second.find(qname);
  198. if(it_MsgQueueBindings == it_bindmap->second.end())
  199. {
  200. return Binding::ptr();
  201. }
  202. return it_MsgQueueBindings->second;
  203. }
  204. bool exists(const std::string& ename, const std::string& qname)
  205. {
  206. std::unique_lock<std::mutex> lock(_mutex);
  207. auto it_bindmap = _bindmap.find(ename);
  208. if(it_bindmap == _bindmap.end())
  209. {
  210. return false;
  211. }
  212. auto it_MsgQueueBindings = it_bindmap->second.find(qname);
  213. if(it_MsgQueueBindings ==it_bindmap->second.end())
  214. {
  215. return false;
  216. }
  217. return true;
  218. }
  219. size_t size()
  220. {
  221. std::unique_lock<std::mutex> lock(_mutex);
  222. size_t total_size = 0;
  223. for(auto it = _bindmap.begin(); it != _bindmap.end(); it++)
  224. {
  225. total_size += it->second.size();
  226. }
  227. return total_size;
  228. }
  229. void clear()
  230. {
  231. std::unique_lock<std::mutex> lock(_mutex);
  232. _mapper.removeTable();
  233. _bindmap.clear();
  234. }
  235. private:
  236. std::mutex _mutex;
  237. BindingMapper _mapper;
  238. BindingMap _bindmap;
  239. };
  240. }
  241. #endif

现在虚拟机模块的所有需要管理的数据全部都安排成功了,接下来是虚拟机的代码展示

虚拟机代码展示

  1. #ifndef __M_HOST_H__
  2. #define __M_HOST_H__
  3. #include "mq_exchange.hpp"
  4. #include "mq_queue.hpp"
  5. #include "mq_binding.hpp"
  6. #include "mq_message.hpp"
  7. namespace mymq
  8. {
  9. class VirtualHost
  10. {
  11. public:
  12. using ptr = std::shared_ptr<VirtualHost>;
  13. VirtualHost(const std::string& hname, const std::string& basedir, const std::string& dbfile)
  14. :_host_name(hname),
  15. _emp(std::make_shared<ExchangeManager>(dbfile)),
  16. _mqmp(std::make_shared<MsgQueueManager>(dbfile)),
  17. _bmp(std::make_shared<BindingManager>(dbfile)),
  18. _mmp(std::make_shared<MessageManager>(basedir))
  19. {}
  20. bool declareExchange(const std::string& name, ExchangeType type, bool durable, bool auto_delete, const google::protobuf::Map<std::string, std::string>& args)
  21. {
  22. return _emp->declareExchange(name, type, durable, auto_delete, args);
  23. }
  24. void deleteExchange(const std::string& name)
  25. {
  26. //删除交换机的时候,需要将交换机相关的绑定信息也删除掉
  27. _bmp->removeExchangeBindings(name);
  28. return _emp->deleteExchange(name);
  29. }
  30. bool existsExchange(const std::string& name)
  31. {
  32. return _emp->exists(name);
  33. }
  34. Exchange::ptr selectExchange(const std::string& ename)
  35. {
  36. return _emp->selectExchange(ename);
  37. }
  38. bool declareQueue(const std::string qname, bool qdurable, bool qexclusive, bool qauto_delete, const google::protobuf::Map<std::string, std::string> args)
  39. {
  40. //初始化队列消息句柄(消息的存储管理)
  41. //队列的创建
  42. _mmp->initQueueMessage(qname);
  43. return _mqmp->declareQueue(qname, qdurable, qexclusive, qauto_delete, args);
  44. }
  45. void deleteQueue(const std::string& qname)
  46. {
  47. //删除的时候队列相关的数据有两个:队列的消息,队列的绑定消息
  48. //删除队列里的消息
  49. _mmp->destroyQueueMessage(qname);
  50. //删除队列的绑定消息
  51. _bmp->removeMsgqueueBindings(qname);
  52. //删除队列
  53. _mqmp->deleteQueue(qname);
  54. }
  55. bool existsQueue(const std::string qname)
  56. {
  57. return _mqmp->exists(qname);
  58. }
  59. QueueMap allqueue()
  60. {
  61. return _mqmp->AllQueues();
  62. }
  63. bool bind(const std::string& ename, const std::string& qname, const std::string& key)
  64. {
  65. Exchange::ptr ep = _emp->selectExchange(ename);
  66. if(ep.get() == nullptr)
  67. {
  68. DLOG("进行队列绑定失败,交换机%s不存在", ename.c_str());
  69. return false;
  70. }
  71. MsgQueue::ptr mqp = _mqmp->selectQueue(qname);
  72. if(mqp.get() == nullptr)
  73. {
  74. DLOG("进行队列绑定失败,队列%s不存在", qname.c_str());
  75. }
  76. return _bmp->bind(ename, qname,key, ep->durable && mqp->durable);
  77. }
  78. void unBind(const std::string& ename, const std::string& qname)
  79. {
  80. return _bmp->unBind(ename, qname);
  81. }
  82. MsgQueueBindingMap exchangeBindings(const std::string& ename)
  83. {
  84. return _bmp->getExchangeBindings(ename);
  85. }
  86. bool existsBinding(const std::string& ename, const std::string& qname)
  87. {
  88. return _bmp->exists(ename, qname);
  89. }
  90. // 消息发布函数
  91. bool basicPublish(const std::string& qname, BasicProperties* bp, const std::string& body)
  92. {
  93. MsgQueue::ptr mqp = _mqmp->selectQueue(qname);
  94. if(mqp.get() == nullptr)
  95. {
  96. DLOG("发布消息失败, 队列%s不存在", qname.c_str());
  97. return false;
  98. }
  99. return _mmp->insert(qname, bp, body, mqp->durable);
  100. }
  101. //获取消息
  102. MessagePtr basicConsume(const std::string& name)
  103. {
  104. return _mmp->front(name);
  105. }
  106. // 消息应答
  107. void basicAck(const std::string& qname, const std::string& msgid)
  108. {
  109. return _mmp->ack(qname, msgid);
  110. }
  111. void clear()
  112. {
  113. _emp->clear();
  114. _mqmp->clear();
  115. _bmp->clear();
  116. _mmp->clear();
  117. }
  118. private:
  119. std::string _host_name;
  120. ExchangeManager::ptr _emp;
  121. MsgQueueManager::ptr _mqmp;
  122. BindingManager::ptr _bmp;
  123. MessageManager::ptr _mmp;
  124. };
  125. }
  126. #endif

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/983363
推荐阅读
相关标签
  

闽ICP备14008679号