当前位置:   article > 正文

百度自动驾驶apollo源码解读12:线程池

百度自动驾驶apollo源码解读12:线程池

apollo项目里面有个线程池,源码链接:https://github.com/ApolloAuto/apollo/blob/master/cyber/base/thread_pool.h

仅用一个头文件去实现,此处贴出来源码吧。

1. 源码实现

  1. #ifndef CYBER_BASE_THREAD_POOL_H_
  2. #define CYBER_BASE_THREAD_POOL_H_
  3. #include <atomic>
  4. #include <functional>
  5. #include <future>
  6. #include <memory>
  7. #include <queue>
  8. #include <stdexcept>
  9. #include <thread>
  10. #include <utility>
  11. #include <vector>
  12. #include "cyber/base/bounded_queue.h"
  13. /*
  14. 五大池:内存池、连接池、线程池、进程池、协程池
  15. 线程池存在的意义:
  16. 传统多线程方案中我们采用的服务器模型则是一旦接受到请求之后,即创建一个新的线程,由该线程执行任务。
  17. 任务执行完毕后,线程退出,这就是“即时创建,即时销毁”的策略。尽管与创建进程相比,创建线程的时间
  18. 已经大大的缩短,但是如果提交给线程的任务是执行时间较短,而且执行次数极其频繁,那么服务器将处于不停
  19. 的创建线程,销毁线程的状态。
  20. 此线程池优点:使用比较灵活
  21. 此线程池缺陷:接口较少,没有一些访问当前线程池状态、强制停止所有任务等接口
  22. 尚不明白的地方:task_queue_的类型是std::function<void()>,这个void是函数的返回值吧,那这样
  23. 是不是此线程池可以执行的函数体必须是void返回值类型,经过测试,并不是这样。
  24. */
  25. namespace apollo {
  26. namespace cyber {
  27. namespace base {
// Fixed-size thread pool backed by a bounded lock-free task queue.
// Tasks are submitted through Enqueue() and executed by `thread_num`
// worker threads created in the constructor.
class ThreadPool {
 public:
  // Spawns `thread_num` workers; the internal queue holds at most
  // `max_task_num` pending tasks.
  explicit ThreadPool(std::size_t thread_num, std::size_t max_task_num = 1000);

  // Submits a callable with its arguments; returns a std::future for the
  // result. NOTE: the queue element type is std::function<void()>, but the
  // real return value travels through the std::packaged_task captured by
  // that wrapper — so any return type works (this answers the question in
  // the file-top comment).
  template <typename F, typename... Args>
  auto Enqueue(F&& f, Args&&... args)
      -> std::future<typename std::result_of<F(Args...)>::type>;

  // Stops the pool and joins all workers.
  ~ThreadPool();

 private:
  std::vector<std::thread> workers_;                // joined in the destructor
  BoundedQueue<std::function<void()>> task_queue_;  // pending tasks
  std::atomic_bool stop_;                           // set once at shutdown
};
// Initializes the bounded task queue and starts `threads` workers. Each
// worker loops: block until a task can be dequeued, run it, and re-check
// stop_ — until the destructor flips stop_ and breaks the waits.
inline ThreadPool::ThreadPool(std::size_t threads, std::size_t max_task_num)
    : stop_(false) {
  // BlockWaitStrategy presumably makes WaitDequeue block until an element
  // arrives or BreakAllWait() interrupts it — TODO confirm against
  // cyber/base/bounded_queue.h.
  if (!task_queue_.Init(max_task_num, new BlockWaitStrategy())) {
    throw std::runtime_error("Task queue init failed.");
  }
  workers_.reserve(threads);
  for (size_t i = 0; i < threads; ++i) {
    workers_.emplace_back([this] {
      while (!stop_) {
        std::function<void()> task;
        // A false return means the wait was broken (shutdown path);
        // the loop condition then observes stop_ and exits.
        if (task_queue_.WaitDequeue(&task)) {
          task();
        }
      }
    });
  }
}
  57. // before using the return value, you should check value.valid()
  58. template <typename F, typename... Args>
  59. auto ThreadPool::Enqueue(F&& f, Args&&... args)
  60. -> std::future<typename std::result_of<F(Args...)>::type> {
  61. using return_type = typename std::result_of<F(Args...)>::type;
  62. auto task = std::make_shared<std::packaged_task<return_type()>>(
  63. std::bind(std::forward<F>(f), std::forward<Args>(args)...));
  64. std::future<return_type> res = task->get_future();
  65. // don't allow enqueueing after stopping the pool
  66. if (stop_) {
  67. return std::future<return_type>();
  68. }
  69. task_queue_.Enqueue([task]() { (*task)(); });
  70. return res;
  71. };
// The destructor joins all threads.
inline ThreadPool::~ThreadPool() {
  // exchange() makes shutdown idempotent: only the first caller to flip
  // stop_ from false to true performs the teardown.
  if (stop_.exchange(true)) {
    return;
  }
  // Wake every worker blocked inside WaitDequeue so each can observe
  // stop_ and leave its loop.
  task_queue_.BreakAllWait();
  for (std::thread& worker : workers_) {
    worker.join();
  }
}
  82. } // namespace base
  83. } // namespace cyber
  84. } // namespace apollo
  85. #endif // CYBER_BASE_THREAD_POOL_H_

