当前位置:   article > 正文

RxJava2使用归纳总结

rxjava2

1. 基础理论

  RXJava角色划分:
       observale: 被观察者
       observer: 观察者   子类Subject【科目】  回调走的是观察者
   subscribe: 订阅
       被观察者.订阅(观察者),被观察者通知观察者
  被观察者类型: 
       热: 订阅完毕立即发送数据
       冷: 创建以后,等待观察者订阅,
       一旦订阅了,才发送数据

2. 基础Api使用        订阅、取消订阅

1. 1.添加依赖:

  1. compile 'io.reactivex.rxjava2:rxjava:2.0.8' //RxJava 的jar包
  2. compile 'io.reactivex.rxjava2:rxandroid:2.0.1' // RxJava对安卓支持

1.2.Observer 使用

  1. public void showDemo(View view) {
  2. //1. 创建被观察者
  3. // 一旦订阅,调用观察者的 onSubscribe方法
  4. Observable<String> observable=Observable.create(new ObservableOnSubscribe<String>() {
  5. @Override
  6. public void subscribe(ObservableEmitter<String> e) throws Exception {
  7. // 调用 观察者的 onNext 方法
  8. // 观察者 通知被 观察者 调用 onNext 方法
  9. e.onNext("健身");
  10. e.onNext("我要大保健");
  11. e.onNext("去酒吧");
  12. e.onComplete(); // 大保健完成
  13. // e.onError(new Throwable("错误"));
  14. }
  15. });
  16. // 2. 创建观察者
  17. Observer<String> observer=new Observer<String>() {
  18. Disposable dd;
  19. @Override
  20. public void onSubscribe(Disposable d) {
  21. Log.e("denganzhi","onSubscribe");
  22. dd= d;
  23. }
  24. @Override
  25. public void onNext(String s) {
  26. // 取消订阅
  27. if(s.contains("大保健")){
  28. dd.dispose();
  29. }
  30. Log.e("denganzhi1","onNext:"+s);
  31. }
  32. @Override
  33. public void onError(Throwable e) {
  34. Log.e("denganzhi","onError");
  35. }
  36. @Override
  37. public void onComplete() {
  38. Log.e("denganzhi","onComplete");
  39. }
  40. };
  41. // 3. 订阅
  42. observable.subscribe(observer);
  43. }

Observable调用了e.onNext方法:健身
Observable调用了e.onNext方法:我要大保健

 1.3.Consumer 的 Observer 子类,避免实现过多方法

  1. // 1.第一步:创建被观察者
  2. Observable<String> observable= Observable.create(new ObservableOnSubscribe<String>() {
  3. @Override
  4. public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
  5. e.onNext("健身");
  6. e.onNext("我要大保健");
  7. e.onNext("去酒吧");
  8. e.onComplete(); // 大保健完成
  9. }
  10. });
  11. // 简单的用法
  12. // Consumer 是 Observer的子类
  13. observable.subscribe(new Consumer<String>() {
  14. @Override
  15. public void accept(@NonNull String s) throws Exception {
  16. Log.e("denganzhi1","accept-->onNext:"+s);
  17. }
  18. }, new Consumer<Throwable>() {
  19. @Override
  20. public void accept(@NonNull Throwable throwable) throws Exception {
  21. }
  22. }, new Action() {
  23. @Override
  24. public void run() throws Exception {
  25. Log.e("denganzhi1","accept-->onComplete");
  26. }
  27. });

accept-->onNext:健身
accept-->onNext:我要大保健
accept-->onNext:去酒吧
accept-->onComplete

   2. 数据发送Api

  1. // 发送一条数据或者多条数据
  2. // Observable<String> observable1= Observable.just("上网","大保健","回家");
  3. // 发送Array
  4. Observable<String> observable1= Observable.fromArray("上网","大保健","回家");
  5. observable1.subscribe(new Consumer<String>() {
  6. @Override
  7. public void accept(@NonNull String s) throws Exception {
  8. Log.e("denganzhi1","just--》"+s);
  9. }
  10. });

