当前位置:   article > 正文

RxJava使用场景解析_rxjava flatmap blockingfirst

rxjava flatmap blockingfirst

1. RxJava简介

相信大家对RxJava都有或多或少的了解,RxJava是Java语言的响应式编程库, 更具体点就是一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库。

1.1 RxJava概念

RxJava 有四个基本概念:
Observable(可观察者,即被观察者)、 Observer(观察者)、 subscribe(订阅)、事件。ObservableObserver通过 subscribe()方法实现订阅关系,从而 Observable可以在需要的时候发出事件来通知 Observer

RxJava采用观察者的模式设计,与传统观察者模式不同, RxJava 的事件回调方法除了普通事件 onNext()之外,还定义了两个特殊的事件:onCompleted()onError()

onCompleted(): 事件队列完结。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。RxJava 规定,当不会再有新的 onNext()发出时,需要触发 onCompleted()方法作为标志。
onError(): 事件队列异常。在事件处理过程中出异常时,onError()会被触发,同时队列自动终止,不允许再有事件发出。
在一个正确运行的事件序列中, onCompleted()onError()有且只有一个,并且是事件序列中的最后一个。需要注意的是,onCompleted()onError()二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。

RxJava观察者模型如下:

RxJava观察者模型

1.2 操作符

flatMap: 它可以把一个发射器 Observable 通过某种方法转换为多个 Observables,然后再把这些分散的 Observables装进一个单一的发射器Observable。但有个需要注意的是,flatMap并不能保证事件的顺序,如果需要保证,需要用到我们下面要讲的 ConcatMap。

通常使用中,我们只是简单的将一个Observable转化成另外一个Observable,这是flatMap操作的一种特殊情景。

filter: 很明显用于过滤操作,用于过滤出满足条件的Observable对象

timer/interval: 定时操作,也就是在接下来某个时间点开始每隔一定时间执行一项任务。

distinct:去重操作。

doOnNext:让订阅者在接收到数据之前干点有意思的事情。假如我们在获取到数据之前想先保存一下它,无疑我们可以这样实现。

just: 简单的发射器依次调用 onNext() 方法。

defer: 每次订阅都会创建一个新的 Observable,并且如果没有被订阅,就不会产生新的 Observable。

2. RxJava数据库操作

场景:加载GVC系列IPVT通知列表, 并对列表数据做查询数量、删除、更新操作

需求:从数据库加载并实现内存缓存

实现:采用MVP架构实现通知列表加载及操作逻辑

2.1 查询

该操作中使用到了Rxjava的flatMap、filter、fromIterable等操作符,涉及了RxJava线程切换

  1. NoticeActivity的onResume()方法中调用Presenter的subscribe()执行加载逻辑,代码在NoticeActivity.java中
