当前位置:   article > 正文

RxJava的使用与深入学习_rxjava 使用

rxjava 使用

转载请注明出处http://blog.csdn.net/evan_man/article/details/51292099

简单介绍

    可以将RxJava是一种观察者设计模式的升级版本。使用Rxjava的好处在于,我们可以方便的切换方法的执行线程,对线程动态切换,该过程无需我们自己手动创建和启动线程。使用Rxjava创建的代码虽然出现在同一个线程中,但是我们可以设置使得不同方法在不同线程中执行。上述功能的实现主要归功于RxJavaScheduler实现,Scheduler 提供了『后台处理,前台回调』的异步机制。
    Schedulers能创建如下几种类型的线程:
  • Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler 
  • Schedulers.newThread(): 总是启用新线程,并在新线程执行操作 
  • Schedulers.io():一个无数量上限的线程池,可以重用空闲的线程 
  • Schedulers.computation():Scheduler 使用的固定的线程池,大小为 CPU 核数 
  • AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行

    RxJava有四个基本概念Observable (可观察者,即被观察者)、 Observer (观察者)、 subscribe (订阅)、事件(就是下面会介绍的onNext、onCompleted等方法)。
    Observer, 对象中有onNext()、onCompleted()、onCompleted()onError()。RxJava 规定,当不会再有新的onNext()发出时,需要触发 onCompleted()方法作为标志。在事件处理过程中出异常时,onError()会被触发,同时队列自动终止,不允许再有事件发出。Observer主要的业务代码大多是编写在其onNext方法中。
    Subscriber,是Observer的升级版本,相对于Observer还多了onStartunSubscribe方法。onStart方法在执行observable.subscribe(observer)方法时就被调用,不能指定线程,只能执行在subscribe()被调用时的线程。调用unSubscribe方法后,Subscriber将不再接收事件,一般在不使用该SubScriber的时候,需要及时调用该方法,以免OOM执行observable.subscribe(observer)方法时,Observer会先被转换成一个Subscriber对象。
    Observable,其构造器接受一个Observable.OnSubscribe对象。执行observable.subscribe(observer)方法时,该方法用于将Observer对象注册到observable中,该方法内部逻辑是调用Observer的onStart方法和Observable.OnSubscribe的call方法。Observable.OnSubscribe对象的call方法会调用ObserveronNext、onCompleted方法(不会调用onStart方法)。此外Observable通过如下的方法指定具体方法的执行线程:
  • subscribeOn():  
    • 指定 subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程,或者叫做事件产生的线程。 
    • subscribeOn() 的位置放在哪里都可以,但它是只能调用一次,不可以随时切换Observable.OnSubscribe 被激活时所处的线程;当使用了多个 subscribeOn() 的时候,只有第一个 subscribeOn() 起作用。 
  • observeOn():  
    • 指定 Subscriber 所运行在的线程。或者叫做事件消费的线程。 
    • 通过 observeOn()的多次调用,程序实现了线程的多次切换,可以随时切换消费事件所在的线程; observeOn() 控制的是它后面的线程。
        
       RxjavaRxAndroid对应的Github分别为RxJavaRxAndroid。官方给出的介绍特此摘录如下:
       RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences. It extends the observer pattern to support sequences of data/events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety and concurrent data structures.
       RxAndroid:Android specific bindings for RxJava. This module adds the minimum classes to RxJava that make writing reactive components in Android applications easy and hassle-free. More specifically, it provides a Scheduler that schedules on the main UI thread or any given Handler.

简单使用(以Android平台为例

一、引入如下依赖:
  1. compile 'io.reactivex:rxjava:1.1.3'
  2. compile 'io.reactivex:rxandroid:1.1.0'
二、创建Observable对象
  1. Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
  2. @Override
  3. public void call(Subscriber<? super String> subscriber) {
  4. subscriber.onNext("Hello");
  5. subscriber.onNext("Hi");
  6. subscriber.onNext("Aloha");
  7. subscriber.onCompleted();
  8. }
  9. });
  10. Observable observable = Observable.just("Hello", "Hi", "Aloha");
  11. String[] words = {"Hello", "Hi", "Aloha"}; Observable observable = Observable.from(words);
注意:上面三种创建方式结果一样。

三、创建Subscribe对象,重写onNext、onCompleted、onError方法
  1. Subscriber subscriber = new Subscriber() {
  2. @Override
  3. public void onCompleted() {
  4. }
  5. @Override
  6. public void onError(Throwable e) {
  7. }
  8. @Override
  9. public void onNext(Object o) {
  10. }
  11. @Override
  12. public void onStart() {
  13. super.onStart();
  14. }
  15. };
、将SubScriber和Observable绑定
  1. observable.subscribe(observer);
  2. observable.subscribe(subscriber);
到此为止我们利用RxJava实现了一个简单的观察者模式。但是并没有使用到线程的动态切换功能,该部分才是RxJava跟普通观察者模式的最大区别;下面我们对该部分如何使用进行介绍。但是在正式介绍线程动态切换方法之前,我们先来了解一下Observable的map和flatmap方法,因为RxJava线程动态切换往往伴随着这两个方法的出现。

、map&flatMap
map方法
  1. Observable.just("images/logo.png") // 输入类型 String
  2. .map(new Func1<String, Bitmap>() {
  3. @Override
  4. public Bitmap call(String filePath) { // 参数类型 String
  5. return getBitmapFromPath(filePath); // 返回类型 Bitmap
  6. }
  7. })
  8. .subscribe(new Action1<Bitmap>() {
  9. @Override
  10. public void call(Bitmap bitmap) { // 参数类型 Bitmap
  11. showBitmap(bitmap);
  12. }
  13. });
FuncX:: 对有参数且有返回值的一类方法的包装
ActionX: 对只有参数没有返回值的一类方法的包装
可以发现map方法的参数的作用就是将上级的String数据转换成一个bitmap对象。正如它的名字一样map,完成了一个映射的功能,下面来看看flatMap对象。

flatMap方法
flatMap() 的原理:
1. 使用传入的事件对象创建一个 Observable 对象;
2. 并不发送这个 Observable, 而是将它激活,于是它开始发送事件;
3. 每一个创建出来的 Observable 发送的事件,都被汇入同一个 Observable ,而这个 Observable 负责将这些事件统一交给 Subscriber 的回调方法。

  1. Student[] students = ...;
  2. Subscriber<Course> subscriber = new Subscriber<Course>() {
  3. @Override
  4. public void onNext(Course course) {
  5. Log.d(tag, course.getName());
  6. }
  7. ...
  8. };
  9. Observable.from(students)
  10. .flatMap(new Func1<Student, Observable<Course>>() {
  11. @Override
  12. public Observable<Course> call(Student student) {
  13. return Observable.from(student.getCourses());
  14. }
  15. })
  16. .subscribe(subscriber);
flatMap的工作原理图如下

对map和flatmap的介绍就到这里,下面对RxJava的线程动态切换如何使用进行介绍。

六、线程的动态切换
  1. Observable.just(1, 2, 3, 4)
  2. .subscribeOn(Schedulers.io()) //决定调用observable.subscribe(subscriber)方法时的执行线程
  3. .observeOn(Schedulers.newThread()) //决定下面mapOperator方法的执行线程
  4. .map(mapOperator)
  5. .observeOn(Schedulers.io()) //决定下面mapOperator2方法的执行线程
  6. .map(mapOperator2)
  7. .observeOn(AndroidSchedulers.mainThread) //决定下面subscriber对象的onNext()、onCompleted()、onCompleted()和onError()方法执行线程。即UI线程
  8. .subscribe(subscriber);
对RxJava如何使用就介绍到这里了,下面我们通过源码分析一下,RxJava的底层实现。

源码分析

 正式分析开始之前,首先确定一下我们这次分析的目标对象和目标方法。
  • Observable对象创建的三个方法create、just、from;
  • observable.subscribe(observer)内部逻辑是什么,为何会最终执行到Subscriber对象的相关方法;
  • Observable中Observable.OnSubscribe对象的call方法何时被调用;
  • Subscriber 对象的onStart、unSubscribe、onNext()、onCompleted()、onCompleted()和onError()在何处,何种情况被调用;
  • Observable的map和flatMap方法内部实现机制,参数的传递;
  • Schedulers如何创建指定的线程,各个线程之间的区别是什么;
  • Observable的observeOn和subscribeOn方法的内部实现机制,如何实现线程的切换;
