赞
踩
上期原理篇我们讲到了WebSocket和HTTP的区别及其优缺点,这期主要是实战部分,大多数Android项目不需要使用第三方重量级的WebSocket框架,所以我以OKHttp3.0自带的WebSocket为例,来进行讲解
添加依赖
implementation "com.squareup.okhttp3:okhttp:4.9.0"
实现步骤
构建OkHttpClient配置初始化一些参数。
使用WebSocket的Url地址连接。
设置WebSocket的连接状态回调和消息回调。
解析消息json处理业务等。
连接成功后,使用WebSocket发送消息
1、配置OKHttpClient
- val mHolder = OkHttpClient.Builder()
- .connectTimeout(CONNECT_TIME_OUT, TimeUnit.SECONDS)//设置读取超时时间
- .readTimeout(READ_TIME_OUT, TimeUnit.SECONDS)//设置连接超时时间
- .writeTimeout(3, TimeUnit.SECONDS)//设置写的超时时间
- .build()
2、使用URL 构建WebSocket请求
- // 构建鉴权url
- val authUrl = TTSTools.createAuthUrl(mTTSConfig)
- // 获取请求
- val request: Request = Request.Builder().url(authUrl).build()
鉴权url 一般是ws[s]://host/servername/path?形式,至于ws和wss的区别,可以类比http和https,这里不做赘述
3、发起连接,配置回调
- // 开始连接
- val client: OkHttpClient = CusOkHttpClient.mInstance
- mWebSocket = client.newWebSocket(
- request,
- object : WebSocketListener() {
-
- override fun onOpen(webSocket: WebSocket, response: Response) {
- Logger.d(mTAG, "WebSocket-握手协议")
- }
-
- override fun onMessage(webSocket: WebSocket, text: String) {
- Logger.d(mTAG, "WebSocket-返回结果}")
- }
-
- override fun onClosed(webSocket: WebSocket, code: Int, reason: String)
- {
- Logger.d(mTAG, "WebSocket关闭")
- }
-
- override fun onFailure(webSocket: WebSocket,
- t: Throwable, response: Response?) {
- Logger.d(mTAG, "WebSocket出错--${t.message}")
- }
- }
- )
4、使用WebSocket对象发送消息,msg为消息内容(一般是json,当然你也可以使用其他的,例如xml等),send方法会马上返回发送结果
- /**
- * 发送数据
- */
- private fun sendSimpleData(
- simpleData: String
- ) {
- if (!TextUtils.isEmpty(simpleData)) {
- mTtsText = ttsText
- }
- mWebSocket?.send(getRequest(ttsText))
- }
getRequest方法主要是生成相应的请求JSON,数据协议由两端决定
- private fun getRequest(rawText: String): String {
- return JSONObject().apply {
- put("text",rawText)
- }.toString()
- }
可以看出,OKHTTP3的WebSocket使用非常的简单,而WebSocket可以基于RxJava或协程做二次封装,下面基于RxJava的封装,我找了个比较好的案例,供大家学习
基于RxJava封装
1、定义Api调用接口,外部只需要接触Api无无需关心内部实现逻辑
- public interface WebSocketWorker {
- /**
- * 获取连接,并返回观察对象
- */
- Observable<WebSocketInfo> get(String url);
-
- /**
- * 设置一个超时时间,在指定时间内如果没有收到消息,会尝试重连
- *
- * @param timeout 超时时间
- * @param timeUnit 超时时间单位
- */
- Observable<WebSocketInfo> get(String url, long timeout, TimeUnit timeUnit);
-
- /**
- * 发送,url的WebSocket已打开的情况下使用,否则会抛出异常
- *
- * @param msg 消息,看具体和后端协商的格式,一般为json
- */
- Observable<Boolean> send(String url, String msg);
-
- /**
- * 发送,同上
- *
- * @param byteString 信息类型为ByteString
- */
- Observable<Boolean> send(String url, ByteString byteString);
-
- /**
- * 不关心WebSocket是否连接,直接发送
- */
- Observable<Boolean> asyncSend(String url, String msg);
-
- /**
- * 同上,只是消息类型为ByteString
- */
- Observable<Boolean> asyncSend(String url, ByteString byteString);
-
- /**
- * 关闭指定Url的连接
- */
- Observable<Boolean> close(String url);
-
- /**
- * 马上关闭指定Url的连接
- */
- boolean closeNow(String url);
-
- /**
- * 关闭当前所有连接
- */
- Observable<List<Boolean>> closeAll();
- /**
- * 马上关闭所有连接
- */
- void closeAllNow();
- }
2、构建者模式,大量的配置参数,我们先使用一个Builder类保存,再使用build()方法生成RxWebSocket对象
- public class RxWebSocketBuilder {
- Context mContext;
- /**
- * 是否打印Log
- */
- boolean mIsPrintLog;
- /**
- * Log代理对象
- */
- Logger.LogDelegate mLogDelegate;
- /**
- * 支持外部传入OkHttpClient
- */
- OkHttpClient mClient;
- /**
- * 支持SSL
- */
- SSLSocketFactory mSslSocketFactory;
- X509TrustManager mTrustManager;
- /**
- * 重连间隔时间
- */
- long mReconnectInterval;
- /**
- * 重连间隔时间的单位
- */
- TimeUnit mReconnectIntervalTimeUnit;
-
- public RxWebSocketBuilder(Context context) {
- this.mContext = context.getApplicationContext();
- }
-
- public RxWebSocketBuilder isPrintLog(boolean isPrintLog) {
- this.mIsPrintLog = isPrintLog;
- return this;
- }
-
- public RxWebSocketBuilder logger(Logger.LogDelegate logDelegate) {
- Logger.setDelegate(logDelegate);
- return this;
- }
-
- public RxWebSocketBuilder client(OkHttpClient client) {
- this.mClient = client;
- return this;
- }
-
- public RxWebSocketBuilder sslSocketFactory(SSLSocketFactory sslSocketFactory, X509TrustManager trustManager) {
- this.mSslSocketFactory = sslSocketFactory;
- this.mTrustManager = trustManager;
- return this;
- }
-
- public RxWebSocketBuilder reconnectInterval(long reconnectInterval, TimeUnit reconnectIntervalTimeUnit) {
- this.mReconnectInterval = reconnectInterval;
- this.mReconnectIntervalTimeUnit = reconnectIntervalTimeUnit;
- return this;
- }
-
- public RxWebSocket build() {
- return new RxWebSocket(this);
- }
- }
3、基础缓存池
- public abstract class BaseCachePool<T extends ICacheTarget<T>> implements ICachePool<T>, Comparator<CacheItem<T>> {
- /**
- * 缓存池
- */
- private ConcurrentHashMap<String, LinkedList<CacheItem<T>>> mPool;
- public BaseCachePool() {
- mPool = new ConcurrentHashMap<>(8);
- }
-
- @Override
- public T obtain(String cacheKey) {
- //缓存链
- LinkedList<CacheItem<T>> cacheChain;
- //没有缓存过,进行缓存
- if (!mPool.containsKey(cacheKey)) {
- cacheChain = new LinkedList<>();
- } else {
- cacheChain = mPool.get(cacheKey);
- }
- if (cacheChain == null) {
- throw new NullPointerException("cacheChain 缓存链创建失败");
- }
- //未满最大缓存数量,生成一个实例
- if (cacheChain.size() < onSetupMaxCacheCount()) {
- T cache = onCreateCache();
- CacheItem<T> cacheItem = new CacheItem<>(cache, System.currentTimeMillis());
- cacheChain.add(cacheItem);
- mPool.put(cacheKey, cacheChain);
- return cache;
- }
- //达到最大缓存数量。按最近的使用时间排序,最近使用的放后面,每次取只取最前面(最久没有使用的)
- Collections.sort(cacheChain, this);
- CacheItem<T> cacheItem = cacheChain.getFirst();
- cacheItem.setRecentlyUsedTime(System.currentTimeMillis());
- //重置所有属性
- T cacheTarget = cacheItem.getCacheTarget();
- cacheTarget = onObtainCacheAfter(cacheTarget);
- return cacheTarget;
- }
-
- @Override
- public T onObtainCacheAfter(ICacheTarget<T> cacheTarget) {
- //默认调用reset方法进行重置,如果有其他需求,子类再进行复写
- return cacheTarget.reset();
- }
-
- @Override
- public int compare(CacheItem<T> o1, CacheItem<T> o2) {
- return Long.compare(o1.getRecentlyUsedTime(), o2.getRecentlyUsedTime());
- }
- }
缓存模型
- public class WebSocketInfo implements Serializable, ICacheTarget<WebSocketInfo> {
- private static final long serialVersionUID = -880481254453932113L;
-
- private WebSocket mWebSocket;
- private String mStringMsg;
- private ByteString mByteStringMsg;
- /**
- * 连接成功
- */
- private boolean isConnect;
- /**
- * 重连成功
- */
- private boolean isReconnect;
- /**
- * 准备重连
- */
- private boolean isPrepareReconnect;
-
- /**
- * 重置
- */
- @Override
- public WebSocketInfo reset() {
- this.mWebSocket = null;
- this.mStringMsg = null;
- this.mByteStringMsg = null;
- this.isConnect = false;
- this.isReconnect = false;
- this.isPrepareReconnect = false;
- return this;
- }
-
- //省略get、set方法
- }
4、具体实现
- public class WebSocketWorkerImpl implements WebSocketWorker {
- private static final String TAG = WebSocketWorkerImpl.class.getName();
-
- /**
- * 上下文
- */
- private Context mContext;
- /**
- * 支持外部传入OkHttpClient
- */
- private OkHttpClient mClient;
- /**
- * 重连间隔时间
- */
- private long mReconnectInterval;
- /**
- * 重连间隔时间的单位
- */
- private TimeUnit mReconnectIntervalTimeUnit;
-
- /**
- * 缓存观察者对象,Url对应一个Observable
- */
- private Map<String, Observable<WebSocketInfo>> mObservableCacheMap;
- /**
- * 缓存Url和对应的WebSocket实例,同一个Url共享一个WebSocket连接
- */
- private Map<String, WebSocket> mWebSocketPool;
- /**
- * WebSocketInfo缓存池
- */
- private final WebSocketInfoPool mWebSocketInfoPool;
-
- public WebSocketWorkerImpl(
- Context context,
- boolean isPrintLog,
- Logger.LogDelegate logDelegate,
- OkHttpClient client,
- SSLSocketFactory sslSocketFactory,
- X509TrustManager trustManager,
- long reconnectInterval,
- TimeUnit reconnectIntervalTimeUnit) {
- this.mContext = context;
- //配置Logger
- Logger.setDelegate(logDelegate);
- Logger.setLogPrintEnable(isPrintLog);
- this.mClient = client;
- //重试时间配置
- this.mReconnectInterval = reconnectInterval;
- this.mReconnectIntervalTimeUnit = reconnectIntervalTimeUnit;
- //配置SSL
- if (sslSocketFactory != null && trustManager != null) {
- mClient = mClient.newBuilder().sslSocketFactory(sslSocketFactory, trustManager).build();
- }
- this.mObservableCacheMap = new HashMap<>(16);
- this.mWebSocketPool = new HashMap<>(16);
- mWebSocketInfoPool = new WebSocketInfoPool();
- }
-
- @Override
- public Observable<WebSocketInfo> get(String url) {
- return getWebSocketInfo(url);
- }
-
- @Override
- public Observable<WebSocketInfo> get(String url, long timeout, TimeUnit timeUnit) {
- return getWebSocketInfo(url, timeout, timeUnit);
- }
-
- @Override
- public Observable<Boolean> send(String url, String msg) {
- return Observable.create(new ObservableOnSubscribe<Boolean>() {
- @Override
- public void subscribe(ObservableEmitter<Boolean> emitter) throws Exception {
- WebSocket webSocket = mWebSocketPool.get(url);
- if (webSocket == null) {
- emitter.onError(new IllegalStateException("The WebSocket not open"));
- } else {
- emitter.onNext(webSocket.send(msg));
- }
- }
- });
- }
-
- @Override
- public Observable<Boolean> send(String url, ByteString byteString) {
- return Observable.create(new ObservableOnSubscribe<Boolean>() {
- @Override
- public void subscribe(ObservableEmitter<Boolean> emitter) throws Exception {
- WebSocket webSocket = mWebSocketPool.get(url);
- if (webSocket == null) {
- emitter.onError(new IllegalStateException("The WebSocket not open"));
- } else {
- emitter.onNext(webSocket.send(byteString));
- }
- }
- });
- }
-
- @Override
- public Observable<Boolean> asyncSend(String url, String msg) {
- return getWebSocket(url)
- .take(1)
- .map(new Function<WebSocket, Boolean>() {
- @Override
- public Boolean apply(WebSocket webSocket) throws Exception {
- return webSocket.send(msg);
- }
- });
- }
-
- @Override
- public Observable<Boolean> asyncSend(String url, ByteString byteString) {
- return getWebSocket(url)
- .take(1)
- .map(new Function<WebSocket, Boolean>() {
- @Override
- public Boolean apply(WebSocket webSocket) throws Exception {
- return webSocket.send(byteString);
- }
- });
- }
-
- @Override
- public Observable<Boolean> close(String url) {
- return Observable.create(new ObservableOnSubscribe<WebSocket>() {
- @Override
- public void subscribe(ObservableEmitter<WebSocket> emitter) throws Exception {
- WebSocket webSocket = mWebSocketPool.get(url);
- if (webSocket == null) {
- emitter.onError(new NullPointerException("url:" + url + " WebSocket must be not null"));
- } else {
- emitter.onNext(webSocket);
- }
- }
- }).map(new Function<WebSocket, Boolean>() {
- @Override
- public Boolean apply(WebSocket webSocket) throws Exception {
- return closeWebSocket(webSocket);
- }
- });
- }
-
- @Override
- public boolean closeNow(String url) {
- return closeWebSocket(mWebSocketPool.get(url));
- }
-
- @Override
- public Observable<List<Boolean>> closeAll() {
- return Observable
- .just(mWebSocketPool)
- .map(new Function<Map<String, WebSocket>, Collection<WebSocket>>() {
- @Override
- public Collection<WebSocket> apply(Map<String, WebSocket> webSocketMap) throws Exception {
- return webSocketMap.values();
- }
- })
- .concatMap(new Function<Collection<WebSocket>, ObservableSource<WebSocket>>() {
- @Override
- public ObservableSource<WebSocket> apply(Collection<WebSocket> webSockets) throws Exception {
- return Observable.fromIterable(webSockets);
- }
- }).map(new Function<WebSocket, Boolean>() {
- @Override
- public Boolean apply(WebSocket webSocket) throws Exception {
- return closeWebSocket(webSocket);
- }
- }).collect(new Callable<List<Boolean>>() {
- @Override
- public List<Boolean> call() throws Exception {
- return new ArrayList<>();
- }
- }, new BiConsumer<List<Boolean>, Boolean>() {
- @Override
- public void accept(List<Boolean> list, Boolean isCloseSuccess) throws Exception {
- list.add(isCloseSuccess);
- }
- }).toObservable();
- }
-
- @Override
- public void closeAllNow() {
- for (Map.Entry<String, WebSocket> entry : mWebSocketPool.entrySet()) {
- closeWebSocket(entry.getValue());
- }
- }
-
- /**
- * 是否有连接
- */
- private boolean hasWebSocketConnection(String url) {
- return mWebSocketPool.get(url) != null;
- }
-
- /**
- * 关闭WebSocket连接
- */
- private boolean closeWebSocket(WebSocket webSocket) {
- if (webSocket == null) {
- return false;
- }
- WebSocketCloseEnum normalCloseEnum = WebSocketCloseEnum.USER_EXIT;
- boolean result = webSocket.close(normalCloseEnum.getCode(), normalCloseEnum.getReason());
- if (result) {
- removeUrlWebSocketMapping(webSocket);
- }
- return result;
- }
-
- /**
- * 移除Url和WebSocket的映射
- */
- private void removeUrlWebSocketMapping(WebSocket webSocket) {
- for (Map.Entry<String, WebSocket> entry : mWebSocketPool.entrySet()) {
- if (entry.getValue() == webSocket) {
- String url = entry.getKey();
- mObservableCacheMap.remove(url);
- mWebSocketPool.remove(url);
- }
- }
- }
-
- private void removeWebSocketCache(WebSocket webSocket) {
- for (Map.Entry<String, WebSocket> entry : mWebSocketPool.entrySet()) {
- if (entry.getValue() == webSocket) {
- String url = entry.getKey();
- mWebSocketPool.remove(url);
- }
- }
- }
-
- public Observable<WebSocket> getWebSocket(String url) {
- return getWebSocketInfo(url)
- .filter(new Predicate<WebSocketInfo>() {
- @Override
- public boolean test(WebSocketInfo webSocketInfo) throws Exception {
- return webSocketInfo.getWebSocket() != null;
- }
- })
- .map(new Function<WebSocketInfo, WebSocket>() {
- @Override
- public WebSocket apply(WebSocketInfo webSocketInfo) throws Exception {
- return webSocketInfo.getWebSocket();
- }
- });
- }
-
- public Observable<WebSocketInfo> getWebSocketInfo(String url) {
- return getWebSocketInfo(url, 5, TimeUnit.SECONDS);
- }
-
- public synchronized Observable<WebSocketInfo> getWebSocketInfo(final String url, final long timeout, final TimeUnit timeUnit) {
- //先从缓存中取
- Observable<WebSocketInfo> observable = mObservableCacheMap.get(url);
- if (observable == null) {
- //缓存中没有,新建
- observable = Observable
- .create(new WebSocketOnSubscribe(url))
- .retry()
- //因为有share操作符,只有当所有观察者取消注册时,这里才会回调
- .doOnDispose(new Action() {
- @Override
- public void run() throws Exception {
- //所有都不注册了,关闭连接
- closeNow(url);
- Logger.d(TAG, "所有观察者都取消注册,关闭连接...");
- }
- })
- //Share操作符,实现多个观察者对应一个数据源
- .share()
- //将回调都放置到主线程回调,外部调用方直接观察,实现响应回调方法做UI处理
- .subscribeOn(Schedulers.io())
- .observeOn(AndroidSchedulers.mainThread());
- //将数据源缓存
- mObservableCacheMap.put(url, observable);
- } else {
- //缓存中有,从连接池中取出
- WebSocket webSocket = mWebSocketPool.get(url);
- if (webSocket != null) {
- observable = observable.startWith(createConnect(url, webSocket));
- }
- }
- return observable;
- }
-
- /**
- * 组装数据源
- */
- private final class WebSocketOnSubscribe implements ObservableOnSubscribe<WebSocketInfo> {
- private String mWebSocketUrl;
- private WebSocket mWebSocket;
- private boolean isReconnecting = false;
-
- public WebSocketOnSubscribe(String webSocketUrl) {
- this.mWebSocketUrl = webSocketUrl;
- }
-
- @Override
- public void subscribe(ObservableEmitter<WebSocketInfo> emitter) throws Exception {
- //因为retry重连不能设置延时,所以只能这里延时,降低发送频率
- if (mWebSocket == null && isReconnecting) {
- if (Thread.currentThread() != Looper.getMainLooper().getThread()) {
- long millis = mReconnectIntervalTimeUnit.toMillis(mReconnectInterval);
- if (millis == 0) {
- millis = 1000;
- }
- SystemClock.sleep(millis);
- }
- }
- initWebSocket(emitter);
- }
-
- private Request createRequest(String url) {
- return new Request.Builder().get().url(url).build();
- }
-
- /**
- * 初始化WebSocket
- */
- private synchronized void initWebSocket(ObservableEmitter<WebSocketInfo> emitter) {
- if (mWebSocket == null) {
- mWebSocket = mClient.newWebSocket(createRequest(mWebSocketUrl), new WebSocketListener() {
- @Override
- public void onOpen(WebSocket webSocket, Response response) {
- super.onOpen(webSocket, response);
- //连接成功
- if (!emitter.isDisposed()) {
- mWebSocketPool.put(mWebSocketUrl, mWebSocket);
- //重连成功
- if (isReconnecting) {
- emitter.onNext(createReconnect(mWebSocketUrl, webSocket));
- } else {
- emitter.onNext(createConnect(mWebSocketUrl, webSocket));
- }
- }
- isReconnecting = false;
- }
-
- @Override
- public void onMessage(WebSocket webSocket, String text) {
- super.onMessage(webSocket, text);
- //收到消息
- if (!emitter.isDisposed()) {
- emitter.onNext(createReceiveStringMsg(mWebSocketUrl, webSocket, text));
- }
- }
-
- @Override
- public void onMessage(WebSocket webSocket, ByteString bytes) {
- super.onMessage(webSocket, bytes);
- //收到消息
- if (!emitter.isDisposed()) {
- emitter.onNext(createReceiveByteStringMsg(mWebSocketUrl, webSocket, bytes));
- }
- }
-
- @Override
- public void onClosed(WebSocket webSocket, int code, String reason) {
- super.onClosed(webSocket, code, reason);
- if (!emitter.isDisposed()) {
- emitter.onNext(createClose(mWebSocketUrl));
- }
- }
-
- @Override
- public void onFailure(WebSocket webSocket, Throwable throwable, Response response) {
- super.onFailure(webSocket, throwable, response);
- isReconnecting = true;
- mWebSocket = null;
- //移除WebSocket缓存,retry重试重新连接
- removeWebSocketCache(webSocket);
- if (!emitter.isDisposed()) {
- emitter.onNext(createPrepareReconnect(mWebSocketUrl));
- //失败发送onError,让retry操作符重试
- emitter.onError(new ImproperCloseException());
- }
- }
- });
- }
- }
- }
-
-
- private WebSocketInfo createConnect(String url, WebSocket webSocket) {
- return mWebSocketInfoPool.obtain(url)
- .setWebSocket(webSocket)
- .setConnect(true);
- }
-
- private WebSocketInfo createReconnect(String url, WebSocket webSocket) {
- return mWebSocketInfoPool.obtain(url)
- .setWebSocket(webSocket)
- .setReconnect(true);
- }
-
- private WebSocketInfo createPrepareReconnect(String url) {
- return mWebSocketInfoPool.obtain(url)
- .setPrepareReconnect(true);
- }
-
- private WebSocketInfo createReceiveStringMsg(String url, WebSocket webSocket, String stringMsg) {
- return mWebSocketInfoPool.obtain(url)
- .setConnect(true)
- .setWebSocket(webSocket)
- .setStringMsg(stringMsg);
- }
-
- private WebSocketInfo createReceiveByteStringMsg(String url, WebSocket webSocket, ByteString byteMsg) {
- return mWebSocketInfoPool.obtain(url)
- .setConnect(true)
- .setWebSocket(webSocket)
- .setByteStringMsg(byteMsg);
- }
-
- private WebSocketInfo createClose(String url) {
- return mWebSocketInfoPool.obtain(url);
- }
- }
定时发送心跳维持连接
因为WebSocket断线后,后端不能马上知道连接已经断开,所以需要一个心跳消息保持双方通信。
实现心跳,本质就是一个定时消息,我们使用RxJava的interval操作符定时执行任务,这里我的消息需要增加一个时间戳,所以我加上了timestamp操作符来给每一次执行结果附加一个时间戳。
1.网络没有开启时,不发送心跳消息
- public class NetworkUtil {
- private NetworkUtil() {
- }
-
- /**
- * 当前是否有网络状态
- *
- * @param context 上下文
- * @param needWifi 是否需要wifi网络
- */
- public static boolean hasNetWorkStatus(Context context, boolean needWifi) {
- NetworkInfo info = getActiveNetwork(context);
- if (info == null) {
- return false;
- }
- if (!needWifi) {
- return info.isAvailable();
- } else if (info.getType() == ConnectivityManager.TYPE_WIFI) {
- return info.isAvailable();
- }
- return false;
- }
-
- /**
- * 获取活动网络连接信息
- *
- * @param context 上下文
- * @return NetworkInfo
- */
- public static NetworkInfo getActiveNetwork(Context context) {
- ConnectivityManager mConnMgr = (ConnectivityManager) context
- .getSystemService(Context.CONNECTIVITY_SERVICE);
- if (mConnMgr == null) {
- return null;
- }
- // 获取活动网络连接信息
- return mConnMgr.getActiveNetworkInfo();
- }
- }
2.心跳回调接口,让外部生成心跳json
- public interface HeartBeatGenerateCallback {
- /**
- * 当需要生成心跳信息时回调
- *
- * @param timestamp 当前时间戳
- * @return 要发送的心跳信息
- */
- String onGenerateHeartBeatMsg(long timestamp);
- }
3.发送心跳消息,需要制定Url地址、间隔时间,间隔时间单位,心跳消息生成回调
- @Override
- public Observable<Boolean> heartBeat(String url, int period, TimeUnit unit,
- HeartBeatGenerateCallback heartBeatGenerateCallback) {
- if (heartBeatGenerateCallback == null) {
- return Observable.error(new NullPointerException("heartBeatGenerateCallback == null"));
- }
- return Observable
- .interval(period, unit)
- //timestamp操作符,给每个事件加一个时间戳
- .timestamp()
- .retry()
- .flatMap(new Function<Timed<Long>, ObservableSource<Boolean>>() {
- @Override
- public ObservableSource<Boolean> apply(Timed<Long> timed) throws Exception {
- long timestamp = timed.time();
- //判断网络,存在网络才发消息,否则直接返回发送心跳失败
- if (mContext != null && NetworkUtil.hasNetWorkStatus(mContext, false)) {
- String heartBeatMsg = heartBeatGenerateCallback.onGenerateHeartBeatMsg(timestamp);
- Logger.d(TAG, "发送心跳消息: " + heartBeatMsg);
- if (hasWebSocketConnection(url)) {
- return send(url, heartBeatMsg);
- } else {
- //这里必须用异步发送,如果切断网络,再重连,缓存的WebSocket会被清除,此时再重连网络
- //是没有WebSocket连接可用的,所以就需要异步连接完成后,再发送
- return asyncSend(url, heartBeatMsg);
- }
- } else {
- Logger.d(TAG, "无网络连接,不发送心跳,下次网络连通时,再次发送心跳");
- return Observable.create(new ObservableOnSubscribe<Boolean>() {
- @Override
- public void subscribe(ObservableEmitter<Boolean> emitter) throws Exception {
- emitter.onNext(false);
- }
- });
- }
- }
- });
- }
实现重连
重连配置RxJava,有个天然优势就是RxJava提供了Retry操作符,支持重试,我们在onFailure()连接失败回调中手动发出onError(),让数据源增加retry操作符进行重试,就会重新走到数据源的订阅回调重新连接WebSocket
- private final class WebSocketOnSubscribe implements ObservableOnSubscribe<WebSocketInfo> {
- private String mWebSocketUrl;
- private WebSocket mWebSocket;
- private boolean isReconnecting = false;
-
- public WebSocketOnSubscribe(String webSocketUrl) {
- this.mWebSocketUrl = webSocketUrl;
- }
-
- @Override
- public void subscribe(ObservableEmitter<WebSocketInfo> emitter) throws Exception {
- //...
- }
-
- private Request createRequest(String url) {
- return new Request.Builder().get().url(url).build();
- }
-
- /**
- * 初始化WebSocket
- */
- private synchronized void initWebSocket(ObservableEmitter<WebSocketInfo> emitter) {
- if (mWebSocket == null) {
- mWebSocket = mClient.newWebSocket(createRequest(mWebSocketUrl), new WebSocketListener() {
- @Override
- public void onOpen(WebSocket webSocket, Response response) {
- super.onOpen(webSocket, response);
- //连接成功
- if (!emitter.isDisposed()) {
- mWebSocketPool.put(mWebSocketUrl, mWebSocket);
- //重连成功
- if (isReconnecting) {
- emitter.onNext(createReconnect(mWebSocketUrl, webSocket));
- } else {
- emitter.onNext(createConnect(mWebSocketUrl, webSocket));
- }
- }
- isReconnecting = false;
- }
-
- @Override
- public void onMessage(WebSocket webSocket, String text) {
- super.onMessage(webSocket, text);
- //收到消息
- if (!emitter.isDisposed()) {
- emitter.onNext(createReceiveStringMsg(mWebSocketUrl, webSocket, text));
- }
- }
-
- @Override
- public void onMessage(WebSocket webSocket, ByteString bytes) {
- super.onMessage(webSocket, bytes);
- //收到消息
- if (!emitter.isDisposed()) {
- emitter.onNext(createReceiveByteStringMsg(mWebSocketUrl, webSocket, bytes));
- }
- }
-
- @Override
- public void onClosed(WebSocket webSocket, int code, String reason) {
- super.onClosed(webSocket, code, reason);
- if (!emitter.isDisposed()) {
- emitter.onNext(createClose(mWebSocketUrl));
- }
- }
-
- @Override
- public void onFailure(WebSocket webSocket, Throwable throwable, Response response) {
- super.onFailure(webSocket, throwable, response);
- isReconnecting = true;
- mWebSocket = null;
- //移除WebSocket缓存,retry重试重新连接
- removeWebSocketCache(webSocket);
- if (!emitter.isDisposed()) {
- emitter.onNext(createPrepareReconnect(mWebSocketUrl));
- //失败发送onError,让retry操作符重试
- emitter.onError(new ImproperCloseException());
- }
- }
- });
- }
- }
- }
以上,基于Rxjava对WebSocket的封装基本完成,以下是项目中的实际使用
使用
- //自定义OkHttpClient
- OkHttpClient mClient = new OkHttpClient.Builder()
- .readTimeout(3, TimeUnit.SECONDS)//设置读取超时时间
- .writeTimeout(3, TimeUnit.SECONDS)//设置写的超时时间
- .connectTimeout(3, TimeUnit.SECONDS)//设置连接超时时间
- .build();
-
- //RxWebSocketBuilder构建RxWebSocket
- RxWebSocket rxWebSocket = new RxWebSocketBuilder(context)
- //是否打印Log
- .isPrintLog(true)
- //5秒无响应则重连
- .reconnectInterval(5, TimeUnit.SECONDS)
- .client(mClient)
- .build();
- String url = "ws://xxxxxxxxx"
- //开始连接
- rxWebSocket.get(url)
- //切换到子线程去连接
- .compose(RxSchedulerUtil.ioToMain())
- //绑定生命周期
- .as(RxLifecycleUtil.bindLifecycle(mLifecycleOwner))
- .subscribe(new Consumer<WebSocketInfo>() {
- @Override
- public void accept(WebSocketInfo webSocketInfo) throws Exception {
- String json = webSocketInfo.getStringMsg();
- //业务层的json解析
- ...
- }
- });
- rxWebSocket.send(url, "我是消息")
- .compose(RxSchedulerUtil.ioToMain())
- .as(RxLifecycleUtil.bindLifecycle(mLifecycleOwner))
- .subscribe(new Consumer<Boolean>() {
- @Override
- public void accept(Boolean isSuccess) throws Exception {
- if(isSuccess) {
- //发送成功
- } ele {
- //发送失败
- }
- }
- });
- rxWebSocket.asyncSend(url, "我是消息")
- .compose(RxSchedulerUtil.ioToMain())
- .as(RxLifecycleUtil.bindLifecycle(mLifecycleOwner))
- .subscribe(new Consumer<Boolean>() {
- @Override
- public void accept(Boolean isSuccess) throws Exception {
- if(isSuccess) {
- //发送成功
- } ele {
- //发送失败
- }
- }
- });
- rxWebSocket.heartBeat(url, 6 ,TimeUnit.SECONDS, new HeartBeatGenerateCallback() {
- @Override
- public String onGenerateHeartBeatMsg(long timestamp) {
- //生成心跳Json,业务模块处理,例如后端需要秒值,我们除以1000换算为秒。
- //后续可以在这里配置通用参数等
- return GsonUtil.toJson(new HeartBeatMsgRequestModel(WssCommandTypeEnum.HEART_BEAT.getCode(),
- String.valueOf(timestamp / 1000)));
- }
- });
总结
Okhttp的WebSocket使用比较简单,基本都是发起请求和配置回调2个步骤,再使用send()方法发送消息,简单使用可以不需要做封装,Java使用可以基于Rxjava做封装,而Kotlin使用可以用协程做封装,后续有时间我可能会填这个坑
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。