赞
踩
https://www.jianshu.com/p/7474950af2df
这个比较简单,整个流程大致是:
1.通过Obsrvable.create方法,调用OkHttp网络请求
2.通过map方法结合gson,将response转换为bean类
3.通过onNext,解析bean中数据,并进行数据库存储
4.调度线程
5.通过subscribe,根据请求成功或异常来更新UI
Observable.create(new ObservableOnSubscribe<Response>() { @Override public void subscribe(@NonNull ObservableEmitter<Response> e) throws Exception { Request.Builder builder = new Request.Builder() .url("url") .get(); Request request = builder.build(); Call call = new OkHttpClient().newCall(request); Response response = call.execute(); e.onNext(response); } }).map(new Function<Response, Bean>() { @Override public Bean apply(@NonNull Response response) throws Exception { //Gson } }).doOnNext(new Consumer<Bean>() { @Override public void accept(@NonNull Bean bean) throws Exception { //saveData } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Bean>() { @Override public void accept(@NonNull Bean bean) throws Exception { //refresh UI } }, new Consumer<Throwable>() { @Override public void accept(@NonNull Throwable throwable) throws Exception { //get ERROR } });
这里主要是依赖于flatMap关键字,FlatMap可以将一个发射数据的Observable变换为多个Observables,然后将它们发射的数据合并后放进一个单独的Observable。
利用这个特性,我们可以将Observable转成另一个Observable
Observable.create(new ObservableOnSubscribe<Response>() { @Override public void subscribe(@NonNull ObservableEmitter<Response> e) throws Exception { Request.Builder builder = new Request.Builder() .url("url") .get(); Request request = builder.build(); Call call = new OkHttpClient().newCall(request); Response response = call.execute(); e.onNext(response); } }).map(new Function<Response, FirstBean>() { @Override public FirstBean apply(@NonNull Response response) throws Exception { //Gson } }).flatMap(new Function<FirstBean, ObservableSource<Response>>() { @Override public ObservableSource<Response> apply(@NonNull FirstBean bean) throws Exception { final String s = bean.getData(); return Observable.create(new ObservableOnSubscribe<Response>() { @Override public void subscribe(@NonNull ObservableEmitter<Response> e) throws Exception { Request.Builder builder = new Request.Builder() .url("url/" + s) .get(); Request request = builder.build(); Call call = new OkHttpClient().newCall(request); Response response = call.execute(); e.onNext(response); } }); } }).map(new Function<Response, SecondBean>() { @Override public SecondBean apply(@NonNull Response response) throws Exception { //Gson } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<SecondBean>() { @Override public void accept(@NonNull SecondBean secondBean) throws Exception { //refresh UI } });
(前后都执行,先缓存相关,然后获取网络)
这里需要依赖另一个操作符:Concat
concat可以做到不交错的发射两个或多个Observable的发射物,并且只有前一个Observable终止(onComleted)才会订阅下一个Obervable
利用这个特性,我们就可以依次的读取缓存数据展示UI,然后再获取网络数据刷新UI
1.首先创建一个从cache获取数据的observable
2.再创建一个从网络获取数据的Observable(可以通过map等方法转换数据类型)
3.通过concat方法将多个observable结合起来
4.通过subscribe订阅每一个observable
Observable<List<String>> cache = Observable.create(new ObservableOnSubscribe<List<String>>() { @Override public void subscribe(@NonNull ObservableEmitter e) throws Exception { CacheManager manager = CacheManager.getInstance(); List<String> data = manager.query(); e.onNext(data); //一定要有onComplete,不然不会执行第二个Observale e.onComplete(); } }); Observable<List<String>> network = Observable.create(new ObservableOnSubscribe<Response>() { @Override public void subscribe(@NonNull ObservableEmitter e) throws Exception { Request.Builder builder = new Request.Builder() .url("url") .get(); Request request = builder.build(); Call call = new OkHttpClient().newCall(request); Response response = call.execute(); e.onNext(response); e.onComplete(); } }).map(new Function<Response, List<String>>() { @Override public List<String> apply(@NonNull Response response) throws Exception { //解析数据 } }); //两个observable的泛型应该保持一致 Observable.concat(cache, network) .subscribeOn(Schedulers.io()) .subscribe(new Consumer<List<String>>() { @Override public void accept(@NonNull List<String> strings) throws Exception { //refresh ui } }, new Consumer<Throwable>() { @Override public void accept(@NonNull Throwable throwable) throws Exception { //get error } });
当然,有的时候我们的缓存可能还会分为memory和disk,无差,只需要多写一个Observable然后一样通过concat合并即可。
这个时候就需要靠zip方法啦,zip方法可以将多个Observable的数据结合为一个数据源再发射出去。
Observable<FirstBean> firstRequest = Observable.create(new ObservableOnSubscribe<Response>() { @Override public void subscribe(@NonNull ObservableEmitter e) throws Exception { Request.Builder builder = new Request.Builder() .url("firstUrl") .get(); Request request = builder.build(); Call call = new OkHttpClient().newCall(request); Response response = call.execute(); e.onNext(response); e.onComplete(); } }).map(new Function<Response, FirstBean>() { @Override public FirstBean apply(@NonNull Response response) throws Exception { //解析数据 } }); Observable<SecondBean> secondRequest = Observable.create(new ObservableOnSubscribe<Response>() { @Override public void subscribe(@NonNull ObservableEmitter e) throws Exception { Request.Builder builder = new Request.Builder() .url("secondUrl") .get(); Request request = builder.build(); Call call = new OkHttpClient().newCall(request); Response response = call.execute(); e.onNext(response); e.onComplete(); } }).map(new Function<Response, SecondBean>() { @Override public SecondBean apply(@NonNull Response response) throws Exception { //解析数据 } }); Observable.zip(firstRequest, secondRequest, new BiFunction<FirstBean, SecondBean, WholeBean>() { @Override public WholeBean apply(@NonNull FirstBean firstBean, @NonNull SecondBean secondBean) throws Exception { //结合数据为一体 } }) .subscribeOn(Schedulers.io()) .subscribe(new Consumer<WholeBean>() { @Override public void accept(@NonNull WholeBean strings) throws Exception { //refresh ui } }, new Consumer<Throwable>() { @Override public void accept(@NonNull Throwable throwable) throws Exception { //get error } });
当然,如果你的两个api返回的是相同类型的数据,那么可以直接使用merge将数据合并,而不需要实现回调。
debounce
去抖动;防反跳;防抖动;弹跳;抖动消除
设想一种场景:点击一次button就进行一次网络请求,或者当输入框数据变化时进行网络请求,那么这样就会在一下子产生大量的网络请求,但实际上又没有必要,这个时候就可以通过debounce方法来处理,debounce操作符会过滤掉发射速率过快的数据项:
为了方便处理点击事件和Observable的关系,我们引入RxBinding处理:
RxView.clicks(mButton)
.debounce(2, TimeUnit.SECONDS)
.subscribe(new Consumer<Object>() {
@Override
public void accept(@NonNull Object o) throws Exception {
// refresh ui
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
// get error
}
});
想必即时通讯等需要轮训的任务在如今的 APP 中已是很常见,而 RxJava 2.x 的 interval 操作符可谓完美地解决了我们的疑惑。
这里就简单的意思一下轮训。
private Disposable mDisposable; @Override protected void doSomething() { mDisposable = Flowable.interval(1, TimeUnit.SECONDS) .doOnNext(new Consumer<Long>() { @Override public void accept(@NonNull Long aLong) throws Exception { Log.e(TAG, "accept: doOnNext : "+aLong ); } }) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Long>() { @Override public void accept(@NonNull Long aLong) throws Exception { Log.e(TAG, "accept: 设置文本 :"+aLong ); mRxOperatorsText.append("accept: 设置文本 :"+aLong +"\n"); } }); } /** * 销毁时停止心跳 */ @Override protected void onDestroy() { super.onDestroy(); if (mDisposable != null){ mDisposable.dispose(); } }
Flowable相关教程:
给初学者的RxJava2.0教程(五)
给初学者的RxJava2.0教程(六)
给初学者的RxJava2.0教程(七)
给初学者的 RxJava2.0 教程 (八)
给初学者的 RxJava2.0 教程 (九)
https://www.jianshu.com/p/ec57e90e786d
自定义ResponseBody–ProgressResponseBody,获取到从网络读取的数据,并通过EventBus发送通知
自定义拦截器ProgressInterceptor,对请求返回的响应Response进行处理,
定义一个抽象类FileCallBack,1.用于封装类似Observer的接口,2.注册EventBus来接收文件的接收进度,
自定义Observer–FileDownloadObserver,用于包裹FileCallBack
final FileCallBack<ResponseBody> fileCallBack = new FileCallBack<ResponseBody>("","") { @Override public void onSuccess(ResponseBody responseBody) { Log.e(TAG,"onSuccess:"+responseBody.toString()); } @Override public void progress(long progress, long total) { Log.e(TAG,total+"/"+progress); mProgressBar.setMax((int) total); mProgressBar.setProgress((int) progress); DecimalFormat decimalFormat = new DecimalFormat("0.00"); String scaleStr = decimalFormat.format(progress * 1f/ total ); mTvProgress.setText( (int)(Float.parseFloat(scaleStr) * 100) +"%"); } @Override public void onStart(Disposable disposable) { } @Override public void onCompleted() { Log.e(TAG,"onComplete"); show("下载完成"); } @Override public void onError(Throwable e) { Log.e(TAG,"onError:"+e.getMessage()); } }; HttpManager.createService(Api.class,new ProgressInterceptor()) .downloadApk() .subscribeOn(Schedulers.io()) .observeOn(Schedulers.io()) .doOnNext(new Consumer<ResponseBody>() { @Override public void accept(ResponseBody responseBody) throws Exception { // fileCallBack.saveFile(responseBody); } }).observeOn(AndroidSchedulers.mainThread()) .subscribe(new FileDownloadObserver<ResponseBody>(fileCallBack));
通过操作符buffer(3000, TimeUnit.MILLISECONDS),将这段时间发射的数据缓存在集合中
在Observer的onNext中对接收到的集合数据进行求平均值
/** * rxjava处理 */ private void rxjavaCompose() { mPublishSubject = PublishSubject.create(); DisposableObserver<List<Double>> disposableObserver = new DisposableObserver<List<Double>>() { @Override public void onNext(List<Double> temperatureList) { double resultSum = 0;//温度的和 double resultAvera = 0; Log.e("onNext","接收到集合的大小:"+temperatureList.size()); if (temperatureList.size() > 0){ for(Double temperature : temperatureList){ resultSum += temperature; } resultAvera = resultSum / temperatureList.size(); } Log.e(mActivity.getClass().getSimpleName(),"更新平均温度:"+resultAvera); final double finalResultAvera = resultAvera; mTvAveraTemperature.post(new Runnable() { @Override public void run() { mTvAveraTemperature.append("平均3秒温度:"+ finalResultAvera +"℃ 时间:"+ new Date().toLocaleString()+"\n"); int scrollAmount = mTvAveraTemperature.getLayout().getLineTop(mTvAveraTemperature.getLineCount()) - mTvAveraTemperature.getHeight(); if (scrollAmount > 0){ mTvAveraTemperature.scrollTo(0,scrollAmount); }else { mTvAveraTemperature.scrollTo(0,0); } } }); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }; mPublishSubject.buffer(3000, TimeUnit.MILLISECONDS).observeOn(AndroidSchedulers.mainThread()).subscribe(disposableObserver); mCompositeDisposable = new CompositeDisposable();//用于管理订阅与解除订阅 mCompositeDisposable.add(disposableObserver);
需要优化的问题
避免用户连续输入时造成发起不必要的请求。(debounce操作符来解决)
避免用户输入未空时发起不必要的请求。(filter操作符来解决)
避免前后发起两个请求,后面请求响应先于前面请求响应返回。(switch操作符来解决)
/** * 初始化Observable */ private void initObservable() { mPublishSubject = PublishSubject.create(); mDisposableObserver = new DisposableObserver<MyResponse<String>>() {//Disposable是一个抽象的观察者,可以通过disposable进行异步取消 @Override public void onNext(MyResponse<String> myResponse) { Gson gson = new Gson(); mTvLog.setText(JsonUtils.formatJson(gson.toJson(myResponse))); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }; mPublishSubject.debounce(200, TimeUnit.MILLISECONDS)//不会发射时间间隔小于200毫秒的, .filter(new Predicate<String>() {//过滤操作符,只有字符串长度大于0才能发射 @Override public boolean test(String s) throws Exception { return s.length() > 0; } }).switchMap(new Function<String, ObservableSource<MyResponse<String>>>() {//switchMap操作符会保存最新的Observable产生的结果而舍弃旧的结果 @Override public ObservableSource<MyResponse<String >> apply(String s) throws Exception { return HttpManager.createService(Api.class).search(s).subscribeOn(Schedulers.io()); } }).observeOn(AndroidSchedulers.mainThread()) .subscribe(mDisposableObserver); mCompositeDisposable = new CompositeDisposable();//用于取消订阅关系 mCompositeDisposable.add(mDisposableObserver);//添加到订阅关系 }
应用场景:有的时候需要我们尝试间隔一段时间就向服务器发起一次请求,但是又不适合引入长连接的场景。
可以使用intervalRange操作符,参数含义:
start:发送数据的起始值,为Long型。
count:总共发送多少项数据。
initialDelay:发送第一个数据项时的起始时延。
period:两项数据之间的间隔时间。
TimeUnit:时间单位。
/** * 固定时间间隔的轮询 */ private void startFixPolling() { Observable<MyResponse<String>> observableFix = Observable.intervalRange(0,5,0,1000, TimeUnit.MILLISECONDS) .take(5) .flatMap(new Function<Long, ObservableSource<MyResponse<String>>>() { @Override public ObservableSource<MyResponse<String>> apply(Long aLong) throws Exception { return HttpManager.createService(Api.class).polling().subscribeOn(Schedulers.io()); } }); DisposableObserver<MyResponse<String>> disposableObserverFix = new DisposableObserver<MyResponse<String>>() { @Override public void onNext(MyResponse<String> response) { mTvFix.append(response.data+"\n"); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }; observableFix.subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(disposableObserverFix); mCompositeDisposable.add(disposableObserverFix); }
网络请求出错,重试的情况下需要处理的问题:
限制重试的次数
根据错误类型,判断是否需要重新请求
根据错误类型,等待特定的时间后再去重新请求
retryWhen操作符可以实现重新订阅,由onError事件来触发。
private void retryDemo() { Observable<MyResponse<String>> observable = HttpManager.createService(Api.class).retry() .retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() { private int mRetryCount; @Override public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception { return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() { @Override public ObservableSource<?> apply(Throwable throwable) throws Exception { long waitTime = 0;//等待时间 if (throwable instanceof ConnectException){ mainThreadTextChange("ConnectException异常\n"); waitTime = 2000; } mRetryCount++; if (waitTime > 0){ mainThreadTextChange("2秒后重新发起请求\n"); } return waitTime > 0 && mRetryCount <= 4 ? Observable.timer(waitTime,TimeUnit.MILLISECONDS): Observable.error(throwable); } }); } }); DisposableObserver<MyResponse<String>> disposableObserver = new DisposableObserver<MyResponse<String>>() { @Override public void onNext(MyResponse<String> response) { Gson gson = new Gson(); mTv.append("onNext:\n"+ JsonUtils.formatJson(gson.toJson(response))+"\n"); } @Override public void onError(Throwable e) { mTv.append("onError:"+e.getMessage()+"\n"); } @Override public void onComplete() { mTv.append("onComplete\n"); } }; observable.subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(disposableObserver); mCompositeDisposable.add(disposableObserver); } private void mainThreadTextChange(final String content){ runOnUiThread(new Runnable() { @Override public void run() { mTv.append(content); } }); }
应用场景:登录场景中,需要账户是一定的长度,密码也有特定的长度,此时使用操作符combineLatest来实现需求。
combineLatest可以接受多个Observable和一个函数作为参数。当其中的任意一个Observable发射数据后,会去获取其他的Observable最后一次发射的数据,回调到函数中。(此函数回调的前提是都至少发射过一次数据)
private void initRxjava2() { mCompositeDisposable = new CompositeDisposable(); mAccoountPublishSubject = PublishSubject.create(); mPwdPublishSubject = PublishSubject.create(); Observable<Boolean> observable = Observable.combineLatest(mAccoountPublishSubject, mPwdPublishSubject, new BiFunction<String, String, Boolean>() { @Override public Boolean apply(String account, String pwd) throws Exception { int nameLength = account.length(); int pwdLength = pwd.length(); return (nameLength >=3 && nameLength <=5) && (pwdLength >=6 && pwdLength <=10); } }); DisposableObserver<Boolean> disposableObserver = new DisposableObserver<Boolean>() { @Override public void onNext(Boolean aBoolean) { if (aBoolean){//两个输入框的内容都符合要求 mTvLogin.setEnabled(true); mTvLogin.setBackgroundColor(Color.GREEN); }else {//两个输入框有不符合要求的内容 mTvLogin.setEnabled(false); mTvLogin.setBackgroundColor(Color.GRAY); } } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }; observable.subscribe(disposableObserver); mCompositeDisposable.add(disposableObserver); } @Override public void initListener() { mEditAccount.addTextChangedListener(new EditTextWatcher(mAccoountPublishSubject)); mEditPwd.addTextChangedListener(new EditTextWatcher(mPwdPublishSubject)); mTvLogin.setOnClickListener(new View.OnClickListener() { @Override public void onClick(View v) { Toast.makeText(mActivity,"登录成功",Toast.LENGTH_SHORT).show(); } }); } @Override protected void onDestroy() { super.onDestroy(); if (mCompositeDisposable != null) mCompositeDisposable.clear(); } /** * EditText的TextWatcher */ public static class EditTextWatcher implements TextWatcher{ private PublishSubject mPublishSubject; public EditTextWatcher(PublishSubject publishSubject){ this.mPublishSubject = publishSubject; } @Override public void beforeTextChanged(CharSequence s, int start, int count, int after) { } @Override public void onTextChanged(CharSequence s, int start, int before, int count) { } @Override public void afterTextChanged(Editable s) { mPublishSubject.onNext(s.toString()); } }
应用场景:进入新页面,为了提升用户体验,再网络请求没有返回时,优先显示缓存数据。
要求:同时发起请求网络数据和加载本地缓存数据。在网络数据未返回时,显示本地缓存数据;网络数据返回时,显示最新的网络数据
几种实现方式的缺点:
好的实现方式:publish+merge+takeUntil
/** * 优先加载本地缓存数据,同时请求网络数据 */ private void requestData(final long delayTimeLocal, long delayTimeNet) { mProgressBar.setVisibility(View.VISIBLE); Observable<MyResponse<List<CacheToNetData>>> observable = getNetData(delayTimeNet).publish(new Function<Observable<MyResponse<List<CacheToNetData>>>, ObservableSource<MyResponse<List<CacheToNetData>>>>() { @Override public ObservableSource<MyResponse<List<CacheToNetData>>> apply(Observable<MyResponse<List<CacheToNetData>>> netResponseObservable) throws Exception { return Observable.merge(getLocalCacheData(delayTimeLocal),netResponseObservable ) .takeUntil(new Predicate<MyResponse<List<CacheToNetData>>>() { @Override public boolean test(MyResponse<List<CacheToNetData>> listMyResponse) throws Exception { mainThreadTextLog("获取到的数据类型:"+listMyResponse.msg); return listMyResponse.msg.equals("成功"); } }); } }); DisposableObserver<MyResponse<List<CacheToNetData>>> disposableObserver = new DisposableObserver<MyResponse<List<CacheToNetData>>>() { @Override public void onNext(MyResponse<List<CacheToNetData>> listMyResponse) { mProgressBar.setVisibility(View.GONE); if (listMyResponse.code == 1) { if (listMyResponse.msg.equals("本地数据")) { mainThreadTextLog("onNext --- 加载了本地数据"); } else { mainThreadTextLog("onNext --- 加载了网络数据"); } mAdapter.setData(listMyResponse.data); } } @Override public void onError(Throwable e) { mainThreadTextLog("onError:" + e.getMessage()); } @Override public void onComplete() { mainThreadTextLog("onComplete"); } }; observable.observeOn(AndroidSchedulers.mainThread()) .subscribe(disposableObserver); mCompositeDisposable.add(disposableObserver); } @Override protected void onDestroy() { super.onDestroy(); if (mCompositeDisposable != null) mCompositeDisposable.clear(); } /** * 获取本地缓存数据 */ public Observable<MyResponse<List<CacheToNetData>>> getLocalCacheData(final long delayTime) { return Observable.create(new ObservableOnSubscribe<MyResponse<List<CacheToNetData>>>() { @Override public void subscribe(ObservableEmitter<MyResponse<List<CacheToNetData>>> emitter) throws Exception { try { mainThreadTextLog("开始加载本地缓存数据"); Thread.sleep(delayTime); List<CacheToNetData> list = new ArrayList<>(); for (int i = 0; i < 10; i++) { CacheToNetData bean = new CacheToNetData("来自本地缓存", "数据项 --- " + i); list.add(bean); } mainThreadTextLog("结束加载本地缓存数据"); emitter.onNext(new MyResponse<List<CacheToNetData>>("本地数据", 1, list)); emitter.onComplete(); } catch (Exception e) { mainThreadTextLog("加载本地缓存数据异常:" + e.getMessage()); if (!emitter.isDisposed()) emitter.onError(e); } } }).subscribeOn(Schedulers.io()); } /** * 获取网络数据 * * @param delayTime * @return */ public Observable<MyResponse<List<CacheToNetData>>> getNetData(long delayTime) { mainThreadTextLog("开始请求网络数据"); return HttpManager.createService(Api.class) .getNetData(delayTime) .subscribeOn(Schedulers.io()) .onErrorResumeNext(new Function<Throwable, ObservableSource<? extends MyResponse<List<CacheToNetData>>>>() { @Override public ObservableSource<? extends MyResponse<List<CacheToNetData>>> apply(Throwable throwable) throws Exception { mainThreadTextLog("请求网络数据失败:" + throwable.getMessage()); return Observable.never(); } }); } /** * 主线程更新UI日志 * * @param content */ private void mainThreadTextLog(final String content) { mActivity.runOnUiThread(new Runnable() { @Override public void run() { mTvLog.append(content + "\n"); } }); }
使用intervalRange操作符实现倒计时功能
/** * 开始倒计时 * @param countDownTimeLong */ private void startCountDown(final long countDownTimeLong) { mCompositeDisposable.clear(); Observable<Long> observable = Observable.intervalRange(0,countDownTimeLong + 1,0,1, TimeUnit.SECONDS); DisposableObserver<Long> disposableObserver = new DisposableObserver<Long>() { @Override public void onNext(final Long aLong) { runOnUiThread(new Runnable() { @Override public void run() { mTvTime.setText(formatDuring((countDownTimeLong - aLong) * 1000)); } }); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }; observable.subscribe(disposableObserver); mCompositeDisposable.add(disposableObserver); }
应用场景:第一个网络请求之后,再进行一次网络请求,才能拿到需要得数据。
例子:有两道门,宝藏在第二道门后面。需要先后打开两道门,必须打开第一道门之后,才能获取到第二道门得开门密码。
private void requestData() { String inputStr = mEditText.getText().toString().trim(); if (TextUtils.isEmpty(inputStr)){ Toast.makeText(mActivity,"输入不能为空",Toast.LENGTH_SHORT).show(); return; } int intputInt = Integer.parseInt(inputStr); HttpManager.createService(Api.class) .openFirstDoor(intputInt) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .doOnNext(new Consumer<MyResponse<Nest1Bean>>() { @Override public void accept(MyResponse<Nest1Bean> nest1BeanMyResponse) throws Exception { mainThreadTextLog("doOnNext:\n"+JsonUtils.formatJson(new Gson().toJson(nest1BeanMyResponse))); } }) .observeOn(Schedulers.io()) .flatMap(new Function<MyResponse<Nest1Bean>, ObservableSource<MyResponse<Nest2Bean>>>() { @Override public ObservableSource<MyResponse<Nest2Bean>> apply(MyResponse<Nest1Bean> nest1BeanMyResponse) throws Exception { mainThreadTextLog("获取第二道门得密码,去打开第二道门"); return HttpManager.createService(Api.class).openSecondDoor(nest1BeanMyResponse.data.password); } }) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer<MyResponse<Nest2Bean>>() { @Override public void onSubscribe(Disposable d) { mainThreadTextLog("onSubscribe"); } @Override public void onNext(MyResponse<Nest2Bean> nest2BeanMyResponse) { mainThreadTextLog("onNext:\n"+JsonUtils.formatJson(new Gson().toJson(nest2BeanMyResponse))); } @Override public void onError(Throwable e) { mainThreadTextLog("onError:"+e.getMessage()); } @Override public void onComplete() { mainThreadTextLog("onComplete\n\n"); } }); }
应用场景:有的时候我们需要的数据,可能需要请求两个不同的接口才能得到,使用zip操作符可以实现需求
private void requestData() { String gradeStr = mEditGrade.getText().toString().trim(); if (TextUtils.isEmpty(gradeStr)){ Toast.makeText(mActivity,"输入不能为空",Toast.LENGTH_SHORT).show(); return; } int gradeInt = Integer.parseInt(gradeStr); Observable<MyResponse<Teacher>> observableTeacher = HttpManager.createService(Api.class).getTeacher(gradeInt); Observable<MyResponse<List<Student>>> observableStudents = HttpManager.createService(Api.class).getStudents(gradeInt); Observable.zip(observableTeacher, observableStudents, new BiFunction<MyResponse<Teacher>, MyResponse<List<Student>>, ClassBean>() { @Override public ClassBean apply(MyResponse<Teacher> teacherMyResponse, MyResponse<List<Student>> studentListMyResponse) throws Exception { mainThreadTextLog("请求到得老师数据:\n"+JsonUtils.formatJson(new Gson().toJson(teacherMyResponse))+ "\n请求到得学生数据:\n"+JsonUtils.formatJson(new Gson().toJson(studentListMyResponse))); String teacherName = teacherMyResponse.data.name; String grade = teacherMyResponse.data.grade; List<Student> studentList = studentListMyResponse.data; ClassBean classBean = new ClassBean(teacherName,grade,studentList); return classBean; } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer<ClassBean>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(ClassBean classBean) { mainThreadTextLog("onNext合并后得数据:\n"+JsonUtils.formatJson(new Gson().toJson(classBean))); } @Override public void onError(Throwable e) { } @Override public void onComplete() { mainThreadTextLog("onComplete\n\n"); } }); }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。