赞
踩
RxJava的github地址:https://github.com/ReactiveX/RxJava
RxAndroid的github地址:https://github.com/ReactiveX/RxAndroid
提示:先别急着看github,我们先来认识RxJava和RxAndroid,从基本的使用学起。
提示:以下是本篇文章正文内容,下面案例可供参考
1.Observable 被观察者 ,事件源。是一个抽象类。
------ 1.1 ObservableEmitter 发射器;
2.Observer 观察者,事件接收处理。是一个接口。
3.subscribe 订阅,把被观察者和观察者关联起来。
先看下面RxJavaAndroid .java就好理解了这几个概念了。
build.gradle中dependencies 下依赖;代码如下(示例):
dependencies {
implementation "io.reactivex.rxjava3:rxjava:3.0.12"
implementation 'io.reactivex.rxjava3:rxjava:3.0.0'
}
/** * 注意别导错包。 */ public class RxJavaAndroid { public static void main(String[] args) { doRxJava(); } private static void doRxJava() { //通过 Observable.create 创建被观察者,Observable是一个抽象类 Observable<String> observable = Observable.create( new ObservableOnSubscribe<String>() { @Override public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable { //发射器发送消息 emitter.onNext("hello world"); //通过发射器发射异常 //emitter.onError(new Throwable("模拟一个异常")); //发射完成 emitter.onComplete(); } } ); //通过 new Observer 创建观察者;Observer是一个 interface Observer<String> observer = new Observer<String>() { @Override public void onSubscribe(@NonNull Disposable d) { //第一个执行的方法 System.out.println("onSubscribe"); } @Override public void onNext(@NonNull String s) { System.out.println("onNext>>>" + s); } @Override public void onError(@NonNull Throwable e) { System.out.println("onError>>>" + e.getMessage()); } @Override public void onComplete() { System.out.println("onComplete"); } }; //通过被观察者 Observable 的 subscribe (订阅) 绑定观察者 Observer observable.subscribe(observer); } }
代码执行结果如下:
Observable 是数据的上游,事件源,即事件生产者。
//一:接看代码 Observable.create()方法 public static <T> Observable<T> create(@NonNull ObservableOnSubscribe<T> source) { //ObservableOnSubscribe是一个只有subscribe方法的接口 Objects.requireNonNull(source, "source is null");//判空 return RxJavaPlugins.onAssembly(new ObservableCreate<>(source)); } //二:继续向下看,进入RxJavaPlugins.onAssembly方法 /** * Calls the associated hook function. * @param <T> the value type * @param source the hook's input value * @return the value returned by the hook */ @NonNull public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) { Function<? super Observable, ? extends Observable> f = onObservableAssembly; if (f != null) { return apply(f, source); } return source; } //源码注释有介绍,Calls the associated hook function. hook java反射、钩子。底层管用黑科技。 //apply(f, source)的返回值依旧是Observable,里面不过多探究。
综上我们能发现
Observable.create(new ObservableOnSubscribe() { })
相当于
new ObservableCreate(new ObservableOnSubscribe() { }))
总结:事件的源就是 new ObservableCreate()对象,将 ObservableOnSubscribe 作为参数传递给 ObservableCreate 的构造函数。
Observer 是数据的下游,即事件消费者。Observer 是个 interface。
查看源码,只有如下几个方法
void onSubscribe(@NonNull Disposable d);
void onNext(@NonNull T t);
void onError(@NonNull Throwable e);
void onComplete();
public final void subscribe(@NonNull Observer<? super T> observer) { Objects.requireNonNull(observer, "observer is null"); //判空 try { observer = RxJavaPlugins.onSubscribe(this, observer);//通过hook返回observer Objects.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins"); subscribeActual(observer);//核心、真正实现订阅的方法,仔细看subscribeActual是一个抽象方法,所以我们需要去Observable的实现类ObservableCreate中去查看 } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { Exceptions.throwIfFatal(e); // can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already RxJavaPlugins.onError(e); NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(e); throw npe; }
ObservableCreate类中发现
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<>(observer); //创建发射器,并把观察者当参数传递给发射器
observer.onSubscribe(parent);//直接回调了observer的onSubscribe方法
try {
source.subscribe(parent);//source为Observable,事件源
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
由上述源码可以看到,在ObservableCreate类的subscribeActual方法下,调用了Observer(观察者即事件接收者)的onSubscribe,并把observer对象当做参数专递给了发射器CreateEmitter;同时将发射器CreateEmitter传递给了事件源。因此可以得出结论:
只有当 observable.subscribe(observer);时,发射器才会被创建,Observer才会被绑定onSubscribe; Observable的subscribe方法才会执行发射事件和数据;此时Observable和Observer的方法和回调都已经准备就绪,只待发送与接收。换言之,事件流是在订阅后才产生的。而 observable 被创建出来时并不生产事件,同时也不发射事件。
CreateEmitter parent = new CreateEmitter<>(observer);
CreateEmitter实现了ObservableEmitter,同时ObservableEmitter继承自Emitter,CreateEmitter 还实现了 Disposable 接口,这个 disposable 接口是用来判断是否中断事件发射的。
CreateEmitter的主要方法如下
public void onNext(T t) {}
public void onError(Throwable t) {}
public boolean tryOnError(Throwable t) {}
public void onComplete() {}
前面我们说了 observer是个接口,与 observer的方法神似,几乎一一对应。
然后我们来分析一下CreateEmitter主要方法内都做了什么。
@Override public void onNext(T t) { if (t == null) { onError(ExceptionHelper.createNullPointerException("onNext called with a null value.")); return; } if (!isDisposed()) { //判断是否丢弃 observer.onNext(t); //调用Emitter的onNext,它会直接调用observer的 onNext } } @Override public void onError(Throwable t) { if (!tryOnError(t)) { RxJavaPlugins.onError(t); //调用 Emitter 的 onError,它会直接调用 observer 的 onError } } @Override public boolean tryOnError(Throwable t) { if (t == null) { t = ExceptionHelper.createNullPointerException("onError called with a null Throwable."); } if (!isDisposed()) { try { observer.onError(t); } finally { dispose(); //执行完中断发射 } return true; } return false; } @Override public void onComplete() { if (!isDisposed()) { /判断是否丢弃 try { observer.onComplete();//调用Emitter的onComplete,它会直接调用observer的 onComplete } finally { dispose(); //执行完中断发射 } } }
CreateEmitter 的 onError 和 onComplete 方法任何一个执行完都会执行 dispose()中断事件
发射,所以 observer 中的 onError 和 onComplete 也只能有一个被执行。
结合以上,我们知道了,当订阅成功后,数据 源ObservableOnSubscribe 开始生产事件 , 调用 Emitter 的 的 onNext ,onComplete 向下游发射事件,Emitter 包含了 observer 的引用,又调用了 observer onNext ,onComplete ,这样下 游observer 就接收到了上游发射的数据。
提示:这里对文章进行总结:
前面是为了更直观的分析代码才拆分开写的。现在我们用rxjava的Builder建造者模式来书写代码。
private static void doWork() { Observable .create(new ObservableOnSubscribe<String>() { @Override public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable { System.out.println("subscribe>>> thread name [" + Thread.currentThread().getName() + "]"); emitter.onNext("轻轻地我走了"); } }) //.subscribeOn(Schedulers.io()) //先注释掉 .subscribe(new Observer<String>() { @Override public void onSubscribe(@NonNull Disposable d) { } @Override public void onNext(@NonNull String s) { System.out.println("onNext>>>thread name [" + Thread.currentThread().getName() + "]----" + s); } @Override public void onError(@NonNull Throwable e) { } @Override public void onComplete() { } }); }
执行结果肯定都是在main线程中,rxjava默认主线程。如下图:
当我们把.subscribeOn(Schedulers.io()) 代码行释放注释,再执行代码。执行结果如下图:
rxjava是通过 subscribeOn()来调度线程的
接着上面subscribeOn(Schedulers.io())查看源码:
//1. Schedulers类 @NonNull public static Scheduler io() { return RxJavaPlugins.onIoScheduler(IO); //进入了RxJavaPlugins } //2.RxJavaPlugins类 /** * Calls the associated hook function. * @param defaultScheduler the hook's input value * @return the value returned by the hook */ @NonNull public static Scheduler onIoScheduler(@NonNull Scheduler defaultScheduler) { //有是 调用 hook 函数(方法). Function<? super Scheduler, ? extends Scheduler> f = onIoHandler; if (f == null) { return defaultScheduler; } return apply(f, defaultScheduler); } //3.Schedulers类中 IO = RxJavaPlugins.initIoScheduler(new IOTask()); //IO为IOTask的实例 //4.Schedulers类中 static final class IOTask implements Supplier<Scheduler> { @Override public Scheduler get() { return IoHolder.DEFAULT; // IOTask 为 IoHolder.DEFAULT 返回值 } } //5.Schedulers类中 static final class IoHolder { static final Scheduler DEFAULT = new IoScheduler(); //DEFAULT 为IoScheduler实例 } //6.IoScheduler类中 public IoScheduler() { this(WORKER_THREAD_FACTORY); // IoScheduler实例是这个常量 } //7。IoScheduler类中 代码量过于庞大,这里只提取了关键代码行 static final RxThreadFactory WORKER_THREAD_FACTORY; //是 RxThreadFactory WORKER_THREAD_FACTORY = new RxThreadFactory(WORKER_THREAD_NAME_PREFIX, priority); private static final String WORKER_THREAD_NAME_PREFIX = "RxCachedThreadScheduler"; //WORKER_THREAD_NAME_PREFIX 这是IO的默认线程名前缀 // 8.RxThreadFactory 类 /** * A ThreadFactory that counts how many threads have been created and given a prefix, * sets the created Thread's name to {@code prefix-count}. */ public final class RxThreadFactory extends AtomicLong implements ThreadFactory { //注释已经说明RxThreadFactory 是创建线程的工厂类 ······ } //9.IoScheduler 是个创建和缓存线程池 /** * Scheduler that creates and caches a set of thread pools and reuses them if possible. */ public final class IoScheduler extends Scheduler {······}
在此就不再继续刨根问底了,我们知道RxThreadFactory是用来创建线程的类。IoScheduler 创建和缓存线程池;大概就明白rxjava是通过 Scheduler (调度器)调度这些线程的;
那么具体是怎么实现调度的呢? 通过IoScheduler类,我们可以明白Scheduler 是如何创建IO线程的。
public IoScheduler() { this(WORKER_THREAD_FACTORY); } /** * Constructs an IoScheduler with the given thread factory and starts the pool of workers. * @param threadFactory thread factory to use for creating worker threads. Note that this takes precedence over any * system properties for configuring new thread creation. Cannot be null. */ public IoScheduler(ThreadFactory threadFactory) { this.threadFactory = threadFactory; this.pool = new AtomicReference<>(NONE); start(); } @Override public void start() { CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory); if (!pool.compareAndSet(NONE, update)) { update.shutdown(); } } @Override public void shutdown() { CachedWorkerPool curr = pool.getAndSet(NONE); if (curr != NONE) { curr.shutdown(); } } @NonNull @Override public Worker createWorker() { return new EventLoopWorker(pool.get()); } public int size() { return pool.get().allWorkers.size(); } static final class EventLoopWorker extends Scheduler.Worker implements Runnable { private final CompositeDisposable tasks; private final CachedWorkerPool pool; private final ThreadWorker threadWorker; final AtomicBoolean once = new AtomicBoolean(); EventLoopWorker(CachedWorkerPool pool) { this.pool = pool; this.tasks = new CompositeDisposable(); this.threadWorker = pool.get(); } @Override public void dispose() { if (once.compareAndSet(false, true)) { tasks.dispose(); if (USE_SCHEDULED_RELEASE) { threadWorker.scheduleActual(this, 0, TimeUnit.NANOSECONDS, null); } else { // releasing the pool should be the last action pool.release(threadWorker); } } } @Override public void run() { pool.release(threadWorker); } @Override public boolean isDisposed() { return once.get(); } @NonNull @Override public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) { if (tasks.isDisposed()) { // don't schedule, we are unsubscribed return EmptyDisposable.INSTANCE; } return threadWorker.scheduleActual(action, delayTime, unit, tasks); } }
通过上述代码分析, new IoScheduler()后,会创建new RxThreadFactory(WORKER_THREAD_NAME_PREFIX, priority)线程工厂类,然后执行 start()方法,创建 CachedWorkerPool 线程池,进行了一系列初始化配置和准备工作,但是并没有调度线程。所以 Schedulers.io()相当于只做了线程调度的前期准备。
我们再回到subscribeOn()方法。关键代码如下:
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
@NonNull
public final Observable<T> subscribeOn(@NonNull Scheduler scheduler) {
Objects.requireNonNull(scheduler, "scheduler is null");
// RxJavaPlugins.onAssembly 又是调用hook,关键之处是new ObservableSubscribeOn
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<>(this, scheduler));
}
很熟悉的代码 RxJavaPlugins.onAssembly,前面已经分析过这个方法,就是个 hook function, 等价于直接 return new ObservableSubscribeOn(this, scheduler);, 现在知道了这里的 scheduler 其实就是 IoScheduler。跟踪代码进入 ObservableSubscribeOn,可以看到这个 ObservableSubscribeOn 继承自 Observable,并且扩展了一些属性,增加了scheduler。我们已经知道了 Observable.subscribe()方法最终都是调用了对应的实现类的subscribeActual 方法。我们重点分析下 subscribeActual:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。