3 RxJava 冷的Subject

 Subject 它是一个Observer 也可以做为 Subject订阅的一个Observable
 Subject实现:
 PublishSubject publishSubject;
 
  订阅以后不会直接调用onNext方法,必须要手动调用 publishSubject.onNext("hello")
  才回调 subscribe的 onNext方法

 3.1. PublishSubject,订阅以后不会直接调用onNext方法,必须要手动调用 publishSubject.onNext("hello");

    3.1.1.创建 PublishSubject

  1. PublishSubject publishSubject;
  2. @Override
  3. protected void onCreate(Bundle savedInstanceState) {
  4. super.onCreate(savedInstanceState);
  5. setContentView(R.layout.activity_cold);
  6. // 冷的,订阅以后不会直接调用onNext方法,必须要手动调用 publishSubject.onNext("hello");
  7. publishSubject =PublishSubject.create();
  8. }

    3.1.2. 订阅  onSubscribe方法调用

  1. public void code_subscribe(View view){
  2. // 冷的,分开2步实现
  3. // 1. 订阅 subscribe 2. 发送 onNext
  4. publishSubject.subscribe(new Observer() {
  5. @Override
  6. public void onSubscribe(Disposable d) {
  7. Log.e(TAG,"onSubscribe");
  8. }
  9. @Override
  10. public void onNext(Object o) {
  11. Log.e(TAG,"onNext:"+(String)o);
  12. }
  13. @Override
  14. public void onError(Throwable e) {
  15. Log.e(TAG,"onError");
  16. }
  17. @Override
  18. public void onComplete() {
  19. Log.e(TAG,"onComplete");
  20. }
  21. });
  22. }

 E/denganzhi1: onSubscribe   

3.1.3. 被观察这通知观察着

  1. public void code_send(View view){
  2. // PublishSubject 使用
  3. publishSubject.onNext("PublishSubject-->OnNext");
  4. }

 输出 :

onNext:PublishSubject-->OnNext     

3.2. BehaviorSubject 默认值,订阅以后会默认调用onNext方法

