赞
踩
1.创建一个连接Mysql的线程池,作用是维持和管理固定数量与数据库的TCP长连接,实现资源复用,更快的系统响应。
驱动的作用是将mysql语句进行封装和解析
1.安装libmysqlclient-dev;
2.加载mysql.h以及动态库或静态库;
3.初始化连接驱动mysql_library_init;
4.使用mysql连接驱动与MySQL进行交互(执行SQL语句);
5.使用连接资源 mysql_library_end;
1.连接池分为同步连接池和异步连接池。
同步连接池,当前线程使用数据库时,先向连接池中获取连接,连接池创建多个连接,为每个连接加上锁,左边的线程检测锁,未锁则使用该连接,上锁,阻塞该线程,加锁为了保证左边只有一个线程可以使用中间的接口,只有等待右边将结果返回后才解锁。主要应用于服务端启动,初始化资源。
异步连接池:主线程push用户请求到执行任务队列,连接池依次从队列中取出任务执行,阻塞等待mysql返回数据。主要应用于服务端启动后的业务处理。
2.连接池实现:
(1).一个连接池对应着MySQL中的一个database,对应着ip:port:database,可以访问这个database有权限的任何表;
(2).通过接口区分使用的同步连接池还是异步连接池;同步接口:DirectExecute、Query;异步接口:Execute、AsyncQuery、DelayQueryHolder.
(3).参数支持:简单字符串,类似“select * from table”;
void Execute(char const* sql);
预处理方式:当传入已传入的类型的语句时,可以跳过链接器、词法句法分析生成语法树、过滤器,直接到优化器的选择执行计划位置。作用:减少执行成本外,减少网络传输的数据,避免sql注入(检测数据类型,如下图当传入的是字符串时直接出错)
- //! Enqueues a one-way SQL operation in prepared statement format that will be executed asynchronously.
- //! Statement must be prepared with CONNECTION_ASYNC flag.
- void Execute(PreparedStatement<T>* stmt);
3.异步连接池结构:DatabaseWorkerPool、AsyncCallbackProcessor,
DatabaseWorkerPool.h
- /*
- * This file is part of the TrinityCore Project. See AUTHORS file for Copyright information
- *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License as published by the
- * Free Software Foundation; either version 2 of the License, or (at your
- * option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
- * more details.
- *
- * You should have received a copy of the GNU General Public License along
- * with this program. If not, see <http://www.gnu.org/licenses/>.
- */
- #ifndef _DATABASEWORKERPOOL_H
- #define _DATABASEWORKERPOOL_H
-
- #include "Define.h"
- #include "DatabaseEnvFwd.h"
- #include "StringFormat.h"
- #include <array>
- #include <string>
- #include <vector>
-
- template <typename T>
- class ProducerConsumerQueue;
-
- class SQLOperation;
- struct MySQLConnectionInfo;
-
- template <class T>
- class DatabaseWorkerPool
- {
- private:
- enum InternalIndex
- {
- IDX_ASYNC,
- IDX_SYNCH,
- IDX_SIZE
- };
-
- public:
- /* Activity state */
- DatabaseWorkerPool();
-
- ~DatabaseWorkerPool();
-
- void SetConnectionInfo(std::string const& infoString, uint8 const asyncThreads, uint8 const synchThreads);
-
- uint32 Open();
-
- void Close();
-
- //! Prepares all prepared statements
- bool PrepareStatements();
-
- inline MySQLConnectionInfo const* GetConnectionInfo() const
- {
- return _connectionInfo.get();
- }
-
- /**
- Delayed one-way statement methods.
- */
-
- //! Enqueues a one-way SQL operation in string format that will be executed asynchronously.
- //! This method should only be used for queries that are only executed once, e.g during startup.
- void Execute(char const* sql);
-
- //! Enqueues a one-way SQL operation in string format -with variable args- that will be executed asynchronously.
- //! This method should only be used for queries that are only executed once, e.g during startup.
- template<typename Format, typename... Args>
- void PExecute(Format&& sql, Args&&... args)
- {
- if (Trinity::IsFormatEmptyOrNull(sql))
- return;
-
- Execute(Trinity::StringFormat(std::forward<Format>(sql), std::forward<Args>(args)...).c_str());
- }
-
- //! Enqueues a one-way SQL operation in prepared statement format that will be executed asynchronously.
- //! Statement must be prepared with CONNECTION_ASYNC flag.
- void Execute(PreparedStatement<T>* stmt);
-
- /**
- Direct synchronous one-way statement methods.
- */
-
- //! Directly executes a one-way SQL operation in string format, that will block the calling thread until finished.
- //! This method should only be used for queries that are only executed once, e.g during startup.
- void DirectExecute(char const* sql);
-
- //! Directly executes a one-way SQL operation in string format -with variable args-, that will block the calling thread until finished.
- //! This method should only be used for queries that are only executed once, e.g during startup.
- template<typename Format, typename... Args>
- void DirectPExecute(Format&& sql, Args&&... args)
- {
- if (Trinity::IsFormatEmptyOrNull(sql))
- return;
-
- DirectExecute(Trinity::StringFormat(std::forward<Format>(sql), std::forward<Args>(args)...).c_str());
- }
-
- //! Directly executes a one-way SQL operation in prepared statement format, that will block the calling thread until finished.
- //! Statement must be prepared with the CONNECTION_SYNCH flag.
- void DirectExecute(PreparedStatement<T>* stmt);
-
- /**
- Synchronous query (with resultset) methods.
- */
-
- //! Directly executes an SQL query in string format that will block the calling thread until finished.
- //! Returns reference counted auto pointer, no need for manual memory management in upper level code.
- QueryResult Query(char const* sql, T* connection = nullptr);
-
- //! Directly executes an SQL query in string format -with variable args- that will block the calling thread until finished.
- //! Returns reference counted auto pointer, no need for manual memory management in upper level code.
- template<typename Format, typename... Args>
- QueryResult PQuery(Format&& sql, T* conn, Args&&... args)
- {
- if (Trinity::IsFormatEmptyOrNull(sql))
- return QueryResult(nullptr);
-
- return Query(Trinity::StringFormat(std::forward<Format>(sql), std::forward<Args>(args)...).c_str(), conn);
- }
-
- //! Directly executes an SQL query in string format -with variable args- that will block the calling thread until finished.
- //! Returns reference counted auto pointer, no need for manual memory management in upper level code.
- template<typename Format, typename... Args>
- QueryResult PQuery(Format&& sql, Args&&... args)
- {
- if (Trinity::IsFormatEmptyOrNull(sql))
- return QueryResult(nullptr);
-
- return Query(Trinity::StringFormat(std::forward<Format>(sql), std::forward<Args>(args)...).c_str());
- }
-
- //! Directly executes an SQL query in prepared format that will block the calling thread until finished.
- //! Returns reference counted auto pointer, no need for manual memory management in upper level code.
- //! Statement must be prepared with CONNECTION_SYNCH flag.
- PreparedQueryResult Query(PreparedStatement<T>* stmt);
-
- /**
- Asynchronous query (with resultset) methods.
- */
-
- //! Enqueues a query in string format that will set the value of the QueryResultFuture return object as soon as the query is executed.
- //! The return value is then processed in ProcessQueryCallback methods.
- QueryCallback AsyncQuery(char const* sql);
-
- //! Enqueues a query in prepared format that will set the value of the PreparedQueryResultFuture return object as soon as the query is executed.
- //! The return value is then processed in ProcessQueryCallback methods.
- //! Statement must be prepared with CONNECTION_ASYNC flag.
- QueryCallback AsyncQuery(PreparedStatement<T>* stmt);
-
- //! Enqueues a vector of SQL operations (can be both adhoc and prepared) that will set the value of the QueryResultHolderFuture
- //! return object as soon as the query is executed.
- //! The return value is then processed in ProcessQueryCallback methods.
- //! Any prepared statements added to this holder need to be prepared with the CONNECTION_ASYNC flag.
- SQLQueryHolderCallback DelayQueryHolder(std::shared_ptr<SQLQueryHolder<T>> holder);
- /**
- Transaction context methods.
- */
-
- //! Begins an automanaged transaction pointer that will automatically rollback if not commited. (Autocommit=0)
- SQLTransaction<T> BeginTransaction();
-
- //! Enqueues a collection of one-way SQL operations (can be both adhoc and prepared). The order in which these operations
- //! were appended to the transaction will be respected during execution.
- void CommitTransaction(SQLTransaction<T> transaction);
-
- //! Enqueues a collection of one-way SQL operations (can be both adhoc and prepared). The order in which these operations
- //! were appended to the transaction will be respected during execution.
- TransactionCallback AsyncCommitTransaction(SQLTransaction<T> transaction);
-
- //! Directly executes a collection of one-way SQL operations (can be both adhoc and prepared). The order in which these operations
- //! were appended to the transaction will be respected during execution.
- void DirectCommitTransaction(SQLTransaction<T>& transaction);
-
- //! Method used to execute ad-hoc statements in a diverse context.
- //! Will be wrapped in a transaction if valid object is present, otherwise executed standalone.
- void ExecuteOrAppend(SQLTransaction<T>& trans, char const* sql);
-
- //! Method used to execute prepared statements in a diverse context.
- //! Will be wrapped in a transaction if valid object is present, otherwise executed standalone.
- void ExecuteOrAppend(SQLTransaction<T>& trans, PreparedStatement<T>* stmt);
-
- /**
- Other
- */
-
- typedef typename T::Statements PreparedStatementIndex;
-
- //! Automanaged (internally) pointer to a prepared statement object for usage in upper level code.
- //! Pointer is deleted in this->DirectExecute(PreparedStatement*), this->Query(PreparedStatement*) or PreparedStatementTask::~PreparedStatementTask.
- //! This object is not tied to the prepared statement on the MySQL context yet until execution.
- PreparedStatement<T>* GetPreparedStatement(PreparedStatementIndex index);
-
- //! Apply escape string'ing for current collation. (utf8)
- void EscapeString(std::string& str);
- //! Keeps all our MySQL connections alive, prevent the server from disconnecting us.
- void KeepAlive();
- void WarnAboutSyncQueries([[maybe_unused]] bool warn)
- {
- #ifdef TRINITY_DEBUG
- _warnSyncQueries = warn;
- #endif
- }
- size_t QueueSize() const;
- private:
- uint32 OpenConnections(InternalIndex type, uint8 numConnections);
- unsigned long EscapeString(char* to, char const* from, unsigned long length);
- void Enqueue(SQLOperation* op);
- //! Gets a free connection in the synchronous connection pool.
- //! Caller MUST call t->Unlock() after touching the MySQL context to prevent deadlocks.
- T* GetFreeConnection();
- char const* GetDatabaseName() const;
- //! Queue shared by async worker threads.
- std::unique_ptr<ProducerConsumerQueue<SQLOperation*>> _queue;
- std::array<std::vector<std::unique_ptr<T>>, IDX_SIZE> _connections;
- std::unique_ptr<MySQLConnectionInfo> _connectionInfo;
- std::vector<uint8> _preparedStatementSize;
- uint8 _async_threads, _synch_threads;
- #ifdef TRINITY_DEBUG
- static inline thread_local bool _warnSyncQueries = false;
- #endif
- };
- #endif

