赞
踩
翻译原文:https://thispointer.com/c11-tutorial/
C++11 中新增了多线程 std::thread,相比之前使用较多的是操作系统提供的 POSIX 线程接口,新标准引入了线程库无疑带来了许多便利。
要使用 C++11 多线程,首先 gcc 编译器版本需要大于4.8,并且编译时,需要加上参数 -std=c++11 -lpthread
,可见,C++11 的线程是对 POSIX 线程的封装。
在每个 C++ 应用程序中,都有一个默认的主线程,即 main 函数。在 C++11 中,可以通过创建 std::thread 类的对象来创建额外的线程。每个 std::thread 对象都可以与一个线程相关联。
需要引入的头文件:
#include <thread>
std::thread 对象构造的时候接受什么参数?
构造时需要给 std::thread 对象一个回调函数,该回调函数将在新线程启动时执行。回调函数可以是如下类型函数:
线程对象的创建如下:
std::thread t(cb);
新的线程将在创建新对象之后立即启动,并执行传入的回调函数。
此外,任何线程都可以调用该线程对象的 join()
函数来等待该线程退出。
下面示例中,主线程创建了一个新线程,创建这个新线程后,主线程会在控制台打印一些数据,然后等待新创建的线程退出。使用了三种不同的回调函数创建线程。
• 使用函数指针创建线程
#include <iostream> #include <thread> void thread_fun() { for (int i = 0; i < 5; i++) { std::cout << "child thread output:" << i << std::endl; } } int main() { std::thread t1(thread_fun); for (int i = 0; i < 5; i++) { std::cout << "main thread output:" << i << std::endl; } t1.join(); std::cout << "main thread exit." << std::endl; return 0; }
• 使用函数对象创建线程
#include <iostream> #include <thread> #include <functional> class MyPrint { public: void print() { for (int i = 0; i < 5; i++) { std::cout << "child thread output:" << i << std::endl; } } }; int main() { MyPrint mp; std::function<void(void)> func(std::bind(&MyPrint::print, &mp)); std::thread t1(func); for (int i = 0; i < 5; i++) { std::cout << "main thread output:" << i << std::endl; } t1.join(); std::cout << "main thread exit." << std::endl; return 0; }
• 使用 Lambda 函数创建线程
#include <iostream> #include <thread> int main() { auto func = []() { for (int i = 0; i < 5; i++) { std::cout << "child thread output:" << i << std::endl; } }; std::thread t1(func); for (int i = 0; i < 5; i++) { std::cout << "main thread output:" << i << std::endl; } t1.join(); std::cout << "main thread exit." << std::endl; return 0; }
• 使用类的成员函数创建线程
#include <iostream> #include <thread> class MyPrint { public: MyPrint() {} MyPrint(const MyPrint& obj) {} void print() { for (int i = 0; i < 5; i++) { std::cout << "child thread output:" << i << std::endl; } } }; int main() { MyPrint mp; std::thread t1(&MyPrint::print, &mp); t1.join(); std::cout << "main thread exit." << std::endl; return 0; }
上面的示例运行结果如下:
$ ./test
main thread output:0
main thread output:1
main thread output:2
main thread output:3
main thread output:4
child thread output:0
child thread output:1
child thread output:2
child thread output:3
child thread output:4
main thread exit.
上面的示例的线程执行函数都是没有参数的,如果线程的执行函数是有参数的,在创建线程对象时,可以在构造函数中附加多个参数。如下实例:
#include <iostream>
#include <string>
#include <thread>
void thread_func(int id, std::string name)
{
std::cout << id << ":" << name << std::endl;
}
int main()
{
std::thread t1(thread_func, 1, "yf");
t1.join();
return 0;
}
运行结果:
$ ./test
1:yf
那么问题来了,线程函数参数是两个,如果传大于两个参数,或者小于两个参数运行结果是什么?
对于上例而言,传大于两个参数,只会取前两个参数,如果前两个参数合法(不合法报错)。传小于两个参数也会报错。
注意:不要将变量的地址传递给线程的回调函数,因为线程 1 中该局部变量可能已经出了作用域,而线程 2 仍尝试通过其传入的地址访问并操作它,有可能会导致以外的行为。如下程序:
#include <iostream> #include <thread> void newThreadCallback(int* p) { std::cout << "Inside Thread:p = " << p << std::endl; std::chrono::milliseconds dura(1000); std::this_thread::sleep_for(dura); *p = 20; std::cout << "Inside Thread:p = " << p << std::endl; } void startNewThread() { int i = 10; std::cout << "Inside Main Thread: &i = " << &i << std::endl; std::thread t(newThreadCallback, &i); t.detach(); std::cout << "Inside Main Thread: i = " << i << std::endl; std::cout << "Range quit." << i << std::endl; } int main() { startNewThread(); std::chrono::milliseconds dura(3000); std::this_thread::sleep_for(dura); return 0; }
同样,将指向位于堆上的内存的指针传递给线城时也要小心。因为新的线程在去访问它之前,某些线程可能会删除该内存,这种情况将导致程序崩溃掉。
由于参数会被复制到新线程的堆栈,因此如果你要以引用的方式传递参数,需要对其进行检查,如下示例:
#include <iostream> #include <thread> void threadCallback(int const& x) { int& y = const_cast<int&>(x); y++; std::cout << "Child Thread x=" << x << std::endl; } int main() { int x = 9; std::cout << "Main Thread(begin) x=" << x << std::endl; std::thread t1(threadCallback, x); t1.join(); std::cout << "Main Thread(after) x=" << x << std::endl; return 0; }
运行结果:
$ ./test
Main Thread(begin) x=9
Child Thread x=10
Main Thread(after) x=9
即使线程函数 threadCallback 接收引用参数,并且对参数进行了更改,但是在线程外是不可见的。这是因为线程函数 threadCallback 中的 x 是对新线程堆栈中复制的临时值的引用。
如何解决这个问题?使用 std::ref()
,更改程序如下:
void threadCallback(int const& x) { int& y = const_cast<int&>(x); y++; std::cout << "Child Thread x=" << x << std::endl; } int main() { int x = 9; std::cout << "Main Thread(begin) x=" << x << std::endl; std::thread t1(threadCallback, std::ref(x)); t1.join(); std::cout << "Main Thread(after) x=" << x << std::endl; return 0; }
现在运行结果就正确了:
$ ./test
Main Thread(begin) x=9
Child Thread x=10
Main Thread(after) x=10
每个线程都有一个唯一 ID 与之关联,我们可以使用这个 ID 来识别线程。
this_thread::get_id(); //获取当前线程id
t1.get_id(); //获取指定线程id
看下面示例:
#include <iostream> #include <thread> void thread_fun() { std::cout << "Child Thread ID:" << std::this_thread::get_id() << std::endl; } int main() { std::cout << "Main Thread ID:" << std::this_thread::get_id() << std::endl; std::thread t1(thread_fun); std::cout << "Child Thread ID:" << t1.get_id() << std::endl; t1.join(); std::cout << "main thread exit." << std::endl; return 0; }
输出结果:
$ ./test
Main Thread ID:140688050960192
Child Thread ID:140688033273600
Child Thread ID:140688033273600
main thread exit.
一个新的线程启动,另一个线程就可以等待这个新线程完成。只需要调用新线程对象的 join() 函数即可。
std::thread th(thread_fun);
//...
th.join();
看一个例子,假设主线程必须启动 10 个工作线程,在启动这些线程后,主函数等待它们完成,然后继续向下执行。
#include <iostream> #include <thread> #include <vector> #include <algorithm> #include <functional> void thread_fun() { std::cout << "Child Thread ID:" << std::this_thread::get_id() << std::endl; } int main() { std::vector<std::thread> threadList; for (int i = 0; i < 10; i++) { threadList.push_back(std::thread(thread_fun)); } std::cout << "Wait All Child Tread Finish." << std::endl; std::for_each(threadList.begin(), threadList.end(), std::mem_fn(&std::thread::join)); std::cout << "main thread exit." << std::endl; return 0; }
运行结果:
$ ./test
Wait All Child Tread Finish.
Child Thread ID:139788647843584
Child Thread ID:139788656236288
Child Thread ID:139788639450880
Child Thread ID:139788664628992
Child Thread ID:139788631058176
Child Thread ID:139788622665472
Child Thread ID:139788673021696
Child Thread ID:139788681414400
Child Thread ID:139788689807104
Child Thread ID:139788698199808
main thread exit.
分离的线程也成为守护线程或后台线程。将一个线程分离,只需要调用当前线程对象的 detach() 函数即可。
std::thread t(thread_fun);
t.detach();
调用 detach() 后,std::thread 对象不再与实际运行的线程相关联。如下实例:
void thread_fun()
{
std::cout << "Child Thread ID:" << std::this_thread::get_id() << std::endl;
}
int main()
{
std::thread t1(thread_fun);
t1.detach();
std::cout << t1.get_id() << std::endl;
t1.join();
return 0;
}
运行结果如下:
$ ./test
thread::id of a non-executing thread
Child Thread ID:139700035708672
terminate called after throwing an instance of 'std::system_error' what(): Invalid argument
Aborted
创建了线程 t1 后,调用了线程的 detach 函数后,线程进行了分离,所以主线程不能再调用线程 t1 的方法,因为主线程和 t1 线程之间已经没有了联系。
注意1:永远不要在没有关联的线程的 std::thread 对象上调用 join() 或者 detach()。
当在线程上调用 join() 函数时,当此 join() 返回时,该 std::thread 对象没有与之关联的线程,如果再次在此类对象上调用 join() 函数,则会导致要终止的程序。
std::thread t1(thread_fun);
t1.join();
t1.join(); //程序将终止运行
类似地,调用 detach() 会使 std::thread 对象不与任何线程函数链接。在这种情况下,在 std::thread 对象上调用 detach() 函数两次将导致程序终止。
std::thread t1(thread_fun);
t1.detach();
t1.detach(); //程序将终止运行
因此,在调用 join() 或 detach() 之前,我们应该每次都检查线程对象是否可以连接,即调用线程对象的 joinable() 函数,如下所示:
std::thread t1(thread_fun);
t1.detach();
if (t1.joinable())
{
t1.detach();
}
if (t1.joinable())
{
t1.join();
}
注意2:不要忘记在有关联的执行线程的 std::thread 对象上调用 join 或 detach。
如果没有调用,程序运行结束时,该线程对象调用析构函数时会终止程序。当 thread 对象消亡时,如果 thread 是 joinable 的,析构函数会调用 terminate(),terminate() 会调用 abort(),abort() 是非正常结束进程,不进行任何清理工作,直接终止程序,其典型实现是输出标准错误流(即cerr使用的错误流)。如下程序:
void thread_fun()
{
std::cout << "Child Thread ID:" << std::this_thread::get_id() << std::endl;
}
int main()
{
std::thread t1(thread_fun);
return 0;
}
运行结果:
$ ./test
Main Thread Exit.
terminate called without an active exception
Aborted (core dumped)
同样,我们不应该忘记在异常情况下调用 join() 或 detach()。为了防止,可以使用 C++ 的 RAII 机制:
#include <iostream> #include <thread> using namespace std; class ThreadRAII { public: ThreadRAII(std::thread& thd) : m_thread(thd) {} ~ThreadRAII() { if (m_thread.joinable()) { m_thread.detach(); } } private: std::thread& m_thread; }; void thread_func() { std::cout << "Child Thread ID:" << std::this_thread::get_id() << std::endl; } int main() { std::thread t1(thread_func); ThreadRAII wrapthd(t1); return 0; }
thread::joinable
是 C++ std::thread 中的内置函数。它是一个观察器函数,表示它观察状态,然后返回相应的输出并检查线程对象是否可连接。
如果线程对象标识着执行中的活动线程,则称该线程对象是可连接的。
在以下情况下,线程不可联接:
• 它是默认构造的。
• 如果其成员函数 join 或 detach 中的任何一个。
• 通过移动构造获得的。
示例如下:
#include <chrono> #include <iostream> #include <thread> using namespace std; void threadFunc() { std::this_thread::sleep_for(std::chrono::seconds(1)); } int main() { std::thread t1; cout << "t1 joinable when default created.\n"; if (t1.joinable()) cout << "YES\n"; else cout << "NO\n"; t1 = std::thread(threadFunc); cout << "t1 joinable when put to sleep.\n"; if (t1.joinable()) cout << "YES\n"; else cout << "NO\n"; t1.join(); cout << "t1 joinable after join is called.\n"; if (t1.joinable()) cout << "YES\n"; else cout << "NO\n"; return 0; }
运行结果:
$ ./test
t1 joinable when default created.
NO
t1 joinable when put to sleep.
YES
t1 joinable after join is called.
NO
std::thread 定义了四个构造函数:
• 默认构造函数,创建一个空的 std::thread 执行对象。
• 初始化构造函数,创建一个 std::thread 对象,该std::thread 对象可被 joinable,新产生的线程会调用 fn 函数,该函数的参数由 args 给出。
• 拷贝构造函数(被禁用),意味着 std::thread 对象不可拷贝构造,因为线程无法拷贝。
• 移动构造函数,调用成功之后 x 不代表任何 std::thread 执行对象。
void f1(int n) { for (int i = 0; i < 5; ++i) { std::cout << "Thread " << n << " executing" << i << "\n"; std::this_thread::sleep_for(std::chrono::milliseconds(10)); } } void f2(int &n) { for (int i = 0; i < 5; ++i) { std::cout << "Thread 2 executing" << i << "\n"; ++n; std::this_thread::sleep_for(std::chrono::milliseconds(10)); } } int main() { int n = 0; std::thread t1; // 没有执行线程 std::thread t2(f1, n + 1); // 执行线程,运行f1 std::thread t3(f2, std::ref(n)); // 执行线程,运行f2 std::thread t4(std::move(t3)); // f2在t4中运行,t3不再是线程 t2.join(); t4.join(); std::cout << "Final value of n is " << n << '\n'; return 0; }
线程执行了 move 操作,如果当前对象不可 joinable,需要传递一个右值引用 rhs 给 move 操作,如果当前对象可被 joinable,则会调用 terminate() 报错。
除了上面介绍的 get_id、join、detach、joinable,线程还有如下的成员函数。
• swap
交换两个线程对象所代表的底层句柄。
• native_handle
返回本地句柄(由于 std::thread 的实现和操作系统相关,因此该函数返回与 std::thread 具体实现相关的线程句柄),通过这个句柄就能用对应操作系统的线程相关接口了。
std::this_thread 命名空间中相关辅助函数
在上面的介绍中已经有用到了std::this_thread命名空间的部分函数,std::this_thread是作用于当前运行的线程。
• get_id
获取当前的线程 ID,上面的有实例已经展示过了。
• yield
当前线程放弃执行,操作系统调度另一线程继续执行,有点类似于 sleep,不过 yield 会将自己抢到的时间片让给其他线程,而 sleep 只是等待。该方法的具体行为取决于实现,尤其是正在使用的操作系统调度器的机制以及系统的状态。比如,一个先进先出的实时调度器(Linux中的SCHED_FIFO)将会使当前线程暂停,并将其置于同优先级线程队列的末尾(如果同优先级线程队列里没有其他线程,yield 就没有效果了。)
• sleep_until
线程休眠至某个指定的时刻(time point),该线程才被重新唤醒。
• sleep_for
线程休眠某个指定的时间片(time span),该线程才被重新唤醒,不过由于线程调度等原因,实际休眠时间可能比 sleep_duration 所表示的时间片更长。
如下操作:
#include <iostream> #include <thread> #include <chrono> #include <ctime> int main() { std::time_t tt = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()); struct std::tm *ptm = std::localtime(&tt); std::cout << "Current time: " << tt << '\n'; std::this_thread::sleep_for(std::chrono::milliseconds(100)); tt = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()); ptm = std::localtime(&tt); std::cout << "Current time: " << tt << '\n'; // 等待了100ms std::cout << "Waiting for the next minute to begin...\n"; ptm->tm_min += 1; ptm->tm_sec = 0; std::this_thread::sleep_until(std::chrono::system_clock::from_time_t(mktime(ptm))); std::cout << ptm << " reached!\n"; tt = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()); std::cout << "Current time: " << tt << '\n'; // 等到了下一分钟 return 0; }
调用 std::swap 线程,交换两个线程对象所代表的底层线程句柄,其结果和线程对象的 swap 操作一样。通过下面的代码可以看到,thread 对象的句柄会被交换。
#include <iostream> #include <thread> #include <chrono> void foo() { std::this_thread::sleep_for(std::chrono::seconds(1)); } void bar() { std::this_thread::sleep_for(std::chrono::seconds(1)); } int main() { std::thread t1(foo); std::thread t2(bar); std::cout << "thread 1 id: " << t1.get_id() << std::endl; std::cout << "thread 2 id: " << t2.get_id() << std::endl; std::swap(t1, t2); std::cout << "after std::swap(t1, t2):" << std::endl; std::cout << "thread 1 id: " << t1.get_id() << std::endl; std::cout << "thread 2 id: " << t2.get_id() << std::endl; t1.swap(t2); std::cout << "after t1.swap(t2):" << std::endl; std::cout << "thread 1 id: " << t1.get_id() << std::endl; std::cout << "thread 2 id: " << t2.get_id() << std::endl; t1.join(); t2.join(); return 0; }
运行结果:
$ ./test
thread 1 id: 140357567055616
thread 2 id: 140357558601472
after std::swap(t1, t2):
thread 1 id: 140357558601472
thread 2 id: 140357567055616
after t1.swap(t2):
thread 1 id: 140357567055616
thread 2 id: 140357558601472
在多线程环境中,线程之间的数据共享非常容易。但是这种简单的数据共享可能会导致应用程序出现问题。一个这样的问题是 Race Condition (竞争条件)。
竞争条件是当两个或多个线程并行执行一组操作时,操作了相同的内存,其中的一个或多个线程会修改该内存位置中的数据,有时这可能会导致意外结果。。竞争条件不一定会每次都发生。
如下实例:
有一个 Wallet 类,它的一个方法是 addMoney(),功能是按指定的顺序递增货币。创建五个线程,所有这些线程共享相同的 Wallet 类对象,并行的调用 addMoney 成员函数增加 1000 个货币。如果最初钱包是 0,那么所有线程完成后,钱包里的钱应该是 5000。但是,由于所有线程都在同时修改共享数据,因此在某些情况下,最终钱包中的钱可能会远小于 5000。看一下代码:
#include <iostream> #include <thread> #include <vector> using namespace std; class Wallet { public: Wallet() : mMoney(0) {} int getMoney() { return mMoney; } void addMoney(int money) { for (int i = 0; i < money; ++i) { mMoney++; } } private: int mMoney; }; int testMultithreadedWallet() { Wallet walletObject; std::vector<std::thread> threads; for (int i = 0; i < 5; ++i) { threads.push_back(std::thread(&Wallet::addMoney, &walletObject, 1000)); } for (int i = 0; i < threads.size(); i++) { threads.at(i).join(); } return walletObject.getMoney(); } int main() { int val = 0; for (int k = 0; k < 10; k++) { if ((val = testMultithreadedWallet()) != 5000) { std::cout << "Error at count = " << k << " Money in Wallet = " << val << std::endl; } } return 0; }
运行结果:
$ ./test
Error at count = 1 Money in Wallet = 4424
Error at count = 4 Money in Wallet = 4503
可以看到,10 次中,有两次发在多线程操作发生了的 race condition,使得最终的结果小于 5000。是多个线程试图同时修改相同的内存发生的意外结果。
为什么会这样?
因为每个线程并行增加相同的 mMoney 成员变量,虽然看起来是一行,但是 mMoney++ 实际上转换成了三个机器指令:
如果发生了如下情况:
一个自增运算将被忽略,因为不是将 mMoney 递增两次,而是不同的寄存器递增并且 mMoney 变量的值被覆盖。
为了解决多线程竞争问题,需要使用 Lock 机制,即每个线程在修改或读取共享数据之前需要获取一个锁,并且在修改数据之后每个线程都应该解锁。
C++11 线程库中,互斥锁位于 <mutex>
头文件中。表示互斥锁的类型是 std::mutex
类。
互斥锁的主要方法:
构造函数:std::mutex 不允许拷贝构造,也不允许 move 拷贝,最初产生的 mutex 对象是处于unlocked 状态的。
lock():调用线程将锁住该互斥量,线程调用该函数会发生以下 3 种情况:
unlock():解锁,释放对互斥量的所有权。
try_lock():尝试锁住互斥量,如果互斥量被其他线程占有,则当前线程也不会被阻塞,线程调用该函数会出现下面3种情况:
我们可以使用互斥锁保户共享数据同时只能被一个线程修改。看如下代码:
class Wallet { public: Wallet() : mMoney(0) {} int getMoney() { return mMoney; } void addMoney(int money) { mtx.lock(); for (int i = 0; i < money; ++i) { mMoney++; } mtx.unlock(); } private: int mMoney; std::mutex mtx; };
我们修改了 Wallet 类,添加了 mtx 互斥锁变量,保证了每次只有一个线程修改钱包的钱。
那么又有一个问题:如果我们忘了在函数结束时解锁互斥锁呢?这种情况下,一个线程退出而不释放锁,其他线程将会一直等待,为了避免这种情况,应该使用 std::lock_guard,将在下文中介绍。
其他的互斥锁:
• std::recursive_mutex:与 std::mutex 类似,但是它能够进行多次 lock,这样能够规避一些死锁问题。
int counter = 0; std::recursive_mutex mtx; void func2() { mtx.lock(); counter++; mtx.unlock(); } void func1() { mtx.lock(); func2(); counter++; mtx.unlock(); } int main() { std::thread t(func1); t.join(); std::cout << counter << std::endl; // 2 return 0; }
如上面的代码所示,有时候会在两个函数中分别对数据进行 lock,如果在一个函数中又调用了另一个函数,此时如果使用 std::mutex 将会死锁,而用 std::recursive_mutex 则不会。看起来 std::recursive_mutex 很不错,但是使用的时候也需要多注意,lock 和 unlock 的数量必须相等,否则会出错。另外还有性能的问题,std::recursive_mutex 的性能会比较差一些,从下面的例子中可以看出来,性能上要差了 1 倍左右。
• std::time_mutex:定时 mutex 类,可以锁定一定的时间。
• std::recursive_timed_mutex:定时的可多次lock的 mutex 类。
这两种互斥量类型和其不带 time 的相比,多了两个成员函数:
#include <iostream> // std::cout #include <chrono> // std::chrono::milliseconds #include <thread> // std::thread #include <mutex> // std::timed_mutex std::timed_mutex mtx; void fireworks(int n) { // 为这个锁等待200ms while (!mtx.try_lock_for(std::chrono::milliseconds(200))) { std::string out = std::to_string(n); out += "wait\n"; std::cout << out; } // 获取锁后等待700ms再解锁 std::this_thread::sleep_for(std::chrono::milliseconds(700)); std::cout << "end" << std::endl; mtx.unlock(); } int main() { std::thread threads[3]; for (int i = 0; i < 3; ++i) { threads[i] = std::thread(fireworks, i); // 为了保证线程按照顺序开始,保证输出一致 std::this_thread::sleep_for(std::chrono::milliseconds(10)); } for (auto &th : threads) { th.join(); } return 0; }
运行结果:
$ ./test
1wait
2wait
1wait
2wait
1wait
2wait
end
2wait
2wait
2wait
end
end
try_lock_until 的作用类似,不同的是传入参数是时间点:
void fireworks(int n)
{
auto now = std::chrono::steady_clock::now();
while (!mtx.try_lock_until(now + std::chrono::milliseconds(200)))
{
std::string out = std::to_string(n);
out += "wait\n";
std::cout << out;
now = std::chrono::steady_clock::now();
}
std::this_thread::sleep_for(std::chrono::milliseconds(700));
std::cout << "end" << std::endl;
mtx.unlock();
}
lock_guard 是一个互斥量包装程序,它提供了一种方便的 RAII(Resource acquisition is initialization )风格的机制来在作用域块的持续时间内拥有一个互斥量。
创建 lock_guard 对象时,它将尝试获取提供给它的互斥锁的所有权。当控制流离开 lock_guard 对象的作用域时,lock_guard 析构并释放互斥量。
它的特点如下:
• 创建即加锁,作用域结束自动析构并解锁,无需手工解锁
• 不能中途解锁,必须等作用域结束才解锁
• 不能复制
实例代码如下:
class Wallet { public: Wallet() : mMoney(0) {} int getMoney() { return mMoney; } void addMoney(int money) { std::lock_guard<std::mutex> lockGuard(mutex); for (int i = 0; i < money; ++i) { mMoney++; } } private: int mMoney; std::mutex mtx; };
unique_lock 是一个通用的互斥量锁定包装器,它允许延迟锁定,限时深度锁定,递归锁定,锁定所有权的转移以及与条件变量一起使用等。
简单地讲,unique_lock 是 lock_guard 的升级加强版,它具有 lock_guard 的所有功能,同时又具有其他很多方法,使用起来更强灵活方便,能够应对更复杂的锁定需要,但是效率上肯定要差一点,同时内存占用也会多一点。
特点如下:
• 创建时可以不锁定(通过指定第二个参数为std::defer_lock),而在需要时再锁定
• 可以随时加锁解锁
• 作用域规则同 lock_grard,析构时自动释放锁
• 不可复制,可移动
• 条件变量需要该类型的锁作为参数(此时必须使用unique_lock)
std::unique_lock 的第二个参数可以设置为如下几种:
std::adopt_lock unique_lock
也可以带 std::adopt_lock 标记,和 lock_guard 含义相同,就是不希望再 unique_lock() 的构造函数中 lock 这个 mutex。用 std::adopt_lock 的前提是,自己需要先把 mutex lock 上;用法与 lock_guard 相同。std::try_to_lock
尝试用 mutex 的 lock() 去锁定这个 mutex,但如果没有锁定成功,也会立即返回,并不会阻塞在那里。用这个 try_to_lock 的前提是你不能先 lock。std::defer_lock
用 std::defer_lock 的前提是,你不能先 lock,否则会报异常,std::defer_lock 的意思就是并没有给 mutex 加锁:初始化了一个没有加锁的mutex。示例代码:
#include <mutex> #include <thread> #include <chrono> struct Box { explicit Box(int num) : num_things{num} {} int num_things; std::mutex m; }; void transfer(Box &from, Box &to, int num) { std::unique_lock<std::mutex> lock1(from.m, std::defer_lock); std::unique_lock<std::mutex> lock2(to.m, std::defer_lock); std::lock(lock1, lock2); //支持多个锁的锁定,并且能避免死锁 from.num_things -= num; to.num_things += num; } int main() { Box acc1(100); Box acc2(50); std::thread t1(transfer, std::ref(acc1), std::ref(acc2), 10); std::thread t2(transfer, std::ref(acc2), std::ref(acc1), 5); t1.join(); t2.join(); }
std::unique_lock 还有以下成员函数:
• try_lock()
尝试给互斥量加锁,如果拿不到锁,返回 false,如果拿到了锁,返回 true,这个函数是不阻塞的。
• release()
返回它所管理的 mutex 对象指针,并释放所有权;也就是说,这个 unique_lock 和 mutex 不再有关系。严格区分 unlock() 与 release() 的区别,不要混淆。
如果原来mutex对像处于加锁状态,你有责任接管过来并负责解锁。(release 返回的是原始 mutex 的指针),如下代码:
void inMsgRecvQueue()
{
for (int i = 0; i < 10000; i++)
{
std::unique_lock<std::mutex> sbguard(my_mutex);
std::mutex *ptx = sbguard.release(); //现在你有责任自己解锁了
msgRecvQueue.push_back(i);
ptx->unlock(); //自己负责mutex的unlock了
}
}
unique_lock 所有权转移
sbguard 拥有 my_mutex 的所有权;sbguard 可以把自己对 mutex(my_mutex) 的所有权转移给其他的 unique_lock 对象;所以 unique_lock 对象这个 mutex 的所有权是可以转移,但是不能复制。
std::unique_lock<std::mutex> sbguard1(my_mutex); std::unique_lock<std::mutex> sbguard2(sbguard1); //此句是非法的,复制所有权是非法的 //可以使用 std::move std::unique_lock<std::mutex> sbguard2(std::move(sbguard));//移动语义,现在先当与sbguard2与my_mutex绑定到一起了 //返回 std::unique_lock 对象 std::unique_lock<std::mutex> rtn_unique_lock() { std::unique_lock<std::mutex> tmpguard(my_mutex); return tmpguard; //从函数中返回一个局部的unique_lock对象是可以的。会调用移动构造函数。 } void inMsgRecvQueue() { for (int i = 0; i < 10000; i++) { std::unique_lock<std::mutex> sbguard1 = rtn_unique_lock(); msgRecvQueue.push_back(i); } }
在生产者-消费者模型中,需要用到条件变量,生产者向产品队列中生产,消费者从产品队列中取出产品消费。在产品队列中没有产品时,消费者线程将会阻塞,直到生产者线程生产产品放入队列,此时条件满足便通知消费者线程,消费者线程从产品队列中取出消费。
而 C++11 也加入了条件变量,条件变量类似于两个线程之间发送信号的事件,一个线程等待它要得到的信号,另一个线程可以发出信号通知。
条件变量所需要的头文件是:
#include <condition_variable>
条件变量需要和互斥锁配合,和 POSIX 线程的原理一样。条件变量的工作原理:
std::condition_variable 的主要成员函数
• wait()
使当前线程阻塞,直到条件变量得到信号或虚假唤醒发生。它原子地释放附加的互斥锁,阻塞当前线程,并将其添加到等待当前条件变量对象的线程列表中。当某个线程在同一个条件变量对象上调用 notify_one() 或 notify_all() 时,线程将被解除阻塞。它也可能被虚假解锁,因此每次解锁后都需要再次检查条件。当线程被解锁时,wait() 函数重新获取互斥锁并检查实际条件是否满足。如果不满足条件,则再次自动释放附加的互斥锁,阻塞当前线程,并将其添加到等待当前条件变量对象的线程列表中。
• notify_one()
如果任何线程正在等待相同的条件变量对象,则 notify_one 解除阻塞等待线程之一。
• notify_all()
如果有任何线程正在等待相同的条件变量对象,则 notify_all 会解除所有等待线程的阻塞。
实例如下:
线程 1 的职责是:
线程 2 的职责是:
使用条件变量实现此目的的代码如下,
#include <iostream> #include <thread> #include <functional> #include <mutex> #include <condition_variable> using namespace std::placeholders; class Application { public: Application() { m_bDataLoaded = false; } void loadData() { std::this_thread::sleep_for(std::chrono::milliseconds(1000)); std::cout << "Loading Data from XML" << std::endl; std::lock_guard<std::mutex> guard(m_mutex); m_bDataLoaded = true; m_condVar.notify_one(); } bool isDataLoaded() { return m_bDataLoaded; } void mainTask() { std::cout << "Do Some Handshaking" << std::endl; std::unique_lock<std::mutex> mlock(m_mutex); m_condVar.wait(mlock, std::bind(&Application::isDataLoaded, this)); std::cout << "Do Processing On loaded Data" << std::endl; } private: std::mutex m_mutex; std::condition_variable m_condVar; bool m_bDataLoaded; }; int main() { Application app; std::thread thread_1(&Application::mainTask, &app); std::thread thread_2(&Application::loadData, &app); thread_2.join(); thread_1.join(); return 0; }
std::condition_variable_any
其实现也包含在 <condition_variable>
头文件的声明中。和 std::condition_variable 一样都需要一个互斥量,后者只接受 std::mutex,但 std::condition_variable_any 更加通用,可以和任何满足最低标准的互斥量一起工作,同时这增加了开销。
std 中有一些操作锁的函数,如:std::try_lock、std::lock、std::call_once等
• std::try_lock
支持尝试对多个互斥量进行锁定,尝试锁定成功返回 -1,否则返回锁定失败的互斥量的位置,例如第一个锁定失败返回 0、第二个失败返回 1。
int main()
{
std::mutex mtx1;
std::mutex mtx2;
if (-1 == std::try_lock(mtx1, mtx2))
{
std::cout << "locked" << std::endl;
mtx1.unlock();
mtx2.unlock();
}
return 0;
}
• std::lock
支持对多个锁锁定,并且避免死锁的出现,以下代码运行时有可能出现死锁的情况:
void func(std::mutex *mtx1, std::mutex *mtx2, int index) { std::lock_guard<std::mutex> lock1(std::adopt_lock); std::lock_guard<std::mutex> lock2(std::adopt_lock); std::cout << index << "out\n"; } int main() { std::mutex mtx1; std::mutex mtx2; // 两个线程的互斥量锁定顺序不同,可能造成死锁 std::thread t1(func, &mtx1, &mtx2, 1); std::thread t2(func, &mtx2, &mtx1, 2); t1.join(); t2.join(); return 0; }
但是使用 std::lock 能避免多个锁出现死锁:
void func(std::mutex *mtx1, std::mutex *mtx2, int index) { std::lock(*mtx1, *mtx2); // 同时锁定 // std::adopt_lock作用是声明互斥量已在本线程锁定,std::lock_guard只是保证互斥量在作用域结束时被释放 std::lock_guard<std::mutex> lock1(*mtx1, std::adopt_lock); std::lock_guard<std::mutex> lock2(*mtx2, std::adopt_lock); // 等价方法 // std::unique_lock<std::mutex> lock1(from.m, std::defer_lock); // std::unique_lock<std::mutex> lock2(to.m, std::defer_lock); // std::lock(lock1, lock2); std::cout << index << "out\n"; } int main() { std::mutex mtx1; std::mutex mtx2; std::thread t1(func, &mtx1, &mtx2, 1); std::thread t2(func, &mtx2, &mtx1, 2); t1.join(); t2.join(); return 0; }
• std::call_once
即使在多线程的情况下,也只执行一次指定的可调用对象(可以是函数、成员函数、函数对象、lambda函数),需要通过配合 std::once_flag 实现。具体的细节如下:
#include <iostream> #include <mutex> #include <thread> std::once_flag flag1, flag2; void simple_do_once() { std::call_once(flag1, []() { std::cout << "Simple example: called once\n"; }); } void may_throw_function(bool do_throw) { if (do_throw) { std::cout << "throw: call_once will retry\n"; // this may appear more than once throw std::exception(); } std::cout << "Didn't throw, call_once will not attempt again\n"; // guaranteed once } void do_once(bool do_throw) { try { std::call_once(flag2, may_throw_function, do_throw); } catch (...) { } } int main() { std::thread st1(simple_do_once); std::thread st2(simple_do_once); std::thread st3(simple_do_once); std::thread st4(simple_do_once); st1.join(); st2.join(); st3.join(); st4.join(); std::thread t5(do_once, true); std::thread t6(do_once, true); std::thread t7(do_once, false); std::thread t8(do_once, true); t5.join(); t6.join(); t7.join(); t8.join(); }
运行结果:
Simple example: called once
throw: call_once will retry
throw: call_once will retry
Didn't throw, call_once will not attempt again
std::future 对象可以与 asych、std::packaged_task 和 std::promise 一起使用。很多时候我们会遇到希望线程返回结果的情况。现在的问题是如何做到这一点?
举个例子:假设在我们的应用程序中,我们创建了一个线程来压缩给定的文件夹,并且我们希望该线程返回新的 zip 文件名及其大小。
现在要做到这一点,我们有两种方法:
首先是使用指针在线程之间共享数据
传递一个指向新线程的指针,新线程在其中设置数据。在主线程中使用条件变量继续等待。当新线程设置数据并通知条件变量时,主线程将唤醒并从该指针获取数据。
为了做一件简单的事情,我们使用了一个条件变量、一个互斥锁和一个指针,即 3 个变量协助来捕获返回值。现在假设我们希望这个线程在不同的时间点返回 3 个不同的值,那么问题将变得更加复杂。是否有一个简单的解决方案来从线程返回值。
答案是肯定的,使用 std::future,请查看它的下一个解决方案。
C++11 方式:使用 std::future 和 std::promise
std::future 是一个模板类,它的对象存储未来值。实际上,std::future 对象内部存储了一个将在未来分配的值,它还提供了一种访问该值的机制,即使用 get() 成员函数。但是如果有人试图在它可用之前通过 get() 函数访问这个关联的 future 值,那么 get() 函数将阻塞直到值不可用。
std::promise 也是一个类模板,它的对象承诺在未来设置该值。每个 std::promise 对象都有一个关联的 std::future 对象,该对象将给出由 std::promise 对象设置的值。
一个标准::承诺对象共享数据,与其相关联的std ::未来的对象。
一步一步来看,
在 Thread1 中创建一个 std::promise 对象。
std::promise<int> promiseObj;
到目前为止,这个 promise 对象没有任何关联的值。但它承诺有人肯定会在其中设置值,一旦设置好,您就可以通过关联的 std::future 对象获取该值。
但是现在假设线程 1 创建了这个 promise 对象并将其传递给线程 2 对象。现在线程 1 如何知道线程 2 何时将在这个 promise 对象中设置值?答案是使用 std::future 对象。
每个 std::promise 对象都有一个关联的 std::future 对象,其他人可以通过它获取 promise 设置的值。因此,线程 1 将创建 std::promise 对象,然后在将 std::promise 对象传递给线程 2 之前从中获取 std::future 对象,即:
std::future<int> futureObj = promiseObj.get_future();
现在线程 1 将 promiseObj 传递给线程 2。
然后线程 1 将通过 std::future 的 get 函数获取线程 2 在 std::promise 中设置的值:
int val = futureObj.get();
但是如果线程 2 还没有设置值,那么这个调用将被阻塞,直到线程 2 在 promise 对象中设置值:
promiseObj.set_value(45);
如下流程:
来看一个完整的 std::future 和 std::promise 示例:
#include <iostream> #include <thread> #include <future> void initiazer(std::promise<int> *promObj) { std::cout << "Inside Thread" << std::endl; promObj->set_value(35); } int main() { std::promise<int> promiseObj; std::future<int> futureObj = promiseObj.get_future(); std::thread th(initiazer, &promiseObj); std::cout << futureObj.get() << std::endl; th.join(); return 0; }
如果 std::promise 对象在设置值之前被销毁,则关联 std::future 对象上的调用 get() 函数将抛出异常。
其中的一部分,如果你希望线程在不同的时间点返回多个值,那么只需在线程中传递多个 std::promise 对象并从关联的多个 std::future 对象中获取多个返回值。
我们介绍了std::promise的使用方法,其实 std::packaged_task 和 std::promise 非常相似,简单来说std::packaged_task 是对 std::promise<T= std::function> 中 T= std::function 这一可调对象(如函数、lambda表达式等)进行了包装,简化了使用方法。并将这一可调对象的返回结果传递给关联的std::future对象。
比如,使用 std::promise 的如下程序:
//声明一个可调对象T
using T = std::function<int(int)>; //等同于typedef std::function<int(int)> T;
//函数
int Test_Fun(int iVal)
{
std::cout << "Value is:" << iVal << std::endl;
return iVal + 232;
}
//声明一个std::promise对象pr1,其保存的值类型为int
std::promise<T> pr1;
//声明一个std::future对象fu1,并通过std::promise的get_future()函数与pr1绑定
std::future<T> fu1 = pr1.get_future();
使用 std::packaged_task 就简化了很多:
//函数
int Test_Fun(int iVal)
{
std::cout << "Value is:" << iVal << std::endl;
return iVal + 232;
}
//声明一个std::packaged_task对象pt1,包装函数Test_Fun
std::packaged_task<int(int)> pt1(Test_Fun);
//声明一个std::future对象,包装Test_Fun的返回结果,并与pt1关联
std::future<int> fu1 = pt1.get_future();
注意:使用std::packaged_task关联的std::future对象保存的数据类型是可调对象的返回结果类型,如示例函数的返回结果类型是int,那么声明为std::future,而不是std::future<int(int)>。
std::packaged_task::valid
检查当前 packaged_task 是否和一个有效的共享状态相关联,对于由默认构造函数生成的 packaged_task 对象,该函数返回 false,除非中间进行了 move 赋值操作或者 swap 操作。
#include <iostream> // std::cout #include <utility> // std::move #include <future> // std::packaged_task, std::future #include <thread> // std::thread // 在新线程中启动一个 int(int) packaged_task. std::future<int> launcher(std::packaged_task<int(int)> &tsk, int arg) { if (tsk.valid()) { std::future<int> ret = tsk.get_future(); std::thread(std::move(tsk), arg).detach(); return ret; } else return std::future<int>(); } int main() { std::packaged_task<int(int)> tsk([](int x) { return x * 2; }); std::future<int> fut = launcher(tsk, 25); std::cout << "The double of 25 is " << fut.get() << ".\n"; return 0; }
返回一个与 packaged_task 对象共享状态相关的 future 对象。返回的 future 对象可以获得由另外一个线程在该 packaged_task 对象的共享状态上设置的某个值或者异常。
std::packaged_task::make_ready_at_thread_exit
该函数会调用被包装的任务,并向任务传递参数,类似 std::packaged_task 的 operator() 成员函数。但是与 operator() 函数不同的是,make_ready_at_thread_exit 并不会立即设置共享状态的标志为 ready,而是在线程退出时设置共享状态的标志。
如果与该 packaged_task 共享状态相关联的 future 对象在 future::get 处等待,则当前的 future::get 调用会被阻塞,直到线程退出。而一旦线程退出,future::get 调用继续执行,或者抛出异常。
注意,该函数已经设置了 promise 共享状态的值,如果在线程结束之前有其他设置或者修改共享状态的值的操作,则会抛出 future_error( promise_already_satisfied )。
std::packaged_task::reset()
重置 packaged_task 的共享状态,但是保留之前的被包装的任务。请看例子,该例子中,packaged_task 被重用了多次:
#include <iostream> // std::cout #include <utility> // std::move #include <future> // std::packaged_task, std::future #include <thread> // std::thread // a simple task: int triple(int x) { return x * 3; } int main() { std::packaged_task<int(int)> tsk(triple); // package task std::future<int> fut = tsk.get_future(); std::thread(std::move(tsk), 100).detach(); std::cout << "The triple of 100 is " << fut.get() << ".\n"; // re-use same task object: tsk.reset(); fut = tsk.get_future(); std::thread(std::move(tsk), 200).detach(); std::cout << "Thre triple of 200 is " << fut.get() << ".\n"; return 0; }
C++11 中引入 std::async。它是一个函数模板,它接受一个回调(即函数或函数对象)作为参数并可能异步执行它们,回调函数同样也可以是仿函数或者Lambda。
template <class Fn, class... Args>
future<typename result_of<Fn(Args...)>::type> async(launch policy, Fn &&fn, Args &&...args);
std::async 返回一个 std::future<T>
,它存储由 std::async() 执行的函数对象返回的值。函数期望的参数可以作为函数指针参数之后的参数传递给 std::async()。
可以使用 3 个不同的启动策略创建 std::async,即:
std::launch::async
它保证异步行为,即传递的函数将在单独的线程中执行。
std::launch::deferred
非异步行为,即当其他线程将来调用 get() 以访问共享状态时,将调用函数。
std::launch::async | std::launch::deferred
它的默认行为。使用此启动策略,它可以异步运行或不同步运行,具体取决于系统负载。但我们无法控制它。
如果我们不指定启动策略。它的行为类似于 std::launch::async | std::launch::deferred。
如下通过一个例子来理解 std::async:
假设我们必须从 DB 和文件系统中的文件中获取一些数据(字符串)。然后我需要合并两个字符串并打印。在单个线程中,我们将这样做:
#include <iostream> #include <string> #include <chrono> #include <thread> using namespace std::chrono; std::string fetchDataFromDB(std::string recvdData) { std::this_thread::sleep_for(seconds(5)); return "DB_" + recvdData; } std::string fetchDataFromFile(std::string recvdData) { std::this_thread::sleep_for(seconds(5)); return "File_" + recvdData; } int main() { system_clock::time_point start = system_clock::now(); std::string dbData = fetchDataFromDB("Data"); std::string fileData = fetchDataFromFile("Data"); auto end = system_clock::now(); auto diff = duration_cast<std::chrono::seconds>(end - start).count(); std::cout << "Total Time Taken = " << diff << " Seconds" << std::endl; std::string data = dbData + " :: " + fileData; std::cout << "Data = " << data << std::endl; return 0; }
运行结果如下:
$ ./test
Total Time Taken = 10 Seconds
Data = DB_Data :: File_Data
由于 fetchDataFromDB() 和 fetchDataFromFile() 两个函数都需要 5 秒并且在单个线程中运行,因此消耗的总时间为 10 秒。
现在从数据库和文件中获取数据是相互独立的,也很耗时。所以,我们可以并行运行它们。
一种方法是创建一个新线程,将 promise 作为参数传递给线程函数,并在调用线程时从关联的 std::future 对象中获取数据。
另一种简单的方法是使用 std::async。
现在使用函数指针作为回调调用 std::async:
std::future<std::string> resultFromDB = std::async(std::launch::async, fetchDataFromDB, "Data");
std::string dbData = resultFromDB.get();
std::async() 做了以下事情,
• 它会自动为我们创建一个线程(或从内部线程池中选择)和一个 promise 对象。
• 然后将 std::promise 对象传递给线程函数并返回关联的 std::future 对象。
• 当我们传递的参数函数退出时,它的值将在这个 promise 对象中设置,因此最终返回值将在 std::future 对象中可用。
现在改变上面的例子并使用 std::async 从数据库异步读取数据,如下示例:
#include <iostream> #include <string> #include <chrono> #include <thread> #include <future> using namespace std::chrono; std::string fetchDataFromDB(std::string recvdData) { std::this_thread::sleep_for(seconds(5)); return "DB_" + recvdData; } std::string fetchDataFromFile(std::string recvdData) { std::this_thread::sleep_for(seconds(5)); return "File_" + recvdData; } int main() { system_clock::time_point start = system_clock::now(); std::future<std::string> resultFromDB = std::async(std::launch::async, fetchDataFromDB, "Data"); std::string fileData = fetchDataFromFile("Data"); std::string dbData = resultFromDB.get(); auto end = system_clock::now(); auto diff = duration_cast<std::chrono::seconds>(end - start).count(); std::cout << "Total Time Taken = " << diff << " Seconds" << std::endl; std::string data = dbData + " :: " + fileData; std::cout << "Data = " << data << std::endl; return 0; }
输出结果如下:
$ ./test
Total Time Taken = 5 Seconds
Data = DB_Data :: File_Data
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。