当前位置:   article > 正文

仿RabbitMq实现简易消息队列正式篇(消费者篇)

仿RabbitMq实现简易消息队列正式篇(消费者篇)
@TOC

消费者管理模块

客户端由两种:发布消息,订阅消息

因此订阅了指定队列消息的客户端才是一个消费者。

消费者数据存在的意义:当指定队列有了消息以后,就需要将消息推送给这个消费者客户端(推送的时候就需要找到这个客户端相关的信息--连接)

消费者信息:

消费者标识 --tag

订阅队列名称:当当前队列有消息就会推送给这个客户端,以及当客户端收到消息,需要对指定队列的消息进行确认

消费处理回调函数:队列有一个消息后,通过哪个函数进行处理

客户端消费者设计

  1. #ifndef __M_CONSUMER_H__
  2. #define __M_CONSUMER_H__
  3. #include "../mqcommon/logger.hpp"
  4. #include "../mqcommon/helper.hpp"
  5. #include "../mqcommon/msg.pb.h"
  6. #include <iostream>
  7. #include <unordered_map>
  8. #include <mutex>
  9. #include <memory>
  10. #include <vector>
  11. #include <functional>
  12. namespace mymq
  13. {
  14. using ConsumerCallback = std::function<void(const std::string, const BasicProperties* bp, const std::string)>;
  15. struct Consumer
  16. {
  17. using ptr = std::shared_ptr<Consumer>;
  18. std::string tag; //消费者标识
  19. std::string qname; //消费者订阅的队列名称
  20. bool auto_ack; //自动确认标志
  21. ConsumerCallback callback;
  22. Consumer()
  23. {
  24. DLOG("new Consumer: %p", this);
  25. }
  26. Consumer(const std::string& ctag, const std::string& queue_name, bool auto_flag, const ConsumerCallback& cb)
  27. :tag(ctag),
  28. qname(queue_name),
  29. auto_ack(auto_flag),
  30. callback(std::move(cb))
  31. {
  32. DLOG("new Consumer: %p", this);
  33. }
  34. ~Consumer()
  35. {
  36. DLOG("del Consumer: %p", this);
  37. }
  38. };
  39. }
  40. #endif

服务端消费者管理

管理思想

以队列为单位进行管理

每个消费者订阅的都是指定队列的消息,消费者对消息进行确认也是以队列进行确认

最关键的是:当指定队列中有消息了,必然是获取订阅了这个队列的消费者信息进行消息推送

队列消费者管理结构

数据信息
  • 消费者数组
  • RR轮转盘
  • 存储消费者的队列名
管理操作
  • 新增消费者
  • RR轮转获取一个消费者
  • 删除消费者
  • 队列消费者数量
  • 是否为空
代码展示
  1. // 以队列为单元的消费者管理结构
  2. class QueueConsumer
  3. {
  4. public:
  5. using ptr = std::shared_ptr<QueueConsumer>;
  6. QueueConsumer(const std::string &qname)
  7. : _qname(qname),
  8. _rr_seq(0)
  9. {
  10. }
  11. // 队列新增消费者
  12. Consumer::ptr create(const std::string &ctag, const std::string &queue_name, bool ack_flag, const ConsumerCallback &cb)
  13. {
  14. // 1. 加锁
  15. std::unique_lock<std::mutex> lock(_mutex);
  16. // 2. 判断消费者是否重复
  17. for (auto e : _consumers)
  18. {
  19. if (e->tag == ctag)
  20. {
  21. return Consumer::ptr();
  22. }
  23. }
  24. // 3. 没有重复则新增 -- 构造对象
  25. auto consumer = std::make_shared<Consumer>(ctag, _qname, ack_flag, cb);
  26. // 4. 添加管理后返回对象
  27. _consumers.push_back(consumer);
  28. return consumer;
  29. }
  30. // 队列移除消费者
  31. void remove(const std::string &ctag)
  32. {
  33. // 1. 加锁
  34. std::unique_lock<std::mutex> lock(_mutex);
  35. // 2. 遍历 - 删除
  36. for (auto it = _consumers.begin(); it != _consumers.end(); it++)
  37. {
  38. if ((*it)->tag == ctag)
  39. {
  40. _consumers.erase(it);
  41. return;
  42. }
  43. }
  44. return;
  45. }
  46. // 队列获取消费者:RR轮转获取
  47. Consumer::ptr choose()
  48. {
  49. // 1. 加锁
  50. std::unique_lock<std::mutex> lock(_mutex);
  51. // 2. 获取当前轮转的下标
  52. if (_consumers.size() == 0)
  53. {
  54. return Consumer::ptr();
  55. }
  56. int index = _rr_seq % _consumers.size();
  57. // 3. 获取对象, 返回
  58. _rr_seq++;
  59. return _consumers[index];
  60. }
  61. // 是否为空
  62. bool empty()
  63. {
  64. std::unique_lock<std::mutex> lock(_mutex);
  65. return _consumers.size() == 0;
  66. }
  67. // 判断指定消费者是否存在
  68. bool exists(const std::string &ctag)
  69. {
  70. std::unique_lock<std::mutex> lock(_mutex);
  71. for (auto it = _consumers.begin(); it != _consumers.end(); it++)
  72. {
  73. if ((*it)->tag == ctag)
  74. {
  75. return true;
  76. }
  77. }
  78. return false;
  79. }
  80. void clear()
  81. {
  82. std::unique_lock<std::mutex> lock(_mutex);
  83. _consumers.clear();
  84. _rr_seq = 0;
  85. }
  86. private:
  87. std::string _qname;
  88. std::mutex _mutex;
  89. uint64_t _rr_seq;
  90. std::vector<Consumer::ptr> _consumers;
  91. };

