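In reactive programming the consumer reacts to data as it arrives, which is why asynchronous programming is also called reactive programming. Reactive programming allows event changes to be propagated to registered observers.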
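The idea of propagating changes to registered observers can be sketched in plain Java. This is only an illustration of the mechanism, not RxJava's API: `SimpleEventSource`, `register` and `emit` are hypothetical names made up for this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical, minimal event source; not part of RxJava. */
class SimpleEventSource<T> {
    private final List<Consumer<T>> observers = new ArrayList<>();

    /** Registers an observer that is called for every emitted value. */
    void register(Consumer<T> observer) {
        observers.add(observer);
    }

    /** Propagates a value to all registered observers, in registration order. */
    void emit(T value) {
        for (Consumer<T> observer : observers) {
            observer.accept(value);
        }
    }
}

class SimpleEventSourceDemo {
    /** Registers two observers, emits two values and returns what the observers saw. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        SimpleEventSource<String> source = new SimpleEventSource<>();
        source.register(value -> log.add("first:" + value));
        source.register(value -> log.add("second:" + value));
        source.emit("a");
        source.emit("b");
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The observers never ask for data; they only react when the source pushes a value to them.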
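RxJava is Netflix's port of the Reactive Extensions (Rx) to Java. RxJava was open sourced in 2014 and is hosted at http://reactivex.io/.
"The Observer pattern done right. ReactiveX is a combination of the best ideas from the Observer pattern, the Iterator pattern, and functional programming."
-- reactivex.io
The Java version of this concept is called RxJava and is hosted at https://github.com/ReactiveX/RxJava. RxJava is released under the Apache 2.0 license.
RxJava describes itself as an API for asynchronous programming with observable streams.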
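At the time of writing, 2.0.4 is the released version. In the snippets below, replace g.a.v with the version you want to use, preferably 2.0.6 or later.
For a Gradle build, you can add RxJava with the following dependency statement:

    compile group: 'io.reactivex.rxjava2', name: 'rxjava', version: 'g.a.v'

For Maven, you can add the dependency with the following snippet: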
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>g.a.v</version>
</dependency>
For OSGi environments, e.g. Eclipse RCP development, https://dl.bintray.com/simon-scholz/RxJava-OSGi/ can be used as a p2 update site.
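Consider the following example:

    public List<Todo> getTodos() {
        // queryWebService() is a placeholder for a webservice call with bad network latency
        List<Todo> todosFromWeb = queryWebService();

        return todosFromWeb;
    }

Calling getTodos() from the main thread or the UI thread leaves the application unresponsive until todosFromWeb arrives.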
    public void getTodos(Consumer<List<Todo>> todosCallback) {

        Thread thread = new Thread(() -> {
            // queryWebService() is a placeholder for the webservice query
            List<Todo> todosFromWeb = queryWebService();

            todosCallback.accept(todosFromWeb);
        });
        thread.start();
    }

After calling getTodos(Consumer<List<Todo>> todosConsumer), the main thread is not blocked and can continue working. The application can react to the result once the accept method of the given Consumer is called.
    public void getTodos(FailableCallback<List<Todo>> todosCallback) {

        Thread thread = new Thread(() -> {
            try {
                // queryWebService() is a placeholder for the web service query
                List<Todo> todosFromWeb = queryWebService();

                todosCallback.accept(todosFromWeb);
            } catch (Exception ex) {
                todosCallback.error(ex);
            }
        });
        thread.start();
    }

Using a custom FailableCallback interface works, but it also adds complexity.
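The text uses FailableCallback without showing its definition. The following is a minimal sketch of what such an interface might look like and how a caller would use it; the interface shape, `FailableCallbackDemo`, `load` and `loadAndWait` are assumptions made for illustration, and the blocking wait exists only so the outcome can be inspected.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical callback type with a success path and an error path. */
interface FailableCallback<T> {
    void accept(T value);

    void error(Exception ex);
}

class FailableCallbackDemo {
    /** Runs the work on a new thread and reports the outcome through the callback. */
    static void load(boolean fail, FailableCallback<String> callback) {
        Thread thread = new Thread(() -> {
            try {
                if (fail) {
                    throw new IllegalStateException("simulated network error");
                }
                callback.accept("todo list");
            } catch (Exception ex) {
                callback.error(ex);
            }
        });
        thread.start();
    }

    /** Waits for the callback (demonstration only) and returns what it reported. */
    static String loadAndWait(boolean fail) {
        CountDownLatch done = new CountDownLatch(1);
        AtomicReference<String> outcome = new AtomicReference<>("timeout");
        load(fail, new FailableCallback<String>() {
            @Override
            public void accept(String value) {
                outcome.set("ok: " + value);
                done.countDown();
            }

            @Override
            public void error(Exception ex) {
                outcome.set("error: " + ex.getMessage());
                done.countDown();
            }
        });
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return outcome.get();
    }
}
```

Every caller now has to implement two methods and coordinate threads by hand, which is the added complexity mentioned above.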
    public void getUserPermission(FailableCallback<UserPermission> permissionCallback) {
        Thread thread = new Thread(() -> {
            try {
                // queryUserPermission() is a placeholder for the web service query
                UserPermission permission = queryUserPermission();

                permissionCallback.accept(permission);
            } catch (Exception ex) {
                permissionCallback.error(ex);
            }
        });
        thread.start();
    }

    public void getTodos(FailableCallback<List<Todo>> todosCallback) {

        Thread thread = new Thread(() -> {
            getUserPermission(new FailableCallback<UserPermission>() {

                public void accept(UserPermission permission) {
                    if (permission.isValid()) {
                        try {
                            // queryWebService() is a placeholder for the web service query
                            List<Todo> todosFromWeb = queryWebService();

                            if (!todosCallbackInstance.isDisposed()) {
                                if (syncWithUIThread()) {
                                    todosCallback.accept(todosFromWeb);
                                }
                            }
                        } catch (Exception ex) {
                            if (!todosCallbackInstance.isDisposed()) {
                                if (syncWithUIThread()) {
                                    todosCallback.error(ex);
                                }
                            }
                        }
                    }
                }

                public void error(Exception ex) {
                    // Oh no!
                }
            });
        });
        thread.start();
    }

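This is quite bad code, and it can get even worse. It is only meant to show the kind of problem that ReactiveX can solve. These problems are commonly referred to as callback hell or the pyramid of doom.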
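To illustrate how composing asynchronous steps flattens such nesting, here is a sketch that uses the JDK's `CompletableFuture` as a stand-in for a reactive type. It is not RxJava code, and `loadPermission`, `loadTodos`, `noTodos` and `todosIfPermitted` are hypothetical helpers with made-up data.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class FlatChainDemo {
    /** Hypothetical asynchronous permission check. */
    static CompletableFuture<Boolean> loadPermission(boolean valid) {
        return CompletableFuture.supplyAsync(() -> valid);
    }

    /** Hypothetical asynchronous todo query. */
    static CompletableFuture<List<String>> loadTodos() {
        return CompletableFuture.supplyAsync(() -> Arrays.asList("write", "test"));
    }

    /** Already completed result used when the permission is not valid. */
    static CompletableFuture<List<String>> noTodos() {
        return CompletableFuture.completedFuture(Collections.<String>emptyList());
    }

    /** Chains both steps without nesting callbacks; errors are handled in one place. */
    static List<String> todosIfPermitted(boolean valid) {
        return loadPermission(valid)
                .thenCompose(ok -> ok ? loadTodos() : noTodos())
                .exceptionally(ex -> Collections.emptyList())
                .join(); // blocks only so the demo can return a value
    }

    public static void main(String[] args) {
        System.out.println(todosIfPermitted(true));
        System.out.println(todosIfPermitted(false));
    }
}
```

The permission check, the dependent query and the error handling read as one linear chain instead of three levels of anonymous classes.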
    Observable<Todo> todoObservable = Observable.create(emitter -> {
        try {
            List<Todo> todos = getTodos();
            for (Todo todo : todos) {
                emitter.onNext(todo);
            }
            emitter.onComplete();
        } catch (Exception e) {
            emitter.onError(e);
        }
    });
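A typical Observable may emit an unbounded number of items. A click listener is a good example: UI events are unpredictable, because there is no way to know how often the user will click a button or another UI widget. The types that normally terminate with either success or failure are Maybe<T>, Single<T> and Completable.

    Maybe<List<Todo>> todoMaybe = Maybe.create(emitter -> {
        try {
            List<Todo> todos = getTodos();
            if (todos != null && !todos.isEmpty()) {
                emitter.onSuccess(todos); // (1)
            } else {
                emitter.onComplete(); // (2)
            }
        } catch (Exception e) {
            emitter.onError(e); // (3)
        }
    });

(1) | comparable to a java.util.Optional with a value |
(2) | comparable to a java.util.Optional without a value (the null case) |
(3) | an error occurred |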
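The comparison with java.util.Optional can be made concrete with a synchronous, JDK-only sketch. It only mirrors the three outcomes listed above; `findTodos` and `describe` are hypothetical names, and the returned strings merely name the Maybe callback that each outcome corresponds to.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

class MaybeOutcomeDemo {
    /** Synchronous stand-in for the Maybe above: a value, no value, or an exception. */
    static Optional<List<String>> findTodos(List<String> source) {
        if (source == null) {
            throw new IllegalStateException("source unavailable"); // (3) error
        }
        if (source.isEmpty()) {
            return Optional.empty(); // (2) no value
        }
        return Optional.of(source); // (1) value
    }

    /** Maps each outcome to the name of the corresponding Maybe callback. */
    static String describe(List<String> source) {
        try {
            return findTodos(source)
                    .map(todos -> "onSuccess(" + todos + ")")
                    .orElse("onComplete()");
        } catch (RuntimeException e) {
            return "onError(" + e.getMessage() + ")";
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(Arrays.asList("a")));
        System.out.println(describe(Collections.emptyList()));
        System.out.println(describe(null));
    }
}
```

Unlike Optional, a Maybe delivers these outcomes asynchronously and treats the error as a third, explicit signal rather than a thrown exception.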
    Observable<Todo> todoObservable = Observable.create(emitter -> { ... });

    // Simply subscribe with an io.reactivex.functions.Consumer<T>, which is informed on onNext()
    Disposable disposable = todoObservable.subscribe(t -> System.out.print(t));

    // Dispose the subscription when the emitted data is no longer of interest
    disposable.dispose();

    // Also handle the error case with a second io.reactivex.functions.Consumer<T>
    Disposable subscribe = todoObservable.subscribe(t -> System.out.print(t), e -> e.printStackTrace());

    // ...
    DisposableObserver<Todo> disposableObserver = todoObservable.subscribeWith(new DisposableObserver<Todo>() {

        @Override
        public void onNext(Todo t) {
        }

        @Override
        public void onError(Throwable e) {
        }

        @Override
        public void onComplete() {
        }
    });
    import io.reactivex.Single;
    import io.reactivex.disposables.Disposable;
    import io.reactivex.observers.DisposableSingleObserver;

    Single<List<Todo>> todosSingle = getTodos();

    Disposable disposable = todosSingle.subscribeWith(new DisposableSingleObserver<List<Todo>>() {

        @Override
        public void onSuccess(List<Todo> todos) {
            // work with the resulting todos
        }

        @Override
        public void onError(Throwable e) {
            // handle the error case
        }
    });

    // continue working and dispose when the value of the Single is no longer of interest
    disposable.dispose();

    import io.reactivex.Single;
    import io.reactivex.disposables.Disposable;
    import io.reactivex.observers.DisposableSingleObserver;
    import io.reactivex.disposables.CompositeDisposable;

    CompositeDisposable compositeDisposable = new CompositeDisposable();

    Single<List<Todo>> todosSingle = getTodos();

    Single<Happiness> happiness = getHappiness();

    compositeDisposable.add(todosSingle.subscribeWith(new DisposableSingleObserver<List<Todo>>() {

        @Override
        public void onSuccess(List<Todo> todos) {
            // work with the resulting todos
        }

        @Override
        public void onError(Throwable e) {
            // handle the error case
        }
    }));

    compositeDisposable.add(happiness.subscribeWith(new DisposableSingleObserver<Happiness>() {

        @Override
        public void onSuccess(Happiness happiness) {
            // celebrate the happiness :-D
        }

        @Override
        public void onError(Throwable e) {
            System.err.println("Don't worry, be happy! :-P");
        }
    }));

    // continue working and dispose all subscriptions when the values of the Single objects are no longer of interest
    compositeDisposable.dispose();

    Single<List<Todo>> todosSingle = Single.create(emitter -> {
        Thread thread = new Thread(() -> {
            try {
                // queryWebService() is a placeholder for the webservice query
                List<Todo> todosFromWeb = queryWebService();

                System.out.println("Called 4 times!");

                emitter.onSuccess(todosFromWeb);
            } catch (Exception e) {
                emitter.onError(e);
            }
        });
        thread.start();
    });

    todosSingle.subscribe(todos -> { /* show todos times in a bar chart */ });

    showTodosInATable(todosSingle);

    todosSingle.subscribe(todos -> { /* show todos in a Gantt diagram */ });

    anotherMethodThatsSupposedToSubscribeTheSameSingle(todosSingle);

    Single<List<Todo>> todosSingle = Single.create(emitter -> {
        Thread thread = new Thread(() -> {
            try {
                // queryWebService() is a placeholder for the webservice query
                List<Todo> todosFromWeb = queryWebService();

                System.out.println("I am only called once!");

                emitter.onSuccess(todosFromWeb);
            } catch (Exception e) {
                emitter.onError(e);
            }
        });
        thread.start();
    });

    // cache the result of the single, so that the web query is only done once
    Single<List<Todo>> cachedSingle = todosSingle.cache();

    cachedSingle.subscribe(todos -> { /* show todos times in a bar chart */ });

    showTodosInATable(cachedSingle);

    cachedSingle.subscribe(todos -> { /* show todos in a Gantt diagram */ });

    anotherMethodThatsSupposedToSubscribeTheSameSingle(cachedSingle);
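The difference between the two listings above (work repeated for every subscriber versus work done once and shared) can be illustrated with a JDK-only sketch. This is not how Single.cache() is implemented; `CachedSupplier` is a hypothetical, synchronous stand-in that only mirrors the "run once, reuse the result" idea.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical wrapper that runs the source at most once and reuses its result. */
class CachedSupplier<T> implements Supplier<T> {
    private final Supplier<T> source;
    private T value;
    private boolean computed;

    CachedSupplier(Supplier<T> source) {
        this.source = source;
    }

    @Override
    public synchronized T get() {
        if (!computed) {
            value = source.get();
            computed = true;
        }
        return value;
    }
}

class CachedSupplierDemo {
    /** Returns how often the source ran for four consumers: {uncached, cached}. */
    static int[] run() {
        AtomicInteger uncachedCalls = new AtomicInteger();
        Supplier<String> uncached = () -> {
            uncachedCalls.incrementAndGet();
            return "todos";
        };

        AtomicInteger cachedCalls = new AtomicInteger();
        Supplier<String> cached = new CachedSupplier<>(() -> {
            cachedCalls.incrementAndGet();
            return "todos";
        });

        for (int i = 0; i < 4; i++) {
            uncached.get(); // the source runs on every request
            cached.get();   // the source runs only on the first request
        }
        return new int[] {uncachedCalls.get(), cachedCalls.get()};
    }

    public static void main(String[] args) {
        int[] calls = run();
        System.out.println("uncached: " + calls[0] + ", cached: " + calls[1]);
    }
}
```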

From / To | Flowable | Observable | Maybe | Single | Completable |
---|---|---|---|---|---|
Flowable | | toObservable() | reduce() | toList() | ignoreElements() |
Observable | toFlowable() | | reduce() | toList() | ignoreElements() |
Maybe | toFlowable() | toObservable() | | toSingle() | toCompletable() |
Single | toFlowable() | toObservable() | toMaybe() | | toCompletable() |
Completable | toFlowable() | toObservable() | toMaybe() | toSingle() | |
    Observable<String> obs = ...// assume creation code here
    TestObserver<String> testObserver = new TestObserver<>();
    obs.subscribe(testObserver);

    testObserver.assertNoErrors();
    List<String> chickens = testObserver.values();

    // TODO assert your string integrity...