下面我们针对上面列出的顺序依次进行解答。

Observable.class


以下是Observable类中的Fields:
  1. final OnSubscribe<T> onSubscribe;
  2. static final RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook();
  3. //RxJavaPlugins.getInstance()是一个单例模式,获取一个RxJavaPlugins对象,然后这个对象定义了Hook,然而这对Hook并没有做任何事情,至少目前是这样的。
  4. //rxJavaPlugins.getObservableExecutionHook()近似等价于得到一个RxJavaObservableExecutionHook对象,该对象目前不干任何事,完全可以把它看成透明的,你传什么进去它回什么。
下面是Observable的构造器,该构造器并不能被客户直接使用,因为它使用了Protected关键字进行修饰。
Observable()@Observable.class
  1. protected Observable(OnSubscribe<T> f) {
  2. this.onSubscribe = f;
  3. }
构造器的内容就是对onSubscriber初始化。
Observable对象创建的三个方法create、just、from具体内容如下
create()@Observable.class
  1. public static <T> Observable<T> create(OnSubscribe<T> f) {
  2. return new Observable<T>(hook.onCreate(f));
  3. }
RxJavaObservableExecutionHook.onCreate方法内部直接直接将参数f返回,所以hook.onCreate(f)等价于f;
just()@Observable.class
  1. public static <T> Observable<T> just(final T value) {
  2. return ScalarSynchronousObservable.create(value); //note1
  3. }
  4. public static <T> Observable<T> just(T t1, T t2) {
  5. return from((T[])new Object[] { t1, t2 }); //note2
  6. }
1、final class ScalarSynchronousObservable<T> extends Observable<T>是一个Observable的子类,ScalarSynchronousObservable.create方法参考后面的ScalarSynchronousObservable.class部分
2、调用了Observable的from方法
from()@Observable.class
  1. public static <T> Observable<T> from(T[] array) {
  2. int n = array.length;
  3. if (n == 0) {
  4. return empty();//note1
  5. } else
  6. if (n == 1) {
  7. return just(array[0]);//note2
  8. }
  9. return create(new OnSubscribeFromArray<T>(array));
  10. }
1、方法返回一个(Observable<T>) EmptyHolder.INSTANCE对象
  1. private static final class EmptyHolder {
  2. final static Observable<Object> INSTANCE = create(new OnSubscribe<Object>() {
  3. @Override
  4. public void call(Subscriber<? super Object> subscriber) {
  5. subscriber.onCompleted();
  6. }
  7. });
  8. }
2、调用 just(final T value)创建一个ScalarSynchronousObservable对象
3、class OnSubscribeFromArray<T> implements OnSubscribe<T>,有一种ViewGroup的感觉,其call方法内部会将所有数据依次传递给SubScriber.onNext();随后调用create(OnSubscribe<T> f)创建对象。OnSubscribeFromArray部分我们在后面OnSubscribeFromArray.class部分还会详细介绍。
到此为止我们分析了创建Observable对象的三个方法。
对于create(OnSubscribe<T> f) 方法,实际是等价于利用构造器Observable(OnSubscribe<T> f) 创建一个Observable对象,f赋值给Observable的onSubscribe域。
对于from和just方法底层实现基本一致
  • 方法只含一个参数:调用ScalarSynchronousObservable.create(value)创建一个ScalarSynchronousObservable对象。
  • 方法含多个参数:首先将多个参数构造成一个T[] array对象数组,随后利用该数组创建一个OnSubscribeFromArray<T>(array)的对象,最后调用Observable(OnSubscribe<T> f)方法创建一个Observable对象。
综上,最终我们获得的要么是一个Observable对象,要么就是其子类ScalarSynchronousObservable对象。
observable.subscribe(observer)内部逻辑是什么,为何会最终执行到Subscriber对象的相关方法。
Subscriber 对象的onStart、unSubscribe、onNext()、onCompleted()、onCompleted()和onError()在何处,何种情况被调用;
subscribe()@Observable.class
  1. public final Subscription subscribe(final Observer<? super T> observer) {
  2. if (observer instanceof Subscriber) {
  3. return subscribe((Subscriber<? super T>)observer); //note1
  4. }
  5. return subscribe(new Subscriber<T>() { //note2
  6. @Override
  7. public void onCompleted() {
  8. observer.onCompleted();
  9. }
  10. @Override
  11. public void onError(Throwable e) {
  12. observer.onError(e);
  13. }
  14. @Override
  15. public void onNext(T t) {
  16. observer.onNext(t);
  17. }
  18. });
  19. }
1、如果observer引用的对象是一个Subscriber对象,将observer对象强制转换为Subscriber对象
2、创建一个Subscriber对象,对象内部的onNext、onError、onCompleted方法调用Observer的同名方法
不论是note1、还是note2最终都会调用方法
  1. public final Subscription subscribe(Subscriber<? super T> subscriber) {
  2. return Observable.subscribe(subscriber, this);
  3. }
继续往下看
Observable.subscribe(subscriber, this)@Observable.class
  1. private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
  2. if (subscriber == null) {
  3. throw new IllegalArgumentException("observer can not be null");
  4. }
  5. if (observable.onSubscribe == null) {
  6. throw new IllegalStateException("onSubscribe function can not be null.");
  7. }
  8. subscriber.onStart();//note1
  9. if (!(subscriber instanceof SafeSubscriber)) {
  10. subscriber = new SafeSubscriber<T>(subscriber);
  11. }
  12. try {
  13. hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);//note2
  14. return hook.onSubscribeReturn(subscriber);//note3
  15. } catch (Throwable e) {
  16. Exceptions.throwIfFatal(e);
  17. try {
  18. subscriber.onError(hook.onSubscribeError(e)); //note4
  19. } catch (Throwable e2) {
  20. Exceptions.throwIfFatal(e2);
  21. RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
  22. hook.onSubscribeError(r);
  23. throw r;
  24. }
  25. return Subscriptions.unsubscribed();
  26. }
  27. }
1、调用Subscriber.start()方法;
2、当前版本中等价于observable.onSubscribe.call(subscriber)
3、等价于 return subscriber;
4、如果上述方法出现任何异常则调用 subscriber.onError方法
以上是对observable.subscribe方法的分析,我们知道了subscriber对象的onStart、onError方法是在observable.onSubscribe.call(subscriber)的前后执行。而onNext、onCompleted方法在call中出现,具体内容参考后面OnSubscribeFromArray.call方法和ScalarSynchronousObservable..onSubscribe.call方法。

往下我们该分析分析Observable的map方法内部实现机制;
Observable<R> map()@Observable.class
  1. public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
  2. return lift(new OperatorMap<T, R>(func));
  3. }
OperatorMap是一个实现了public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>> {  }接口的类,用于将一个Subscriber<R>类型对象转换成一个Subscriber<T>类型对象。随后将该对象传递给lift方法,该方法很关键!往下看
Tips:
  • FuncX是定义在rx.functions.*包下面的一个接口,是对一类有返回值方法的封装。Func0代表无参数的方法,Func1代表有一个参数的方法,FuncX以此类推。
  • ActionX是定义在rx.functions.*包下面的一个接口,是对一类无返回值方法的封装。Action0代表无参数的方法,Action1代表有一个参数的方法,ActionX以此类推。
Observable<R> lift()@Observable.class
  1. public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
  2. return new Observable<R>(new OnSubscribe<R>() {
  3. @Override
  4. public void call(Subscriber<? super R> o) {
  5. try {
  6. Subscriber<? super T> st = hook.onLift(operator).call(o); //note1
  7. try {
  8. st.onStart(); //note2
  9. onSubscribe.call(st); //note3
  10. } catch (Throwable e) {
  11. Exceptions.throwIfFatal(e);
  12. st.onError(e); //note4
  13. }
  14. } catch (Throwable e) {
  15. Exceptions.throwIfFatal(e);
  16. o.onError(e); //note5
  17. }
  18. }
  19. });
  20. }
