当前位置:   article > 正文

C++11 std::thread线程库

c++11 std::thread

翻译原文:https://thispointer.com/c11-tutorial/

C++11 线程库介绍

C++11 中新增了多线程 std::thread,相比之前使用较多的是操作系统提供的 POSIX 线程接口,新标准引入了线程库无疑带来了许多便利。
要使用 C++11 多线程,首先 gcc 编译器版本需要大于4.8,并且编译时,需要加上参数 -std=c++11 -lpthread,可见,C++11 的线程是对 POSIX 线程的封装。

C++11 线程创建

在每个 C++ 应用程序中,都有一个默认的主线程,即 main 函数。在 C++11 中,可以通过创建 std::thread 类的对象来创建额外的线程。每个 std::thread 对象都可以与一个线程相关联。
需要引入的头文件:

#include <thread>
  • 1

std::thread 对象构造的时候接受什么参数?
构造时需要给 std::thread 对象一个回调函数,该回调函数将在新线程启动时执行。回调函数可以是如下类型函数:

  1. 函数指针
  2. 函数对象
  3. Lambda 函数
  4. 类的成员函数

线程对象的创建如下:

std::thread t(cb);
  • 1

新的线程将在创建新对象之后立即启动,并执行传入的回调函数。
此外,任何线程都可以调用该线程对象的 join() 函数来等待该线程退出。
下面示例中,主线程创建了一个新线程,创建这个新线程后,主线程会在控制台打印一些数据,然后等待新创建的线程退出。使用了三种不同的回调函数创建线程。
使用函数指针创建线程

#include <iostream>
#include <thread>

void thread_fun()
{
    for (int i = 0; i < 5; i++)
    {
        std::cout << "child thread output:" << i << std::endl;
    }
}

