赞
踩
需要依赖的包
implementation 'io.reactivex.rxjava2:rxjava:2.1.3'
implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'
public class RxJavaDemo1Activity extends AppCompatActivity implements View.OnClickListener { protected Button btnSend1; protected Button btnSend2; protected Button btnSend3; protected Button btnSend4; protected Button btnSend5; protected Button btnSend6; @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); super.setContentView(R.layout.activity_rx_java_demo1); initView(); } @Override public void onClick(View view) { if (view.getId() == R.id.btn_send1) { test1();//普通使用 } else if (view.getId() == R.id.btn_send2) { test2();//链式调用 } else if (view.getId() == R.id.btn_send3) { test3();//发送中,中断. } else if (view.getId() == R.id.btn_send4) { test4();//只关心onnext事件的操作 } else if (view.getId() == R.id.btn_send5) { test5();// } else if (view.getId() == R.id.btn_send6) { test6(); } } private void test6() { DialogUtil.showListDialog(this, "rxjava的操作符号使用", new String[]{ "0map()操作符", "1flatMap()操作符", "2filter()操作符", "3take()操作符", "4doOnNext()操作符", }, new DialogInterface.OnClickListener() { @Override public void onClick(DialogInterface dialog, int which) { switch (which) { case 0: map0(); break; case 1: map1(); break; case 2: map2(); break; case 3: map3(); break; case 4: map4(); break; } } }); } private void map4() { // 4doOnNext()操作符 Observable.just(new ArrayList<String>(){ { for (int i = 0; i < 6; i++) { add("data"+i); } } }).flatMap(new Function<List<String>, ObservableSource<?>>() { @Override public ObservableSource<?> apply(List<String> strings) throws Exception { return Observable.fromIterable(strings); } }).take(5).doOnNext(new Consumer<Object>() { @Override public void accept(Object o) throws Exception { DemonstrateUtil.showLogResult("额外的准备工作!"); } }).subscribe(new Consumer<Object>() { @Override public void accept(Object s) throws Exception { DemonstrateUtil.showLogResult(s.toString()); } }); } private void map3() { // 3take()操作符 Observable.just(new ArrayList<String>(){ { for (int i = 0; i < 8; i++) { add("data"+i); } } 
}).flatMap(new Function<List<String>, ObservableSource<?>>() { @Override public ObservableSource<?> apply(List<String> strings) throws Exception { return Observable.fromIterable(strings); } }).take(5).subscribe(new Consumer<Object>() { @Override public void accept(Object s) throws Exception { DemonstrateUtil.showLogResult(s.toString()); } }); } private void map2() { Observable .just(new ArrayList<String>(){ { for (int i = 0; i < 5; i++) { add("data"+i); } } }) .flatMap(new Function<List<String>, ObservableSource<?>>() { @Override public ObservableSource<?> apply(List<String> strings) throws Exception { return Observable.fromIterable(strings); } }).filter(new Predicate<Object>() { @Override public boolean test(Object s) throws Exception { String newStr = (String) s; if (newStr.contains("3")){ return true; } return false; } }).subscribe(new Consumer<Object>() { @Override public void accept(Object o) throws Exception { DemonstrateUtil.showLogResult((String)o); } }); } private void map1() { // 1flatMap()操作符 Observable.just(new ArrayList<String>(){ { for (int i = 0; i < 3; i++) { add("data"+i); } } }).flatMap(new Function<List<String>, ObservableSource<?>>() { @Override public ObservableSource<?> apply(List<String> strings) throws Exception { return Observable.fromIterable(strings); } }).subscribe(new Observer<Object>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Object o) { DemonstrateUtil.showLogResult("flatMap转换后,接收到的"+o); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); } private void map0() { // 0map()操作符 Observable.just("hellorxjava") .map(new Function<String, Integer>() { @Override public Integer apply(String s) throws Exception { return s.length(); } }).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Integer integer) { DemonstrateUtil.showLogResult("接收到被转换的数据结果:"+integer); } @Override public void onError(Throwable 
e) { } @Override public void onComplete() { } }); } private void test5() { DialogUtil.showListDialog(this, "rxjava的其他操作", new String[]{ "0just()方式创建Observable", "1fromIterable()方式创建Observable", "2defer()方式创建Observable", "3interval( )方式创建Observable", "4timer( )方式创建Observable", "5range( )方式创建Observable", "6repeat( )方式创建Observable", }, new DialogInterface.OnClickListener() { @Override public void onClick(DialogInterface dialog, int which) { switch (which) { case 0: other0(); break; case 1: other1(); break; case 2: other2(); break; case 3: other3(); break; case 4: other4(); break; case 5: other5(); break; case 6: other6(); break; } } }); } private void other6() { // 6repeat( )方式创建Observable Observable.just(123).repeat().subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Integer integer) { DemonstrateUtil.showLogResult("重复integer" + integer); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); } private void other5() { // 5range( )方式创建Observable Observable.range(1, 5).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Integer integer) { DemonstrateUtil.showLogResult("连续收到:" + integer); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); } private void other4() { // 4timer( )方式创建Observable Observable.timer(5, TimeUnit.SECONDS).subscribe(new Observer<Long>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Long aLong) { DemonstrateUtil.showLogResult("延迟5s后调用了:onNext"); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); } private void other3() { // 3interval( )方式创建Observable Observable.interval(3, TimeUnit.SECONDS).subscribe(new Observer<Long>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Long aLong) { DemonstrateUtil.showLogResult("数字是:" + aLong); 
//DemonstrateUtil.showToastResult(RxJavaDemo1Activity.this,"数字是:"+aLong); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); } private void other2() { Observable<String> observable = Observable.defer(new Callable<ObservableSource<? extends String>>() { @Override public ObservableSource<? extends String> call() throws Exception { return Observable.just("hello,defer"); } }); //上游衔接下游! observable.subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(String s) { DemonstrateUtil.showLogResult(s); DemonstrateUtil.showToastResult(RxJavaDemo1Activity.this, s); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); } private void other1() { Observable.fromIterable(new ArrayList<String>() { { for (int i = 0; i < 5; i++) { add("Hello," + i); } } }).subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(String s) { DemonstrateUtil.showToastResult(RxJavaDemo1Activity.this, s); DemonstrateUtil.showLogResult(s); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); } private void other0() { Observable.just("hello,you hao!").subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(String s) { DemonstrateUtil.showLogResult(s); DemonstrateUtil.showToastResult(RxJavaDemo1Activity.this, s); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); } private void test4() { //只关心onnext事件的操作 Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { DemonstrateUtil.showLogResult("emitter 1"); emitter.onNext(1); DemonstrateUtil.showLogResult("emitter 2"); emitter.onNext(2); DemonstrateUtil.showLogResult("emitter 3"); emitter.onNext(3); DemonstrateUtil.showLogResult("complete"); 
emitter.onComplete(); DemonstrateUtil.showLogResult("emitter 4"); emitter.onNext(4); } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { DemonstrateUtil.showLogResult("accept:" + integer); } }); } private void test3() { // 发送中,中断. Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { DemonstrateUtil.showLogResult("emitter 1"); emitter.onNext(1); DemonstrateUtil.showLogResult("emitter 2"); emitter.onNext(2); DemonstrateUtil.showLogResult("emitter 3"); emitter.onNext(3); DemonstrateUtil.showLogResult("complete"); emitter.onComplete(); DemonstrateUtil.showLogResult("emitter 4"); emitter.onNext(4); } }).subscribe(new Observer<Integer>() { private Disposable mDisposable; private int i; @Override public void onSubscribe(Disposable d) { DemonstrateUtil.showLogResult("subscribe"); mDisposable = d; } @Override public void onNext(Integer value) { DemonstrateUtil.showLogResult("onNext:" + value); i++; if (i == 2) { DemonstrateUtil.showLogResult("dispose:" + value); mDisposable.dispose(); DemonstrateUtil.showLogResult("isDisposed : " + mDisposable.isDisposed()); } } @Override public void onError(Throwable e) { DemonstrateUtil.showLogResult("error:"); } @Override public void onComplete() { DemonstrateUtil.showLogResult("complete"); } }); } private void test2() { //链式调用 Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { e.onNext(1); e.onNext(2); e.onNext(3); } }).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { DemonstrateUtil.showLogResult("onSubscribe"); } @Override public void onNext(Integer integer) { DemonstrateUtil.showLogResult("onNext-->integer" + integer); } @Override public void onError(Throwable e) { DemonstrateUtil.showLogResult("onError"); } @Override public void onComplete() { 
DemonstrateUtil.showLogResult("onComplete"); } }); } private void test1() { //创建上游,数据发射源! //ObservableOnSubscribe对象作为参数,它的作用相当于一个计划表,当 Observable被订阅的时候, // ObservableOnSubscribe的subscribe()方法会自动被调用,事件序列就会依照设定依次触发 //ObservableEmitter,发射器,触发事件. Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { e.onNext(1); e.onNext(2); e.onNext(3); } }); //创建下游,数据接收处! Observer<Integer> observer = new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { DemonstrateUtil.showLogResult("onSubscribe"); } @Override public void onNext(Integer integer) { DemonstrateUtil.showLogResult("onNext--integer" + integer); } @Override public void onError(Throwable e) { DemonstrateUtil.showLogResult("onError"); } @Override public void onComplete() { DemonstrateUtil.showLogResult("onComplete"); } }; //数据源连接接收处,上游衔接下游! //只有当上游和下游建立连接之后, 上游才会开始发送事件 observable.subscribe(observer); } private void initView() { btnSend1 = (Button) findViewById(R.id.btn_send1); btnSend1.setOnClickListener(RxJavaDemo1Activity.this); btnSend2 = (Button) findViewById(R.id.btn_send2); btnSend2.setOnClickListener(RxJavaDemo1Activity.this); btnSend3 = (Button) findViewById(R.id.btn_send3); btnSend3.setOnClickListener(RxJavaDemo1Activity.this); btnSend4 = (Button) findViewById(R.id.btn_send4); btnSend4.setOnClickListener(RxJavaDemo1Activity.this); btnSend5 = (Button) findViewById(R.id.btn_send5); btnSend5.setOnClickListener(RxJavaDemo1Activity.this); btnSend6 = (Button) findViewById(R.id.btn_send6); btnSend6.setOnClickListener(RxJavaDemo1Activity.this); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。