赞
踩
相信大家对RxJava都有或多或少的了解,RxJava是Java语言的响应式编程库, 更具体点就是一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库。
RxJava 有四个基本概念:
Observable
(可观察者,即被观察者)、 Observer
(观察者)、 subscribe
(订阅)、事件。Observable
和 Observer
通过 subscribe()
方法实现订阅关系,从而 Observable
可以在需要的时候发出事件来通知 Observer
。
RxJava采用观察者的模式设计,与传统观察者模式不同, RxJava 的事件回调方法除了普通事件 onNext()
之外,还定义了两个特殊的事件:onCompleted()
和 onError()
。
onCompleted()
: 事件队列完结。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。RxJava 规定,当不会再有新的 onNext()
发出时,需要触发 onCompleted()
方法作为标志。
onError()
: 事件队列异常。在事件处理过程中出异常时,onError()
会被触发,同时队列自动终止,不允许再有事件发出。
在一个正确运行的事件序列中, onCompleted()
和 onError()
有且只有一个,并且是事件序列中的最后一个。需要注意的是,onCompleted()
和 onError()
二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。
RxJava观察者模型如下:
flatMap: 它可以把一个发射器 Observable 通过某种方法转换为多个 Observables,然后再把这些分散的 Observables装进一个单一的发射器Observable。但有个需要注意的是,flatMap并不能保证事件的顺序,如果需要保证,需要用到我们下面要讲的 ConcatMap。
通常使用中,我们只是简单的将一个Observable转化成另外一个Observable,这是flatMap操作的一种特殊情景。
filter: 很明显用于过滤操作,用于过滤出满足条件的Observable对象
timer/interval: 定时操作,也就是在接下来某个时间点开始每隔一定时间执行一项任务。
distinct:去重操作。
doOnNext:让订阅者在接收到数据之前干点有意思的事情。假如我们在获取到数据之前想先保存一下它,无疑我们可以这样实现。
just: 简单的发射器依次调用 onNext() 方法。
defer: 每次订阅都会创建一个新的 Observable,并且如果没有被订阅,就不会产生新的 Observable。
场景:加载GVC系列IPVT通知列表, 并对列表数据做查询数量、删除、更新操作
需求:从数据库加载并实现内存缓存
实现:采用MVP架构实现通知列表加载及操作逻辑
该操作中使用到了Rxjava的flatMap、filter、fromIterable等操作符,涉及了RxJava线程切换
@Override
protected void onResume() {
super.onResume();
Logger.d(TAG, "[onResume] ");
mNoticePresenter.subscribe();
}
@Override
public void subscribe() {
loadNotices(false);
}
具体的加载逻辑在loadNotices(false)方法中实现
@Override public void loadNotices(boolean forceUpdate) { // Simplification for sample: a network reload will be forced on first load. loadNotices(forceUpdate || mFirstLoad, true); mFirstLoad = false; } /** * @param forceUpdate Pass in true to refresh the data in the {@link NoticeLocalDataSource} * @param showLoadingUI Pass in true to display a loading icon in the UI */ private void loadNotices(final boolean forceUpdate, final boolean showLoadingUI) { if (showLoadingUI) { mNoticeView.setLoadingIndicator(true); } if (forceUpdate) { //为true时会执行清空缓存操作 localDataSource.refreshNotices(); } mCompositeDisposable.clear(); Disposable disposable = localDataSource .getAndCacheLocalNotices() //从数据库中获取通知列表并缓存起来 //flatMap操作执行过滤操作 .flatMap(messages -> Flowable.fromIterable(messages) //根据不同查询类型进行列表过滤 .filter(notice -> { switch (mCurrentFiltering) { case UNREAD_NOTICES: return !notice.isLocked(); case ALL_NOTICES: default: Logger.d(TAG," filter my thread name is " +Thread.currentThread().getName()); String msgContent = notice.getContent(); if (!TextUtils.isEmpty(msgContent) && msgContent.equals(mainPageNotice)&& isMainPageAction) { if(defSelectedPos==0){ defSelectedPos = tmpSelectcedPos; } } else { tmpSelectcedPos++; } return true; } }) .toList().toFlowable()) //flatMap操作执行删除多余通知的操作,最多显示200条 .flatMap(messageDetails -> { if(messageDetails.size() > MessageContract.MAX_SHOW_SIZE){ return localDataSource.deleteExceedMessages(messageDetails); }else{ return Flowable.fromIterable(messageDetails).toList().toFlowable(); } }).doFinally(() -> { Logger.d(TAG,"do finally."); }) //线程切换 .subscribeOn(mSchedulerProvider.io()) .observeOn(mSchedulerProvider.ui()) .subscribe( // onNext tasks -> { Logger.d(TAG,"subscribe my thread name is "+Thread.currentThread().getName()); processMessages(tasks); mNoticeView.setLoadingIndicator(false); }, // onError throwable -> mNoticeView.showLoadNoticeError()); mCompositeDisposable.add(disposable); }
public Flowable<List<MessageDetail>> getAndCacheLocalNotices() { // Respond immediately with cache if available and not dirty if (mCachedMessages != null && !mCacheIsDirty) { Logger.i(TAG, " load from cache"); return Flowable.fromIterable(mCachedMessages.values()).toList().toFlowable(); } else if (mCachedMessages == null) { mCachedMessages = new LinkedHashMap<>(); } Logger.d(TAG,"begin obtain notices from db"); //从数据库加载数据并缓存到LinkedHashMap中,便于后续操作单条记录 return getAllNotices() .flatMap(messages -> Flowable.fromIterable(messages) .doOnNext(message -> mCachedMessages.put(String.valueOf(message.getId()), message)) .toList() .toFlowable()) .doOnComplete(() -> mCacheIsDirty = false); } /*这里使用SqlBrite库,将常规数据类型转成Observable类型, 先将Cursor对象转成JavaBean对象(RxJava 中Function<Cursor, MessageDetail> 操作符实现),然后转成Observable对象*/ public Flowable<List<MessageDetail>> getAllNotices() { return mBriteContentResolver.createQuery(MessageContract.MessageDetail.CONTENT_URI, null, LIMIT, null, ORDER_BY, true) .mapToList(mMessageDetailMapperFunction) .toFlowable(BackpressureStrategy.BUFFER); }
@Override public void deleteNotices(List<Long> noticeIds) { Logger.d(TAG," deleting notices..."); Disposable disposable = localDataSource.deleteMarkedNotices(noticeIds) .subscribeOn(mSchedulerProvider.io()) .observeOn(mSchedulerProvider.ui()) .subscribe(aBoolean -> { Logger.d(TAG," delete notice completed."); mNoticeView.showDeleteResult(true); }, throwable -> { Logger.e(TAG,"delete notices exception."); throwable.printStackTrace(); mNoticeView.showDeleteResult(false); }, () -> { Logger.d(TAG,"delete Notice subscribe onCompleted."); }); mCompositeDisposable.add(disposable); }
这里采用create操作符产生一个Observable对象
@Override public Flowable<Boolean> deleteMarkedNotices(List<Long> idList) { // Do in memory cache update to keep the app UI up to date if (mCachedMessages == null) { mCachedMessages = new LinkedHashMap<>(); } Iterator<Map.Entry<String, MessageDetail>> it = mCachedMessages.entrySet().iterator(); while (it.hasNext()) { Map.Entry<String, MessageDetail> entry = it.next(); if (idList.contains(entry.getKey())) { it.remove(); } } return Flowable.create(new FlowableOnSubscribe<Boolean>() { @Override public void subscribe(FlowableEmitter<Boolean> e) throws Exception { ArrayList<ContentProviderOperation> batch = new ArrayList<ContentProviderOperation>(); for(Long id: idList){ String[] whereArgs = new String[]{String.valueOf(id), String.valueOf(1)}; String smsWhere = MessageContract.MessageDetail._ID + " = ? and " + MessageContract.MessageDetail.COLUMN_ACCOUNT_INDEX + "= ?"; batch.add(ContentProviderOperation .newDelete(MessageContract.MessageDetail.CONTENT_URI) .withSelection(smsWhere, whereArgs) .build()); } int result = 0; try { result = mContext.getContentResolver().applyBatch(MessageContract.AUTHORITY, batch).length; } catch (RemoteException remoteExp) { Logger.e(TAG,"deleteMarked Notice occure remoteException."); } catch (OperationApplicationException operaExp) { Logger.e(TAG,"deleteMarked Notice occure OperationApplicationException."); } if (result > 0) { e.onNext(true);//返回结果 } else { e.onNext(false);//返回结果 } e.onComplete();//返回结束 } }, BackpressureStrategy.BUFFER); }
@Override public Flowable<Boolean> updateContentType(MessageDetail detail, int type) { String id = String.valueOf(detail.getId()); try { MessageDetail newDetail = (MessageDetail)detail.clone(); if(mCachedMessages == null){ mCachedMessages = new LinkedHashMap<>(); } mCachedMessages.put(id, newDetail); } catch (CloneNotSupportedException e) { e.printStackTrace(); } return Flowable.create(new FlowableOnSubscribe<Boolean>() { @Override public void subscribe(FlowableEmitter<Boolean> e) throws Exception { try { ContentValues cv = new ContentValues(); cv.put(MessageContract.MessageDetail.COLUMN_LOCKED, 1); cv.put(MessageContract.MessageDetail.COLUMN_CONTENT_TYPE, type); int res = mContext.getContentResolver().update(MessageContract.MessageDetail.CONTENT_URI, cv,MessageContract.MessageDetail._ID + "=?", new String[]{id}); Logger.d(TAG," update message type return res = "+res); if(res>0){ e.onNext(true); } else { e.onNext(false); } e.onComplete(); } catch (Exception e1) { e.onError(e1); } } }, BackpressureStrategy.BUFFER); }
zip 专用于合并事件,该合并不是连接(连接操作符后面会说),而是两两配对,也就意味着,最终配对出的 Observable 发射事件数目只和少的那个相同。
该过程中,使用zip操作符将所有条数及未读条数获取过程做压缩操作并一起返回显示
private void loadNoticeStatistics(){ Single<Integer> allNotices = Single.just(localDataSource.getAllNotices().blockingFirst().size()); Single<Integer> unreadNotices = Single.just(localDataSource.getAllNotices() .flatMap(messages -> Flowable.fromIterable(messages) .filter(messageDetail -> !messageDetail.isLocked()) .toList().toFlowable()).blockingFirst().size()); Disposable disposable = Single .zip(allNotices, unreadNotices, (all, unread) -> { Logger.d(TAG,"all is "+all+", unread is "+unread); return Pair.create(all, unread); }) .subscribeOn(mSchedulerProvider.io()) .observeOn(mSchedulerProvider.ui()) .doFinally(() -> { Logger.d(TAG,"do finally"); }) .subscribe( // onSuccess stats -> mainEntryView.showNoticeStatistics(stats.first,stats.second), // onError throwable -> { throwable.printStackTrace(); Logger.e(TAG,"load notice statistics exp"); }); mCompositeDisposable.add(disposable); }
RxJava线程切换主要通过调用subscribeOn和observeOn完成。
subscribeOn 用于指定 subscribe() 时所发生的线程。
observeOn 方法用于指定下游 Observer 回调发生的线程。
简单地说,
线程切换采用Schedulers类实现,该类内置了很多线程选项供我们选择,例如有:
为了说明复杂的线程调度过程,下面上一张图来解释说明
图中共有 5 处含有对事件的操作。由图中可以看出,①和②两处受第一个 subscribeOn() 影响,运行在红色线程;③和④处受第一个 observeOn() 的影响,运行在绿色线程;⑤处受第二个 onserveOn() 影响,运行在紫色线程;而第二个 subscribeOn() ,由于在通知过程中线程就被第一个 subscribeOn() 截断,因此对整个流程并没有任何影响。这里也就回答了前面的问题:当使用了多个 subscribeOn() 的时候,只有第一个 subscribeOn() 起作用。
考虑场景,先后台耗时处理,获取一个返回结果刷新界面,紧接着根据前一个返回结果再做一个耗时处理,然后再根据返回结果做界面更新处理,使用RxJava线程切换操作可以很优雅处理这个过程,代码如下
//RxJava做线程切换,先加载联系人数量,刷新UI,再加载选中状态,刷新UI @Override public void loadNodeUsrCntAndCheckState(final String nodeId, final RequestCallback<Integer> cntCallback, final RequestCallback<Boolean> checkCallback) { Disposable disposable = mDataSource.loadNodeUserCnt(nodeId) .subscribeOn(schedulerProvider.computation()) .observeOn(schedulerProvider.ui()) .doOnNext(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Logger.d(TAG,"doOnNext thread = "+Thread.currentThread().getName()); cntCallback.processData(integer); } }) .observeOn(schedulerProvider.computation()) .flatMap(new Function<Integer, Flowable<Boolean>>() { @Override public Flowable<Boolean> apply(Integer integer) throws Exception { Logger.d(TAG,"flatMap thread = "+Thread.currentThread().getName()); if(integer <= 200 && integer>0){ try { return mDataSource.isDepartmentChecked(integer, nodeId); } catch (Exception e) { e.printStackTrace(); } } return Flowable.just(false); } }) .observeOn(schedulerProvider.ui()) .subscribe(new Consumer<Boolean>() { @Override public void accept(Boolean aBoolean) throws Exception { Logger.d(TAG, "subscribe thread = " + Thread.currentThread().getName()); checkCallback.processData(aBoolean); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { checkCallback.processErrorCode("error"); } }); mCompositeDisposable.add(disposable); adpaterDisposable.add(disposable); }
考虑场景:登录服务器后获取Token,然后使用Token请求Api数据
该过程首先登录到服务器,通过flatMa操作符,将Single<HttpResult> 类型的loginInfoHttpResult 结果转换为Single<HttpResult> 类型的数据
也就是根据登录获取到的token信息转化为请求数据库版本信息的结果,整个过程实在异步线程中执行,其中线程切换的逻辑采用compose操作符做了合并操作,具体实现在applySchedulers()函数
@Override public void loginAndReqDbVersion() { Single<HttpResult<LoginInfo>> singleLoginResult = loginServerSync(); if(singleLoginResult == null) return; Disposable disposable = singleLoginResult .flatMap(loginInfoHttpResult -> { processLoginInfo(mContext, loginInfoHttpResult.data()); Logger.d(TAG,"loginAndReqDbVersion, thread = " +Thread.currentThread().getName()); return reqDbVersionSync(); }) .compose(applySchedulers()) .subscribe(contactsVersionHttpResult -> { ContactsVersion dbVersionInfo = contactsVersionHttpResult.data(); if (dbVersionInfo != null) { Logger.d(TAG, "dbVersionInfo is " + dbVersionInfo.toString()); String version = dbVersionInfo.version(); if (!TextUtils.isEmpty(version)) { SharePrefUtil.getInstance(mContext).setContactDbVersion(version); } } }, throwable -> { processException(throwable, null); }); mCompositeDisposable.add(disposable); }
applySchedulers() 代码如下
public <T> SingleTransformer<T,T> applySchedulers(){
return new SingleTransformer<T, T>() {
@Override
public SingleSource<T> apply(Single<T> upstream) {
return upstream.subscribeOn(mSchedulerProvider.io())
.observeOn(mSchedulerProvider.ui());
}
};
}
考虑场景:请求某个Api时候,token过期了
这种场景下首先需要重新登录服务器,更新token,然后重新请求该API
在这种场景下如果采用常规逻辑处理是不是很棘手,如果使用RxJava处理,逻辑上就显得简单多了。
这里使用了defer操作符,目的是每次调用都重新发送一个新的Observable对象,避免出错后重新请求,发现新请求token不生效,relogin()过程采用compose操作符做了压缩处理,另外,在请求过程中肯定需要弹一个进度条,这里也使用compose压缩处理,具体实现逻辑在RxProgress.bindToLifecycle
@Override public void reqDbVersion(RequestCallback<ContactsVersion> callback) { int expNo = preProcessBeforeRequest(); if(expNo > NetErrorCode.EXP_NONE){ callback.processException(expNo); return; } Disposable disposable = Single.defer(()->ServiceGenerator.createService(mContext,IPVTService.class) .reqContactsVersion(RequestParamManager.getInstance().dbVersionParamsMap(mContext))) .compose(relogin())//这里需要做错误码重试请求 .compose(applySchedulers())/*.subscribeOn(mSchedulerProvider.io()).observeOn(mSchedulerProvider.ui())*/ .compose(RxProgress.bindToLifecycle(mActivity, R.string.load_msg)) .subscribe(contactsVersionHttpResult -> { processResult(contactsVersionHttpResult, callback); }, throwable -> { processException(throwable, callback); }); mCompositeDisposable.add(disposable); }
relogin()实现代码如下,整个过程主要是通过retryWhen操作符完成, 首先每次请求的结果通过flatMap操作符进行处理,如果错误码为ERR_TOKEN_EXPIRED或者ERR_INVALID_TIMESTAMP,
则将该异常传递给retryWhen操作符,retryWhen根据错误类型执行不同的重新请求的逻辑, 比如ERR_TOKEN_EXPIRED需要先登录服务器后再发起重新请求,ERR_INVALID_TIMESTAMP需要更新请求参数的时间戳后发起重新请求。
private <T> SingleTransformer<HttpResult<T>, HttpResult<T>> relogin() { return (Single<HttpResult<T>> observable) -> observable .flatMap(tRequestResult -> { String retCode = tRequestResult.retCode(); if (retCode.equals(NetErrorCode.ERR_TOKEN_EXPIRED)) { //重新登录。 Logger.e(TAG, "token expired"); return Single.error(new TokenExpiredException("tokenExpired")); } else if(retCode.equals(NetErrorCode.ERR_INVALID_TIMESTAMP)){ Logger.e(TAG, "timeStamp error"); String timeStamp = tRequestResult.timeStamp(); SharePrefUtil.getInstance(mContext).setServerTimeStamp(timeStamp); return Single.error(new TimeStampErrorException("timeStampError")); } return Single.just(tRequestResult); }) .retryWhen(error -> { return error.flatMap(throwable -> { if(throwable instanceof TokenExpiredException) { Logger.d(TAG, " relogin current Thread = " + Thread.currentThread().getName()); Single<HttpResult<LoginInfo>> loginSingle = loginServerSync(); if (loginSingle != null) { return loginSingle.toFlowable().flatMap(loginInfoHttpResult -> { processLoginInfo(mContext, loginInfoHttpResult.data()); return Flowable.just(1); }); } }else if(throwable instanceof TimeStampErrorException){ return Flowable.just(1); } return Flowable.error(throwable); }); }); }
RxProgress.bindToLifecycle具体代码实现如下,这里是将进度条显示放到订阅过程不同阶段去处理,订阅开始时会调用doOnSubscribe方法,订阅成功执行doOnSuccess方法,订阅出错执行doOnError方法,这样将UI显示状态与订阅过程同步起来了。
public final class RxProgress { private static ProgressDialog sProgressDialog; private static final Handler handler = new Handler(); private RxProgress() { throw new AssertionError("No instances."); } public static <U> SingleTransformer<U, U> bindToLifecycle(Context context, @StringRes int stringRes) { if(context == null){ return bindToLifecycle(null, null); } return bindToLifecycle(context, context.getString(stringRes)); } public static <U> SingleTransformer<U, U> bindToLifecycle(Context context, CharSequence message) { return upstream -> { if(context == null){ return upstream; } if(sProgressDialog != null && sProgressDialog.isShowing()){ return upstream; } final ProgressDialog progressDialog = new ProgressDialog(context); progressDialog.setMessage(message); sProgressDialog = progressDialog; return upstream .doOnSubscribe(disposable -> progressDialog.show()) .doOnSuccess(u -> { handler.postDelayed(()->progressDialog.dismiss(), 800); }) .doOnError(throwable -> { handler.postDelayed(()-> progressDialog.dismiss(), 800); }); }; } }
该过程主要是在构造OkhttpClient对象的时候完成,该过程配合了Retrofit的使用,整个OkHttpClient构造在ServiceGenerator中完成,代码如下:
public class ServiceGenerator { private static final String TAG = ServiceGenerator.class.getSimpleName(); private static final int DEFAULT_TIMEOUT = 20; public final static Moshi moshi = new Moshi.Builder() .add(MyAdpaterFactory.create()) .build(); private final static HttpLoggingInterceptor loggingInterceptor = new HttpLoggingInterceptor() .setLevel(HttpLoggingInterceptor.Level.BODY); /*private final static File cacheDir = new File(IPVTApplication.getInstance().getCacheDir(), "OkHttpCache");*/ private final static Retrofit.Builder builder = new Retrofit.Builder() //将OkHttp的Call对象转成Observable对象 .addCallAdapterFactory(RxJava2CallAdapterFactory.create()) .addConverterFactory(new StringResponseConverterFactory()) //将Json格式数据转成JavaBean对象,这里JavaBean工具使用google auto,Json工具使用moshi .addConverterFactory(MoshiConverterFactory.create(moshi)); private final static OkHttpClient httpClient = new OkHttpClient.Builder() //addInterceptor添加的是应用拦截器Application Interceptor他只会在response被调用一次 .addInterceptor(loggingInterceptor) .addInterceptor(new ResponseInterceptor()) .connectTimeout(DEFAULT_TIMEOUT, TimeUnit.SECONDS) .readTimeout(DEFAULT_TIMEOUT, TimeUnit.SECONDS) .build(); public static <S> S createService(final Context context, Class<S> serviceClass) { OkHttpClient.Builder clientBuilder = httpClient.newBuilder() .addInterceptor(new Interceptor() { @Override public Response intercept(Chain chain) throws IOException { //对请求的做预处理,方便后续请求参数处理 Request original = chain.request(); /*String[] headers = { "application/vnd.github.html+json", "application/vnd.github.raw+json" }; String token = TokenStore.getInstance(context).getToken(); Request.Builder requestBuilder = original.newBuilder() .header("Authorization", "Token " + token) .method(original.method(), original.body()); if (original.header("Accept") == null) { requestBuilder.addHeader("Accept", TextUtils.join(",", headers)); } Request request = requestBuilder.build(); return chain.proceed(request);*/ return chain.proceed(original); } }) .cache(new Cache(new File(context.getCacheDir(), "OkHttpCache"), 10 * 1024 * 1024)) //addNetworkInterceptor,网络拦截器Network Interfacetor它会在request和response时分别被调用一次; .addNetworkInterceptor(new CacheInterceptor(context)) .cookieJar(new CookieJar() { final HashMap<HttpUrl, List<Cookie>> cookieStore = new HashMap<>(); @Override public void saveFromResponse(HttpUrl url, List<Cookie> cookies) { cookieStore.put(url, cookies);//保存cookie //也可以使用SP保存 } @Override public List<Cookie> loadForRequest(HttpUrl url) { List<Cookie> cookies = cookieStore.get(url);//取出cookie return cookies != null ? cookies : new ArrayList<Cookie>(); } }); //check host ip start String serverAddress = ApiInstance.getIpvtHost(false); Logger.d(TAG,"ipvt server address is "+serverAddress); String newAddr = serverAddress; if(serverAddress.endsWith("/")){ newAddr = (String) serverAddress.subSequence(0,serverAddress.length()-1); } Logger.d(TAG,"ipvt server address is "+newAddr); boolean isHttpsIpUrl = Setting.isHttpsIpAddress(newAddr); if(isHttpsIpUrl){ Logger.i(TAG,"need tls skip."); clientBuilder = clientBuilder .sslSocketFactory(SSLSocketClient.getSSLSocketFactory()) .hostnameVerifier(SSLSocketClient.getHostnameVerifier()); } //check host ip end OkHttpClient client = clientBuilder.build(); Retrofit retrofit = builder.baseUrl(serverAddress) .client(client) .build(); return retrofit.create(serviceClass); } }
网络请求中的RESTful接口如下, 采用HttpResult泛型对象封装请求结果
public interface IPVTService { //login @FormUrlEncoded @POST("pro/user/login") Single<HttpResult<LoginInfo>> loginServer(@FieldMap Map<String, String> map); //query contacts db version @GET("pro/contacts/departments/lastversion") Single<HttpResult<ContactsVersion>> reqContactsVersion(@QueryMap Map<String, String> map); //pkg info @GET("pro/package/info") Single<HttpResult<PackageInfo>> reqPackageInfo(@QueryMap Map<String, String> map); //query enterprise @GET("pro/enterprise/info") Single<HttpResult<Enterprise>> reqEnterpriseInfo(@QueryMap Map<String, String> map);
Rxjava库里面还有很多很有用的操作符,这里限于业务没有涉及到,本文中列举了一些常用的使用场景,希望对大家的代码开发有所帮助,RxJava博大精深,如果熟练使用,定能优雅的处理业务逻辑!
对于RxJava详细的使用,大家可以参考ReactiveX文档
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。