赞
踩
// 创建std::thread对象,新线程调用threadFun函数,函数的参数由 args 给出
template<class Fn, class... Args>
explicit thread(Fn&& fn, Args&&... args);
特点:不能复制控制,只能移动(成功后则不再表示 thread 对象)
thread t1(threadFun, std::ref(x)); // std::ref 表示引用传递
thread t2(std::move(t1)); // t1 线程失去所有权
//t1.join(); // error,t1不再是 thread 执行对象
t2.join(); //t3拥有控制权
注:std::ref 与 & 的区别
成员函数
get_id()
获取线程ID,返回类型 std::thread::id
对象joinable()
判断线程是否可以加入等待。新创建的线程是 joinable,可被 joinable 的 thread 对象必须在销毁之前被主线程 join 或者将其设置为 detached。join()
等该线程执行完成后才返回detach()
线程解绑独立运行。调用后,目标线程驻留后台运行,成为了守护线程。与之相关的 std::thread 对象失去对目标线程的关联,无法再通过 std::thread 对象取得该线程的控制权。当线程主函数执行完之后,线程就结束了。运行时库负责清理与该线程相关的资源。
*this
不再代表任何的线程执行实例。joinable() == false
get_id() == std::thread::id()
yield()
让出 cpu 时间片1sleep_until()
睡眠直到某个时间点sleep_for()
睡眠某段时间实例:线程封装
子类继承父类,实现具体的业务逻辑处理。
// thread.h #ifndef THREAD_H #define THREAD_H #include <thread> class Thread { public: Thread(); // 构造函数 virtual ~Thread(); // 析构函数 bool start(); //启动线程 void stop(); //停止线程 bool isAlive() const; // 线程是否存活. std::thread::id id() { return pThread->get_id(); } std::thread* getThread() { return pThread; } void join(); // 等待当前线程结束, 不能在当前线程上调用 void detach(); //能在当前线程上调用 static size_t CURRENT_THREADID(); protected: void threadEntry(); // 线程创建的函数 virtual void run() = 0; // 子线程实现具体的业务逻辑方法 protected: bool _running; //是否在运行 std::thread *pThread; }; #endif
// thread.cc #include "thread.h" #include <sstream> #include <iostream> #include <exception> Thread::Thread() : _running(false), pThread(NULL) {} Thread::~Thread(){ // 调用析构函数的之前,子线程要么是join则触发detach // 此时是一个比较危险的动作,用户必须知道他在做什么 if (pThread != NULL) { if (pThread->joinable()) { std::cout << "~Thread detach\n"; pThread->detach(); } delete pThread; pThread = NULL; } std::cout << "~Thread()" << std::endl; } bool Thread::start() { // 已经在运行了 if (_running) { return false; } try { // 创建线程并启动 pThread = new std::thread(&Thread::threadEntry, this); } catch (...) { throw "[ZERO_Thread::start] thread start error"; } return true; } void Thread::stop() { _running = false; } bool Thread::isAlive() const { return _running; } void Thread::join() { if (pThread->joinable()) { pThread->join(); } } void Thread::detach() { pThread->detach(); } size_t Thread::CURRENT_THREADID() { // 声明为 thread_local 的本地变量在线程中是持续存在的。 // 具有static变量一样的初始化特征和生命周期,即使它不被声明为static。 static thread_local size_t threadId = 0; if (threadId == 0) { std::stringstream ss; ss << std::this_thread::get_id(); threadId = strtol(ss.str().c_str(), NULL, 0); } return threadId; } // 创建新新线程时调用的threadFunc void Thread::threadEntry() { _running = true; try { run(); // 函数运行所在 调用子类的run函数 } catch (std::exception &ex) { _running = false; throw ex; } catch (...) { _running = false; throw; } _running = false; }
// main.cc #include <iostream> #include <chrono> #include "thread.h" using namespace std; class A : public Thread { public: void run() { while (_running) { cout << "print A " << endl; std::this_thread::sleep_for(std::chrono::seconds(5)); } cout << "----- leave A " << endl; } }; class B : public Thread { public: void run() { while (_running) { cout << "print B " << endl; std::this_thread::sleep_for(std::chrono::seconds(2)); } cout << "----- leave B " << endl; } }; int main(){ { A a; a.start(); B b; b.start(); std::this_thread::sleep_for(std::chrono::seconds(5)); a.stop(); // 否则线程的run方法不会停止,线程不会退出 a.join(); b.stop(); b.join(); } cout << "Hello World!" << endl; system("pause"); return 0; }
互斥锁的种类
std::mutex
,独占互斥量,不能递归使用std::time_mutex
,带超时的独占互斥量,不能递归使用std::recursive_mutex
,递归互斥量,不带超时功能std::recursive_timed_mutex
,带超时的递归互斥量特点:不允许拷贝构造,也不允许 move 拷贝,初始状态是 unlocked。
lock()
,调用线程将锁住该互斥量。有锁 -> 其他(阻塞),自己(死锁 deadlock)unlock()
, 解锁,释放对互斥量的所有权。try_lock()
,尝试锁住互斥量。有锁 -> 其他(不阻塞),自己(死锁)都能实现自动加锁和解锁(RAII类,自动释放资源),但 unique_lock 可以临时解锁和上锁。
std::lock_guard
std::unique_lock
unlock()
再上锁 lock()
,而不必等到析构自动解锁。#include <iostream> #include <deque> #include <thread> #include <mutex> #include <condition_variable> std::deque<int> q; std::mutex mu; std::condition_variable cond; int count = 0; void producer() { while (true) { { // 离开作用域后自动析构 std::unique_lock<std::mutex> locker(mu); // 不能替换成lock_guard std::cout << "fun1 lock" << std::endl; q.push_front(count++); //locker.unlock(); // 没必要 cond.notify_one(); } std::this_thread::sleep_for(std::chrono::seconds(1)); } } void consumer() { while (true) { std::unique_lock<std::mutex> locker(mu); std::cout << "fun2 lock" << std::endl; std::cout << "fun2 wait into" << std::endl; cond.wait(locker, []() {return !q.empty(); }); std::cout << "fun2 wait leave" << std::endl; auto data = q.back(); q.pop_back(); // locker.unlock(); //没必要 std::cout << "thread2 get value form thread1: " << data << std::endl; } } int main() { std::thread t1(producer); std::thread t2(consumer); t1.join(); t2.join(); return 0; }
条件变量 std::condition_variable
:实现线程同步,即线程间需要按照预定的先后次序顺序进行的行为.
条件变量的使用
wait
循环检查某个条件,如果条件不满足则阻塞直到条件满足notify
唤醒等待线程。必须使用 unique_lock 对象,需要临时上锁和解锁。
// 唤醒后,加锁获取互斥量,继续执行
void wait (unique_lock<mutex>& lck); // unique_lock对象
// 唤醒后,加锁获取互斥量,判断pred,若为false,则解锁阻塞;若为true,则继续执行
template <class Predicate>
void wait (unique_lock<mutex>& lck, Predicate pred); // Predicate 对象(等待条件)
// 解锁正在等待当前条件的线程中的随机一个
void notify_one() noexcept;
// 解锁正在等待当前条件的所有线程
void notify_all() noexcept;
// sync_queue.h #ifndef SIMPLE_SYNC_QUEUE_H #define SIMPLE_SYNC_QUEUE_H #include <thread> #include <condition_variable> #include <mutex> #include <list> #include <iostream> template<typename T> class SimpleSyncQueue { public: SimpleSyncQueue() {} void put(const T& x) { std::lock_guard<std::mutex> locker(_mutex); _queue.push_back(x); _notEmpty.notify_one(); } void take(T& x) { std::unique_lock<std::mutex> locker(_mutex); _notEmpty.wait(locker, [this] {return !_queue.empty(); }); x = _queue.front(); _queue.pop_front(); } bool empty() { std::lock_guard<std::mutex> locker(_mutex); return _queue.empty(); } size_t size() { std::lock_guard<std::mutex> locker(_mutex); return _queue.size(); } private: std::list<T> _queue; std::mutex _mutex; std::condition_variable _notEmpty; }; #endif // SIMPLE_SYNC_QUEUE_H
// main.cc #include <iostream> #include <thread> #include <iostream> #include <mutex> #include "sync_queue.h" using namespace std; SimpleSyncQueue<int> syncQueue; void PutDatas() { for (int i = 0; i < 20; ++i) { syncQueue.put(i); } } void TakeDatas() { int x = 0; for (int i = 0; i < 20; ++i) { syncQueue.take(x); std::cout << x << std::endl; } } int main() { std::thread t1(PutDatas); std::thread t2(TakeDatas); t1.join(); t2.join(); std::cout << "main finish\n"; system("pause"); return 0; }
atomic 不能被中断的操作。store()
赋值, load()
读取。
内存顺序(memory_order)
枚举值 | 定义规则 |
---|---|
memory_order_relaxed | 不对执行顺序做任何保障 |
memory_order_acquire | 本线程中,所有后续的读操作均在本条原子操作完成后执行 |
memory_order_release | 本线程中,所有之前的写操作完成后才能执行本条原子操作 |
memory_order_acq_rel | 同时包含memory_order_acquire 和memory_order_release 标记 |
memory_order_consume | 本线程中,所有后续的有关本原子类型的操作,必须在本条原子操作完成后执行 |
memory_order_seq_cst | 全部存取都按顺序执行 |
#include <iostream> #include <atomic> #include <thread> //std::atomic<int> count = 0; //error std::atomic<int> count(0); // 准确初始化,atomic 线程安全 void set_count(int x) { std::cout << "set_count:" << x << std::endl; count.store(x, std::memory_order_relaxed); // set value atomically } void print_count() { int x; do { x = count.load(std::memory_order_relaxed); // get value atomically std::cout << "--- wait ---" << std::endl; } while (x == 0); std::cout << "count: " << x << std::endl; } int main() { std::thread t1(print_count); std::thread t2(set_count, 10); t1.join(); t2.join(); std::cout << "main finish\n"; return 0; }
future 期望,当前线程持有 future 时,期待从 future 获取到想要的结果和返回,可以把future当做异步函数的返回值。
一旦 future 就绪,就无法复位,代表的是一次性事件。
std::future
: 异步指向某个任务,然后通过 future 去获取任务函数的返回结果。当需要返回值时,对 future 使用 get()
方法线程就会阻塞直到 future 就绪,然后返回该值。
std::aysnc
: 异步运行某个任务函数,返回一个 future 对象
async(_Fty&& _Fnarg, _ArgTypes&&... _Args)
future 的类型
std::future
:仅有一个实例指向其关联事件,std::unique_ptr
std::shared_future
:可以有多个实例指向同一个关联事件,std::shared_ptr
#include <iostream> #include <future> #include <thread> using namespace std; int add() { std::this_thread::sleep_for(std::chrono::seconds(5)); // 用来测试异步延迟 std::cout << "find_result_to_add" << std::endl; return 1 + 1; } int add2(int a, int b) { std::this_thread::sleep_for(std::chrono::seconds(5)); // 用来测试异步延迟 return a + b; } void do_other_things() { std::cout << "do_other_things" << std::endl; } int main() { // std::future<T> // std::future<int> result = std::async(find_result_to_add); 1、指定类型 // 2、decltype,自动推导函数返回类型 // std::future<decltype(add())> result = std::async(add); auto result = std::async(add); // 3、auto,推荐写法 do_other_things(); // std::async异步线程运行,不阻塞主线程函数的 std::cout << "result: " << result.get() << std::endl; // get()阻塞 // std::future<decltype(add2(int, int))> result2 = std::async(add2, 10, 20); //error,把参数值传递进来 std::future<decltype (add2(0, 0))> result2 = std::async(add2, 10, 20); std::cout << "result2: " << result2.get() << std::endl; // get()阻塞 system("pause"); return 0; }
将任务和 future
绑定在一起的模板,是一种对任务的封装。可以调用get_future()
方法获得 packaged_task
对象绑定的函数的返回值类型的 future
。模板类型是函数签名。
#include <iostream> #include <future> #include <thread> using namespace std; int add(int a, int b, int c) { std::cout << "call add\n"; return a + b + c; } int main() { // 封装任务,待执行 std::packaged_task<int(int, int, int)> task(add); // 模板类型:函数签名 std::future<int> result = task.get_future(); // 待执行,这里只是获取 future //任务开始执行! task(1, 1, 2); // 必须先执行任务,否则在get()获取future的值时会一直阻塞 std::cout << "result:" << result.get() << std::endl; return 0; }
promise 承诺,当线程创建 promise 的同时创建一个 future,这个 promise 向线程承诺它必定会被人手动设置一个值,future 就是获取其返回的手段。两者配合,在线程间传递数据。
#include <future> #include <string> #include <thread> #include <iostream> using namespace std; void print(std::promise<std::string>& p) { p.set_value("helloWorld"); // promise设置值,返回 future } void do_some_other_things() { std::cout << "do_some_other_things" << std::endl; } int main() { std::promise<std::string> promise; std::future<std::string> result = promise.get_future(); // 获取待返回的future std::thread t(print, std::ref(promise)); // 线程设置,传引用 promise do_some_other_things(); cout << "result " << result.get() << endl; // 在主线程等待 promise 的返回结果 t.join(); return 0; }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。