当前位置:   article > 正文

RxJava使用全记录_implementation 'io.reactivex.observable

implementation 'io.reactivex.observable
  1. implementation 'io.reactivex.rxjava2:rxjava:2.1.4'
  2. implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'
  1. package com.example.rxjavademo;
  2. import android.app.Activity;
  3. import android.os.Bundle;
  4. import android.support.annotation.Nullable;
  5. import android.util.Log;
  6. import android.view.View;
  7. import android.widget.TextView;
  8. import android.widget.Toast;
  9. import com.example.entity.Persion;
  10. import java.util.ArrayList;
  11. import java.util.List;
  12. import java.util.concurrent.Callable;
  13. import java.util.concurrent.FutureTask;
  14. import java.util.concurrent.TimeUnit;
  15. import io.reactivex.Notification;
  16. import io.reactivex.Observable;
  17. import io.reactivex.ObservableEmitter;
  18. import io.reactivex.ObservableOnSubscribe;
  19. import io.reactivex.ObservableSource;
  20. import io.reactivex.Observer;
  21. import io.reactivex.disposables.Disposable;
  22. import io.reactivex.functions.Action;
  23. import io.reactivex.functions.BiConsumer;
  24. import io.reactivex.functions.BiFunction;
  25. import io.reactivex.functions.Consumer;
  26. import io.reactivex.functions.Function;
  27. import io.reactivex.functions.Predicate;
  28. import io.reactivex.observables.GroupedObservable;
  29. import io.reactivex.schedulers.Schedulers;
  30. public class MainActivity extends Activity {
  31. TextView tv, tv2, tv3, tv4, tv5, tv6, tv7, tv8, tv9, tv10, tv11, tv12, tv13, tv14, tv15, tv16, tv17, tv18, tv19, tv20;
  32. TextView tv21, tv22, tv23, tv24, tv25, tv26, tv27, tv28, tv29, tv30, tv31, tv32, tv33, tv34, tv35, tv36, tv37, tv38, tv39, tv40;
  33. TextView tvTest;
  34. @Override
  35. protected void onCreate(@Nullable Bundle savedInstanceState) {
  36. super.onCreate(savedInstanceState);
  37. setContentView(R.layout.activity_main);
  38. tv = findViewById(R.id.tv);
  39. tv2 = findViewById(R.id.tv2);
  40. tv3 = findViewById(R.id.tv3);
  41. tv4 = findViewById(R.id.tv4);
  42. tv5 = findViewById(R.id.tv5);
  43. tv6 = findViewById(R.id.tv6);
  44. tv7 = findViewById(R.id.tv7);
  45. tv8 = findViewById(R.id.tv8);
  46. tv9 = findViewById(R.id.tv9);
  47. tv11 = findViewById(R.id.tv11);
  48. tv12 = findViewById(R.id.tv12);
  49. tv13 = findViewById(R.id.tv13);
  50. tv14 = findViewById(R.id.tv14);
  51. tv15 = findViewById(R.id.tv15);
  52. tv16 = findViewById(R.id.tv16);
  53. tv17 = findViewById(R.id.tv17);
  54. tv18 = findViewById(R.id.tv18);
  55. tv19 = findViewById(R.id.tv19);
  56. tv20 = findViewById(R.id.tv20);
  57. tv21 = findViewById(R.id.tv21);
  58. tv22 = findViewById(R.id.tv22);
  59. tv23 = findViewById(R.id.tv23);
  60. tv24 = findViewById(R.id.tv24);
  61. tv25 = findViewById(R.id.tv25);
  62. tv26 = findViewById(R.id.tv26);
  63. tv27 = findViewById(R.id.tv27);
  64. tv28 = findViewById(R.id.tv28);
  65. tv29 = findViewById(R.id.tv29);
  66. tv30 = findViewById(R.id.tv30);
  67. tv31 = findViewById(R.id.tv31);
  68. tv32 = findViewById(R.id.tv32);
  69. tv33 = findViewById(R.id.tv33);
  70. tv34 = findViewById(R.id.tv34);
  71. tv35 = findViewById(R.id.tv35);
  72. tv36 = findViewById(R.id.tv36);
  73. tv37 = findViewById(R.id.tv37);
  74. tv38 = findViewById(R.id.tv38);
  75. tv39 = findViewById(R.id.tv39);
  76. tv40 = findViewById(R.id.tv40);
  77. tvTest = findViewById(R.id.tvTest);
  78. }
  79. public void btn(View v) {
  80. switch (v.getId()) {
  81. case R.id.tv:
  82. todo1();
  83. break;
  84. case R.id.tv2:
  85. todo2();
  86. break;
  87. case R.id.tv3:
  88. todo3();
  89. break;
  90. case R.id.tv4:
  91. todo4();
  92. break;
  93. case R.id.tv5:
  94. todo5();
  95. break;
  96. case R.id.tv6:
  97. todo6();
  98. break;
  99. case R.id.tv7:
  100. todo7();
  101. break;
  102. case R.id.tv8:
  103. todo8();
  104. break;
  105. case R.id.tv9:
  106. todo9();
  107. break;
  108. case R.id.tv10:
  109. todo10();
  110. break;
  111. case R.id.tv11:
  112. todo11();
  113. break;
  114. case R.id.tv12:
  115. todo12();
  116. break;
  117. case R.id.tv13:
  118. todo13();
  119. break;
  120. case R.id.tv14:
  121. todo14();
  122. break;
  123. case R.id.tv15:
  124. todo15();
  125. break;
  126. case R.id.tv16:
  127. todo16();
  128. break;
  129. case R.id.tv17:
  130. todo17();
  131. break;
  132. case R.id.tv18:
  133. todo18();
  134. break;
  135. case R.id.tv19:
  136. todo19();
  137. break;
  138. case R.id.tv20:
  139. todo20();
  140. break;
  141. case R.id.tv21:
  142. todo21();
  143. break;
  144. case R.id.tv22:
  145. todo22();
  146. break;
  147. case R.id.tv23:
  148. todo23();
  149. break;
  150. case R.id.tv24:
  151. todo24();
  152. break;
  153. case R.id.tv25:
  154. todo25();
  155. break;
  156. case R.id.tv26:
  157. todo26();
  158. break;
  159. case R.id.tv27:
  160. todo27();
  161. break;
  162. case R.id.tv28:
  163. todo28();
  164. break;
  165. case R.id.tv29:
  166. todo29();
  167. break;
  168. case R.id.tv30:
  169. todo30();
  170. break;
  171. case R.id.tv31:
  172. todo31();
  173. break;
  174. case R.id.tv32:
  175. todo32();
  176. break;
  177. case R.id.tv33:
  178. todo33();
  179. break;
  180. case R.id.tv34:
  181. todo34();
  182. break;
  183. case R.id.tv35:
  184. todo35();
  185. break;
  186. case R.id.tv36:
  187. todo36();
  188. break;
  189. case R.id.tv37:
  190. todo37();
  191. break;
  192. case R.id.tv38:
  193. todo38();
  194. break;
  195. case R.id.tv39:
  196. todo39();
  197. break;
  198. case R.id.tv40:
  199. todo40();
  200. break;
  201. case R.id.tvTest:
  202. todo45();
  203. break;
  204. }
  205. }
  206. Observable<String> observable;
  207. /**
  208. * 创建被观察者
  209. * onNext:发送该事件时,观察者会回调 onNext() 方法
  210. * onError:发送该事件时,观察者会回调 onError() 方法,当发送该事件之后,其他事件将不会继续发送
  211. * onComplete:发送该事件时,观察者会回调 onComplete() 方法,当发送该事件之后,其他事件将不会继续发送
  212. */
  213. private void todo1() {
  214. observable = Observable.create(new ObservableOnSubscribe<String>() {
  215. @Override
  216. public void subscribe(ObservableEmitter<String> emitter) throws Exception {
  217. Log.d("aaa", Thread.currentThread().getName());
  218. emitter.onNext("aaa");
  219. emitter.onNext("bbb");
  220. emitter.onNext("ccc");
  221. emitter.onComplete();
  222. }
  223. });
  224. }
  225. /**
  226. * 创建观察者并订阅
  227. */
  228. private void todo2() {
  229. Observer<String> observer = new Observer<String>() {
  230. @Override
  231. public void onSubscribe(Disposable d) {
  232. Log.d("aaa", "onSubscribe");
  233. }
  234. @Override
  235. public void onNext(String str) {
  236. Log.d("aaa", "onNext" + str);
  237. }
  238. @Override
  239. public void onError(Throwable e) {
  240. Log.d("aaa", "onError");
  241. }
  242. @Override
  243. public void onComplete() {
  244. Log.d("aaa", "onComplete");
  245. }
  246. };
  247. observable.subscribe(observer);
  248. }
  249. /**
  250. * 链式调用
  251. */
  252. private void todo3() {
  253. Observable.create(new ObservableOnSubscribe<String>() {
  254. @Override
  255. public void subscribe(ObservableEmitter<String> emitter) throws Exception {
  256. }
  257. }).subscribe(new Observer<String>() {
  258. @Override
  259. public void onSubscribe(Disposable d) {
  260. }
  261. @Override
  262. public void onNext(String str) {
  263. }
  264. @Override
  265. public void onError(Throwable e) {
  266. }
  267. @Override
  268. public void onComplete() {
  269. }
  270. });
  271. }
  272. /**
  273. * 常见操作符演示-just
  274. * 创建一个被观察者,并发送事件,发送的事件不可以超过10个以上。
  275. */
  276. private void todo4() {
  277. Observable.just("aa", "bb", "cc", "aa", "bb", "cc", "aa", "bb", "cc", "aa").subscribe(new Observer<String>() {
  278. @Override
  279. public void onSubscribe(Disposable d) {
  280. Log.d("aaa", "onSubscribe");
  281. }
  282. @Override
  283. public void onNext(String str) {
  284. Log.d("aaa", "onNext" + str);
  285. }
  286. @Override
  287. public void onError(Throwable e) {
  288. Log.d("aaa", "onError");
  289. }
  290. @Override
  291. public void onComplete() {
  292. Log.d("aaa", "onComplete");
  293. }
  294. });
  295. }
  296. /**
  297. * 常见操作符演示-fromArray
  298. * 这个方法和 just() 类似,只不过 fromArray 可以传入多于10个的变量,并且可以传入一个数组。
  299. */
  300. private void todo5() {
  301. String[] strings = new String[]{"aa", "bb", "cc"};
  302. Observable.fromArray(strings).subscribe(new Observer<String>() {
  303. @Override
  304. public void onSubscribe(Disposable d) {
  305. Log.d("aaa", "onSubscribe");
  306. }
  307. @Override
  308. public void onNext(String str) {
  309. Log.d("aaa", "onNext" + str);
  310. }
  311. @Override
  312. public void onError(Throwable e) {
  313. Log.d("aaa", "onError");
  314. }
  315. @Override
  316. public void onComplete() {
  317. Log.d("aaa", "onComplete");
  318. }
  319. });
  320. }
  321. /**
  322. * 常见操作符演示-fromCallable
  323. * 这里的 Callable 是 java.util.concurrent 中的 Callable,Callable 和 Runnable 的用法基本一致,只是它会返回一个结果值,这个结果值就是发给观察者的。
  324. */
  325. private void todo6() {
  326. Observable.fromCallable(new Callable<String>() {
  327. @Override
  328. public String call() throws Exception {
  329. return "aaa";
  330. }
  331. }).subscribe(new Consumer<String>() {
  332. @Override
  333. public void accept(String s) throws Exception {
  334. Log.d("aaa", "onNext" + s);
  335. }
  336. });
  337. }
  338. /**
  339. * 常见操作符演示-fromFuture
  340. * 参数中的 Future 是 java.util.concurrent 中的 Future,Future 的作用是增加了 cancel() 等方法操作 Callable,
  341. * 它可以通过 get() 方法来获取 Callable 返回的值。
  342. * doOnSubscribe() 的作用就是只有订阅时才会发送事件,具体会在下面讲解。
  343. */
  344. private void todo7() {
  345. final FutureTask<String> futureTask = new FutureTask<String>(new Callable<String>() {
  346. @Override
  347. public String call() throws Exception {
  348. Log.d("aaa", Thread.currentThread().getName());
  349. return "futureTask---call";
  350. }
  351. });
  352. Observable.fromFuture(futureTask).doOnSubscribe(new Consumer<Disposable>() {
  353. @Override
  354. public void accept(Disposable disposable) throws Exception {
  355. futureTask.run();
  356. }
  357. }).subscribe(new Consumer<String>() {
  358. @Override
  359. public void accept(String s) throws Exception {
  360. Log.d("aaa", "onNext" + s);
  361. }
  362. });
  363. }
  364. /**
  365. * 常见操作符演示-fromIterable
  366. * 直接发送一个 List 集合数据给观察者
  367. */
  368. private void todo8() {
  369. List<String> stringList = new ArrayList<String>();
  370. stringList.add("aaa");
  371. stringList.add("bbb");
  372. stringList.add("ccc");
  373. Observable.fromIterable(stringList).subscribe(new Observer<String>() {
  374. @Override
  375. public void onSubscribe(Disposable d) {
  376. }
  377. @Override
  378. public void onNext(String s) {
  379. }
  380. @Override
  381. public void onError(Throwable e) {
  382. }
  383. @Override
  384. public void onComplete() {
  385. }
  386. });
  387. }
  388. /**
  389. * 常见操作符演示-defer
  390. * 这个方法的作用就是直到被观察者被订阅后才会创建被观察者。
  391. */
  392. String s = "aaa";
  393. private void todo9() {
  394. Observable<String> observable = Observable.defer(new Callable<ObservableSource<? extends String>>() {
  395. @Override
  396. public ObservableSource<? extends String> call() throws Exception {
  397. return Observable.just(s);
  398. }
  399. });
  400. s = "bbb";
  401. Observer<String> observer = new Observer<String>() {
  402. @Override
  403. public void onSubscribe(Disposable d) {
  404. Log.d("aaa", "onSubscribe");
  405. }
  406. @Override
  407. public void onNext(String str) {
  408. Log.d("aaa", "onNext" + str);
  409. }
  410. @Override
  411. public void onError(Throwable e) {
  412. Log.d("aaa", "onError");
  413. }
  414. @Override
  415. public void onComplete() {
  416. Log.d("aaa", "onComplete");
  417. }
  418. };
  419. observable.subscribe(observer);
  420. s = "ccc";
  421. observable.subscribe(observer);
  422. }
  423. /**
  424. * 常见操作符演示-timer
  425. * 当到指定时间后就会发送一个 0L 的值给观察者。
  426. */
  427. private void todo10() {
  428. Observable.timer(2, TimeUnit.SECONDS).subscribe(new Observer<Long>() {
  429. @Override
  430. public void onSubscribe(Disposable d) {
  431. }
  432. @Override
  433. public void onNext(Long aLong) {
  434. Log.d("aaa", "onNext" + aLong);
  435. }
  436. @Override
  437. public void onError(Throwable e) {
  438. }
  439. @Override
  440. public void onComplete() {
  441. }
  442. });
  443. }
  444. /**
  445. * 常见操作符演示-interval
  446. * 每隔一段时间就会发送一个事件,这个事件是从0开始,不断增1的数字。
  447. */
  448. private void todo11() {
  449. Observable.interval(2, TimeUnit.SECONDS).subscribe(new Observer<Long>() {
  450. @Override
  451. public void onSubscribe(Disposable d) {
  452. }
  453. @Override
  454. public void onNext(Long aLong) {
  455. Log.d("aaa", "onNext" + aLong);
  456. }
  457. @Override
  458. public void onError(Throwable e) {
  459. }
  460. @Override
  461. public void onComplete() {
  462. }
  463. });
  464. }
  465. /**
  466. * 常见操作符演示-intervalRange
  467. * 可以指定发送事件的开始值和数量,其他与 interval() 的功能一样。
  468. */
  469. private void todo12() {
  470. Observable.intervalRange(0, 5, 2, 1, TimeUnit.SECONDS).
  471. subscribe(new Observer<Long>() {
  472. @Override
  473. public void onSubscribe(Disposable d) {
  474. }
  475. @Override
  476. public void onNext(Long aLong) {
  477. Log.d("aaa", "onNext" + aLong);
  478. }
  479. @Override
  480. public void onError(Throwable e) {
  481. }
  482. @Override
  483. public void onComplete() {
  484. }
  485. });
  486. }
  487. /**
  488. * 常见操作符演示-range
  489. * 同时发送一定范围的事件序列。
  490. * 常见操作符演示-rangeLong
  491. * 作用与 range() 一样,只是数据类型为 Long
  492. */
  493. private void todo13() {
  494. Observable.range(2, 5).subscribe(new Observer<Integer>() {
  495. @Override
  496. public void onSubscribe(Disposable d) {
  497. }
  498. @Override
  499. public void onNext(Integer integer) {
  500. }
  501. @Override
  502. public void onError(Throwable e) {
  503. }
  504. @Override
  505. public void onComplete() {
  506. }
  507. });
  508. }
  509. /**
  510. * 常见操作符演示-empty() & never() & error()
  511. * empty() : 直接发送 onComplete() 事件
  512. * never():不发送任何事件
  513. * error():发送 onError() 事件
  514. */
  515. private void todo14() {
  516. Observable.empty().subscribe(new Observer<Object>() {
  517. @Override
  518. public void onSubscribe(Disposable d) {
  519. }
  520. @Override
  521. public void onNext(Object o) {
  522. }
  523. @Override
  524. public void onError(Throwable e) {
  525. }
  526. @Override
  527. public void onComplete() {
  528. }
  529. });
  530. }
  531. //===============================================转换操作符=========================================
  532. /**
  533. * 常见操作符演示-map
  534. * map 可以将被观察者发送的数据类型转变成其他的类型
  535. */
  536. private void todo15() {
  537. Observable.just(1, 2, 3).map(new Function<Integer, Object>() {
  538. @Override
  539. public Object apply(Integer integer) throws Exception {
  540. return "aaa" + integer;
  541. }
  542. }).subscribe(new Observer<Object>() {
  543. @Override
  544. public void onSubscribe(Disposable d) {
  545. }
  546. @Override
  547. public void onNext(Object o) {
  548. Log.d("aaa", "onNext" + o.toString());
  549. }
  550. @Override
  551. public void onError(Throwable e) {
  552. }
  553. @Override
  554. public void onComplete() {
  555. }
  556. });
  557. }
  558. /**
  559. * 常见操作符演示-flatMap
  560. * 这个方法可以将事件序列中的元素进行整合加工,返回一个新的被观察者。
  561. * flatMap() 其实与 map() 类似,但是 flatMap() 返回的是一个 Observerable
  562. * <p>
  563. * * 常见操作符演示-concatMap
  564. * concatMap() 和 flatMap() 基本上是一样的,只不过 concatMap() 转发出来的事件是有序的,而 flatMap() 是无序的。
  565. */
  566. private void todo16() {
  567. List<Persion> persionList = initPersionData();
  568. //获取所有人的action列表
  569. //1、map方式
  570. // Observable.fromIterable(persionList).map(new Function<Persion, List<Persion.PlanListBean>>() {
  571. // @Override
  572. // public List<Persion.PlanListBean> apply(Persion persion) throws Exception {
  573. //
  574. // return persion.getPlanList();
  575. // }
  576. // }).subscribe(new Observer<List<Persion.PlanListBean>>() {
  577. // @Override
  578. // public void onSubscribe(Disposable d) {
  579. //
  580. // }
  581. //
  582. // @Override
  583. // public void onNext(List<Persion.PlanListBean> planListBeans) {
  584. // for (Persion.PlanListBean plans : planListBeans) {
  585. // List<String> actions = plans.getActionList();
  586. // for (String action : actions) {
  587. // Log.d("aaa", "onNext" + action);
  588. // }
  589. // }
  590. //
  591. // }
  592. //
  593. // @Override
  594. // public void onError(Throwable e) {
  595. //
  596. // }
  597. //
  598. // @Override
  599. // public void onComplete() {
  600. //
  601. // }
  602. // });
  603. //2、flatMap;方式1在观察者里面出现了多重循环,当数据复杂时不好处理
  604. Observable.fromIterable(persionList).flatMap(new Function<Persion, ObservableSource<Persion.PlanListBean>>() {
  605. @Override
  606. public ObservableSource<Persion.PlanListBean> apply(Persion persion) throws Exception {
  607. if (persion.getPersionName().equals("chen")) {
  608. return Observable.fromIterable(persion.getPlanList()).delay(1, TimeUnit.SECONDS);
  609. }
  610. return Observable.fromIterable(persion.getPlanList());
  611. }
  612. }).flatMap(new Function<Persion.PlanListBean, ObservableSource<String>>() {
  613. @Override
  614. public ObservableSource<String> apply(Persion.PlanListBean planListBean) throws Exception {
  615. return Observable.fromIterable(planListBean.getActionList());
  616. }
  617. }).subscribe(new Observer<String>() {
  618. @Override
  619. public void onSubscribe(Disposable d) {
  620. }
  621. @Override
  622. public void onNext(String s) {
  623. Log.d("aaa", "onNext" + s);
  624. }
  625. @Override
  626. public void onError(Throwable e) {
  627. }
  628. @Override
  629. public void onComplete() {
  630. }
  631. });
  632. }
  633. private List<Persion> initPersionData() {
  634. List<String> stringList1 = new ArrayList<String>();
  635. stringList1.add("t1");
  636. stringList1.add("t2");
  637. List<String> stringList2 = new ArrayList<String>();
  638. stringList1.add("t3");
  639. stringList1.add("t4");
  640. List<Persion> persionList = new ArrayList<Persion>();
  641. Persion persion1 = new Persion();
  642. persion1.setPersionName("chen");
  643. List<Persion.PlanListBean> planListBeanList1 = new ArrayList<Persion.PlanListBean>();
  644. Persion.PlanListBean planListBean1 = new Persion.PlanListBean();
  645. planListBean1.setPlayName("plan1");
  646. planListBean1.setActionList(stringList1);
  647. planListBeanList1.add(planListBean1);
  648. persion1.setPlanList(planListBeanList1);
  649. Persion persion2 = new Persion();
  650. persion2.setPersionName("lv");
  651. List<Persion.PlanListBean> planListBeanList2 = new ArrayList<Persion.PlanListBean>();
  652. Persion.PlanListBean planListBean2 = new Persion.PlanListBean();
  653. planListBean2.setPlayName("plan2");
  654. planListBean2.setActionList(stringList2);
  655. planListBeanList2.add(planListBean2);
  656. persion2.setPlanList(planListBeanList2);
  657. persionList.add(persion1);
  658. persionList.add(persion2);
  659. return persionList;
  660. }
  661. /**
  662. * 常见操作符演示-buffer
  663. * 从需要发送的事件当中获取一定数量的事件,并将这些事件放到缓冲区当中一并发出。
  664. * buffer 有两个参数,一个是 count,另一个 skip。
  665. * count 缓冲区元素的数量,skip 就代表缓冲区满了之后,发送下一次事件序列的时候要跳过多少元素。
  666. */
  667. private void todo17() {
  668. Observable.just(1, 2, 3, 4, 5, 6, 7).buffer(2, 2).subscribe(new Observer<List<Integer>>() {
  669. @Override
  670. public void onSubscribe(Disposable d) {
  671. }
  672. @Override
  673. public void onNext(List<Integer> integers) {
  674. Log.d("aaa", "onNext" + integers);
  675. }
  676. @Override
  677. public void onError(Throwable e) {
  678. }
  679. @Override
  680. public void onComplete() {
  681. }
  682. });
  683. }
  684. /**
  685. * 常见操作符演示-groupBy
  686. * 将发送的数据进行分组,每个分组都会返回一个被观察者。
  687. */
  688. private void todo18() {
  689. Observable.just(3, 1, 4, 3, 7, 3, 47, 9, 3).groupBy(new Function<Integer, Object>() {
  690. @Override
  691. public Object apply(Integer integer) throws Exception {
  692. return integer % 3;
  693. }
  694. }).subscribe(new Observer<GroupedObservable<Object, Integer>>() {
  695. @Override
  696. public void onSubscribe(Disposable d) {
  697. }
  698. @Override
  699. public void onNext(final GroupedObservable<Object, Integer> objectIntegerGroupedObservable) {
  700. Log.d("aaa", "onNext " + objectIntegerGroupedObservable.getKey());
  701. objectIntegerGroupedObservable.subscribe(new Observer<Integer>() {
  702. @Override
  703. public void onSubscribe(Disposable d) {
  704. }
  705. @Override
  706. public void onNext(Integer integer) {
  707. Log.d("aaa", "onNext " + objectIntegerGroupedObservable.getKey() + "value" + integer);
  708. }
  709. @Override
  710. public void onError(Throwable e) {
  711. }
  712. @Override
  713. public void onComplete() {
  714. }
  715. });
  716. }
  717. @Override
  718. public void onError(Throwable e) {
  719. }
  720. @Override
  721. public void onComplete() {
  722. }
  723. });
  724. }
  725. /**
  726. * 常见操作符演示-scan
  727. * 将数据以一定的逻辑聚合起来。
  728. */
  729. private void todo19() {
  730. Observable.just("aa", "bb", "cc", "dd").scan(new BiFunction<String, String, String>() {
  731. @Override
  732. public String apply(String s, String s2) throws Exception {
  733. return s + s2;
  734. }
  735. }).subscribe(new Consumer<String>() {
  736. @Override
  737. public void accept(String s) throws Exception {
  738. Log.d("aaa", s);
  739. }
  740. });
  741. }
  742. /**
  743. * 常见操作符演示-window
  744. * 发送指定数量的事件时,就将这些事件分为一组。window 中的 count 的参数就是代表指定的数量,例如将 count 指定为2,
  745. * 那么每发2个数据就会将这2个数据分成一组。
  746. */
  747. private void todo20() {
  748. Observable.just(1, 2, 3, 4, 5).window(2).subscribe(new Observer<Observable<Integer>>() {
  749. @Override
  750. public void onSubscribe(Disposable d) {
  751. }
  752. @Override
  753. public void onNext(Observable<Integer> integerObservable) {
  754. integerObservable.subscribe(new Observer<Integer>() {
  755. @Override
  756. public void onSubscribe(Disposable d) {
  757. }
  758. @Override
  759. public void onNext(Integer integer) {
  760. }
  761. @Override
  762. public void onError(Throwable e) {
  763. }
  764. @Override
  765. public void onComplete() {
  766. }
  767. });
  768. }
  769. @Override
  770. public void onError(Throwable e) {
  771. }
  772. @Override
  773. public void onComplete() {
  774. }
  775. });
  776. }
  777. //组合操作符//
  778. /**
  779. * 组合操作符-concat
  780. * 可以将多个观察者组合在一起,然后按照之前发送顺序发送事件。需要注意的是,concat() 最多只可以发送4个事件。
  781. */
  782. private void todo21() {
  783. Observable.concat(Observable.just("aa", "bb"), Observable.just("cc", "dd"), Observable.just("ee", "ff")).subscribe(new Observer<String>() {
  784. @Override
  785. public void onSubscribe(Disposable d) {
  786. }
  787. @Override
  788. public void onNext(String s) {
  789. Log.d("aaa", "onNext " + s);
  790. }
  791. @Override
  792. public void onError(Throwable e) {
  793. }
  794. @Override
  795. public void onComplete() {
  796. }
  797. });
  798. }
  799. /**
  800. * 组合操作符-concatArray
  801. * 与 concat() 作用一样,不过 concatArray() 可以发送多于 4 个被观察者。
  802. */
  803. private void todo22() {
  804. Toast.makeText(this, "与 concat() 作用一样,不过 concatArray() 可以发送多于 4 个被观察者。", Toast.LENGTH_SHORT).show();
  805. }
  806. /**
  807. * 组合操作符-merge
  808. * 这个方法月 concat() 作用基本一样,知识 concat() 是串行发送事件,而 merge() 并行发送事件。
  809. * mergeArray() 与 merge() 的作用是一样的,只是它可以发送4个以上的被观察者
  810. * <p>
  811. * <p>
  812. * 此方法如果用concat 则执行兯第二个事件。
  813. */
  814. private void todo23() {
  815. Observable.merge(Observable.interval(2, TimeUnit.SECONDS).map(new Function<Long, String>() {
  816. @Override
  817. public String apply(Long aLong) throws Exception {
  818. return "A" + aLong;
  819. }
  820. }), Observable.interval(2, TimeUnit.SECONDS).map(new Function<Long, String>() {
  821. @Override
  822. public String apply(Long aLong) throws Exception {
  823. return "B" + aLong;
  824. }
  825. })).subscribe(new Observer<String>() {
  826. @Override
  827. public void onSubscribe(Disposable d) {
  828. }
  829. @Override
  830. public void onNext(String s) {
  831. Log.d("aaa", "onNext " + s);
  832. }
  833. @Override
  834. public void onError(Throwable e) {
  835. }
  836. @Override
  837. public void onComplete() {
  838. }
  839. });
  840. }
  841. /**
  842. * 组合操作符-concatArrayDelayError mergeArrayDelayError
  843. * 在 concatArray() 和 mergeArray() 两个方法当中,如果其中有一个被观察者发送了一个 Error 事件,那么就会停止发送事件,
  844. * 如果你想 onError() 事件延迟到所有被观察者都发送完事件后再执行的话,就可以使用 concatArrayDelayError() 和 mergeArrayDelayError()
  845. */
  846. private void todo24() {
  847. Observable observable = Observable.create(new ObservableOnSubscribe<Integer>() {
  848. @Override
  849. public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
  850. emitter.onNext(1);
  851. emitter.onError(new NumberFormatException());
  852. }
  853. });
  854. Observable observable2 = Observable.just(2, 3, 4);
  855. Observable.concatArrayDelayError(observable, observable2).subscribe(new Observer<Integer>() {
  856. @Override
  857. public void onSubscribe(Disposable d) {
  858. }
  859. @Override
  860. public void onNext(Integer integer) {
  861. Log.d("aaa", integer + "");
  862. }
  863. @Override
  864. public void onError(Throwable e) {
  865. }
  866. @Override
  867. public void onComplete() {
  868. }
  869. });
  870. }
  871. /**
  872. * 组合操作符-zip
  873. * 会将多个被观察者合并,根据各个被观察者发送事件的顺序一个个结合起来,最终发送的事件数量会与源 Observable 中最少事件的数量一样。
  874. */
  875. private void todo25() {
  876. Observable observable = Observable.intervalRange(1, 5, 1, 1, TimeUnit.SECONDS).map(new Function<Long, String>() {
  877. @Override
  878. public String apply(Long aLong) throws Exception {
  879. return "A" + aLong;
  880. }
  881. });
  882. Observable observable1 = Observable.intervalRange(2, 6, 2, 2, TimeUnit.SECONDS).map(new Function<Long, String>() {
  883. @Override
  884. public String apply(Long aLong) throws Exception {
  885. return "B" + aLong;
  886. }
  887. });
  888. Observable.zip(observable, observable1, new BiFunction<String, String, String>() {
  889. @Override
  890. public String apply(String s, String s1) throws Exception {
  891. return s + s1;
  892. }
  893. }).subscribe(new Observer<String>() {
  894. @Override
  895. public void onSubscribe(Disposable d) {
  896. }
  897. @Override
  898. public void onNext(String s) {
  899. Log.d("aaa ", s);
  900. }
  901. @Override
  902. public void onError(Throwable e) {
  903. }
  904. @Override
  905. public void onComplete() {
  906. }
  907. });
  908. }
  909. /**
  910. * 组合操作符-combineLatest() & combineLatestDelayError()
  911. * combineLatest() 的作用与 zip() 类似,但是 combineLatest() 发送事件的序列是与发送的时间线有关的,
  912. * 当 combineLatest() 中所有的 Observable 都发送了事件,只要其中有一个 Observable 发送事件,
  913. * 这个事件就会和其他 Observable 最近发送的事件结合起来发送,这样可能还是比较抽象,看看以下例子代码。
  914. * <p>
  915. * combineLatestDelayError() 就是多了延迟发送 onError() 功能
  916. */
  917. private void todo26() {
  918. Observable observable = Observable.intervalRange(1, 4, 1, 1, TimeUnit.SECONDS).map(new Function<Long, String>() {
  919. @Override
  920. public String apply(Long aLong) throws Exception {
  921. return "A" + aLong;
  922. }
  923. });
  924. Observable observable1 = Observable.intervalRange(1, 6, 2, 2, TimeUnit.SECONDS).map(new Function<Long, String>() {
  925. @Override
  926. public String apply(Long aLong) throws Exception {
  927. return "B" + aLong;
  928. }
  929. });
  930. Observable.combineLatest(observable, observable1, new BiFunction<String, String, String>() {
  931. @Override
  932. public String apply(String s, String s1) throws Exception {
  933. return s + s1;
  934. }
  935. }).subscribe(new Observer<String>() {
  936. @Override
  937. public void onSubscribe(Disposable d) {
  938. }
  939. @Override
  940. public void onNext(String s) {
  941. Log.d("aaa ", s);
  942. }
  943. @Override
  944. public void onError(Throwable e) {
  945. }
  946. @Override
  947. public void onComplete() {
  948. }
  949. });
  950. }
  951. /**
  952. * 组合操作符-reduce
  953. * 与 scan() 操作符的作用也是将发送数据以一定逻辑聚合起来,
  954. * 这两个的区别在于 scan() 每处理一次数据就会将事件发送给观察者,而 reduce() 会将所有数据聚合在一起才会发送事件给观察者。
  955. */
  956. private void todo27() {
  957. Observable.just("aa", "bb", "cc", "dd").reduce(new BiFunction<String, String, String>() {
  958. @Override
  959. public String apply(String s, String s2) throws Exception {
  960. return s + s2;
  961. }
  962. }).subscribe(new Consumer<String>() {
  963. @Override
  964. public void accept(String s) throws Exception {
  965. Log.d("aaa", s);
  966. }
  967. });
  968. }
  969. /**
  970. * 组合操作符-collect
  971. * 将数据收集到数据结构当中。
  972. */
  973. private void todo28() {
  974. Observable.just("aa", "bb", "cc").collect(new Callable<ArrayList<String>>() {
  975. @Override
  976. public ArrayList<String> call() throws Exception {
  977. return new ArrayList<String>();
  978. }
  979. }, new BiConsumer<ArrayList<String>, String>() {
  980. @Override
  981. public void accept(ArrayList<String> strings, String s) throws Exception {
  982. strings.add(s);
  983. }
  984. }).subscribe(new Consumer<ArrayList<String>>() {
  985. @Override
  986. public void accept(ArrayList<String> strings) throws Exception {
  987. Log.d("aaa", strings + "");
  988. }
  989. });
  990. }
  991. /**
  992. * 组合操作符-startWith() & startWithArray()
  993. * 在发送事件之前追加事件,startWith() 追加一个事件,startWithArray() 可以追加多个事件。追加的事件会先发出。
  994. */
  995. private void todo29() {
  996. Observable.just(1).startWith(2).startWithArray(2, 5, 7).subscribe(new Consumer<Integer>() {
  997. @Override
  998. public void accept(Integer integer) throws Exception {
  999. Log.d("aaa", integer + "");
  1000. }
  1001. });
  1002. }
  1003. /**
  1004. * 组合操作符-count
  1005. * 返回被观察者发送事件的数量
  1006. */
  1007. private void todo30() {
  1008. Observable.just(1, 2).count().subscribe(new Consumer<Long>() {
  1009. @Override
  1010. public void accept(Long aLong) throws Exception {
  1011. Log.d("aaa", aLong + "");
  1012. }
  1013. });
  1014. }
  1015. /功能操作符/
  1016. /**
  1017. * 功能操作符-delay
  1018. * 延迟一段事件发送事件。
  1019. */
  1020. private void todo31() {
  1021. Observable.just(1, 2, 3)
  1022. .delay(2, TimeUnit.SECONDS)
  1023. .subscribe(new Observer<Integer>() {
  1024. @Override
  1025. public void onSubscribe(Disposable d) {
  1026. Log.d("aaa", "=======================onSubscribe");
  1027. }
  1028. @Override
  1029. public void onNext(Integer integer) {
  1030. Log.d("aaa", "=======================onNext " + integer);
  1031. }
  1032. @Override
  1033. public void onError(Throwable e) {
  1034. }
  1035. @Override
  1036. public void onComplete() {
  1037. Log.d("aaa", "=======================onSubscribe");
  1038. }
  1039. });
  1040. }
  1041. /**
  1042. * 功能操作符-doOnEach
  1043. * Observable 每发送一件事件(包括完成、错误)之前都会先回调这个方法。
  1044. */
  1045. private void todo32() {
  1046. Observable.just(1, 2, 3).doOnEach(new Consumer<Notification<Integer>>() {
  1047. @Override
  1048. public void accept(Notification<Integer> integerNotification) throws Exception {
  1049. Log.d("aaa", integerNotification.getValue() + "doOnEach");
  1050. }
  1051. }).subscribe(new Consumer<Integer>() {
  1052. @Override
  1053. public void accept(Integer integer) throws Exception {
  1054. Log.d("aaa", integer + "");
  1055. }
  1056. });
  1057. }
  1058. /**
  1059. * 功能操作符-doOnNext
  1060. * Observable 每发送 onNext() 之前都会先回调这个方法。
  1061. */
  1062. private void todo33() {
  1063. Observable.create(new ObservableOnSubscribe<Integer>() {
  1064. @Override
  1065. public void subscribe(ObservableEmitter<Integer> e) throws Exception {
  1066. e.onNext(1);
  1067. e.onNext(2);
  1068. e.onNext(3);
  1069. e.onComplete();
  1070. }
  1071. })
  1072. .doOnNext(new Consumer<Integer>() {
  1073. @Override
  1074. public void accept(Integer integer) throws Exception {
  1075. todoLog("==================doOnNext " + integer);
  1076. }
  1077. })
  1078. .subscribe(new Observer<Integer>() {
  1079. @Override
  1080. public void onSubscribe(Disposable d) {
  1081. todoLog("==================onSubscribe ");
  1082. }
  1083. @Override
  1084. public void onNext(Integer integer) {
  1085. todoLog("onNext " + integer);
  1086. }
  1087. @Override
  1088. public void onError(Throwable e) {
  1089. todoLog("onNext onError");
  1090. }
  1091. @Override
  1092. public void onComplete() {
  1093. todoLog("onNext onComplete");
  1094. }
  1095. });
  1096. }
  1097. /**
  1098. * 功能操作符-doAfterNext/doOnComplete/doOnError/doOnSubscribe/doOnDispose
  1099. * Observable 每发送 onNext() 之后都会回调这个方法。
  1100. * Observable 每发送 onComplete() 之前都会回调这个方法。
  1101. * Observable 每发送 onError() 之前都会回调这个方法。
  1102. * Observable 每发送 onSubscribe() 之前都会回调这个方法。
  1103. * 当调用 Disposable 的 dispose() 之后回调该方法。
  1104. */
  1105. public static final String TAG = "aaa";
  1106. private void todo34() {
  1107. Observable.create(new ObservableOnSubscribe<Integer>() {
  1108. @Override
  1109. public void subscribe(ObservableEmitter<Integer> e) throws Exception {
  1110. e.onNext(1);
  1111. e.onNext(2);
  1112. e.onNext(3);
  1113. e.onComplete();
  1114. }
  1115. })
  1116. .doOnDispose(new Action() {
  1117. @Override
  1118. public void run() throws Exception {
  1119. Log.d(TAG, "==================doOnDispose ");
  1120. }
  1121. })
  1122. .subscribe(new Observer<Integer>() {
  1123. private Disposable d;
  1124. @Override
  1125. public void onSubscribe(Disposable d) {
  1126. Log.d(TAG, "==================onSubscribe ");
  1127. this.d = d;
  1128. }
  1129. @Override
  1130. public void onNext(Integer integer) {
  1131. Log.d(TAG, "==================onNext " + integer);
  1132. d.dispose();
  1133. }
  1134. @Override
  1135. public void onError(Throwable e) {
  1136. Log.d(TAG, "==================onError ");
  1137. }
  1138. @Override
  1139. public void onComplete() {
  1140. Log.d(TAG, "==================onComplete ");
  1141. }
  1142. });
  1143. }
  1144. /**
  1145. * 功能操作符-doOnTerminate() & doAfterTerminate()/doFinally
  1146. * doOnTerminate 是在 onError 或者 onComplete 发送之前回调,而 doAfterTerminate 则是 onError 或者 onComplete 发送之后回调。
  1147. * 在所有事件发送完毕之后回调该方法。
  1148. * <p>
  1149. * 这里可能你会有个问题,那就是 doFinally() 和 doAfterTerminate() 到底有什么区别?区别就是在于取消订阅,
  1150. * 如果取消订阅之后 doAfterTerminate() 就不会被回调,而 doFinally() 无论怎么样都会被回调,且都会在事件序列的最后。
  1151. * <p>
  1152. * onErrorReturn
  1153. * 当接受到一个 onError() 事件之后回调,返回的值会回调 onNext() 方法,并正常结束该事件序列。
  1154. * onErrorResumeNext
  1155. * 当接收到 onError() 事件时,返回一个新的 Observable,并正常结束事件序列。
  1156. * onExceptionResumeNext
  1157. * 与 onErrorResumeNext() 作用基本一致,但是这个方法只能捕捉 Exception。
  1158. * retry
  1159. * 如果出现错误事件,则会重新发送所有事件序列。times 是代表重新发的次数。
  1160. * retryUntil
  1161. * 出现错误事件之后,可以通过此方法判断是否继续发送事件。
  1162. * retryWhen
  1163. * 当被观察者接收到异常或者错误事件时会回调该方法,这个方法会返回一个新的被观察者。
  1164. * 如果返回的被观察者发送 Error 事件则之前的被观察者不会继续发送事件,如果发送正常事件则之前的被观察者会继续不断重试发送事件
  1165. */
  1166. private void todo35() {
  1167. Observable.create(new ObservableOnSubscribe<String>() {
  1168. @Override
  1169. public void subscribe(ObservableEmitter<String> e) throws Exception {
  1170. e.onNext("chan");
  1171. e.onNext("ze");
  1172. e.onNext("de");
  1173. e.onError(new Exception("404"));
  1174. e.onNext("haha");
  1175. }
  1176. })
  1177. .retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
  1178. @Override
  1179. public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
  1180. return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
  1181. @Override
  1182. public ObservableSource<?> apply(Throwable throwable) throws Exception {
  1183. if (!throwable.toString().equals("java.lang.Exception: 404")) {
  1184. return Observable.just("可以忽略的异常");
  1185. } else {
  1186. return Observable.error(new Throwable("终止啦"));
  1187. }
  1188. }
  1189. });
  1190. }
  1191. })
  1192. .subscribe(new Observer<String>() {
  1193. @Override
  1194. public void onSubscribe(Disposable d) {
  1195. Log.d(TAG, "==================onSubscribe ");
  1196. }
  1197. @Override
  1198. public void onNext(String s) {
  1199. Log.d(TAG, "==================onNext " + s);
  1200. }
  1201. @Override
  1202. public void onError(Throwable e) {
  1203. Log.d(TAG, "==================onError " + e.toString());
  1204. }
  1205. @Override
  1206. public void onComplete() {
  1207. Log.d(TAG, "==================onComplete ");
  1208. }
  1209. });
  1210. }
  1211. /**
  1212. * 功能操作符-repeat
  1213. * 重复发送被观察者的事件,times 为发送次数。
  1214. * repeatWhen
  1215. * 这个方法可以会返回一个新的被观察者设定一定逻辑来决定是否重复发送事件。
  1216. * subscribeOn
  1217. * 指定被观察者的线程,要注意的时,如果多次调用此方法,只有第一次有效。
  1218. * observeOn()
  1219. * 指定观察者的线程,每指定一次就会生效一次。
  1220. * <p>
  1221. * RxJava 中的调度器:
  1222. * Schedulers.computation( ) 用于使用计算任务,如事件循环和回调处理
  1223. * Schedulers.immediate( ) 当前线程
  1224. * Schedulers.io( ) 用于 IO 密集型任务,如果异步阻塞 IO 操作。
  1225. * Schedulers.newThread( ) 创建一个新的线程
  1226. * AndroidSchedulers.mainThread() Android 的 UI 线程,用于操作 UI。
  1227. */
  1228. private void todo36() {
  1229. Observable.create(new ObservableOnSubscribe<Integer>() {
  1230. @Override
  1231. public void subscribe(ObservableEmitter<Integer> e) throws Exception {
  1232. Log.d(TAG, "=========================currentThread name: " + Thread.currentThread().getName());
  1233. e.onNext(1);
  1234. e.onNext(2);
  1235. e.onNext(3);
  1236. e.onComplete();
  1237. }
  1238. })
  1239. .subscribeOn(Schedulers.newThread())
  1240. .subscribe(new Observer<Integer>() {
  1241. @Override
  1242. public void onSubscribe(Disposable d) {
  1243. Log.d(TAG, "======================onSubscribe");
  1244. }
  1245. @Override
  1246. public void onNext(Integer integer) {
  1247. Log.d(TAG, "======================onNext " + integer);
  1248. }
  1249. @Override
  1250. public void onError(Throwable e) {
  1251. Log.d(TAG, "======================onError");
  1252. }
  1253. @Override
  1254. public void onComplete() {
  1255. Log.d(TAG, "======================onComplete");
  1256. }
  1257. });
  1258. }
  1259. ///过滤操作符/
  1260. /**
  1261. * 过滤操作符-filter
  1262. * 通过一定逻辑来过滤被观察者发送的事件,如果返回 true 则会发送事件,否则不会发送。
  1263. */
  1264. private void todo37() {
  1265. Observable.just(1, 2, 3)
  1266. .filter(new Predicate<Integer>() {
  1267. @Override
  1268. public boolean test(Integer integer) throws Exception {
  1269. return integer < 2;
  1270. }
  1271. })
  1272. .subscribe(new Observer<Integer>() {
  1273. @Override
  1274. public void onSubscribe(Disposable d) {
  1275. Log.d(TAG, "==================onSubscribe ");
  1276. }
  1277. @Override
  1278. public void onNext(Integer integer) {
  1279. Log.d(TAG, "==================onNext " + integer);
  1280. }
  1281. @Override
  1282. public void onError(Throwable e) {
  1283. Log.d(TAG, "==================onError ");
  1284. }
  1285. @Override
  1286. public void onComplete() {
  1287. Log.d(TAG, "==================onComplete ");
  1288. }
  1289. });
  1290. }
  1291. /**
  1292. * 过滤操作符-ofType
  1293. * 可以过滤不符合该类型事件
  1294. */
  1295. private void todo38() {
  1296. Observable.just(1, 2, 3, "chan", "zhide")
  1297. .ofType(Integer.class)
  1298. .subscribe(new Observer<Integer>() {
  1299. @Override
  1300. public void onSubscribe(Disposable d) {
  1301. Log.d(TAG, "==================onSubscribe ");
  1302. }
  1303. @Override
  1304. public void onNext(Integer integer) {
  1305. Log.d(TAG, "==================onNext " + integer);
  1306. }
  1307. @Override
  1308. public void onError(Throwable e) {
  1309. Log.d(TAG, "==================onError ");
  1310. }
  1311. @Override
  1312. public void onComplete() {
  1313. Log.d(TAG, "==================onComplete ");
  1314. }
  1315. });
  1316. }
  1317. /**
  1318. * 过滤操作符-skip
  1319. * 跳过正序某些事件,count 代表跳过事件的数量
  1320. */
  1321. private void todo39() {
  1322. Observable.just(1, 2, 3)
  1323. .skip(2)
  1324. .subscribe(new Observer<Integer>() {
  1325. @Override
  1326. public void onSubscribe(Disposable d) {
  1327. Log.d(TAG, "==================onSubscribe ");
  1328. }
  1329. @Override
  1330. public void onNext(Integer integer) {
  1331. Log.d(TAG, "==================onNext " + integer);
  1332. }
  1333. @Override
  1334. public void onError(Throwable e) {
  1335. Log.d(TAG, "==================onError ");
  1336. }
  1337. @Override
  1338. public void onComplete() {
  1339. Log.d(TAG, "==================onComplete ");
  1340. }
  1341. });
  1342. }
  1343. /**
  1344. * 过滤操作符-distinct
  1345. * 过滤事件序列中的重复事件。
  1346. */
  1347. private void todo40() {
  1348. Observable.just(1, 2, 3, 3, 2, 1)
  1349. .distinct()
  1350. .subscribe(new Observer<Integer>() {
  1351. @Override
  1352. public void onSubscribe(Disposable d) {
  1353. Log.d(TAG, "==================onSubscribe ");
  1354. }
  1355. @Override
  1356. public void onNext(Integer integer) {
  1357. Log.d(TAG, "==================onNext " + integer);
  1358. }
  1359. @Override
  1360. public void onError(Throwable e) {
  1361. Log.d(TAG, "==================onError ");
  1362. }
  1363. @Override
  1364. public void onComplete() {
  1365. Log.d(TAG, "==================onComplete ");
  1366. }
  1367. });
  1368. }
  1369. /**
  1370. * 过滤操作符-distinctUntilChanged
  1371. * 过滤掉连续重复的事件
  1372. */
  1373. private void todo41() {
  1374. Observable.just(1, 2, 3, 3, 2, 1)
  1375. .distinctUntilChanged()
  1376. .subscribe(new Observer<Integer>() {
  1377. @Override
  1378. public void onSubscribe(Disposable d) {
  1379. Log.d(TAG, "==================onSubscribe ");
  1380. }
  1381. @Override
  1382. public void onNext(Integer integer) {
  1383. Log.d(TAG, "==================onNext " + integer);
  1384. }
  1385. @Override
  1386. public void onError(Throwable e) {
  1387. Log.d(TAG, "==================onError ");
  1388. }
  1389. @Override
  1390. public void onComplete() {
  1391. Log.d(TAG, "==================onComplete ");
  1392. }
  1393. });
  1394. }
  1395. /**
  1396. * 过滤操作符-take
  1397. * 控制观察者接收的事件的数量。
  1398. * takeLast() 的作用就是控制观察者只能接受事件序列的后面几件事情
  1399. */
  1400. private void todo42() {
  1401. Observable.just(1, 2, 3, 4, 5)
  1402. .take(3)
  1403. .subscribe(new Observer<Integer>() {
  1404. @Override
  1405. public void onSubscribe(Disposable d) {
  1406. Log.d(TAG, "==================onSubscribe ");
  1407. }
  1408. @Override
  1409. public void onNext(Integer integer) {
  1410. Log.d(TAG, "==================onNext " + integer);
  1411. }
  1412. @Override
  1413. public void onError(Throwable e) {
  1414. Log.d(TAG, "==================onError ");
  1415. }
  1416. @Override
  1417. public void onComplete() {
  1418. Log.d(TAG, "==================onComplete ");
  1419. }
  1420. });
  1421. }
  1422. /**
  1423. * 过滤操作符-debounce
  1424. * 如果两件事件发送的时间间隔小于设定的时间间隔则前一件事件就不会发送给观察者。
  1425. * throttleWithTimeout() 与此方法的作用一样
  1426. */
  1427. private void todo43() {
  1428. Observable.create(new ObservableOnSubscribe<Integer>() {
  1429. @Override
  1430. public void subscribe(ObservableEmitter<Integer> e) throws Exception {
  1431. e.onNext(1);
  1432. Thread.sleep(900);
  1433. e.onNext(2);
  1434. }
  1435. })
  1436. .debounce(1, TimeUnit.SECONDS)
  1437. .subscribe(new Observer<Integer>() {
  1438. @Override
  1439. public void onSubscribe(Disposable d) {
  1440. Log.d(TAG, "===================onSubscribe ");
  1441. }
  1442. @Override
  1443. public void onNext(Integer integer) {
  1444. Log.d(TAG, "===================onNext " + integer);
  1445. }
  1446. @Override
  1447. public void onError(Throwable e) {
  1448. Log.d(TAG, "===================onError ");
  1449. }
  1450. @Override
  1451. public void onComplete() {
  1452. Log.d(TAG, "===================onComplete ");
  1453. }
  1454. });
  1455. }
  1456. /**
  1457. * 过滤操作符-firstElement() && lastElement()
  1458. * firstElement() 取事件序列的第一个元素,lastElement() 取事件序列的最后一个元素。
  1459. */
  1460. private void todo44() {
  1461. Observable.just(1, 2, 3, 4)
  1462. .firstElement()
  1463. .subscribe(new Consumer<Integer>() {
  1464. @Override
  1465. public void accept(Integer integer) throws Exception {
  1466. Log.d(TAG, "====================firstElement " + integer);
  1467. }
  1468. });
  1469. Observable.just(1, 2, 3, 4)
  1470. .lastElement()
  1471. .subscribe(new Consumer<Integer>() {
  1472. @Override
  1473. public void accept(Integer integer) throws Exception {
  1474. Log.d(TAG, "====================lastElement " + integer);
  1475. }
  1476. });
  1477. }
  1478. /**
  1479. * 过滤操作符-elementAt() & elementAtOrError()
  1480. * elementAt() 可以指定取出事件序列中事件,但是输入的 index 超出事件序列的总数的话就不会出现任何结果。
  1481. * 这种情况下,你想发出异常信息的话就用 elementAtOrError() 。
  1482. */
  1483. private void todo45() {
  1484. Observable.just(1, 2, 3, 4)
  1485. .elementAtOrError(5)
  1486. .subscribe(new Consumer<Integer>() {
  1487. @Override
  1488. public void accept(Integer integer) throws Exception {
  1489. Log.d(TAG, "====================accept " + integer);
  1490. }
  1491. });
  1492. }
  1493. /条件操作符/
  1494. /**
  1495. * 条件操作符-all
  1496. * 判断事件序列是否全部满足某个事件,如果都满足则返回 true,反之则返回 false。
  1497. */
  1498. private void todo46() {
  1499. Observable.just(1, 2, 3, 4)
  1500. .all(new Predicate<Integer>() {
  1501. @Override
  1502. public boolean test(Integer integer) throws Exception {
  1503. return integer < 5;
  1504. }
  1505. })
  1506. .subscribe(new Consumer<Boolean>() {
  1507. @Override
  1508. public void accept(Boolean aBoolean) throws Exception {
  1509. Log.d(TAG, "==================aBoolean " + aBoolean);
  1510. }
  1511. });
  1512. }
  1513. /**
  1514. * 条件操作符-takeWhile
  1515. * 可以设置条件,当某个数据满足条件时就会发送该数据,反之则不发送。
  1516. */
  1517. private void todo47() {
  1518. Observable.just(1, 2, 3, 4)
  1519. .takeWhile(new Predicate<Integer>() {
  1520. @Override
  1521. public boolean test(Integer integer) throws Exception {
  1522. return integer < 3;
  1523. }
  1524. })
  1525. .subscribe(new Consumer<Integer>() {
  1526. @Override
  1527. public void accept(Integer integer) throws Exception {
  1528. Log.d(TAG, "========================integer " + integer);
  1529. }
  1530. });
  1531. }
  1532. /**
  1533. * 条件操作符-takeUntil
  1534. * 可以设置条件,当事件满足此条件时,下一次的事件就不会被发送了。
  1535. */
  1536. private void todo48() {
  1537. Observable.just(1, 2, 3, 4, 5, 6)
  1538. .takeUntil(new Predicate<Integer>() {
  1539. @Override
  1540. public boolean test(Integer integer) throws Exception {
  1541. return integer > 3;
  1542. }
  1543. })
  1544. .subscribe(new Consumer<Integer>() {
  1545. @Override
  1546. public void accept(Integer integer) throws Exception {
  1547. Log.d(TAG, "========================integer " + integer);
  1548. }
  1549. });
  1550. }
  1551. /**
  1552. * 条件操作符-skipUntil
  1553. * 当 skipUntil() 中的 Observable 发送事件了,原来的 Observable 才会发送事件给观察者。
  1554. */
  1555. private void todo49() {
  1556. Observable.intervalRange(1, 5, 0, 1, TimeUnit.SECONDS)
  1557. .skipUntil(Observable.intervalRange(6, 5, 3, 1, TimeUnit.SECONDS))
  1558. .subscribe(new Observer<Long>() {
  1559. @Override
  1560. public void onSubscribe(Disposable d) {
  1561. Log.d(TAG, "========================onSubscribe ");
  1562. }
  1563. @Override
  1564. public void onNext(Long along) {
  1565. Log.d(TAG, "========================onNext " + along);
  1566. }
  1567. @Override
  1568. public void onError(Throwable e) {
  1569. Log.d(TAG, "========================onError ");
  1570. }
  1571. @Override
  1572. public void onComplete() {
  1573. Log.d(TAG, "========================onComplete ");
  1574. }
  1575. });
  1576. }
  1577. /**
  1578. * 条件操作符-sequenceEqual
  1579. * 判断两个 Observable 发送的事件是否相同。
  1580. */
  1581. private void todo50() {
  1582. Observable.sequenceEqual(Observable.just(1, 2, 3),
  1583. Observable.just(1, 2, 3))
  1584. .subscribe(new Consumer<Boolean>() {
  1585. @Override
  1586. public void accept(Boolean aBoolean) throws Exception {
  1587. Log.d(TAG, "========================onNext " + aBoolean);
  1588. }
  1589. });
  1590. }
  1591. /**
  1592. * 条件操作符-contains
  1593. * 判断事件序列中是否含有某个元素,如果有则返回 true,如果没有则返回 false。
  1594. */
  1595. private void todo51() {
  1596. Observable.just(1, 2, 3)
  1597. .contains(3)
  1598. .subscribe(new Consumer<Boolean>() {
  1599. @Override
  1600. public void accept(Boolean aBoolean) throws Exception {
  1601. Log.d(TAG, "========================onNext " + aBoolean);
  1602. }
  1603. });
  1604. }
  1605. /**
  1606. * 条件操作符-isEmpty
  1607. * 判断事件序列是否为空。
  1608. */
  1609. private void todo52() {
  1610. Observable.create(new ObservableOnSubscribe<Integer>() {
  1611. @Override
  1612. public void subscribe(ObservableEmitter<Integer> e) throws Exception {
  1613. e.onComplete();
  1614. }
  1615. })
  1616. .isEmpty()
  1617. .subscribe(new Consumer<Boolean>() {
  1618. @Override
  1619. public void accept(Boolean aBoolean) throws Exception {
  1620. Log.d(TAG, "========================onNext " + aBoolean);
  1621. }
  1622. });
  1623. }
  1624. /**
  1625. * 条件操作符-amb
  1626. * amb() 要传入一个 Observable 集合,但是只会发送最先发送事件的 Observable 中的事件,其余 Observable 将会被丢弃。
  1627. */
  1628. private void todo53() {
  1629. ArrayList<Observable<Long>> list = new ArrayList<>();
  1630. list.add(Observable.intervalRange(1, 5, 2, 1, TimeUnit.SECONDS));
  1631. list.add(Observable.intervalRange(6, 5, 0, 1, TimeUnit.SECONDS));
  1632. Observable.amb(list)
  1633. .subscribe(new Consumer<Long>() {
  1634. @Override
  1635. public void accept(Long aLong) throws Exception {
  1636. Log.d(TAG, "========================aLong " + aLong);
  1637. }
  1638. });
  1639. }
  1640. /**
  1641. * 条件操作符-defaultIfEmpty
  1642. * 如果观察者只发送一个 onComplete() 事件,则可以利用这个方法发送一个值。
  1643. */
  1644. private void todo54() {
  1645. Observable.create(new ObservableOnSubscribe<Integer>() {
  1646. @Override
  1647. public void subscribe(ObservableEmitter<Integer> e) throws Exception {
  1648. e.onComplete();
  1649. }
  1650. })
  1651. .defaultIfEmpty(666)
  1652. .subscribe(new Consumer<Integer>() {
  1653. @Override
  1654. public void accept(Integer integer) throws Exception {
  1655. Log.d(TAG, "========================onNext " + integer);
  1656. }
  1657. });
  1658. }
  1659. private void todoLog(String s) {
  1660. Log.d("aaa", s);
  1661. }
  1662. }

  1. package com.example.entity;
  2. import java.util.List;
  3. public class Persion {
  4. /**
  5. * persionName :
  6. * planList : [{"playName":"","actionList":["aaa","bbb"]}]
  7. */
  8. private String persionName;
  9. private List<PlanListBean> planList;
  10. public String getPersionName() {
  11. return persionName;
  12. }
  13. public void setPersionName(String persionName) {
  14. this.persionName = persionName;
  15. }
  16. public List<PlanListBean> getPlanList() {
  17. return planList;
  18. }
  19. public void setPlanList(List<PlanListBean> planList) {
  20. this.planList = planList;
  21. }
  22. public static class PlanListBean {
  23. /**
  24. * playName :
  25. * actionList : ["aaa","bbb"]
  26. */
  27. private String playName;
  28. private List<String> actionList;
  29. public String getPlayName() {
  30. return playName;
  31. }
  32. public void setPlayName(String playName) {
  33. this.playName = playName;
  34. }
  35. public List<String> getActionList() {
  36. return actionList;
  37. }
  38. public void setActionList(List<String> actionList) {
  39. this.actionList = actionList;
  40. }
  41. }
  42. }


声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/菜鸟追梦旅行/article/detail/300777
推荐阅读
相关标签
  

闽ICP备14008679号