当前位置:   article > 正文

rxjava2.0 线程切换的原理及SubscribeOn,ObserveOn的用法_rxjava observeon 和 subscribeon 原理

rxjava observeon 和 subscribeon 原理

android开发过程中经常会用到线程切换,比如数据加载、文件存储、数据库操作等都是在io线程处理,而处理结果的展示刷新ui需要在ui线程进行。
如果不用rxjava,我们可能会asycTask,或者retrofit自己默认的callback来在ui线程刷新ui。但是当我们的操作变复杂时,比如一个接口的数据依赖于另一个接口的返回,或者一次上传多张图片功能,就会造成接口多层嵌套进而增加维护成本

注:由于observable实现了observablesource接口所以下文所说的observable与source等价

线程切换方式

先上一段代码及运行结果,呈现出一种直观的认识

        Observable.just("Some String")
                .map(new Function<String, Integer>() {

                    @Override
                    public Integer apply(String s) throws Exception {
                        Log.d("cong", "1 threadName:"+Thread.currentThread().getName());
                        return s.length();
                    }
                })
                .subscribeOn(AndroidSchedulers.mainThread())
                .map(new Function<Integer, Integer>() {
                    @Override
                    public Integer apply(Integer integer) throws Exception {
                        Log.d("cong", "2 threadName:"+Thread.currentThread().getName());
                        return 2 * integer;
                    }
                })
                .subscribeOn(Schedulers.newThread())
                .map(new Function<Integer, Integer>() {
                    @Override
                    public Integer apply(Integer integer) throws Exception {
                        Log.d("cong", "3 threadName:"+Thread.currentThread().getName());
                        return 2 * integer;
                    }
                })
                .observeOn(Schedulers.computation()) // change thread
                .map(new Function<Integer, Integer>() {
                    @Override
                    public Integer apply(Integer integer) throws Exception {
                        Log.d("cong", "4 threadName:"+Thread.currentThread().getName());
                        return 2 * integer;
                    }
                })
                .subscribeOn(AndroidSchedulers.mainThread())
                .map(new Function<Integer, Integer>() {
                    @Override
                    public Integer apply(Integer integer) throws Exception {
                        Log.d("cong", "5 threadName:"+Thread.currentThread().getName());
                        return 2 * integer;
                    }
                })
                .observeOn(Schedulers.newThread())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d("cong", "6 threadName:"+Thread.currentThread().getName());
                    }
                });
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48

img
可以发现5处线程切换其实只生效了3处,第一个subscribeOn,两个ObserveOn。
要理解原因必须要了解rxjava操作符的设计模式,rxjava操作符的设计其实是大量利用包装者模式来包装observer,observable。

以map操作符为例:

public interface Function<T, R> {
   
   //传递一个类型的值并将值转换为另一个类型
    R apply(@NonNull T t) throws Exception;
}

 public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        
        // 新生成一个observable,并且将本类,及mapper 作为参数传递到新的observable 中
        return  new ObservableMap<T, R>(this, mapper));
 }
  
 public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }

    @Override
    public void subscribeActual(Observer<? super U> t) {
     
       /**1: 每个source其实就是调用操作符的时候的observable主体即被包装的observable
       *  2: MapperObserver是对当前source接收的observer类型的封装,当upstreamSource observable类型的数据传递过来的时候会在mapperobserser中做转换,最后调用到真实的被包装的observer的onnext
       *       
       *  **/
        source.subscribe(new MapObserver<T, U>(t, function));
    }


    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != NONE) {
                actual.onNext(null);
                return;
            }

            U v;

            try {
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            //真实的被包装的observer
            actual.onNext(v);
        }

        @Override
        public int requestFusion(int mode) {
            return transitiveBoundaryFusion(mode);
        }

        @Nullable
        @Override
        public U poll() throws Exception {
            T t = qs.poll();
            return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
        }
    }
}
     
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76

线程切换操作符的原理类似,首先我们还是先给出第一个示例的流程图。
由于source包装的都是调用操作符的source,及所谓的upstreamsource

什么是upstream downstream

        upstream       downstream
  • 1

source <------------- operator ------------------> consumer/further operators
操作符的左边是upstream 操作符的右边是dowmstream

操作符中理解这张图的关键就是subscribeOn既会影响upstream也会影响downstream,而observeOn只会影响downStream

图中向上的箭头是subscribe过程,向下的箭头是onnext通知过程

在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Gausst松鼠会/article/detail/251122
推荐阅读
相关标签
  

闽ICP备14008679号