1、hook.onLift(operator).call(o)等价于operator.call(o);该行语句的结果就是将Subscriber<? super R> o转换换成Subscriber<? super T> 。
2、最后会调用刚刚得到的Subscriber<? super T>对象的onStart方法,可能有一些朋友会说o.onStart方法在哪里调用的呢?其实o.onStart在进入这里call方法之前就已经执行了。
3、这里需要特别注意,onSubscribe是lift方法所属的 Observable<T>对象的onSubscribe域,而不是 Observable<R>对象的onSubscribe域
4、调用Subscriber<? super T>.的onError
5、调用Subscriber<? super R>的onError

尝试解释一:
讲到这里估计大伙儿的脑子是晕头转向的,说实话这里一时半会是真的很难以理解,那我们对照着范例来梳理一遍。
  1. Observable.just("images/logo.png") //note1
  2. .map(new Func1<String, Bitmap>() {
  3. @Override
  4. public Bitmap call(String filePath) {
  5. return getBitmapFromPath(filePath);
  6. }
  7. }) //note2
  8. .subscribe(new Action1<Bitmap>() { //note3
  9. @Override
  10. public void call(Bitmap bitmap) {
  11. showBitmap(bitmap);
  12. }
  13. });
为了下面分析的方便,在此进行如下约定,String 记号为T ;Bitmap记号为R
1、首先我们创建了一个Observable<T>对象,对应一个OnSubscribe<T>,其call方法参数Subscriber<T>,表明Subscriber.onNext方法接受参数类型为T
2、随后调用map方法,该方法内部有三个重要的方法
  • Func1<T,R>将T类型的数据转换为R类型数据;
  • OperatorMap<T, R>(func) 将Subscriber<R>转换成一个Subscriber<T>,前者Subscriber接受参数为R,后者接收参数为T
  • lift方法创建一个Observable<R>对象,对应一个OnSubscribe<R>,其call方法参数Subscriber<R>,表明Subscriber.onNext方法接受参数类型为R。
3、经过上面的第二步得到一个Observable<R>对象,调用该对象的subscribe(new Action1<R>() )方法。方法内部调用Observable<R>对象中的OnSubscriber<R>对象的call方法,此时的call方法内容如下(注:以下内容并非源码,只摘录了几个关键的点):
public void call(Subscriber<? super R> o) {
    Subscriber<? super T> st = operator.call(o);// 在此处将Subscriber<R>转换成一个Subscriber<T>
    st.onStart();
    onSubscribe.call(st);//注意!这里的onSubscribe对象是属于Observable<T>中的域。对于多个map嵌套,直接可以把它理解为调用map方法的Observable对象中的onSubscribe域。
    ...
}

上面解释还没懂?再来一发
尝试解释二:
  1. Observable.just("images/logo.png") // 对应Observable<String>
  2. .map(new Func1<String, T1>() { }) // 对应Observable<T1>
  3. .map(new Func1<T1, T2>() { }) // 对应Observable<T2>
  4. .map(new Func1<T2, R>() { }) // 对应Observable<R>
  5. .subscribe(new Action1<R>() { // 对应Observable<R>
  6. @Override
  7. public void call(R r) {
  8. //
  9. };
其实上面的一串方法等价于
  1. Observable.just("images/logo.png") // 对应Observable<String>
  2. .subscribe(new Action1<R>() { // 对应Observable<R>
  3. @Override
  4. public void call(R new Func1<T1, T2>().call (new Func1<T1, T2>().call(new Func1<String, T1>().call(String) ) ) ) {
  5. };
flatMap()@Observable.class
前面介绍完了map方法,接着我们介绍flatMap方法的实现,回顾一下flatMap的原理
1. 使用传入的事件对象创建一个 Observable 对象;
2. 并不发送这个 Observable, 而是将它激活,于是它开始发送事件;
3. 每一个创建出来的 Observable 发送的事件,都被汇入同一个 Observable ,而这个 Observable 负责将这些事件统一交给 Subscriber 的回调方法。
  1. public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
  2. if (getClass() == ScalarSynchronousObservable.class) {
  3. return ((ScalarSynchronousObservable<T>)this).scalarFlatMap(func); //note1
  4. }
  5. return merge(map(func));
  6. }
1、Observable只包含一个事件的情况,参考ScalarSynchronousObservable.class的源码,该部分比较简单,如果只想稍微了解下flatMap的工作原理可以略过下面的分析。该部分比Map还要让人抓狂。
2、
map方法已经介绍过,目的是将一个Observable<T>转变成一个Observable<R>; 其效果如下Observable<T>.onSubcribe.call(Subscriber<T>){  Subscriber<R>.onNext(Func(t)); }
这里对应的map效果为Observable<T>.onSubcribe.call(Subscriber<T>){  Subscriber<R>.onNext(Func(t)); } 但是这里Func(t)返回的对象类型为Observable<R>, 我们自定义的Subscriber默认情况下是不能对其进行处理的需要将Observable<R>数据进行解析得到其包含的事件。
根据ScalarSynchronousObservable<T>.scalarFlatMap方法的实现,这里的merge(map(func))效果做出如下假设:Observable<T>.onSubcribe.call(Subscriber<T>){ Observable<R> o = Func(t); o.unsafeSubscribe(Subscriber<R>); }  

我们往下看merge方法是如何做的。
merge()@Observable.class
  1. public static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source) { //note0
  2. if (source.getClass() == ScalarSynchronousObservable.class) {
  3. return ((ScalarSynchronousObservable<T>)source).scalarFlatMap((Func1)UtilityFunctions.identity()); //note1
  4. }
  5. return source.lift(OperatorMerge.<T>instance(false)); //note2
  6. }
0、source是经过map(func)获得的一个Observable对象,该对象保存的事件类型为Observable
1、经过func对参数t处理后得到的Observable source只包含一个事件。UtilityFunctions.identity()方法返回的是一个输入什么就返回什么的Func1对象
2、经过func对参数t处理后得到的Observable source只包含多个事件。lift方法的参数是一个实现了Operator<T, Observable<? extends T>>接口的对象——该接口很简单就是将T转换为Observable<? extends T>对象。lift方法最终返回Observable<T>对象。该对象运行效果如下Observable<T>.onSubcribe.call(Subscriber<T>){   Subscriber<? super T> st =  operator.call(o); st.onStart();   onSubscribe.call(st);...}有没有发现它跟map方法大体上是一样的,只是注意到的是这里的operator是一个实现了Operator<T, Observable<? extends T>>接口的对象,它将一个Subscriber<T>转换成一个Subscriber<Observable<? extends T>>对象,因此重点肯定在这个接口的实现内部

OperatorMerge@OperatorMerge<T>.class
  1. public final class OperatorMerge<T> implements Operator<T, Observable<? extends T>>
  2. public Subscriber<Observable<? extends T>> call(final Subscriber<? super T> child) {
  3. MergeSubscriber<T> subscriber = new MergeSubscriber<T>(child, delayErrors, maxConcurrent); //note1
  4. MergeProducer<T> producer = new MergeProducer<T>(subscriber);
  5. subscriber.producer = producer;
  6. child.add(subscriber);
  7. child.setProducer(producer);
  8. return subscriber;
  9. }
1、此处完成了对Subscriber child的包装,创建一个MergeSubscriber对象,下面我们看看该对象的内部实现
MergeSubscriber.class@OperatorMerge<T>.class
  1. static final class MergeSubscriber<T> extends Subscriber<Observable<? extends T>> //该Subscriber接受的参数是一个Observable
  2. final Subscriber<? super T> child;
  3. volatile Queue<Object> queue;
  4. volatile InnerSubscriber<?>[] innerSubscribers;
  5. public void onNext(Observable<? extends T> t) { //note1
  6. if (t == null) {
  7. return;
  8. }
  9. if (t instanceof ScalarSynchronousObservable) { //note2
  10. tryEmit(((ScalarSynchronousObservable<? extends T>)t).get());
  11. } else {
  12. InnerSubscriber<T> inner = new InnerSubscriber<T>(this, uniqueId++); //note3
  13. addInner(inner);
  14. t.unsafeSubscribe(inner); //note4
  15. emit(); //note5
  16. }
  17. }
1、MergeSubscriber接收的参数类型为Observable
2、ScalarSynchronousObservable.get方法返回的是ScalarSynchronousObservable中存储的事件。然后调用tryEmit方法处理事件,正如名字所言,尝试提交t给child处理,如果当前条件不允许,比如有其它事件正在被处理则将该t先存入队列queue中。
3、创建一个子Subscriber对象,其父亲为当前MergeSubscriber,并将inner加入到MergeSubscriber的innerSubscribers;数组中,inner的onNext方法会调用父类的tryEmit方法。注意每个Observable对应一个InnerSubscriber
4、该方法执行的效果是,调用inner的onNext方法,参数为Observable<? extends T> t中所包含的事件。
5、该方法内部调用emitLoop方法,从queue中不断获取数据t并传给Subscriber child去执行。内部结构很像OperatorObserveOn的shedule方法
MergeSubscriber小结:
    MergeSubscriber包含一个真实的Subscriber对象,一个存储事件的队列,一个InnerSubscriber的数组。
    onNext方法接收一个Observable对象,1、创建一个InnerSubscriber对象,意味着一个Observable对应一个InnerSubscriber对象;2、将该对象添加到MergeSubscriber的InnerSubscriber的数组中,同时InnerSubscriber对象包含一个对MergeSubscriber对象的引用;3、调用Observable对象的unsafeSubscribe(InnerSubscriber对象)方法,使得InnerSubscriber对象去消费Observable对象中的事件。
    InnerSubscriber对象消费事件的逻辑是调用MergeSubscriber的tryEmit方法去处理获取到的事件。
    tryEmit处理事件的逻辑是如果Subscriber正在处理一个事件,则先将事件存入事件队列中;否则直接调用child.onNext(t)进行处理。

到此为止我们对map和flatmap的介绍都介绍完毕了,下面将对线程的动态切换进行介绍。
Observable的observeOn和subscribeOn方法的内部实现机制,如何实现线程的切换?
为了回答上面的问题首先看看observeOn方法是如何实现的
observeOn()@Observable.class
  1. public final Observable<T> observeOn(Scheduler scheduler) {
  2. return observeOn(scheduler, RxRingBuffer.SIZE);
  3. }
  4. public final Observable<T> observeOn(Scheduler scheduler, int bufferSize) {
  5. return observeOn(scheduler, false, bufferSize);
  6. }
  7. public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) { //note1
  8. if (this instanceof ScalarSynchronousObservable) {
  9. return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler); //note2
  10. }
  11. return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize)); //note3
  12. }
