赞
踩
一直想了解rust中actor并发模式,Actix库是rust中知名的库。看看Actix库的说明,走进actor。
这个库的重要几个概念:
1、actor
任何实现Actor trait的类型,就是一个actor.actor有生命周期,几个状态:
(1)Started
(2) Running
(3)Stopping
(4)Stopped
我们来看一下Actor trait:
里面有start()、start_default()等不带参数的函数,大都返回的是Addr < Self >。我们看到了一个Addr类型。因为所有的Actors有一个邮箱Addr,而Actors之间的通信是通过messages来实现的,Actors之间只知道messages地址,而不能侵入到对方内部。
pub trait Actor: Sized + 'static { type Context: ActorContext; fn started(&mut self, ctx: &mut Self::Context) {} fn stopping(&mut self, ctx: &mut Self::Context) -> Running { Running::Stop } fn start(self) -> Addr<Self> where Self: Actor<Context = Context<Self>>, { Context::new().run(self) } fn start_default() -> Addr<Self> where Self: Actor<Context = Context<Self>> + Default, { Self::default().start() } /// Start new actor in arbiter's thread. fn start_in_arbiter<F>(arb: &Arbiter, f: F) -> Addr<Self> where Self: Actor<Context = Context<Self>>, F: FnOnce(&mut Context<Self>) -> Self + Send + 'static, { let (tx, rx) = channel::channel(DEFAULT_CAPACITY); // create actor arb.exec_fn(move || { let mut ctx = Context::with_receiver(rx); let act = f(&mut ctx); let fut = ctx.into_future(act); actix_rt::spawn(fut); }); Addr::new(tx) } fn create<F>(f: F) -> Addr<Self> where Self: Actor<Context = Context<Self>>, F: FnOnce(&mut Context<Self>) -> Self + 'static, { let mut ctx = Context::new(); let act = f(&mut ctx); ctx.run(act) } }
自定义类,实现Actor:
use actix::prelude::*;
struct MyActor {
count: usize,
}
impl Actor for MyActor {
type Context = Context<Self>;
// 启动的时侯,进行一些个性化设置,比如
fn started(&mut self, ctx: &mut Self::Context) {
ctx.set_mailbox_capacity(1);
}
}
let addr = MyActor.start();
2、message 、handler、 address、Recipient
(1)任何实现Message trait类型,是一个message.
pub trait Message {
/// The type of value that this message will resolved with if it is
/// successful.
type Result: 'static;
}
(2)所有的Actors之间的通信是通过messages来实现的。通信的message发往目标邮箱,Actors调用message handlers(句柄),执行上下文(context).
(3)handler是啥?
pub trait Handler<M>
where
Self: Actor,
M: Message,
{
/// The type of value that this handler will return.
type Result: MessageResponse<Self, M>;
/// This method is called for every message received by this actor.
fn handle(&mut self, msg: M, ctx: &mut Self::Context) -> Self::Result;
}
实现handler:
struct Ping(usize);
impl Handler<Ping> for MyActor {
type Result = usize;
fn handle(&mut self, msg: Ping, ctx: &mut Context<Self>) -> Self::Result {
self.count += msg.0;
self.count
}
}
(4) 自定义类,实现Message:
use actix::prelude::*;
struct Ping(usize);
impl Message for Ping {
type Result = usize;
}
(5)如何发送message?
首先要用到Addr object。具体地说,有几种方法在actors之间发送message:
Addr::do_send(M) - 忽视返回任何错误的方式,因为邮箱可能满了,也可能关闭。
Addr::try_send(M) - 如果错误 ,会返回 SendError。
Addr::send(M) - 会返回future object ,带有处理过程的结果信息。
(6)Recipient -收件人
收件人是一个Actor发给另外一个不同类型Actor时的地址。比如,订阅者与发送者。
3、context (上下文)、Mailbox
(1)Context字面上是上下文。具体有什么东东?看看源码:
pub struct Context<A> where A: Actor<Context = Context<A>>, { parts: ContextParts<A>, mb: Option<Mailbox<A>>, } impl<A> Context<A> where A: Actor<Context = Self>, { #[inline] pub(crate) fn new() -> Self { let mb = Mailbox::default(); Self { parts: ContextParts::new(mb.sender_producer()), mb: Some(mb), } } #[inline] pub fn with_receiver(rx: AddressReceiver<A>) -> Self { let mb = Mailbox::new(rx); Self { parts: ContextParts::new(mb.sender_producer()), mb: Some(mb), } } #[inline] pub fn run(self, act: A) -> Addr<A> { let fut = self.into_future(act); let addr = fut.address(); actix_rt::spawn(fut); addr } pub fn into_future(mut self, act: A) -> ContextFut<A, Self> { let mb = self.mb.take().unwrap(); ContextFut::new(self, act, mb) } pub fn handle(&self) -> SpawnHandle { self.parts.curr_handle() } pub fn set_mailbox_capacity(&mut self, cap: usize) { self.parts.set_mailbox_capacity(cap) } } impl<A> AsyncContextParts<A> for Context<A> where A: Actor<Context = Self>, { fn parts(&mut self) -> &mut ContextParts<A> { &mut self.parts } } pub trait ContextFutureSpawner<A> where A: Actor, A::Context: AsyncContext<A>, { fn spawn(self, ctx: &mut A::Context); fn wait(self, ctx: &mut A::Context); } impl<A, T> ContextFutureSpawner<A> for T where A: Actor, A::Context: AsyncContext<A>, T: ActorFuture<Item = (), Error = (), Actor = A> + 'static, { #[inline] fn spawn(self, ctx: &mut A::Context) { let _ = ctx.spawn(self); } #[inline] fn wait(self, ctx: &mut A::Context) { ctx.wait(self); } }
里面有 ContextParts和Option<Mailbox>两部分内容构成。
(2)找到ContextParts的源码,我们来看看:
pub struct ContextParts<A>
where
A: Actor,
A::Context: AsyncContext<A>,
{
addr: AddressSenderProducer<A>,
flags: ContextFlags,
wait: SmallVec<[ActorWaitItem<A>; 2]>,
items: SmallVec<[Item<A>; 3]>,
handles: SmallVec<[SpawnHandle; 2]>,
}
(3)我们来看一下Mailbox中的设计:
pub struct Mailbox<A> where A: Actor, A::Context: AsyncContext<A>, { msgs: AddressReceiver<A>, } impl<A> Mailbox<A> where A: Actor, A::Context: AsyncContext<A>, { #[inline] pub fn new(msgs: AddressReceiver<A>) -> Self { Self { msgs } } pub fn capacity(&self) -> usize { self.msgs.capacity() } pub fn set_capacity(&mut self, cap: usize) { self.msgs.set_capacity(cap); } #[inline] pub fn connected(&self) -> bool { self.msgs.connected() } pub fn address(&self) -> Addr<A> { Addr::new(self.msgs.sender()) } pub fn sender_producer(&self) -> AddressSenderProducer<A> { self.msgs.sender_producer() } pub fn poll(&mut self, act: &mut A, ctx: &mut A::Context) { #[cfg(feature = "mailbox_assert")] let mut n_polls = 0u16; loop { let mut not_ready = true; // sync messages loop { if ctx.waiting() { return; } match self.msgs.poll() { Ok(Async::Ready(Some(mut msg))) => { not_ready = false; msg.handle(act, ctx); } Ok(Async::Ready(None)) | Ok(Async::NotReady) | Err(_) => break, } #[cfg(feature = "mailbox_assert")] { n_polls += 1; assert!(n_polls < MAX_SYNC_POLLS, "Too many messages are being processed. Use Self::Context::notify() instead of direct use of address"); } } if not_ready { return; } } } }
4、Arbiter 、SyncArbiter
为Actors提供了异步执行的上下文环境,当一个actor运行时,Arbiters控制着actor包含特定执行状况的上下文环境。 Arbiters需要运行许多函数,包括起系统线程的函数、进行事件轮询、异步分发事件轮询任务、对异步任务进行支持。
当起一个actor时,是在一个系统的线程中运行的,这样效率比较高。也就是说,一个线程可能会针对N个actor.
事件轮询
在事件轮询时,对应的 Arbiter会控制事件轮询事件池的线程。 Arbiter会对任务队列进行排队,往往的情况是,你可以把Arbiter看成"single-threaded event loop".
5、future
从Future库可以看出:
pub trait Future {
/// The type of value produced on completion.
#[stable(feature = "futures_api", since = "1.36.0")]
type Output;
#[stable(feature = "futures_api", since = "1.36.0")]
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。