赞
踩
android开发过程中经常会用到线程切换,比如数据加载、文件存储、数据库操作等都是在io线程处理,而处理结果的展示刷新ui需要在ui线程进行。
如果不用rxjava,我们可能会asycTask,或者retrofit自己默认的callback来在ui线程刷新ui。但是当我们的操作变复杂时,比如一个接口的数据依赖于另一个接口的返回,或者一次上传多张图片功能,就会造成接口多层嵌套进而增加维护成本
注:由于observable实现了observablesource接口所以下文所说的observable与source等价
先上一段代码及运行结果,呈现出一种直观的认识
Observable.just("Some String") .map(new Function<String, Integer>() { @Override public Integer apply(String s) throws Exception { Log.d("cong", "1 threadName:"+Thread.currentThread().getName()); return s.length(); } }) .subscribeOn(AndroidSchedulers.mainThread()) .map(new Function<Integer, Integer>() { @Override public Integer apply(Integer integer) throws Exception { Log.d("cong", "2 threadName:"+Thread.currentThread().getName()); return 2 * integer; } }) .subscribeOn(Schedulers.newThread()) .map(new Function<Integer, Integer>() { @Override public Integer apply(Integer integer) throws Exception { Log.d("cong", "3 threadName:"+Thread.currentThread().getName()); return 2 * integer; } }) .observeOn(Schedulers.computation()) // change thread .map(new Function<Integer, Integer>() { @Override public Integer apply(Integer integer) throws Exception { Log.d("cong", "4 threadName:"+Thread.currentThread().getName()); return 2 * integer; } }) .subscribeOn(AndroidSchedulers.mainThread()) .map(new Function<Integer, Integer>() { @Override public Integer apply(Integer integer) throws Exception { Log.d("cong", "5 threadName:"+Thread.currentThread().getName()); return 2 * integer; } }) .observeOn(Schedulers.newThread()) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d("cong", "6 threadName:"+Thread.currentThread().getName()); } });
可以发现5处线程切换其实只生效了3处,第一个subscribeOn,两个ObserveOn。
要理解原因必须要了解rxjava操作符的设计模式,rxjava操作符的设计其实是大量利用包装者模式来包装observer,observable。
以map操作符为例:
public interface Function<T, R> { //传递一个类型的值并将值转换为另一个类型 R apply(@NonNull T t) throws Exception; } public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) { // 新生成一个observable,并且将本类,及mapper 作为参数传递到新的observable 中 return new ObservableMap<T, R>(this, mapper)); } public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> { final Function<? super T, ? extends U> function; public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) { super(source); this.function = function; } @Override public void subscribeActual(Observer<? super U> t) { /**1: 每个source其实就是调用操作符的时候的observable主体即被包装的observable * 2: MapperObserver是对当前source接收的observer类型的封装,当upstreamSource observable类型的数据传递过来的时候会在mapperobserser中做转换,最后调用到真实的被包装的observer的onnext * * **/ source.subscribe(new MapObserver<T, U>(t, function)); } static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> { final Function<? super T, ? extends U> mapper; MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) { super(actual); this.mapper = mapper; } @Override public void onNext(T t) { if (done) { return; } if (sourceMode != NONE) { actual.onNext(null); return; } U v; try { v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value."); } catch (Throwable ex) { fail(ex); return; } //真实的被包装的observer actual.onNext(v); } @Override public int requestFusion(int mode) { return transitiveBoundaryFusion(mode); } @Nullable @Override public U poll() throws Exception { T t = qs.poll(); return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null; } } }
线程切换操作符的原理类似,首先我们还是先给出第一个示例的流程图。
由于source包装的都是调用操作符的source,及所谓的upstreamsource
upstream downstream
source <------------- operator ------------------> consumer/further operators
操作符的左边是upstream 操作符的右边是dowmstream
操作符中理解这张图的关键就是subscribeOn既会影响upstream也会影响downstream,而observeOn只会影响downStream
图中向上的箭头是subscribe过程,向下的箭头是onnext通知过程
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。