当前位置:   article > 正文

Rxjava_io.reactivex.rxjava2:rxjava:2.1.3

io.reactivex.rxjava2:rxjava:2.1.3

需要添加的依赖(写在模块 build.gradle 的 dependencies 中)

 

implementation 'io.reactivex.rxjava2:rxjava:2.1.3'
implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'



public class RxJavaDemo1Activity extends AppCompatActivity implements View.OnClickListener {

    // Demo buttons. Each is looked up and wired to this activity's onClick() in initView();
    // onClick() maps btn_send1..btn_send6 to test1()..test6() respectively.
    protected Button btnSend1;
    protected Button btnSend2;
    protected Button btnSend3;
    protected Button btnSend4;
    protected Button btnSend5;
    protected Button btnSend6;

    /**
     * Sets the demo layout as the content view and then binds the buttons
     * through {@link #initView()}.
     *
     * @param savedInstanceState state bundle, forwarded unchanged to the superclass
     */
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);

        // Invoked through super, so the superclass implementation is used directly.
        super.setContentView(R.layout.activity_rx_java_demo1);

        initView();
    }

    /**
     * Routes a button click to the demo method that belongs to the clicked button.
     * Views with any other id are ignored.
     *
     * @param view the clicked view; its id selects the demo to run
     */
    @Override
    public void onClick(View view) {
        final int clickedId = view.getId();

        if (clickedId == R.id.btn_send1) {
            test1(); // basic usage
            return;
        }
        if (clickedId == R.id.btn_send2) {
            test2(); // chained call
            return;
        }
        if (clickedId == R.id.btn_send3) {
            test3(); // dispose while events are still being sent
            return;
        }
        if (clickedId == R.id.btn_send4) {
            test4(); // subscriber that only cares about onNext
            return;
        }
        if (clickedId == R.id.btn_send5) {
            test5(); // dialog listing the Observable creation demos
            return;
        }
        if (clickedId == R.id.btn_send6) {
            test6(); // dialog listing the operator demos
        }
    }

    /**
     * Shows a list dialog of operator demos; picking entry N runs {@code mapN()}.
     * Indexes outside 0..4 do nothing.
     */
    private void test6() {
        // The leading digit of every label is the index handled by the listener below.
        final String[] operatorLabels = new String[]{
                "0map()操作符",
                "1flatMap()操作符",
                "2filter()操作符",
                "3take()操作符",
                "4doOnNext()操作符",
        };

        final DialogInterface.OnClickListener onOperatorPicked = new DialogInterface.OnClickListener() {
            @Override
            public void onClick(DialogInterface dialog, int which) {
                if (which == 0) {
                    map0();
                } else if (which == 1) {
                    map1();
                } else if (which == 2) {
                    map2();
                } else if (which == 3) {
                    map3();
                } else if (which == 4) {
                    map4();
                }
            }
        };

        DialogUtil.showListDialog(this, "rxjava的操作符号使用", operatorLabels, onOperatorPicked);
    }

    /**
     * Demonstrates {@code doOnNext()}: a list of six strings ("data0".."data5") is
     * flattened into single items, limited to the first five, and for every item a
     * preparation message is logged by {@code doOnNext()} before the item itself is
     * logged by the subscriber.
     */
    private void map4() {
        // A plain list replaces the former double-brace initialization, which created an
        // anonymous ArrayList subclass holding a reference to this activity.
        final List<String> items = new ArrayList<String>();
        for (int i = 0; i < 6; i++) {
            items.add("data" + i);
        }

        // The element type is kept as String through the chain (previously erased to
        // Object by ObservableSource<?>), so no toString()/cast is needed downstream.
        Observable.just(items)
                .flatMap(new Function<List<String>, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(List<String> strings) throws Exception {
                        return Observable.fromIterable(strings);
                    }
                })
                .take(5)
                .doOnNext(new Consumer<String>() {
                    @Override
                    public void accept(String item) throws Exception {
                        DemonstrateUtil.showLogResult("额外的准备工作!");
                    }
                })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String item) throws Exception {
                        DemonstrateUtil.showLogResult(item);
                    }
                });
    }

    /**
     * Demonstrates {@code take()}: a list of eight strings ("data0".."data7") is
     * flattened into single items and only the first five are logged.
     */
    private void map3() {
        // A plain list replaces the former double-brace initialization, which created an
        // anonymous ArrayList subclass holding a reference to this activity.
        final List<String> items = new ArrayList<String>();
        for (int i = 0; i < 8; i++) {
            items.add("data" + i);
        }

        // The element type stays String (previously erased to Object by ObservableSource<?>).
        Observable.just(items)
                .flatMap(new Function<List<String>, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(List<String> strings) throws Exception {
                        return Observable.fromIterable(strings);
                    }
                })
                .take(5)
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String item) throws Exception {
                        DemonstrateUtil.showLogResult(item);
                    }
                });
    }

    /**
     * Demonstrates {@code filter()}: a list of five strings ("data0".."data4") is
     * flattened into single items and only those containing "3" are logged.
     */
    private void map2() {
        // A plain list replaces the former double-brace initialization, which created an
        // anonymous ArrayList subclass holding a reference to this activity.
        final List<String> items = new ArrayList<String>();
        for (int i = 0; i < 5; i++) {
            items.add("data" + i);
        }

        // Keeping the element type as String removes the unchecked (String) casts that
        // the former ObservableSource<?> signature forced on filter() and subscribe().
        Observable.just(items)
                .flatMap(new Function<List<String>, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(List<String> strings) throws Exception {
                        return Observable.fromIterable(strings);
                    }
                })
                .filter(new Predicate<String>() {
                    @Override
                    public boolean test(String item) throws Exception {
                        return item.contains("3");
                    }
                })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String item) throws Exception {
                        DemonstrateUtil.showLogResult(item);
                    }
                });
    }

    /**
     * Demonstrates {@code flatMap()}: a single list of three strings
     * ("data0".."data2") is turned into a stream of its elements, and each element
     * is logged as it arrives.
     */
    private void map1() {
        // A plain list replaces the former double-brace initialization, which created an
        // anonymous ArrayList subclass holding a reference to this activity.
        final List<String> items = new ArrayList<String>();
        for (int i = 0; i < 3; i++) {
            items.add("data" + i);
        }

        // The element type stays String (previously erased to Object by ObservableSource<?>).
        Observable.just(items)
                .flatMap(new Function<List<String>, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(List<String> strings) throws Exception {
                        return Observable.fromIterable(strings);
                    }
                })
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        // The disposable is not needed by this demo.
                    }

                    @Override
                    public void onNext(String item) {
                        DemonstrateUtil.showLogResult("flatMap转换后,接收到的" + item);
                    }

                    @Override
                    public void onError(Throwable e) {
                        // Left empty as in the original demo.
                    }

                    @Override
                    public void onComplete() {
                        // Nothing to do on completion.
                    }
                });
    }

    /**
     * Demonstrates {@code map()}: the string "hellorxjava" is converted to its
     * length and the resulting integer is logged.
     */
    private void map0() {
        // Upstream: one string, mapped to its character count.
        final Observable<Integer> lengths = Observable.just("hellorxjava")
                .map(new Function<String, Integer>() {
                    @Override
                    public Integer apply(String text) throws Exception {
                        return text.length();
                    }
                });

        // Downstream: only onNext does any work.
        final Observer<Integer> lengthLogger = new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(Integer length) {
                DemonstrateUtil.showLogResult("接收到被转换的数据结果:" + length);
            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onComplete() {
            }
        };

        lengths.subscribe(lengthLogger);
    }

    /**
     * Shows a list dialog of Observable creation demos; picking entry N runs
     * {@code otherN()}. Indexes outside 0..6 do nothing.
     */
    private void test5() {
        // The leading digit of every label is the index handled by the listener below.
        final String[] creationLabels = new String[]{
                "0just()方式创建Observable",
                "1fromIterable()方式创建Observable",
                "2defer()方式创建Observable",
                "3interval( )方式创建Observable",
                "4timer( )方式创建Observable",
                "5range( )方式创建Observable",
                "6repeat( )方式创建Observable",
        };

        final DialogInterface.OnClickListener onCreationPicked = new DialogInterface.OnClickListener() {
            @Override
            public void onClick(DialogInterface dialog, int which) {
                if (which == 0) {
                    other0();
                } else if (which == 1) {
                    other1();
                } else if (which == 2) {
                    other2();
                } else if (which == 3) {
                    other3();
                } else if (which == 4) {
                    other4();
                } else if (which == 5) {
                    other5();
                } else if (which == 6) {
                    other6();
                }
            }
        };

        DialogUtil.showListDialog(this, "rxjava的其他操作", creationLabels, onCreationPicked);
    }

    /**
     * Demonstrates {@code repeat()}: the value 123 is re-emitted a fixed number of
     * times and each emission is logged.
     *
     * <p>The original demo used the no-argument {@code repeat()}, which resubscribes
     * without bound. No scheduler is applied in this chain and the method is invoked
     * from a dialog click, so the unbounded loop ran on the calling thread and
     * {@code subscribe()} never returned. The repetition is now bounded with
     * {@code repeat(long)}.
     */
    private void other6() {
        // Enough repetitions to show the operator without blocking the caller.
        final long repeatCount = 5;

        Observable.just(123).repeat(repeatCount).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                // The disposable is not needed: the sequence is finite.
            }

            @Override
            public void onNext(Integer integer) {
                DemonstrateUtil.showLogResult("重复integer" + integer);
            }

            @Override
            public void onError(Throwable e) {
                // Left empty as in the original demo.
            }

            @Override
            public void onComplete() {
                // Nothing to do on completion.
            }
        });
    }

    /**
     * Demonstrates {@code range()}: five consecutive integers starting at 1 are
     * emitted and each one is logged.
     */
    private void other5() {
        final int first = 1;
        final int howMany = 5;
        final Observable<Integer> numbers = Observable.range(first, howMany);

        numbers.subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(Integer number) {
                DemonstrateUtil.showLogResult("连续收到:" + number);
            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onComplete() {
            }
        });
    }

    /**
     * Demonstrates {@code timer()}: a single event is delivered after a 5 second
     * delay and a fixed message is logged when it arrives.
     */
    private void other4() {
        final Observable<Long> delayed = Observable.timer(5, TimeUnit.SECONDS);

        final Observer<Long> delayLogger = new Observer<Long>() {
            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(Long tick) {
                // The emitted value itself is not used, only the fact that it arrived.
                DemonstrateUtil.showLogResult("延迟5s后调用了:onNext");
            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onComplete() {
            }
        };

        delayed.subscribe(delayLogger);
    }

    /** Handle of the running {@code interval()} demo, or {@code null} when none is active. */
    private Disposable intervalDisposable;

    /**
     * Demonstrates {@code interval()}: logs an increasing counter every 3 seconds.
     *
     * <p>{@code interval()} never completes on its own. The subscription is therefore
     * remembered so that it can be cancelled when the demo is started again (instead
     * of stacking a second timer) and when the activity is destroyed (instead of
     * continuing to run and keeping the activity reachable through the observer).
     */
    private void other3() {
        // Cancel a timer left over from a previous click before starting a new one.
        disposeInterval();

        Observable.interval(3, TimeUnit.SECONDS).subscribe(new Observer<Long>() {
            @Override
            public void onSubscribe(Disposable d) {
                intervalDisposable = d;
            }

            @Override
            public void onNext(Long aLong) {
                // NOTE(review): interval() uses the computation scheduler by default, so
                // UI work such as a Toast would first need observeOn(main thread) - confirm
                // before adding any here.
                DemonstrateUtil.showLogResult("数字是:" + aLong);
            }

            @Override
            public void onError(Throwable e) {
                // Left empty as in the original demo.
            }

            @Override
            public void onComplete() {
                // Not expected: interval() does not complete by itself.
            }
        });
    }

    /** Cancels the interval demo if it is running and forgets its handle. */
    private void disposeInterval() {
        if (intervalDisposable != null) {
            intervalDisposable.dispose();
            intervalDisposable = null;
        }
    }

    /** Stops the interval demo so that it does not outlive this activity. */
    @Override
    protected void onDestroy() {
        disposeInterval();
        super.onDestroy();
    }

    /**
     * Demonstrates {@code defer()}: the Observable that emits "hello,defer" is
     * produced by a factory callback, and the received string is logged and shown
     * as a toast.
     */
    private void other2() {
        // Factory handed to defer(); it supplies the actual source.
        final Callable<ObservableSource<? extends String>> sourceFactory =
                new Callable<ObservableSource<? extends String>>() {
                    @Override
                    public ObservableSource<? extends String> call() throws Exception {
                        return Observable.just("hello,defer");
                    }
                };
        final Observable<String> deferred = Observable.defer(sourceFactory);

        final Observer<String> receiver = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(String message) {
                DemonstrateUtil.showLogResult(message);
                DemonstrateUtil.showToastResult(RxJavaDemo1Activity.this, message);
            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onComplete() {
            }
        };

        // Connect upstream to downstream.
        deferred.subscribe(receiver);
    }

    /**
     * Demonstrates {@code fromIterable()}: the five strings "Hello,0".."Hello,4"
     * are emitted one by one, and each is shown as a toast and then logged.
     */
    private void other1() {
        // A plain list replaces the former double-brace initialization, which created an
        // anonymous ArrayList subclass holding a reference to this activity.
        final List<String> greetings = new ArrayList<String>();
        for (int i = 0; i < 5; i++) {
            greetings.add("Hello," + i);
        }

        Observable.fromIterable(greetings).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                // The disposable is not needed by this demo.
            }

            @Override
            public void onNext(String s) {
                DemonstrateUtil.showToastResult(RxJavaDemo1Activity.this, s);
                DemonstrateUtil.showLogResult(s);
            }

            @Override
            public void onError(Throwable e) {
                // Left empty as in the original demo.
            }

            @Override
            public void onComplete() {
                // Nothing to do on completion.
            }
        });
    }

    /**
     * Demonstrates {@code just()}: the single string "hello,you hao!" is emitted,
     * logged and shown as a toast.
     */
    private void other0() {
        final Observable<String> greeting = Observable.just("hello,you hao!");

        final Observer<String> receiver = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(String message) {
                DemonstrateUtil.showLogResult(message);
                DemonstrateUtil.showToastResult(RxJavaDemo1Activity.this, message);
            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onComplete() {
            }
        };

        greeting.subscribe(receiver);
    }

    /**
     * Subscribes with a single {@code Consumer}, i.e. a subscriber that only handles
     * onNext events.
     *
     * <p>The source logs and emits 1, 2 and 3, logs and signals completion, and then
     * logs and tries to emit 4; that last emission is there to show what happens to
     * an event sent after completion.
     */
    private void test4() {
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> source) throws Exception {
                // Each value is announced in the log right before it is emitted.
                for (int value = 1; value <= 3; value++) {
                    DemonstrateUtil.showLogResult("emitter " + value);
                    source.onNext(value);
                }

                DemonstrateUtil.showLogResult("complete");
                source.onComplete();

                // Deliberately sent after onComplete().
                DemonstrateUtil.showLogResult("emitter " + 4);
                source.onNext(4);
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer received) throws Exception {
                DemonstrateUtil.showLogResult("accept:" + received);
            }
        });
    }

    /**
     * Demonstrates disposing a subscription while the source is still sending.
     *
     * <p>The source logs and emits 1, 2 and 3, signals completion and then tries to
     * emit 4. The observer keeps the {@code Disposable} from onSubscribe and disposes
     * it when the second onNext arrives, logging the dispose call and the resulting
     * {@code isDisposed()} state.
     */
    private void test3() {
        final ObservableOnSubscribe<Integer> producer = new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> source) throws Exception {
                // Each value is announced in the log right before it is emitted.
                for (int value = 1; value <= 3; value++) {
                    DemonstrateUtil.showLogResult("emitter " + value);
                    source.onNext(value);
                }

                DemonstrateUtil.showLogResult("complete");
                source.onComplete();

                // Deliberately sent after onComplete().
                DemonstrateUtil.showLogResult("emitter " + 4);
                source.onNext(4);
            }
        };

        final Observer<Integer> cancellingObserver = new Observer<Integer>() {
            // Handle used to cut the connection from inside onNext.
            private Disposable upstream;
            // Number of onNext calls seen so far.
            private int received;

            @Override
            public void onSubscribe(Disposable d) {
                DemonstrateUtil.showLogResult("subscribe");
                upstream = d;
            }

            @Override
            public void onNext(Integer value) {
                DemonstrateUtil.showLogResult("onNext:" + value);
                if (++received != 2) {
                    return;
                }
                // Second event: stop receiving.
                DemonstrateUtil.showLogResult("dispose:" + value);
                upstream.dispose();
                DemonstrateUtil.showLogResult("isDisposed : " + upstream.isDisposed());
            }

            @Override
            public void onError(Throwable e) {
                DemonstrateUtil.showLogResult("error:");
            }

            @Override
            public void onComplete() {
                DemonstrateUtil.showLogResult("complete");
            }
        };

        Observable.create(producer).subscribe(cancellingObserver);
    }

    /**
     * Chained-call variant of the basic demo: source creation and subscription are
     * written as one expression. The source emits 1, 2 and 3 and never signals
     * completion or an error; every observer callback logs its name.
     */
    private void test2() {
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> source) throws Exception {
                for (int value = 1; value <= 3; value++) {
                    source.onNext(value);
                }
            }
        }).subscribe(new Observer<Integer>() {

            @Override
            public void onSubscribe(Disposable d) {
                DemonstrateUtil.showLogResult("onSubscribe");
            }

            @Override
            public void onNext(Integer received) {
                DemonstrateUtil.showLogResult("onNext-->integer" + received);
            }

            @Override
            public void onError(Throwable failure) {
                DemonstrateUtil.showLogResult("onError");
            }

            @Override
            public void onComplete() {
                DemonstrateUtil.showLogResult("onComplete");
            }
        });
    }

    /**
     * Basic usage written step by step: build the source, build the observer, then
     * connect the two. The source emits 1, 2 and 3 and never signals completion or
     * an error; every observer callback logs its name.
     */
    private void test1() {
        // Upstream - the data source.
        // The ObservableOnSubscribe argument works like a schedule: its subscribe()
        // method is called when the Observable is subscribed to, and the events are
        // then fired in the order written there through the ObservableEmitter.
        final Observable<Integer> upstream = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> source) throws Exception {
                for (int value = 1; value <= 3; value++) {
                    source.onNext(value);
                }
            }
        });

        // Downstream - the place where the data is received.
        final Observer<Integer> downstream = new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                DemonstrateUtil.showLogResult("onSubscribe");
            }

            @Override
            public void onNext(Integer received) {
                DemonstrateUtil.showLogResult("onNext--integer" + received);
            }

            @Override
            public void onError(Throwable failure) {
                DemonstrateUtil.showLogResult("onError");
            }

            @Override
            public void onComplete() {
                DemonstrateUtil.showLogResult("onComplete");
            }
        };

        // Connect source and receiver; the source only starts sending events
        // once this connection has been made.
        upstream.subscribe(downstream);
    }

    /**
     * Looks up the six demo buttons in the current layout and registers this
     * activity as the click listener of each one.
     */
    private void initView() {
        // The activity itself implements View.OnClickListener.
        final View.OnClickListener clickHandler = this;

        btnSend1 = (Button) findViewById(R.id.btn_send1);
        btnSend1.setOnClickListener(clickHandler);

        btnSend2 = (Button) findViewById(R.id.btn_send2);
        btnSend2.setOnClickListener(clickHandler);

        btnSend3 = (Button) findViewById(R.id.btn_send3);
        btnSend3.setOnClickListener(clickHandler);

        btnSend4 = (Button) findViewById(R.id.btn_send4);
        btnSend4.setOnClickListener(clickHandler);

        btnSend5 = (Button) findViewById(R.id.btn_send5);
        btnSend5.setOnClickListener(clickHandler);

        btnSend6 = (Button) findViewById(R.id.btn_send6);
        btnSend6.setOnClickListener(clickHandler);
    }
}

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/不正经/article/detail/300784
推荐阅读
相关标签
  

闽ICP备14008679号