3.2.1. 创建 BehaviorSubject

  1. BehaviorSubject<String> behaviorSubject;
  2. @Override
  3. protected void onCreate(Bundle savedInstanceState) {
  4. super.onCreate(savedInstanceState);
  5. setContentView(R.layout.activity_cold);
  6. // 冷的,默认值,订阅以后会默认调用onNext方法
  7. behaviorSubject= BehaviorSubject.createDefault("默认发送");
  8. }
  9. public void code_subscribe(View view){
  10. // 冷的,分开2步实现
  11. // 订阅以后会调用
  12. behaviorSubject.subscribe(new Observer<String>() {
  13. @Override
  14. public void onSubscribe(Disposable d) {
  15. Log.e(TAG, "onSubscribe......onSubscribe");
  16. }
  17. @Override
  18. public void onNext(String s) {
  19. Log.e(TAG, "onNext......onNext:"+s);
  20. }
  21. @Override
  22. public void onError(Throwable e) {
  23. Log.e(TAG, "onError......onError");
  24. }
  25. @Override
  26. public void onComplete() {
  27. Log.e(TAG, "完成......onComplete");
  28. }
  29. });

输出:

 //onSubscribe......onSubscribe
    // onNext......onNext:默认发送,订阅以后默认调用onNext方法

  3.2.2. 订阅:

  1. public void code_subscribe(View view){
  2. behaviorSubject.subscribe(new Observer<String>() {
  3. @Override
  4. public void onSubscribe(Disposable d) {
  5. Log.e(TAG, "onSubscribe......onSubscribe");
  6. }
  7. @Override
  8. public void onNext(String s) {
  9. Log.e(TAG, "onNext......onNext:"+s);
  10. }
  11. @Override
  12. public void onError(Throwable e) {
  13. Log.e(TAG, "onError......onError");
  14. }
  15. @Override
  16. public void onComplete() {
  17. Log.e(TAG, "完成......onComplete");
  18. }
  19. });
  20. }

  3.2.3. 发送:

  1. // 手动调用
  2. public void code_send(View view){
  3. behaviorSubject.onNext("BehaviorSubject...BehaviorSubject");
  4. }

// onNext......onNext:BehaviorSubject...BehaviorSubject        

3.3.ReplaySubject 会把每次发送给观察者的数据缓存起来,一旦有新的观察者订阅,会立刻把缓存中的消息发送给对应观察者

        3.3.1. 创建ReplaySubject

  1. ReplaySubject<String> replaySubject;
  2. @Override
  3. protected void onCreate(Bundle savedInstanceState) {
  4. super.onCreate(savedInstanceState);
  5. setContentView(R.layout.activity_cold);
  6. // 会把每次发送给观察者的数据缓存起来,一旦有新的观察者订阅,会立刻把缓存中的消息发送给对应观察者
  7. replaySubject= ReplaySubject.create();
  8. }

 3.3.2. 第一次订阅: 

  1. public void code_subscribe(View view){
  2. // 订阅1
  3. replaySubject.subscribe(new Observer<String>() {
  4. @Override
  5. public void onSubscribe(Disposable d) {
  6. Log.e(TAG, "onSubscribe111");
  7. }
  8. @Override
  9. public void onNext(String s) {
  10. Log.e(TAG, "onNext:"+s);
  11. }
  12. @Override
  13. public void onError(Throwable e) {
  14. Log.e(TAG, "Throwable:");
  15. }
  16. @Override
  17. public void onComplete() {
  18. Log.e(TAG, "onComplete:");
  19. }
  20. });
  21. }

E/denganzhi1: onSubscribe

3.3.3. 发送

  1. public void code_send(View view){
  2. replaySubject.onNext("ReplaySubject ... 测试");
  3. }

onNext:ReplaySubject ... 测试
        3.3.4. 第二次订阅

  1. public void re_code_subscribe(View view){
  2. // 订阅2 : 再次订阅,会发送之前订阅的消息
  3. replaySubject.subscribe(new Observer<String>() {
  4. @Override
  5. public void onSubscribe(Disposable d) {
  6. Log.e(TAG, "re========>onSubscribe");
  7. }
  8. @Override
  9. public void onNext(String s) {
  10. Log.e(TAG, "re========>onNext:"+s);
  11. }
  12. @Override
  13. public void onError(Throwable e) {
  14. Log.e(TAG, "re========>Throwable:");
  15. }
  16. @Override
  17. public void onComplete() {
  18. Log.e(TAG, "re========>onComplete:");
  19. }
  20. });
  21. }


        re========>onSubscribe
        re========>onNext:ReplaySubject ... 测试    // 把第一次发送的缓存起来,第二次发送,也可以接受第一次的

      

 3.4. AsyncSubject 多个onNext发送调用,只能收到最后一个onNext,方法以后必须要调用onComplete();

3.4.1 创建AsyncSubject

  1. 1. 创建AsyncSubject
  2. AsyncSubject<String> asyncSubject;
  3. @Override
  4. protected void onCreate(Bundle savedInstanceState) {
  5. super.onCreate(savedInstanceState);
  6. setContentView(R.layout.activity_cold);
  7. //1. 调用onNext方法以后必须要调用onComplete();
  8. //2. 多个onNext发送调用,只能收到最后一个onNext
  9. asyncSubject= AsyncSubject.create();
  10. }

3.4.2  订阅:

  1. public void code_subscribe(View view){
  2. asyncSubject.subscribe(new Observer<String>() {
  3. @Override
  4. public void onSubscribe(Disposable d) {
  5. Log.e(TAG, "onSubscribe");
  6. }
  7. @Override
  8. public void onNext(String s) {
  9. Log.e(TAG, "onNext:"+s);
  10. }
  11. @Override
  12. public void onError(Throwable e) {
  13. Log.e(TAG, "Throwable:");
  14. }
  15. @Override
  16. public void onComplete() {
  17. Log.e(TAG, "onComplete:");
  18. }
  19. });
  20. }

 E/denganzhi1: onSubscribe
   3.4.3  发送:

  1. 3. public void code_send(View view){
  2. asyncSubject.onNext("asyncSubject测试1");
  3. asyncSubject.onNext("asyncSubject测试2");
  4. asyncSubject.onNext("asyncSubject测试3");
  5. asyncSubject.onComplete();
  6. }

denganzhi1: onNext:asyncSubject测试3      // 多次调用onNext,在调用onComplete以后,只调用一次onNext
 denganzhi1: onComplete:

4.  RxJava 案例: 实现类似EventBus功能:

情景: 服务中不断收到蓝牙会的数据,然后发送到 订阅的各个Activity, Activity会以后如何取消订阅

1. MyApplication  启动 MyService 服务

  1. // 启动 MyService 服务
  2. public class MyApplciaton extends Application {
  3. @Override
  4. public void onCreate() {
  5. super.onCreate();
  6. Intent intent=new Intent(this,MyService.class);
  7. startService(intent);
  8. }
  9. }

2.  MyService中实现,蓝牙数据上报接口数据

  1. package lanya.denganzhi.com.rxjava2;
  2. import android.app.Service;
  3. import android.content.Intent;
  4. import android.os.IBinder;
  5. import android.util.Log;
  6. import android.widget.Toast;
  7. import io.reactivex.subjects.PublishSubject;
  8. public class MyService extends Service {
  9. public MyService() {
  10. }
  11. @Override
  12. public IBinder onBind(Intent intent) {
  13. // TODO: Return the communication channel to the service.
  14. throw new UnsupportedOperationException("Not yet implemented");
  15. }
  16. String TAG="denganzhi";
  17. @Override
  18. public void onCreate() {
  19. Log.e(TAG,"onCreate");
  20. super.onCreate();
  21. }
  22. @Override
  23. public int onStartCommand(Intent intent, int flags, int startId) {
  24. Log.e(TAG,"onStartCommand");
  25. return super.onStartCommand(intent, flags, startId);
  26. }
  27. private PublishSubject<String> dataChangeSubject = PublishSubject.create();
  28. public PublishSubject<String> listeneDataChange() {
  29. return dataChangeSubject;
  30. }
  31. // 蓝牙的所有数据上报到这个接口
  32. public void publish(final String topic, final String msg) {
  33. // 向订阅PublishSubject 的主题发送
  34. dataChangeSubject.onNext("蓝牙发送cmd"+0x08);
  35. }
  36. private static MyService rxMqtt;
  37. /**
  38. * 获取Service 实例
  39. * @return
  40. */
  41. public static MyService getRxMqtt() {
  42. if (rxMqtt == null) {
  43. synchronized (MyService.class) {
  44. if (rxMqtt == null) {
  45. rxMqtt = new MyService();
  46. }
  47. }
  48. }
  49. return rxMqtt;
  50. }
  51. }

  3. MainActivity  订阅 dataChangeSubject 

  1. String TAG="denganzhi";
  2. Disposable serviceDisposable;
  3. // 订阅在当前Activity
  4. public void showDemo(View view) {
  5. // 需要的时候才订阅,每次使用取消订阅,如果要一直订阅,那就不要取消了
  6. toDisposable(serviceDisposable);
  7. serviceDisposable= MyService.getRxMqtt().listeneDataChange()
  8. .filter(new AppendOnlyLinkedArrayList.NonThrowingPredicate<String>() {
  9. @Override
  10. public boolean test(String s) {
  11. return true;
  12. }
  13. })
  14. .timeout(5 * 1000, TimeUnit.MILLISECONDS)
  15. .subscribe(new Consumer<String>() {
  16. @Override
  17. public void accept(String s) throws Exception {
  18. Log.e(TAG,"listeneDataChange-->accept:"+s);
  19. }
  20. }, new Consumer<Throwable>() {
  21. @Override
  22. public void accept(Throwable throwable) throws Exception {
  23. Log.e(TAG,"listeneDataChange-->Consumer");
  24. }
  25. }
  26. );
  27. compositeDisposable.add(serviceDisposable);
  28. }
  29. public void toDisposable(Disposable disposable) {
  30. if (disposable != null && !disposable.isDisposed()) {
  31. disposable.dispose();
  32. }
  33. }
  34. @Override
  35. protected void onDestroy() {
  36. toDisposable(serviceDisposable);
  37. super.onDestroy();
  38. }

 4. 模拟数据上报,接收数据,模拟蓝牙发送 Activity中的Button

  1. public void sendDemo(View view) {
  2. MyService.getRxMqtt().publish("/push/topic","蓝牙发送数据....");
  3. }

  运行效果 :  MainActivity中的subscribe 订阅中回调接收数据

 listeneDataChange-->accept:蓝牙发送cmd8
 listeneDataChange-->accept:蓝牙发送cmd8
 listeneDataChange-->accept:蓝牙发送cmd8
 listeneDataChange-->accept:蓝牙发送cmd8

5.  RxJava操作符:

5.1.  RxJava操作符,数组相关的:
  Observable.just("上网","大保健","回家");
fromIterable : 遍历List
      *** Observable.fromArray("上网","大保健","回家");   // 数据遍历器
          配合使用:  .take(2) 取前2条        firstElement【第一条】        
                      .takeLast  去最后2条     lastElement【最后一条】
                      .distinct()  过滤掉Array中重复数据
                       .skip(2)    跳过Array中1,2个元素,从第3个开始
                       skipLast(2)    最后2个不需要
                         .elementAt(2): 只有第3个元素
                          .elementAt(10,"小明"): 发送如果角标越结,发送默认的"小明"
                          .range(5,5):  从数据角标5位置开始发送5次
      filter:  过滤                  
         
  5.2.Observale 操作符与重复相关: 
      repeat(2) :  被观察这 发送2次
      defer: 懒加载,当观察者订阅,才去创建Observable
      interval: 每隔3s调用一次 onNext
      timer: 定时器,3s以后执行一次
 intervalRange(100,3,3,3,TimeUnit.SECONDS):从100开始,执行3次,第一个3s后执行,每次间隔3s  输出100 101 102
        .timeout(5 * 1000, TimeUnit.MILLISECONDS): 超时
        
         Observable.empty():  //会调用 onSubscribe  和 onComplete 方法
   

  1. private Observable<String> observableString;
  2. public void testRxJava(View view){
  3. // observable.repeat():
  4. // repeat(2) : 被观察这 发送2次
  5. Observable.just("xiaoming","xiaohei")
  6. .repeat(2) // 发送2次
  7. .subscribe(new Consumer<String>() {
  8. @Override
  9. public void accept(@NonNull String s) throws Exception {
  10. // Log.e(TAG,"内容是:"+s);
  11. }
  12. });
  13. // defer(): 当观察者订阅,才去创建Observable 懒加载
  14. Observable observable1=Observable.defer(new Callable<ObservableSource<?>>() {
  15. @Override
  16. public ObservableSource<?> call() throws Exception {
  17. return Observable.create(new ObservableOnSubscribe<String>() {
  18. @Override
  19. public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
  20. e.onNext("下一步");
  21. e.onComplete();
  22. }
  23. });
  24. }
  25. });
  26. observable1.subscribe(new Consumer() {
  27. @Override
  28. public void accept(@NonNull Object o) throws Exception {
  29. // Log.e("denganzhi1","结果是:"+(String)o);
  30. }
  31. });
  32. // 从开始位置开始发送,向后发送多少个
  33. // 打印5,6,7,8,9
  34. Observable.range(5,5)
  35. .subscribe(new Consumer<Integer>() {
  36. @Override
  37. public void accept(@NonNull Integer integer) throws Exception {
  38. // Log.e("denganzhi1","result:"+integer);
  39. }
  40. });
  41. // 第一个:
  42. // 第二个参数:
  43. // 第三个:用到的单位
  44. // 无限轮训
  45. Observable observable_interval= Observable.interval(3,TimeUnit.SECONDS);
  46. Observer<Long> observer=new Observer<Long>() {
  47. Disposable dd;
  48. @Override
  49. public void onSubscribe(Disposable d) {
  50. this.dd=d;
  51. }
  52. @Override
  53. public void onNext(Long s) {
  54. // Log.e(TAG,"result:"+s); // 这里返回 s 从0 开始 1,2,3,4
  55. if(s==5){
  56. dd.dispose();
  57. }
  58. }
  59. @Override
  60. public void onError(Throwable e) {
  61. }
  62. @Override
  63. public void onComplete() {
  64. }
  65. };
  66. observable_interval.subscribe(observer);
  67. //(long start, long count, long initialDelay, long period, TimeUnit unit)
  68. // 从100开始,执行3次,第一个3s后执行,每次间隔3s
  69. Observable.intervalRange(100,3,3,3,TimeUnit.SECONDS)
  70. .subscribe(new Consumer<Long>() {
  71. @Override
  72. public void accept(@NonNull Long aLong) throws Exception {
  73. // Log.e(TAG,"result:"+aLong);
  74. }
  75. });
  76. // 隔开4S执行一次
  77. Observable.timer(4,TimeUnit.SECONDS)
  78. .subscribe(new Consumer<Long>() {
  79. @Override
  80. public void accept(@NonNull Long aLong) throws Exception {
  81. // Log.e(TAG,"result:"+aLong);
  82. }
  83. });
  84. // 过滤 filter
  85. User u1=new User(1,"小明");
  86. User u2=new User(2,"xiaohei");
  87. Observable.just(u1,u2)
  88. .filter(new Predicate<User>() {
  89. @Override
  90. public boolean test(@NonNull User user) throws Exception {
  91. // 数据遍历,返回需要数据
  92. return user.getUsername().contains("小");
  93. }
  94. }).subscribe(new Consumer<User>() {
  95. @Override
  96. public void accept(@NonNull User user) throws Exception {
  97. // Log.e(TAG,"过滤以后的结果是:"+user.getUsername());
  98. }
  99. });
  100. Observable.fromArray("xiaoming","xiaohei","a","b").filter(new AppendOnlyLinkedArrayList.NonThrowingPredicate<String>() {
  101. @Override
  102. public boolean test(String s) {
  103. return true;
  104. }
  105. })
  106. .take(2).subscribe(new Consumer<String>() {
  107. @Override
  108. public void accept(String s) throws Exception {
  109. // Log.e("filte-->"+TAG,s);
  110. }
  111. });
  112. Observable.fromArray("xiaoming","xiaohei","a","b").filter(new AppendOnlyLinkedArrayList.NonThrowingPredicate<String>() {
  113. @Override
  114. public boolean test(String s) {
  115. return true;
  116. }
  117. })
  118. .takeLast(2).subscribe(new Consumer<String>() {
  119. @Override
  120. public void accept(String s) throws Exception {
  121. // Log.e("filte-->"+TAG,s);
  122. }
  123. });
  124. Observable.fromArray("xiaoming","xiaohei","a","a","a").distinct()
  125. .subscribe(new Consumer<String>() {
  126. @Override
  127. public void accept(String s) throws Exception {
  128. // Log.e("distinct-->"+TAG,s);
  129. }
  130. });
  131. Observable.fromArray("xiaoming","xiaohei","a","a","a")
  132. .firstElement()
  133. .subscribe(new Consumer<String>() {
  134. @Override
  135. public void accept(String s) throws Exception {
  136. // Log.e("firstElement-->"+TAG,s);
  137. }
  138. });
  139. // skip(n): 跳过前面2个
  140. Observable.just("xiaoming","小黑","xiaoming3")
  141. .skipLast(2)
  142. .subscribe(new Consumer<String>() {
  143. @Override
  144. public void accept(@NonNull String s) throws Exception {
  145. // Log.e(TAG,"skip:"+s);
  146. }
  147. });
  148. // 过滤重复数据
  149. Observable.just("xiaoming","小黑","xiaoming")
  150. .distinct()
  151. .subscribe(new Consumer<String>() {
  152. @Override
  153. public void accept(@NonNull String s) throws Exception {
  154. // Log.e(TAG,s);
  155. }
  156. });
  157. Observable.fromArray("xiaoming1","xiaohei2","3","4","5")
  158. .elementAt(10,"小明")
  159. .subscribe(new Consumer<String>() {
  160. @Override
  161. public void accept(String s) throws Exception {
  162. // Log.e("elementAt-->"+TAG,s);
  163. }
  164. });
  165. // 会调用 onSubscribe 和 onComplete 方法
  166. Observable.empty().subscribe(new Observer<Object>() {
  167. @Override
  168. public void onSubscribe(Disposable d) {
  169. Log.e(TAG,"onSubscribe");
  170. }
  171. @Override
  172. public void onNext(Object o) {
  173. Log.e(TAG,"onNext");
  174. }
  175. @Override
  176. public void onError(Throwable e) {
  177. Log.e(TAG,"onError");
  178. }
  179. @Override
  180. public void onComplete() {
  181. Log.e(TAG,"onComplete");
  182. }
  183. });
  184. }

 5.3. RxJava 变化操作符 Map、flatMap 变化符使用,数据变换