管理操作

  • 初始化队列消费者结构
  • 删除队列消费者结构
  • 向指定队列添加消费者
  • 获取指定队列消费者
  • 删除指定队列消费者
  1. class ConsumerManager
  2. {
  3. public:
  4. using ptr = std::shared_ptr<ConsumerManager>;
  5. ConsumerManager()
  6. {
  7. }
  8. void initQueueConsumer(const std::string &qname)
  9. {
  10. // 1. 加锁
  11. std::unique_lock<std::mutex> lock(_mutex);
  12. // 2. 重复判断
  13. auto it = _qconsumers.find(qname);
  14. if (it != _qconsumers.end())
  15. {
  16. return;
  17. }
  18. // 3. 新增
  19. QueueConsumer::ptr qcp = std::make_shared<QueueConsumer>(qname);
  20. _qconsumers.insert(std::make_pair(qname, qcp));
  21. }
  22. void destroyQueueConsumer(const std::string &qname)
  23. {
  24. std::unique_lock<std::mutex> lock(_mutex);
  25. _qconsumers.erase(qname);
  26. }
  27. // 这里不理解,为啥找到了还要进行创建
  28. Consumer::ptr create(const std::string &ctag, const std::string &queue_name, bool ack_flag, const ConsumerCallback &cb)
  29. {
  30. // 获取队列的消费者管理单元句柄,通过句柄完成新建
  31. QueueConsumer::ptr qcp;
  32. {
  33. std::unique_lock<std::mutex> lock(_mutex);
  34. auto it = _qconsumers.find(queue_name);
  35. if (it == _qconsumers.end())
  36. {
  37. DLOG("没有找到队列 %s 的消费者管理句柄!", queue_name.c_str());
  38. return Consumer::ptr();
  39. }
  40. qcp = it->second;
  41. }
  42. return qcp->create(ctag, queue_name, ack_flag, cb);
  43. }
  44. void remove(const std::string &ctag, const std::string &queue_name)
  45. {
  46. QueueConsumer::ptr qcp;
  47. {
  48. std::unique_lock<std::mutex> lock(_mutex);
  49. auto it = _qconsumers.find(queue_name);
  50. if (it == _qconsumers.end())
  51. {
  52. DLOG("没有找到队列 %s 的消费者管理句柄!", queue_name.c_str());
  53. return;
  54. }
  55. qcp = it->second;
  56. }
  57. return qcp->remove(ctag);
  58. }
  59. Consumer::ptr choose(const std::string &queue_name)
  60. {
  61. QueueConsumer::ptr qcp;
  62. {
  63. std::unique_lock<std::mutex> lock(_mutex);
  64. auto it = _qconsumers.find(queue_name);
  65. if (it == _qconsumers.end())
  66. {
  67. DLOG("没有找到队列 %s 的消费者管理句柄!", queue_name.c_str());
  68. return Consumer::ptr();
  69. }
  70. qcp = it->second;
  71. }
  72. return qcp->choose();
  73. }
  74. bool empty(const std::string &queue_name)
  75. {
  76. QueueConsumer::ptr qcp;
  77. {
  78. std::unique_lock<std::mutex> lock(_mutex);
  79. auto it = _qconsumers.find(queue_name);
  80. if (it == _qconsumers.end())
  81. {
  82. DLOG("没有找到队列 %s 的消费者管理句柄!", queue_name.c_str());
  83. return false;
  84. }
  85. qcp = it->second;
  86. }
  87. return qcp->empty();
  88. }
  89. bool exists(const std::string &ctag, const std::string &queue_name)
  90. {
  91. QueueConsumer::ptr qcp;
  92. {
  93. std::unique_lock<std::mutex> lock(_mutex);
  94. auto it = _qconsumers.find(queue_name);
  95. if (it == _qconsumers.end())
  96. {
  97. DLOG("没有找到队列 %s 的消费者管理句柄!", queue_name.c_str());
  98. return false;
  99. }
  100. qcp = it->second;
  101. }
  102. return qcp->exists(ctag);
  103. }
  104. void clear()
  105. {
  106. std::unique_lock<std::mutex> lock(_mutex);
  107. _qconsumers.clear();
  108. }
  109. private:
  110. std::mutex _mutex;
  111. std::unordered_map<std::string, QueueConsumer::ptr> _qconsumers;
  112. };
  113. }

本文内容由网友自发贡献,转载请注明出处:https://www.wpsshop.cn/w/秋刀鱼在做梦/article/detail/988732
推荐阅读
相关标签
  

闽ICP备14008679号