1、当调用observeOn(Scheduler scheduler)方法时默认调用的是observeOn(scheduler, false, RxRingBuffer.SIZE)
2、scalarScheduleOn方法参见后面的ScalarSynchronousObservable.class源码
3、OperatorObserveOn<T>实现了Operator<T, T>接口,实现了call方法。lift方法在前面已经介绍过了,不同之处是这里利用OperatorObserveOn对SubScriber进行包装处理。由一个SubScriber<T>对象转变成另一个SubScriber<T>,两者的参数不变,但是具体执行的线程可能发生了变化。
下面我们对OperatorObserveOn对象的call方法进行探究,跳转至OperatorObserveOn.class

subscribeOn()@Observable.class
  1. public final Observable<T> subscribeOn(Scheduler scheduler) {
  2. if (this instanceof ScalarSynchronousObservable) {
  3. return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);//note1
  4. }
  5. return create(new OperatorSubscribeOn<T>(this, scheduler)); //note2
  6. }
1、对于ScalarSynchronousObservable<T>).scalarScheduleOn()方法,参考ScalarSynchronousObservable.class源码部分
2、这里等价于return new Observable<T>(OperatorSubscribeOn<T>(this, scheduler)); 即利用当前Observable对象和scheduler对象构建一个OnSubscribe对象,随后利用OnSubscribe构建一个Observable对象。
往下我们看看OperatorSubscriberOn类是如何实现的。

OperatorSubscribeOn.class

OperatorSubscribeOn@OperatorSubscribeOn.class
  1. public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {
  2. final Scheduler scheduler;
  3. final Observable<T> source;
  4. public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {
  5. this.scheduler = scheduler;
  6. this.source = source;
  7. }
  8. @Override
  9. public void call(final Subscriber<? super T> subscriber) {
  10. final Worker inner = scheduler.createWorker(); //inner 工作线程别名。
  11. inner.schedule(new Action0() { //note1
  12. @Override
  13. public void call() {
  14. final Thread t = Thread.currentThread();
  15. Subscriber<T> s = new Subscriber<T>(subscriber) { //note2
  16. @Override
  17. public void onNext(T t) { subscriber.onNext(t); }
  18. @Override
  19. public void onError(Throwable e) { subscriber.onError(e); inner.unsubscribe(); }
  20. @Override
  21. public void setProducer(final Producer p) {
  22. subscriber.setProducer(new Producer() {
  23. @Override
  24. public void request(final long n) {
  25. if (t == Thread.currentThread()) {
  26. p.request(n);
  27. } else {
  28. inner.schedule(new Action0() {
  29. @Override
  30. public void call() { p.request(n); }
  31. });
  32. } //end of else
  33. } //end of request
  34. }); //end of subscriber.setProducer
  35. } //end of setProducer
  36. };//end of new Subscriber<T>(subscriber)
  37. source.unsafeSubscribe(s); //note3
  38. }//end of inner call
  39. }); // end of inner.schedule(new Action0()
  40. }// end of outter call
  41. }//end of class
1、该方法作用就是将参数Action的call方法传递给work的一个线程去执行。
2、对外部call接收到的Subscriber对象进行一次封装
3、该方法底层如下
unsafeSubscribe()@Observable.class
  1. public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
  2. try {
  3. subscriber.onStart(); //note1
  4. onSubscribe.call(subscriber); //note2
  5. return subscriber;
  6. } catch (Throwable e) {
  7. try {
  8. subscriber.onError(e);
  9. } catch (Throwable e2) {
  10. ...
  11. }
  12. return Subscriptions.unsubscribed();
  13. }
  14. }
1、首先调用Subscriber对象的onStart方法
2、随后调用Observable_outter<T>.onSubscribe.call(subscriber)方法,该方法内部会将执行subscriber的onNext、onCompleted方法

这里我们对subscribeOn方法进行一下小结:
    subscribeOn方法用于指定 subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程,或者叫做事件产生的线程。 当使用了多个 subscribeOn() 的时候,只有第一个 subscribeOn() 起作用。 
    调用subscribeOn方法的Observable对象命名为(命名为Observable_inner,方法执行之后得到一个Observable对象(命名为Observable_outter),Observable_outter的OnSubscribe.call方法内部会创建一个call方法,该该方法会在一个指定的线程中执行,该方法内部首先将Observable_outter的OnSubscribe.call接受的参数Subscriber对象进行一次封装,随后调用Observable_inner对象的unsafeSubscribe()方法,该方法内部会执行封装过的Subscriber的onStart、OnSubscribe的call方法。
    

OperatorObserveOn.class

call()@OperatorObserveOn.class
  1. @Override public Subscriber<? super T> call(Subscriber<? super T> child) {
  2. if (scheduler instanceof ImmediateScheduler) { //note1
  3. return child;
  4. } else if (scheduler instanceof TrampolineScheduler) { //note2
  5. return child;
  6. } else {
  7. ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize); //note3
  8. parent.init();
  9. return parent;
  10. }
  11. }
1、当前的scheduler对象为ImmediateScheduler表明该SubScriber在当前线程执行,因此不需要切换线程。
2、当前的scheduler对象为TrampolineScheduler表明该SubScriber在当前线程执行,只是并不立即执行,因此不需要切换线程。
3、利用SubScriber和Scheduler创建一个ObserveOnSubscriber对象,调用该对象的init方法。最后返回该包装后的ObserveOnSubscriber对象
往下分析下ObserveOnSubscriber对象,因为其继承自SubScriber因此我们看看它的onNext、onCompleted方法跟之前的SubScriber child有什么关系。随后分析一下如何保证是在另一个线程中执行。
ObserveOnSubscriber.class@OperatorObserveOn.class
class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0
Fields
  1. final Subscriber<? super T> child;
  2. final Scheduler.Worker recursiveScheduler;
  3. final NotificationLite<T> on;
  4. final Queue<Object> queue;
  5. volatile boolean finished; //当前流的状态
  6. long emitted;//当前被处理的事件数
  7. final int limit; //处理的门限,达到该值需要重新调用request方法
  8. final AtomicLong requested = new AtomicLong();
  9. final AtomicLong counter = new AtomicLong();