功能: 比如 把 Integer 变成 User 在变成  String

  1. //RxJava变化操作符使用
  2. public void showDemo3(View view){
  3. // id(Intger) --> User
  4. // Function<Integer, User> 第一个是参数 ,第二个是函数返回结果
  5. Observable.just(400).map(new Function<Integer, User>() {
  6. @Override
  7. public User apply(Integer integer) throws Exception {
  8. return new User(integer,"xiaoming");
  9. }
  10. }).subscribe(new Consumer<User>() {
  11. @Override
  12. public void accept(User user) throws Exception {
  13. Log.e(TAG,"转化之后的值:"+user);
  14. }
  15. });
  16. // 根据id获取token
  17. // 根据token获取用户信息
  18. //可以不断的做下去,不断的封装数据
  19. // Function<Integer, ObservableSource<User>> 第一参数,第二个返回值
  20. Observable.just(100).flatMap(new Function<Integer, ObservableSource<User>>() {
  21. @Override
  22. public ObservableSource<User> apply(Integer integer) throws Exception {
  23. return Observable.just(new User(integer,"xiaoming"));
  24. }
  25. }).flatMap(new Function<User, ObservableSource<String>>() {
  26. @Override
  27. public ObservableSource<String> apply(User user) throws Exception {
  28. return Observable.just(user.getUsername());
  29. }
  30. }).subscribe(new Consumer<String>() {
  31. @Override
  32. public void accept(String s) throws Exception {
  33. Log.e(TAG,"result:"+s);
  34. }
  35. });
  36. }

    6. RxJava组合方法:

1. Merge 顺序触发,首先执行observal1,在执行observal2
 Merge observal1 抛出异常,那么不会执行  observal2,使用mergeDelayError‘’

  1. Observable<String> observable_1= Observable.create(new ObservableOnSubscribe() {
  2. @Override
  3. public void subscribe(ObservableEmitter e) throws Exception {
  4. // int j=1/0;、
  5. Log.e(TAG,"组合1触发了");
  6. SystemClock.sleep(5000);
  7. int i=1/0;
  8. e.onNext("组合1");
  9. // Log.e(TAG,"组合1");
  10. }
  11. });
  12. Observable<String> observable_2= Observable.create(new ObservableOnSubscribe() {
  13. @Override
  14. public void subscribe(ObservableEmitter e) throws Exception {
  15. Log.e(TAG,"组合2触发了");
  16. SystemClock.sleep(3000);
  17. e.onNext("组合2");
  18. // Log.e(TAG,"组合2");
  19. }
  20. });
  21. // observable_1 执行完毕,在执行 observable_2 有顺序
  22. Observable.mergeDelayError(observable_1,observable_2).subscribe(new Consumer<String>() {
  23. @Override
  24. public void accept(String s) throws Exception {
  25. Log.e(TAG, "mergeDelayError:" + s);
  26. }
  27. }, new Consumer<Throwable>() {
  28. @Override
  29. public void accept(Throwable throwable) throws Exception {
  30. Log.e(TAG, "concat:" +throwable);
  31. }
  32. });

