当前位置:   article > 正文

Rxjava2切换线程原理_androidschedulers.mainthread().scheduledirect

androidschedulers.mainthread().scheduledirect

版本如下:

  1. implementation "io.reactivex.rxjava2:rxjava:2.2.5"
  2. implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'

建议一边看博客,一边打开Android studio看代码

首先分析 AndroidSchedulers.mainThread() ,后面分析Schedulers.io(),思路有些类似,在具体的Scheduler.scheduleDirect()方法实现上不一样,因为android的主线程使用了Handler比较简单,所以先看Android主线程。

常用代码如下

  1. Observable observable = new Observable() {
  2. @Override
  3. protected void subscribeActual(Observer observer) {
  4. observer.onNext("1");
  5. observer.onNext("2");
  6. }
  7. };
  8. observable
  9. .subscribeOn(AndroidSchedulers.mainThread())
  10. .subscribe(new Observer() {
  11. @Override
  12. public void onSubscribe(Disposable d) {
  13. }
  14. @Override
  15. public void onNext(Object o) {
  16. Log.e("Tag" , "recive : " + o + " thread :" + Thread.currentThread().getName());
  17. }
  18. @Override
  19. public void onError(Throwable e) {
  20. }
  21. @Override
  22. public void onComplete() {
  23. }
  24. });

从subscribeOn方法上手

  1. @CheckReturnValue
  2. @SchedulerSupport(SchedulerSupport.CUSTOM)
  3. public final Observable<T> subscribeOn(Scheduler scheduler) {
  4. //非空判断,若为空会直接抛出异常
  5. ObjectHelper.requireNonNull(scheduler, "scheduler is null");
  6. //这里将Observable和Scheduler封装成了ObservableSubscribeOn
  7. //onAssembly方法返回了这个new ObservableSubscribeOn<T>(this, scheduler)对象
  8. return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
  9. }

所以为了方便理解,我们可以将原来的代码分割成这样:

接下来是subscribe方法,点进去直接看:

  1. //这个方法在Observable中,ObservableSubscribeOn是Observable的子类
  2. public final void subscribe(Observer<? super T> observer) {
  3. //非空判断
  4. ObjectHelper.requireNonNull(observer, "observer is null");
  5. try {
  6. //这里直接返回了observer
  7. observer = RxJavaPlugins.onSubscribe(this, observer);
  8. //非空判断
  9. ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
  10. //这里是重点方法
  11. subscribeActual(observer);
  12. } catch (NullPointerException e) { // NOPMD
  13. 省略
  14. } catch (Throwable e) {
  15. 省略
  16. }
  17. }

到了subscribeActual(observer);习惯性直接Ctrl + 左键就想看源码,点进入发现是Observable类的抽象方法,回看上面我们分割过的代码,就可以看出来实现类是ObservableSubscribeOn,所以直接看ObservableSubscribeOn.subscribeActual()方法

  1. //ObservableSubscribeOn类的subscribeActual方法
  2. @Override
  3. public void subscribeActual(final Observer<? super T> observer) {
  4. final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
  5. //这里就调用的Observer方法,就是我们自己实现的方法
  6. observer.onSubscribe(parent);
  7. //scheduler.scheduleDirect(new SubscribeTask(parent))是重点
  8. parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
  9. }

看到这里,scheduler.scheduleDirect(new SubscribeTask(parent))是重点的方法,scheduler还是在我们分割出来的observable.subscribeOn(AndroidSchedulers.mainThread());方法中赋值的,所以现在再回头去看AndroidSchedulers.mainThread()

这里调用的方法很多,但其实并不繁琐,所以大家可以直接自己看,我文字应该可以说一下,如果你自己还是找不到那我也会贴出代码。

文字版:

AndroidSchedulers.mainThread()点进mainThread()方法,里面直接返回的是RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD)

onMainThreadScheduler()方法返回了MAIN_THREAD

MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler()

RxAndroidPlugins.initMainThreadScheduler()返回了new Callable<Scheduler>().call(),也就是MainHolder.DEFAULT

static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()), false);

HandlerScheduler类中就是Scheduler的子类,并且重写了scheduleDirect()方法。

代码版:

  1. //在AndroidSchedulers类中 ,顺序调用了下列方法
  2. public static Scheduler mainThread() {
  3. //这里直接返回了MAIN_THREAD
  4. return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
  5. }
  6. //定义了MAIN_THREAD
  7. private static final Scheduler MAIN_THREAD =
  8. //调用了initMainThreadScheduler方法
  9. RxAndroidPlugins.initMainThreadScheduler(
  10. new Callable<Scheduler>() {
  11. @Override public Scheduler call() throws Exception {
  12. return MainHolder.DEFAULT;
  13. }
  14. });
  15. //方法里最终调用了callRequireNonNull
  16. public static Scheduler initMainThreadScheduler(Callable<Scheduler> scheduler) {
  17. if (scheduler == null) {
  18. throw new NullPointerException("scheduler == null");
  19. }
  20. Function<Callable<Scheduler>, Scheduler> f = onInitMainThreadHandler;
  21. if (f == null) {
  22. return callRequireNonNull(scheduler);
  23. }
  24. return applyRequireNonNull(f, scheduler);
  25. }
  26. //最终返回了s.call(),也就是MainHolder.DEFAULT
  27. static Scheduler callRequireNonNull(Callable<Scheduler> s) {
  28. try {
  29. Scheduler scheduler = s.call();
  30. if (scheduler == null) {
  31. throw new NullPointerException("Scheduler Callable returned null");
  32. }
  33. return scheduler;
  34. } catch (Throwable ex) {
  35. throw Exceptions.propagate(ex);
  36. }
  37. }
  38. private static final class MainHolder {
  39. static final Scheduler DEFAULT
  40. = new HandlerScheduler(new Handler(Looper.getMainLooper()), false);
  41. }