ObserveOnSubscriber()@ObserveOnSubscriber.class
  1. public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boolean delayError, int bufferSize) {
  2. this.child = child; //note1
  3. this.recursiveScheduler = scheduler.createWorker(); //note2
  4. this.on = NotificationLite.instance(); //note3
  5. this.limit = calculatedSize - (calculatedSize >> 2);
  6. int calculatedSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE;
  7. if (UnsafeAccess.isUnsafeAvailable()) { //note4
  8. queue = new SpscArrayQueue<Object>(calculatedSize);
  9. } else {
  10. queue = new SpscAtomicArrayQueue<Object>(calculatedSize); //note5
  11. }
  12. request(calculatedSize); //note6
  13. }
1、child为SubScriber<T>对象在ObserveOnSubscriber对象中对应的别名;
2、从Scheduler对象中获取一个work对象,随后就不用Scheduler对象了,只是对work进行操作;
3、获取一个NotificationLite对象的实例,用于对一个空对象的再次包装,使其不出现空指针异常;
4、检测当前系统是否包含"suc.misc.Unsafe"对象,如Android该方法返回值为false;所以执行else语句;但是java环境则一般执行if语句
5、创建的是一个Single-Producer-Single-Consumer queue大小由bufferSize决定,存储和获取队列中元素具有原子性
6、该方法的实现在Subscriber.class中,参见SubScriber.class源码,可以透露的是方法的作用就是设置该Subscriber接收的最大事件数
init()@ObserveOnSubscriber.class
  1. void init() {
  2. Subscriber<? super T> localChild = child;
  3. localChild.setProducer(new Producer() { //note0
  4. @Override
  5. public void request(long n) {
  6. if (n > 0L) {
  7. BackpressureUtils.getAndAddRequest(requested, n); //note1
  8. schedule(); //note2
  9. }
  10. }
  11. });
  12. localChild.add(recursiveScheduler); //note3
  13. localChild.add(this); //note4
  14. }
该方法主要完成向localChild注入一些参数,如果直接忽略这部分对我们对线程切换的机制影响并不大,不感兴趣的同学可以跳过。
0、调用Subscriber的setProducer方法,方法内部执行Producer.request(calculatedSize)方法
1、requested为之前创建的AtomicLong对象,requested的值正常情况为n,即需要向Subscriber传递的最大事件数
2、schedule方法
3、recursiveScheduler为之前通过scheduler.createWorker()得到的对象,将该对象添加到SubScriber的SubscriptionList中
4、将当前ObserveOnSubscriber对象添加到SubScriber的SubscriptionList中,对于为何不将init方法放置在构造器中,是因为这里要使用关键字this

schedule()@ObserveOnSubscriber.class
  1. protected void schedule() {
  2. if (counter.getAndIncrement() == 0) {//note1
  3. recursiveScheduler.schedule(this); //note2
  4. }
  5. }
1、加1操作成功后,返回成功之前的被加数,被加数等于0则调用work的schedule方法传入参数this,即第一次执行该方法结果值为0,往后的方法大多数情况将为假,不过虽然为假,但是调用的次数会被记录到counter中,call方法会在退出循环前检测是否重新执行一次循环,还是退出循环,保证后面提交的事件能够被处理。
2、work的schedule方法内部根据不同的Schedule有不同的形式,在后面的内容中我们将对AndroidSchedulers.class和NewThreadScheduler.class进行分析。这里可以提前告知的是在work的schedule方法内部会调用ObserveOnSubscriber的call方法。
分析call方法之前,首先分析ObserveOnSubscriber对象和普通SubScriber的onNext、onCompleted、onError方法的区别。
onNext()@ObserveOnSubscriber.class
  1. @Override public void onNext(final T t) {
  2. if (isUnsubscribed() || finished) {
  3. return;
  4. }
  5. if (!queue.offer(on.next(t))) { //note1
  6. onError(new MissingBackpressureException());
  7. return;
  8. }
  9. schedule();//note2
  10. }
1、queue是之前创建的SpscAtomicArrayQueue<Object>(calculatedSize)对象,on是NotificationLite实例,on.next作用是当t非空则返回t,否则返回具有一个结束标志的对象。该行语句正常情况下是将t存入queue中,同时返回true。
2、调用schedule方法。可见ObserveOnSubscriber的onNext方法并不执行其包含的SubScriber child的onNext方法
onCompleted()@ObserveOnSubscriber.class
  1. @Override public void onCompleted() {
  2. if (isUnsubscribed() || finished) {
  3. return;
  4. }
  5. finished = true; //note1
  6. schedule(); //note2
  7. }
1、设置结束标志
2、调用schedule方法。可见ObserveOnSubscriber的onCompleted方法并不执行其包含的SubScriber child的onCompleted方法
onError()@ObserveOnSubscriber.class
  1. @Override public void onError(final Throwable e) {
  2. if (isUnsubscribed() || finished) {
  3. RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
  4. return;
  5. }
  6. error = e; //ntoe1
  7. finished = true; //note1
  8. schedule(); //note2
  9. }
1、设置error标志和结束标志
2、调用schedule方法。可见ObserveOnSubscriber的onError方法并不执行其包含的SubScriber child的onError方法
因为work的schedule方法内部会调用ObserveOnSubscriber的call方法。所以这里有两个重要的地方,第一call方法执行的位置在work指定的线程中执行,第二call方法内部调用了SubScriber child的onNext方法。对于前者在后面的内容中我们将根据AndroidSchedulers.class和NewThreadScheduler.class进行具体分析,对于后者我们先来看其代码
call()@ObserveOnSubscriber.class
  1. @Override public void call() {
  2. long missed = 1L; //任务数门限
  3. long currentEmission = emitted; //当前任务数
  4. final Queue<Object> q = this.queue; //任务队列
  5. final Subscriber<? super T> localChild = this.child; //子Subscriber
  6. final NotificationLite<T> localOn = this.on;
  7. for (;;) {
  8. long requestAmount = requested.get(); //Subscriber所能接收的最大事件数
  9. while (requestAmount != currentEmission) { //当前完成任务数没有达到门限值
  10. boolean done = finished;
  11. Object v = q.poll(); //note1
  12. boolean empty = v == null;
  13. if (checkTerminated(done, empty, localChild, q)) {//note3
  14. return;
  15. }
  16. if (empty) {
  17. break;
  18. }
  19. localChild.onNext(localOn.getValue(v)); //note2
  20. currentEmission++;//当前任务数+1
  21. if (currentEmission == limit) {
  22. requestAmount = BackpressureUtils.produced(requested, currentEmission);//将requestted值减去当前执行结束的任务数,即得到Subscriber所能接收的最大事件数减少。
  23. request(currentEmission); //调用Subscriber的request方法参数为当前执行的任务数
  24. currentEmission = 0L; //当前任务数为0
  25. }
  26. }
  27. if (requestAmount == currentEmission) {//Subscriber所能接收的最大事件数等于当前执行完的任务数
  28. if (checkTerminated(finished, q.isEmpty(), localChild, q)) { //note3
  29. return;
  30. }
  31. }
  32. emitted = currentEmission; //执行到这里表明还有没有被消费的事件,记录当前消费到的事件位置
  33. missed = counter.addAndGet(-missed); //note4
  34. if (missed == 0L) {
  35. break;
  36. }
  37. }
该方法会在某条线程中被执行!
1、获取得到待处理的值v,在调用ObserveOnSubscriber的onNext方法是会将该方法 的参数存储queue队列中。则call方法中会读取queue中数。
2、可以将它近似看成localChild.onNext(v);
3、该处调用的方法内部根据判断是否执行onCompleted()方法或者执行onError方法,该方法内部最外面的判断是是判断finished的值,而该值只有在外界调用了ObserveOnSubscriber..onCompleted或者ObserveOnSubscriber.onError方法时才会被设定为true。
4、将counte的值减去missed后不能与0,表明在执行call方法时,shedule方法在外别被再次调用,因此需要继续执行循环,检查是否有新的事件等待消费

Schedulers如何创建指定的线程,各个线程之间的区别是什么.

Scheduler.class

该抽象类定义了如下几个方法
createWorker()@Scheduler.class
  1. public abstract Worker createWorker();//note1
  2. public abstract static class Worker implements Subscription {
  3. public abstract Subscription schedule(Action0 action);//note2
  4. public abstract Subscription schedule(final Action0 action, final long delayTime, final TimeUnit unit);
  5. }
1、返回一个Worker该Worker对象内部管理一个线程池
2、外界一般会调用该方法将Action0对象传给Worker,方法内部会在上面的线程池中选取一条线程执行Action0的call()方法。
下面分别以Schedulers.newThread()创建的NewThreadScheduler和AndroidSchedulers.mainThread()创建的AndroidSchedulers为例进行说明。

NewThreadScheduler.class

createWorker()@NewThreadScheduler.class
  1. private static final String THREAD_NAME_PREFIX = "RxNewThreadScheduler-";//note1
  2. private static final RxThreadFactory THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX);//note2
  3. public Worker createWorker() {
  4. return new NewThreadWorker(THREAD_FACTORY); //note3
  5. }
