当前位置:   article > 正文

std::future, std::promise, std::packaged_task, std::async_std::promise和std::future和std::task

std::promise和std::future和std::task

std::promise 进程间通信,std::packaged_task 任务封装,std::async 任务异步执行;std::future 获取结果。

1 std::promise

1.1 线程间同步

std::promise 可以用于线程间通信

如下代码是 std::promise 中的示例代码

std::promise - cppreference.com

(1)accumulate_promise 用于在线程间传递数据

(2)barrier 是 void 类型的,可以单纯地做线程之间的等待与唤醒

(3)std::fututre 是一个基本的组成元素,从名字也可以看出来,future 代表未来的结果。std::promise 只能使用一次,promise set_value 只能调用一次;std::future 调用 get() 也只能调用一次,如果再次调用 get(),那么会抛异常,在调用 get() 之前可以使用 valid() 来做判断,如果 valid() 是 true 的话,那么就可以调用 get();否则不能调用 get()。

  1. #include <chrono>
  2. #include <future>
  3. #include <iostream>
  4. #include <numeric>
  5. #include <thread>
  6. #include <vector>
  7. #include <unistd.h>
  8. void accumulate(std::vector<int>::iterator first,
  9. std::vector<int>::iterator last,
  10. std::promise<int> accumulate_promise)
  11. {
  12. int sum = std::accumulate(first, last, 0);
  13. accumulate_promise.set_value(sum); // Notify future
  14. }
  15. void do_work(std::promise<void> barrier)
  16. {
  17. std::cout << "before sleep\n";
  18. std::this_thread::sleep_for(std::chrono::seconds(1));
  19. barrier.set_value();
  20. std::cout << "after promise set value\n";
  21. }
  22. int main()
  23. {
  24. // Demonstrate using promise<int> to transmit a result between threads.
  25. std::vector<int> numbers = {1, 2, 3, 4, 5, 6};
  26. std::promise<int> accumulate_promise;
  27. std::future<int> accumulate_future = accumulate_promise.get_future();
  28. std::thread work_thread(accumulate, numbers.begin(), numbers.end(),
  29. std::move(accumulate_promise));
  30. // future::get() will wait until the future has a valid result and retrieves it.
  31. // Calling wait() before get() is not needed
  32. // accumulate_future.wait(); // wait for result
  33. std::cout << "before get, future valid: " << accumulate_future.valid() << std::endl;
  34. std::cout << "result=" << accumulate_future.get() << '\n';
  35. std::cout << "after get, future valid: " << accumulate_future.valid() << std::endl;
  36. work_thread.join(); // wait for thread completion
  37. // Demonstrate using promise<void> to signal state between threads.
  38. std::promise<void> barrier;
  39. std::future<void> barrier_future = barrier.get_future();
  40. std::thread new_work_thread(do_work, std::move(barrier));
  41. std::cout << "before wait\n";
  42. barrier_future.wait();
  43. std::cout << "after wait\n";
  44. new_work_thread.join();
  45. return 0;
  46. }

1.2 线程间同步 —— 条件变量

条件变量是一个基础的线程间同步机制,在 c 和 c++ 中都有使用。条件变量经常用于生产者线程和消费者线程之间通信的场景。

1.2.1 std::condition_variable

如下是使用条件变量时经常使用的方式,第一个形参是 std::unique_lock<> 类型的锁,第二个形参是 wait() 返回的条件。

template< class Predicate >
void wait( std::unique_lock<std::mutex>& lock, Predicate pred );

(1)条件满足之后,即使不进行 notify,wait() 也会返回

(2)条件不满足的时候,即使进行 notify(),wait() 也不会返回

这就是使用条件变量的时候最典型的用法,因为条件变量可能存在惊群问题(本人没有复现过),可能被唤醒的时候,条件还没有满足。这种用法就类似于在 c 语言使用条件变量的时候的如下代码。

