赞
踩
版本如下:
- implementation "io.reactivex.rxjava2:rxjava:2.2.5"
- implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'
常用代码如下
- Observable observable = new Observable() {
- @Override
- protected void subscribeActual(Observer observer) {
- observer.onNext("1");
- observer.onNext("2");
- }
- };
-
- observable
- .subscribeOn(AndroidSchedulers.mainThread())
- .subscribe(new Observer() {
- @Override
- public void onSubscribe(Disposable d) {
-
- }
-
- @Override
- public void onNext(Object o) {
- Log.e("Tag" , "recive : " + o + " thread :" + Thread.currentThread().getName());
- }
-
- @Override
- public void onError(Throwable e) {
-
- }
-
- @Override
- public void onComplete() {
-
- }
- });

从subscribeOn方法上手
- @CheckReturnValue
- @SchedulerSupport(SchedulerSupport.CUSTOM)
- public final Observable<T> subscribeOn(Scheduler scheduler) {
- //非空判断,若为空会直接抛出异常
- ObjectHelper.requireNonNull(scheduler, "scheduler is null");
- //这里将Observable和Scheduler封装成了ObservableSubscribeOn
- //onAssembly方法返回了这个new ObservableSubscribeOn<T>(this, scheduler)对象
- return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
- }
所以为了方便理解,我们可以将原来的代码分割成这样:
接下来是subscribe方法,点进去直接看:
- //这个方法在Observable中,ObservableSubscribeOn是Observable的子类
- public final void subscribe(Observer<? super T> observer) {
- //非空判断
- ObjectHelper.requireNonNull(observer, "observer is null");
- try {
- //这里直接返回了observer
- observer = RxJavaPlugins.onSubscribe(this, observer);
- //非空判断
- ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
- //这里是重点方法
- subscribeActual(observer);
- } catch (NullPointerException e) { // NOPMD
- 省略
- } catch (Throwable e) {
- 省略
- }
- }

- //ObservableSubscribeOn类的subscribeActual方法
- @Override
- public void subscribeActual(final Observer<? super T> observer) {
- final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
- //这里就调用的Observer方法,就是我们自己实现的方法
- observer.onSubscribe(parent);
- //scheduler.scheduleDirect(new SubscribeTask(parent))是重点
- parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
- }
看到这里,scheduler.scheduleDirect(new SubscribeTask(parent))是重点的方法,scheduler还是在我们分割出来的observable.subscribeOn(AndroidSchedulers.mainThread());方法中赋值的,所以现在再回头去看AndroidSchedulers.mainThread()
AndroidSchedulers.mainThread()点进mainThread()方法,里面直接返回的是RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD)
onMainThreadScheduler()方法返回了MAIN_THREAD
MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler()
RxAndroidPlugins.initMainThreadScheduler()返回了new Callable<Scheduler>().call(),也就是MainHolder.DEFAULT
static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()), false);
HandlerScheduler类中就是Scheduler的子类,并且重写了scheduleDirect()方法。
-
-
- //在AndroidSchedulers类中 ,顺序调用了下列方法
- public static Scheduler mainThread() {
- //这里直接返回了MAIN_THREAD
- return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
- }
-
-
- //定义了MAIN_THREAD
- private static final Scheduler MAIN_THREAD =
- //调用了initMainThreadScheduler方法
- RxAndroidPlugins.initMainThreadScheduler(
- new Callable<Scheduler>() {
- @Override public Scheduler call() throws Exception {
- return MainHolder.DEFAULT;
- }
- });
-
-
- //方法里最终调用了callRequireNonNull
- public static Scheduler initMainThreadScheduler(Callable<Scheduler> scheduler) {
- if (scheduler == null) {
- throw new NullPointerException("scheduler == null");
- }
- Function<Callable<Scheduler>, Scheduler> f = onInitMainThreadHandler;
- if (f == null) {
- return callRequireNonNull(scheduler);
- }
- return applyRequireNonNull(f, scheduler);
- }
-
-
- //最终返回了s.call(),也就是MainHolder.DEFAULT
- static Scheduler callRequireNonNull(Callable<Scheduler> s) {
- try {
- Scheduler scheduler = s.call();
- if (scheduler == null) {
- throw new NullPointerException("Scheduler Callable returned null");
- }
- return scheduler;
- } catch (Throwable ex) {
- throw Exceptions.propagate(ex);
- }
- }
-
-
- private static final class MainHolder {
- static final Scheduler DEFAULT
- = new HandlerScheduler(new Handler(Looper.getMainLooper()), false);
- }
-
-