int main()
{
    std::thread t1(thread_fun);
    for (int i = 0; i < 5; i++)
    {
        std::cout << "main thread output:" << i << std::endl;
    }
    t1.join();
    std::cout << "main thread exit." << std::endl;
    return 0;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

使用函数对象创建线程

#include <iostream>
#include <thread>
#include <functional>

class MyPrint
{
public:
    void print()
    {
        for (int i = 0; i < 5; i++)
        {
            std::cout << "child thread output:" << i << std::endl;
        }
    }
};

int main()
{
    MyPrint mp;
    std::function<void(void)> func(std::bind(&MyPrint::print, &mp));
    std::thread t1(func);
    for (int i = 0; i < 5; i++)
    {
        std::cout << "main thread output:" << i << std::endl;
    }
    t1.join();
    std::cout << "main thread exit." << std::endl;
    return 0;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

使用 Lambda 函数创建线程

#include <iostream>
#include <thread>

int main()
{
    auto func = []()
    {
        for (int i = 0; i < 5; i++)
        {
            std::cout << "child thread output:" << i << std::endl;
        }
    };

    std::thread t1(func);
    for (int i = 0; i < 5; i++)
    {
        std::cout << "main thread output:" << i << std::endl;
    }
    t1.join();
    std::cout << "main thread exit." << std::endl;
    return 0;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

使用类的成员函数创建线程

#include <iostream>
#include <thread>

class MyPrint
{
public:
    MyPrint() {}
    MyPrint(const MyPrint& obj) {}
    void print()
    {
        for (int i = 0; i < 5; i++)
        {
            std::cout << "child thread output:" << i << std::endl;
        }
    }
};

int main()
{
    MyPrint mp;
    std::thread t1(&MyPrint::print, &mp);

    t1.join();
    std::cout << "main thread exit." << std::endl;
    return 0;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

上面的示例运行结果如下:

$ ./test 
main thread output:0
main thread output:1
main thread output:2
main thread output:3
main thread output:4
child thread output:0
child thread output:1
child thread output:2
child thread output:3
child thread output:4
main thread exit.
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

带参数的线程函数

线程函数参数传值

上面的示例的线程执行函数都是没有参数的,如果线程的执行函数是有参数的,在创建线程对象时,可以在构造函数中附加多个参数。如下实例:

#include <iostream>
#include <string>
#include <thread>

void thread_func(int id, std::string name)
{
    std::cout << id << ":" << name << std::endl;
}

int main()
{
    std::thread t1(thread_func, 1, "yf");
    t1.join();
    return 0;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

运行结果:

$ ./test 
1:yf
  • 1
  • 2

那么问题来了,线程函数参数是两个,如果传大于两个参数,或者小于两个参数运行结果是什么?
对于上例而言,传大于两个参数,只会取前两个参数,如果前两个参数合法(不合法报错)。传小于两个参数也会报错。

线程函数参数传递指针

注意:不要将变量的地址传递给线程的回调函数,因为线程 1 中该局部变量可能已经出了作用域,而线程 2 仍尝试通过其传入的地址访问并操作它,有可能会导致以外的行为。如下程序:

#include <iostream>
#include <thread>

void newThreadCallback(int* p)
{
    std::cout << "Inside Thread:p = " << p << std::endl;
    std::chrono::milliseconds dura(1000);
    std::this_thread::sleep_for(dura);
    *p = 20;
    std::cout << "Inside Thread:p = " << p << std::endl;
}

void startNewThread()
{
    int i = 10;
    std::cout << "Inside Main Thread: &i = " << &i << std::endl;
    std::thread t(newThreadCallback, &i);
    t.detach();
    std::cout << "Inside Main Thread: i = " << i << std::endl;
    std::cout << "Range quit." << i << std::endl;
}

int main()
{
    startNewThread();
    std::chrono::milliseconds dura(3000);
    std::this_thread::sleep_for(dura);
    return 0;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

同样,将指向位于堆上的内存的指针传递给线城时也要小心。因为新的线程在去访问它之前,某些线程可能会删除该内存,这种情况将导致程序崩溃掉。

线程函数参数传递引用

由于参数会被复制到新线程的堆栈,因此如果你要以引用的方式传递参数,需要对其进行检查,如下示例:

#include <iostream>
#include <thread>

void threadCallback(int const& x)
{
    int& y = const_cast<int&>(x);
    y++;
    std::cout << "Child Thread x=" << x << std::endl;
}

int main()
{
    int x = 9;
    std::cout << "Main Thread(begin) x=" << x << std::endl;
    std::thread t1(threadCallback, x);
    t1.join();
    std::cout << "Main Thread(after) x=" << x << std::endl;
    return 0;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

运行结果:

$ ./test 
Main Thread(begin) x=9
Child Thread x=10
Main Thread(after) x=9
  • 1
  • 2
  • 3
  • 4

即使线程函数 threadCallback 接收引用参数,并且对参数进行了更改,但是在线程外是不可见的。这是因为线程函数 threadCallback 中的 x 是对新线程堆栈中复制的临时值的引用。
如何解决这个问题?使用 std::ref() ,更改程序如下:

void threadCallback(int const& x)
{
    int& y = const_cast<int&>(x);
    y++;
    std::cout << "Child Thread x=" << x << std::endl;
}

int main()
{
    int x = 9;
    std::cout << "Main Thread(begin) x=" << x << std::endl;
    std::thread t1(threadCallback, std::ref(x));
    t1.join();
    std::cout << "Main Thread(after) x=" << x << std::endl;
    return 0;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

现在运行结果就正确了:

$ ./test 
Main Thread(begin) x=9
Child Thread x=10
Main Thread(after) x=10
  • 1
  • 2
  • 3
  • 4

获取线程ID

每个线程都有一个唯一 ID 与之关联,我们可以使用这个 ID 来识别线程。

this_thread::get_id(); //获取当前线程id
t1.get_id();	//获取指定线程id
  • 1
  • 2

看下面示例:

#include <iostream>
#include <thread>

void thread_fun()
{
    std::cout << "Child Thread ID:" << std::this_thread::get_id() << std::endl;
}

int main()
{
    std::cout << "Main Thread ID:" << std::this_thread::get_id() << std::endl;
    std::thread t1(thread_fun);
    std::cout << "Child Thread ID:" << t1.get_id() << std::endl;
    t1.join();
    std::cout << "main thread exit." << std::endl;
    return 0;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

输出结果:

$ ./test 
Main Thread ID:140688050960192
Child Thread ID:140688033273600
Child Thread ID:140688033273600
main thread exit.
  • 1
  • 2
  • 3
  • 4
  • 5

线程等待(join)和分离(detach)

线程等待

一个新的线程启动,另一个线程就可以等待这个新线程完成。只需要调用新线程对象的 join() 函数即可。

std::thread th(thread_fun);
//...
th.join();
  • 1
  • 2
  • 3

看一个例子,假设主线程必须启动 10 个工作线程,在启动这些线程后,主函数等待它们完成,然后继续向下执行。

#include <iostream>
#include <thread>
#include <vector>
#include <algorithm>
#include <functional>

void thread_fun()
{
    std::cout << "Child Thread ID:" << std::this_thread::get_id() << std::endl;
}

int main()
{
    std::vector<std::thread> threadList;
    for (int i = 0; i < 10; i++)
    {
        threadList.push_back(std::thread(thread_fun));
    }
    std::cout << "Wait All Child Tread Finish." << std::endl;
    std::for_each(threadList.begin(), threadList.end(), std::mem_fn(&std::thread::join));
    std::cout << "main thread exit." << std::endl;
    return 0;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

运行结果:

$ ./test 
Wait All Child Tread Finish.
Child Thread ID:139788647843584
Child Thread ID:139788656236288
Child Thread ID:139788639450880
Child Thread ID:139788664628992
Child Thread ID:139788631058176
Child Thread ID:139788622665472
Child Thread ID:139788673021696
Child Thread ID:139788681414400
Child Thread ID:139788689807104
Child Thread ID:139788698199808
main thread exit.
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

线程分离

分离的线程也成为守护线程或后台线程。将一个线程分离,只需要调用当前线程对象的 detach() 函数即可。

std::thread t(thread_fun);
t.detach();
  • 1
  • 2

调用 detach() 后,std::thread 对象不再与实际运行的线程相关联。如下实例:

void thread_fun()
{
    std::cout << "Child Thread ID:" << std::this_thread::get_id() << std::endl;
}
int main()
{
    std::thread t1(thread_fun);
    t1.detach();
    std::cout << t1.get_id() << std::endl;
    t1.join();
    return 0;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

运行结果如下:

$ ./test 
thread::id of a non-executing thread
Child Thread ID:139700035708672
terminate called after throwing an instance of 'std::system_error' what():  Invalid argument
Aborted
  • 1
  • 2
  • 3
  • 4
  • 5

创建了线程 t1 后,调用了线程的 detach 函数后,线程进行了分离,所以主线程不能再调用线程 t1 的方法,因为主线程和 t1 线程之间已经没有了联系。
注意1:永远不要在没有关联的线程的 std::thread 对象上调用 join() 或者 detach()。
当在线程上调用 join() 函数时,当此 join() 返回时,该 std::thread 对象没有与之关联的线程,如果再次在此类对象上调用 join() 函数,则会导致要终止的程序。

std::thread t1(thread_fun);
t1.join();
t1.join(); //程序将终止运行
  • 1
  • 2
  • 3

类似地,调用 detach() 会使 std::thread 对象不与任何线程函数链接。在这种情况下,在 std::thread 对象上调用 detach() 函数两次将导致程序终止。

std::thread t1(thread_fun);
t1.detach();
t1.detach(); //程序将终止运行
  • 1
  • 2
  • 3

因此,在调用 join() 或 detach() 之前,我们应该每次都检查线程对象是否可以连接,即调用线程对象的 joinable() 函数,如下所示:

std::thread t1(thread_fun);
t1.detach();
if (t1.joinable())
{
    t1.detach();
}

if (t1.joinable())
{
    t1.join();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

注意2:不要忘记在有关联的执行线程的 std::thread 对象上调用 join 或 detach。
如果没有调用,程序运行结束时,该线程对象调用析构函数时会终止程序。当 thread 对象消亡时,如果 thread 是 joinable 的,析构函数会调用 terminate(),terminate() 会调用 abort(),abort() 是非正常结束进程,不进行任何清理工作,直接终止程序,其典型实现是输出标准错误流(即cerr使用的错误流)。如下程序:

void thread_fun()
{
    std::cout << "Child Thread ID:" << std::this_thread::get_id() << std::endl;
}

int main()
{
    std::thread t1(thread_fun);
    return 0;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

运行结果:

$ ./test 
Main Thread Exit.
terminate called without an active exception
Aborted (core dumped)
  • 1
  • 2
  • 3
  • 4

同样,我们不应该忘记在异常情况下调用 join() 或 detach()。为了防止,可以使用 C++ 的 RAII 机制:

#include <iostream>
#include <thread>
using namespace std;

class ThreadRAII
{
public:
    ThreadRAII(std::thread& thd) : m_thread(thd) {}
    ~ThreadRAII()
    {
        if (m_thread.joinable())
        {
            m_thread.detach();
        }
    }

private:
    std::thread& m_thread;
};

void thread_func()
{
    std::cout << "Child Thread ID:" << std::this_thread::get_id() << std::endl;
}

int main()
{
    std::thread t1(thread_func);
    ThreadRAII wrapthd(t1);
    return 0;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

thread::joinable

thread::joinable 是 C++ std::thread 中的内置函数。它是一个观察器函数,表示它观察状态,然后返回相应的输出并检查线程对象是否可连接。
如果线程对象标识着执行中的活动线程,则称该线程对象是可连接的。
在以下情况下,线程不可联接:
• 它是默认构造的。
• 如果其成员函数 join 或 detach 中的任何一个。
• 通过移动构造获得的。
示例如下:

#include <chrono> 
#include <iostream> 
#include <thread> 
using namespace std;

void threadFunc()
{
    std::this_thread::sleep_for(std::chrono::seconds(1));
}

int main()
{
    std::thread t1;
    cout << "t1 joinable when default created.\n";
    
    if (t1.joinable())
        cout << "YES\n";
    else
        cout << "NO\n";
        
    t1 = std::thread(threadFunc);
    cout << "t1 joinable when put to sleep.\n";

    if (t1.joinable())
        cout << "YES\n";
    else
        cout << "NO\n";
        
    t1.join();
    cout << "t1 joinable after join is called.\n";
    
    if (t1.joinable())
        cout << "YES\n";
    else
        cout << "NO\n";
    return 0;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

运行结果:

$ ./test 
t1 joinable when default created.
NO
t1 joinable when put to sleep.
YES
t1 joinable after join is called.
NO
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

线程的构造函数

std::thread 定义了四个构造函数:
• 默认构造函数,创建一个空的 std::thread 执行对象。
• 初始化构造函数,创建一个 std::thread 对象,该std::thread 对象可被 joinable,新产生的线程会调用 fn 函数,该函数的参数由 args 给出。
• 拷贝构造函数(被禁用),意味着 std::thread 对象不可拷贝构造,因为线程无法拷贝。
• 移动构造函数,调用成功之后 x 不代表任何 std::thread 执行对象。

void f1(int n)
{
    for (int i = 0; i < 5; ++i)
    {
        std::cout << "Thread " << n << " executing" << i << "\n";
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
}

void f2(int &n)
{
    for (int i = 0; i < 5; ++i)
    {
        std::cout << "Thread 2 executing" << i << "\n";
        ++n;
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
}

int main()
{
    int n = 0;
    std::thread t1;                  // 没有执行线程
    std::thread t2(f1, n + 1);       // 执行线程,运行f1
    std::thread t3(f2, std::ref(n)); // 执行线程,运行f2
    std::thread t4(std::move(t3));   // f2在t4中运行,t3不再是线程

    t2.join();
    t4.join();

    std::cout << "Final value of n is " << n << '\n';
    return 0;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

线程执行了 move 操作,如果当前对象不可 joinable,需要传递一个右值引用 rhs 给 move 操作,如果当前对象可被 joinable,则会调用 terminate() 报错。

线程的其他成员函数

除了上面介绍的 get_id、join、detach、joinable,线程还有如下的成员函数。
swap
交换两个线程对象所代表的底层句柄。
native_handle
返回本地句柄(由于 std::thread 的实现和操作系统相关,因此该函数返回与 std::thread 具体实现相关的线程句柄),通过这个句柄就能用对应操作系统的线程相关接口了。
std::this_thread 命名空间中相关辅助函数
在上面的介绍中已经有用到了std::this_thread命名空间的部分函数,std::this_thread是作用于当前运行的线程。
get_id
获取当前的线程 ID,上面的有实例已经展示过了。
yield
当前线程放弃执行,操作系统调度另一线程继续执行,有点类似于 sleep,不过 yield 会将自己抢到的时间片让给其他线程,而 sleep 只是等待。该方法的具体行为取决于实现,尤其是正在使用的操作系统调度器的机制以及系统的状态。比如,一个先进先出的实时调度器(Linux中的SCHED_FIFO)将会使当前线程暂停,并将其置于同优先级线程队列的末尾(如果同优先级线程队列里没有其他线程,yield 就没有效果了。)
sleep_until
线程休眠至某个指定的时刻(time point),该线程才被重新唤醒。
sleep_for
线程休眠某个指定的时间片(time span),该线程才被重新唤醒,不过由于线程调度等原因,实际休眠时间可能比 sleep_duration 所表示的时间片更长。
如下操作:

#include <iostream>
#include <thread>
#include <chrono>
#include <ctime>

int main()
{
    std::time_t tt = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());

    struct std::tm *ptm = std::localtime(&tt);
    std::cout << "Current time: " << tt << '\n';

    std::this_thread::sleep_for(std::chrono::milliseconds(100));

    tt = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
    ptm = std::localtime(&tt);
    std::cout << "Current time: " << tt << '\n'; // 等待了100ms

    std::cout << "Waiting for the next minute to begin...\n";
    ptm->tm_min += 1;
    ptm->tm_sec = 0;
    std::this_thread::sleep_until(std::chrono::system_clock::from_time_t(mktime(ptm)));

    std::cout << ptm << " reached!\n";

    tt = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
    std::cout << "Current time: " << tt << '\n'; // 等到了下一分钟

    return 0;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

线程 swap

调用 std::swap 线程,交换两个线程对象所代表的底层线程句柄,其结果和线程对象的 swap 操作一样。通过下面的代码可以看到,thread 对象的句柄会被交换。

#include <iostream>
#include <thread>
#include <chrono>

void foo()
{
    std::this_thread::sleep_for(std::chrono::seconds(1));
}

void bar()
{
    std::this_thread::sleep_for(std::chrono::seconds(1));
}

int main()
{
    std::thread t1(foo);
    std::thread t2(bar);

    std::cout << "thread 1 id: " << t1.get_id() << std::endl;
    std::cout << "thread 2 id: " << t2.get_id() << std::endl;

    std::swap(t1, t2);

    std::cout << "after std::swap(t1, t2):" << std::endl;
    std::cout << "thread 1 id: " << t1.get_id() << std::endl;
    std::cout << "thread 2 id: " << t2.get_id() << std::endl;

    t1.swap(t2);

    std::cout << "after t1.swap(t2):" << std::endl;
    std::cout << "thread 1 id: " << t1.get_id() << std::endl;
    std::cout << "thread 2 id: " << t2.get_id() << std::endl;

    t1.join();
    t2.join();

    return 0;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39

运行结果:

$ ./test 
thread 1 id: 140357567055616
thread 2 id: 140357558601472
after std::swap(t1, t2):
thread 1 id: 140357558601472
thread 2 id: 140357567055616
after t1.swap(t2):
thread 1 id: 140357567055616
thread 2 id: 140357558601472
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

线程间数据共享与竞争

竟态条件(Race Condition)

在多线程环境中,线程之间的数据共享非常容易。但是这种简单的数据共享可能会导致应用程序出现问题。一个这样的问题是 Race Condition (竞争条件)。
竞争条件是当两个或多个线程并行执行一组操作时,操作了相同的内存,其中的一个或多个线程会修改该内存位置中的数据,有时这可能会导致意外结果。。竞争条件不一定会每次都发生。
如下实例:
有一个 Wallet 类,它的一个方法是 addMoney(),功能是按指定的顺序递增货币。创建五个线程,所有这些线程共享相同的 Wallet 类对象,并行的调用 addMoney 成员函数增加 1000 个货币。如果最初钱包是 0,那么所有线程完成后,钱包里的钱应该是 5000。但是,由于所有线程都在同时修改共享数据,因此在某些情况下,最终钱包中的钱可能会远小于 5000。看一下代码:

#include <iostream>
#include <thread>
#include <vector>
using namespace std;

class Wallet
{
public:
    Wallet() : mMoney(0) {}
    int getMoney() { return mMoney; }
    void addMoney(int money)
    {
        for (int i = 0; i < money; ++i)
        {
            mMoney++;
        }
    }

private:
    int mMoney;
};

int testMultithreadedWallet()
{
    Wallet walletObject;
    std::vector<std::thread> threads;
    for (int i = 0; i < 5; ++i)
    {
        threads.push_back(std::thread(&Wallet::addMoney, &walletObject, 1000));
    }
    for (int i = 0; i < threads.size(); i++)
    {
        threads.at(i).join();
    }
    return walletObject.getMoney();
}
int main()
{
    int val = 0;
    for (int k = 0; k < 10; k++)
    {
        if ((val = testMultithreadedWallet()) != 5000)
        {
            std::cout << "Error at count = " << k << " Money in Wallet = " << val << std::endl;
        }
    }
    return 0;
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49

运行结果:

$ ./test 
Error at count = 1 Money in Wallet = 4424
Error at count = 4 Money in Wallet = 4503
  • 1
  • 2
  • 3

可以看到,10 次中,有两次发在多线程操作发生了的 race condition,使得最终的结果小于 5000。是多个线程试图同时修改相同的内存发生的意外结果。
为什么会这样?
因为每个线程并行增加相同的 mMoney 成员变量,虽然看起来是一行,但是 mMoney++ 实际上转换成了三个机器指令:

  1. 在寄存器中加载 mMoney 变量值
  2. 增加寄存器的值
  3. 用寄存器的值更新变量 mMoney

如果发生了如下情况:

在这里插入图片描述
一个自增运算将被忽略,因为不是将 mMoney 递增两次,而是不同的寄存器递增并且 mMoney 变量的值被覆盖。

互斥锁(std::mutex)

为了解决多线程竞争问题,需要使用 Lock 机制,即每个线程在修改或读取共享数据之前需要获取一个锁,并且在修改数据之后每个线程都应该解锁。
C++11 线程库中,互斥锁位于 <mutex> 头文件中。表示互斥锁的类型是 std::mutex 类。
互斥锁的主要方法:
构造函数:std::mutex 不允许拷贝构造,也不允许 move 拷贝,最初产生的 mutex 对象是处于unlocked 状态的。
lock():调用线程将锁住该互斥量,线程调用该函数会发生以下 3 种情况:

  1. 如果该互斥量当前没有被锁住,则调用线程将该互斥量锁住,直到调用 unlock之前,该线程一直拥有该锁。
  2. 如果当前互斥量被其他线程锁住,则当前的调用线程被阻塞住。
  3. 如果当前互斥量被当前调用线程锁住,则会产生死锁,,也就是说同一个线程中不允许锁两次。

unlock():解锁,释放对互斥量的所有权。
try_lock():尝试锁住互斥量,如果互斥量被其他线程占有,则当前线程也不会被阻塞,线程调用该函数会出现下面3种情况:

  1. 如果当前互斥量没有被其他线程占有,则该线程锁住互斥量,直到该线程调用 unlock 释放互斥量。
  2. 如果当前互斥量被其他线程锁住,则当前调用线程返回 false,而并不会被阻塞掉。
  3. 如果当前互斥量被当前调用线程锁住,则会产生死锁。

我们可以使用互斥锁保户共享数据同时只能被一个线程修改。看如下代码:

class Wallet
{
public:
    Wallet() : mMoney(0) {}
    int getMoney() { return mMoney; }
    void addMoney(int money)
    {
        mtx.lock();
        for (int i = 0; i < money; ++i)
        {
            mMoney++;
        }
        mtx.unlock();
    }

private:
    int mMoney;
    std::mutex mtx;
};
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

我们修改了 Wallet 类,添加了 mtx 互斥锁变量,保证了每次只有一个线程修改钱包的钱。
那么又有一个问题:如果我们忘了在函数结束时解锁互斥锁呢?这种情况下,一个线程退出而不释放锁,其他线程将会一直等待,为了避免这种情况,应该使用 std::lock_guard,将在下文中介绍。
其他的互斥锁:
std::recursive_mutex:与 std::mutex 类似,但是它能够进行多次 lock,这样能够规避一些死锁问题。

int counter = 0;
std::recursive_mutex mtx;

void func2()
{
    mtx.lock();
    counter++;
    mtx.unlock();
}

void func1()
{
    mtx.lock();
    func2();
    counter++;
    mtx.unlock();
}

int main()
{
    std::thread t(func1);
    t.join();
    std::cout << counter << std::endl; // 2
    return 0;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

如上面的代码所示,有时候会在两个函数中分别对数据进行 lock,如果在一个函数中又调用了另一个函数,此时如果使用 std::mutex 将会死锁,而用 std::recursive_mutex 则不会。看起来 std::recursive_mutex 很不错,但是使用的时候也需要多注意,lock 和 unlock 的数量必须相等,否则会出错。另外还有性能的问题,std::recursive_mutex 的性能会比较差一些,从下面的例子中可以看出来,性能上要差了 1 倍左右。
std::time_mutex:定时 mutex 类,可以锁定一定的时间。
std::recursive_timed_mutex:定时的可多次lock的 mutex 类。
这两种互斥量类型和其不带 time 的相比,多了两个成员函数:

  1. try_lock_for():函数参数表示一个时间范围,在这一段时间范围之内线程如果没有获得锁则保持阻塞;如果在此期间其他线程释放了锁,则该线程可获得该互斥锁;如果超时(指定时间范围内没有获得锁),则函数调用返回false。
  2. try_lock_until():函数参数表示一个时刻,在这一时刻之前线程如果没有获得锁则保持阻塞;如果在此时刻前其他线程释放了锁,则该线程可获得该互斥锁;如果超过指定时刻没有获得锁,则函数调用返回false。
    首先来看看 try_lock_for 的用法,下面的例子可以看出,try_lock_for 等待指定时间后没有获取到锁,会返回false。
#include <iostream> // std::cout
#include <chrono>   // std::chrono::milliseconds
#include <thread>   // std::thread
#include <mutex>    // std::timed_mutex

std::timed_mutex mtx;

void fireworks(int n)
{
    // 为这个锁等待200ms
    while (!mtx.try_lock_for(std::chrono::milliseconds(200)))
    {
        std::string out = std::to_string(n);
        out += "wait\n";
        std::cout << out;
    }

    // 获取锁后等待700ms再解锁
    std::this_thread::sleep_for(std::chrono::milliseconds(700));
    std::cout << "end" << std::endl;
    mtx.unlock();
}

int main()
{
    std::thread threads[3];
    for (int i = 0; i < 3; ++i)
    {
        threads[i] = std::thread(fireworks, i);
        // 为了保证线程按照顺序开始,保证输出一致
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }

    for (auto &th : threads)
    {
        th.join();
    }

    return 0;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40

运行结果:

$ ./test 
1wait
2wait
1wait
2wait
1wait
2wait
end
2wait
2wait
2wait
end
end
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

try_lock_until 的作用类似,不同的是传入参数是时间点:

void fireworks(int n)
{
    auto now = std::chrono::steady_clock::now();
    while (!mtx.try_lock_until(now + std::chrono::milliseconds(200)))
    {
        std::string out = std::to_string(n);
        out += "wait\n";
        std::cout << out;
        now = std::chrono::steady_clock::now();
    }

    std::this_thread::sleep_for(std::chrono::milliseconds(700));
    std::cout << "end" << std::endl;
    mtx.unlock();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

std::lock_guard

lock_guard 是一个互斥量包装程序,它提供了一种方便的 RAII(Resource acquisition is initialization )风格的机制来在作用域块的持续时间内拥有一个互斥量。
创建 lock_guard 对象时,它将尝试获取提供给它的互斥锁的所有权。当控制流离开 lock_guard 对象的作用域时,lock_guard 析构并释放互斥量。
它的特点如下:
• 创建即加锁,作用域结束自动析构并解锁,无需手工解锁
• 不能中途解锁,必须等作用域结束才解锁
• 不能复制
实例代码如下:

class Wallet
{
public:
    Wallet() : mMoney(0) {}
    int getMoney() { return mMoney; }
    void addMoney(int money)
    {
        std::lock_guard<std::mutex> lockGuard(mutex);
        for (int i = 0; i < money; ++i)
        {
            mMoney++;
        }
    }

private:
    int mMoney;
    std::mutex mtx;
};
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

std::unique_lock

unique_lock 是一个通用的互斥量锁定包装器,它允许延迟锁定,限时深度锁定,递归锁定,锁定所有权的转移以及与条件变量一起使用等。
简单地讲,unique_lock 是 lock_guard 的升级加强版,它具有 lock_guard 的所有功能,同时又具有其他很多方法,使用起来更强灵活方便,能够应对更复杂的锁定需要,但是效率上肯定要差一点,同时内存占用也会多一点。
特点如下:
• 创建时可以不锁定(通过指定第二个参数为std::defer_lock),而在需要时再锁定
• 可以随时加锁解锁
• 作用域规则同 lock_grard,析构时自动释放锁
• 不可复制,可移动
• 条件变量需要该类型的锁作为参数(此时必须使用unique_lock)
std::unique_lock 的第二个参数可以设置为如下几种:

  1. std::adopt_lock unique_lock 也可以带 std::adopt_lock 标记,和 lock_guard 含义相同,就是不希望再 unique_lock() 的构造函数中 lock 这个 mutex。用 std::adopt_lock 的前提是,自己需要先把 mutex lock 上;用法与 lock_guard 相同。
  2. std::try_to_lock 尝试用 mutex 的 lock() 去锁定这个 mutex,但如果没有锁定成功,也会立即返回,并不会阻塞在那里。用这个 try_to_lock 的前提是你不能先 lock。
  3. std::defer_lock 用 std::defer_lock 的前提是,你不能先 lock,否则会报异常,std::defer_lock 的意思就是并没有给 mutex 加锁:初始化了一个没有加锁的mutex。

示例代码:

#include <mutex>
#include <thread>
#include <chrono>

struct Box
{
    explicit Box(int num) : num_things{num} {}
    int num_things;
    std::mutex m;
};

void transfer(Box &from, Box &to, int num)
{
    std::unique_lock<std::mutex> lock1(from.m, std::defer_lock);
    std::unique_lock<std::mutex> lock2(to.m, std::defer_lock);
    std::lock(lock1, lock2); //支持多个锁的锁定,并且能避免死锁
    from.num_things -= num;
    to.num_things += num;
}

int main()
{
    Box acc1(100);
    Box acc2(50);

    std::thread t1(transfer, std::ref(acc1), std::ref(acc2), 10);
    std::thread t2(transfer, std::ref(acc2), std::ref(acc1), 5);

    t1.join();
    t2.join();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

std::unique_lock 还有以下成员函数:
try_lock() 尝试给互斥量加锁,如果拿不到锁,返回 false,如果拿到了锁,返回 true,这个函数是不阻塞的。
release() 返回它所管理的 mutex 对象指针,并释放所有权;也就是说,这个 unique_lock 和 mutex 不再有关系。严格区分 unlock() 与 release() 的区别,不要混淆。
如果原来mutex对像处于加锁状态,你有责任接管过来并负责解锁。(release 返回的是原始 mutex 的指针),如下代码:

void inMsgRecvQueue()
{
    for (int i = 0; i < 10000; i++)
    {
        std::unique_lock<std::mutex> sbguard(my_mutex);
        std::mutex *ptx = sbguard.release(); //现在你有责任自己解锁了
        msgRecvQueue.push_back(i);
        ptx->unlock(); //自己负责mutex的unlock了
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

unique_lock 所有权转移
sbguard 拥有 my_mutex 的所有权;sbguard 可以把自己对 mutex(my_mutex) 的所有权转移给其他的 unique_lock 对象;所以 unique_lock 对象这个 mutex 的所有权是可以转移,但是不能复制。

std::unique_lock<std::mutex> sbguard1(my_mutex);
std::unique_lock<std::mutex> sbguard2(sbguard1); //此句是非法的,复制所有权是非法的
//可以使用 std::move
std::unique_lock<std::mutex> sbguard2(std::move(sbguard));//移动语义,现在先当与sbguard2与my_mutex绑定到一起了

//返回 std::unique_lock 对象
std::unique_lock<std::mutex> rtn_unique_lock()
{
    std::unique_lock<std::mutex> tmpguard(my_mutex);
    return tmpguard; //从函数中返回一个局部的unique_lock对象是可以的。会调用移动构造函数。
}

void inMsgRecvQueue()
{
    for (int i = 0; i < 10000; i++)
    {
        std::unique_lock<std::mutex> sbguard1 = rtn_unique_lock();

        msgRecvQueue.push_back(i);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

条件变量(std::condition_variable)

在生产者-消费者模型中,需要用到条件变量,生产者向产品队列中生产,消费者从产品队列中取出产品消费。在产品队列中没有产品时,消费者线程将会阻塞,直到生产者线程生产产品放入队列,此时条件满足便通知消费者线程,消费者线程从产品队列中取出消费。
而 C++11 也加入了条件变量,条件变量类似于两个线程之间发送信号的事件,一个线程等待它要得到的信号,另一个线程可以发出信号通知。
条件变量所需要的头文件是:

#include <condition_variable>
  • 1

条件变量需要和互斥锁配合,和 POSIX 线程的原理一样。条件变量的工作原理:

  1. 线程 1 再条件变量上调用 wait,该变量在内部获取互斥锁并检查是否满足条件。
  2. 如果不满足条件,则释放该互斥锁并阻塞等待条件变量收到信号,wait 函数以原子的方式执行这两步操作,原子的方式就是两步要么都执行,否则都不执行。
  3. 另一个线程 2 在满足条件时向条件变量发出信号。
  4. 一旦条件变量得到信号,等待它的线程 1 就会恢复。然后再次获取互斥锁并检查与条件变量关联的条件是否满足或者是上级调用。如果多个线程都在等待一个条件,则 notify_one 将只会解除一个线程的阻塞。
  5. 如果是上级调用,则它再次调用 wait 函数。

std::condition_variable 的主要成员函数
wait()
使当前线程阻塞,直到条件变量得到信号或虚假唤醒发生。它原子地释放附加的互斥锁,阻塞当前线程,并将其添加到等待当前条件变量对象的线程列表中。当某个线程在同一个条件变量对象上调用 notify_one() 或 notify_all() 时,线程将被解除阻塞。它也可能被虚假解锁,因此每次解锁后都需要再次检查条件。当线程被解锁时,wait() 函数重新获取互斥锁并检查实际条件是否满足。如果不满足条件,则再次自动释放附加的互斥锁,阻塞当前线程,并将其添加到等待当前条件变量对象的线程列表中。
notify_one()
如果任何线程正在等待相同的条件变量对象,则 notify_one 解除阻塞等待线程之一。
notify_all()
如果有任何线程正在等待相同的条件变量对象,则 notify_all 会解除所有等待线程的阻塞。

实例如下:
线程 1 的职责是:

  1. 与服务器执行一些握手。
  2. 等待线程 2 从 XML 加载数据
  3. 对从 XML 加载的数据进行处理。

线程 2 的职责是:

  1. 从 XML 加载数据
  2. 通知另一个线程,即等待消息。

使用条件变量实现此目的的代码如下,

#include <iostream>
#include <thread>
#include <functional>
#include <mutex>
#include <condition_variable>
using namespace std::placeholders;

class Application
{
public:
    Application()
    {
        m_bDataLoaded = false;
    }
    
    void loadData()
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
        std::cout << "Loading Data from XML" << std::endl;
        std::lock_guard<std::mutex> guard(m_mutex);
        m_bDataLoaded = true;
        m_condVar.notify_one();
    }
    
    bool isDataLoaded()
    {
        return m_bDataLoaded;
    }
    
    void mainTask()
    {
        std::cout << "Do Some Handshaking" << std::endl;
        std::unique_lock<std::mutex> mlock(m_mutex);
        m_condVar.wait(mlock, std::bind(&Application::isDataLoaded, this));
        std::cout << "Do Processing On loaded Data" << std::endl;
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_condVar;
    bool m_bDataLoaded;
};
int main()
{
    Application app;
    std::thread thread_1(&Application::mainTask, &app);
    std::thread thread_2(&Application::loadData, &app);
    thread_2.join();
    thread_1.join();
    return 0;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51

std::condition_variable_any
其实现也包含在 <condition_variable> 头文件的声明中。和 std::condition_variable 一样都需要一个互斥量,后者只接受 std::mutex,但 std::condition_variable_any 更加通用,可以和任何满足最低标准的互斥量一起工作,同时这增加了开销。

操作锁的相关函数

std 中有一些操作锁的函数,如:std::try_lock、std::lock、std::call_once等
std::try_lock
支持尝试对多个互斥量进行锁定,尝试锁定成功返回 -1,否则返回锁定失败的互斥量的位置,例如第一个锁定失败返回 0、第二个失败返回 1。

int main()
{
    std::mutex mtx1;
    std::mutex mtx2;

    if (-1 == std::try_lock(mtx1, mtx2))
    {
        std::cout << "locked" << std::endl;
        mtx1.unlock();
        mtx2.unlock();
    }

    return 0;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

std::lock
支持对多个锁锁定,并且避免死锁的出现,以下代码运行时有可能出现死锁的情况:

void func(std::mutex *mtx1, std::mutex *mtx2, int index)
{
    std::lock_guard<std::mutex> lock1(std::adopt_lock);
    std::lock_guard<std::mutex> lock2(std::adopt_lock);

    std::cout << index << "out\n";
}

int main()
{
    std::mutex mtx1;
    std::mutex mtx2;
    // 两个线程的互斥量锁定顺序不同,可能造成死锁
    std::thread t1(func, &mtx1, &mtx2, 1);
    std::thread t2(func, &mtx2, &mtx1, 2);
    t1.join();
    t2.join();

    return 0;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

但是使用 std::lock 能避免多个锁出现死锁:

void func(std::mutex *mtx1, std::mutex *mtx2, int index)
{
    std::lock(*mtx1, *mtx2); // 同时锁定
    // std::adopt_lock作用是声明互斥量已在本线程锁定,std::lock_guard只是保证互斥量在作用域结束时被释放
    std::lock_guard<std::mutex> lock1(*mtx1, std::adopt_lock);
    std::lock_guard<std::mutex> lock2(*mtx2, std::adopt_lock);

    // 等价方法
    // std::unique_lock<std::mutex> lock1(from.m, std::defer_lock);
    // std::unique_lock<std::mutex> lock2(to.m, std::defer_lock);
    // std::lock(lock1, lock2);

    std::cout << index << "out\n";
}

int main()
{
    std::mutex mtx1;
    std::mutex mtx2;

    std::thread t1(func, &mtx1, &mtx2, 1);
    std::thread t2(func, &mtx2, &mtx1, 2);

    t1.join();
    t2.join();

    return 0;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

std::call_once
即使在多线程的情况下,也只执行一次指定的可调用对象(可以是函数、成员函数、函数对象、lambda函数),需要通过配合 std::once_flag 实现。具体的细节如下:

  1. 若在调用 call_once 的时刻, flag 指示已经调用了 f 指定的可调用对象,则 call_once 立即返回,就是说不再执行可调用对象。
  2. 否则,call_once 会调用指定的可调用对象。若该调用对象抛异常,则传播异常给 call_once 的调用方,并且不翻转 flag ,让下一次调用仍然执行。若该调用正常返回,则翻转 flag ,并保证同一 flag不在执行可调用对象。
#include <iostream>
#include <mutex>
#include <thread>

std::once_flag flag1, flag2;

void simple_do_once()
{
    std::call_once(flag1, []()
                   { std::cout << "Simple example: called once\n"; });
}

void may_throw_function(bool do_throw)
{
    if (do_throw)
    {
        std::cout << "throw: call_once will retry\n"; // this may appear more than once
        throw std::exception();
    }
    std::cout << "Didn't throw, call_once will not attempt again\n"; // guaranteed once
}

void do_once(bool do_throw)
{
    try
    {
        std::call_once(flag2, may_throw_function, do_throw);
    }
    catch (...)
    {
    }
}

int main()
{
    std::thread st1(simple_do_once);
    std::thread st2(simple_do_once);
    std::thread st3(simple_do_once);
    std::thread st4(simple_do_once);
    st1.join();
    st2.join();
    st3.join();
    st4.join();

    std::thread t5(do_once, true);
    std::thread t6(do_once, true);
    std::thread t7(do_once, false);
    std::thread t8(do_once, true);
    t5.join();
    t6.join();
    t7.join();
    t8.join();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53

运行结果:

Simple example: called once
throw: call_once will retry
throw: call_once will retry
Didn't throw, call_once will not attempt again
  • 1
  • 2
  • 3
  • 4

线程返回值

std::future 和 std::promise

std::future 对象可以与 asych、std::packaged_task 和 std::promise 一起使用。很多时候我们会遇到希望线程返回结果的情况。现在的问题是如何做到这一点?
举个例子:假设在我们的应用程序中,我们创建了一个线程来压缩给定的文件夹,并且我们希望该线程返回新的 zip 文件名及其大小。
现在要做到这一点,我们有两种方法:
首先是使用指针在线程之间共享数据
传递一个指向新线程的指针,新线程在其中设置数据。在主线程中使用条件变量继续等待。当新线程设置数据并通知条件变量时,主线程将唤醒并从该指针获取数据。
为了做一件简单的事情,我们使用了一个条件变量、一个互斥锁和一个指针,即 3 个变量协助来捕获返回值。现在假设我们希望这个线程在不同的时间点返回 3 个不同的值,那么问题将变得更加复杂。是否有一个简单的解决方案来从线程返回值。
答案是肯定的,使用 std::future,请查看它的下一个解决方案。
C++11 方式:使用 std::future 和 std::promise
std::future 是一个模板类,它的对象存储未来值。实际上,std::future 对象内部存储了一个将在未来分配的值,它还提供了一种访问该值的机制,即使用 get() 成员函数。但是如果有人试图在它可用之前通过 get() 函数访问这个关联的 future 值,那么 get() 函数将阻塞直到值不可用。
std::promise 也是一个类模板,它的对象承诺在未来设置该值。每个 std::promise 对象都有一个关联的 std::future 对象,该对象将给出由 std::promise 对象设置的值。
一个标准::承诺对象共享数据,与其相关联的std ::未来的对象。
一步一步来看,
在 Thread1 中创建一个 std::promise 对象。

std::promise<int> promiseObj;
  • 1

到目前为止,这个 promise 对象没有任何关联的值。但它承诺有人肯定会在其中设置值,一旦设置好,您就可以通过关联的 std::future 对象获取该值。
但是现在假设线程 1 创建了这个 promise 对象并将其传递给线程 2 对象。现在线程 1 如何知道线程 2 何时将在这个 promise 对象中设置值?答案是使用 std::future 对象。
每个 std::promise 对象都有一个关联的 std::future 对象,其他人可以通过它获取 promise 设置的值。因此,线程 1 将创建 std::promise 对象,然后在将 std::promise 对象传递给线程 2 之前从中获取 std::future 对象,即:

std::future<int> futureObj = promiseObj.get_future();
  • 1

现在线程 1 将 promiseObj 传递给线程 2。
然后线程 1 将通过 std::future 的 get 函数获取线程 2 在 std::promise 中设置的值:

int val = futureObj.get();
  • 1

但是如果线程 2 还没有设置值,那么这个调用将被阻塞,直到线程 2 在 promise 对象中设置值:

promiseObj.set_value(45);
  • 1

如下流程:
在这里插入图片描述
来看一个完整的 std::future 和 std::promise 示例:

#include <iostream>
#include <thread>
#include <future>

void initiazer(std::promise<int> *promObj)
{
    std::cout << "Inside Thread" << std::endl;
    promObj->set_value(35);
}

int main()
{
    std::promise<int> promiseObj;
    std::future<int> futureObj = promiseObj.get_future();
    std::thread th(initiazer, &promiseObj);
    std::cout << futureObj.get() << std::endl;
    th.join();
    return 0;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

如果 std::promise 对象在设置值之前被销毁,则关联 std::future 对象上的调用 get() 函数将抛出异常。
其中的一部分,如果你希望线程在不同的时间点返回多个值,那么只需在线程中传递多个 std::promise 对象并从关联的多个 std::future 对象中获取多个返回值。

std::packaged_task

我们介绍了std::promise的使用方法,其实 std::packaged_task 和 std::promise 非常相似,简单来说std::packaged_task 是对 std::promise<T= std::function> 中 T= std::function 这一可调对象(如函数、lambda表达式等)进行了包装,简化了使用方法。并将这一可调对象的返回结果传递给关联的std::future对象。
比如,使用 std::promise 的如下程序:

//声明一个可调对象T
using T = std::function<int(int)>; //等同于typedef std::function<int(int)> T;

//函数
int Test_Fun(int iVal)
{
    std::cout << "Value is:" << iVal << std::endl;
    return iVal + 232;
}

//声明一个std::promise对象pr1,其保存的值类型为int
std::promise<T> pr1;
//声明一个std::future对象fu1,并通过std::promise的get_future()函数与pr1绑定
std::future<T> fu1 = pr1.get_future();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

使用 std::packaged_task 就简化了很多:

//函数
int Test_Fun(int iVal)
{
    std::cout << "Value is:" << iVal << std::endl;
    return iVal + 232;
}
 
//声明一个std::packaged_task对象pt1,包装函数Test_Fun
std::packaged_task<int(int)> pt1(Test_Fun);
//声明一个std::future对象,包装Test_Fun的返回结果,并与pt1关联
std::future<int> fu1 = pt1.get_future();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

注意:使用std::packaged_task关联的std::future对象保存的数据类型是可调对象的返回结果类型,如示例函数的返回结果类型是int,那么声明为std::future,而不是std::future<int(int)>。
std::packaged_task::valid
检查当前 packaged_task 是否和一个有效的共享状态相关联,对于由默认构造函数生成的 packaged_task 对象,该函数返回 false,除非中间进行了 move 赋值操作或者 swap 操作。

#include <iostream> // std::cout
#include <utility>  // std::move
#include <future>   // std::packaged_task, std::future
#include <thread>   // std::thread

// 在新线程中启动一个 int(int) packaged_task.
std::future<int> launcher(std::packaged_task<int(int)> &tsk, int arg)
{
    if (tsk.valid())
    {
        std::future<int> ret = tsk.get_future();
        std::thread(std::move(tsk), arg).detach();
        return ret;
    }
    else
        return std::future<int>();
}

int main()
{
    std::packaged_task<int(int)> tsk([](int x)
                                     { return x * 2; });

    std::future<int> fut = launcher(tsk, 25);
    std::cout << "The double of 25 is " << fut.get() << ".\n";
    return 0;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

返回一个与 packaged_task 对象共享状态相关的 future 对象。返回的 future 对象可以获得由另外一个线程在该 packaged_task 对象的共享状态上设置的某个值或者异常。
std::packaged_task::make_ready_at_thread_exit
该函数会调用被包装的任务,并向任务传递参数,类似 std::packaged_task 的 operator() 成员函数。但是与 operator() 函数不同的是,make_ready_at_thread_exit 并不会立即设置共享状态的标志为 ready,而是在线程退出时设置共享状态的标志。
如果与该 packaged_task 共享状态相关联的 future 对象在 future::get 处等待,则当前的 future::get 调用会被阻塞,直到线程退出。而一旦线程退出,future::get 调用继续执行,或者抛出异常。
注意,该函数已经设置了 promise 共享状态的值,如果在线程结束之前有其他设置或者修改共享状态的值的操作,则会抛出 future_error( promise_already_satisfied )。
std::packaged_task::reset()
重置 packaged_task 的共享状态,但是保留之前的被包装的任务。请看例子,该例子中,packaged_task 被重用了多次:

#include <iostream> // std::cout
#include <utility>  // std::move
#include <future>   // std::packaged_task, std::future
#include <thread>   // std::thread

// a simple task:
int triple(int x) { return x * 3; }

int main()
{
    std::packaged_task<int(int)> tsk(triple); // package task

    std::future<int> fut = tsk.get_future();
    std::thread(std::move(tsk), 100).detach();
    std::cout << "The triple of 100 is " << fut.get() << ".\n";

    // re-use same task object:
    tsk.reset();
    fut = tsk.get_future();
    std::thread(std::move(tsk), 200).detach();
    std::cout << "Thre triple of 200 is " << fut.get() << ".\n";

    return 0;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

std::async 异步

C++11 中引入 std::async。它是一个函数模板,它接受一个回调(即函数或函数对象)作为参数并可能异步执行它们,回调函数同样也可以是仿函数或者Lambda。

template <class Fn, class... Args>
future<typename result_of<Fn(Args...)>::type> async(launch policy, Fn &&fn, Args &&...args);
  • 1
  • 2

std::async 返回一个 std::future<T>,它存储由 std::async() 执行的函数对象返回的值。函数期望的参数可以作为函数指针参数之后的参数传递给 std::async()。
可以使用 3 个不同的启动策略创建 std::async,即:
std::launch::async 它保证异步行为,即传递的函数将在单独的线程中执行。
std::launch::deferred 非异步行为,即当其他线程将来调用 get() 以访问共享状态时,将调用函数。
std::launch::async | std::launch::deferred 它的默认行为。使用此启动策略,它可以异步运行或不同步运行,具体取决于系统负载。但我们无法控制它。
如果我们不指定启动策略。它的行为类似于 std::launch::async | std::launch::deferred。
如下通过一个例子来理解 std::async:
假设我们必须从 DB 和文件系统中的文件中获取一些数据(字符串)。然后我需要合并两个字符串并打印。在单个线程中,我们将这样做:

#include <iostream>
#include <string>
#include <chrono>
#include <thread>

using namespace std::chrono;

std::string fetchDataFromDB(std::string recvdData)
{
    std::this_thread::sleep_for(seconds(5));
    return "DB_" + recvdData;
}

std::string fetchDataFromFile(std::string recvdData)
{
    std::this_thread::sleep_for(seconds(5));
    return "File_" + recvdData;
}

int main()
{
    system_clock::time_point start = system_clock::now();
    std::string dbData = fetchDataFromDB("Data");
    std::string fileData = fetchDataFromFile("Data");
    auto end = system_clock::now();
    auto diff = duration_cast<std::chrono::seconds>(end - start).count();
    std::cout << "Total Time Taken = " << diff << " Seconds" << std::endl;
    std::string data = dbData + " :: " + fileData;
    std::cout << "Data = " << data << std::endl;
    return 0;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

运行结果如下:

$ ./test 
Total Time Taken = 10 Seconds
Data = DB_Data :: File_Data
  • 1
  • 2
  • 3

由于 fetchDataFromDB() 和 fetchDataFromFile() 两个函数都需要 5 秒并且在单个线程中运行,因此消耗的总时间为 10 秒。
现在从数据库和文件中获取数据是相互独立的,也很耗时。所以,我们可以并行运行它们。
一种方法是创建一个新线程,将 promise 作为参数传递给线程函数,并在调用线程时从关联的 std::future 对象中获取数据。
另一种简单的方法是使用 std::async。
现在使用函数指针作为回调调用 std::async:

std::future<std::string> resultFromDB = std::async(std::launch::async, fetchDataFromDB, "Data");
std::string dbData = resultFromDB.get();
  • 1
  • 2

std::async() 做了以下事情,
• 它会自动为我们创建一个线程(或从内部线程池中选择)和一个 promise 对象。
• 然后将 std::promise 对象传递给线程函数并返回关联的 std::future 对象。
• 当我们传递的参数函数退出时,它的值将在这个 promise 对象中设置,因此最终返回值将在 std::future 对象中可用。
现在改变上面的例子并使用 std::async 从数据库异步读取数据,如下示例:

#include <iostream>
#include <string>
#include <chrono>
#include <thread>
#include <future>
using namespace std::chrono;

std::string fetchDataFromDB(std::string recvdData)
{
    std::this_thread::sleep_for(seconds(5));
    return "DB_" + recvdData;
}

std::string fetchDataFromFile(std::string recvdData)
{
    std::this_thread::sleep_for(seconds(5));
    return "File_" + recvdData;
}

int main()
{
    system_clock::time_point start = system_clock::now();
    std::future<std::string> resultFromDB = std::async(std::launch::async, fetchDataFromDB, "Data");
    std::string fileData = fetchDataFromFile("Data");
    std::string dbData = resultFromDB.get();
    auto end = system_clock::now();
    auto diff = duration_cast<std::chrono::seconds>(end - start).count();
    std::cout << "Total Time Taken = " << diff << " Seconds" << std::endl;
    std::string data = dbData + " :: " + fileData;
    std::cout << "Data = " << data << std::endl;
    return 0;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

输出结果如下:

$ ./test 
Total Time Taken = 5 Seconds
Data = DB_Data :: File_Data
  • 1
  • 2
  • 3
本文内容由网友自发贡献,转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/1008994
推荐阅读
相关标签
  

闽ICP备14008679号