while(!condition) {

  wait();

}

  1. #include <condition_variable>
  2. #include <iostream>
  3. #include <mutex>
  4. #include <string>
  5. #include <thread>
  6. #include <unistd.h>
  7. std::mutex m;
  8. std::condition_variable cv;
  9. std::string data;
  10. bool ready = false;
  11. bool processed = false;
  12. void worker_thread()
  13. {
  14. // wait until main() sends data
  15. std::unique_lock<std::mutex> lk(m);
  16. cv.wait(lk, []{ return ready; });
  17. // after the wait, we own the lock
  18. std::cout << "Worker thread is processing data\n";
  19. data += " after processing";
  20. // send data back to main()
  21. processed = true;
  22. std::cout << "Worker thread signals data processing completed\n";
  23. // manual unlocking is done before notifying, to avoid waking up
  24. // the waiting thread only to block again (see notify_one for details)
  25. lk.unlock();
  26. cv.notify_one();
  27. }
  28. int main()
  29. {
  30. std::thread worker(worker_thread);
  31. data = "Example data";
  32. // send data to the worker thread
  33. {
  34. std::lock_guard<std::mutex> lk(m);
  35. sleep(2);
  36. std::cout << "after sleep\n";
  37. ready = true;
  38. std::cout << "after set value\n";
  39. std::cout << "main() signals data ready for processing\n";
  40. }
  41. std::cout << "before notify one\n";
  42. sleep(2);
  43. cv.notify_one();
  44. std::cout << "after notify one\n";
  45. // wait for the worker
  46. {
  47. std::unique_lock<std::mutex> lk(m);
  48. cv.wait(lk, []{ return processed; });
  49. }
  50. std::cout << "Back in main(), data = " << data << '\n';
  51. worker.join();
  52. }

1.2.2 pthread_cond_t

pthread 中的条件变量,使用方式如下。也是需要一个 mutex 和条件变量在一块使用。

wait 端调用的函数:

        pthread_mutex_lock(&mutex);
        while (!condition) {
            pthread_cond_wait(&cond, &mutex);
        }
        pthread_mutex_unlock(&mutex);

唤醒端需要调用的函数:

        pthread_mutex_lock(&mutex);
        pthread_cond_signal(&cond);
        pthread_mutex_unlock(&mutex);

在 wait 端,首先要加锁,然后进行 wait;在 wait 的时候,会释放锁,所以在 signal 端加锁是可以加成功的。

  1. #include <stdio.h>
  2. #include <pthread.h>
  3. #define MAX_COUNT 10
  4. int shared_resource = 0; // 全局共享资源
  5. pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; // 互斥锁
  6. pthread_cond_t cond = PTHREAD_COND_INITIALIZER; // 条件变量
  7. void *producer(void *arg) {
  8. while (1) {
  9. pthread_mutex_lock(&mutex);
  10. while (shared_resource >= MAX_COUNT) { // 当共享资源达到最大值时等待
  11. pthread_cond_wait(&cond, &mutex);
  12. }
  13. shared_resource++; // 增加共享资源值
  14. printf("Produced: %d\n", shared_resource);
  15. pthread_cond_signal(&cond); // 唤醒消费者线程
  16. pthread_mutex_unlock(&mutex);
  17. sleep(1); // 模拟生产过程
  18. }
  19. return NULL;
  20. }
  21. void *consumer(void *arg) {
  22. while (1) {
  23. pthread_mutex_lock(&mutex);
  24. while (shared_resource <= 0) { // 当共享资源为0时等待
  25. pthread_cond_wait(&cond, &mutex);
  26. }
  27. shared_resource--; // 减少共享资源值
  28. printf("Consumed: %d\n", shared_resource);
  29. pthread_cond_signal(&cond); // 唤醒生产者线程
  30. pthread_mutex_unlock(&mutex);
  31. sleep(1); // 模拟消费过程
  32. }
  33. return NULL;
  34. }
  35. int main() {
  36. pthread_t producer_thread, consumer_thread;
  37. // 创建生产者线程和消费者线程
  38. pthread_create(&producer_thread, NULL, producer, NULL);
  39. pthread_create(&consumer_thread, NULL, consumer, NULL);
  40. // 等待线程结束
  41. pthread_join(producer_thread, NULL);
  42. pthread_join(consumer_thread, NULL);
  43. // 销毁互斥锁和条件变量
  44. pthread_mutex_destroy(&mutex);
  45. pthread_cond_destroy(&cond);
  46. return 0;
  47. }

