赞
踩
RxJava是一个通过可观察的序列方式实现异步和基于事件的响应式Reactive编程,
RxJava 是实现了reactive-streams-jvm 定义了规范,4个有主要接口
RxJavae3 提供了5个基础类,来进行相关的操作
UpStream DownStream
RxJava中的数据流包含一个源、0到多个中间操作,后面有一个数据消费者或操作组合(一个操作步骤step代表通过某种方式消费数据)。
PS:和Java中有Stream流水线的概念类似,一个源,0到多个中间操作,后面一个结束操作
source
.operator1()
.operator2()
.operator3()
.subscribe(consumer)
operator2的左边的叫upstream,右边的叫downstream
移动的对象
在RxJava文件里面,item/emission/emits/event/signal/data/message 都是同义词,表示沿着数据流移动的对象
Backpressure
Flow Control流控的一种形式,表示一个操作步骤step可以处理多个事件(item).Backpressure允许控制数据流的内存使用量,特别是当一个操作步骤(step)不知道upstream会发送多少操作步骤item给它时。
在RxJava里面
Assembly time 组装态
数据流在进行一系列中间操作(intermediate operators)前的准备,就是所谓的组装态
Flowable<Integer> flow = Flowable.range(1, 5)
.map(v -> v * v)
.filter(v -> v % 3 == 0);
在这个状态下,数据还没有开始流动,操作都没有执行。和Java Stream中的中间状态一样,数据还没有开始真正的处理
Subscription time 订阅态
当一个建立了内部调用链的flow调用subscribe()方法时的一个临时状态,叫订阅态
flow.subscribe(System.out::println)
这是触发订阅时的结果(参见doOnSubscribe)
Runtime 运行态
这是flow主动发出信息(item)、错误(error)或完成(completion)信号时的状态:
Observable.create(emitter -> {
while (!emitter.isDisposed()) {
long time = System.currentTimeMillis();
emitter.onNext(time);
if (time % 2 != 0) {
emitter.onError(new IllegalStateException("Odd millisecond!"));
break;
}
}
})
.subscribe(System.out::println, Throwable::printStackTrace);
就是上面给定示例的主体执行时的情况。
简单的后台计算
RxJava的一个常用的场景是执行一些计算工作、一个网络请求或一个后台线程,然后在UI线程上显示结果(或错误)
import io.reactivex.rxjava3.schedulers.Schedulers;
Flowable.fromCallable(() -> {
Thread.sleep(1000); // imitate expensive computation
return "Done";
})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.single())
.subscribe(System.out::println, Throwable::printStackTrace);
Thread.sleep(2000); // <--- wait for the flow to finish
调用链是流式API,类似于Builder模式。RxJava里面的类型是不可变的,每次方法调用都会产生一个新的Flowable,带有新加的行为
Flowable<String> source = Flowable.fromCallable(() -> {
Thread.sleep(1000); // imitate expensive computation
return "Done";
});
Flowable<String> runBackground = source.subscribeOn(Schedulers.io());
Flowable<String> showForeground = runBackground.observeOn(Schedulers.single());
showForeground.subscribe(System.out::println, Throwable::printStackTrace);
Thread.sleep(2000);
一般情况下,可以把云计算任务或阻塞IO通过subscribeOn方法放在其它线程里面执行。数据准备好后,可以确保通过observeOn在前台或GUI线程上对它们进行处理。
Schedulers 调度器
RxJava里面的操作没有直接使用Thread或ExecutorService,而是使用Scheduler,抽象化了多线程操作,RxJava3提供了多个Scheduler工具类
这些可用于所有JVM平台,但是一些特殊的平台,如Android,有属于平台挺特有的Schedulers定义,AndroidSchedulers.mainThread(), SwingScheduler.instance() 或 JavaFXSchedulers.gui()
同时,可以使用Schedulers.from(Executor)来会改传入定制的Executor。当有大量并且固定的线程池(非computation() and io())的场景下,可以使用此方法
代码最后出现的Thread.sleep(2000)不是意外。在RxJava里面,默认Scheduler运行在守护进程里面,意味着当主线程退出时,所有守护进程都会停止,后台计算可能不会发生。在上面的示例中,Sleep一些时间,可以方便查看console上的输出结果
在一个flow中并发
RxJava中的流本质上是顺序的,分为多个处理阶段,这些阶段可以彼此并行运行
Flowable.range(1, 10)
.observeOn(Schedulers.computation())
.map(v -> v * v)
.blockingSubscribe(System.out::println);
此示例流程在computation调度器上对1到10的数字进行平方运算,并在主线程(更精确地说,是blockingSubscribe的调用者线程)上使用结果。 但是,lambda表达式 v-> v * v不会并行运行。 它在一个计算线程中一个接一个地执行。
并行执行
并行计算1到10的平方比上面的要稍微复杂一点
Flowable.range(1, 10)
.flatMap(v ->
Flowable.just(v)
.subscribeOn(Schedulers.computation())
.map(w -> w * w)
)
.blockingSubscribe(System.out::println);
一般来说,在RxJava里面,并行表示单独执行流里面的任务,然后把结果汇总到一个新的流中。flatMap操作通过把从1到10的每个元素分配到单独的Flowalbe里面执行,然后汇总结果
注意:flatmap无法保证执行顺序,可能会和流中的其它流混合在一起结束。有2个可选操作
使用Flowable.parallel()操作,ParallelFlowable 可以达到同样的并行结果
Flowable.range(1, 10)
.parallel()
.runOn(Schedulers.computation())
.map(v -> v * v)
.sequential()
.blockingSubscribe(System.out::println);
Dependent sub-flows
flatMap是一个功能强大的操作,在很多场景下有用。比如,给定一个serivce然后返回一个Flowable,我们想用第一个服务发出的值调用另一个服务
Flowable<Inventory> inventorySource = warehouse.getInventoryAsync();
inventorySource
.flatMap(inventoryItem -> erp.getDemandAsync(inventoryItem.getId())
.map(demand -> "Item " + inventoryItem.getName() + " has demand " + demand))
.subscribe(System.out::println);
Continuations
有时,当一个流可执行时,这个流可能会执行一些相关的计算,这种场景有时被称为Continuations,根据什么应该会发生和哪个类型会被调用,来决定调用不同的操作完成
Dependent
典型场景是给定一个值,调用另外一个服务,等待其返回值后继续执行
service.apiCall()
.flatMap(value -> service.anotherApiCall(value))
.flatMap(next -> service.finalCall(next))
通常情况下,后面的操作会依赖前面的执行结果。上面的代码等价于如下代码
service.apiCall()
.flatMap(value ->
service.anotherApiCall(value)
.flatMap(next -> service.finalCallBoth(value, next))
)
Non-dependent
在其他情况下,和前一个source或数据流的结果无关,会继续执行使用另外一个source来执行独立任务
Observable continued = sourceObservable.flatMapSingle(ignored -> someSingleSource)
continued.map(v -> v.toString())
.subscribe(System.out::println, Throwable::printStackTrace);
在这种场景下,通过使用Observable 来代替Single
Deferred-dependent
有时,前一个数据流和后一个数据流会有隐式依赖,不适用于上面的场景,通常建议使用如下:
AtomicInteger count = new AtomicInteger();
Observable.range(1, 10)
.doOnNext(ignored -> count.incrementAndGet())
.ignoreElements()
.andThen(Single.just(count.get()))
.subscribe(System.out::println);
上面的代码会输出0,因为Single.just在assembly time的时候就判断了,此时数据流还没有开始运行。需要一些操作来延迟判断,直到主要数据流完成
AtomicInteger count = new AtomicInteger();
Observable.range(1, 10)
.doOnNext(ignored -> count.incrementAndGet())
.ignoreElements()
.andThen(Single.defer(() -> Single.just(count.get())))
.subscribe(System.out::println);
或
AtomicInteger count = new AtomicInteger();
Observable.range(1, 10)
.doOnNext(ignored -> count.incrementAndGet())
.ignoreElements()
.andThen(Single.fromCallable(() -> count.get()))
.subscribe(System.out::println);
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。