1、线程工厂创建线程的名字前缀
2、创建一个线程工厂,该对象的newThread方法会返回一个特定的线程
3、利用该线程工厂创建一个NewThreadWorker对象

NewThreadWorker.class

NewThreadWorker()@NewThreadWorker.class
  1. public NewThreadWorker(ThreadFactory threadFactory) {
  2. ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, threadFactory); //note1
  3. ....
  4. executor = exec;
  5. }
1、创建一个预定执行的固定线程池
schedule()@NewThreadWorker.class
  1. public Subscription schedule(final Action0 action) {
  2. return schedule(action, 0, null);
  3. }
  4. public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
  5. if (isUnsubscribed) {//note2
  6. return Subscriptions.unsubscribed();
  7. }
  8. return scheduleActual(action, delayTime, unit);
  9. }
1、会在执行前判断是否绑定,否则不进行下面的操作,表明当我们调用Subscribe.unsubscribed方法时,它将不会再收到消息,同时对应的线程也不再向下执行
  1. public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
  2. ScheduledAction run = new ScheduledAction(action); //note1
  3. Future<?> f;
  4. if (delayTime <= 0) {
  5. f = executor.submit(run); //note2
  6. } else {
  7. f = executor.schedule(run, delayTime, unit);
  8. }
  9. return run;
  10. }
1、ScheduledAction实现了Runnable接口,run方法内部调用action的call方法
2、将run对象提交给线程池执行

AndroidSchedulers.class

mainThread()@AndroidSchedulers.class
  1. public static Scheduler mainThread() {
  2. Scheduler scheduler = RxAndroidPlugins.getInstance().getSchedulersHook().getMainThreadScheduler(); //note1
  3. return scheduler != null ? scheduler : MainThreadSchedulerHolder.MAIN_THREAD_SCHEDULER; //note2
  4. }
1、等价于rxAndroidPlugins.getSchedulersHook().getMainThreadScheduler();等价于RxAndroidSchedulersHook.getDefaultInstance().getMainThreadScheduler()等价于null
2、return HandlerScheduler(new Handler(Looper.getMainLooper()));获取到了一个和UI线程的Looper绑定的Handler

HandlerScheduler.class

  1. private final Handler handler;
  2. HandlerScheduler(Handler handler) {
  3. this.handler = handler;
  4. }
createWorker()@HandlerScheduler.class
  1. public Worker createWorker() {
  2. return new HandlerWorker(handler);
  3. }
  4. static class HandlerWorker extends Worker {
  5. private final Handler handler;
  6. HandlerWorker(Handler handler) {
  7. this.handler = handler;
  8. }
  9. @Override
  10. public Subscription schedule(final Action0 action) {
  11. return schedule(action, 0, TimeUnit.MILLISECONDS);
  12. }
  13. @Override
  14. public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
  15. if (compositeSubscription.isUnsubscribed()) {//note0
  16. return Subscriptions.unsubscribed();
  17. }
  18. action = RxAndroidPlugins.getInstance().getSchedulersHook().onSchedule(action); //note1
  19. final ScheduledAction scheduledAction = new ScheduledAction(action); //note2
  20. ...
  21. handler.postDelayed(scheduledAction, unit.toMillis(delayTime)); //note3
  22. ....
  23. return scheduledAction;
  24. }
  25. }
0、会在执行前判断是否绑定,否则不进行如下的操作,表明当我们调用Subscribe.unsubscribed方法时,它将不会再收到消息,同时对应的线程也不再向下执行
1、很明显,前面一堆现在基本用不上,所以该行语句在目前来看是没有意义的,Action最终还是Action
2、我们只需要知道ScheduledAction对象实现了Runnable接口,在run方法内部调用了action的call方法
3、将ScheduledAction对象交给Handler,在合适的时间将被执行
到此为止我们对AndroidSchedulers.class和NewThreadScheduler.class两个类进行了分析,其它类型的继承Scheduler的类基本类似,在此不再详细介绍。大体上它们完成的任务就是将Action0对象交给自己的线程池去执行。

下面我们一次来看一下ScalarSynchronousObservable对象和OnSubscribeFromArray对象中的OnSubscriber对象,重点看下其各自的call方法

ScalarSynchronousObservable.class

Subscriber 对象的onStart、unSubscribe、onNext()、onCompleted()、onCompleted()和onError()在何处,何种情况被调用;
ScalarSynchronousObservable()@ScalarSynchronousObservable.class
  1. final T t;
  2. protected ScalarSynchronousObservable(final T t) {
  3. super(new OnSubscribe<T>() {
  4. @Override
  5. public void call(Subscriber<? super T> s) {
  6. s.setProducer(createProducer(s, t)); //note1
  7. }
  8. });
  9. this.t = t;
  10. }
1、ScalarSynchronousObservable对象的OnSubscriber对象的call方法,调用Subscriber.setProducer方法,该方法的具体内容参考Subscriber.class的源码。
可以提前预告的是Subscriber.setProducer(producer)方法完成的任务有:给Subscriber对象的Producer域赋值,调用producer.request方法。
Producer是一个接口,它只有一个request方法;一般实现该接口的类,都会包含一个Subscriber对象和一个待处理的数据,createProducer(s, t)方法中,s是一个Subscriber对象,t是一个待处理的参数。发现没,我们完全可以在Producer中先对t进行相应的处理随后,再将数据传送给s。
create(T t)@ScalarSynchronousObservable.class
  1. public static <T> ScalarSynchronousObservable<T> create(T t) {
  2. return new ScalarSynchronousObservable<T>(t);
  3. }
createProducer()@ScalarSynchronousObservable.class
  1. static <T> Producer createProducer(Subscriber<? super T> s, T v) {
  2. if (STRONG_MODE) { //默认是false
  3. return new SingleProducer<T>(s, v);
  4. }
  5. return new WeakSingleProducer<T>(s, v); 返回这个Producer
  6. }
WeakSingleProducer.class@ScalarSynchronousObservable.class
  1. static final class WeakSingleProducer<T> implements Producer{
  2. final Subscriber<? super T> actual;
  3. final T value;
  4. boolean once;
  5. public void request(long n) {
  6. if (once) {
  7. return;
  8. }
  9. if (n < 0L) {
  10. throw new IllegalStateException("n >= required but it was " + n);
  11. }
  12. if (n != 0L) {
  13. once = true;
  14. Subscriber<? super T> a = actual;
  15. if (a.isUnsubscribed()) {
  16. return;
  17. }
  18. T v = value;
  19. try {
  20. a.onNext(v); //note1
  21. } catch (Throwable e) {
  22. Exceptions.throwOrReport(e, a, v);
  23. return;
  24. }
  25. if (a.isUnsubscribed()) {
  26. return;
  27. }
  28. a.onCompleted(); //note2
  29. }
  30. }
  31. }
1、执行onNext方法,ScalarSynchronousObservable类对应只有一个值需要处理,因此这里也就只需要调用一次onNext方法。
2、执行onCompleted方法,该方法不一定被执行,因为不排除异常情况。

scalarFlatMap()@ScalarSynchronousObservable.class
  1. public <R> Observable<R> scalarFlatMap(final Func1<? super T, ? extends Observable<? extends R>> func) {
  2. return create(new OnSubscribe<R>() {
  3. @Override public void call(final Subscriber<? super R> child) {
  4. Observable<? extends R> o = func.call(t); //note1
  5. if (o instanceof ScalarSynchronousObservable) {
  6. child.setProducer(createProducer(child, ((ScalarSynchronousObservable<? extends R>)o).t)); //note2
  7. } else {
  8. o.unsafeSubscribe(child); //note3
  9. }
  10. }
  11. });
  12. }