在使用条件变量的时候要注意,当条件变量被 wait 的时候,对条件变量进行 destroy,那么 destroy 会阻塞住,直到 wait 返回之后,destroy 才会返回。如下代码是 destroy 一个正在被 wait 的条件变量,这个时候 destroy 会阻塞住。

  1. #include <stdio.h>
  2. #include <pthread.h>
  3. #include <unistd.h>
  4. pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
  5. pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
  6. void *consumer(void *arg) {
  7. pthread_mutex_lock(&mutex);
  8. pthread_cond_wait(&cond, &mutex);
  9. pthread_mutex_unlock(&mutex);
  10. return NULL;
  11. }
  12. int main() {
  13. pthread_t consumer_thread;
  14. pthread_create(&consumer_thread, NULL, consumer, NULL);
  15. sleep(1);
  16. pthread_mutex_destroy(&mutex);
  17. printf("before destroy condition\n");
  18. pthread_cond_destroy(&cond);
  19. printf("after destroy condition\n");
  20. return 0;
  21. }

2 std::packaged_task

std::package_task,从名字也可以看出来,表示一个打包的任务。这个任务自己并不能执行,而是需要显式的调用来执行。

如下是官网的示例代码。

std::packaged_task - cppreference.com

(1)task 能够封装的任务类型包括 lambda 表达式,bind 的函数,也可以是一个单纯的函数

(2)与 std::promise 类似,std::packaged_task 也可以获取一个 std::future,std::future 可以获取到 std::package_task 执行的结果

  1. #include <cmath>
  2. #include <functional>
  3. #include <future>
  4. #include <iostream>
  5. #include <thread>
  6. // unique function to avoid disambiguating the std::pow overload set
  7. int f(int x, int y) { return std::pow(x, y); }
  8. void task_lambda()
  9. {
  10. std::packaged_task<int(int, int)> task([](int a, int b)
  11. {
  12. return std::pow(a, b);
  13. });
  14. std::future<int> result = task.get_future();
  15. // 封装一个 lamda 表达式,可以将 task 当成函数名来直接调用
  16. task(2, 9);
  17. std::cout << "task_lambda:\t" << result.get() << '\n';
  18. }
  19. void task_bind()
  20. {
  21. std::packaged_task<int()> task(std::bind(f, 2, 11));
  22. std::future<int> result = task.get_future();
  23. // 封装的 bind() 函数,可以直接调用
  24. task();
  25. std::cout << "task_bind:\t" << result.get() << '\n';
  26. }
  27. void task_thread()
  28. {
  29. std::packaged_task<int(int, int)> task(f);
  30. std::future<int> result = task.get_future();
  31. // 非 bind 方式,不能这样直接调用
  32. // std::cout << "directly call: " << task(2, 10) << std::endl;
  33. std::thread task_td(std::move(task), 2, 10);
  34. task_td.join();
  35. std::cout << "task_thread:\t" << result.get() << '\n';
  36. }
  37. int main()
  38. {
  39. task_lambda();
  40. task_bind();
  41. task_thread();
  42. return 0;
  43. }

2.1 异常传递

