当前位置:   article > 正文

Android Rxjava框架的原理和使用

android rxjava

原理

Rx是Reactive Extensions的缩写的简写,可以使用可观察数据流对编程接口进行异步编程,它结合了观察者模式,迭代器模式和函数式的精华。

Rxjava是一种异步数据处理库,也是一种观察者模式。最早是Netflix公司用于重构当前架构时减少REST调用的次数,参考了Microsoft公司的响应式编程,把Microsoft的Rx库迁移到Java JVM中,其中最有名的就是RxJava。

它的特点主要有以下:

  1. 支持Java 8 Lambda。
  2. 支持异步和同步。
  3. 单一依赖关系。
  4. 简洁,优雅。

RxAndroid

在开发项目的时候,开发者在使用Rxjava时会搭配RxAndroid,他是针对Rxjava在Android平台使用的一个响应式扩展组件。使用RxAndroid的Schedulers(调度器)可以解决Android主线程问题, 多线程等问题。

观察者模式的四大要素

  1. Observable 被观察者
  2. Observer
  3. 观察者 subscribe 订阅
  4. 事件
    在这里插入图片描述

观察者订阅被观察者,一旦被观察者发出事件,观察者就可以接收到。

扩展的观察者模式

在这里插入图片描述
onNext()订阅了一个事件,当事件完成时会回调onComplete(),在完成过程中发生了异常会回调onError()。

使用

依赖

//在Project的gradle下添加maven仓库
maven {
    url "https://oss.jfrog.org/libs-snapshot" }

implementation 'io.reactivex.rxjava3:rxjava:3.0.4'
implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

Hello World

//1.创建被观察者
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
   
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
   
                emitter.onNext("hello world");
                emitter.onComplete();
            }
        });
//2.创建观察者
Observer<String> observer = new Observer<String>() {
   
            @Override
            public void onSubscribe(@NonNull Disposable d) {
   
                System.out.println("onSubscribe():");
            }

            @Override
            public void onNext(@NonNull String s) {
   
                System.out.println("onNext():" + s);
            }

            @Override
            public void onError(@NonNull Throwable e) {
   
                System.out.println("onError():" + e.getMessage());
            }

            @Override
            public void onComplete() {
   
                System.out.println("onComplete():");
            }
        };
//3.订阅事件
observable.subscribe(observer);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39

**注意:**onError()和onComplete()只会回调一个。

操作符

Creating Observables(创建 Observable)

Create

//链式写法
        Observable.create(new ObservableOnSubscribe<String>() {
   
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
   
                emitter.onNext("hello world");
                emitter.onComplete();
            }
        }).subscribe(new Observer<String>() {
   
            @Override
            public void onSubscribe(Disposable d) {
   
                System.out.println("onSubscribe():"+d.toString());
            }

            @Override
            public void onNext(String o) {
   
                System.out.println("onNext():" + o);
            }

            @Override
            public void onError(Throwable e) {
   
                System.out.println("onError():" + e.getMessage());
            }

            @Override
            public void onComplete() {
   
                System.out.println("onComplete():");
            }
        });
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

Just
使用将为你创建一个Observable并自动为你调用onNext( )发射数据,just中传递的参数将直接在Observer的onNext()方法中接收到。

Observable.just("hello world").subscribe(new Observer<String>() {
   
            @Override
            public void onSubscribe(Disposable d) {
   
            }

            @Override
            public void onNext(String s) {
   
             	System.out.println(s);
            }

            @Override
            public void onError(Throwable e) {
   

            }

            @Override
            public void onComplete() {
   

            }
        });
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

From

将一个Iterable, 一个Future, 或者一个数组转换成一个Observable,遍历集合,发送每个item。相当于多次回调onNext()方法,每次传入一个item。

List<String> list = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
   
            list.add("Hello" + i);
        }

        Observable.fromArray(list).subscribe(new Observer<List<String>>() {
   
            @Override
            public void onSubscribe(Disposable d) {
   

            }

            @Override
            public void onNext(List<String> strings) {
   
                System.out.println(strings);
            }

            @Override
            public void onError(Throwable e) {
   

            }

            @Override
            public void onComplete() {
   

            }
        });
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