1、将当前ScalarSynchronousObservable对象的唯一元素t交给func处理,得到一个Observable对象o
2、如果o是一个ScalarSynchronousObservable对象;createProducer方法前面已经介绍,默认情况是创建一个WeakSingleProducer对象;然后将WeakSingleProducer对象传入child.setProducer方法,最终会执行WeakSingleProducer对象的request方法,方法中再将o的事件交给Subscriber处理
3、该方法的效果就是调用child.onStart, o.onSubscribe.call(child)方法,即将事件传递给child消费

OnSubscribeFromArray.class

public final class OnSubscribeFromArray<T> implements OnSubscribe<T>
Fields
final T[] array;
OnSubscribeFromArray()@OnSubscribeFromArray.class
public OnSubscribeFromArray(T[] array) { this.array = array;  }
Subscriber 对象的onStart、unSubscribe、onNext()、onCompleted()、onCompleted()和onError()在何处,何种情况被调用;
call()@OnSubscribeFromArray.class
  1. @Override public void call(Subscriber<? super T> child) {
  2. child.setProducer(new FromArrayProducer<T>(child, array));
  3. }
child.setProducer()方法内部一般情况会调用producer.request(Long.MAX_VALUE)方法
FromArrayProducer<T>@OnSubscribeFromArray.class
  1. static final class FromArrayProducer<T> extends AtomicLong implements Producer
  2. final Subscriber<? super T> child;
  3. final T[] array;
  4. int index;
  5. public FromArrayProducer(Subscriber<? super T> child, T[] array) {
  6. this.child = child;
  7. this.array = array;
  8. }
  9. @Override public void request(long n) {
  10. if (n < 0) {
  11. throw new IllegalArgumentException("n >= 0 required but it was " + n);
  12. }
  13. if (n == Long.MAX_VALUE) {
  14. if (BackpressureUtils.getAndAddRequest(this, n) == 0) { //note1
  15. fastPath();
  16. }
  17. } else if (n != 0) {
  18. if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
  19. slowPath(n);
  20. }
  21. }
  22. }
1、n一般情况等于Long.MAX_VALUE,原因参考Subscriber.class。返回对this的long域执行加n操作成功后,加成功之前的值。是CAS操作,set-and-swap操作。即如果是第一次调用FromArrayProducer.request方法则在该行语句BackpressureUtils.getAndAddRequest(this, n) == 0为真
fastPath()@OnSubscribeFromArray.class
  1. void fastPath() {
  2. final Subscriber<? super T> child = this.child;
  3. for (T t : array) {
  4. if (child.isUnsubscribed()) {
  5. return;
  6. }
  7. child.onNext(t); //note1
  8. }
  9. if (child.isUnsubscribed()) {
  10. return;
  11. }
  12. child.onCompleted(); //note2
  13. }
1、调用Subscriber的onNext方法,这里执行的是一个循环将构造Observable时的参数一次传给onNext方法。
2、调用Subscriber的onCompleted方法。
slowPath()@OnSubscribeFromArray.class
  1. void slowPath(long r) {
  2. final Subscriber<? super T> child = this.child;
  3. final T[] array = this.array;
  4. final int n = array.length;
  5. long e = 0L;
  6. int i = index; //若是第一次执行这里的index=0
  7. for (;;) {
  8. while (r != 0L && i != n) { //note1
  9. if (child.isUnsubscribed()) {
  10. return;
  11. }
  12. child.onNext(array[i]);
  13. i++;
  14. if (i == n) {
  15. if (!child.isUnsubscribed()) {
  16. child.onCompleted();
  17. }
  18. return;
  19. }
  20. r--;
  21. e--;
  22. } //note2
  23. r = get() + e; //note3
  24. if (r == 0L) { //note2
  25. index = i; //记录当前执行到的任务数
  26. r = addAndGet(e);
  27. if (r == 0L) {
  28. return;
  29. }
  30. e = 0L;
  31. }
  32. }
  33. }
1、判断r剩下执行任务数不为0,当前执行完事件数不大于事件总数
2、 运行到这里证明当前还有事件没有被Subscriber处理
3、get获取到的值是前面request方法BackpressureUtils.getAndAddRequest(this, n) == 0设置的值,即该Subscriber接受的最大任务数,所以note3执行之后r为待处理的最大任务数
4、一般情况这里为真,即任务已经全部处理完毕,但是还有事件没有被处理完毕,index记录当前执行到的事件次数,并且将当前的AtomicLong的long域设置为0L,否则下次BackpressureUtils.getAndAddRequest(this, n) == 0将不会为真了。

Subscriber.class

public abstract class Subscriber<T> implements Observer<T>, Subscription
Fields
  1. private final SubscriptionList subscriptions; //构造器中赋初值
  2. private final Subscriber<?> subscriber; //构造器中赋初值
  3. private Producer producer; //事件制造者
  4. private long requested = NOT_SET; //最大接受任务数
  5. private static final Long NOT_SET = Long.MIN_VALUE;
Subscriber()@Subscriber.class
  1. protected Subscriber() { this(null, false); }
  2. protected Subscriber(Subscriber<?> subscriber) { this(subscriber, true); }
  3. protected Subscriber(Subscriber<?> subscriber, boolean shareSubscriptions) {
  4. this.subscriber = subscriber;
  5. this.subscriptions = shareSubscriptions && subscriber != null ? subscriber.subscriptions : new SubscriptionList(); //note1
  6. }
1、class SubscriptionList implements Subscription;该对象内部包含private LinkedList<Subscription> subscriptions和private volatile boolean unsubscribed;两个域
add()@Subscriber.class
  1. public final void add(Subscription s) {
  2. subscriptions.add(s);
  3. }
将s添加到SubscriptionList的LinkedList<Subscription> 中,在添加前需要判断SubscriptionList的unsubscribed对象是否为假
onStart()@Subscriber.class
  1. <pre name="code" class="java">public void onStart() {
  2. // do nothing by default
  3. }
 
unsubscribe()@Subscriber.class
  1. @Override public final void unsubscribe() {
  2. subscriptions.unsubscribe();
  3. }
SubscriptionList的unsubscribed = true;subscriptions = null设值
isUnsubscribed()@Subscriber.class
  1. @Override public final boolean isUnsubscribed() {
  2. return subscriptions.isUnsubscribed();
  3. }
返回SubscriptionList的subscriptions值
request()@Subscriber.class
  1. protected final void request(long n) {
  2. if (n < 0) {
  3. throw new IllegalArgumentException("number requested cannot be negative: " + n);
  4. }
  5. Producer producerToRequestFrom = null;
  6. synchronized (this) {
  7. if (producer != null) {
  8. producerToRequestFrom = producer;
  9. } else {
  10. addToRequested(n); //note1
  11. return;
  12. }
  13. }
  14. producerToRequestFrom.request(n); //note2
  15. }
1、如果Producer为空,则先将n加到requested上面去,requested的大小是Subscribe需要处理的事件数
2、调用事件生产者的request方法,同时将n传递给Producer,告知它本Subscriber准备消费的事件数
addToRequested()@Subscriber.class
  1. private void addToRequested(long n) {
  2. if (requested == NOT_SET) {
  3. requested = n;
  4. } else {
  5. final long total = requested + n;
  6. if (total < 0) {
  7. requested = Long.MAX_VALUE; //note1
  8. } else {
  9. requested = total;
  10. }
  11. }
  12. }
