赞
踩
协程是一种函数对象,可以设置锚点做暂停,然后再该锚点恢复继续运行
#include <iostream> #include <boost/coroutine2/all.hpp> void coroutine_function(boost::coroutines2::coroutine<void>::pull_type & coro_back) { std::cout << "a "; coro_back(); // 锚点,返回 std::cout << "b "; coro_back(); //锚点 返回 std::cout << "c "; } int main() { boost::coroutines2::coroutine<void>::push_type coroutine_object(coroutine_function); // 创建协程 std::cout << "1 "; coroutine_object(); // 运行协程 std::cout << "2 "; coroutine_object(); // 返回锚点,继续运行协程 std::cout << "3 "; coroutine_object(); // 返回锚点,继续运行协程 return 0; } 运行结果如下: g++ test.cpp -lboost_coroutine -lboost_context -o test ./pull --------------输出分割线------------- 1 a 2 b 3 c
为什么会有协程是轻量级线程的说法呢?
boost中的协程
#include <iostream> #include <boost/coroutine2/all.hpp> void foo(boost::coroutines2::coroutine<int>::push_type & sink) { std::cout<<"start coroutine\n"; sink(1); std::cout<<"finish coroutine\n"; } int main() { boost::coroutines2::coroutine<int>::pull_type source(foo); std::cout<<source.get()<<std::endl; std::cout<<source()<<std::endl; std::cout<<"finish\n"; return 0; } 编译链接运行后 g++ pull.cpp -lboost_coroutine -lboost_context -o pull ./pull --------------输出分割线------------- start coroutine 1 finish coroutine finish
boost.corountine2中的协程增加了push_type和pull_type用于提供协程数据的流转,约束了数据的从push_type流入,从pull_type流出, 上面的demo定义协程对象source的时候使用了pull_type,所以协程函数参数类型是push_type
当协程对象被创建之后就直接运行,直到sink(1)的时候暂停返回到main中,main中使用source.get()获取数据,继续使用source()调用协程对象,协程从sink(1)之后继续运行执行完毕,返回main,main也执行完毕。
push_type的eg:
#include <iostream> #include <boost/coroutine2/all.hpp> void foo(boost::coroutines2::coroutine<int>::pull_type& sink) { std::cout<<"start coroutine\n"; //sink(); int a = sink().get(); std::cout<<a<<std::endl; std::cout<<"finish coroutine\n"; } int main() { boost::coroutines2::coroutine<int>::push_type source(foo); std::cout<<"finish\n"; source(0); source(5); return 0; } 结果: g++ push.cpp -lboost_coroutine -lboost_context ./push --------------输出分割线------------- finish start coroutine 5 finish coroutine
为了使用方便,boost::coroutine2实现了协程迭代器
template< typename T > class push_routine{ .... push_coroutine< T > & push_coroutine< T >::operator()( T const& t) { //() 切换协程 cb_->resume( t); return * this; } class iterator{// 实现迭代器 .... iterator & operator++() noexcept { return *this; } } }
boost::coroutines2::coroutine<void>::push_type source(foo);
for(auto& s : source){
std::cout<<"run"
}
fiber
因为push_type和pull_type这样的简洁组合已经可以解决基本问题—同步调用的中断恢复,但是只有多协程并发才能发挥其真正威力,为此需要同步和调度,boost搞了个fiber(纤程,这才是轻量级线程)出来,是在coroutine2的基础上添加了协程调度器以及barrier mutex channel promise future condition_variable, sleep yield 等协程同步工具,这些和线程同步工具很像,因为在多协程场景下,它两模型和解决的问题都是一样的,都是通过调度多实体实现并发
但是协程有很多好处,开销很小,而且调度是运行的协程自己控制让出cpu给下一个要运行的线程,是可预见的,同时调用上是同步的,保证了顺序性就可以避免锁
下面是boost的fiber的一个例子
#include <boost/fiber/all.hpp> #include <iostream> using namespace std; using namespace boost; void callMe(fibers::buffered_channel<string>& pipe) { pipe.push("hello world"); } int main() { fibers::buffered_channel<string> pipe(2); fibers::fiber f([&]() {callMe(pipe); }); f.detach(); string str; std::cout<<"start pop"<<std::endl; pipe.pop(str); //切换协程运行 std::cout<<"get str:"<<str<<std::endl; return 0; } 编译运行 g++ channel.cpp -o channel -lboost_fiber -lboost_context ./channel -------------------输出分割线------------------- start pop get str:hello world
这是一个最简单的例子,并没有去体现使用一个loop去做调度协程,调度还是由一些函数手动触发的
注意pull_type和push_type的操作已经没有了,那协程是如何切换的呢?
切换发生在pipe.pop( )中, fibers::buffered_channel是一个缓存队列,用来传输数据,pop的底层检测到没有数据,会就开始让出cpu,底层的协程调度器就开始调度别的协程进行运行,没有看过源码不知道执行到pipe.push的时候是否有没有发生调度,也许有也许没有,但都不太重要,因为这就和线程是一样的;
由于fiber中有调度器的存在,当前协程主动让出cpu,调度器让别的协程运行,比如上面的pipe.pop(),相当执行了一个协程的co_yield()操作让出cpu;
所以,某个协程中如果有阻塞操作,将导致整个线程都处于阻塞,所有协程都被阻塞, 此文提出两种解决方法
int read_chunk( NonblockingAPI & api, std::string & data, std::size_t desired) {
int error;
while ( EWOULDBLOCK == ( error = api.read( data, desired) ) ) {
boost::this_fiber::yield();
}
return error;
}
std::pair< AsyncAPI::errorcode, std::string > read_ec( AsyncAPI & api) {
typedef std::pair< AsyncAPI::errorcode, std::string > result_pair;
boost::fibers::promise< result_pair > promise;
boost::fibers::future< result_pair > future( promise.get_future() );
// We promise that both 'promise' and 'future' will survive until our lambda has been called.
// Need C++14
api.init_read([promise=std::move( promise)]( AsyncAPI::errorcode ec, std::string const& data) mutable {
promise.set_value( result_pair( ec, data) );
});
return future.get();
}
asio中的协程
#include <asio/co_spawn.hpp> #include <asio/detached.hpp> #include <asio/io_context.hpp> #include <asio/ip/tcp.hpp> #include <asio/signal_set.hpp> #include <asio/write.hpp> #include <cstdio> #include <iostream> using asio::ip::tcp; using asio::awaitable; using asio::co_spawn; using asio::detached; using asio::use_awaitable; namespace this_coro = asio::this_coro; #if defined(ASIO_ENABLE_HANDLER_TRACKING) # define use_awaitable \ asio::use_awaitable_t(__FILE__, __LINE__, __PRETTY_FUNCTION__) #endif awaitable<void> echo(tcp::socket socket) { try { char data[1024]; for (;;) { std::size_t n = co_await socket.async_read_some(asio::buffer(data), use_awaitable); co_await async_write(socket, asio::buffer(data, n), use_awaitable); } } catch (std::exception& e) { std::printf("echo Exception: %s\n", e.what()); } } void fn2(){ std::cout<<"hhh\n"; } void fn(){ fn2(); } awaitable<void> listener() { auto executor = co_await this_coro::executor; fn(); tcp::acceptor acceptor(executor, {tcp::v4(), 8988}); for (;;) { tcp::socket socket = co_await acceptor.async_accept(use_awaitable); //调用协程,体现同步性 co_spawn(executor, echo(std::move(socket)), detached);// 创建连接处理线程 } } int main() { try { asio::io_context io_context(1); asio::signal_set signals(io_context, SIGINT, SIGTERM); signals.async_wait([&](auto, auto){ io_context.stop(); }); co_spawn(io_context, listener(), detached); // 创建纤程,体现并发性 io_context.run(); // 开始调度 } catch (std::exception& e) { std::printf("Exception: %s\n", e.what()); } }
代码很长,但只需要看main( )就可以了,co_spawn( )创建了一个协程,然后使用io_context.run( ),对基于该io_context创建的协程进行调度, 上面实现的协程函数listener( )中,使用co_await acceptor.async_accept(use_awaitable)做一个协程的阻塞同步调用,async_accept( )中发现没有新的连接就让出cpu给当前io_context下别的协程继续运行,当时间片又切回到该协程时,发现有新的链接时候,往io_context中创建一个新的协程去处理该连接,这里就能很好的体现了协程的同步和并发的应用场景,调度过程;
asio的协程是基于c++20实现的,简单的介绍因为asio库很通用,还没有精力继续研究,但可以先来看看c++20的协程给的基础设施。
c++20的协程
c++20的协程目前只是一套框架基础,远未成熟,最好的文档参考还是cppreference,同时的这里两篇很好的文章进行了介绍文章1和文章2
#include <iostream> #include <thread> #include <coroutine> #include <future> #include <chrono> #include <functional> struct Result{ struct promise_type { Result get_return_object() { return {}; } std::suspend_never initial_suspend() { return {}; } std::suspend_never final_suspend() noexcept { return {}; } void return_void() {} void unhandled_exception() {} }; }; std::coroutine_handle<> coroutine_handle; struct AWaitableObject { AWaitableObject() {} bool await_ready() const {return false;} int await_resume() { return 0; } void await_suspend(std::coroutine_handle<> handle){ coroutine_handle = handle; } }; Result CoroutineFunction() { std::cout<<"start coroutine\n"; int ret = co_await AWaitableObject(); std::cout<<"finish coroutine\n"; } int main() { std::cout<<"start \n"; auto coro = CoroutineFunction(); std::cout<<"coroutine co_await\n"; coroutine_handle.resume(); return 0; } 对该程序使用如下方式进行编译运行(需g++10.2.0及以上) g++ test4.cpp -O0 -g -o test4 -fcoroutines -std=c++20 start start coroutine coroutine co_await finish coroutine
接下来,介绍目前c++协程的设计思想和细节
Results CoroutineFunction(){
co_await AwaitatbleObject();
co_return {};
}
struct Result{
struct promise_type {
Result get_return_object() { return {}; }
std::suspend_never initial_suspend() { return {}; }
std::suspend_never final_suspend() noexcept { return {}; }
void unhandled_exception() {}
suspend_aways yield_value(){} // 对应co_yield
void return_void() {} //对应co_return
Result return_value(const Result& res){ return res;}
};
};
下面介绍promise_type的接口在协程运行如何交互,从头到尾,主要分成下面三个阶段
开头初始化准备:
(1)协程函数运行后,首先生成一个promise_type对象
(2)调用get_return_object()函数创建返回值对象,这个对象会在协程第一次返回时就会把这个对象返回给caller;
(3)调用initial_suspend()函数,这个返回值有两个选择suspend_never/suspend_always,never表示继续运行,always表示协程挂起,同时把返回值对象返回,所以这个接口的语义是,协程创建后是否马上运行
运行:
(1)开始运行协程函数,如果出现异常会调用unhandled_exception()去处理
(2)如果遇到co_yield var这样的表达式,表示想要挂起当前协程,返回一个值给caller店, 编译器调用yield_value(var)方法,我们可以此时将值设置到Result的相关变量中,编译器会继续根据函数的返回值判断是否为suspend_always判断要返回到caller点
(3)如果co_return这样的表达式,想要结束协程返回一个对象,则会调用return_value()这个函数,设置好要返回的相关值; 如果整个协程都没有出现co_return,则会调用return_void()
结束善后:
最后调用final_suspend() 判断协程已处理完毕释放前是否要挂起
其中有一个重要的关键字–co_await, 这是一个一元操作符,操作的对象为awaitable类型,就是实现await_ready(), await_resume(), await_suspend( ) 的类型,如例子所示的AWaitableObject
struct AWaitableObject
{
AWaitableObject() {}
bool await_ready() const {return false;}
int await_resume() { return 0; }
void await_suspend(std::coroutine_handle<> handle){
coroutine_handle = handle;
}
};
当使用co_await awaitable_object时:
struct suspend_always { bool await_ready() { return false; } void await_suspend(coroutine_handle<>) {} void await_resume() {} }; struct suspend_never { bool await_ready() { return true; } void await_suspend(coroutine_handle<>) {} void await_resume() {} };
每个协程都对应一个handle,用来管理协程的挂起和恢复,比如说handle.resume()就是用来恢复协程的运行的
协程handle的获取有两种方式:
struct Result{ //add Result(promise_type* obj):promise_type_ptr(obj){} //add void resume(){ promise_type_ptr->resume(); } struct promise_type { // mod Result get_return_object() { return Reuslt(this); } // add void resume(){ coroutine_handle<promise_type>::from_promise(*this).resume(); } std::suspend_never initial_suspend() { return {}; } std::suspend_never final_suspend() noexcept { return {}; } void unhandled_exception() {} suspend_aways yield_value(){} void return_void() {} Result return_value(const Result& res){ return res;} }; // add promise_type *promise_type_ptr; }; 则可以通过如下方式使用 auto result = CoroutineFunction(); result.resume(); 从promise_type到awaitable object,c++20的协程目前提供的更多的是一个灵活的基础框架,离使用上还有一段距离
awaitable<void> listener() { auto executor = co_await this_coro::executor; fn(); tcp::acceptor acceptor(executor, {tcp::v4(), 8988}); for (;;) { tcp::socket socket = co_await acceptor.async_accept(use_awaitable); //调用协程,体现同步性 co_spawn(executor, echo(std::move(socket)), detached);// 创建连接处理线程 } } int main() { try { asio::io_context io_context(1); asio::signal_set signals(io_context, SIGINT, SIGTERM); signals.async_wait([&](auto, auto){ io_context.stop(); }); co_spawn(io_context, listener(), detached); // 创建协程,体现并发性 io_context.run(); // 开始调度 } catch (std::exception& e) { std::printf("Exception: %s\n", e.what()); } }
有栈协程和无栈协程
void fn(){ int a, b, c; a = b + c; yield(); b = c + a; yield(); c = a + b; } ----------------------------分割线--------------------------------- Struct fn{ int a, b, c; int __state = 0; void resume(){ switch(__state) { case 0: return fn1(); case 1: return fn2(); case 2: return fn3(); } } void fn1(){ a = b + c; } void fn2(){ b = c + a; } void fn3(){ c = a + b; } };
对称和非对称
区分点在于是否让出CPU
boost.coroutine2和libco这类属于非对称协程,这类协程的特点是存在调用链,有调用和返回的关系
比如说coroutine2中进行source()的时候去调用协程了,协程执行到阻塞点sink()返回,而不是让出cpu,随便执行别的协程;
参考:深入浅出c++协程
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。