输出:        
 组合1触发了
 组合2触发了
 mergeDelayError:组合2

        2. concat: observal1 执行, observal2没有执行
        3. zip     首先执行observal1,在执行observal2,然后 1,2 整合 BiFunction

  1. Observable<String> observable_1= Observable.create(new ObservableOnSubscribe() {
  2. @Override
  3. public void subscribe(ObservableEmitter e) throws Exception {
  4. // int j=1/0;、
  5. Log.e(TAG,"组合1触发了");
  6. SystemClock.sleep(5000);
  7. int i=1/0;
  8. e.onNext("组合1");
  9. // Log.e(TAG,"组合1");
  10. }
  11. });
  12. Observable<String> observable_2= Observable.create(new ObservableOnSubscribe() {
  13. @Override
  14. public void subscribe(ObservableEmitter e) throws Exception {
  15. Log.e(TAG,"组合2触发了");
  16. SystemClock.sleep(3000);
  17. e.onNext("组合2");
  18. // Log.e(TAG,"组合2");
  19. }
  20. });
  21. Observable.zip(observable_1, observable_2, new BiFunction<String, String, String>() {
  22. @Override
  23. public String apply(String s1, String s2) throws Exception {
  24. return s1+s2;
  25. }
  26. }).subscribeOn(Schedulers.io())
  27. .observeOn(AndroidSchedulers.mainThread())
  28. .subscribe(new Consumer<String>() {
  29. @Override
  30. public void accept(String s) throws Exception {
  31. Log.e(TAG,"zip:"+s);
  32. }
  33. });