std::packaged_task中抛出的异常,保存在 std::future 中,可以通过 get() 来获取。

  1. #include <cmath>
  2. #include <functional>
  3. #include <future>
  4. #include <iostream>
  5. #include <thread>
  6. int f(int x, int y) {
  7. throw std::runtime_error("An error occurred in the task!");
  8. return std::pow(x, y);
  9. }
  10. void task_lambda()
  11. {
  12. std::packaged_task<int(int, int)> task([](int a, int b)
  13. {
  14. throw std::runtime_error("An error occurred in the task!");
  15. return std::pow(a, b);
  16. });
  17. std::future<int> result = task.get_future();
  18. task(2, 9);
  19. try {
  20. std::cout << "task_lambda:\t" << result.get() << '\n';
  21. } catch (std::exception &e) {
  22. std::cout << "lambda exception: " << e.what() << std::endl;
  23. }
  24. }
  25. void task_bind()
  26. {
  27. std::packaged_task<int()> task(std::bind(f, 2, 11));
  28. std::future<int> result = task.get_future();
  29. task();
  30. try {
  31. std::cout << "task_bind:\t" << result.get() << '\n';
  32. } catch (std::exception &e) {
  33. std::cout << "bind exception: " << e.what() << std::endl;
  34. }
  35. }
  36. void task_thread()
  37. {
  38. std::packaged_task<int(int, int)> task(f);
  39. std::future<int> result = task.get_future();
  40. std::thread task_td(std::move(task), 2, 10);
  41. task_td.join();
  42. try {
  43. std::cout << "task_thread:\t" << result.get() << '\n';
  44. } catch (std::exception &e) {
  45. std::cout << "task thread exception: " << e.what() << std::endl;
  46. }
  47. }
  48. int main()
  49. {
  50. task_lambda();
  51. task_bind();
  52. task_thread();
  53. return 0;
  54. }

2.2 void 类型的 std::future

valid 是 true,也可以 get,不过 get 之后也是 void。

  1. #include <cmath>
  2. #include <functional>
  3. #include <future>
  4. #include <iostream>
  5. #include <thread>
  6. void f(int x, int y) {
  7. std::cout << "x " << x << ", y " << y << std::endl;
  8. }
  9. void task_lambda()
  10. {
  11. std::packaged_task<void(int, int)> task([](int a, int b)
  12. {
  13. std::cout << "a " << a << ", b " << b << std::endl;
  14. });
  15. auto result = task.get_future();
  16. task(2, 9);
  17. try {
  18. std::cout << "task_lambda:\t" << result.valid() << '\n';
  19. result.get();
  20. } catch (std::exception &e) {
  21. std::cout << "lambda exception: " << e.what() << std::endl;
  22. }
  23. }
  24. void task_bind()
  25. {
  26. std::packaged_task<void()> task(std::bind(f, 2, 11));
  27. std::future<void> result = task.get_future();
  28. task();
  29. try {
  30. std::cout << "task_bind:\t" << result.valid() << '\n';
  31. result.get();
  32. } catch (std::exception &e) {
  33. std::cout << "bind exception: " << e.what() << std::endl;
  34. }
  35. }
  36. void task_thread()
  37. {
  38. std::packaged_task<void(int, int)> task(f);
  39. std::future<void> result = task.get_future();
  40. std::thread task_td(std::move(task), 2, 10);
  41. task_td.join();
  42. try {
  43. std::cout << "task_thread:\t" << result.valid() << '\n';
  44. result.get();
  45. } catch (std::exception &e) {
  46. std::cout << "task thread exception: " << e.what() << std::endl;
  47. }
  48. }
  49. int main()
  50. {
  51. task_lambda();
  52. task_bind();
  53. task_thread();
  54. return 0;
  55. }

3 std::async

std::async 提供了异步执行的方式。上一节的 std::packaged_task 只是封装了一个 task,但是这个 task 自己不会执行,std::async 相当于在 std::packaged_task 的基础上增加了自动执行的机制。