2. 写一个简单的测试例子

  1. #include <atomic>
  2. #include <iostream>
  3. #include "cyber/base/thread_pool.h"
  4. using namespace apollo::cyber::base;
  5. std::atomic<int> g_int;
  6. int main(int argc, char* argv[]) {
  7. ThreadPool* tp = new ThreadPool(10, 10000);
  8. for (size_t i = 0; i < 10000; i++) {
  9. tp->Enqueue<>([&](int a) -> bool {
  10. g_int.fetch_add(1);
  11. std::this_thread::sleep_for(std::chrono::milliseconds(1));
  12. return true;
  13. }, 1000);
  14. }
  15. // 等待所有线程运行结束
  16. std::this_thread::sleep_for(std::chrono::seconds(10));
  17. std::cout << g_int << std::endl;
  18. return 0;
  19. }


另外,在激光雷达里面看到了一个线程池的代码,默认项目里面没有,是激光雷达厂商提供的吧。

3. 激光雷达里面看到了一个线程池的代码

3.1 utility.h头文件

  1. //
  2. // Created by ljy on 2023/3/16.
  3. //
  4. #ifndef INC_20230307_UTILITY_H
  5. #define INC_20230307_UTILITY_H
  6. #pragma once
  7. #include <atomic>
  8. #include <condition_variable>
  9. #include <functional>
  10. #include <future>
  11. #include <memory>
  12. #include <mutex>
  13. #include <queue>
  14. #include <thread>
  15. #include <utility>
  16. #include <vector>
  17. #define MAX_THREAD_NUM 4
  18. class ThreadPool {
  19. private:
  20. inline ThreadPool();
  21. public:
  22. typedef std::shared_ptr<ThreadPool> Ptr;
  23. ThreadPool(ThreadPool &) = delete;
  24. ThreadPool &operator=(const ThreadPool &) = delete;
  25. ~ThreadPool();
  26. public:
  27. static Ptr getInstance();
  28. int idlCount();
  29. template <class F, class... Args>
  30. inline auto commit(F &&f, Args &&... args)
  31. -> std::future<decltype(f(args...))> {
  32. if (stoped.load())
  33. throw std::runtime_error("Commit on ThreadPool is stopped.");
  34. using RetType = decltype(f(args...));
  35. auto task = std::make_shared<std::packaged_task<RetType()>>(
  36. std::bind(std::forward<F>(f), std::forward<Args>(args)...)); // wtf !
  37. std::future<RetType> future = task->get_future();
  38. {
  39. std::lock_guard<std::mutex> lock{m_lock};
  40. tasks.emplace([task]() { (*task)(); });
  41. }
  42. cv_task.notify_one();
  43. return future;
  44. }
  45. private:
  46. using Task = std::function<void()>;
  47. std::vector<std::thread> pool;
  48. std::queue<Task> tasks;
  49. std::mutex m_lock;
  50. std::condition_variable cv_task;
  51. std::atomic<bool> stoped;
  52. std::atomic<int> idl_thr_num;
  53. static Ptr instance_ptr;
  54. static std::mutex instance_mutex;
  55. };
  56. #endif //INC_20230307_UTILITY_H

3.2 utility.cpp实现文件

  1. //
  2. // Created by ljy on 2023/3/16.
  3. //
  4. #include "utility.h"
// Out-of-class definitions of the singleton storage declared in utility.h.
ThreadPool::Ptr ThreadPool::instance_ptr = nullptr;
std::mutex ThreadPool::instance_mutex;
// Starts MAX_THREAD_NUM worker threads. Each worker loops: wait until a
// task is queued (or the pool is stopping), pop one task under the lock,
// run it outside the lock, and keep idl_thr_num in sync around the call.
ThreadPool::ThreadPool() : stoped{false} {
  idl_thr_num = MAX_THREAD_NUM;
  for (int i = 0; i < idl_thr_num; ++i) {
    pool.emplace_back([this] {
      while (!this->stoped) {
        std::function<void()> task;
        {
          std::unique_lock<std::mutex> lock{this->m_lock};
          // Predicate guards against spurious wakeups: proceed only on
          // shutdown or when work is available.
          this->cv_task.wait(lock, [this] {
            return this->stoped.load() || !this->tasks.empty();
          });
          // On shutdown, drain any remaining tasks first; exit only when
          // the queue is empty.
          if (this->stoped && this->tasks.empty()) return;
          task = std::move(this->tasks.front());
          this->tasks.pop();
        }
        idl_thr_num--;  // worker is now busy
        task();
        idl_thr_num++;  // worker is idle again
      }
    });
  }
}
  29. ThreadPool::Ptr ThreadPool::getInstance() {
  30. if (instance_ptr == nullptr) {
  31. std::lock_guard<std::mutex> lk(instance_mutex);
  32. if (instance_ptr == nullptr) {
  33. instance_ptr = std::shared_ptr<ThreadPool>(new ThreadPool);
  34. }
  35. }
  36. return instance_ptr;
  37. }
// Signals shutdown, wakes every worker, and joins them all. Workers finish
// any tasks still queued before exiting (see the wait predicate and the
// stoped/empty check in the constructor's worker loop).
ThreadPool::~ThreadPool() {
  stoped.store(true);
  cv_task.notify_all();
  for (std::thread &thread: pool) {
    thread.join();
  }
}
  45. int ThreadPool::idlCount() { return idl_thr_num; }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/879202
推荐阅读
相关标签
  

闽ICP备14008679号