当前位置:   article > 正文

C++ 多线程10:std::packaged_task_c++ packaged

c++ packaged

C++ 多线程:std::packaged_task


上一篇介绍的 std::promise通过set_value可以使得与之关联的 std::future获取数据。本篇介绍的 std::packaged_task则更为强大,它允许传入一个函数或其他可调用对象,并将函数计算的结果作为异步结果传递给 std::future,包括函数运行时产生的异常。下面我们就来详细介绍一下它。 std::packaged_task实例是可以 MoveConstructible(移动构造)和 MoveAssignable(移动赋值),不过不能 CopyConstructible(拷贝构造)和 CopyAssignable(拷贝赋值)。下面我们来详细看一下它的定义:

template<typename FunctionType>
class packaged_task; // undefined

template<typename ResultType,typename... ArgTypes>
class packaged_task<ResultType(ArgTypes...)>
{
public:
  packaged_task() noexcept;
  packaged_task(packaged_task&&) noexcept;
  ~packaged_task();

  packaged_task& operator=(packaged_task&&) noexcept;

  packaged_task(packaged_task const&) = delete;
  packaged_task& operator=(packaged_task const&) = delete;

  void swap(packaged_task&) noexcept;

  template<typename Callable>
  explicit packaged_task(Callable&& func);

  template<typename Callable,typename Allocator>
  packaged_task(std::allocator_arg_t, const Allocator&,Callable&&);

  bool valid() const noexcept;
  std::future<ResultType> get_future();
  void operator()(ArgTypes...);
  void make_ready_at_thread_exit(ArgTypes...);
  void reset();
};
复制代码
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

默认构造函数

构造一个std::packaged_task对象。不使用关联任务或共享状态来构造一个std::packaged_task对象。

可调用对象构造

使用关联任务和异步结果,构造一个std::packaged_task对象。该对象具有共享状态,且其存储的任务由func初始化。 表达式func(args...)必须是合法的,并且在args...中的args-i参数,必须是ArgTypes...中ArgTypes-i类型的一个值。且返回值必须可转换为ResultType。 使用ResultType类型的关联异步结果,构造一个std::packaged_task对象,异步结果是未就绪的,并且Callable类型相关的任务是对func的一个拷贝。 当构造函数无法为异步结果分配出内存时,会抛出std::bad_alloc类型的异常。其他异常会在使用Callable类型的拷贝或移动构造过程中抛出。

有分配器的可调用对象构造

除了使用接口提供的分配器为关联任务和异步结果分配内存外,其他与可调用对象构造一样。

#include <iostream>       // std::cout
#include <thread>         // std::thread
#include <future>         // std::promise, std::future

int main ()
{
    // constructor/get_future
	auto countdown = [](int from, int to) {
		for (int i = from; i != to; --i) {
			std::cout << i << '\n';
			std::this_thread::sleep_for(std::chrono::seconds(1));
		}
		std::cout << "Lift off!\n";
		return from - to;
	};
 
	std::packaged_task<int(int, int)> tsk(countdown); // set up packaged_task
	std::future<int> ret = tsk.get_future(); // get future
 
	std::thread th(std::move(tsk), 5, 0); // spawn thread to count down from 5 to 0
 
	int value = ret.get(); // wait for the task to finish and get result
	std::cout << "The countdown lasted for " << value << " seconds.\n";
 
	th.join();
}
复制代码
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

移动构造函数

通过一个std::packaged_task对象构建另一个,将与已存在的std::packaged_task相关的共享状态和任务的所有权转移到新构建的对象当中。 通过other构建新的std::packaged_task对象。在新对象构建完成后,other与其之前相关联的共享状态就没有任何关系了。

移动赋值操作

将一个std::packaged_task对象相关的共享状态的所有权转移到另外一个。 将other相关共享状态和任务的所有权转移到*this中,并且切断共享状态和任务与other对象的关联,如同std::packaged_task(other).swap(*this)

valid成员函数

