赞
踩
用了很久的C的线程POSIX接口来管理多线程,一直觉得C++11对于线程的封装就是多此一举,但是还是抵挡不了真香定律,就像当初刚开始嫌弃STL后来写代码离不开一样。
当然使用POSIX接口尽可能的将所有可控制的细节展现在用户面前,但是有句话说的好,水满则溢,暴露过多的细节会导致开发者不能将中心放到编码工作上,而是浪费大量的时间在学习接口特性上。
如下是使用POSIX对线程进行管理:
// 创建线程
int pthread_create(pthread_t *thread, const pthread_attr_t *attr,
void *(*start_routine) (void *), void *arg);
// 清理线程资源
int pthread_join(pthread_t thread, void **retval);
// 线程分离
int pthread_detach(pthread_t thread);
// 以及众多的pthread_attr_*函数还有pthread_cancle_*等
但是在C++11中,线程管理只是暴露一个线程管理类std::thread,该类只是暴露了一些必要的接口用来创建和销毁线程
std::thread outThread([]()->int32_t{
std::cout << "Out put string " << std::endl;
return 0;
});
// 输出当前系统核心数
std::cout << std::thread::hardware_concurrency() << std::endl;
// 获取线程id
std::cout << outThread.get_id() << std::endl;
if (outThread.joinable()) {
outThread.join();
}
从对比来看无论从入参来看还是从使用方便成都C++11线程管理类都更加的轻松,当然处理上述lambda的方式启用线程,还可以使用如下方式启用启用线程
// 1. 使用全局函数启用
void do_some_work();
std::thread my_thread(do_some_work());
// 2. 使用仿函数启用线程
class ThreadWork {
public:
void operator() () const {
do_something();
do_otherthing();
}
};
ThreadWork threadWork;
std::thread my_thread(threadWork);
std::thread my_thread(threadWork());
std::thread my_thread({threadWork()});
通常情况下我们都会在主线程中通过join()函数来等待线程的结束并回收资源,但是有时没有回收的线程时,需要及时将线程分离出去,注意分离出去的线程中能够使用的变量都要保证是非局部的,否则线程会一直在后台运行,如果引用的变量生命周期到了会导致oops。
如果想分离线程可以使用my_thread.detach()接口来实现。
注意一个线程在结束之前必须保证已经调用了join或者detach负责会导致异常发生
当你启用一个线程之后,原先线程可能因为抛出异常而退出,这时你可以通过在catch异常时将线程退出来保证程序的正常执行
void do_something() {
}
void func() {
std::thread my_thread(do_something);
try {
do_something_in_current_thread();
} catch (...) {
// 在抛出异常时也同时销毁线程
my_thread.join();
// 这里拿到异常之后,处理完自己的异常情况之后,可以将异常继续向上层传递
throw;
}
my_thread.join();
}
如果嫌每次创建线程太过麻烦可以使用RAIL风格来管理线程
class ThreadGuard {
std::thread& t;
public:
explicit ThreadGuard(std::thread& t_) : t(t_) {}
~ThreadGuard() {
if (t.joinable()) {
t.join();
}
}
ThreadGuard(const ThreadGuard&) = delete;
ThreadGuard&operator=(ThreadGuard const &) = delete;
};
void do_something() {
}
void func() {
std::thread my_thread(do_something);
// 将当前需要销毁的线程添加到ThreadGuard中
// 这样在当前函数结束时就会自动释放并销毁对应的线程
// 就算在do_something_in_current_thread()函数中有异常抛出,线程也能在函数结束时进行释放
ThreadGuard threadGuard(my_thread);
do_something_in_current_thread();
}
注:如果要达到按引用传参的效果,可使用std::ref来传递。
通常的线程启动过程
void f(int i,std::string const& s);
std::thread t(f,3,”hello”);
其他线程启动方式
// 只是定义一个线程对象,并没有启用线程
std::thread t1;
// 启用一个线程,按值传递参数
std::thread t2(f1, n + 1);
// 启用一个参数,按照引用传递参数
std::thread t3(f2, std::ref(n));
// 启用一个线程,不过线程是将线程3的转移过来的,并不是新创建一个线程
// 执行之后t3就不在是一个线程了
std::thread t4(std::move(t3)); // t4 is now running f2(). t3 is no longer a thread
C++中将类函数作为线程函数
// 将C++类作为线程函数时,需要取出函数的偏移地址&Driver::Process,并给出对象的指针
// 只有这样在实际调用的时候才能调用到指定对象的函数,除此之外需要在紧随其后给出成员函数
// 需要的参数,不过这些参数都是按值传递的,如果需要按引用传递需要使用std::ref()明确的指出
class Driver {
public:
void Process(int32_t id, const std::string& name) {
std::cout << id << ":" << name << std::endl;
}
void Run() {
myThread = std::thread(&Driver::Process, this, 3, "Driver::Process");
}
~Driver() {
if (myThread.joinable()) {
myThread.join();
}
}
private:
std::thread myThread;
};
使用lambda将成员函数作为线程函数使用
class Driver {
public:
void Process(int32_t id, const std::string& name) {
std::cout << id << ":" << name << std::endl;
}
void Run() {
// [] 中是传递给lambda的环境变量,这里传递this之后lambda里面就可以直接免去this调用
// this指向对象的成员函数了
myThread = std::thread([this] { Process(3, "Driver::Process"); });
}
~Driver() {
if (myThread.joinable()) {
myThread.join();
}
}
private:
std::thread myThread;
};
当然可以使用std::bind启用带参的函数,但是相比较Lambda,还是优先推荐使用lambda来启用线程
class Driver {
public:
void Process(int32_t id, const std::string& name) {
std::cout << id << ":" << name << std::endl;
}
void Run() {
myThread = std::thread(std::bind(&Driver::Process, this, 3, "Driver::Process"));
}
~Driver() {
if (myThread.joinable()) {
myThread.join();
}
}
private:
std::thread myThread;
};
使用以上启用线程的知识我们可以设计一个并发计算的程序
该算法首先根据当前系统中核心个数,以及数据的个数来确定启动线程的个数,确定线程个数之后将数据平均分配给每个线程之后将各个线程计算的结果进行汇总
#include <numeric> // std::accumulate
template<typename Iterator, typename T>
struct accumulate_block {
void operator()(Iterator first, Iterator last, T &result) {
result = std::accumulate(first, last, result);
}
};
template<typename Iterator, typename T>
T parallel_accumulate(Iterator first, Iterator last, T init) {
// std::distance 用来计算两个迭代器中包含的个数,就是容器中元素的个数
// [first, last)
unsigned long const length = std::distance(first, last);
if (!length)
return init;
unsigned long const min_per_thread = 25;
unsigned long const max_threads =
(length + min_per_thread - 1) / min_per_thread;
// 获取核心数目,用来确定能够启用的线程个数
unsigned long const hardware_threads =
std::thread::hardware_concurrency();
unsigned long const num_threads =
std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);
unsigned long const block_size = length / num_threads;
std::vector<T> results(num_threads);
std::vector<std::thread> threads(num_threads - 1);
Iterator block_start = first;
for (unsigned long i = 0; i < (num_threads - 1); ++i) {
Iterator block_end = block_start;
// 将对应迭代器或者指针向前走对应步数
std::advance(block_end, block_size);
threads[i] = std::thread(
accumulate_block<Iterator, T>(),
block_start, block_end, std::ref(results[i]));
block_start = block_end;
}
// 最后一段距离不能和上述保持统一,这里单独在另外一个线程中进行计算
accumulate_block<Iterator, T>()(
block_start, last, results[num_threads - 1]);
for (auto &entry: threads)
entry.join();
// 将计算得出的结果进行累加
return std::accumulate(results.begin(), results.end(), init);
}
std::vector<int32_t> vec;
vec.reserve(100);
for (int32_t i = 0; i < 100; i++) {
vec.emplace_back(i);
}
int32_t init = 0;
std::cout << parallel_accumulate(std::begin(vec), std::end(vec), init) << std::endl;
// output 4950
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。