赞
踩
std::promise 进程间通信,std::packaged_task 任务封装,std::async 任务异步执行;std::future 获取结果。
std::promise 可以用于线程间通信。
如下代码是 std::promise 中的示例代码。
std::promise - cppreference.com
(1)accumulate_promise 用于在线程间传递数据
(2)barrier 是 void 类型的,可以单纯地做线程之间的等待与唤醒
(3)std::fututre 是一个基本的组成元素,从名字也可以看出来,future 代表未来的结果。std::promise 只能使用一次,promise set_value 只能调用一次;std::future 调用 get() 也只能调用一次,如果再次调用 get(),那么会抛异常,在调用 get() 之前可以使用 valid() 来做判断,如果 valid() 是 true 的话,那么就可以调用 get();否则不能调用 get()。
- #include <chrono>
- #include <future>
- #include <iostream>
- #include <numeric>
- #include <thread>
- #include <vector>
- #include <unistd.h>
-
- void accumulate(std::vector<int>::iterator first,
- std::vector<int>::iterator last,
- std::promise<int> accumulate_promise)
- {
- int sum = std::accumulate(first, last, 0);
- accumulate_promise.set_value(sum); // Notify future
- }
-
- void do_work(std::promise<void> barrier)
- {
- std::cout << "before sleep\n";
- std::this_thread::sleep_for(std::chrono::seconds(1));
- barrier.set_value();
- std::cout << "after promise set value\n";
- }
-
- int main()
- {
- // Demonstrate using promise<int> to transmit a result between threads.
- std::vector<int> numbers = {1, 2, 3, 4, 5, 6};
- std::promise<int> accumulate_promise;
- std::future<int> accumulate_future = accumulate_promise.get_future();
- std::thread work_thread(accumulate, numbers.begin(), numbers.end(),
- std::move(accumulate_promise));
-
- // future::get() will wait until the future has a valid result and retrieves it.
- // Calling wait() before get() is not needed
- // accumulate_future.wait(); // wait for result
- std::cout << "before get, future valid: " << accumulate_future.valid() << std::endl;
- std::cout << "result=" << accumulate_future.get() << '\n';
- std::cout << "after get, future valid: " << accumulate_future.valid() << std::endl;
- work_thread.join(); // wait for thread completion
-
- // Demonstrate using promise<void> to signal state between threads.
- std::promise<void> barrier;
- std::future<void> barrier_future = barrier.get_future();
- std::thread new_work_thread(do_work, std::move(barrier));
-
- std::cout << "before wait\n";
- barrier_future.wait();
- std::cout << "after wait\n";
- new_work_thread.join();
- return 0;
- }
条件变量是一个基础的线程间同步机制,在 c 和 c++ 中都有使用。条件变量经常用于生产者线程和消费者线程之间通信的场景。
如下是使用条件变量时经常使用的方式,第一个形参是 std::unique_lock<> 类型的锁,第二个形参是 wait() 返回的条件。
template< class Predicate >
void wait( std::unique_lock<std::mutex>& lock, Predicate pred );
(1)条件满足之后,即使不进行 notify,wait() 也会返回
(2)条件不满足的时候,即使进行 notify(),wait() 也不会返回
这就是使用条件变量的时候最典型的用法,因为条件变量可能存在惊群问题(本人没有复现过),可能被唤醒的时候,条件还没有满足。这种用法就类似于在 c 语言使用条件变量的时候的如下代码。
while(!condition) {
wait();
}
- #include <condition_variable>
- #include <iostream>
- #include <mutex>
- #include <string>
- #include <thread>
- #include <unistd.h>
-
- std::mutex m;
- std::condition_variable cv;
- std::string data;
- bool ready = false;
- bool processed = false;
-
- void worker_thread()
- {
- // wait until main() sends data
- std::unique_lock<std::mutex> lk(m);
- cv.wait(lk, []{ return ready; });
-
- // after the wait, we own the lock
- std::cout << "Worker thread is processing data\n";
- data += " after processing";
-
- // send data back to main()
- processed = true;
- std::cout << "Worker thread signals data processing completed\n";
-
- // manual unlocking is done before notifying, to avoid waking up
- // the waiting thread only to block again (see notify_one for details)
- lk.unlock();
- cv.notify_one();
- }
-
- int main()
- {
- std::thread worker(worker_thread);
-
- data = "Example data";
- // send data to the worker thread
- {
- std::lock_guard<std::mutex> lk(m);
- sleep(2);
- std::cout << "after sleep\n";
- ready = true;
- std::cout << "after set value\n";
- std::cout << "main() signals data ready for processing\n";
- }
-
- std::cout << "before notify one\n";
- sleep(2);
- cv.notify_one();
- std::cout << "after notify one\n";
- // wait for the worker
- {
- std::unique_lock<std::mutex> lk(m);
- cv.wait(lk, []{ return processed; });
- }
- std::cout << "Back in main(), data = " << data << '\n';
- worker.join();
- }
pthread 中的条件变量,使用方式如下。也是需要一个 mutex 和条件变量在一块使用。
wait 端调用的函数:
pthread_mutex_lock(&mutex);
while (!condition) {
pthread_cond_wait(&cond, &mutex);
}
pthread_mutex_unlock(&mutex);唤醒端需要调用的函数:
pthread_mutex_lock(&mutex);
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);在 wait 端,首先要加锁,然后进行 wait;在 wait 的时候,会释放锁,所以在 signal 端加锁是可以加成功的。
- #include <stdio.h>
- #include <pthread.h>
-
- #define MAX_COUNT 10
-
- int shared_resource = 0; // 全局共享资源
- pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; // 互斥锁
- pthread_cond_t cond = PTHREAD_COND_INITIALIZER; // 条件变量
-
- void *producer(void *arg) {
- while (1) {
- pthread_mutex_lock(&mutex);
- while (shared_resource >= MAX_COUNT) { // 当共享资源达到最大值时等待
- pthread_cond_wait(&cond, &mutex);
- }
- shared_resource++; // 增加共享资源值
- printf("Produced: %d\n", shared_resource);
- pthread_cond_signal(&cond); // 唤醒消费者线程
- pthread_mutex_unlock(&mutex);
- sleep(1); // 模拟生产过程
- }
- return NULL;
- }
-
- void *consumer(void *arg) {
- while (1) {
- pthread_mutex_lock(&mutex);
- while (shared_resource <= 0) { // 当共享资源为0时等待
- pthread_cond_wait(&cond, &mutex);
- }
- shared_resource--; // 减少共享资源值
- printf("Consumed: %d\n", shared_resource);
- pthread_cond_signal(&cond); // 唤醒生产者线程
- pthread_mutex_unlock(&mutex);
- sleep(1); // 模拟消费过程
- }
- return NULL;
- }
-
- int main() {
- pthread_t producer_thread, consumer_thread;
- // 创建生产者线程和消费者线程
- pthread_create(&producer_thread, NULL, producer, NULL);
- pthread_create(&consumer_thread, NULL, consumer, NULL);
- // 等待线程结束
- pthread_join(producer_thread, NULL);
- pthread_join(consumer_thread, NULL);
- // 销毁互斥锁和条件变量
- pthread_mutex_destroy(&mutex);
- pthread_cond_destroy(&cond);
- return 0;
- }
在使用条件变量的时候要注意,当条件变量被 wait 的时候,对条件变量进行 destroy,那么 destroy 会阻塞住,直到 wait 返回之后,destroy 才会返回。如下代码是 destroy 一个正在被 wait 的条件变量,这个时候 destroy 会阻塞住。
- #include <stdio.h>
- #include <pthread.h>
- #include <unistd.h>
-
- pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
- pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
-
- void *consumer(void *arg) {
- pthread_mutex_lock(&mutex);
- pthread_cond_wait(&cond, &mutex);
- pthread_mutex_unlock(&mutex);
- return NULL;
- }
-
- int main() {
- pthread_t consumer_thread;
- pthread_create(&consumer_thread, NULL, consumer, NULL);
- sleep(1);
-
- pthread_mutex_destroy(&mutex);
- printf("before destroy condition\n");
- pthread_cond_destroy(&cond);
- printf("after destroy condition\n");
- return 0;
- }
std::package_task,从名字也可以看出来,表示一个打包的任务。这个任务自己并不能执行,而是需要显式的调用来执行。
如下是官网的示例代码。
std::packaged_task - cppreference.com
(1)task 能够封装的任务类型包括 lambda 表达式,bind 的函数,也可以是一个单纯的函数
(2)与 std::promise 类似,std::packaged_task 也可以获取一个 std::future,std::future 可以获取到 std::package_task 执行的结果
- #include <cmath>
- #include <functional>
- #include <future>
- #include <iostream>
- #include <thread>
-
- // unique function to avoid disambiguating the std::pow overload set
- int f(int x, int y) { return std::pow(x, y); }
-
- void task_lambda()
- {
- std::packaged_task<int(int, int)> task([](int a, int b)
- {
- return std::pow(a, b);
- });
- std::future<int> result = task.get_future();
-
- // 封装一个 lamda 表达式,可以将 task 当成函数名来直接调用
- task(2, 9);
-
- std::cout << "task_lambda:\t" << result.get() << '\n';
- }
-
- void task_bind()
- {
- std::packaged_task<int()> task(std::bind(f, 2, 11));
- std::future<int> result = task.get_future();
-
- // 封装的 bind() 函数,可以直接调用
- task();
-
- std::cout << "task_bind:\t" << result.get() << '\n';
- }
-
- void task_thread()
- {
- std::packaged_task<int(int, int)> task(f);
- std::future<int> result = task.get_future();
- // 非 bind 方式,不能这样直接调用
- // std::cout << "directly call: " << task(2, 10) << std::endl;
- std::thread task_td(std::move(task), 2, 10);
- task_td.join();
-
- std::cout << "task_thread:\t" << result.get() << '\n';
- }
-
- int main()
- {
- task_lambda();
- task_bind();
- task_thread();
- return 0;
- }
std::packaged_task中抛出的异常,保存在 std::future 中,可以通过 get() 来获取。
- #include <cmath>
- #include <functional>
- #include <future>
- #include <iostream>
- #include <thread>
-
- int f(int x, int y) {
- throw std::runtime_error("An error occurred in the task!");
- return std::pow(x, y);
- }
-
- void task_lambda()
- {
- std::packaged_task<int(int, int)> task([](int a, int b)
- {
- throw std::runtime_error("An error occurred in the task!");
- return std::pow(a, b);
- });
- std::future<int> result = task.get_future();
-
- task(2, 9);
-
- try {
- std::cout << "task_lambda:\t" << result.get() << '\n';
- } catch (std::exception &e) {
- std::cout << "lambda exception: " << e.what() << std::endl;
- }
- }
-
- void task_bind()
- {
- std::packaged_task<int()> task(std::bind(f, 2, 11));
- std::future<int> result = task.get_future();
-
- task();
-
- try {
- std::cout << "task_bind:\t" << result.get() << '\n';
- } catch (std::exception &e) {
- std::cout << "bind exception: " << e.what() << std::endl;
- }
- }
-
- void task_thread()
- {
- std::packaged_task<int(int, int)> task(f);
- std::future<int> result = task.get_future();
- std::thread task_td(std::move(task), 2, 10);
- task_td.join();
-
- try {
- std::cout << "task_thread:\t" << result.get() << '\n';
- } catch (std::exception &e) {
- std::cout << "task thread exception: " << e.what() << std::endl;
- }
- }
-
- int main()
- {
- task_lambda();
- task_bind();
- task_thread();
- return 0;
- }
valid 是 true,也可以 get,不过 get 之后也是 void。
- #include <cmath>
- #include <functional>
- #include <future>
- #include <iostream>
- #include <thread>
-
- void f(int x, int y) {
- std::cout << "x " << x << ", y " << y << std::endl;
- }
-
- void task_lambda()
- {
- std::packaged_task<void(int, int)> task([](int a, int b)
- {
- std::cout << "a " << a << ", b " << b << std::endl;
- });
- auto result = task.get_future();
-
- task(2, 9);
-
- try {
- std::cout << "task_lambda:\t" << result.valid() << '\n';
- result.get();
- } catch (std::exception &e) {
- std::cout << "lambda exception: " << e.what() << std::endl;
- }
- }
-
- void task_bind()
- {
- std::packaged_task<void()> task(std::bind(f, 2, 11));
- std::future<void> result = task.get_future();
-
- task();
-
- try {
- std::cout << "task_bind:\t" << result.valid() << '\n';
- result.get();
- } catch (std::exception &e) {
- std::cout << "bind exception: " << e.what() << std::endl;
- }
- }
-
- void task_thread()
- {
- std::packaged_task<void(int, int)> task(f);
- std::future<void> result = task.get_future();
- std::thread task_td(std::move(task), 2, 10);
- task_td.join();
-
- try {
- std::cout << "task_thread:\t" << result.valid() << '\n';
- result.get();
- } catch (std::exception &e) {
- std::cout << "task thread exception: " << e.what() << std::endl;
- }
- }
-
- int main()
- {
- task_lambda();
- task_bind();
- task_thread();
- return 0;
- }
std::async 提供了异步执行的方式。上一节的 std::packaged_task 只是封装了一个 task,但是这个 task 自己不会执行,std::async 相当于在 std::packaged_task 的基础上增加了自动执行的机制。
std::async 的第一个形参有两个可选值:std::launch::async 和 std::launch::deferred,第一个标志会自动异步执行;第二个标志,不会自动异步执行,需要调 wait() 同步执行。
- #include <algorithm>
- #include <future>
- #include <iostream>
- #include <mutex>
- #include <numeric>
- #include <string>
- #include <vector>
-
- std::mutex m;
-
- struct X
- {
- void foo(int i, const std::string& str)
- {
- std::lock_guard<std::mutex> lk(m);
- std::cout << str << ' ' << i << '\n';
- }
-
- void bar(const std::string& str)
- {
- std::lock_guard<std::mutex> lk(m);
- std::cout << str << '\n';
- }
-
- int operator()(int i)
- {
- std::lock_guard<std::mutex> lk(m);
- std::cout << i << '\n';
- return i + 10;
- }
- };
-
- template<typename RandomIt>
- int parallel_sum(RandomIt beg, RandomIt end)
- {
- auto len = end - beg;
- if (len < 1000)
- return std::accumulate(beg, end, 0);
-
- RandomIt mid = beg + len / 2;
- auto handle = std::async(std::launch::async,
- parallel_sum<RandomIt>, mid, end);
- int sum = parallel_sum(beg, mid);
- return sum + handle.get();
- }
-
- int main()
- {
- std::vector<int> v(10000, 1);
- std::cout << "The sum is " << parallel_sum(v.begin(), v.end()) << '\n';
-
- X x;
- // Calls (&x)->foo(42, "Hello") with default policy:
- // may print "Hello 42" concurrently or defer execution
- auto a1 = std::async(&X::foo, &x, 42, "Hello");
- // Calls x.bar("world!") with deferred policy
- // prints "world!" when a2.get() or a2.wait() is called
- auto a2 = std::async(std::launch::deferred, &X::bar, x, "world!");
- // Calls X()(43); with async policy
- // prints "43" concurrently
- auto a3 = std::async(std::launch::async, X(), 43);
- a2.wait(); // prints "world!"
- std::cout << a3.get() << '\n'; // prints "53"
- } // if a1 is not done at this point, destructor of a1 prints "Hello 42" here
如下代码,并没有获取 std::async 的返回值,这样的话两个任务是串行执行的,而不是异步并行执行的。
- #include <algorithm>
- #include <future>
- #include <iostream>
- #include <mutex>
- #include <numeric>
- #include <string>
- #include <vector>
- #include <unistd.h>
-
- std::mutex m;
-
- struct X
- {
- void foo()
- {
- for (int i = 0; i < 10; i++) {
- std::cout << "foo()" << std::endl;
- sleep(1);
- }
- }
-
- void bar()
- {
- for (int i = 0; i < 10; i++) {
- std::cout << "bar()" << std::endl;
- sleep(1);
- }
- }
- };
-
- int main()
- {
- X x;
- std::async(std::launch::async, &X::foo, &x);
- std::async(std::launch::async, &X::bar, &x);
- return 0;
- }
官方解释:
使用 std::async 的时候,如果任务中抛异常,异常会保存在 std::future 中,可以通过 std::future 来获取。
- #include <algorithm>
- #include <future>
- #include <iostream>
- #include <mutex>
- #include <numeric>
- #include <string>
- #include <vector>
- #include <unistd.h>
-
- std::mutex m;
-
- struct X
- {
- int bar()
- {
- throw std::runtime_error("An error occurred in the task!");
- return 10;
- }
- };
-
- int main()
- {
- X x;
- auto f1 = std::async(std::launch::async, &X::bar, &x);
-
- try {
- f1.get();
- } catch (std::exception &e) {
- std::cout << "f1 get exception: " << e.what() << std::endl;
- }
- return 0;
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。