Defer

当观察者订阅时,才创建Observable,并且针对每个观察者创建都是一个新的Observable。 以何种方式创建这个Observable对象,当满足回调条件后,就会进行相应的回调。

value = "2020/12/13";
    Observable<String> observable = Observable.defer(new Supplier<ObservableSource<? extends String>>() {
   
        @Override
        public ObservableSource<? extends String> get() throws Throwable {
   
            return Observable.just(value);
        }
    });
    value = "12345";
    observable.subscribe(new Observer<String>() {
   
        @Override
        public void onSubscribe(Disposable d) {
   

        }

        @Override
        public void onNext(String s) {
   
            System.out.println(s);
        }

        @Override
        public void onError(Throwable e) {
   

        }

        @Override
        public void onComplete() {
   

        }
    });
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

Empty/Never/Throw

Empty是创建一个不发射任何数据但是正常终止的Observable。 Never是创建一个不发射数据也不终止的Observable。 Throw是创建一个不发射数据以一个错误终止的Observable。 这三个操作符生成的Observable行为非常特殊和受限。测试的时候很有用,有时候也用于结合其它的Observables,或者作为其它需要Observable的操作符的参数。

Observable.defer(new Supplier<ObservableSource<?>>() {
   
            @Override
            public ObservableSource<?> get() throws Throwable {
   
                return Observable.error(new Throwable("你写了个bug"));
            }
        }).subscribe(new Observer<Object>() {
   
            @Override
            public void onSubscribe(@NonNull Disposable d) {
   

            }

            @Override
            public void onNext(@NonNull Object o) {
   
            }

            @Override
            public void onError(@NonNull Throwable e) {
   
                System.out.println(e.getMessage());
            }

            @Override
            public void onComplete() {
   

            }
        });
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

Interval

创建一个按固定时间间隔发射整数序列的Observable,可用作定时器。即按照固定1秒一次调用onNext()方法。

//TrampolineScheduler不会立即执行,当其他排队任务结束时才执行,TrampolineScheduler运行在主线程。

Observable.interval(1000, TimeUnit.MILLISECONDS, Schedulers.trampoline()).subscribe(new Observer<Long>() {
   
            @Override
            public void onSubscribe(@NonNull Disposable d) {
   
            }

            @Override
            public void onNext(@NonNull Long aLong) {
   
                System.out.println(aLong);
            }

            @Override
            public void onError(@NonNull Throwable e) {
   

            }

            @Override
            public void onComplete() {
   

            }
        });
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

Repeat

创建一个Observable,该Observable的事件可以重复调用。

 Observable.just(123).repeat(2).subscribe(new Observer<Integer>() {
   
            @Override
            public void onSubscribe(Disposable d) {
   

            }

            @Override
            public void onNext(Integer integer) {
   
                System.out.println(integer);
            }

            @Override
            public void onError(Throwable e) {
   

            }

            @Override
            public void onComplete() {
   

            }
        });
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

Start

返回一个Observable,它发射一个类似于函数声明的值。

Timer

创建一个Observable,它在一个给定的延迟后发射一个特殊的值,即表示延迟2秒后,调用onNext()方法。

 Observable.timer(2000, TimeUnit.MILLISECONDS,Schedulers.trampoline()).subscribe(new Observer<Long>() {
   
            @Override
            public void onSubscribe(Disposable d) {
   

            }

            @Override
            public void onNext(Long aLong) {
   
                System.out.println(aLong);
            }

            @Override
            public void onError(Throwable e) {
   

            }

            @Override
            public void onComplete() {
   

            }
        });
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

Transforming Observables(转换 Observable)

Map

Map就是把原来的Observable对象转换成另一个Observable对象,同时将传输的数据进行一些灵活的操作,方便Observer获得想要的数据形式。

//Integer to String
Observable.just(123).map(new Function<Integer, String>() {
   
            @Override
            public String apply(Integer s) throws Exception {
   
                return s.toString();
            }
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号