std::async 的第一个形参有两个可选值:std::launch::async 和 std::launch::deferred,第一个标志会自动异步执行;第二个标志,不会自动异步执行,需要调 wait() 同步执行。

  1. #include <algorithm>
  2. #include <future>
  3. #include <iostream>
  4. #include <mutex>
  5. #include <numeric>
  6. #include <string>
  7. #include <vector>
  8. std::mutex m;
  9. struct X
  10. {
  11. void foo(int i, const std::string& str)
  12. {
  13. std::lock_guard<std::mutex> lk(m);
  14. std::cout << str << ' ' << i << '\n';
  15. }
  16. void bar(const std::string& str)
  17. {
  18. std::lock_guard<std::mutex> lk(m);
  19. std::cout << str << '\n';
  20. }
  21. int operator()(int i)
  22. {
  23. std::lock_guard<std::mutex> lk(m);
  24. std::cout << i << '\n';
  25. return i + 10;
  26. }
  27. };
  28. template<typename RandomIt>
  29. int parallel_sum(RandomIt beg, RandomIt end)
  30. {
  31. auto len = end - beg;
  32. if (len < 1000)
  33. return std::accumulate(beg, end, 0);
  34. RandomIt mid = beg + len / 2;
  35. auto handle = std::async(std::launch::async,
  36. parallel_sum<RandomIt>, mid, end);
  37. int sum = parallel_sum(beg, mid);
  38. return sum + handle.get();
  39. }
  40. int main()
  41. {
  42. std::vector<int> v(10000, 1);
  43. std::cout << "The sum is " << parallel_sum(v.begin(), v.end()) << '\n';
  44. X x;
  45. // Calls (&x)->foo(42, "Hello") with default policy:
  46. // may print "Hello 42" concurrently or defer execution
  47. auto a1 = std::async(&X::foo, &x, 42, "Hello");
  48. // Calls x.bar("world!") with deferred policy
  49. // prints "world!" when a2.get() or a2.wait() is called
  50. auto a2 = std::async(std::launch::deferred, &X::bar, x, "world!");
  51. // Calls X()(43); with async policy
  52. // prints "43" concurrently
  53. auto a3 = std::async(std::launch::async, X(), 43);
  54. a2.wait(); // prints "world!"
  55. std::cout << a3.get() << '\n'; // prints "53"
  56. } // if a1 is not done at this point, destructor of a1 prints "Hello 42" here

3.1 使用 std::sync 注意事项

3.1.1 要获取 std::async 的返回值

如下代码,并没有获取 std::async 的返回值,这样的话两个任务是串行执行的,而不是异步并行执行的。

  1. #include <algorithm>
  2. #include <future>
  3. #include <iostream>
  4. #include <mutex>
  5. #include <numeric>
  6. #include <string>
  7. #include <vector>
  8. #include <unistd.h>
  9. std::mutex m;
  10. struct X
  11. {
  12. void foo()
  13. {
  14. for (int i = 0; i < 10; i++) {
  15. std::cout << "foo()" << std::endl;
  16. sleep(1);
  17. }
  18. }
  19. void bar()
  20. {
  21. for (int i = 0; i < 10; i++) {
  22. std::cout << "bar()" << std::endl;
  23. sleep(1);
  24. }
  25. }
  26. };
  27. int main()
  28. {
  29. X x;
  30. std::async(std::launch::async, &X::foo, &x);
  31. std::async(std::launch::async, &X::bar, &x);
  32. return 0;
  33. }

官方解释:

3.1.2 异常传递

使用 std::async 的时候,如果任务中抛异常,异常会保存在 std::future 中,可以通过 std::future 来获取。

  1. #include <algorithm>
  2. #include <future>
  3. #include <iostream>
  4. #include <mutex>
  5. #include <numeric>
  6. #include <string>
  7. #include <vector>
  8. #include <unistd.h>
  9. std::mutex m;
  10. struct X
  11. {
  12. int bar()
  13. {
  14. throw std::runtime_error("An error occurred in the task!");
  15. return 10;
  16. }
  17. };
  18. int main()
  19. {
  20. X x;
  21. auto f1 = std::async(std::launch::async, &X::bar, &x);
  22. try {
  23. f1.get();
  24. } catch (std::exception &e) {
  25. std::cout << "f1 get exception: " << e.what() << std::endl;
  26. }
  27. return 0;
  28. }
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/922428
推荐阅读
相关标签
  

闽ICP备14008679号