AsyncCallbackProcessor.h
- /*
- * This file is part of the TrinityCore Project. See AUTHORS file for Copyright information
- *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License as published by the
- * Free Software Foundation; either version 2 of the License, or (at your
- * option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
- * more details.
- *
- * You should have received a copy of the GNU General Public License along
- * with this program. If not, see <http://www.gnu.org/licenses/>.
- */
- #ifndef AsyncCallbackProcessor_h__
- #define AsyncCallbackProcessor_h__
-
- #include "Define.h"
- #include <algorithm>
- #include <vector>
-
- //template <class T>
- //concept AsyncCallback = requires(T t) { { t.InvokeIfReady() } -> std::convertible_to<bool> };
-
- template<typename T> // requires AsyncCallback<T>
- class AsyncCallbackProcessor
- {
- public:
- AsyncCallbackProcessor() = default;
- ~AsyncCallbackProcessor() = default;
-
- T& AddCallback(T&& query)
- {
- _callbacks.emplace_back(std::move(query));
- return _callbacks.back();
- }
-
- void ProcessReadyCallbacks()
- {
- if (_callbacks.empty())
- return;
-
- std::vector<T> updateCallbacks{ std::move(_callbacks) };
-
- updateCallbacks.erase(std::remove_if(updateCallbacks.begin(), updateCallbacks.end(), [](T& callback)
- {
- return callback.InvokeIfReady();
- }), updateCallbacks.end());
-
- _callbacks.insert(_callbacks.end(), std::make_move_iterator(updateCallbacks.begin()), std::make_move_iterator(updateCallbacks.end()));
- }
-
- private:
- AsyncCallbackProcessor(AsyncCallbackProcessor const&) = delete;
- AsyncCallbackProcessor& operator=(AsyncCallbackProcessor const&) = delete;
-
- std::vector<T> _callbacks;
- };
-
- #endif // AsyncCallbackProcessor_h__

