
MySQL Connection Pool Implementation

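I. Purpose

1. Create a connection pool for MySQL. The pool maintains and manages a fixed number of long-lived TCP connections to the database, so that connections are reused instead of being re-established and the system responds faster.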

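II. Using the MySQL client driver

The driver packs SQL statements into MySQL protocol requests and parses the server's responses.

1. Install libmysqlclient-dev;

2. Include mysql.h and link against the shared or static library;

3. Initialize the driver with mysql_library_init;

4. Use the driver to interact with MySQL (execute SQL statements);

5. Release the driver's resources with mysql_library_end.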

III. Connection pool implementation

1. There are two kinds of pool: synchronous and asynchronous.

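Synchronous pool: when a thread needs the database, it first requests a connection from the pool. The pool creates several connections up front and guards each one with a lock. A calling thread checks the locks; when it finds a connection that is not locked, it locks it, sends its statement over that connection, and blocks until MySQL returns the result, then unlocks. The lock guarantees that only one thread uses a given connection at a time. This mode is mainly used at server startup, to initialize resources.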
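The lock-per-connection idea above can be sketched roughly as follows. This is a minimal illustration, not TrinityCore's code: `Connection`, `SyncPool`, `Acquire` and `Release` are hypothetical names, no real MySQL handle is involved, and an atomic flag stands in for the per-connection lock.

```cpp
#include <atomic>
#include <cstddef>
#include <memory>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for a real MySQL connection handle.
struct Connection
{
    int id = 0;
    std::atomic<bool> busy{false}; // plays the role of the per-connection lock
};

class SyncPool
{
public:
    explicit SyncPool(std::size_t size)
    {
        for (std::size_t i = 0; i < size; ++i)
        {
            auto conn = std::make_unique<Connection>();
            conn->id = static_cast<int>(i);
            _connections.push_back(std::move(conn));
        }
    }

    // Walks the connections round-robin until one can be claimed.
    // The caller must hand the connection back with Release().
    Connection* Acquire()
    {
        for (std::size_t i = 0;; ++i)
        {
            Connection* conn = _connections[i % _connections.size()].get();
            if (!conn->busy.exchange(true)) // was free, now ours
                return conn;
            if ((i + 1) % _connections.size() == 0)
                std::this_thread::yield(); // every connection was busy, let others run
        }
    }

    static void Release(Connection* conn) { conn->busy.store(false); }

private:
    std::vector<std::unique_ptr<Connection>> _connections;
};
```

A caller would wrap each query in an `Acquire()` / `Release()` pair. Forgetting the release leaves the connection locked for good, which is the deadlock risk the TrinityCore comment on GetFreeConnection warns about.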

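Asynchronous pool: the main thread pushes user requests onto a task queue. The pool's worker threads take tasks off the queue in order, execute them, and block while waiting for MySQL to return data, so the main thread itself never blocks. This mode is mainly used for business logic after the server has started.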

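2. Pool design:

(1) One connection pool corresponds to one MySQL database, identified by ip:port:database, and can access any table in that database on which the account has privileges.

(2) The interface that is called determines which pool is used. Synchronous interfaces: DirectExecute, Query. Asynchronous interfaces: Execute, AsyncQuery, DelayQueryHolder.

(3) Supported arguments. Plain strings, such as "select * from table":

    void Execute(char const* sql);

Prepared statements: once a statement has been prepared, later executions of it only send the parameter values. The server can skip lexical and syntactic analysis (building the syntax tree) and go more or less straight to choosing an execution plan in the optimizer. Besides lowering execution cost, this reduces the amount of data sent over the network and guards against SQL injection, because parameter types are checked: for example, passing a string where a number is expected fails immediately.

    //! Enqueues a one-way SQL operation in prepared statement format that will be executed asynchronously.
    //! Statement must be prepared with CONNECTION_ASYNC flag.
    void Execute(PreparedStatement<T>* stmt);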

3. Structure of the asynchronous pool: DatabaseWorkerPool and AsyncCallbackProcessor.

DatabaseWorkerPool.h

    /*
     * This file is part of the TrinityCore Project. See AUTHORS file for Copyright information
     *
     * This program is free software; you can redistribute it and/or modify it
     * under the terms of the GNU General Public License as published by the
     * Free Software Foundation; either version 2 of the License, or (at your
     * option) any later version.
     *
     * This program is distributed in the hope that it will be useful, but WITHOUT
     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
     * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
     * more details.
     *
     * You should have received a copy of the GNU General Public License along
     * with this program. If not, see <http://www.gnu.org/licenses/>.
     */
    #ifndef _DATABASEWORKERPOOL_H
    #define _DATABASEWORKERPOOL_H

    #include "Define.h"
    #include "DatabaseEnvFwd.h"
    #include "StringFormat.h"
    #include <array>
    #include <string>
    #include <vector>

    template <typename T>
    class ProducerConsumerQueue;
    class SQLOperation;
    struct MySQLConnectionInfo;

    template <class T>
    class DatabaseWorkerPool
    {
    private:
        enum InternalIndex
        {
            IDX_ASYNC,
            IDX_SYNCH,
            IDX_SIZE
        };

    public:
        /* Activity state */
        DatabaseWorkerPool();
        ~DatabaseWorkerPool();

        void SetConnectionInfo(std::string const& infoString, uint8 const asyncThreads, uint8 const synchThreads);
        uint32 Open();
        void Close();

        //! Prepares all prepared statements
        bool PrepareStatements();

        inline MySQLConnectionInfo const* GetConnectionInfo() const
        {
            return _connectionInfo.get();
        }

        /**
            Delayed one-way statement methods.
        */

        //! Enqueues a one-way SQL operation in string format that will be executed asynchronously.
        //! This method should only be used for queries that are only executed once, e.g during startup.
        void Execute(char const* sql);

        //! Enqueues a one-way SQL operation in string format -with variable args- that will be executed asynchronously.
        //! This method should only be used for queries that are only executed once, e.g during startup.
        template<typename Format, typename... Args>
        void PExecute(Format&& sql, Args&&... args)
        {
            if (Trinity::IsFormatEmptyOrNull(sql))
                return;
            Execute(Trinity::StringFormat(std::forward<Format>(sql), std::forward<Args>(args)...).c_str());
        }

        //! Enqueues a one-way SQL operation in prepared statement format that will be executed asynchronously.
        //! Statement must be prepared with CONNECTION_ASYNC flag.
        void Execute(PreparedStatement<T>* stmt);

        /**
            Direct synchronous one-way statement methods.
        */

        //! Directly executes a one-way SQL operation in string format, that will block the calling thread until finished.
        //! This method should only be used for queries that are only executed once, e.g during startup.
        void DirectExecute(char const* sql);

        //! Directly executes a one-way SQL operation in string format -with variable args-, that will block the calling thread until finished.
        //! This method should only be used for queries that are only executed once, e.g during startup.
        template<typename Format, typename... Args>
        void DirectPExecute(Format&& sql, Args&&... args)
        {
            if (Trinity::IsFormatEmptyOrNull(sql))
                return;
            DirectExecute(Trinity::StringFormat(std::forward<Format>(sql), std::forward<Args>(args)...).c_str());
        }

        //! Directly executes a one-way SQL operation in prepared statement format, that will block the calling thread until finished.
        //! Statement must be prepared with the CONNECTION_SYNCH flag.
        void DirectExecute(PreparedStatement<T>* stmt);

        /**
            Synchronous query (with resultset) methods.
        */

        //! Directly executes an SQL query in string format that will block the calling thread until finished.
        //! Returns reference counted auto pointer, no need for manual memory management in upper level code.
        QueryResult Query(char const* sql, T* connection = nullptr);

        //! Directly executes an SQL query in string format -with variable args- that will block the calling thread until finished.
        //! Returns reference counted auto pointer, no need for manual memory management in upper level code.
        template<typename Format, typename... Args>
        QueryResult PQuery(Format&& sql, T* conn, Args&&... args)
        {
            if (Trinity::IsFormatEmptyOrNull(sql))
                return QueryResult(nullptr);
            return Query(Trinity::StringFormat(std::forward<Format>(sql), std::forward<Args>(args)...).c_str(), conn);
        }

        //! Directly executes an SQL query in string format -with variable args- that will block the calling thread until finished.
        //! Returns reference counted auto pointer, no need for manual memory management in upper level code.
        template<typename Format, typename... Args>
        QueryResult PQuery(Format&& sql, Args&&... args)
        {
            if (Trinity::IsFormatEmptyOrNull(sql))
                return QueryResult(nullptr);
            return Query(Trinity::StringFormat(std::forward<Format>(sql), std::forward<Args>(args)...).c_str());
        }

        //! Directly executes an SQL query in prepared format that will block the calling thread until finished.
        //! Returns reference counted auto pointer, no need for manual memory management in upper level code.
        //! Statement must be prepared with CONNECTION_SYNCH flag.
        PreparedQueryResult Query(PreparedStatement<T>* stmt);

        /**
            Asynchronous query (with resultset) methods.
        */

        //! Enqueues a query in string format that will set the value of the QueryResultFuture return object as soon as the query is executed.
        //! The return value is then processed in ProcessQueryCallback methods.
        QueryCallback AsyncQuery(char const* sql);

        //! Enqueues a query in prepared format that will set the value of the PreparedQueryResultFuture return object as soon as the query is executed.
        //! The return value is then processed in ProcessQueryCallback methods.
        //! Statement must be prepared with CONNECTION_ASYNC flag.
        QueryCallback AsyncQuery(PreparedStatement<T>* stmt);

        //! Enqueues a vector of SQL operations (can be both adhoc and prepared) that will set the value of the QueryResultHolderFuture
        //! return object as soon as the query is executed.
        //! The return value is then processed in ProcessQueryCallback methods.
        //! Any prepared statements added to this holder need to be prepared with the CONNECTION_ASYNC flag.
        SQLQueryHolderCallback DelayQueryHolder(std::shared_ptr<SQLQueryHolder<T>> holder);

        /**
            Transaction context methods.
        */

        //! Begins an automanaged transaction pointer that will automatically rollback if not commited. (Autocommit=0)
        SQLTransaction<T> BeginTransaction();

        //! Enqueues a collection of one-way SQL operations (can be both adhoc and prepared). The order in which these operations
        //! were appended to the transaction will be respected during execution.
        void CommitTransaction(SQLTransaction<T> transaction);

        //! Enqueues a collection of one-way SQL operations (can be both adhoc and prepared). The order in which these operations
        //! were appended to the transaction will be respected during execution.
        TransactionCallback AsyncCommitTransaction(SQLTransaction<T> transaction);

        //! Directly executes a collection of one-way SQL operations (can be both adhoc and prepared). The order in which these operations
        //! were appended to the transaction will be respected during execution.
        void DirectCommitTransaction(SQLTransaction<T>& transaction);

        //! Method used to execute ad-hoc statements in a diverse context.
        //! Will be wrapped in a transaction if valid object is present, otherwise executed standalone.
        void ExecuteOrAppend(SQLTransaction<T>& trans, char const* sql);

        //! Method used to execute prepared statements in a diverse context.
        //! Will be wrapped in a transaction if valid object is present, otherwise executed standalone.
        void ExecuteOrAppend(SQLTransaction<T>& trans, PreparedStatement<T>* stmt);

        /**
            Other
        */

        typedef typename T::Statements PreparedStatementIndex;

        //! Automanaged (internally) pointer to a prepared statement object for usage in upper level code.
        //! Pointer is deleted in this->DirectExecute(PreparedStatement*), this->Query(PreparedStatement*) or PreparedStatementTask::~PreparedStatementTask.
        //! This object is not tied to the prepared statement on the MySQL context yet until execution.
        PreparedStatement<T>* GetPreparedStatement(PreparedStatementIndex index);

        //! Apply escape string'ing for current collation. (utf8)
        void EscapeString(std::string& str);

        //! Keeps all our MySQL connections alive, prevent the server from disconnecting us.
        void KeepAlive();

        void WarnAboutSyncQueries([[maybe_unused]] bool warn)
        {
    #ifdef TRINITY_DEBUG
            _warnSyncQueries = warn;
    #endif
        }

        size_t QueueSize() const;

    private:
        uint32 OpenConnections(InternalIndex type, uint8 numConnections);

        unsigned long EscapeString(char* to, char const* from, unsigned long length);

        void Enqueue(SQLOperation* op);

        //! Gets a free connection in the synchronous connection pool.
        //! Caller MUST call t->Unlock() after touching the MySQL context to prevent deadlocks.
        T* GetFreeConnection();

        char const* GetDatabaseName() const;

        //! Queue shared by async worker threads.
        std::unique_ptr<ProducerConsumerQueue<SQLOperation*>> _queue;
        std::array<std::vector<std::unique_ptr<T>>, IDX_SIZE> _connections;
        std::unique_ptr<MySQLConnectionInfo> _connectionInfo;
        std::vector<uint8> _preparedStatementSize;
        uint8 _async_threads, _synch_threads;
    #ifdef TRINITY_DEBUG
        static inline thread_local bool _warnSyncQueries = false;
    #endif
    };

    #endif

AsyncCallbackProcessor.h

    /*
     * This file is part of the TrinityCore Project. See AUTHORS file for Copyright information
     *
     * This program is free software; you can redistribute it and/or modify it
     * under the terms of the GNU General Public License as published by the
     * Free Software Foundation; either version 2 of the License, or (at your
     * option) any later version.
     *
     * This program is distributed in the hope that it will be useful, but WITHOUT
     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
     * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
     * more details.
     *
     * You should have received a copy of the GNU General Public License along
     * with this program. If not, see <http://www.gnu.org/licenses/>.
     */
    #ifndef AsyncCallbackProcessor_h__
    #define AsyncCallbackProcessor_h__

    #include "Define.h"
    #include <algorithm>
    #include <vector>

    //template <class T>
    //concept AsyncCallback = requires(T t) { { t.InvokeIfReady() } -> std::convertible_to<bool> };

    template<typename T> // requires AsyncCallback<T>
    class AsyncCallbackProcessor
    {
    public:
        AsyncCallbackProcessor() = default;
        ~AsyncCallbackProcessor() = default;

        T& AddCallback(T&& query)
        {
            _callbacks.emplace_back(std::move(query));
            return _callbacks.back();
        }

        void ProcessReadyCallbacks()
        {
            if (_callbacks.empty())
                return;

            std::vector<T> updateCallbacks{ std::move(_callbacks) };

            updateCallbacks.erase(std::remove_if(updateCallbacks.begin(), updateCallbacks.end(), [](T& callback)
            {
                return callback.InvokeIfReady();
            }), updateCallbacks.end());

            _callbacks.insert(_callbacks.end(), std::make_move_iterator(updateCallbacks.begin()), std::make_move_iterator(updateCallbacks.end()));
        }

    private:
        AsyncCallbackProcessor(AsyncCallbackProcessor const&) = delete;
        AsyncCallbackProcessor& operator=(AsyncCallbackProcessor const&) = delete;

        std::vector<T> _callbacks;
    };

    #endif // AsyncCallbackProcessor_h__

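4. C++ promise and future are used to retrieve the results produced by the pool's threads. When the core thread creates a task, the task carries a promise and the core thread keeps the matching future. A pool thread runs the task against MySQL and fulfills the promise; the core thread then reads the result through the future's get().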
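A minimal sketch of that hand-off, using only the standard library. `QueryTask`, `RunOnWorker` and `QueryAndWait` are hypothetical names, and the "query" is faked with a string so that the example runs without a database.

```cpp
#include <future>
#include <string>
#include <thread>
#include <utility>

// A task carries the SQL text plus the promise that will receive its result.
struct QueryTask
{
    std::string sql;
    std::promise<std::string> result;
};

// Stand-in for a pool worker: a real one would send task.sql to MySQL.
inline void RunOnWorker(QueryTask task)
{
    task.result.set_value("result of: " + task.sql);
}

// Core-thread side: create the task, keep the future, hand the task away.
inline std::string QueryAndWait(std::string sql)
{
    QueryTask task;
    task.sql = std::move(sql);
    std::future<std::string> future = task.result.get_future();

    std::thread worker(RunOnWorker, std::move(task));
    std::string value = future.get(); // blocks until set_value is called
    worker.join();
    return value;
}
```

Here the core thread blocks in `get()` for simplicity; the pool described in this article avoids that by polling the future for readiness instead.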

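Submitting a query (figure omitted): a task is created with the SQL text, the future is obtained from the task's promise, the task is pushed onto the task queue, and the future is returned to the core thread.

Executing a query (figure omitted): when a pool connection receives the result from MySQL and the result is not what was expected, it calls set_value with nullptr and returns false.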

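Receiving the result (figure omitted): on the core thread, the callback's InvokeIfReady picks up the returned result. _string.valid() checks whether the future still refers to a shared state, and _string.wait_for(std::chrono::seconds(0)) == std::future_status::ready checks, without blocking, whether the result has been set.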
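The non-blocking readiness check can be tried in isolation. The class below is a simplified, hypothetical stand-in for a query callback (`ReadyCallback` is not a TrinityCore type); it only shows the `valid()` plus zero-timeout `wait_for` idiom.

```cpp
#include <chrono>
#include <functional>
#include <future>
#include <utility>

// Hypothetical, simplified callback: pairs a future with the function to run on its value.
class ReadyCallback
{
public:
    ReadyCallback(std::future<int> result, std::function<void(int)> handler)
        : _result(std::move(result)), _handler(std::move(handler)) { }

    // Never blocks: returns true once the handler has run,
    // false while the result is still pending.
    bool InvokeIfReady()
    {
        if (_result.valid() &&
            _result.wait_for(std::chrono::seconds(0)) == std::future_status::ready)
        {
            _handler(_result.get()); // get() returns at once because the value is ready
            return true;
        }
        return false;
    }

private:
    std::future<int> _result;
    std::function<void(int)> _handler;
};
```

A processor such as AsyncCallbackProcessor can call `InvokeIfReady()` on every stored callback once per loop iteration and drop those that return true.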

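IV. Wrapping the asynchronous interface

1. Chain mode (chain of responsibility): send one statement, wait for the database to return its result, and decide from that result whether to send the next statement.

2. Pipeline mode: send several SQL statements at once and wait until the database has returned all of their results.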
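The difference between the two modes can be sketched with plain futures. This is an illustration under assumptions: `AsyncLookup` is a hypothetical stand-in for an asynchronous query that multiplies its argument by 10, and the sketch blocks with `get()` where the real interface would use polled callbacks.

```cpp
#include <future>
#include <vector>

// Stand-in for an asynchronous query; a real one would go through the pool.
inline std::future<int> AsyncLookup(int id)
{
    return std::async(std::launch::async, [id] { return id * 10; });
}

// Chain: the second query is issued only after the first result has been inspected.
inline int Chain(int firstId)
{
    int first = AsyncLookup(firstId).get();
    if (first <= 0)
        return first;                // stop the chain early
    return AsyncLookup(first).get(); // the next query depends on the previous result
}

// Pipeline: all queries are issued up front, then all results are collected.
inline std::vector<int> Pipeline(std::vector<int> const& ids)
{
    std::vector<std::future<int>> pending;
    for (int id : ids)
        pending.push_back(AsyncLookup(id));

    std::vector<int> results;
    for (std::future<int>& f : pending)
        results.push_back(f.get());
    return results;
}
```

In chain mode the round trips are sequential by design. In pipeline mode they overlap, so the total latency is closer to that of the slowest single query.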

V. Code

1. Create an asynchronous connection pool

    #include "DatabaseEnvFwd.h"
    #include "Log.h"
    #include "DatabaseEnv.h"
    #include "DatabaseLoader.h"
    #include "Implementation/SakilaDatabase.h"
    #include "MySQLThreading.h"
    #include "AsyncCallbackProcessor.h"
    #include "QueryHolder.h"
    #include <chrono>
    #include <memory>
    #include <thread>

    int main()
    {
        // Initialize the MySQL client library before any connection is opened.
        MySQL::Library_Init();

        DatabaseLoader loader;
        // Connection string format: host;port;user;password;database
        loader.AddDatabase(SakilaDatabase, "192.168.232.1;3306;root;123456;sakila",
            8, 2); // 8 threads in the connection pool

        if (!loader.Load()) {
            TC_LOG_ERROR("", "SakilaDatabase connect error");
            return 1;
        }
        TC_LOG_INFO("", "SakilaDatabase connect success");

        // The chain-mode and pipeline-mode blocks from sections 2 and 3 go here.

        SakilaDatabase.Close();
        MySQL::Library_End();
        return 0;
    }

2. Chain mode

    { // chain of responsibility
        auto *stmt1 = SakilaDatabase.GetPreparedStatement(SAKILA_SEL_ACTOR_INFO_ASYNC);
        stmt1->setUInt8(0, 1);

        AsyncCallbackProcessor<QueryCallback> processor;
        processor.AddCallback(SakilaDatabase.AsyncQuery(stmt1)
            .WithChainingPreparedCallback([](QueryCallback &callback, PreparedQueryResult result){
                TC_LOG_INFO("", "actor_id=%u,first_name=%s,last_name=%s,last_update=%s",
                    (*result)[0].GetUInt8(), (*result)[1].GetString(), (*result)[2].GetString(),(*result)[3].GetString());
                // Queue the second query only after the first result has arrived.
                auto *stmt2 = SakilaDatabase.GetPreparedStatement(SAKILA_SEL_ACTOR_INFO_ASYNC);
                stmt2->setUInt8(0, 2);
                callback.SetNextQuery(SakilaDatabase.AsyncQuery(stmt2));
            })
            .WithChainingPreparedCallback([](QueryCallback &callback, PreparedQueryResult result){
                TC_LOG_INFO("", "actor_id=%u,first_name=%s,last_name=%s,last_update=%s",
                    (*result)[0].GetUInt8(), (*result)[1].GetString(), (*result)[2].GetString(),(*result)[3].GetString());
                // With the lines below commented out no further query is queued,
                // so the third callback is not expected to run.
                // auto *stmt3 = SakilaDatabase.GetPreparedStatement(SAKILA_SEL_ACTOR_INFO_ASYNC);
                // stmt3->setUInt8(0, 3);
                // callback.SetNextQuery(SakilaDatabase.AsyncQuery(stmt3));
            })
            .WithChainingPreparedCallback([](QueryCallback &callback, PreparedQueryResult result){
                TC_LOG_INFO("", "actor_id=%u,first_name=%s,last_name=%s,last_update=%s",
                    (*result)[0].GetUInt8(), (*result)[1].GetString(), (*result)[2].GetString(),(*result)[3].GetString());
            }));

        // Poll on the core thread; ready callbacks are invoked here.
        while (true) {
            processor.ProcessReadyCallbacks();
            std::this_thread::sleep_for(std::chrono::milliseconds(50));
        }
    }

3. Pipeline mode:

    { // pipeline
        // Fetch the results of several SQL statements asynchronously with a query holder.
        class ActorInfoHolder : public SQLQueryHolder<SakilaDatabaseConnection>
        {
        public:
            enum ACTOR_INFO_IDX : uint8 {
                AI_1,
                AI_3,
                AI_5,
                AI_MAX
            };

        public:
            ActorInfoHolder() {
                SetSize(AI_MAX);

                auto *stmt1 = SakilaDatabase.GetPreparedStatement(SAKILA_SEL_ACTOR_INFO_ASYNC);
                stmt1->setUInt8(0, 1);
                SetPreparedQuery(AI_1, stmt1);

                auto *stmt3 = SakilaDatabase.GetPreparedStatement(SAKILA_SEL_ACTOR_INFO_ASYNC);
                stmt3->setUInt8(0, 3);
                SetPreparedQuery(AI_3, stmt3);

                auto *stmt5 = SakilaDatabase.GetPreparedStatement(SAKILA_SEL_ACTOR_INFO_ASYNC);
                stmt5->setUInt8(0, 5);
                SetPreparedQuery(AI_5, stmt5);
            }
        };

        AsyncCallbackProcessor<SQLQueryHolderCallback> processor;
        auto holder = std::make_shared<ActorInfoHolder>();
        processor.AddCallback(SakilaDatabase.DelayQueryHolder(holder)).AfterComplete([](const SQLQueryHolderBase &hdr){
            // Bind by reference so the holder is not copied.
            const auto &holder = static_cast<const ActorInfoHolder&>(hdr);
            auto result1 = holder.GetPreparedResult(ActorInfoHolder::AI_1);
            auto result3 = holder.GetPreparedResult(ActorInfoHolder::AI_3);
            auto result5 = holder.GetPreparedResult(ActorInfoHolder::AI_5);
            TC_LOG_INFO("", "actor_id=%u,first_name=%s,last_name=%s,last_update=%s",
                (*result1)[0].GetUInt8(), (*result1)[1].GetString(), (*result1)[2].GetString(),(*result1)[3].GetString());
            TC_LOG_INFO("", "actor_id=%u,first_name=%s,last_name=%s,last_update=%s",
                (*result3)[0].GetUInt8(), (*result3)[1].GetString(), (*result3)[2].GetString(),(*result3)[3].GetString());
            TC_LOG_INFO("", "actor_id=%u,first_name=%s,last_name=%s,last_update=%s",
                (*result5)[0].GetUInt8(), (*result5)[1].GetString(), (*result5)[2].GetString(),(*result5)[3].GetString());
        });

        // Poll on the core thread; AfterComplete runs once all three results are in.
        while (true) {
            processor.ProcessReadyCallbacks();
            std::this_thread::sleep_for(std::chrono::milliseconds(50));
        }
    }
