赞
踩
#include<iostream> #include<thread> using namespace std; // 函数fun,接收一个整型参数并在无限循环中打印递增的值 void fun(int a) { while(1) { cout << ++a << "\n"; // 打印自增后的a this_thread::sleep_for(chrono::microseconds(50)); // 线程休眠50微秒 } } int main() { int a = 0; // 初始化变量a为0 thread t1(fun, a); // 创建线程t1,并启动,执行fun函数,传入a作为参数 cout << t1.get_id() << endl; // 获取并打印线程t1的ID // t1.detach(); // 线程与主线程分离,独立运行 t1.join(); // 等待线程t1执行完毕后再继续执行主线程的后续代码 return 0; // 返回0,程序结束 }
使用mutex互斥量
#include<iostream> #include<thread> #include<mutex> using namespace std; mutex mtx; // 定义互斥量 int gg = 0; // 全局变量 gg,作为共享资源 void fun() { int t = 1000; while (t--) { mtx.lock(); // 上锁 ++gg; // 修改共享资源 --gg; // 修改共享资源 mtx.unlock(); // 解锁 } } int main() { thread t1(fun); // 创建线程 t1 thread t2(fun); // 创建线程 t2 t1.join(); // 等待线程 t1 结束 t2.join(); // 等待线程 t2 结束 cout << gg; // 输出共享资源 gg 的值 return 0; }
多个锁的情况
#include <iostream> #include <thread> #include <mutex> using namespace std; mutex mtx1, mtx2; // 定义两个互斥量 int gg = 0; void fun() { int t = 1000; while (t--) { // 同时对两个互斥量上锁 lock(mtx1, mtx2); ++gg; // gg自增 --gg; // gg自减 mtx1.unlock(); // 解锁第一个互斥量 mtx2.unlock(); // 解锁第二个互斥量 } } int main() { thread t1(fun); // 创建线程t1 thread t2(fun); // 创建线程t2 t1.join(); // 等待线程t1结束 t2.join(); // 等待线程t2结束 cout << gg; // 输出gg的值 return 0; }
实际开发中不会直接使用互斥量,而是搭配模板lock_guard使用,或者搭配功能更多的模板unique_lock使用
#include <iostream> #include <thread> #include <mutex> using namespace std; int gg = 0; mutex mtx1; mutex mtx2; void fun() { int t = 1000; while (t--) { lock_guard<mutex> lock1(mtx1); // 使用lock_guard自动管理锁,作用域结束自动解锁 lock_guard<mutex> lock2(mtx2); // 同时锁定两个mutex,确保线程安全性 ++gg; --gg; } } int main() { thread t1(fun); thread t2(fun); t1.join(); t2.join(); cout << gg; // 输出最终的gg值,理论上应该为0,因为++gg和--gg成对出现 return 0; }
使用原子变量
#include <iostream> #include <thread> #include <atomic> using namespace std; atomic<int> gg = 0; // 原子变量 gg void fun() { int t = 1000; while (t--) { ++gg; // 原子操作:自增 --gg; // 原子操作:自减 } } int main() { thread t1(fun); // 创建线程 t1 执行 fun 函数 thread t2(fun); // 创建线程 t2 执行 fun 函数 t1.join(); // 等待线程 t1 执行完毕 t2.join(); // 等待线程 t2 执行完毕 cout << gg; // 输出原子变量 gg 的最终值 return 0; }
#include <condition_variable> #include <iostream> #include <mutex> #include <queue> #include <thread> using namespace std; mutex mtx; // 互斥量,用于保护共享资源 queue<int> q; // 共享队列,生产者向其中推送数据,消费者从中取出数据 condition_variable cv; // 条件变量,用于线程之间的同步 void producer() { int i = 0; while (true) { unique_lock<mutex> lock(mtx); // 加锁,保护共享资源 q.push(i); // 向队列中推送数据 cout << "push: " << i << endl; // 打印推送的数据 cv.notify_one(); // 唤醒一个等待的消费者线程 // cv.notify_all(); // 唤醒所有等待的消费者线程 if (i < 9999) ++i; else i = 0; } } void consumer() { int data = 0; while (true) { unique_lock<mutex> lock(mtx); // 加锁,保护共享资源 while (q.empty()) cv.wait(lock); // 等待直到队列非空,解锁互斥量并等待条件变量通知后重新加锁 data = q.front(); // 获取队列头部数据 q.pop(); // 弹出队列头部数据 cout << "pop: " << data << '\n';// 打印弹出的数据 } } int main() { thread t1(producer); // 创建生产者线程 thread t2(consumer); // 创建消费者线程 t1.join(); // 等待生产者线程结束 t2.join(); // 等待消费者线程结束 return 0; }
信号量(semaphore)只在C++20之后的标准有(了解)
#include <iostream> #include <thread> #include <semaphore> // 包含信号量的头文件 using namespace std; counting_semaphore<3> csem(0); // 定义一个初始计数为0的计数信号量,最多允许3个线程同时访问 binary_semaphore bsem(0); // 定义一个初始计数为0的二进制信号量,相当于一次只允许一个线程访问 void task() { cout << "线程开始等待信号量\n"; csem.acquire(); // 线程等待并获取计数信号量 cout << "线程获取到信号量,继续执行\n"; } int main() { thread t0(task); // 创建线程 t0 执行 task 函数 thread t1(task); // 创建线程 t1 执行 task 函数 thread t2(task); // 创建线程 t2 执行 task 函数 thread t3(task); // 创建线程 t3 执行 task 函数 thread t4(task); // 创建线程 t4 执行 task 函数 cout << "主线程释放信号量\n"; csem.release(2); // 主线程释放2个信号量,唤醒等待的线程,因为初始设置是0,也就是要唤醒两个线程 t0.join(); // 等待线程 t0 执行完毕 t1.join(); // 等待线程 t1 执行完毕 t2.join(); // 等待线程 t2 执行完毕 t3.join(); // 等待线程 t3 执行完毕 t4.join(); // 等待线程 t4 执行完毕 return 0; }
#include <future> #include <iostream> #include <thread> using namespace std; // 定义一个任务函数,接受一个整数参数和一个promise引用 void task(int a, promise<int> &r) { // 将计算结果设置到promise中 r.set_value(a + a); } int main() { // 创建一个promise对象,用于在任务函数中设置值 promise<int> p; // 从promise中获取future对象,用于获取任务函数的返回值 future<int> f = p.get_future(); // 创建一个线程,执行任务函数task,并传递参数1和promise对象p的引用 thread t(task, 1, ref(p)); /* 在此处可以进行其他操作 */ // 输出future对象的值,注意:future的get方法只能调用一次 cout << f.get(); /* 如果需要多次访问future对象的值,可以使用shared_future shared_future<int> s_f = f.share(); 这样可以直接值传递,而不是引用传递 */ // 等待线程执行完成 t.join(); return 0; }
async
#include <future> #include <iostream> using namespace std; // 定义一个函数,计算两个整数的和 int task(int a, int b) { return a + b; } int main() { // 创建一个 future 对象,用 async 异步调用 task 函数,并传入参数 1 和 2 future<int> fu = async(task, 1, 2); // 相当于 future<int> fu = async(launch::async|launch::deferred, task, 1, 2); // launch::async 会启动一个新线程执行任务 // launch::deferred 会延迟调用任务,在需要获取结果时才调用 // launch::async|launch::deferred 根据具体情况自动选择 // 输出 future 对象的结果,使用 get() 函数获取异步任务的返回值 cout << fu.get(); return 0; }
packaged_task
#include <future> #include <iostream> using namespace std; // 定义一个任务,计算两个整数的和 int task(int a, int b) { return a + b; } int main() { // 创建一个打包任务,将函数 task 绑定到 packaged_task packaged_task<int(int, int)> t(task); // 执行任务,传入参数 1 和 2 t(1, 2); // 获取任务的未来对象,并获取结果 cout << t.get_future().get(); return 0; }
bind
#include <future> #include <iostream> using namespace std; // 定义一个普通函数,返回两个整数的和 int task(int a, int b) { return a + b; } int main() { // 使用bind将函数task绑定到a,返回一个std::function对象 auto a = bind(task, 1, 2); // 返回的是std::function // 调用a,计算绑定的函数结果 int ret = a(); cout << ret << endl; // 使用packaged_task封装a,packaged_task是一个可调用对象的包装器 packaged_task<int()> t(a); t(); // 执行packaged_task,实际调用绑定的函数task // 获取packaged_task的future,等待任务完成并获取结果 cout << t.get_future().get(); // 输出任务的结果 return 0; }
#include <vector> #include <queue> #include <memory> #include <thread> #include <mutex> #include <condition_variable> #include <future> #include <functional> #include <stdexcept> #include <iostream> // 建议使用支持C++14以上的编译器以支持所有特性 class ThreadPool { public: // 构造函数, 初始化大小 ThreadPool(size_t); // typename std::result<F(Args...)> -> 编译期推断返回类型 // 可以使用auto代替,自动推断返回类型 template<class F, class... Args> auto enqueue(F &&f, Args &&... args) -> std::future<typename std::result_of<F(Args...)>::type>; // 析构函数 ~ThreadPool(); private: // need to keep track of threads so we can join them // 线程池的工作线程数 std::vector<std::thread> workers; // the task queue // 任务队列 函数应该被包装为void(void) 的task std::queue<std::function<void()> > tasks; // synchronization // 同步工具 // 互斥锁和条件变量 // stop变量检测是否关闭线程池,可以使用atomic<bool>代替 std::mutex queue_mutex; std::condition_variable condition; bool stop; }; // the constructor just launches some amount of workers inline ThreadPool::ThreadPool(size_t threads) : stop(false) { // 创建threads个新线程塞进线程池 // 使用std::move 右值传递 for (size_t i = 0; i < threads; ++i) workers.emplace_back( // 相当于 push_back(std::move(...)) [this] { // lambda函数,将class的成员变量以指针(引用)形式传递进去,*this则是以拷贝形式传递 for (;;) { // worker函数不断轮询,竞争任务 // 创建一个任务包装,以存放将要完成的task std::function<void()> task; { // 访问临界区需要上锁 std::unique_lock<std::mutex> lock(this->queue_mutex); // 若队列不为空或者需要stop,则唤醒worker this->condition.wait(lock, [this] { return this->stop || !this->tasks.empty(); }); // 若有停止信号或者队列为空,则直接返回 if (this->stop && this->tasks.empty()) return; // 获取任务,使用右值 task = std::move(this->tasks.front()); // 弹出在工作的任务 this->tasks.pop(); } // 执行任务 task(); // 完成后继续从task队列中提取任务 } } ); } // add new work item to the pool template<class F, class... Args> auto ThreadPool::enqueue(F &&f, Args &&... args) // 下面的推断可以使用auto -> std::future<typename std::result_of<F(Args...)>::type> { // 使用萃取的方法获取返回值类型 using return_type = typename std::result_of<F(Args...)>::type; // 将任务包装成异步函数指针,封装为shared_ptr 完后后自动回收,不造成内存泄漏 // 而且在后续的lambda函数中可以直接传递函数指针然后执行 // 使用packaged_task<>,函数绑定std::bind,和完美转发std::forward // 包装需要执行的函数,然后在后台进行异步执行 auto task = std::make_shared<std::packaged_task<return_type()> >( std::bind(std::forward<F>(f), std::forward<Args>(args)...)); // 绑定异步函数task的返回值到future res中 std::future<return_type> res = task->get_future(); { // 在匿名作用域中使用unique_lock // 减小锁的粒度,出了匿名作用区锁就被释放 std::unique_lock<std::mutex> lock(queue_mutex); // don't allow enqueueing after stopping the pool // 防止在停止后放入任务 if (stop) throw std::runtime_error("enqueue on stopped ThreadPool"); // 将匿名函数包装到lambda函数void()中 // task是函数指针(即函数的地址),所以拷贝传递可以执行 tasks.emplace([task]() { (*task)(); }); } // 唤醒一个worker condition.notify_one(); return res; } // the destructor joins all threads inline ThreadPool::~ThreadPool() { { // 此处使用atomic<bool>显得更加方便 std::unique_lock<std::mutex> lock(queue_mutex); stop = true; } condition.notify_all(); // join后自动销毁回收 for (std::thread &worker: workers) worker.join(); } int main() { ThreadPool pool(4); std::vector<std::future<int> > results; results.reserve(8); for (int i = 0; i < 8; ++i) { results.emplace_back( pool.enqueue([i] { std::cout << "hello " << i << std::endl; std::this_thread::sleep_for(std::chrono::seconds(1)); std::cout << "world " << i << std::endl; return i * i; }) ); } for (auto &&result: results) std::cout << result.get() << ' '; std::cout << std::endl; return 0; }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。