4.使用c++中的promise和future获取连接池中线程返回的结果,核心线程产生一个任务时,该任务会携带一个promise,核心线程得到一个future,任务经连接池线程与MySQL交互,最后核心线程的future使用.get()或取promise的结果。
如下图所示,产生一个带有sql的task,task获取future,task进入任务队列,future返回到核心线程。
下图为连接池的连接接收到MySQL返回的结果,当结果不满足期望时set_value将结果置为nullptr,返回false。
下图为核心线程的回调函数调用InvokeIfReady接收返回的结果:_string.valid()检测promise是否还有效,_string.wait_for(std::chrono::seconds(0)) == std::future_status::ready检测promise是否实现。
1.chain模式:责任链模式,发送一条语句,等待数据库返回的结果,根据结果判断是否发送第二条语句。
2.pipeline模式:发送多条mysql语句,等待数据库返回的所有结果。
1.创建一个异步连接池
- #include "DatabaseEnvFwd.h"
- #include "Log.h"
-
- #include "DatabaseEnv.h"
- #include "DatabaseLoader.h"
- #include "Implementation/SakilaDatabase.h"
- #include "MySQLThreading.h"
- #include "AsyncCallbackProcessor.h"
- #include "QueryHolder.h"
- #include <chrono>
- #include <memory>
- #include <thread>
-
- int main()
- {
- MySQL::Library_Init();
-
- DatabaseLoader loader;
- loader.AddDatabase(SakilaDatabase, "192.168.232.1;3306;root;123456;sakila",
- 8, 2);//连接池中线程数量8
-
- if (!loader.Load()) {
- TC_LOG_ERROR("", "SakilaDatabase connect error");
- return 1;
- }
- TC_LOG_INFO("", "SakilaDatabase connect success");

2.责任链模式
- {//责任链模式
- auto *stmt1 = SakilaDatabase.GetPreparedStatement(SAKILA_SEL_ACTOR_INFO_ASYNC);
- stmt1->setUInt8(0, 1);
-
- AsyncCallbackProcessor<QueryCallback> processor;
- processor.AddCallback(SakilaDatabase.AsyncQuery(stmt1)
- .WithChainingPreparedCallback([](QueryCallback &callback, PreparedQueryResult result){
- TC_LOG_INFO("", "actor_id=%u,first_name=%s,last_name=%s,last_update=%s",
- (*result)[0].GetUInt8(), (*result)[1].GetString(), (*result)[2].GetString(),(*result)[3].GetString());
- auto *stmt2 = SakilaDatabase.GetPreparedStatement(SAKILA_SEL_ACTOR_INFO_ASYNC);
- stmt2->setUInt8(0, 2);
- callback.SetNextQuery(SakilaDatabase.AsyncQuery(stmt2));
- })
- .WithChainingPreparedCallback([](QueryCallback &callback, PreparedQueryResult result){
- TC_LOG_INFO("", "actor_id=%u,first_name=%s,last_name=%s,last_update=%s",
- (*result)[0].GetUInt8(), (*result)[1].GetString(), (*result)[2].GetString(),(*result)[3].GetString());
- // auto *stmt3 = SakilaDatabase.GetPreparedStatement(SAKILA_SEL_ACTOR_INFO_ASYNC);
- // stmt3->setUInt8(0, 3);
- // callback.SetNextQuery(SakilaDatabase.AsyncQuery(stmt3));
- })
- .WithChainingPreparedCallback([](QueryCallback &callback, PreparedQueryResult result){
- TC_LOG_INFO("", "actor_id=%u,first_name=%s,last_name=%s,last_update=%s",
- (*result)[0].GetUInt8(), (*result)[1].GetString(), (*result)[2].GetString(),(*result)[3].GetString());
- }));
-
- while (true) {
- processor.ProcessReadyCallbacks();
- std::this_thread::sleep_for(std::chrono::milliseconds(50));
- }
- }

3. pipeline模式:
- {//pipeline
- // 异步获取多个sql 的结果 delayholder
- class ActorInfoHolder : public SQLQueryHolder<SakilaDatabaseConnection>
- {
- public:
- enum ACTOR_INFO_IDX : uint8 {
- AI_1,
- AI_3,
- AI_5,
- AI_MAX
- };
- public:
- ActorInfoHolder() {
- SetSize(AI_MAX);
- auto *stmt1 = SakilaDatabase.GetPreparedStatement(SAKILA_SEL_ACTOR_INFO_ASYNC);
- stmt1->setUInt8(0, 1);
- SetPreparedQuery(AI_1, stmt1);
-
- auto *stmt3 = SakilaDatabase.GetPreparedStatement(SAKILA_SEL_ACTOR_INFO_ASYNC);
- stmt3->setUInt8(0, 3);
- SetPreparedQuery(AI_3, stmt3);
-
- auto *stmt5 = SakilaDatabase.GetPreparedStatement(SAKILA_SEL_ACTOR_INFO_ASYNC);
- stmt5->setUInt8(0, 5);
- SetPreparedQuery(AI_5, stmt5);
- }
- };
-
- AsyncCallbackProcessor<SQLQueryHolderCallback> processor;
- auto holder = std::make_shared<ActorInfoHolder>();
- processor.AddCallback(SakilaDatabase.DelayQueryHolder(holder)).AfterComplete([](const SQLQueryHolderBase &hdr){
- auto holder = static_cast<const ActorInfoHolder&>(hdr);
- auto result1 = holder.GetPreparedResult(ActorInfoHolder::AI_1);
- auto result3 = holder.GetPreparedResult(ActorInfoHolder::AI_3);
- auto result5 = holder.GetPreparedResult(ActorInfoHolder::AI_5);
-
- TC_LOG_INFO("", "actor_id=%u,first_name=%s,last_name=%s,last_update=%s",
- (*result1)[0].GetUInt8(), (*result1)[1].GetString(), (*result1)[2].GetString(),(*result1)[3].GetString());
-
- TC_LOG_INFO("", "actor_id=%u,first_name=%s,last_name=%s,last_update=%s",
- (*result3)[0].GetUInt8(), (*result3)[1].GetString(), (*result3)[2].GetString(),(*result3)[3].GetString());
-
- TC_LOG_INFO("", "actor_id=%u,first_name=%s,last_name=%s,last_update=%s",
- (*result5)[0].GetUInt8(), (*result5)[1].GetString(), (*result5)[2].GetString(),(*result5)[3].GetString());
-
- });
-
- while (true) {
- processor.ProcessReadyCallbacks();
- std::this_thread::sleep_for(std::chrono::milliseconds(50));
- }
- }

分享一个学习链接,有需要的同学可以看一下:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。