当前位置:   article > 正文

android是切换主线程的原理,RxJava2.0(四)线程之间切换的内部原理

android是切换主线程的原理,RxJava2.0(四)线程之间切换的内部原理

基本代码

来看一下基本代码:

Observable.create((ObservableOnSubscribe) e -> {

e.onNext(1);

e.onNext(2);

e.onComplete();

}).subscribeOn(Schedulers.io())

.observeOn(AndroidSchedulers.mainThread())

.subscribe(i -> System.out.println("onNext : i= " + i));

复制代码

很简单,即订阅时将task交给子线程去做,而数据的回调则在Android主线程中执行。

一、subscribeOn()

点击查看源码:

public final Observable subscribeOn(Scheduler scheduler){

//非空判断和hook

ObjectHelper.requireNonNull(scheduler, "scheduler is null");

return RxJavaPlugins.onAssembly(new ObservableSubscribeOn(this, scheduler));

}

复制代码

实际上这个方法返回了一个ObservableSubscribeOn对象。我们有理由猜测这个ObservableSubscribeOn应该和上文的ObservableMap及ObservableDoOnEach相似,都是Observable的一个包装类(装饰器):

//1.ObservableSubscribeOn也是Observable的一个装饰器

public final class ObservableSubscribeOn extends AbstractObservableWithUpstream{

final Scheduler scheduler;

public ObservableSubscribeOn(ObservableSource source, Scheduler scheduler){

//2.存储上游的ObservableSource和调度器

super(source);

this.scheduler = scheduler;

}

@Override

public void subscribeActual(final Observer super T> s){

//3.new 一个SubscribeOnObserver

final SubscribeOnObserver parent = new SubscribeOnObserver(s);

//4.回调方法,这说明下游的onSubscribe回调方法所在线程和线程调度无关

// 是订阅时所在的线程

s.onSubscribe(parent);

//5.立即执行线程调度

parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));

}

}

复制代码

前两步我们不需要 再多解释,直接看第三点,我们看看SubscribeOnObserver这个类:

SubscribeOnObserver

static final class SubscribeOnObserver extends AtomicReference implements Observer, Disposable{

private static final long serialVersionUID = 8094547886072529208L;

//下游的Observer

final Observer super T> actual;

//保存上游的Disposable,自身dispose时,连同上游一起dispose

final AtomicReference s;

SubscribeOnObserver(Observer super T> actual) {

this.actual = actual;

this.s = new AtomicReference();

}

@Override

public void onSubscribe(Disposable s){

DisposableHelper.setOnce(this.s, s);

}

@Override

public void onNext(T t){

actual.onNext(t);

}

@Override

public void onError(Throwable t){

actual.onError(t);

}

@Override

public void onComplete(){

actual.onComplete();

}

@Override

public void dispose(){

DisposableHelper.dispose(s);

DisposableHelper.dispose(this);

}

复制代码

类似Observable和ObservableMap,SubscribeOnObserver同样是Disposable和Observer的一个装饰器,提供了对下游数据的传递,以及将task dispose的接口。

第4步我们之前就讲过了,直接看第5步:

//5.立即执行线程调度

parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));

复制代码

我们看看SubscribeTask这个类:

SubscribeTask

final class SubscribeTask implements Runnable{

private final SubscribeOnObserver parent;

SubscribeTask(SubscribeOnObserver parent) {

this.parent = parent;

}

@Override

public void run(){

source.subscribe(parent);

}

}

复制代码

难以置信的简单,SubscribeTask 仅仅是一个Runnable 接口的实现类而已,通过将SubscribeOnObserver作为参数存起来,在run()方法中添加了上游Observable的被订阅事件,就没有了别的操作,

接下来我们看一下scheduler.scheduleDirect(SubscribeTask)中的代码:

public abstract class Scheduler{

//...

public Disposable scheduleDirect(@NonNull Runnable run){

return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);

}

public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit){

// Worker 本身就是Disposable 的实现类

// 请注意, createWorker()所创建的worker,

// 实际就是Schdulers.io()所提供的IoScheduler所创建的worker

final Worker w = createWorker();

//hook相关

final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

DisposeTask task = new DisposeTask(decoratedRun, w);

//即 worker.schedule(task, 0, TimeUnit.NANOSECONDS): 立即执行task

w.schedule(task, delay, unit);

return task;

}

//...

}

复制代码

我们不要追究过深,我们看一下这个createWorker方法的注释说明:

/**

* Retrieves or creates a new {@link Scheduler.Worker} that represents serial execution of actions.

* 检索或创建一个新的{@link Scheduler.Worker}表示一系列的action

*

* When work is completed it should be unsubscribed using {@link Scheduler.Worker#dispose()}.

* 当work完成后,应使用{@link Scheduler.Worker#dispose()}取消订阅。

*

* Work on a {@link Scheduler.Worker} is guaranteed to be sequential.

* {@link Scheduler.Worker} 上面的work保证是顺序执行的

*/

