赞
踩
Apollo 项目里面有一个线程池,源码链接:https://github.com/ApolloAuto/apollo/blob/master/cyber/base/thread_pool.h
它仅用一个头文件实现,此处把源码贴出来。
- #ifndef CYBER_BASE_THREAD_POOL_H_
- #define CYBER_BASE_THREAD_POOL_H_
-
- #include <atomic>
- #include <functional>
- #include <future>
- #include <memory>
- #include <queue>
- #include <stdexcept>
- #include <thread>
- #include <utility>
- #include <vector>
-
- #include "cyber/base/bounded_queue.h"
-
- /*
- 五大池:内存池、连接池、线程池、进程池、协程池
- 线程池存在的意义:
- 传统多线程方案中,我们采用的服务器模型是:一旦接收到请求,就创建一个新的线程,由该线程执行任务;
- 任务执行完毕后,线程退出,这就是“即时创建,即时销毁”的策略。尽管与创建进程相比,创建线程的时间
- 已经大大缩短,但是如果提交给线程的任务执行时间较短,而且执行次数极其频繁,那么服务器将处于不停地
- 创建线程、销毁线程的状态。线程池预先创建好一组线程并重复使用它们,从而避免这部分开销。
- 此线程池优点:使用比较灵活
- 此线程池缺陷:接口较少,没有访问当前线程池状态、强制停止所有任务等接口
- 尚不明白的地方:task_queue_的元素类型是std::function<void()>,这个void是函数的返回值类型,那这样
- 是不是此线程池可以执行的函数必须是void返回值类型?经过测试,并不是这样:Enqueue会先把任务包装进
- std::packaged_task,再用一个无返回值的lambda去调用它,返回值通过std::future取回。
- */
-
- namespace apollo {
- namespace cyber {
- namespace base {
-
- class ThreadPool {
- public:
- explicit ThreadPool(std::size_t thread_num, std::size_t max_task_num = 1000);
-
- template <typename F, typename... Args>
- auto Enqueue(F&& f, Args&&... args)
- -> std::future<typename std::result_of<F(Args...)>::type>;
-
- ~ThreadPool();
-
- private:
- std::vector<std::thread> workers_;
- BoundedQueue<std::function<void()>> task_queue_;
- std::atomic_bool stop_;
- };
-
- inline ThreadPool::ThreadPool(std::size_t threads, std::size_t max_task_num)
- : stop_(false) {
- if (!task_queue_.Init(max_task_num, new BlockWaitStrategy())) {
- throw std::runtime_error("Task queue init failed.");
- }
- workers_.reserve(threads);
- for (size_t i = 0; i < threads; ++i) {
- workers_.emplace_back([this] {
- while (!stop_) {
- std::function<void()> task;
- if (task_queue_.WaitDequeue(&task)) {
- task();
- }
- }
- });
- }
- }
-
- // before using the return value, you should check value.valid()
- template <typename F, typename... Args>
- auto ThreadPool::Enqueue(F&& f, Args&&... args)
- -> std::future<typename std::result_of<F(Args...)>::type> {
- using return_type = typename std::result_of<F(Args...)>::type;
-
- auto task = std::make_shared<std::packaged_task<return_type()>>(
- std::bind(std::forward<F>(f), std::forward<Args>(args)...));
-
- std::future<return_type> res = task->get_future();
-
- // don't allow enqueueing after stopping the pool
- if (stop_) {
- return std::future<return_type>();
- }
- task_queue_.Enqueue([task]() { (*task)(); });
- return res;
- };
-
- // the destructor joins all threads
- inline ThreadPool::~ThreadPool() {
- if (stop_.exchange(true)) {
- return;
- }
- task_queue_.BreakAllWait();
- for (std::thread& worker : workers_) {
- worker.join();
- }
- }
-
- } // namespace base
- } // namespace cyber
- } // namespace apollo
-
- #endif // CYBER_BASE_THREAD_POOL_H_
- #include <atomic>
- #include <iostream>
-
- #include "cyber/base/thread_pool.h"
-
- using namespace apollo::cyber::base;
-
- std::atomic<int> g_int;
-
- int main(int argc, char* argv[]) {
- ThreadPool* tp = new ThreadPool(10, 10000);
- for (size_t i = 0; i < 10000; i++) {
- tp->Enqueue<>([&](int a) -> bool {
- g_int.fetch_add(1);
- std::this_thread::sleep_for(std::chrono::milliseconds(1));
- return true;
- }, 1000);
- }
- // 等待所有线程运行结束
- std::this_thread::sleep_for(std::chrono::seconds(10));
- std::cout << g_int << std::endl;
-
- return 0;
- }
另外,在激光雷达相关的代码里面还看到了一个线程池的实现,Apollo 默认项目里面没有,应该是激光雷达厂商提供的吧。
- //
- // Created by ljy on 2023/3/16.
- //
-
- #ifndef INC_20230307_UTILITY_H
- #define INC_20230307_UTILITY_H
- #pragma once
-
- #include <atomic>
- #include <condition_variable>
- #include <functional>
- #include <future>
- #include <memory>
- #include <mutex>
- #include <queue>
- #include <thread>
- #include <utility>
- #include <vector>
- #define MAX_THREAD_NUM 4
-
/**
 * @brief Worker-thread pool that is only reachable through getInstance().
 *
 * Tasks are submitted with commit(), which returns a std::future for the
 * task's result. The pool cannot be copied or assigned.
 */