检查*this中是都具有关联任务和共享状态。当*this具有相关任务和异步结构,返回true;否则,返回false。

get_future成员函数

返回一个与packaged_task对象的共享状态关联的std::future对象。 如果一个std::future已经通过get_future()获取了异步结果,在抛出std::future_error异常时,错误码是std::future_errc::future_already_retrieved。 调用此函数后,packaged_task应在某个时候使其共享状态准备就绪(通过调用其存储的任务),否则将在销毁后自动准备就绪并包含一个std::future_error类型的异常。

#include <iostream>       // std::cout
#include <thread>         // std::thread
#include <future>         // std::promise, std::future

int main ()
{
    // constructor/get_future/operator=/valid
	std::packaged_task<int(int)> task1; // default-constructed
	std::packaged_task<int(int)> task2([](int x) { return x * 2; }); // initialized
 
	task1 = std::move(task2); // move-assignment
	std::cout << "valid: " << task1.valid() << "\n";
	std::future<int> ret = task1.get_future(); // get future
	std::thread(std::move(task1), 10).detach(); // spawn thread and call task
 
	int value = ret.get(); // wait for the task to finish and get result
	std::cout << "The double of 10 is " << value << ".\n";
}
复制代码
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

swap成员函数

将两个std::packaged_task对象所关联的共享状态和存储任务的所有权进行交换。

析构函数

放弃(abandon)共享状态并销毁一个std::packaged_task对象。如果有其它future对象关联到同一共享状态,并且结果不是一个已存储的任务或异常,那么异步结果状态将会变为就绪,伴随就绪的是一个std::future_error异常和错误码std::future_errc::broken_promise

operator() 函数调用操作

调用一个std::packaged_task实例中的相关任务,并且存储返回值,或将异常存储到异常结果当中。 像INVOKE(func,args...)那要调用相关的函数func。如果返回征程,那么将会存储到*this相关的异步结果中。当返回结果是一个异常,将这个异常存储到*this相关的异步结果中。 *this相关联的异步结果状态为就绪,并且存储了一个值或异常。所有阻塞线程,在等待到异步结果的时候被解除阻塞。 当异步结果已经存储了一个值或异常,那么将抛出一个std::future_error异常,错误码为std::future_errc::promise_already_satisfied

reset 成员函数

首先,*this已经具有关联的异步任务和共享状态。将一个std::packaged_task实例与一个新的异步结果相关联。如同*this=packaged_task(std::move(f)),f是*this中已存储的关联任务。说白了就是reset后可以重新再调用一个get_future然后再get一次。

#include <iostream>       // std::cout
#include <thread>         // std::thread
#include <future>         // std::promise, std::future

int main ()
{
    // reset/operator()
	std::packaged_task<int(int)> tsk([](int x) { return x * 3; }); // package task
 
	std::future<int> fut = tsk.get_future();
	tsk(33);  // call operator(), result store in future
	std::cout << "The triple of 33 is " << fut.get() << ".\n";
 
	// re-use same task object:
	tsk.reset();
	fut = tsk.get_future();  // after reset, must call get_future again
	std::thread(std::move(tsk), 99).detach();  // after reset, you can call the task again
	std::cout << "Thre triple of 99 is " << fut.get() << ".\n";
}
复制代码
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

make_ready_at_thread_exit 成员函数

调用一个std::packaged_task实例中的相关任务,并且存储返回值,或将异常存储到异常结果当中,直到线程退出时,将相关异步结果的状态置为就绪。 像INVOKE(func,args...)那要调用相关的函数func。如果返回征程,那么将会存储到*this相关的异步结果中。当返回结果是一个异常,将这个异常存储到*this相关的异步结果中。当当前线程退出的时候,可调配相关异步状态为就绪。 当异步结果已经存储了一个值或异常,那么将抛出一个std::future_error异常,错误码为std::future_errc::promise_already_satisfied。当无关联异步状态时,抛出std::future_error异常,错误码为std::future_errc::no_state