@Override
    protected void onResume() {
        super.onResume();
        Logger.d(TAG, "[onResume] ");
        mNoticePresenter.subscribe();
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  1. 在NoticePresenter的subscribe()实现加载,loadNotices() 方法的boolean型参数表示是否清空缓存, 代码在NoticePresenter.java中
@Override
    public void subscribe() {
        loadNotices(false);
    }
  • 1
  • 2
  • 3
  • 4

具体的加载逻辑在loadNotices(false)方法中实现

@Override
    public void loadNotices(boolean forceUpdate) {
        // Simplification for sample: a network reload will be forced on first load.
        loadNotices(forceUpdate || mFirstLoad, true);
        mFirstLoad = false;
    }

    /**
     * @param forceUpdate   Pass in true to refresh the data in the {@link NoticeLocalDataSource}
     * @param showLoadingUI Pass in true to display a loading icon in the UI
     */
    private void loadNotices(final boolean forceUpdate, final boolean showLoadingUI) {
        if (showLoadingUI) {
            mNoticeView.setLoadingIndicator(true);
        }
        if (forceUpdate) { //为true时会执行清空缓存操作
            localDataSource.refreshNotices();
        }
        mCompositeDisposable.clear();
        Disposable disposable = localDataSource
                .getAndCacheLocalNotices() //从数据库中获取通知列表并缓存起来
                //flatMap操作执行过滤操作
                .flatMap(messages -> Flowable.fromIterable(messages)
                        //根据不同查询类型进行列表过滤
                        .filter(notice -> {
                            switch (mCurrentFiltering) {
                                case UNREAD_NOTICES:
                                    return !notice.isLocked();
                                case ALL_NOTICES:
                                default:
                                    Logger.d(TAG," filter my thread name is "
                                            +Thread.currentThread().getName());

                                    String msgContent = notice.getContent();
                                    if (!TextUtils.isEmpty(msgContent) &&
                                            msgContent.equals(mainPageNotice)&&
                                            isMainPageAction) {
                                        if(defSelectedPos==0){
                                            defSelectedPos = tmpSelectcedPos;
                                        }
                                    } else {
                                        tmpSelectcedPos++;
                                    }

                                    return true;
                            }
                        })
                        .toList().toFlowable())
                 //flatMap操作执行删除多余通知的操作,最多显示200条       
                .flatMap(messageDetails -> {
                    if(messageDetails.size() > MessageContract.MAX_SHOW_SIZE){
                        return localDataSource.deleteExceedMessages(messageDetails);
                    }else{
                        return Flowable.fromIterable(messageDetails).toList().toFlowable();
                    }
                }).doFinally(() -> {
                    Logger.d(TAG,"do finally.");
                })
                //线程切换
                .subscribeOn(mSchedulerProvider.io())
                .observeOn(mSchedulerProvider.ui())
                .subscribe(

                        // onNext
                        tasks -> {
                            Logger.d(TAG,"subscribe my thread name is "+Thread.currentThread().getName());

                            processMessages(tasks);
                            mNoticeView.setLoadingIndicator(false);

                        },
                        // onError
                        throwable -> mNoticeView.showLoadNoticeError());

        mCompositeDisposable.add(disposable);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  1. 在NoticeDataSource中实现通知数据的加载,代码NoticeLocalDataSource.java中
public Flowable<List<MessageDetail>> getAndCacheLocalNotices() {
        // Respond immediately with cache if available and not dirty
        if (mCachedMessages != null && !mCacheIsDirty) {
            Logger.i(TAG, " load from cache");
            return Flowable.fromIterable(mCachedMessages.values()).toList().toFlowable();
        } else if (mCachedMessages == null) {
            mCachedMessages = new LinkedHashMap<>();
        }
        Logger.d(TAG,"begin obtain notices from db");
        //从数据库加载数据并缓存到LinkedHashMap中,便于后续操作单条记录
        return getAllNotices()
                .flatMap(messages -> Flowable.fromIterable(messages)
                        .doOnNext(message -> mCachedMessages.put(String.valueOf(message.getId()), message))
                        .toList()
                        .toFlowable())
                .doOnComplete(() -> mCacheIsDirty = false);
    }
    
    
    /*这里使用SqlBrite库,将常规数据类型转成Observable类型, 先将Cursor对象转成JavaBean对象(RxJava 中Function<Cursor, MessageDetail> 操作符实现),然后转成Observable对象*/
    public Flowable<List<MessageDetail>> getAllNotices() {
        return mBriteContentResolver.createQuery(MessageContract.MessageDetail.CONTENT_URI,
                null, LIMIT, null, ORDER_BY, true)
                .mapToList(mMessageDetailMapperFunction)
                .toFlowable(BackpressureStrategy.BUFFER);

    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

2.2 删除、修改

  1. NoticePresenter中使用RxJava执行异步删除任务
@Override
    public void deleteNotices(List<Long> noticeIds) {
        Logger.d(TAG," deleting notices...");
        Disposable disposable = localDataSource.deleteMarkedNotices(noticeIds)
                    .subscribeOn(mSchedulerProvider.io())
                .observeOn(mSchedulerProvider.ui())
                .subscribe(aBoolean -> {
                    Logger.d(TAG," delete notice completed.");
                    mNoticeView.showDeleteResult(true);
                   }, throwable -> {
                    Logger.e(TAG,"delete notices exception.");
                    throwable.printStackTrace();
                    mNoticeView.showDeleteResult(false);
                }, () -> {
                    Logger.d(TAG,"delete Notice subscribe onCompleted.");
                });
        mCompositeDisposable.add(disposable);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  1. 具体的删除操作在Datasource中执行,执行过程是先从缓存中将该数据项删除,然后在数据库中将该数据项删除,刷新时,只需要重新加载一下缓存列表,不需要重新查询数据库

这里采用create操作符产生一个Observable对象

@Override
    public Flowable<Boolean> deleteMarkedNotices(List<Long> idList) {
        // Do in memory cache update to keep the app UI up to date
        if (mCachedMessages == null) {
            mCachedMessages = new LinkedHashMap<>();
        }
        Iterator<Map.Entry<String, MessageDetail>> it = mCachedMessages.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, MessageDetail> entry = it.next();
            if (idList.contains(entry.getKey())) {
                it.remove();
            }
        }

        return Flowable.create(new FlowableOnSubscribe<Boolean>() {
            @Override
            public void subscribe(FlowableEmitter<Boolean> e) throws Exception {
                ArrayList<ContentProviderOperation> batch = new ArrayList<ContentProviderOperation>();
                for(Long id: idList){
                    String[] whereArgs = new String[]{String.valueOf(id),
                            String.valueOf(1)};
                    String smsWhere = MessageContract.MessageDetail._ID + " = ? and "
                            + MessageContract.MessageDetail.COLUMN_ACCOUNT_INDEX + "= ?";
                    batch.add(ContentProviderOperation
                            .newDelete(MessageContract.MessageDetail.CONTENT_URI)
                            .withSelection(smsWhere, whereArgs)
                            .build());
                }
                int result = 0;
                try {
                    result = mContext.getContentResolver().applyBatch(MessageContract.AUTHORITY,
                            batch).length;
                } catch (RemoteException remoteExp) {
                    Logger.e(TAG,"deleteMarked Notice occure remoteException.");
                } catch (OperationApplicationException operaExp) {
                    Logger.e(TAG,"deleteMarked Notice occure OperationApplicationException.");
                }
                if (result > 0) {
                    e.onNext(true);//返回结果
                } else {
                    e.onNext(false);//返回结果
                }
                e.onComplete();//返回结束
            }
        }, BackpressureStrategy.BUFFER);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  1. 更新操作在Presenter中与删除类似,在DataSource中操作代码如下

    @Override
    public Flowable<Boolean> updateContentType(MessageDetail detail, int type) {

        String id = String.valueOf(detail.getId());
        try {
            MessageDetail newDetail = (MessageDetail)detail.clone();
            if(mCachedMessages == null){
                mCachedMessages = new LinkedHashMap<>();
            }
            mCachedMessages.put(id, newDetail);
        } catch (CloneNotSupportedException e) {
            e.printStackTrace();
        }

        return Flowable.create(new FlowableOnSubscribe<Boolean>() {
            @Override
            public void subscribe(FlowableEmitter<Boolean> e) throws Exception {
                try {
                    ContentValues cv = new ContentValues();
                    cv.put(MessageContract.MessageDetail.COLUMN_LOCKED, 1);
                    cv.put(MessageContract.MessageDetail.COLUMN_CONTENT_TYPE, type);
                    int res = mContext.getContentResolver().update(MessageContract.MessageDetail.CONTENT_URI,
                            cv,MessageContract.MessageDetail._ID + "=?",
                            new String[]{id});

                    Logger.d(TAG," update message type return res = "+res);
                    if(res>0){
                        e.onNext(true);
                    } else {
                        e.onNext(false);
                    }
                    e.onComplete();
                } catch (Exception e1) {
                    e.onError(e1);
                }
            }
        }, BackpressureStrategy.BUFFER);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39

2.3 压缩操作zip

zip 专用于合并事件,该合并不是连接(连接操作符后面会说),而是两两配对,也就意味着,最终配对出的 Observable 发射事件数目只和少的那个相同。

该过程中,使用zip操作符将所有条数及未读条数获取过程做压缩操作并一起返回显示

private void loadNoticeStatistics(){
        Single<Integer> allNotices = Single.just(localDataSource.getAllNotices().blockingFirst().size());
        Single<Integer> unreadNotices = Single.just(localDataSource.getAllNotices()
                .flatMap(messages -> Flowable.fromIterable(messages)
                        .filter(messageDetail -> !messageDetail.isLocked())
                        .toList().toFlowable()).blockingFirst().size());
        Disposable disposable = Single
                .zip(allNotices, unreadNotices, (all, unread) -> {
                    Logger.d(TAG,"all is "+all+", unread is "+unread);
                    return Pair.create(all, unread);
                })
                .subscribeOn(mSchedulerProvider.io())
                .observeOn(mSchedulerProvider.ui())
                .doFinally(() -> {
                    Logger.d(TAG,"do finally");
                })

                .subscribe(
                        // onSuccess
                        stats -> mainEntryView.showNoticeStatistics(stats.first,stats.second),
                        // onError
                        throwable -> {
                            throwable.printStackTrace();
                            Logger.e(TAG,"load notice statistics exp");
                        });
        mCompositeDisposable.add(disposable);

    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

2.4 RxJava线程切换

RxJava线程切换主要通过调用subscribeOn和observeOn完成。

subscribeOn 用于指定 subscribe() 时所发生的线程。

observeOn 方法用于指定下游 Observer 回调发生的线程。

简单地说,

  • subscribeOn() 指定的就是发射事件的线程,observerOn 指定的就是订阅者接收事件的线程。
  • 多次指定发射事件的线程只有第一次指定的有效,也就是说多次调用 subscribeOn() 只有第一次的有效,其余的会被忽略。
  • 但多次指定订阅者接收线程是可以的,也就是说每调用一次 observerOn(),下游的线程就会切换一次。

线程切换采用Schedulers类实现,该类内置了很多线程选项供我们选择,例如有:

  • Schedulers.io() 代表io操作的线程, 通常用于网络,读写文件等io密集型的操作;
  • Schedulers.computation() 代表CPU计算密集型的操作, 例如需要大量计算的操作;
  • Schedulers.newThread() 代表一个常规的新线程;
  • AndroidSchedulers.mainThread() 代表Android的主线程
    这些内置的 Scheduler 已经足够满足我们开发的需求,因此我们应该使用内置的这些选项,而 RxJava 内部使用的是线程池来维护这些线程,所以效率也比较高。

为了说明复杂的线程调度过程,下面上一张图来解释说明
RxJava线程切换

图中共有 5 处含有对事件的操作。由图中可以看出,①和②两处受第一个 subscribeOn() 影响,运行在红色线程;③和④处受第一个 observeOn() 的影响,运行在绿色线程;⑤处受第二个 onserveOn() 影响,运行在紫色线程;而第二个 subscribeOn() ,由于在通知过程中线程就被第一个 subscribeOn() 截断,因此对整个流程并没有任何影响。这里也就回答了前面的问题:当使用了多个 subscribeOn() 的时候,只有第一个 subscribeOn() 起作用。

考虑场景,先后台耗时处理,获取一个返回结果刷新界面,紧接着根据前一个返回结果再做一个耗时处理,然后再根据返回结果做界面更新处理,使用RxJava线程切换操作可以很优雅处理这个过程,代码如下

//RxJava做线程切换,先加载联系人数量,刷新UI,再加载选中状态,刷新UI
    @Override
    public void loadNodeUsrCntAndCheckState(final String  nodeId, final RequestCallback<Integer> cntCallback, final RequestCallback<Boolean> checkCallback) {
        Disposable disposable = mDataSource.loadNodeUserCnt(nodeId)
                .subscribeOn(schedulerProvider.computation())
                .observeOn(schedulerProvider.ui())
                .doOnNext(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Logger.d(TAG,"doOnNext thread = "+Thread.currentThread().getName());
                        cntCallback.processData(integer);
                    }
                })
                .observeOn(schedulerProvider.computation())
                .flatMap(new Function<Integer, Flowable<Boolean>>() {
                    @Override
                    public Flowable<Boolean> apply(Integer integer) throws Exception {
                        Logger.d(TAG,"flatMap thread = "+Thread.currentThread().getName());

                        if(integer <= 200 && integer>0){
                            try {
                                return mDataSource.isDepartmentChecked(integer, nodeId);
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                        }
                        return Flowable.just(false);
                    }
                })
                .observeOn(schedulerProvider.ui())
                .subscribe(new Consumer<Boolean>() {
                    @Override
                    public void accept(Boolean aBoolean) throws Exception {
                        Logger.d(TAG, "subscribe thread = " + Thread.currentThread().getName());
                        checkCallback.processData(aBoolean);
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        checkCallback.processErrorCode("error");
                    }
                });
        mCompositeDisposable.add(disposable);

        adpaterDisposable.add(disposable);
    }

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47

3. RxJava网络请求操作

3.1 嵌套网络请求

考虑场景:登录服务器后获取Token,然后使用Token请求Api数据

该过程首先登录到服务器,通过flatMa操作符,将Single<HttpResult> 类型的loginInfoHttpResult 结果转换为Single<HttpResult> 类型的数据
也就是根据登录获取到的token信息转化为请求数据库版本信息的结果,整个过程实在异步线程中执行,其中线程切换的逻辑采用compose操作符做了合并操作,具体实现在applySchedulers()函数

    @Override
    public void loginAndReqDbVersion() {
       Single<HttpResult<LoginInfo>> singleLoginResult = loginServerSync();
       if(singleLoginResult == null) return;
       Disposable disposable = singleLoginResult
               .flatMap(loginInfoHttpResult -> {
                    processLoginInfo(mContext, loginInfoHttpResult.data());
                    Logger.d(TAG,"loginAndReqDbVersion, thread = "
                            +Thread.currentThread().getName());
                    return reqDbVersionSync();
               })
               .compose(applySchedulers())
               .subscribe(contactsVersionHttpResult -> {
                   ContactsVersion dbVersionInfo = contactsVersionHttpResult.data();
                   if (dbVersionInfo != null) {
                       Logger.d(TAG, "dbVersionInfo is  " + dbVersionInfo.toString());
                       String version = dbVersionInfo.version();
                       if (!TextUtils.isEmpty(version)) {
                           SharePrefUtil.getInstance(mContext).setContactDbVersion(version);
                       }
                   }
               }, throwable -> {
                   processException(throwable, null);
               });
       mCompositeDisposable.add(disposable);
    }

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

applySchedulers() 代码如下

public <T> SingleTransformer<T,T> applySchedulers(){
        return new SingleTransformer<T, T>() {
            @Override
            public SingleSource<T> apply(Single<T> upstream) {
                return upstream.subscribeOn(mSchedulerProvider.io())
                        .observeOn(mSchedulerProvider.ui());
            }
        };
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

3.2 网络请求错误处理retryWhen

考虑场景:请求某个Api时候,token过期了

这种场景下首先需要重新登录服务器,更新token,然后重新请求该API

在这种场景下如果采用常规逻辑处理是不是很棘手,如果使用RxJava处理,逻辑上就显得简单多了。

这里使用了defer操作符,目的是每次调用都重新发送一个新的Observable对象,避免出错后重新请求,发现新请求token不生效,relogin()过程采用compose操作符做了压缩处理,另外,在请求过程中肯定需要弹一个进度条,这里也使用compose压缩处理,具体实现逻辑在RxProgress.bindToLifecycle

    @Override
    public void reqDbVersion(RequestCallback<ContactsVersion> callback) {
        int expNo = preProcessBeforeRequest();
        if(expNo > NetErrorCode.EXP_NONE){
            callback.processException(expNo);
            return;
        }
        Disposable disposable = Single.defer(()->ServiceGenerator.createService(mContext,IPVTService.class)
                .reqContactsVersion(RequestParamManager.getInstance().dbVersionParamsMap(mContext)))
                .compose(relogin())//这里需要做错误码重试请求
                .compose(applySchedulers())/*.subscribeOn(mSchedulerProvider.io()).observeOn(mSchedulerProvider.ui())*/
                .compose(RxProgress.bindToLifecycle(mActivity, R.string.load_msg))
                .subscribe(contactsVersionHttpResult -> {
                    processResult(contactsVersionHttpResult, callback);
                }, throwable -> {
                    processException(throwable, callback);
                });
        mCompositeDisposable.add(disposable);
    }

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

relogin()实现代码如下,整个过程主要是通过retryWhen操作符完成, 首先每次请求的结果通过flatMap操作符进行处理,如果错误码为ERR_TOKEN_EXPIRED或者ERR_INVALID_TIMESTAMP,
则将该异常传递给retryWhen操作符,retryWhen根据错误类型执行不同的重新请求的逻辑, 比如ERR_TOKEN_EXPIRED需要先登录服务器后再发起重新请求,ERR_INVALID_TIMESTAMP需要更新请求参数的时间戳后发起重新请求。

private <T> SingleTransformer<HttpResult<T>, HttpResult<T>> relogin() {
        return (Single<HttpResult<T>> observable) -> observable
                .flatMap(tRequestResult -> {
                    String retCode = tRequestResult.retCode();
                    if (retCode.equals(NetErrorCode.ERR_TOKEN_EXPIRED)) {
                        //重新登录。
                        Logger.e(TAG, "token expired");
                        return Single.error(new TokenExpiredException("tokenExpired"));
                    } else if(retCode.equals(NetErrorCode.ERR_INVALID_TIMESTAMP)){
                        Logger.e(TAG, "timeStamp error");
                        String timeStamp = tRequestResult.timeStamp();
                        SharePrefUtil.getInstance(mContext).setServerTimeStamp(timeStamp);
                        return Single.error(new TimeStampErrorException("timeStampError"));
                    }
                    return Single.just(tRequestResult);
                })
                .retryWhen(error -> {
                    return error.flatMap(throwable -> {
                        if(throwable instanceof TokenExpiredException) {
                            Logger.d(TAG, " relogin current Thread = "
                                    + Thread.currentThread().getName());
                            Single<HttpResult<LoginInfo>> loginSingle = loginServerSync();
                            if (loginSingle != null) {
                            return loginSingle.toFlowable().flatMap(loginInfoHttpResult -> {
                                processLoginInfo(mContext, loginInfoHttpResult.data());
                                return Flowable.just(1);
                            });
                        }
                        }else if(throwable instanceof TimeStampErrorException){
                            return Flowable.just(1);
                        }
                        return Flowable.error(throwable);
                    });
                });
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

RxProgress.bindToLifecycle具体代码实现如下,这里是将进度条显示放到订阅过程不同阶段去处理,订阅开始时会调用doOnSubscribe方法,订阅成功执行doOnSuccess方法,订阅出错执行doOnError方法,这样将UI显示状态与订阅过程同步起来了。

public final class RxProgress {

    private static ProgressDialog sProgressDialog;
    private static final Handler handler = new Handler();

    private RxProgress() {
        throw new AssertionError("No instances.");
    }

    public static <U> SingleTransformer<U, U> bindToLifecycle(Context context, @StringRes int stringRes) {
        if(context == null){
            return bindToLifecycle(null, null);
        }
        return bindToLifecycle(context, context.getString(stringRes));
    }

    public static <U> SingleTransformer<U, U> bindToLifecycle(Context context, CharSequence message) {
        return upstream -> {
            if(context == null){
                return upstream;
            }
            if(sProgressDialog != null && sProgressDialog.isShowing()){
                return upstream;
            }
            final ProgressDialog progressDialog = new ProgressDialog(context);
            progressDialog.setMessage(message);
            sProgressDialog = progressDialog;

            return upstream
                    .doOnSubscribe(disposable -> progressDialog.show())
                    .doOnSuccess(u -> {
                        handler.postDelayed(()->progressDialog.dismiss(), 800);
                    })
                    .doOnError(throwable -> {
                        handler.postDelayed(()-> progressDialog.dismiss(), 800);
                    });
        };
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39

3.3 网络请求Json格式封装成RxJava Observable对象

该过程主要是在构造OkhttpClient对象的时候完成,该过程配合了Retrofit的使用,整个OkHttpClient构造在ServiceGenerator中完成,代码如下:

public class ServiceGenerator {

    private static final String TAG = ServiceGenerator.class.getSimpleName();

    private static final int DEFAULT_TIMEOUT = 20;


    public final static Moshi moshi = new Moshi.Builder()
            .add(MyAdpaterFactory.create())
            .build();


    private final static HttpLoggingInterceptor loggingInterceptor = new HttpLoggingInterceptor()
            .setLevel(HttpLoggingInterceptor.Level.BODY);

    /*private final static File cacheDir =  new File(IPVTApplication.getInstance().getCacheDir(),
                    "OkHttpCache");*/


    private final static Retrofit.Builder builder = new Retrofit.Builder()
            //将OkHttp的Call对象转成Observable对象
            .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
            .addConverterFactory(new StringResponseConverterFactory())
            //将Json格式数据转成JavaBean对象,这里JavaBean工具使用google auto,Json工具使用moshi
            .addConverterFactory(MoshiConverterFactory.create(moshi));

    private final static OkHttpClient httpClient = new OkHttpClient.Builder()
            //addInterceptor添加的是应用拦截器Application Interceptor他只会在response被调用一次
            .addInterceptor(loggingInterceptor)
            .addInterceptor(new ResponseInterceptor())
            .connectTimeout(DEFAULT_TIMEOUT, TimeUnit.SECONDS)
            .readTimeout(DEFAULT_TIMEOUT, TimeUnit.SECONDS)
            .build();


    public static <S> S createService(final Context context, Class<S> serviceClass) {
        OkHttpClient.Builder clientBuilder = httpClient.newBuilder()
                .addInterceptor(new Interceptor() {
                    @Override
                    public Response intercept(Chain chain) throws IOException {

                        //对请求的做预处理,方便后续请求参数处理
                        Request original = chain.request();

                        /*String[] headers = {
                                "application/vnd.github.html+json",
                                "application/vnd.github.raw+json"
                        };

                        String token = TokenStore.getInstance(context).getToken();
                        Request.Builder requestBuilder = original.newBuilder()
                                .header("Authorization", "Token " + token)
                                .method(original.method(), original.body());

                        if (original.header("Accept") == null) {
                            requestBuilder.addHeader("Accept", TextUtils.join(",", headers));
                        }

                        Request request = requestBuilder.build();
                        return chain.proceed(request);*/
                        return chain.proceed(original);
                    }
                })
                .cache(new Cache(new File(context.getCacheDir(),
                        "OkHttpCache"), 10 * 1024 * 1024))
                //addNetworkInterceptor,网络拦截器Network Interfacetor它会在request和response时分别被调用一次;
                .addNetworkInterceptor(new CacheInterceptor(context))
                .cookieJar(new CookieJar() {
                    final HashMap<HttpUrl, List<Cookie>> cookieStore = new HashMap<>();
                    @Override
                    public void saveFromResponse(HttpUrl url, List<Cookie> cookies) {
                        cookieStore.put(url, cookies);//保存cookie
                        //也可以使用SP保存
                    }

                    @Override
                    public List<Cookie> loadForRequest(HttpUrl url) {
                        List<Cookie> cookies = cookieStore.get(url);//取出cookie
                        return cookies != null ? cookies : new ArrayList<Cookie>();
                    }
                });

        //check host ip start
        String serverAddress = ApiInstance.getIpvtHost(false);
        Logger.d(TAG,"ipvt server address is "+serverAddress);
        String newAddr = serverAddress;
        if(serverAddress.endsWith("/")){
            newAddr = (String) serverAddress.subSequence(0,serverAddress.length()-1);
        }
        Logger.d(TAG,"ipvt server address is "+newAddr);
        boolean isHttpsIpUrl = Setting.isHttpsIpAddress(newAddr);
        if(isHttpsIpUrl){
            Logger.i(TAG,"need tls skip.");
            clientBuilder = clientBuilder
            .sslSocketFactory(SSLSocketClient.getSSLSocketFactory())
            .hostnameVerifier(SSLSocketClient.getHostnameVerifier());
        }
        //check host ip end

        OkHttpClient client = clientBuilder.build();
        Retrofit retrofit = builder.baseUrl(serverAddress)
                .client(client)
                .build();
        return retrofit.create(serviceClass);
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107

网络请求中的RESTful接口如下, 采用HttpResult泛型对象封装请求结果

public interface IPVTService {

    //login
    @FormUrlEncoded
    @POST("pro/user/login")
    Single<HttpResult<LoginInfo>> loginServer(@FieldMap Map<String, String> map);

    //query contacts db version
    @GET("pro/contacts/departments/lastversion")
    Single<HttpResult<ContactsVersion>> reqContactsVersion(@QueryMap Map<String, String> map);

    //pkg info
    @GET("pro/package/info")
    Single<HttpResult<PackageInfo>> reqPackageInfo(@QueryMap Map<String, String> map);

    //query enterprise
    @GET("pro/enterprise/info")
    Single<HttpResult<Enterprise>> reqEnterpriseInfo(@QueryMap Map<String, String> map);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

4. 总结

Rxjava库里面还有很多很有用的操作符,这里限于业务没有涉及到,本文中列举了一些常用的使用场景,希望对大家的代码开发有所帮助,RxJava博大精深,如果熟练使用,定能优雅的处理业务逻辑!

对于RxJava详细的使用,大家可以参考ReactiveX文档

https://mcxiaoke.gitbooks.io/rxdocs/content/Intro.html

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/2023面试高手/article/detail/177334
推荐阅读
相关标签
  

闽ICP备14008679号