class ThreadPool {
private:
    // Private so that instances are only created through getInstance().
    // Defined out of line in the .cpp file, so it is not declared `inline`.
    ThreadPool();

public:
    using Ptr = std::shared_ptr<ThreadPool>;
    ThreadPool(const ThreadPool &) = delete;
    ThreadPool &operator=(const ThreadPool &) = delete;
    ~ThreadPool();

public:
    /// Returns the shared pool instance.
    static Ptr getInstance();

    /// Returns the current value of the idle-worker counter.
    int idlCount();

    /**
     * @brief Queues the call f(args...) and wakes one worker.
     * @return a future that receives the call's result.
     * @throws std::runtime_error if the pool has already been stopped.
     */
    template <class F, class... Args>
    auto commit(F &&f, Args &&... args) -> std::future<decltype(f(args...))> {
        if (stoped.load())
            throw std::runtime_error("Commit on ThreadPool is stopped.");

        using RetType = decltype(f(args...));
        // Bind the callable to its arguments so it becomes a nullary task.
        // It is held by shared_ptr because std::packaged_task is move-only
        // while the std::function stored in the queue must be copyable.
        auto task = std::make_shared<std::packaged_task<RetType()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...));
        std::future<RetType> future = task->get_future();
        {
            // Keep the critical section limited to the queue update.
            std::lock_guard<std::mutex> lock{m_lock};
            tasks.emplace([task]() { (*task)(); });
        }
        cv_task.notify_one();
        return future;
    }

private:
    using Task = std::function<void()>;
    std::vector<std::thread> pool;      // worker threads
    std::queue<Task> tasks;             // pending tasks, guarded by m_lock
    std::mutex m_lock;                  // protects `tasks`
    std::condition_variable cv_task;    // signalled on new task / shutdown
    std::atomic<bool> stoped;           // set when the pool shuts down
    std::atomic<int> idl_thr_num;       // workers not currently running a task
    static Ptr instance_ptr;            // shared instance
    static std::mutex instance_mutex;   // guards creation of instance_ptr
};
-
-
- #endif //INC_20230307_UTILITY_H
- //
- // Created by ljy on 2023/3/16.
- //
-
- #include "utility.h"
-
- ThreadPool::Ptr ThreadPool::instance_ptr = nullptr;
- std::mutex ThreadPool::instance_mutex;
-
- ThreadPool::ThreadPool() : stoped{false} {
- idl_thr_num = MAX_THREAD_NUM;
- for (int i = 0; i < idl_thr_num; ++i) {
- pool.emplace_back([this] {
- while (!this->stoped) {
- std::function<void()> task;
- {
- std::unique_lock<std::mutex> lock{this->m_lock};
- this->cv_task.wait(lock, [this] {
- return this->stoped.load() || !this->tasks.empty();
- });
- if (this->stoped && this->tasks.empty()) return;
- task = std::move(this->tasks.front());
- this->tasks.pop();
- }
- idl_thr_num--;
- task();
- idl_thr_num++;
- }
- });
- }
- }
-
- ThreadPool::Ptr ThreadPool::getInstance() {
- if (instance_ptr == nullptr) {
- std::lock_guard<std::mutex> lk(instance_mutex);
- if (instance_ptr == nullptr) {
- instance_ptr = std::shared_ptr<ThreadPool>(new ThreadPool);
- }
- }
- return instance_ptr;
- }
-
- ThreadPool::~ThreadPool() {
- stoped.store(true);
- cv_task.notify_all();
- for (std::thread &thread: pool) {
- thread.join();
- }
- }
-
- int ThreadPool::idlCount() { return idl_thr_num; }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。