当前位置:   article > 正文

【RxJava】走进RxJava 从关键类开始_rxjava maven

rxjava maven

我们知道RxJava是基于观察者模式的,所以其核心类肯定有被观察者 Observable 用于发射数据; 观察者Observer 用于接收处理数据;还有一个特殊的SubjectObservable 的一个扩展,同时还实现了 Observer 接口Subject同时作为观察者和 被观察者,可以用来转发事件,当做 Rx 中的 事件管道。

  • class rx.Observable< T>
  • interface rx.Observer< T>
  • abstract class rx.subjects.Subject< T, R>

maven工具引入rxJava

<!-- https://mvnrepository.com/artifact/io.reactivex/rxjava -->
<dependency>
    <groupId>io.reactivex</groupId>
    <artifactId>rxjava</artifactId>
    <version>1.2.6</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

Observable

创建被观察者 发射数据

    //Observable 发射数据
    Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> t) {//t 作为一个订阅者(观察者)
            t.onNext("hello");//这里会触发所有订阅者的onNext()函数
            t.onCompleted();
            //一个事件流结束(onError 或者 onCompleted 都会导致事件流结束)后就不会发射任何数据了。
            //t.onError(new Throwable("sry,there is something wrong!"));
        }
    });
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

observer

再创建一个观察者 接受处理数据

    //Observer 观察者  接受处理数据
    Observer<String> observer = new Subscriber<String>() {
        @Override
        public void onCompleted() {
            System.out.println("onCompleted");
        }
        @Override
        public void onError(Throwable e) {
            e.printStackTrace();
        }
        @Override
        public void onNext(String t) {
            System.out.println("onNext");
            System.out.println("t:" + t);
        }
    };
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

确定观察者与被观察者之间的关系

    observable.subscribe(observer);//这里确定关系
  • 1

输出:

observer:from observable
observer:onCompleted

Subject

Subject 是 Observable 的一个扩展,同时还实现了 Observer 接口,可以把 SubjectSubject 同时作为观察者和 被观察者,可以用来转发事件,当做 Rx 中的 事件管道。

    //Subject = Observable + Observer
    Subject<String, String> subject = new Subject<String, String>(new OnSubscribe<String>(){
        @Override
        public void call(Subscriber<? super String> t) {
            t.onNext("from subject");
        }
    }) {
        @Override
        public void onNext(String t) {
            System.out.println("subject:" + t);
        }
        @Override
        public void onError(Throwable e) {
        }
        @Override
        public void onCompleted() {
        }
        @Override
        public boolean hasObservers() {
            return false;
        }
    };
    observable.subscribe(subject);//subject作为观察者
    subject.subscribe(v-> System.out.println("subject:" + v));//subject作为被观察者
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

输出

subject:from observable
subject:from subject

PublishSubject

PublishSubject 是最直接的一个 Subject。当一个数据发射到 PublishSubject 中时,将立刻把这个数据发射到订阅到该 subject 上的所有 subscriber 中。

PublishSubject ps = PublishSubject.create();
ps.onNext(1);
ps.subscribe(System.out::println);
ps.onNext(2);
ps.onNext(3);
ps.onNext(4);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

这里的 System.out::println 是JDK1.8之后的 Lambda表达式中的函数引用
输出

2
3
4

ReplaySubject

ReplaySubject缓存所有发射给他的数据。当一个新的订阅者订阅的时候,缓存的所有数据都会发射给这个订阅者。 由于使用了缓存,所以每个订阅者都会收到所有的数据。

ReplaySubject rs = ReplaySubject.create();
rs.onNext(0);
rs.subscribe(v -> System.out.println("A:" + v));
rs.onNext(1);
rs.onNext(2);
rs.subscribe(v -> System.out.println("B:" + v));
rs.onNext(3);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

输出

A:0
A:1
A:2
B:0
B:1
B:2
A:3
B:3

BehaviorSubject

BehaviorSubject 只保留最后一个值。 等同于 ReplaySubject 的限制个数为 1 的情况。在创建的时候可以指定一个初始值,这样可以确保党订阅者订阅的时候可以立刻收到一个值。

BehaviorSubject bs = BehaviorSubject.create();
bs.onNext(0);
bs.onNext(1);
bs.subscribe(v-> System.out.println("A" + s));
bs.onNext(2);
bs.subscribe(v-> System.out.println("B" + s));
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

输出

A1
A2
B2

AsyncSubject

AsyncSubject 也缓存最后一个数据。区别是 AsyncSubject 只有当数据发送完成时(onCompleted 调用的时候)才发射这个缓存的最后一个数据。可以使用 AsyncSubject 发射一个数据并立刻结束。

AsyncSubject as = AsyncSubject.create();
as.onNext(0);
as.onNext(1);
as.subscribe(s-> System.out.println("A" + s));
as.onNext(2);
as.onNext(3);
as.onCompleted();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

输出

A3

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/300891
推荐阅读
相关标签
  

闽ICP备14008679号