下面直接看HandlerScheduler中的scheduleDirect方法:
- @Override
- public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
- if (run == null) throw new NullPointerException("run == null");
- if (unit == null) throw new NullPointerException("unit == null");
-
- run = RxJavaPlugins.onSchedule(run);
- ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
- //ScheduledRunnable 实现了 Runnable接口 ,handler 是new Handler(Looper.getMainLooper()),这样就实现在切换到主线程
- handler.postDelayed(scheduled, unit.toMillis(delay));
- return scheduled;
- }
接下来分析Schedulers.io(),其实之前的逻辑是一样的,只是在调用scheduler.scheduleDirect()时,是由不同子类实现的,所以我们先找到实现scheduleDirect方法的类再分析。
- @NonNull
- public static Scheduler io() {
- // 这里直接返回了IO
- return RxJavaPlugins.onIoScheduler(IO);
- }
- // 这里帮大家整理成这样,源码里IO = RxJavaPlugins.initIoScheduler(new IOTask());是写在static{}里的
- // initIoScheduler最终返回了IOTask的Call()方法,即Schedulers.IoHolder.DEFAULT
- static final Scheduler IO = RxJavaPlugins.initIoScheduler(new IOTask());
-
- static final class IOTask implements Callable<Scheduler> {
- @Override
- public Scheduler call() throws Exception {
- return Schedulers.IoHolder.DEFAULT;
- }
- }
- static final class IoHolder {
- static final Scheduler DEFAULT = new IoScheduler();
- }

这里已经找到了Scheduler的子类IoScheduler,但是发现IoScheduler并未重写scheduleDirect方法,所以是直接调用了父类Scheduler的scheduleDirect方法
- @NonNull
- public Disposable scheduleDirect(@NonNull Runnable run) {
- return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
- }
-
- @NonNull
- public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
- // createWorker()是子类实现的,在Schedulers.io()中是IoScheduler类
- final Worker w = createWorker();
- final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
- DisposeTask task = new DisposeTask(decoratedRun, w);
- // 这里是重点
- w.schedule(task, delay, unit);
- return task;
- }
IoScheduler重写的createWorker如下:
- public Worker createWorker() {
- return new EventLoopWorker(pool.get());
- }
所以w.schedule(task, delay, unit);就可以直接去EventLoopWorker中找,代码如下
- static final class EventLoopWorker extends Scheduler.Worker {
- private final CompositeDisposable tasks;
- private final IoScheduler.CachedWorkerPool pool;
- private final IoScheduler.ThreadWorker threadWorker;
-
- EventLoopWorker(IoScheduler.CachedWorkerPool pool) {
- this.pool = pool;
- this.tasks = new CompositeDisposable();
- this.threadWorker = pool.get();
- }
-
- // 省略一些方法
-
- @NonNull
- @Override
- public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
- // 省略
- return threadWorker.scheduleActual(action, delayTime, unit, tasks);
- }
- }

发现threadWorker.scheduleActual(action, delayTime, unit, tasks);而threadWorker = pool.get();所以还得去找pool.get
点进入后可以找到贴出部分代码:
- ThreadWorker get() {
- // 检查缓存队列是否为空,不为空则直接取出来用
- while (!expiringWorkerQueue.isEmpty()) {
- ThreadWorker threadWorker = expiringWorkerQueue.poll();
- if (threadWorker != null) {
- return threadWorker;
- }
- }
- // 如果缓存是空的则new ThreadWorker
- // No cached worker found, so create a new one.
- ThreadWorker w = new ThreadWorker(threadFactory);
- allWorkers.add(w);
- return w;
- }
返回了ThreadWorker,再查看ThreadWorker代码,发现其中并没有scheduleActual方法,发现它的父类已经写好了,代码如下:
- public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
- Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
-
- ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
-
- Future<?> f;
- try {
- if (delayTime <= 0) {
- // 提交了到了线程池中
- f = executor.submit((Callable<Object>)sr);
- } else {
- // 提交了到了线程池中
- f = executor.schedule((Callable<Object>)sr, delayTime, unit);
- }
- sr.setFuture(f);
- } catch (RejectedExecutionException ex) {
- //省略代码
- }
-
- return sr;
- }

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。