当前位置:   article > 正文

C++11并发编程-线程的启用与销毁_c++ 阻塞线程销毁

c++ 阻塞线程销毁

使用方式与POSIX线程接口对比

用了很久的C的线程POSIX接口来管理多线程,一直觉得C++11对于线程的封装就是多此一举,但是还是抵挡不了真香定律,就像当初刚开始嫌弃STL后来写代码离不开一样。

当然使用POSIX接口尽可能的将所有可控制的细节展现在用户面前,但是有句话说的好,水满则溢,暴露过多的细节会导致开发者不能将中心放到编码工作上,而是浪费大量的时间在学习接口特性上。

如下是使用POSIX对线程进行管理:

// 创建线程
int pthread_create(pthread_t *thread, const pthread_attr_t *attr,
                          void *(*start_routine) (void *), void *arg);
// 清理线程资源
int pthread_join(pthread_t thread, void **retval);
// 线程分离
int pthread_detach(pthread_t thread);
// 以及众多的pthread_attr_*函数还有pthread_cancle_*等
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

但是在C++11中,线程管理只是暴露一个线程管理类std::thread,该类只是暴露了一些必要的接口用来创建和销毁线程

std::thread outThread([]()->int32_t{
    std::cout << "Out put string " << std::endl;
    return 0;
});
// 输出当前系统核心数
std::cout << std::thread::hardware_concurrency() << std::endl;
// 获取线程id
std::cout << outThread.get_id() << std::endl;
if (outThread.joinable()) {
    outThread.join();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

从对比来看无论从入参来看还是从使用方便成都C++11线程管理类都更加的轻松,当然处理上述lambda的方式启用线程,还可以使用如下方式启用启用线程

// 1. 使用全局函数启用
void do_some_work();
std::thread my_thread(do_some_work());
// 2. 使用仿函数启用线程
class ThreadWork {
public:
    void operator() () const {
        do_something();
        do_otherthing();
    }
};
ThreadWork threadWork;
std::thread my_thread(threadWork);
std::thread my_thread(threadWork());
std::thread my_thread({threadWork()});
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

等待线程结束

通常情况下我们都会在主线程中通过join()函数来等待线程的结束并回收资源,但是有时没有回收的线程时,需要及时将线程分离出去,注意分离出去的线程中能够使用的变量都要保证是非局部的,否则线程会一直在后台运行,如果引用的变量生命周期到了会导致oops。

如果想分离线程可以使用my_thread.detach()接口来实现。

注意一个线程在结束之前必须保证已经调用了join或者detach负责会导致异常发生

当你启用一个线程之后,原先线程可能因为抛出异常而退出,这时你可以通过在catch异常时将线程退出来保证程序的正常执行

void do_something() {

}
void func() {
    std::thread my_thread(do_something);

    try {
        do_something_in_current_thread();
    } catch (...) {
        // 在抛出异常时也同时销毁线程
        my_thread.join();
        // 这里拿到异常之后,处理完自己的异常情况之后,可以将异常继续向上层传递
        throw;
    }
    my_thread.join();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

如果嫌每次创建线程太过麻烦可以使用RAIL风格来管理线程

class ThreadGuard {
    std::thread& t;
public:
    explicit ThreadGuard(std::thread& t_) : t(t_) {}
    ~ThreadGuard() {
        if (t.joinable()) {
            t.join();
        }
    }
    ThreadGuard(const ThreadGuard&) = delete;
    ThreadGuard&operator=(ThreadGuard const &) = delete;
};

void do_something() {

}
void func() {
    std::thread my_thread(do_something);
    // 将当前需要销毁的线程添加到ThreadGuard中
    // 这样在当前函数结束时就会自动释放并销毁对应的线程
    // 就算在do_something_in_current_thread()函数中有异常抛出,线程也能在函数结束时进行释放
    ThreadGuard threadGuard(my_thread);
    do_something_in_current_thread();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

如何向线程函数传参

  • 第一次传参(向 std::thread 构造函数传参)

注:如果要达到按引用传参的效果,可使用std::ref来传递。

  • **第二次传参(向线程函数的传参):**由于std::thread对象里保存的是参数的副本,为了效率同时兼顾一些只移动类型的对象,所有的副本均被std::move到线程函数,即以右值的形式传入,所以最终传给线程函数参数的均为右值。

通常的线程启动过程

void f(int i,std::string const& s);
std::thread t(f,3,”hello”);
  • 1
  • 2

其他线程启动方式

// 只是定义一个线程对象,并没有启用线程
std::thread t1;
// 启用一个线程,按值传递参数
std::thread t2(f1, n + 1);
// 启用一个参数,按照引用传递参数 
std::thread t3(f2, std::ref(n));
// 启用一个线程,不过线程是将线程3的转移过来的,并不是新创建一个线程
// 执行之后t3就不在是一个线程了
std::thread t4(std::move(t3)); // t4 is now running f2(). t3 is no longer a thread
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

C++中将类函数作为线程函数

// 将C++类作为线程函数时,需要取出函数的偏移地址&Driver::Process,并给出对象的指针
// 只有这样在实际调用的时候才能调用到指定对象的函数,除此之外需要在紧随其后给出成员函数
// 需要的参数,不过这些参数都是按值传递的,如果需要按引用传递需要使用std::ref()明确的指出
class Driver {
public:
    void Process(int32_t id, const std::string& name) {
        std::cout << id << ":" << name << std::endl;
    }

    void Run() {
        myThread = std::thread(&Driver::Process, this, 3, "Driver::Process");
    }

    ~Driver() {
        if (myThread.joinable()) {
            myThread.join();
        }
    }
private:
    std::thread myThread;
};
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

使用lambda将成员函数作为线程函数使用

class Driver {
public:
    void Process(int32_t id, const std::string& name) {
        std::cout << id << ":" << name << std::endl;
    }

    void Run() {
        // [] 中是传递给lambda的环境变量,这里传递this之后lambda里面就可以直接免去this调用
        // this指向对象的成员函数了
        myThread = std::thread([this] { Process(3, "Driver::Process"); });
    }

    ~Driver() {
        if (myThread.joinable()) {
            myThread.join();
        }
    }
private:
    std::thread myThread;
};
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

当然可以使用std::bind启用带参的函数,但是相比较Lambda,还是优先推荐使用lambda来启用线程

class Driver {
public:
    void Process(int32_t id, const std::string& name) {
        std::cout << id << ":" << name << std::endl;
    }

    void Run() {
        myThread = std::thread(std::bind(&Driver::Process, this, 3, "Driver::Process"));
    }

    ~Driver() {
        if (myThread.joinable()) {
            myThread.join();
        }
    }
private:
    std::thread myThread;
};
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

并发算法实现

使用以上启用线程的知识我们可以设计一个并发计算的程序

该算法首先根据当前系统中核心个数,以及数据的个数来确定启动线程的个数,确定线程个数之后将数据平均分配给每个线程之后将各个线程计算的结果进行汇总

#include <numeric> // std::accumulate

template<typename Iterator, typename T>
struct accumulate_block {
    void operator()(Iterator first, Iterator last, T &result) {
        result = std::accumulate(first, last, result);
    }
};
template<typename Iterator, typename T>
T parallel_accumulate(Iterator first, Iterator last, T init) {
    // std::distance 用来计算两个迭代器中包含的个数,就是容器中元素的个数
    // [first, last)
    unsigned long const length = std::distance(first, last);
    if (!length)
        return init;
    unsigned long const min_per_thread = 25;
    unsigned long const max_threads =
            (length + min_per_thread - 1) / min_per_thread;
    // 获取核心数目,用来确定能够启用的线程个数
    unsigned long const hardware_threads =
            std::thread::hardware_concurrency();
    unsigned long const num_threads =
            std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);

    unsigned long const block_size = length / num_threads;
    std::vector<T> results(num_threads);
    std::vector<std::thread> threads(num_threads - 1);
    Iterator block_start = first;
    for (unsigned long i = 0; i < (num_threads - 1); ++i) {
        Iterator block_end = block_start;
        // 将对应迭代器或者指针向前走对应步数
        std::advance(block_end, block_size);
        threads[i] = std::thread(
                accumulate_block<Iterator, T>(),
                block_start, block_end, std::ref(results[i]));
        block_start = block_end;
    }
    // 最后一段距离不能和上述保持统一,这里单独在另外一个线程中进行计算
    accumulate_block<Iterator, T>()(
            block_start, last, results[num_threads - 1]);
    for (auto &entry: threads)
        entry.join();
    // 将计算得出的结果进行累加
    return std::accumulate(results.begin(), results.end(), init);
}

std::vector<int32_t> vec;
vec.reserve(100);
for (int32_t i = 0; i < 100; i++) {
    vec.emplace_back(i);
}
int32_t init = 0;
std::cout << parallel_accumulate(std::begin(vec), std::end(vec), init) << std::endl;

// output 4950
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/在线问答5/article/detail/1014158
推荐阅读
相关标签
  

闽ICP备14008679号