#include <future>
#include <iostream>
#include <chrono>
#include <thread>
#include <functional>
#include <utility>
 
void worker(std::future<void>& output) {
    std::packaged_task<void(bool&)> my_task{ [](bool& done) { done=true; } };
    auto result = my_task.get_future();
    bool done = false;
    my_task.make_ready_at_thread_exit(done); // execute task right away
    std::cout << "worker: done = " << std::boolalpha << done << std::endl;
    auto status = result.wait_for(std::chrono::seconds(0));
    if (status == std::future_status::timeout)
        std::cout << "worker: result is not ready yet" << std::endl; //线程未退出,timeout,打印这句
 
    output = std::move(result);
}
 
int main() {
    std::future<void> result;
    std::thread{worker, std::ref(result)}.join();
    auto status = result.wait_for(std::chrono::seconds(0));
    if (status == std::future_status::ready)
        std::cout << "main: result is ready" << std::endl;  //线程退出,ready,打印这句
}
复制代码
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

输出:

worker: done = true
worker: result is not ready yet
main: result is ready
复制代码
  • 1
  • 2
  • 3
  • 4

其它例子

可以使用lambda,bind来初始化packaged_task的任务,可以直接调用packaged_task本身的operator(),也可以将packaged_task传递给thread异步调用

#include <iostream>
#include <cmath>
#include <thread>
#include <future>
#include <functional>
 
// unique function to avoid disambiguating the std::pow overload set
int f(int x, int y) { return std::pow(x,y); }
 
void task_lambda()
{
    std::packaged_task<int(int,int)> task([](int a, int b) {
        return std::pow(a, b); 
    });
    std::future<int> result = task.get_future();
 
    task(2, 9);
 
    std::cout << "task_lambda:\t" << result.get() << '\n';
}
 
void task_bind()
{
    std::packaged_task<int()> task(std::bind(f, 2, 11));
    std::future<int> result = task.get_future();
 
    task();
 
    std::cout << "task_bind:\t" << result.get() << '\n';
}
 
void task_thread()
{
    std::packaged_task<int(int,int)> task(f);
    std::future<int> result = task.get_future();
 
    std::thread task_td(std::move(task), 2, 10);
    task_td.join();
 
    std::cout << "task_thread:\t" << result.get() << '\n';
}
 
int main()
{
    task_lambda();
    task_bind();
    task_thread();
}
复制代码
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49

也可以使用函数对象(即一个重载了括号操作符"()"的对象)来作为packaged_task的任务:

#include <iostream>       // std::cout
#include <thread>         // std::thread
#include <future>         // std::promise, std::future
#include <string>


struct Tester {
	std::string operator()(std::string arg) {
		std::string str = "Data From " + arg;
		return str;
	}
};


int main () {
	// Create a packaged_task<> that encapsulated a Function Object
	std::packaged_task<std::string(std::string)> task(std::move(Tester()));
	// Fetch the associated future<> from packaged_task<>
	std::future<std::string> result = task.get_future();
	// Pass the packaged_task to thread to run asynchronously
	std::thread th(std::move(task), "Arg");
	// Join the thread. Its blocking and returns when thread is finished.
	th.join();
	// Fetch the result of packaged_task<> i.e. value returned by Tester()
	std::string str = result.get();
	std::cout << str << std::endl;
}
复制代码
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

对比packaged_task与async有一些不一样,packaged_task是在调用任务时开始的,所以如果任务中有耗时操作则是在调用任务时阻塞,而async是启用一个新的thread执行任务,不需要手动调用,所以阻塞在get中:

#include <iostream>       // std::cout
#include <thread>         // std::thread
#include <future>         // std::promise, std::future
#include <string>
#include <chrono>

std::time_t now() 
{
    auto t0 = std::chrono::system_clock::now();
    std::time_t time_t_today = std::chrono::system_clock::to_time_t(t0);
    return time_t_today;  // seconds
}