复制代码

现在我们知道了:我们通过调用subscribeOn()传入Scheduler,当下游ObservableSource被订阅时(请注意,订阅顺序是由下到上的),距离最近的线程调度subscribeOn()方法中,保存的Scheduler会创建一个worker(对应相应的线程,本文中为IoScheduler),在其对应的线程中,立即执行task

多次subscribeOn()

现在考虑一个问题,假如在我们的代码中,多次使用了subscribeOn()代码,到线程会怎么处理呢?

上文已经讲到了,不管我们怎么通过subscribeOn()方法切换线程,由于订阅执行顺序是由下到上,因此当最上游的ObservableSource被订阅时,所在线程当然是距离上游最近的subscribeOn()所提供的线程,即最终Observable总是在第一个subscribeOn()所在的线程中执行。

二、observeOn()

先看observeOn()内部,果然是hook+Observable的包装类:

public final Observable observeOn(Scheduler scheduler){

return observeOn(scheduler, false, bufferSize());

}

public final Observable observeOn(Scheduler scheduler, boolean delayError, int bufferSize){

ObjectHelper.requireNonNull(scheduler, "scheduler is null");

ObjectHelper.verifyPositive(bufferSize, "bufferSize");

//实例化ObservableObserveOn对象并返回

return RxJavaPlugins.onAssembly(new ObservableObserveOn(this, scheduler, delayError, bufferSize));

}

复制代码

再看ObservableObserveOn:

public final class ObservableObserveOn extends AbstractObservableWithUpstream{

final Scheduler scheduler;

final boolean delayError;

final int bufferSize;

public ObservableObserveOn(ObservableSource source, Scheduler scheduler, boolean delayError, int bufferSize){

super(source);

//1.相关依赖注入

this.scheduler = scheduler;

this.delayError = delayError;

this.bufferSize = bufferSize;

}

@Override

protected void subscribeActual(Observer super T> observer){

if (scheduler instanceof TrampolineScheduler) {

source.subscribe(observer);

} else {

//2.创建主线程的worker

Scheduler.Worker w = scheduler.createWorker();

//3.上游数据源被订阅

source.subscribe(new ObserveOnObserver(observer, w, delayError, bufferSize));

}

}

}

复制代码

和subscribeOn()不同的是,我们并不是立即在对应的线程执行task,而是将对应的线程(实际上是worker)作为参数,实例化ObserveOnObserver并存储起来。

当上游的数据传递过来时,ObserveOnObserver执行对应的方法,比如onNext(T),再切换到对应线程中,并交由下游的Observer去接收:

ObserveOnObserver

ObserveOnObserver中代码极多,我们简单了解原理后,以onNext(T)为例:

static final class ObserveOnObserver extends BasicIntQueueDisposable

implements Observer, Runnable{

//...省略其他代码

ObserveOnObserver(Observer super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {

this.actual = actual;

this.worker = worker;

this.delayError = delayError;

this.bufferSize = bufferSize;

}

//队列

SimpleQueue queue;

@Override

public void onNext(T t){

if (done) {

return;

}

//将数据存入队列

if (sourceMode != QueueDisposable.ASYNC) {

queue.offer(t);

}

//对应线程取出数据并交由下游的Observer

schedule();

}

void schedule(){

if (getAndIncrement() == 0) {

worker.schedule(this);

}

}

//...省略其他代码

}

复制代码

多次observerOn()

由上文得知,与subscribeOn()相反,observerOn()操作会将切换到对应的线程,然后交由下游的Observer处理,因此observerOn()仅对下游的Observer生效,并且,如果多次调用,observerOn()的线程调度会持续到下一个observerOn()操作之前。

总结

subscribeOn()

订阅顺序当从下到上,上游的ObservableSource被订阅时,先切换线程,然后立即执行task;

当存在多个subscribeOn()方法时,仅第一个subscribeOn()有效。

observerOn()

订阅顺序当从下到上,上游的ObservableSource被订阅时,会将对应的worker创建并作为构造参数存储在Observer的装饰器中,并不会立即切换线程;

当数据由上游发送过来时,先将数据存储到队列中,然后切换线程,然后在新的线程中将数据发送给下游的Observer;

当存在多个observerOn()方法时,仅对距下游下一个observerOn()之前的observer有效

有兴趣可以关注我的小专栏,学习更多知识:小专栏

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/AllinToyou/article/detail/251095
推荐阅读
相关标签
  

闽ICP备14008679号