输出:
组合1触发了
组合2触发了
zip:组合1组合2

   Zip数据合并:
 

  1. List<String> list1= Arrays.asList("a","b","c");
  2. List<String> list2= Arrays.asList("1","2","3");
  3. // 输出list
  4. Observable.fromIterable(list1).subscribe(new Consumer<String>() {
  5. @Override
  6. public void accept(String s) throws Exception {
  7. // Log.e(TAG,"fromIterable:"+s);
  8. }
  9. });
  10. // 数据合并
  11. Observable.zip(Observable.fromIterable(list1),
  12. Observable.fromIterable(list2),
  13. new BiFunction<String, String, String>() {
  14. @Override
  15. public String apply(String s, String s2) throws Exception {
  16. return s+s2;
  17. }
  18. }).subscribe(new Consumer<String>() {
  19. @Override
  20. public void accept(String s) throws Exception {
  21. // Log.e(TAG,"zip:"+s);
  22. /**
  23. * E/denganzhi: zip:a1
  24. E/denganzhi: zip:b2
  25. E/denganzhi: zip:c3
  26. */
  27. }
  28. });

7. RxJava 线程切换: 

RxJava 线程切换: 
Schedulers.newThread():   总是启动新线程执行操作
.subscribeOn(Schedulers.io()): 行为模式和newThread()差不多,区别在与Schedulers.io内部有线程池,可以重用
observeOn(AndroidSchedulers.mainThread()):  Aroid提供切换到UI线程         

    线程控制函数subscribeOn 指定事件创建者线程, 可以左右于它前后的代码,直到observeOn 才结束,切换新线程
    observeOn:  执行观察着线程,只能作用于它之后的代码

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/177387
推荐阅读
相关标签
  

闽ICP备14008679号