当前位置:   article > 正文

C++11:线程 thread_c++11 thread

c++11 thread

1、std::thread

// 创建std::thread对象,新线程调用threadFun函数,函数的参数由 args 给出
template<class Fn, class... Args> 
explicit thread(Fn&& fn, Args&&... args);
  • 1
  • 2
  • 3

特点:不能复制控制,只能移动(成功后则不再表示 thread 对象)

thread t1(threadFun, std::ref(x)); // std::ref 表示引用传递
thread t2(std::move(t1)); // t1 线程失去所有权
//t1.join(); // error,t1不再是 thread 执行对象
t2.join(); //t3拥有控制权
  • 1
  • 2
  • 3
  • 4

注:std::ref 与 & 的区别

  • & 是类型说明符
  • std::ref 是一个函数,返回 std::reference_wrapper 类型,模拟引用传递(不是引用)。在函数式编程(如std::bind、std::thread)默认是对参数的直接拷贝。只有在模板自动推导类型时,ref 用包装类型 std::reference_wrapper 代替原来会被识别的值类型。

成员函数

  • get_id() 获取线程ID,返回类型 std::thread::id 对象
  • joinable() 判断线程是否可以加入等待。新创建的线程是 joinable,可被 joinable 的 thread 对象必须在销毁之前被主线程 join 或者将其设置为 detached。
  • join() 等该线程执行完成后才返回
  • detach() 线程解绑独立运行。调用后,目标线程驻留后台运行,成为了守护线程。与之相关的 std::thread 对象失去对目标线程的关联,无法再通过 std::thread 对象取得该线程的控制权。当线程主函数执行完之后,线程就结束了。运行时库负责清理与该线程相关的资源。
    • *this 不再代表任何的线程执行实例。
    • joinable() == false
    • get_id() == std::thread::id()
  • yield() 让出 cpu 时间片
  • 1sleep_until() 睡眠直到某个时间点
  • sleep_for() 睡眠某段时间

实例:线程封装

子类继承父类,实现具体的业务逻辑处理。

// thread.h
#ifndef THREAD_H
#define THREAD_H
#include <thread>

class Thread {
public:
	Thread(); // 构造函数
	virtual ~Thread(); // 析构函数
	bool start(); //启动线程
	void stop(); //停止线程
	bool isAlive() const; // 线程是否存活.
	std::thread::id id() { return pThread->get_id(); }
	std::thread* getThread() { return pThread; }
	void join();  // 等待当前线程结束, 不能在当前线程上调用
	void detach(); //能在当前线程上调用
	static size_t CURRENT_THREADID();
protected:
	void threadEntry(); // 线程创建的函数
	virtual void run() = 0; // 子线程实现具体的业务逻辑方法
protected:
	bool  _running; //是否在运行
	std::thread *pThread;
};

#endif 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
// thread.cc
#include "thread.h"
#include <sstream>
#include <iostream>
#include <exception>

Thread::Thread() 
: _running(false), pThread(NULL)
{}

Thread::~Thread(){
    // 调用析构函数的之前,子线程要么是join则触发detach
    // 此时是一个比较危险的动作,用户必须知道他在做什么
	if (pThread != NULL) {	
		if (pThread->joinable()) {
			std::cout << "~Thread detach\n";
			pThread->detach();
		}

		delete pThread;
		pThread = NULL;
	}
	std::cout << "~Thread()" << std::endl;
}

bool Thread::start() {
	// 已经在运行了
	if (_running) {
		return false;
	}
	try {
		// 创建线程并启动
		pThread = new std::thread(&Thread::threadEntry, this);
	}
	catch (...) {
		throw  "[ZERO_Thread::start] thread start error";
	}
	return true;
}

void Thread::stop() {
	_running = false;
}

bool Thread::isAlive() const {
	return _running;
}

void Thread::join() {
	if (pThread->joinable()) {
		pThread->join();
	}
}

void Thread::detach() {
	pThread->detach();
}

