赞
踩
RXJava角色划分:
observale: 被观察者
observer: 观察者 子类Subject【科目】 回调走的是观察者
subscribe: 订阅
被观察者.订阅(观察者),被观察者通知观察者
被观察者类型:
热: 订阅完毕立即发送数据
冷: 创建以后,等待观察者订阅,
一旦订阅了,才发送数据
1. 1.添加依赖:
- compile 'io.reactivex.rxjava2:rxjava:2.0.8' //RxJava 的jar包
- compile 'io.reactivex.rxjava2:rxandroid:2.0.1' // RxJava对安卓支持
1.2.Observer 使用
- public void showDemo(View view) {
- //1. 创建被观察者
- // 一旦订阅,调用观察者的 onSubscribe方法
- Observable<String> observable=Observable.create(new ObservableOnSubscribe<String>() {
-
- @Override
- public void subscribe(ObservableEmitter<String> e) throws Exception {
- // 调用 观察者的 onNext 方法
- // 观察者 通知被 观察者 调用 onNext 方法
- e.onNext("健身");
- e.onNext("我要大保健");
- e.onNext("去酒吧");
- e.onComplete(); // 大保健完成
- // e.onError(new Throwable("错误"));
- }
- });
- // 2. 创建观察者
- Observer<String> observer=new Observer<String>() {
- Disposable dd;
- @Override
- public void onSubscribe(Disposable d) {
- Log.e("denganzhi","onSubscribe");
- dd= d;
- }
-
- @Override
- public void onNext(String s) {
- // 取消订阅
- if(s.contains("大保健")){
- dd.dispose();
- }
- Log.e("denganzhi1","onNext:"+s);
- }
-
- @Override
- public void onError(Throwable e) {
- Log.e("denganzhi","onError");
- }
-
- @Override
- public void onComplete() {
- Log.e("denganzhi","onComplete");
- }
- };
- // 3. 订阅
- observable.subscribe(observer);
- }
Observable调用了e.onNext方法:健身
Observable调用了e.onNext方法:我要大保健
1.3.Consumer 的 Observer 子类,避免实现过多方法
- // 1.第一步:创建被观察者
- Observable<String> observable= Observable.create(new ObservableOnSubscribe<String>() {
- @Override
- public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
- e.onNext("健身");
- e.onNext("我要大保健");
- e.onNext("去酒吧");
- e.onComplete(); // 大保健完成
- }
- });
- // 简单的用法
- // Consumer 是 Observer的子类
- observable.subscribe(new Consumer<String>() {
- @Override
- public void accept(@NonNull String s) throws Exception {
- Log.e("denganzhi1","accept-->onNext:"+s);
- }
- }, new Consumer<Throwable>() {
- @Override
- public void accept(@NonNull Throwable throwable) throws Exception {
-
- }
- }, new Action() {
- @Override
- public void run() throws Exception {
- Log.e("denganzhi1","accept-->onComplete");
- }
- });
accept-->onNext:健身
accept-->onNext:我要大保健
accept-->onNext:去酒吧
accept-->onComplete
2. 数据发送Api
- // 发送一条数据或者多条数据
- // Observable<String> observable1= Observable.just("上网","大保健","回家");
- // 发送Array
- Observable<String> observable1= Observable.fromArray("上网","大保健","回家");
- observable1.subscribe(new Consumer<String>() {
- @Override
- public void accept(@NonNull String s) throws Exception {
- Log.e("denganzhi1","just--》"+s);
- }
- });
Subject 它是一个Observer 也可以做为 Subject订阅的一个Observable
Subject实现:
PublishSubject publishSubject;
订阅以后不会直接调用onNext方法,必须要手动调用 publishSubject.onNext("hello")
才回调 subscribe的 onNext方法
3.1.1.创建 PublishSubject
- PublishSubject publishSubject;
- @Override
- protected void onCreate(Bundle savedInstanceState) {
- super.onCreate(savedInstanceState);
- setContentView(R.layout.activity_cold);
- // 冷的,订阅以后不会直接调用onNext方法,必须要手动调用 publishSubject.onNext("hello");
- publishSubject =PublishSubject.create();
- }
3.1.2. 订阅 onSubscribe方法调用
- public void code_subscribe(View view){
- // 冷的,分开2步实现
- // 1. 订阅 subscribe 2. 发送 onNext
- publishSubject.subscribe(new Observer() {
- @Override
- public void onSubscribe(Disposable d) {
- Log.e(TAG,"onSubscribe");
- }
- @Override
- public void onNext(Object o) {
- Log.e(TAG,"onNext:"+(String)o);
- }
- @Override
- public void onError(Throwable e) {
- Log.e(TAG,"onError");
- }
- @Override
- public void onComplete() {
- Log.e(TAG,"onComplete");
- }
- });
- }
E/denganzhi1: onSubscribe
3.1.3. 被观察这通知观察着
- public void code_send(View view){
- // PublishSubject 使用
- publishSubject.onNext("PublishSubject-->OnNext");
-
- }
输出 :
onNext:PublishSubject-->OnNext
3.2.1. 创建 BehaviorSubject
- BehaviorSubject<String> behaviorSubject;
- @Override
- protected void onCreate(Bundle savedInstanceState) {
- super.onCreate(savedInstanceState);
- setContentView(R.layout.activity_cold);
- // 冷的,默认值,订阅以后会默认调用onNext方法
- behaviorSubject= BehaviorSubject.createDefault("默认发送");
- }
-
- public void code_subscribe(View view){
-
- // 冷的,分开2步实现
- // 订阅以后会调用
- behaviorSubject.subscribe(new Observer<String>() {
- @Override
- public void onSubscribe(Disposable d) {
- Log.e(TAG, "onSubscribe......onSubscribe");
- }
-
- @Override
- public void onNext(String s) {
- Log.e(TAG, "onNext......onNext:"+s);
- }
-
- @Override
- public void onError(Throwable e) {
- Log.e(TAG, "onError......onError");
- }
-
- @Override
- public void onComplete() {
- Log.e(TAG, "完成......onComplete");
- }
- });
输出:
//onSubscribe......onSubscribe
// onNext......onNext:默认发送,订阅以后默认调用onNext方法
3.2.2. 订阅:
- public void code_subscribe(View view){
- behaviorSubject.subscribe(new Observer<String>() {
- @Override
- public void onSubscribe(Disposable d) {
- Log.e(TAG, "onSubscribe......onSubscribe");
- }
-
- @Override
- public void onNext(String s) {
- Log.e(TAG, "onNext......onNext:"+s);
- }
-
- @Override
- public void onError(Throwable e) {
- Log.e(TAG, "onError......onError");
- }
-
- @Override
- public void onComplete() {
- Log.e(TAG, "完成......onComplete");
- }
- });
-
-
- }
3.2.3. 发送:
- // 手动调用
- public void code_send(View view){
- behaviorSubject.onNext("BehaviorSubject...BehaviorSubject");
- }
// onNext......onNext:BehaviorSubject...BehaviorSubject
3.3.1. 创建ReplaySubject
- ReplaySubject<String> replaySubject;
- @Override
- protected void onCreate(Bundle savedInstanceState) {
- super.onCreate(savedInstanceState);
- setContentView(R.layout.activity_cold);
- // 会把每次发送给观察者的数据缓存起来,一旦有新的观察者订阅,会立刻把缓存中的消息发送给对应观察者
- replaySubject= ReplaySubject.create();
- }
3.3.2. 第一次订阅:
- public void code_subscribe(View view){
- // 订阅1
- replaySubject.subscribe(new Observer<String>() {
- @Override
- public void onSubscribe(Disposable d) {
- Log.e(TAG, "onSubscribe111");
- }
-
- @Override
- public void onNext(String s) {
- Log.e(TAG, "onNext:"+s);
- }
-
- @Override
- public void onError(Throwable e) {
- Log.e(TAG, "Throwable:");
- }
-
- @Override
- public void onComplete() {
- Log.e(TAG, "onComplete:");
- }
- });
- }
E/denganzhi1: onSubscribe
3.3.3. 发送
- public void code_send(View view){
- replaySubject.onNext("ReplaySubject ... 测试");
- }
onNext:ReplaySubject ... 测试
3.3.4. 第二次订阅
- public void re_code_subscribe(View view){
- // 订阅2 : 再次订阅,会发送之前订阅的消息
- replaySubject.subscribe(new Observer<String>() {
- @Override
- public void onSubscribe(Disposable d) {
- Log.e(TAG, "re========>onSubscribe");
- }
-
- @Override
- public void onNext(String s) {
- Log.e(TAG, "re========>onNext:"+s);
- }
-
- @Override
- public void onError(Throwable e) {
- Log.e(TAG, "re========>Throwable:");
- }
-
- @Override
- public void onComplete() {
- Log.e(TAG, "re========>onComplete:");
- }
- });
- }
re========>onSubscribe
re========>onNext:ReplaySubject ... 测试 // 把第一次发送的缓存起来,第二次发送,也可以接受第一次的
3.4.1 创建AsyncSubject
- 1. 创建AsyncSubject
- AsyncSubject<String> asyncSubject;
- @Override
- protected void onCreate(Bundle savedInstanceState) {
- super.onCreate(savedInstanceState);
- setContentView(R.layout.activity_cold);
- //1. 调用onNext方法以后必须要调用onComplete();
- //2. 多个onNext发送调用,只能收到最后一个onNext
- asyncSubject= AsyncSubject.create();
-
- }
3.4.2 订阅:
- public void code_subscribe(View view){
- asyncSubject.subscribe(new Observer<String>() {
- @Override
- public void onSubscribe(Disposable d) {
- Log.e(TAG, "onSubscribe");
- }
-
- @Override
- public void onNext(String s) {
- Log.e(TAG, "onNext:"+s);
- }
-
- @Override
- public void onError(Throwable e) {
- Log.e(TAG, "Throwable:");
- }
-
- @Override
- public void onComplete() {
- Log.e(TAG, "onComplete:");
- }
- });
- }
E/denganzhi1: onSubscribe
3.4.3 发送:
- 3. public void code_send(View view){
- asyncSubject.onNext("asyncSubject测试1");
- asyncSubject.onNext("asyncSubject测试2");
- asyncSubject.onNext("asyncSubject测试3");
- asyncSubject.onComplete();
- }
denganzhi1: onNext:asyncSubject测试3 // 多次调用onNext,在调用onComplete以后,只调用一次onNext
denganzhi1: onComplete:
情景: 服务中不断收到蓝牙会的数据,然后发送到 订阅的各个Activity, Activity会以后如何取消订阅
1. MyApplication 启动 MyService 服务
- // 启动 MyService 服务
- public class MyApplciaton extends Application {
- @Override
- public void onCreate() {
- super.onCreate();
- Intent intent=new Intent(this,MyService.class);
- startService(intent);
- }
- }
2. MyService中实现,蓝牙数据上报接口数据
- package lanya.denganzhi.com.rxjava2;
-
- import android.app.Service;
- import android.content.Intent;
- import android.os.IBinder;
- import android.util.Log;
- import android.widget.Toast;
-
- import io.reactivex.subjects.PublishSubject;
-
- public class MyService extends Service {
- public MyService() {
- }
- @Override
- public IBinder onBind(Intent intent) {
- // TODO: Return the communication channel to the service.
- throw new UnsupportedOperationException("Not yet implemented");
- }
- String TAG="denganzhi";
- @Override
- public void onCreate() {
- Log.e(TAG,"onCreate");
- super.onCreate();
- }
- @Override
- public int onStartCommand(Intent intent, int flags, int startId) {
- Log.e(TAG,"onStartCommand");
- return super.onStartCommand(intent, flags, startId);
- }
-
- private PublishSubject<String> dataChangeSubject = PublishSubject.create();
- public PublishSubject<String> listeneDataChange() {
- return dataChangeSubject;
- }
-
- // 蓝牙的所有数据上报到这个接口
- public void publish(final String topic, final String msg) {
- // 向订阅PublishSubject 的主题发送
- dataChangeSubject.onNext("蓝牙发送cmd"+0x08);
- }
- private static MyService rxMqtt;
- /**
- * 获取Service 实例
- * @return
- */
- public static MyService getRxMqtt() {
- if (rxMqtt == null) {
- synchronized (MyService.class) {
- if (rxMqtt == null) {
- rxMqtt = new MyService();
- }
- }
- }
- return rxMqtt;
- }
- }
3. MainActivity 订阅 dataChangeSubject
- String TAG="denganzhi";
- Disposable serviceDisposable;
- // 订阅在当前Activity
- public void showDemo(View view) {
- // 需要的时候才订阅,每次使用取消订阅,如果要一直订阅,那就不要取消了
- toDisposable(serviceDisposable);
- serviceDisposable= MyService.getRxMqtt().listeneDataChange()
- .filter(new AppendOnlyLinkedArrayList.NonThrowingPredicate<String>() {
- @Override
- public boolean test(String s) {
- return true;
- }
- })
- .timeout(5 * 1000, TimeUnit.MILLISECONDS)
- .subscribe(new Consumer<String>() {
- @Override
- public void accept(String s) throws Exception {
- Log.e(TAG,"listeneDataChange-->accept:"+s);
- }
- }, new Consumer<Throwable>() {
- @Override
- public void accept(Throwable throwable) throws Exception {
- Log.e(TAG,"listeneDataChange-->Consumer");
- }
- }
-
- );
- compositeDisposable.add(serviceDisposable);
- }
-
- public void toDisposable(Disposable disposable) {
- if (disposable != null && !disposable.isDisposed()) {
- disposable.dispose();
- }
- }
-
- @Override
- protected void onDestroy() {
- toDisposable(serviceDisposable);
- super.onDestroy();
- }
4. 模拟数据上报,接收数据,模拟蓝牙发送 Activity中的Button
- public void sendDemo(View view) {
- MyService.getRxMqtt().publish("/push/topic","蓝牙发送数据....");
- }
运行效果 : MainActivity中的subscribe 订阅中回调接收数据
listeneDataChange-->accept:蓝牙发送cmd8
listeneDataChange-->accept:蓝牙发送cmd8
listeneDataChange-->accept:蓝牙发送cmd8
listeneDataChange-->accept:蓝牙发送cmd8
5.1. RxJava操作符,数组相关的:
Observable.just("上网","大保健","回家");
fromIterable : 遍历List
*** Observable.fromArray("上网","大保健","回家"); // 数据遍历器
配合使用: .take(2) 取前2条 firstElement【第一条】
.takeLast 去最后2条 lastElement【最后一条】
.distinct() 过滤掉Array中重复数据
.skip(2) 跳过Array中1,2个元素,从第3个开始
skipLast(2) 最后2个不需要
.elementAt(2): 只有第3个元素
.elementAt(10,"小明"): 发送如果角标越结,发送默认的"小明"
.range(5,5): 从数据角标5位置开始发送5次
filter: 过滤
5.2.Observale 操作符与重复相关:
repeat(2) : 被观察这 发送2次
defer: 懒加载,当观察者订阅,才去创建Observable
interval: 每隔3s调用一次 onNext
timer: 定时器,3s以后执行一次
intervalRange(100,3,3,3,TimeUnit.SECONDS):从100开始,执行3次,第一个3s后执行,每次间隔3s 输出100 101 102
.timeout(5 * 1000, TimeUnit.MILLISECONDS): 超时
Observable.empty(): //会调用 onSubscribe 和 onComplete 方法
- private Observable<String> observableString;
- public void testRxJava(View view){
- // observable.repeat():
- // repeat(2) : 被观察这 发送2次
- Observable.just("xiaoming","xiaohei")
- .repeat(2) // 发送2次
- .subscribe(new Consumer<String>() {
- @Override
- public void accept(@NonNull String s) throws Exception {
- // Log.e(TAG,"内容是:"+s);
- }
- });
-
- // defer(): 当观察者订阅,才去创建Observable 懒加载
- Observable observable1=Observable.defer(new Callable<ObservableSource<?>>() {
- @Override
- public ObservableSource<?> call() throws Exception {
- return Observable.create(new ObservableOnSubscribe<String>() {
- @Override
- public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
- e.onNext("下一步");
- e.onComplete();
- }
- });
- }
- });
- observable1.subscribe(new Consumer() {
- @Override
- public void accept(@NonNull Object o) throws Exception {
- // Log.e("denganzhi1","结果是:"+(String)o);
- }
- });
-
-
- // 从开始位置开始发送,向后发送多少个
- // 打印5,6,7,8,9
- Observable.range(5,5)
- .subscribe(new Consumer<Integer>() {
- @Override
- public void accept(@NonNull Integer integer) throws Exception {
- // Log.e("denganzhi1","result:"+integer);
- }
- });
-
- // 第一个:
- // 第二个参数:
- // 第三个:用到的单位
- // 无限轮训
- Observable observable_interval= Observable.interval(3,TimeUnit.SECONDS);
- Observer<Long> observer=new Observer<Long>() {
-
- Disposable dd;
- @Override
- public void onSubscribe(Disposable d) {
- this.dd=d;
- }
-
- @Override
- public void onNext(Long s) {
- // Log.e(TAG,"result:"+s); // 这里返回 s 从0 开始 1,2,3,4
- if(s==5){
- dd.dispose();
- }
- }
-
- @Override
- public void onError(Throwable e) {
-
- }
-
- @Override
- public void onComplete() {
-
- }
- };
- observable_interval.subscribe(observer);
-
-
- //(long start, long count, long initialDelay, long period, TimeUnit unit)
- // 从100开始,执行3次,第一个3s后执行,每次间隔3s
- Observable.intervalRange(100,3,3,3,TimeUnit.SECONDS)
- .subscribe(new Consumer<Long>() {
- @Override
- public void accept(@NonNull Long aLong) throws Exception {
- // Log.e(TAG,"result:"+aLong);
- }
- });
-
- // 隔开4S执行一次
- Observable.timer(4,TimeUnit.SECONDS)
- .subscribe(new Consumer<Long>() {
- @Override
- public void accept(@NonNull Long aLong) throws Exception {
- // Log.e(TAG,"result:"+aLong);
- }
- });
-
-
- // 过滤 filter
- User u1=new User(1,"小明");
- User u2=new User(2,"xiaohei");
- Observable.just(u1,u2)
- .filter(new Predicate<User>() {
- @Override
- public boolean test(@NonNull User user) throws Exception {
- // 数据遍历,返回需要数据
- return user.getUsername().contains("小");
- }
- }).subscribe(new Consumer<User>() {
- @Override
- public void accept(@NonNull User user) throws Exception {
- // Log.e(TAG,"过滤以后的结果是:"+user.getUsername());
- }
- });
-
-
- Observable.fromArray("xiaoming","xiaohei","a","b").filter(new AppendOnlyLinkedArrayList.NonThrowingPredicate<String>() {
- @Override
- public boolean test(String s) {
- return true;
- }
- })
- .take(2).subscribe(new Consumer<String>() {
- @Override
- public void accept(String s) throws Exception {
- // Log.e("filte-->"+TAG,s);
- }
- });
-
-
- Observable.fromArray("xiaoming","xiaohei","a","b").filter(new AppendOnlyLinkedArrayList.NonThrowingPredicate<String>() {
- @Override
- public boolean test(String s) {
- return true;
- }
- })
- .takeLast(2).subscribe(new Consumer<String>() {
- @Override
- public void accept(String s) throws Exception {
- // Log.e("filte-->"+TAG,s);
- }
- });
-
-
- Observable.fromArray("xiaoming","xiaohei","a","a","a").distinct()
- .subscribe(new Consumer<String>() {
- @Override
- public void accept(String s) throws Exception {
- // Log.e("distinct-->"+TAG,s);
- }
- });
-
-
-
- Observable.fromArray("xiaoming","xiaohei","a","a","a")
- .firstElement()
- .subscribe(new Consumer<String>() {
- @Override
- public void accept(String s) throws Exception {
- // Log.e("firstElement-->"+TAG,s);
- }
- });
-
-
- // skip(n): 跳过前面2个
- Observable.just("xiaoming","小黑","xiaoming3")
- .skipLast(2)
- .subscribe(new Consumer<String>() {
- @Override
- public void accept(@NonNull String s) throws Exception {
- // Log.e(TAG,"skip:"+s);
- }
- });
-
-
-
- // 过滤重复数据
- Observable.just("xiaoming","小黑","xiaoming")
- .distinct()
- .subscribe(new Consumer<String>() {
- @Override
- public void accept(@NonNull String s) throws Exception {
- // Log.e(TAG,s);
- }
- });
-
-
- Observable.fromArray("xiaoming1","xiaohei2","3","4","5")
- .elementAt(10,"小明")
- .subscribe(new Consumer<String>() {
- @Override
- public void accept(String s) throws Exception {
- // Log.e("elementAt-->"+TAG,s);
- }
- });
-
-
- // 会调用 onSubscribe 和 onComplete 方法
- Observable.empty().subscribe(new Observer<Object>() {
- @Override
- public void onSubscribe(Disposable d) {
- Log.e(TAG,"onSubscribe");
- }
- @Override
- public void onNext(Object o) {
- Log.e(TAG,"onNext");
- }
- @Override
- public void onError(Throwable e) {
- Log.e(TAG,"onError");
- }
- @Override
- public void onComplete() {
- Log.e(TAG,"onComplete");
- }
- });
- }
5.3. RxJava 变化操作符 Map、flatMap 变化符使用,数据变换
功能: 比如 把 Integer 变成 User 在变成 String
- //RxJava变化操作符使用
- public void showDemo3(View view){
- // id(Intger) --> User
- // Function<Integer, User> 第一个是参数 ,第二个是函数返回结果
- Observable.just(400).map(new Function<Integer, User>() {
- @Override
- public User apply(Integer integer) throws Exception {
- return new User(integer,"xiaoming");
- }
- }).subscribe(new Consumer<User>() {
- @Override
- public void accept(User user) throws Exception {
- Log.e(TAG,"转化之后的值:"+user);
- }
- });
-
-
- // 根据id获取token
- // 根据token获取用户信息
- //可以不断的做下去,不断的封装数据
- // Function<Integer, ObservableSource<User>> 第一参数,第二个返回值
- Observable.just(100).flatMap(new Function<Integer, ObservableSource<User>>() {
- @Override
- public ObservableSource<User> apply(Integer integer) throws Exception {
- return Observable.just(new User(integer,"xiaoming"));
- }
- }).flatMap(new Function<User, ObservableSource<String>>() {
- @Override
- public ObservableSource<String> apply(User user) throws Exception {
- return Observable.just(user.getUsername());
- }
- }).subscribe(new Consumer<String>() {
- @Override
- public void accept(String s) throws Exception {
- Log.e(TAG,"result:"+s);
- }
- });
- }
1. Merge 顺序触发,首先执行observal1,在执行observal2
Merge observal1 抛出异常,那么不会执行 observal2,使用mergeDelayError‘’
- Observable<String> observable_1= Observable.create(new ObservableOnSubscribe() {
- @Override
- public void subscribe(ObservableEmitter e) throws Exception {
- // int j=1/0;、
- Log.e(TAG,"组合1触发了");
- SystemClock.sleep(5000);
- int i=1/0;
- e.onNext("组合1");
- // Log.e(TAG,"组合1");
- }
- });
-
-
- Observable<String> observable_2= Observable.create(new ObservableOnSubscribe() {
- @Override
- public void subscribe(ObservableEmitter e) throws Exception {
- Log.e(TAG,"组合2触发了");
- SystemClock.sleep(3000);
- e.onNext("组合2");
- // Log.e(TAG,"组合2");
- }
- });
-
- // observable_1 执行完毕,在执行 observable_2 有顺序
- Observable.mergeDelayError(observable_1,observable_2).subscribe(new Consumer<String>() {
- @Override
- public void accept(String s) throws Exception {
- Log.e(TAG, "mergeDelayError:" + s);
- }
- }, new Consumer<Throwable>() {
- @Override
- public void accept(Throwable throwable) throws Exception {
- Log.e(TAG, "concat:" +throwable);
- }
- });
输出:
组合1触发了
组合2触发了
mergeDelayError:组合2
2. concat: observal1 执行, observal2没有执行
3. zip 首先执行observal1,在执行observal2,然后 1,2 整合 BiFunction
- Observable<String> observable_1= Observable.create(new ObservableOnSubscribe() {
- @Override
- public void subscribe(ObservableEmitter e) throws Exception {
- // int j=1/0;、
- Log.e(TAG,"组合1触发了");
- SystemClock.sleep(5000);
- int i=1/0;
- e.onNext("组合1");
- // Log.e(TAG,"组合1");
- }
- });
- Observable<String> observable_2= Observable.create(new ObservableOnSubscribe() {
- @Override
- public void subscribe(ObservableEmitter e) throws Exception {
- Log.e(TAG,"组合2触发了");
- SystemClock.sleep(3000);
- e.onNext("组合2");
- // Log.e(TAG,"组合2");
- }
- });
- Observable.zip(observable_1, observable_2, new BiFunction<String, String, String>() {
- @Override
- public String apply(String s1, String s2) throws Exception {
- return s1+s2;
- }
- }).subscribeOn(Schedulers.io())
- .observeOn(AndroidSchedulers.mainThread())
- .subscribe(new Consumer<String>() {
- @Override
- public void accept(String s) throws Exception {
- Log.e(TAG,"zip:"+s);
- }
- });
输出:
组合1触发了
组合2触发了
zip:组合1组合2
Zip数据合并:
- List<String> list1= Arrays.asList("a","b","c");
- List<String> list2= Arrays.asList("1","2","3");
- // 输出list
- Observable.fromIterable(list1).subscribe(new Consumer<String>() {
- @Override
- public void accept(String s) throws Exception {
- // Log.e(TAG,"fromIterable:"+s);
- }
- });
- // 数据合并
- Observable.zip(Observable.fromIterable(list1),
- Observable.fromIterable(list2),
- new BiFunction<String, String, String>() {
- @Override
- public String apply(String s, String s2) throws Exception {
- return s+s2;
- }
- }).subscribe(new Consumer<String>() {
- @Override
- public void accept(String s) throws Exception {
- // Log.e(TAG,"zip:"+s);
- /**
- * E/denganzhi: zip:a1
- E/denganzhi: zip:b2
- E/denganzhi: zip:c3
- */
- }
- });
RxJava 线程切换:
Schedulers.newThread(): 总是启动新线程执行操作
.subscribeOn(Schedulers.io()): 行为模式和newThread()差不多,区别在与Schedulers.io内部有线程池,可以重用
observeOn(AndroidSchedulers.mainThread()): Aroid提供切换到UI线程
线程控制函数subscribeOn : 指定事件创建者线程, 可以左右于它前后的代码,直到observeOn 才结束,切换新线程
observeOn: 执行观察着线程,只能作用于它之后的代码
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。