int main ()
{
	// sleeps for one second and returns 2
	auto sleep = []() {
		std::this_thread::sleep_for(std::chrono::seconds(2));
		return 2;
	};
 
    { // std::packaged_task
        std::cout << now() << "s, " << "Start" << std::endl;
        // >>>>> A packaged_task won't start on it's own, you have to invoke it
        std::packaged_task<int()> task(sleep);
    
        auto f = task.get_future();
        task(); // invoke the function
    
        // You have to wait until task returns. Since task calls sleep
        // you will have to wait at least 2 second.
        std::cout << now() << "s, " << "You can see this after 2 second\n";
    
        // However, f.get() will be available, since task has already finished.
        std::cout << now() << "s, " << f.get() << std::endl;
    }
    
    { // std::async
        std::cout << now() << "s, " << "Start async" << std::endl;

        // >>>>> On the other hand, std::async with launch::async will try to run the task in a different thread :
        auto f = std::async(std::launch::async, sleep);
        std::cout << now() << "s, " << "You can see this immediately!\n";
    
        // However, the value of the future will be available after sleep has finished
        // so f.get() can block up to 2 second.
        int ret = f.get();
        std::cout << now() << "s, " <<  ret << ", This will be shown after 2 second!\n";
    }
}
复制代码
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51

输出:

1648082939s, Start
1648082941s, You can see this after 2 second
1648082941s, 2
1648082941s, Start async
1648082941s, You can see this immediately!
1648082943s, 2, This will be shown after 2 second!
复制代码
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

函数调用操作符

std::packaged_task<>会将future与函数或可调用对象进行绑定。当调用std::packaged_task<>对象时,就会调用相关函数或可调用对象,当future状态为就绪时,会存储返回值。这可以用在构建线程池或其他任务的管理中,比如:在任务所在线程上运行其他任务,或将它们串行运行在一个特殊的后台线程上。当粒度较大的操作被分解为独立的子任务时,每个子任务都可以包含在std::packaged_task<>实例中,之后将实例传递到任务调度器或线程池中。对任务细节进行抽象,调度器仅处理std::packaged_task<>实例,而非处理单独的函数。

std::packaged_task<>的模板参数是一个函数签名,比如void()就是一个没有参数也没有返回值的函数,或int(std::string&, double*)就是有一个非const引用的std::string参数和一个指向double类型的指针参数,并且返回类型是int。构造std::packaged_task<>实例时,就必须传入函数或可调用对象。这个函数或可调用的对象,需要能接收指定的参数和返回(可转换为指定返回类型的)值。类型可以不完全匹配,因为这里类型可以隐式转换,可以用int类型参数和返回float类型的函数,来构建std::packaged_task<double(double)>实例。

函数签名的返回类型可以用来标识从get_future()返回的std::future<>的类型,而函数签名的参数列表,可用来指定packaged_task的函数调用操作符。例如,模板偏特化std::packaged_task<std::string(std::vector<char>*,int)>会在下面的代码中使用到。

实际上,packaged_task对象的函数操作符重载定义的返回值总算void,原因就是实际任务的返回值总是存储在future中:

template<>
class packaged_task<std::string(std::vector<char>*,int)>
{
public:
  template<typename Callable>
  explicit packaged_task(Callable&& f);
  std::future<std::string> get_future();
  void operator()(std::vector<char>*,int); //返回void
};
复制代码
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

std::packaged_task是个可调用对象,可以封装在std::function对象中,从而作为线程函数传递到std::thread对象中,或作为可调用对象传递到另一个函数中或直接调用。当std::packaged_task作为函数调用时,实参将由函数调用操作符传递至底层函数,并且返回值作为异步结果存储在std::future中,并且可通过get_future()获取。因此可以用std::packaged_task对任务进行打包,并适时的取回future。当异步任务需要返回值时,可以等待future状态变为“就绪”。

好了,今天的内容就到这里了。

参考 en.cppreference.com/w/cpp/threa…

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/秋刀鱼在做梦/article/detail/922421
推荐阅读
  

闽ICP备14008679号