1、如果requested+n大于Long.MAX_VALUE则表明,当前该Subscriber接收的任务数不受任何限定,由Producer自行决定向Subscribe提交多少事件
addToRequested()方法只会被request()调用,客户代码不可调用
setProducer()@Subscriber.class
  1. public void setProducer(Producer p) {
  2. long toRequest;
  3. boolean passToSubscriber = false;
  4. synchronized (this) {
  5. toRequest = requested; //note1
  6. producer = p;
  7. if (subscriber != null) {//一般情况下该结果为假
  8. if (toRequest == NOT_SET) {
  9. passToSubscriber = true;
  10. }
  11. }
  12. }
  13. if (passToSubscriber) {//一般情况该行结果为假
  14. subscriber.setProducer(producer);
  15. } else {
  16. if (toRequest == NOT_SET) { //note2
  17. producer.request(Long.MAX_VALUE);
  18. } else {
  19. producer.request(toRequest);
  20. }
  21. }
1、得到当前Subscriber接收的最大任务数
2、如果之前没有调用过Subscriber的request方法,即没有对Subscriber对象的最大任务请求数做过设置,则该Subscriber默认接收所有来自Producer的事件


总结:

    Java中的map、flatMap、observeOn、subscribeOn、filter方法底层实现都是使用lift、operator机制进行实现的。lift实现Observable<T>向Observable<R>的转变,operator实现Subscribe<R>向Subscribe<T>的转变。其中flatMap比较特殊使用了一次map和一次lift&operator。它们之间的区别主要是Operator的区别,observeOn对应OperatorObserveOn 、subscribeOn对应OperatorSubscribeOn 、flatMap对应OperatorMerge 、map对应OperatorMap、 filter对应OperatorFilter。
    之前对lift和Operator的讲解很多同学可能也不是很理解,在此我们特意将这两个知识点单独拿出来捋一捋:
    首先假设调用lift方法的对象为observable<T>。lift方法返回一个observable<R>对象,observable<R>对象的OnSubscribe.call方法执行如下的代码
  1. 利用operator将subscriber<R>转成subscriber<T>  [或者将subscriber_inner<T>转成subscriber<T> ], Operator在此起到了连接subscriber<T>和subscriber<R>的作用,可以对来自subscriber<T>的t数据进行处理之后再传递给subscriber<R>
  2. 调用subscriber<T>对象的onstart方法, 
  3. 调用observable<T>.onsubscribe.call方法,
    • call方法内部会调用subscriber<T>的onNext 、onCompleted方法。onNext方法内部先使用Operator进行一定的预处理,之后根据预处理的结果执行Subscribe<R>.onNext方法
    • 以map为例:subscribe<T>.onnext(T t) 内部执行过程是{ subscribe<R>.onnext(Fun1.call(t)) }
    • 以observeOn为例:subscribe<T>.onnext(T t) 内部执行过程是{queue.add(t); executor.submit(new Runnable(){  call() }); } 上面的call方法内容如下{ t=queue.get();  subscribe_inner<T>.onnext(t);}



番外篇1:对于RxJava和Volley如何使用呢?

难点在于:

  • volley只支持异步请求,何通过volley如何将处理的结果返回给RxJava?

解决方案:

  • Volley有一个类public class RequestFuture<T> implements Future<T>, Response.Listener<T>,  Response.ErrorListener{}利用该类的get方法可以获得返回值。
  • RequestFuture工作原理:
    • 构造RequestFuture:RequestFuture<JSONObject> future = RequestFuture.newFuture();
    • 构造volley请求的时候,将上面的对象传入请求中;JSONObjectRequest(Request.Method.GET, Url,future,future,....);
    • 随后调用RequestFuture.get方法,该方法会在当前线程阻塞;阻塞时间可以自己get的时候设置;return future.get();
    • get方法内部调用wait(time),在等待时间内还没有结果这抛出超时异常;(wait(0)是无限期等待)
    • 在get的wait过程中,如果volley请求到来了,则会调用RequestFuture的onResponse方法,设置该对象中的private T mResult;,同时调用notifyall(),唤醒等待中的线程。
    • 注意,因为该方法是会阻塞的,因此千万不要在UI线程中调用get方法!!因此需要在一个新线程中进行阻塞,这样RXJava的优势就出来了~~~!撒花~
  • reference:http://stackoverflow.com/questions/32701331/rxjava-and-volley-requests

番外篇2:RxJava如何实现RxBus?

 
   虽然博客已经很长了,但也只能算是对RxJava的简单了解。这篇文章http://nerds.weddingpartyapp.com/tech/2014/12/24/implementing-an-event-bus-with-rxjava-rxbus/使用RxJava实现了跟EventBus和Otto同样的事件分发功能。下面对其实现方式进行简单分析

简单使用:

  1. RxBus _rxBus = new RxBus(); //获取RxBus对象,建议将RxBus对象的获取写成一个单例模式
  2. _rxBus.toObserverable()//订阅事件代码, xx中定义事件处理方法
    • .subscribe(new Action1<Object>() {
    •       @Override  public void call(Object event) {
    •           if(event instanceof ClassA) {  .......}
    •       }
    • });
  3. _rxBus.send(new TapEvent()); //发送事件

源码分析:


以下是摘录自开源项目https://github.com/evanman/RxJava-Android-Samples中的一种RxBus实现。个人觉得应该将单例模式嵌套在RxBus类中,如EventBus那样使用getDefault方法获取一个单例,否则用户还需要自己重新写一个单例出来。
RxBus.class
  1. import rx.Observable;
  2. import rx.subjects.PublishSubject;
  3. import rx.subjects.SerializedSubject;
  4. import rx.subjects.Subject;
  5. public class RxBus {
  6. private final Subject<Object, Object> _bus = new SerializedSubject<>(PublishSubject.create());
  7. public void send(Object o) { _bus.onNext(o); }
  8. public Observable<Object> toObserverable() { return _bus; }
  9. public boolean hasObservers() { return _bus.hasObservers(); }
  10. }
SerializedSubject是RxJava开源框架中的一个类,具体内容如下:
SerializedSubject.class
  1. public class SerializedSubject<T, R> extends Subject<T, R> {
  2. private final SerializedObserver<T> observer;
  3. private final Subject<T, R> actual;
  4. public SerializedSubject(final Subject<T, R> actual) {
  5. super(new OnSubscribe<R>() {
  6. @Override
  7. public void call(Subscriber<? super R> child) {
  8. actual.unsafeSubscribe(child);
  9. }
  10. });
  11. this.actual = actual;
  12. this.observer = new SerializedObserver<T>(actual);
  13. }
  14. @Override public void onCompleted() { observer.onCompleted(); }
  15. @Override public void onError(Throwable e) { observer.onError(e); }
  16. @Override public void onNext(T t) { observer.onNext(t); }
  17. @Override public boolean hasObservers() { return actual.hasObservers(); }
  18. }
Subject<T, R>继承自Observable类; SerializedObserver构造器接收一个Subject<T, R>参数,利用该参数初始化SerializedSubject中的SerializedObserver<T>和Subject<T, R>两个类型的域; RxBus创建一个SerializedObserver对象时,构造器接收的参数是PublishSubject.create(),即PublishSubject对象.
简单分析
对RxBus简单使用中的方法调用可以做出如下映射:
  1. rxBus.toObserverable()==>serializedSubject
  2. rxBus.toObserverable().subscribe(new Subscriber)==>serializedSubject.subscribe(new Subscriber)
  3. rxBus.send(Object o)==>serializedSubject.onNext(o)==>serializedSubject.serializedObserver.onNext(0)
  4. rxBus.hasObservers()==>serializedSubject.hasObservers()==>serializedSubject.subject.hasObservers()
    针对第二步,实际会利用SerializedSubject的OnSubscribe域的call方法处理Subscriber<? super R> child,即actual.unsafeSubscribe(child)==>publishSubject.unsafeSubscribe(child);而publishSubject.unsafeSubscribe方法内部又会调用publishSubject的OnSubscribe域的call方法处理,publishSubject的OnSubscribe域实际是一个SubjectSubscriptionManager类型,SubjectSubscriptionManager类型的call方法内部先将当前传入的Subcribe对象转换成一个SubjectObserver对象,随后存入一个集合中。
    针对第三步,将事件传给SerializedSubject的SerializedObserver域的onNext方法处理;而该方法内部会先将事件存入一个队列中,随后将队列中的数据交给publishSubject的onNext方法处理,publishSubject的onNext方法内部会将参数交给第二步的集合中的所有SubjectObserver对象的onNext方法处理。

与EventBus的对比

    在第三步中,publishSubject的onNext方法将事件交给所有的监听者进行处理,因此 事件处理方法中常常使用 if(obj instansof XX) 的语句判断当前接收到的obj是否是自己想要的事件。而EventBus会根据接收事件的类型交给对应的监听者进行处理,代码中不需要使用 if(obj instansof XX) 的语句,事件发送精准度提高;但是这是利用反射实现的,这对性能有一定的影响。



 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/很楠不爱3/article/detail/177362
推荐阅读
相关标签
  

闽ICP备14008679号