下面直接看HandlerScheduler中的scheduleDirect方法:

  1. @Override
  2. public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
  3. if (run == null) throw new NullPointerException("run == null");
  4. if (unit == null) throw new NullPointerException("unit == null");
  5. run = RxJavaPlugins.onSchedule(run);
  6. ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
  7. //ScheduledRunnable 实现了 Runnable接口 ,handler 是new Handler(Looper.getMainLooper()),这样就实现在切换到主线程
  8. handler.postDelayed(scheduled, unit.toMillis(delay));
  9. return scheduled;
  10. }

handler 是new Handler(Looper.getMainLooper()),这样就实现在切换到主线程

 

 

——————————分割线——————————

 

 

 

接下来分析Schedulers.io(),其实之前的逻辑是一样的,只是在调用scheduler.scheduleDirect()时,是由不同子类实现的,所以我们先找到实现scheduleDirect方法的类再分析。

  1. @NonNull
  2. public static Scheduler io() {
  3. // 这里直接返回了IO
  4. return RxJavaPlugins.onIoScheduler(IO);
  5. }
  6. // 这里帮大家整理成这样,源码里IO = RxJavaPlugins.initIoScheduler(new IOTask());是写在static{}里的
  7. // initIoScheduler最终返回了IOTask的Call()方法,即Schedulers.IoHolder.DEFAULT
  8. static final Scheduler IO = RxJavaPlugins.initIoScheduler(new IOTask());
  9. static final class IOTask implements Callable<Scheduler> {
  10. @Override
  11. public Scheduler call() throws Exception {
  12. return Schedulers.IoHolder.DEFAULT;
  13. }
  14. }
  15. static final class IoHolder {
  16. static final Scheduler DEFAULT = new IoScheduler();
  17. }

这里已经找到了Scheduler的子类IoScheduler,但是发现IoScheduler并未重写scheduleDirect方法,所以是直接调用了父类Scheduler的scheduleDirect方法

  1. @NonNull
  2. public Disposable scheduleDirect(@NonNull Runnable run) {
  3. return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
  4. }
  5. @NonNull
  6. public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
  7. // createWorker()是子类实现的,在Schedulers.io()中是IoScheduler类
  8. final Worker w = createWorker();
  9. final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
  10. DisposeTask task = new DisposeTask(decoratedRun, w);
  11. // 这里是重点
  12. w.schedule(task, delay, unit);
  13. return task;
  14. }

 

IoScheduler重写的createWorker如下:

  1. public Worker createWorker() {
  2. return new EventLoopWorker(pool.get());
  3. }

所以w.schedule(task, delay, unit);就可以直接去EventLoopWorker中找,代码如下

  1. static final class EventLoopWorker extends Scheduler.Worker {
  2. private final CompositeDisposable tasks;
  3. private final IoScheduler.CachedWorkerPool pool;
  4. private final IoScheduler.ThreadWorker threadWorker;
  5. EventLoopWorker(IoScheduler.CachedWorkerPool pool) {
  6. this.pool = pool;
  7. this.tasks = new CompositeDisposable();
  8. this.threadWorker = pool.get();
  9. }
  10. // 省略一些方法
  11. @NonNull
  12. @Override
  13. public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
  14. // 省略
  15. return threadWorker.scheduleActual(action, delayTime, unit, tasks);
  16. }
  17. }

发现threadWorker.scheduleActual(action, delayTime, unit, tasks);而threadWorker = pool.get();所以还得去找pool.get

点进入后可以找到贴出部分代码:

  1. ThreadWorker get() {
  2. // 检查缓存队列是否为空,不为空则直接取出来用
  3. while (!expiringWorkerQueue.isEmpty()) {
  4. ThreadWorker threadWorker = expiringWorkerQueue.poll();
  5. if (threadWorker != null) {
  6. return threadWorker;
  7. }
  8. }
  9. // 如果缓存是空的则new ThreadWorker
  10. // No cached worker found, so create a new one.
  11. ThreadWorker w = new ThreadWorker(threadFactory);
  12. allWorkers.add(w);
  13. return w;
  14. }

返回了ThreadWorker,再查看ThreadWorker代码,发现其中并没有scheduleActual方法,发现它的父类已经写好了,代码如下:

  1. public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
  2. Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
  3. ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
  4. Future<?> f;
  5. try {
  6. if (delayTime <= 0) {
  7. // 提交了到了线程池中
  8. f = executor.submit((Callable<Object>)sr);
  9. } else {
  10. // 提交了到了线程池中
  11. f = executor.schedule((Callable<Object>)sr, delayTime, unit);
  12. }
  13. sr.setFuture(f);
  14. } catch (RejectedExecutionException ex) {
  15. //省略代码
  16. }
  17. return sr;
  18. }

其中将实现了Runnable接口的ScheduledRunnable类提交到了线程池中,实现了线程的切换。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/很楠不爱3/article/detail/251114
推荐阅读
  

闽ICP备14008679号