size_t Thread::CURRENT_THREADID() {
	// 声明为 thread_local 的本地变量在线程中是持续存在的。
	// 具有static变量一样的初始化特征和生命周期,即使它不被声明为static。
	static thread_local size_t threadId = 0;
	if (threadId == 0) {
		std::stringstream ss;
		ss << std::this_thread::get_id();
		threadId = strtol(ss.str().c_str(), NULL, 0);
	}
	return threadId;
}

// 创建新新线程时调用的threadFunc
void Thread::threadEntry() {
	_running = true;

	try {
		run();   // 函数运行所在 调用子类的run函数
	}
	catch (std::exception &ex) {
		_running = false;
		throw ex;
	}
	catch (...) {
		_running = false;
		throw;
	}
	_running = false;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
// main.cc
#include <iostream>
#include <chrono>
#include "thread.h"
using namespace std;

class A : public Thread {
public:
	void run() {
		while (_running) {
			cout << "print A " << endl;
			std::this_thread::sleep_for(std::chrono::seconds(5));
		}
		cout << "----- leave A " << endl;
	}
};

class B : public Thread
{
public:
	void run() {
		while (_running) {
			cout << "print B " << endl;
			std::this_thread::sleep_for(std::chrono::seconds(2));
		}
		cout << "----- leave B " << endl;
	}
};

int main(){
	{
		A a;
		a.start();
		B b;
		b.start();
		std::this_thread::sleep_for(std::chrono::seconds(5));
		a.stop(); // 否则线程的run方法不会停止,线程不会退出
		a.join(); 
		b.stop();
		b.join();  
	}
	cout << "Hello World!" << endl;
	system("pause");
	return 0;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45

2、互斥量

互斥锁的种类

  • std::mutex,独占互斥量,不能递归使用
  • std::time_mutex,带超时的独占互斥量,不能递归使用
  • std::recursive_mutex,递归互斥量,不带超时功能
  • std::recursive_timed_mutex,带超时的递归互斥量

std::mutex

特点:不允许拷贝构造,也不允许 move 拷贝,初始状态是 unlocked。

  • lock(),调用线程将锁住该互斥量。有锁 -> 其他(阻塞),自己(死锁 deadlock)
  • unlock(), 解锁,释放对互斥量的所有权。
  • try_lock(),尝试锁住互斥量。有锁 -> 其他(不阻塞),自己(死锁)

lock_guard 与 unique_lock

都能实现自动加锁和解锁(RAII类,自动释放资源),但 unique_lock 可以临时解锁和上锁。

std::lock_guard

  • 构造函数中进行加锁,析构函数中进行解锁。
  • look_guard 仅用于互斥。

std::unique_lock

  • unique_lock 是通用互斥包装器,允许延迟锁定、锁定的有时限尝试、递归锁定、所有权转移和与条件变量 (notify + wait) 一同使用。
  • 使用更加灵活,功能更加强大。可以临时解锁 unlock() 再上锁 lock(),而不必等到析构自动解锁。
  • 需要付出更多的时间、性能成本。
#include <iostream>
#include <deque>
#include <thread>
#include <mutex>
#include <condition_variable>

std::deque<int> q;
std::mutex mu;
std::condition_variable cond;
int count = 0;

void producer() {
	while (true) {
		{	// 离开作用域后自动析构
			std::unique_lock<std::mutex> locker(mu); // 不能替换成lock_guard
			std::cout << "fun1 lock" << std::endl;
			q.push_front(count++);
			//locker.unlock(); // 没必要
			cond.notify_one();  
		}
		std::this_thread::sleep_for(std::chrono::seconds(1));
	}
}

void consumer() {
	while (true) {
		std::unique_lock<std::mutex> locker(mu);
		std::cout << "fun2 lock" << std::endl;
		std::cout << "fun2 wait into" << std::endl;
		cond.wait(locker, []() {return !q.empty(); });
		std::cout << "fun2 wait leave" << std::endl;
		auto data = q.back();
		q.pop_back();
		// locker.unlock(); //没必要
		std::cout << "thread2 get value form thread1: " << data << std::endl;
	}
}

int main() {
	std::thread t1(producer);
	std::thread t2(consumer);
	t1.join();
	t2.join();

	return 0;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46

3、条件变量

条件变量 std::condition_variable:实现线程同步,即线程间需要按照预定的先后次序顺序进行的行为.

条件变量的使用

  • 拥有条件变量的线程获取互斥量
  • wait 循环检查某个条件,如果条件不满足则阻塞直到条件满足
  • 某个线程满足条件执行完之后调用 notify 唤醒等待线程。
wait 函数

必须使用 unique_lock 对象,需要临时上锁和解锁。

  • 上半部:1、条件变量上排队 -> 2、解锁 -> 3、阻塞
  • 下半部:1、被唤醒 notify -> 2、加锁(锁没被使用,加锁成功;锁正在使用,阻塞,直至锁的释放,加锁)-> 3、函数返回
// 唤醒后,加锁获取互斥量,继续执行
void wait (unique_lock<mutex>& lck); // unique_lock对象

// 唤醒后,加锁获取互斥量,判断pred,若为false,则解锁阻塞;若为true,则继续执行
template <class Predicate> 
void wait (unique_lock<mutex>& lck, Predicate pred); // Predicate 对象(等待条件)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
notify 函数
// 解锁正在等待当前条件的线程中的随机一个
void notify_one() noexcept;

// 解锁正在等待当前条件的所有线程
void notify_all() noexcept;
  • 1
  • 2
  • 3
  • 4
  • 5
例:生产者-消费者
// sync_queue.h
#ifndef SIMPLE_SYNC_QUEUE_H
#define SIMPLE_SYNC_QUEUE_H
#include <thread>
#include <condition_variable>
#include <mutex>
#include <list>
#include <iostream>

template<typename T>
class SimpleSyncQueue {
public:
	SimpleSyncQueue() {}

	void put(const T& x) {
		std::lock_guard<std::mutex> locker(_mutex);
		_queue.push_back(x);
		_notEmpty.notify_one();
	}

	void take(T& x) {
		std::unique_lock<std::mutex> locker(_mutex);
		_notEmpty.wait(locker, [this] {return !_queue.empty(); });  

		x = _queue.front();
		_queue.pop_front();
	}

	bool empty() {
		std::lock_guard<std::mutex> locker(_mutex);
		return _queue.empty();
	}

	size_t size() {
		std::lock_guard<std::mutex> locker(_mutex);
		return _queue.size();
	}

private:
	std::list<T> _queue;
	std::mutex _mutex;
	std::condition_variable _notEmpty;
};
#endif // SIMPLE_SYNC_QUEUE_H
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
// main.cc
#include <iostream>
#include <thread>
#include <iostream>
#include <mutex>

#include "sync_queue.h"

using namespace std;
SimpleSyncQueue<int> syncQueue;

void PutDatas() {
	for (int i = 0; i < 20; ++i) {
		syncQueue.put(i);
	}
}

void TakeDatas() {
	int x = 0;

	for (int i = 0; i < 20; ++i) {
		syncQueue.take(x);
		std::cout << x << std::endl;
	}
}

int main() {
	std::thread t1(PutDatas);
	std::thread t2(TakeDatas);

	t1.join();
	t2.join();

	std::cout << "main finish\n";
	system("pause");
	return 0;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

4、原子变量

atomic 不能被中断的操作。store() 赋值, load() 读取。

内存顺序(memory_order)

枚举值定义规则
memory_order_relaxed不对执行顺序做任何保障
memory_order_acquire本线程中,所有后续的读操作均在本条原子操作完成后执行
memory_order_release本线程中,所有之前的写操作完成后才能执行本条原子操作
memory_order_acq_rel同时包含memory_order_acquirememory_order_release标记
memory_order_consume本线程中,所有后续的有关本原子类型的操作,必须在本条原子操作完成后执行
memory_order_seq_cst全部存取都按顺序执行
#include <iostream>  
#include <atomic>      
#include <thread>     

//std::atomic<int> count = 0; //error
std::atomic<int> count(0); // 准确初始化,atomic 线程安全

void set_count(int x) {
	std::cout << "set_count:" << x << std::endl;
	count.store(x, std::memory_order_relaxed); // set value atomically
}

void print_count() {
	int x;
	do {
		x = count.load(std::memory_order_relaxed);  // get value atomically
		std::cout << "--- wait ---" << std::endl;
	} while (x == 0);
	std::cout << "count: " << x << std::endl;
}

int main() {
	std::thread t1(print_count);
	std::thread t2(set_count, 10);
	t1.join();
	t2.join();
	std::cout << "main finish\n";

	return 0;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

5、异步操作

5.1、std::future

future 期望,当前线程持有 future 时,期待从 future 获取到想要的结果和返回,可以把future当做异步函数的返回值。

一旦 future 就绪,就无法复位,代表的是一次性事件。

  • std::future: 异步指向某个任务,然后通过 future 去获取任务函数的返回结果。当需要返回值时,对 future 使用 get() 方法线程就会阻塞直到 future 就绪,然后返回该值。

  • std::aysnc 异步运行某个任务函数,返回一个 future 对象

    async(_Fty&& _Fnarg, _ArgTypes&&... _Args)
    
    • 1

future 的类型

  • std::future:仅有一个实例指向其关联事件,std::unique_ptr
  • std::shared_future:可以有多个实例指向同一个关联事件,std::shared_ptr
#include <iostream>
#include <future>
#include <thread>
using namespace std;

int add() {
	std::this_thread::sleep_for(std::chrono::seconds(5)); // 用来测试异步延迟
	std::cout << "find_result_to_add" << std::endl;
	return 1 + 1;
}

int add2(int a, int b) {
	std::this_thread::sleep_for(std::chrono::seconds(5)); // 用来测试异步延迟
	return a + b;
}

void do_other_things() {
	std::cout << "do_other_things" << std::endl;
}

int main() {
	// std::future<T>
	// std::future<int> result = std::async(find_result_to_add); 1、指定类型
    // 2、decltype,自动推导函数返回类型
	// std::future<decltype(add())> result = std::async(add); 
    
	auto result = std::async(add);  // 3、auto,推荐写法
	do_other_things(); // std::async异步线程运行,不阻塞主线程函数的
	std::cout << "result: " << result.get() << std::endl;  // get()阻塞

	// std::future<decltype(add2(int, int))> result2 = std::async(add2, 10, 20); //error,把参数值传递进来
	std::future<decltype (add2(0, 0))> result2 = std::async(add2, 10, 20);
	std::cout << "result2: " << result2.get() << std::endl;  // get()阻塞
	
	system("pause");
	return 0;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

在这里插入图片描述

5.2、std::packaged_task

将任务和 future 绑定在一起的模板,是一种对任务的封装。可以调用get_future()方法获得 packaged_task 对象绑定的函数的返回值类型的 future。模板类型是函数签名

#include <iostream>
#include <future>
#include <thread>

using namespace std;

int add(int a, int b, int c) {
	std::cout << "call add\n";
	return a + b + c;
}

int main() {
	// 封装任务,待执行
	std::packaged_task<int(int, int, int)> task(add);  // 模板类型:函数签名
	std::future<int> result = task.get_future(); // 待执行,这里只是获取 future
	//任务开始执行!
	task(1, 1, 2); // 必须先执行任务,否则在get()获取future的值时会一直阻塞
	std::cout << "result:" << result.get() << std::endl;
	
	return 0;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

5.3、std::promise

promise 承诺,当线程创建 promise 的同时创建一个 future,这个 promise 向线程承诺它必定会被人手动设置一个值,future 就是获取其返回的手段。两者配合,在线程间传递数据。

#include <future>
#include <string>
#include <thread>
#include <iostream>
using namespace std;

void print(std::promise<std::string>& p) {
	p.set_value("helloWorld"); // promise设置值,返回 future
}

void do_some_other_things() {
	std::cout << "do_some_other_things" << std::endl;
}

int main() {
	std::promise<std::string> promise; 

	std::future<std::string> result = promise.get_future(); // 获取待返回的future
	std::thread t(print, std::ref(promise));  // 线程设置,传引用 promise
	do_some_other_things();

	cout << "result " << result.get() << endl; // 在主线程等待 promise 的返回结果
	t.join();

	return 0;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/153596
推荐阅读
相关标签
  

闽ICP备14008679号