当前位置:   article > 正文

Android WebSocket原理及实战(二)_android client.newwebsocket 发送消息

android client.newwebsocket 发送消息

上期原理篇我们讲到了WebSocket和HTTP的区别及其优缺点,这期主要是实战部分,大多数Android项目不需要使用第三方重量级的WebSocket框架,所以我以OKHttp3.0自带的WebSocket为例,来进行讲解

添加依赖

implementation "com.squareup.okhttp3:okhttp:4.9.0"

实现步骤

  • 构建OkHttpClient配置初始化一些参数。

  • 使用WebSocket的Url地址连接。

  • 设置WebSocket的连接状态回调和消息回调。

  • 解析消息json处理业务等。

  • 连接成功后,使用WebSocket发送消息

1、配置OKHttpClient

  1. val mHolder = OkHttpClient.Builder()
  2. .connectTimeout(CONNECT_TIME_OUT, TimeUnit.SECONDS)//设置读取超时时间
  3. .readTimeout(READ_TIME_OUT, TimeUnit.SECONDS)//设置连接超时时间
  4. .writeTimeout(3, TimeUnit.SECONDS)//设置写的超时时间
  5. .build()

2、使用URL 构建WebSocket请求

  1. // 构建鉴权url
  2. val authUrl = TTSTools.createAuthUrl(mTTSConfig)
  3. // 获取请求
  4. val request: Request = Request.Builder().url(authUrl).build()

鉴权url 一般是ws[s]://host/servername/path?形式,至于ws和wss的区别,可以类比http和https,这里不做赘述

3、发起连接,配置回调

  • onOpen(),连接成功
  • onMessage(String text),收到字符串类型的消息,一般我们都是使用这个
  • onMessage(ByteString bytes),收到字节数组类型消息,我这里没有用到
  • onClosed(),连接关闭
  • onFailure(),连接失败,一般都是在这里发起重连操作
  1. // 开始连接
  2. val client: OkHttpClient = CusOkHttpClient.mInstance
  3. mWebSocket = client.newWebSocket(
  4. request,
  5. object : WebSocketListener() {
  6. override fun onOpen(webSocket: WebSocket, response: Response) {
  7. Logger.d(mTAG, "WebSocket-握手协议")
  8. }
  9. override fun onMessage(webSocket: WebSocket, text: String) {
  10. Logger.d(mTAG, "WebSocket-返回结果}")
  11. }
  12. override fun onClosed(webSocket: WebSocket, code: Int, reason: String)
  13. {
  14. Logger.d(mTAG, "WebSocket关闭")
  15. }
  16. override fun onFailure(webSocket: WebSocket,
  17. t: Throwable, response: Response?) {
  18. Logger.d(mTAG, "WebSocket出错--${t.message}")
  19. }
  20. }
  21. )

4、使用WebSocket对象发送消息,msg为消息内容(一般是json,当然你也可以使用其他的,例如xml等),send方法会马上返回发送结果

  1. /**
  2. * 发送数据
  3. */
  4. private fun sendSimpleData(
  5. simpleData: String
  6. ) {
  7. if (!TextUtils.isEmpty(simpleData)) {
  8. mTtsText = ttsText
  9. }
  10. mWebSocket?.send(getRequest(ttsText))
  11. }

getRequest方法主要是生成相应的请求JSON,数据协议由两端决定

  1. private fun getRequest(rawText: String): String {
  2. return JSONObject().apply {
  3. put("text",rawText)
  4. }.toString()
  5. }

可以看出,OKHTTP3的WebSocket使用非常的简单,而WebSocket可以基于RxJava或协程做二次封装,下面基于RxJava的封装,我找了个比较好的案例,供大家学习

基于RxJava封装

1、定义Api调用接口,外部只需要接触Api无无需关心内部实现逻辑

  1. public interface WebSocketWorker {
  2. /**
  3. * 获取连接,并返回观察对象
  4. */
  5. Observable<WebSocketInfo> get(String url);
  6. /**
  7. * 设置一个超时时间,在指定时间内如果没有收到消息,会尝试重连
  8. *
  9. * @param timeout 超时时间
  10. * @param timeUnit 超时时间单位
  11. */
  12. Observable<WebSocketInfo> get(String url, long timeout, TimeUnit timeUnit);
  13. /**
  14. * 发送,url的WebSocket已打开的情况下使用,否则会抛出异常
  15. *
  16. * @param msg 消息,看具体和后端协商的格式,一般为json
  17. */
  18. Observable<Boolean> send(String url, String msg);
  19. /**
  20. * 发送,同上
  21. *
  22. * @param byteString 信息类型为ByteString
  23. */
  24. Observable<Boolean> send(String url, ByteString byteString);
  25. /**
  26. * 不关心WebSocket是否连接,直接发送
  27. */
  28. Observable<Boolean> asyncSend(String url, String msg);
  29. /**
  30. * 同上,只是消息类型为ByteString
  31. */
  32. Observable<Boolean> asyncSend(String url, ByteString byteString);
  33. /**
  34. * 关闭指定Url的连接
  35. */
  36. Observable<Boolean> close(String url);
  37. /**
  38. * 马上关闭指定Url的连接
  39. */
  40. boolean closeNow(String url);
  41. /**
  42. * 关闭当前所有连接
  43. */
  44. Observable<List<Boolean>> closeAll();
  45. /**
  46. * 马上关闭所有连接
  47. */
  48. void closeAllNow();
  49. }

2、构建者模式,大量的配置参数,我们先使用一个Builder类保存,再使用build()方法生成RxWebSocket对象

  1. public class RxWebSocketBuilder {
  2. Context mContext;
  3. /**
  4. * 是否打印Log
  5. */
  6. boolean mIsPrintLog;
  7. /**
  8. * Log代理对象
  9. */
  10. Logger.LogDelegate mLogDelegate;
  11. /**
  12. * 支持外部传入OkHttpClient
  13. */
  14. OkHttpClient mClient;
  15. /**
  16. * 支持SSL
  17. */
  18. SSLSocketFactory mSslSocketFactory;
  19. X509TrustManager mTrustManager;
  20. /**
  21. * 重连间隔时间
  22. */
  23. long mReconnectInterval;
  24. /**
  25. * 重连间隔时间的单位
  26. */
  27. TimeUnit mReconnectIntervalTimeUnit;
  28. public RxWebSocketBuilder(Context context) {
  29. this.mContext = context.getApplicationContext();
  30. }
  31. public RxWebSocketBuilder isPrintLog(boolean isPrintLog) {
  32. this.mIsPrintLog = isPrintLog;
  33. return this;
  34. }
  35. public RxWebSocketBuilder logger(Logger.LogDelegate logDelegate) {
  36. Logger.setDelegate(logDelegate);
  37. return this;
  38. }
  39. public RxWebSocketBuilder client(OkHttpClient client) {
  40. this.mClient = client;
  41. return this;
  42. }
  43. public RxWebSocketBuilder sslSocketFactory(SSLSocketFactory sslSocketFactory, X509TrustManager trustManager) {
  44. this.mSslSocketFactory = sslSocketFactory;
  45. this.mTrustManager = trustManager;
  46. return this;
  47. }
  48. public RxWebSocketBuilder reconnectInterval(long reconnectInterval, TimeUnit reconnectIntervalTimeUnit) {
  49. this.mReconnectInterval = reconnectInterval;
  50. this.mReconnectIntervalTimeUnit = reconnectIntervalTimeUnit;
  51. return this;
  52. }
  53. public RxWebSocket build() {
  54. return new RxWebSocket(this);
  55. }
  56. }

3、基础缓存池

  1. public abstract class BaseCachePool<T extends ICacheTarget<T>> implements ICachePool<T>, Comparator<CacheItem<T>> {
  2. /**
  3. * 缓存池
  4. */
  5. private ConcurrentHashMap<String, LinkedList<CacheItem<T>>> mPool;
  6. public BaseCachePool() {
  7. mPool = new ConcurrentHashMap<>(8);
  8. }
  9. @Override
  10. public T obtain(String cacheKey) {
  11. //缓存链
  12. LinkedList<CacheItem<T>> cacheChain;
  13. //没有缓存过,进行缓存
  14. if (!mPool.containsKey(cacheKey)) {
  15. cacheChain = new LinkedList<>();
  16. } else {
  17. cacheChain = mPool.get(cacheKey);
  18. }
  19. if (cacheChain == null) {
  20. throw new NullPointerException("cacheChain 缓存链创建失败");
  21. }
  22. //未满最大缓存数量,生成一个实例
  23. if (cacheChain.size() < onSetupMaxCacheCount()) {
  24. T cache = onCreateCache();
  25. CacheItem<T> cacheItem = new CacheItem<>(cache, System.currentTimeMillis());
  26. cacheChain.add(cacheItem);
  27. mPool.put(cacheKey, cacheChain);
  28. return cache;
  29. }
  30. //达到最大缓存数量。按最近的使用时间排序,最近使用的放后面,每次取只取最前面(最久没有使用的)
  31. Collections.sort(cacheChain, this);
  32. CacheItem<T> cacheItem = cacheChain.getFirst();
  33. cacheItem.setRecentlyUsedTime(System.currentTimeMillis());
  34. //重置所有属性
  35. T cacheTarget = cacheItem.getCacheTarget();
  36. cacheTarget = onObtainCacheAfter(cacheTarget);
  37. return cacheTarget;
  38. }
  39. @Override
  40. public T onObtainCacheAfter(ICacheTarget<T> cacheTarget) {
  41. //默认调用reset方法进行重置,如果有其他需求,子类再进行复写
  42. return cacheTarget.reset();
  43. }
  44. @Override
  45. public int compare(CacheItem<T> o1, CacheItem<T> o2) {
  46. return Long.compare(o1.getRecentlyUsedTime(), o2.getRecentlyUsedTime());
  47. }
  48. }

缓存模型

  1. public class WebSocketInfo implements Serializable, ICacheTarget<WebSocketInfo> {
  2. private static final long serialVersionUID = -880481254453932113L;
  3. private WebSocket mWebSocket;
  4. private String mStringMsg;
  5. private ByteString mByteStringMsg;
  6. /**
  7. * 连接成功
  8. */
  9. private boolean isConnect;
  10. /**
  11. * 重连成功
  12. */
  13. private boolean isReconnect;
  14. /**
  15. * 准备重连
  16. */
  17. private boolean isPrepareReconnect;
  18. /**
  19. * 重置
  20. */
  21. @Override
  22. public WebSocketInfo reset() {
  23. this.mWebSocket = null;
  24. this.mStringMsg = null;
  25. this.mByteStringMsg = null;
  26. this.isConnect = false;
  27. this.isReconnect = false;
  28. this.isPrepareReconnect = false;
  29. return this;
  30. }
  31. //省略getset方法
  32. }

4、具体实现

  • 将连接WebSocket封装到Observable的订阅回调中。
  • 以Map缓存Url和数据源,多个Url共享同一个连接对象。
  • 使用share操作符,让多个观察者同时订阅一个数据源。所有订阅者都取消订阅时,再断开连接。
  1. public class WebSocketWorkerImpl implements WebSocketWorker {
  2. private static final String TAG = WebSocketWorkerImpl.class.getName();
  3. /**
  4. * 上下文
  5. */
  6. private Context mContext;
  7. /**
  8. * 支持外部传入OkHttpClient
  9. */
  10. private OkHttpClient mClient;
  11. /**
  12. * 重连间隔时间
  13. */
  14. private long mReconnectInterval;
  15. /**
  16. * 重连间隔时间的单位
  17. */
  18. private TimeUnit mReconnectIntervalTimeUnit;
  19. /**
  20. * 缓存观察者对象,Url对应一个Observable
  21. */
  22. private Map<String, Observable<WebSocketInfo>> mObservableCacheMap;
  23. /**
  24. * 缓存Url和对应的WebSocket实例,同一个Url共享一个WebSocket连接
  25. */
  26. private Map<String, WebSocket> mWebSocketPool;
  27. /**
  28. * WebSocketInfo缓存池
  29. */
  30. private final WebSocketInfoPool mWebSocketInfoPool;
  31. public WebSocketWorkerImpl(
  32. Context context,
  33. boolean isPrintLog,
  34. Logger.LogDelegate logDelegate,
  35. OkHttpClient client,
  36. SSLSocketFactory sslSocketFactory,
  37. X509TrustManager trustManager,
  38. long reconnectInterval,
  39. TimeUnit reconnectIntervalTimeUnit) {
  40. this.mContext = context;
  41. //配置Logger
  42. Logger.setDelegate(logDelegate);
  43. Logger.setLogPrintEnable(isPrintLog);
  44. this.mClient = client;
  45. //重试时间配置
  46. this.mReconnectInterval = reconnectInterval;
  47. this.mReconnectIntervalTimeUnit = reconnectIntervalTimeUnit;
  48. //配置SSL
  49. if (sslSocketFactory != null && trustManager != null) {
  50. mClient = mClient.newBuilder().sslSocketFactory(sslSocketFactory, trustManager).build();
  51. }
  52. this.mObservableCacheMap = new HashMap<>(16);
  53. this.mWebSocketPool = new HashMap<>(16);
  54. mWebSocketInfoPool = new WebSocketInfoPool();
  55. }
  56. @Override
  57. public Observable<WebSocketInfo> get(String url) {
  58. return getWebSocketInfo(url);
  59. }
  60. @Override
  61. public Observable<WebSocketInfo> get(String url, long timeout, TimeUnit timeUnit) {
  62. return getWebSocketInfo(url, timeout, timeUnit);
  63. }
  64. @Override
  65. public Observable<Boolean> send(String url, String msg) {
  66. return Observable.create(new ObservableOnSubscribe<Boolean>() {
  67. @Override
  68. public void subscribe(ObservableEmitter<Boolean> emitter) throws Exception {
  69. WebSocket webSocket = mWebSocketPool.get(url);
  70. if (webSocket == null) {
  71. emitter.onError(new IllegalStateException("The WebSocket not open"));
  72. } else {
  73. emitter.onNext(webSocket.send(msg));
  74. }
  75. }
  76. });
  77. }
  78. @Override
  79. public Observable<Boolean> send(String url, ByteString byteString) {
  80. return Observable.create(new ObservableOnSubscribe<Boolean>() {
  81. @Override
  82. public void subscribe(ObservableEmitter<Boolean> emitter) throws Exception {
  83. WebSocket webSocket = mWebSocketPool.get(url);
  84. if (webSocket == null) {
  85. emitter.onError(new IllegalStateException("The WebSocket not open"));
  86. } else {
  87. emitter.onNext(webSocket.send(byteString));
  88. }
  89. }
  90. });
  91. }
  92. @Override
  93. public Observable<Boolean> asyncSend(String url, String msg) {
  94. return getWebSocket(url)
  95. .take(1)
  96. .map(new Function<WebSocket, Boolean>() {
  97. @Override
  98. public Boolean apply(WebSocket webSocket) throws Exception {
  99. return webSocket.send(msg);
  100. }
  101. });
  102. }
  103. @Override
  104. public Observable<Boolean> asyncSend(String url, ByteString byteString) {
  105. return getWebSocket(url)
  106. .take(1)
  107. .map(new Function<WebSocket, Boolean>() {
  108. @Override
  109. public Boolean apply(WebSocket webSocket) throws Exception {
  110. return webSocket.send(byteString);
  111. }
  112. });
  113. }
  114. @Override
  115. public Observable<Boolean> close(String url) {
  116. return Observable.create(new ObservableOnSubscribe<WebSocket>() {
  117. @Override
  118. public void subscribe(ObservableEmitter<WebSocket> emitter) throws Exception {
  119. WebSocket webSocket = mWebSocketPool.get(url);
  120. if (webSocket == null) {
  121. emitter.onError(new NullPointerException("url:" + url + " WebSocket must be not null"));
  122. } else {
  123. emitter.onNext(webSocket);
  124. }
  125. }
  126. }).map(new Function<WebSocket, Boolean>() {
  127. @Override
  128. public Boolean apply(WebSocket webSocket) throws Exception {
  129. return closeWebSocket(webSocket);
  130. }
  131. });
  132. }
  133. @Override
  134. public boolean closeNow(String url) {
  135. return closeWebSocket(mWebSocketPool.get(url));
  136. }
  137. @Override
  138. public Observable<List<Boolean>> closeAll() {
  139. return Observable
  140. .just(mWebSocketPool)
  141. .map(new Function<Map<String, WebSocket>, Collection<WebSocket>>() {
  142. @Override
  143. public Collection<WebSocket> apply(Map<String, WebSocket> webSocketMap) throws Exception {
  144. return webSocketMap.values();
  145. }
  146. })
  147. .concatMap(new Function<Collection<WebSocket>, ObservableSource<WebSocket>>() {
  148. @Override
  149. public ObservableSource<WebSocket> apply(Collection<WebSocket> webSockets) throws Exception {
  150. return Observable.fromIterable(webSockets);
  151. }
  152. }).map(new Function<WebSocket, Boolean>() {
  153. @Override
  154. public Boolean apply(WebSocket webSocket) throws Exception {
  155. return closeWebSocket(webSocket);
  156. }
  157. }).collect(new Callable<List<Boolean>>() {
  158. @Override
  159. public List<Boolean> call() throws Exception {
  160. return new ArrayList<>();
  161. }
  162. }, new BiConsumer<List<Boolean>, Boolean>() {
  163. @Override
  164. public void accept(List<Boolean> list, Boolean isCloseSuccess) throws Exception {
  165. list.add(isCloseSuccess);
  166. }
  167. }).toObservable();
  168. }
  169. @Override
  170. public void closeAllNow() {
  171. for (Map.Entry<String, WebSocket> entry : mWebSocketPool.entrySet()) {
  172. closeWebSocket(entry.getValue());
  173. }
  174. }
  175. /**
  176. * 是否有连接
  177. */
  178. private boolean hasWebSocketConnection(String url) {
  179. return mWebSocketPool.get(url) != null;
  180. }
  181. /**
  182. * 关闭WebSocket连接
  183. */
  184. private boolean closeWebSocket(WebSocket webSocket) {
  185. if (webSocket == null) {
  186. return false;
  187. }
  188. WebSocketCloseEnum normalCloseEnum = WebSocketCloseEnum.USER_EXIT;
  189. boolean result = webSocket.close(normalCloseEnum.getCode(), normalCloseEnum.getReason());
  190. if (result) {
  191. removeUrlWebSocketMapping(webSocket);
  192. }
  193. return result;
  194. }
  195. /**
  196. * 移除Url和WebSocket的映射
  197. */
  198. private void removeUrlWebSocketMapping(WebSocket webSocket) {
  199. for (Map.Entry<String, WebSocket> entry : mWebSocketPool.entrySet()) {
  200. if (entry.getValue() == webSocket) {
  201. String url = entry.getKey();
  202. mObservableCacheMap.remove(url);
  203. mWebSocketPool.remove(url);
  204. }
  205. }
  206. }
  207. private void removeWebSocketCache(WebSocket webSocket) {
  208. for (Map.Entry<String, WebSocket> entry : mWebSocketPool.entrySet()) {
  209. if (entry.getValue() == webSocket) {
  210. String url = entry.getKey();
  211. mWebSocketPool.remove(url);
  212. }
  213. }
  214. }
  215. public Observable<WebSocket> getWebSocket(String url) {
  216. return getWebSocketInfo(url)
  217. .filter(new Predicate<WebSocketInfo>() {
  218. @Override
  219. public boolean test(WebSocketInfo webSocketInfo) throws Exception {
  220. return webSocketInfo.getWebSocket() != null;
  221. }
  222. })
  223. .map(new Function<WebSocketInfo, WebSocket>() {
  224. @Override
  225. public WebSocket apply(WebSocketInfo webSocketInfo) throws Exception {
  226. return webSocketInfo.getWebSocket();
  227. }
  228. });
  229. }
  230. public Observable<WebSocketInfo> getWebSocketInfo(String url) {
  231. return getWebSocketInfo(url, 5, TimeUnit.SECONDS);
  232. }
  233. public synchronized Observable<WebSocketInfo> getWebSocketInfo(final String url, final long timeout, final TimeUnit timeUnit) {
  234. //先从缓存中取
  235. Observable<WebSocketInfo> observable = mObservableCacheMap.get(url);
  236. if (observable == null) {
  237. //缓存中没有,新建
  238. observable = Observable
  239. .create(new WebSocketOnSubscribe(url))
  240. .retry()
  241. //因为有share操作符,只有当所有观察者取消注册时,这里才会回调
  242. .doOnDispose(new Action() {
  243. @Override
  244. public void run() throws Exception {
  245. //所有都不注册了,关闭连接
  246. closeNow(url);
  247. Logger.d(TAG, "所有观察者都取消注册,关闭连接...");
  248. }
  249. })
  250. //Share操作符,实现多个观察者对应一个数据源
  251. .share()
  252. //将回调都放置到主线程回调,外部调用方直接观察,实现响应回调方法做UI处理
  253. .subscribeOn(Schedulers.io())
  254. .observeOn(AndroidSchedulers.mainThread());
  255. //将数据源缓存
  256. mObservableCacheMap.put(url, observable);
  257. } else {
  258. //缓存中有,从连接池中取出
  259. WebSocket webSocket = mWebSocketPool.get(url);
  260. if (webSocket != null) {
  261. observable = observable.startWith(createConnect(url, webSocket));
  262. }
  263. }
  264. return observable;
  265. }
  266. /**
  267. * 组装数据源
  268. */
  269. private final class WebSocketOnSubscribe implements ObservableOnSubscribe<WebSocketInfo> {
  270. private String mWebSocketUrl;
  271. private WebSocket mWebSocket;
  272. private boolean isReconnecting = false;
  273. public WebSocketOnSubscribe(String webSocketUrl) {
  274. this.mWebSocketUrl = webSocketUrl;
  275. }
  276. @Override
  277. public void subscribe(ObservableEmitter<WebSocketInfo> emitter) throws Exception {
  278. //因为retry重连不能设置延时,所以只能这里延时,降低发送频率
  279. if (mWebSocket == null && isReconnecting) {
  280. if (Thread.currentThread() != Looper.getMainLooper().getThread()) {
  281. long millis = mReconnectIntervalTimeUnit.toMillis(mReconnectInterval);
  282. if (millis == 0) {
  283. millis = 1000;
  284. }
  285. SystemClock.sleep(millis);
  286. }
  287. }
  288. initWebSocket(emitter);
  289. }
  290. private Request createRequest(String url) {
  291. return new Request.Builder().get().url(url).build();
  292. }
  293. /**
  294. * 初始化WebSocket
  295. */
  296. private synchronized void initWebSocket(ObservableEmitter<WebSocketInfo> emitter) {
  297. if (mWebSocket == null) {
  298. mWebSocket = mClient.newWebSocket(createRequest(mWebSocketUrl), new WebSocketListener() {
  299. @Override
  300. public void onOpen(WebSocket webSocket, Response response) {
  301. super.onOpen(webSocket, response);
  302. //连接成功
  303. if (!emitter.isDisposed()) {
  304. mWebSocketPool.put(mWebSocketUrl, mWebSocket);
  305. //重连成功
  306. if (isReconnecting) {
  307. emitter.onNext(createReconnect(mWebSocketUrl, webSocket));
  308. } else {
  309. emitter.onNext(createConnect(mWebSocketUrl, webSocket));
  310. }
  311. }
  312. isReconnecting = false;
  313. }
  314. @Override
  315. public void onMessage(WebSocket webSocket, String text) {
  316. super.onMessage(webSocket, text);
  317. //收到消息
  318. if (!emitter.isDisposed()) {
  319. emitter.onNext(createReceiveStringMsg(mWebSocketUrl, webSocket, text));
  320. }
  321. }
  322. @Override
  323. public void onMessage(WebSocket webSocket, ByteString bytes) {
  324. super.onMessage(webSocket, bytes);
  325. //收到消息
  326. if (!emitter.isDisposed()) {
  327. emitter.onNext(createReceiveByteStringMsg(mWebSocketUrl, webSocket, bytes));
  328. }
  329. }
  330. @Override
  331. public void onClosed(WebSocket webSocket, int code, String reason) {
  332. super.onClosed(webSocket, code, reason);
  333. if (!emitter.isDisposed()) {
  334. emitter.onNext(createClose(mWebSocketUrl));
  335. }
  336. }
  337. @Override
  338. public void onFailure(WebSocket webSocket, Throwable throwable, Response response) {
  339. super.onFailure(webSocket, throwable, response);
  340. isReconnecting = true;
  341. mWebSocket = null;
  342. //移除WebSocket缓存,retry重试重新连接
  343. removeWebSocketCache(webSocket);
  344. if (!emitter.isDisposed()) {
  345. emitter.onNext(createPrepareReconnect(mWebSocketUrl));
  346. //失败发送onError,让retry操作符重试
  347. emitter.onError(new ImproperCloseException());
  348. }
  349. }
  350. });
  351. }
  352. }
  353. }
  354. private WebSocketInfo createConnect(String url, WebSocket webSocket) {
  355. return mWebSocketInfoPool.obtain(url)
  356. .setWebSocket(webSocket)
  357. .setConnect(true);
  358. }
  359. private WebSocketInfo createReconnect(String url, WebSocket webSocket) {
  360. return mWebSocketInfoPool.obtain(url)
  361. .setWebSocket(webSocket)
  362. .setReconnect(true);
  363. }
  364. private WebSocketInfo createPrepareReconnect(String url) {
  365. return mWebSocketInfoPool.obtain(url)
  366. .setPrepareReconnect(true);
  367. }
  368. private WebSocketInfo createReceiveStringMsg(String url, WebSocket webSocket, String stringMsg) {
  369. return mWebSocketInfoPool.obtain(url)
  370. .setConnect(true)
  371. .setWebSocket(webSocket)
  372. .setStringMsg(stringMsg);
  373. }
  374. private WebSocketInfo createReceiveByteStringMsg(String url, WebSocket webSocket, ByteString byteMsg) {
  375. return mWebSocketInfoPool.obtain(url)
  376. .setConnect(true)
  377. .setWebSocket(webSocket)
  378. .setByteStringMsg(byteMsg);
  379. }
  380. private WebSocketInfo createClose(String url) {
  381. return mWebSocketInfoPool.obtain(url);
  382. }
  383. }

定时发送心跳维持连接

因为WebSocket断线后,后端不能马上知道连接已经断开,所以需要一个心跳消息保持双方通信。

实现心跳,本质就是一个定时消息,我们使用RxJava的interval操作符定时执行任务,这里我的消息需要增加一个时间戳,所以我加上了timestamp操作符来给每一次执行结果附加一个时间戳。

  • 心跳信息json的生成,我们期望外部进行生成,例如Gson序列化为Json,或者FastJson处理,或者增加其他通用参数等,不应该在WebSocket基础库中写,所以提供了一个HeartBeatGenerateCallback回调进行生成Json

1.网络没有开启时,不发送心跳消息

  1. public class NetworkUtil {
  2. private NetworkUtil() {
  3. }
  4. /**
  5. * 当前是否有网络状态
  6. *
  7. * @param context 上下文
  8. * @param needWifi 是否需要wifi网络
  9. */
  10. public static boolean hasNetWorkStatus(Context context, boolean needWifi) {
  11. NetworkInfo info = getActiveNetwork(context);
  12. if (info == null) {
  13. return false;
  14. }
  15. if (!needWifi) {
  16. return info.isAvailable();
  17. } else if (info.getType() == ConnectivityManager.TYPE_WIFI) {
  18. return info.isAvailable();
  19. }
  20. return false;
  21. }
  22. /**
  23. * 获取活动网络连接信息
  24. *
  25. * @param context 上下文
  26. * @return NetworkInfo
  27. */
  28. public static NetworkInfo getActiveNetwork(Context context) {
  29. ConnectivityManager mConnMgr = (ConnectivityManager) context
  30. .getSystemService(Context.CONNECTIVITY_SERVICE);
  31. if (mConnMgr == null) {
  32. return null;
  33. }
  34. // 获取活动网络连接信息
  35. return mConnMgr.getActiveNetworkInfo();
  36. }
  37. }

2.心跳回调接口,让外部生成心跳json

  1. public interface HeartBeatGenerateCallback {
  2. /**
  3. * 当需要生成心跳信息时回调
  4. *
  5. * @param timestamp 当前时间戳
  6. * @return 要发送的心跳信息
  7. */
  8. String onGenerateHeartBeatMsg(long timestamp);
  9. }

3.发送心跳消息,需要制定Url地址、间隔时间,间隔时间单位,心跳消息生成回调

  1. @Override
  2. public Observable<Boolean> heartBeat(String url, int period, TimeUnit unit,
  3. HeartBeatGenerateCallback heartBeatGenerateCallback) {
  4. if (heartBeatGenerateCallback == null) {
  5. return Observable.error(new NullPointerException("heartBeatGenerateCallback == null"));
  6. }
  7. return Observable
  8. .interval(period, unit)
  9. //timestamp操作符,给每个事件加一个时间戳
  10. .timestamp()
  11. .retry()
  12. .flatMap(new Function<Timed<Long>, ObservableSource<Boolean>>() {
  13. @Override
  14. public ObservableSource<Boolean> apply(Timed<Long> timed) throws Exception {
  15. long timestamp = timed.time();
  16. //判断网络,存在网络才发消息,否则直接返回发送心跳失败
  17. if (mContext != null && NetworkUtil.hasNetWorkStatus(mContext, false)) {
  18. String heartBeatMsg = heartBeatGenerateCallback.onGenerateHeartBeatMsg(timestamp);
  19. Logger.d(TAG, "发送心跳消息: " + heartBeatMsg);
  20. if (hasWebSocketConnection(url)) {
  21. return send(url, heartBeatMsg);
  22. } else {
  23. //这里必须用异步发送,如果切断网络,再重连,缓存的WebSocket会被清除,此时再重连网络
  24. //是没有WebSocket连接可用的,所以就需要异步连接完成后,再发送
  25. return asyncSend(url, heartBeatMsg);
  26. }
  27. } else {
  28. Logger.d(TAG, "无网络连接,不发送心跳,下次网络连通时,再次发送心跳");
  29. return Observable.create(new ObservableOnSubscribe<Boolean>() {
  30. @Override
  31. public void subscribe(ObservableEmitter<Boolean> emitter) throws Exception {
  32. emitter.onNext(false);
  33. }
  34. });
  35. }
  36. }
  37. });
  38. }

实现重连

重连配置RxJava,有个天然优势就是RxJava提供了Retry操作符,支持重试,我们在onFailure()连接失败回调中手动发出onError(),让数据源增加retry操作符进行重试,就会重新走到数据源的订阅回调重新连接WebSocket

  1. private final class WebSocketOnSubscribe implements ObservableOnSubscribe<WebSocketInfo> {
  2. private String mWebSocketUrl;
  3. private WebSocket mWebSocket;
  4. private boolean isReconnecting = false;
  5. public WebSocketOnSubscribe(String webSocketUrl) {
  6. this.mWebSocketUrl = webSocketUrl;
  7. }
  8. @Override
  9. public void subscribe(ObservableEmitter<WebSocketInfo> emitter) throws Exception {
  10. //...
  11. }
  12. private Request createRequest(String url) {
  13. return new Request.Builder().get().url(url).build();
  14. }
  15. /**
  16. * 初始化WebSocket
  17. */
  18. private synchronized void initWebSocket(ObservableEmitter<WebSocketInfo> emitter) {
  19. if (mWebSocket == null) {
  20. mWebSocket = mClient.newWebSocket(createRequest(mWebSocketUrl), new WebSocketListener() {
  21. @Override
  22. public void onOpen(WebSocket webSocket, Response response) {
  23. super.onOpen(webSocket, response);
  24. //连接成功
  25. if (!emitter.isDisposed()) {
  26. mWebSocketPool.put(mWebSocketUrl, mWebSocket);
  27. //重连成功
  28. if (isReconnecting) {
  29. emitter.onNext(createReconnect(mWebSocketUrl, webSocket));
  30. } else {
  31. emitter.onNext(createConnect(mWebSocketUrl, webSocket));
  32. }
  33. }
  34. isReconnecting = false;
  35. }
  36. @Override
  37. public void onMessage(WebSocket webSocket, String text) {
  38. super.onMessage(webSocket, text);
  39. //收到消息
  40. if (!emitter.isDisposed()) {
  41. emitter.onNext(createReceiveStringMsg(mWebSocketUrl, webSocket, text));
  42. }
  43. }
  44. @Override
  45. public void onMessage(WebSocket webSocket, ByteString bytes) {
  46. super.onMessage(webSocket, bytes);
  47. //收到消息
  48. if (!emitter.isDisposed()) {
  49. emitter.onNext(createReceiveByteStringMsg(mWebSocketUrl, webSocket, bytes));
  50. }
  51. }
  52. @Override
  53. public void onClosed(WebSocket webSocket, int code, String reason) {
  54. super.onClosed(webSocket, code, reason);
  55. if (!emitter.isDisposed()) {
  56. emitter.onNext(createClose(mWebSocketUrl));
  57. }
  58. }
  59. @Override
  60. public void onFailure(WebSocket webSocket, Throwable throwable, Response response) {
  61. super.onFailure(webSocket, throwable, response);
  62. isReconnecting = true;
  63. mWebSocket = null;
  64. //移除WebSocket缓存,retry重试重新连接
  65. removeWebSocketCache(webSocket);
  66. if (!emitter.isDisposed()) {
  67. emitter.onNext(createPrepareReconnect(mWebSocketUrl));
  68. //失败发送onError,让retry操作符重试
  69. emitter.onError(new ImproperCloseException());
  70. }
  71. }
  72. });
  73. }
  74. }
  75. }

以上,基于Rxjava对WebSocket的封装基本完成,以下是项目中的实际使用

使用

  • 使用RxWebSocketBuilder,构建RxWebSocket
    1. //自定义OkHttpClient
    2. OkHttpClient mClient = new OkHttpClient.Builder()
    3. .readTimeout(3, TimeUnit.SECONDS)//设置读取超时时间
    4. .writeTimeout(3, TimeUnit.SECONDS)//设置写的超时时间
    5. .connectTimeout(3, TimeUnit.SECONDS)//设置连接超时时间
    6. .build();
    7. //RxWebSocketBuilder构建RxWebSocket
    8. RxWebSocket rxWebSocket = new RxWebSocketBuilder(context)
    9. //是否打印Log
    10. .isPrintLog(true)
    11. //5秒无响应则重连
    12. .reconnectInterval(5, TimeUnit.SECONDS)
    13. .client(mClient)
    14. .build();

  • 连接Url地址
    1. String url = "ws://xxxxxxxxx"
    2. //开始连接
    3. rxWebSocket.get(url)
    4. //切换到子线程去连接
    5. .compose(RxSchedulerUtil.ioToMain())
    6. //绑定生命周期
    7. .as(RxLifecycleUtil.bindLifecycle(mLifecycleOwner))
    8. .subscribe(new Consumer<WebSocketInfo>() {
    9. @Override
    10. public void accept(WebSocketInfo webSocketInfo) throws Exception {
    11. String json = webSocketInfo.getStringMsg();
    12. //业务层的json解析
    13. ...
    14. }
    15. });

  • 同步发送消息(必须确保连接正常,否则发送失败)
    1. rxWebSocket.send(url, "我是消息")
    2. .compose(RxSchedulerUtil.ioToMain())
    3. .as(RxLifecycleUtil.bindLifecycle(mLifecycleOwner))
    4. .subscribe(new Consumer<Boolean>() {
    5. @Override
    6. public void accept(Boolean isSuccess) throws Exception {
    7. if(isSuccess) {
    8. //发送成功
    9. } ele {
    10. //发送失败
    11. }
    12. }
    13. });

  • 异步发送消息(不需要确保连接正常,如果未连接会连接成功后自动发送)
    1. rxWebSocket.asyncSend(url, "我是消息")
    2. .compose(RxSchedulerUtil.ioToMain())
    3. .as(RxLifecycleUtil.bindLifecycle(mLifecycleOwner))
    4. .subscribe(new Consumer<Boolean>() {
    5. @Override
    6. public void accept(Boolean isSuccess) throws Exception {
    7. if(isSuccess) {
    8. //发送成功
    9. } ele {
    10. //发送失败
    11. }
    12. }
    13. });

  • 发送心跳包
    1. rxWebSocket.heartBeat(url, 6 ,TimeUnit.SECONDS, new HeartBeatGenerateCallback() {
    2. @Override
    3. public String onGenerateHeartBeatMsg(long timestamp) {
    4. //生成心跳Json,业务模块处理,例如后端需要秒值,我们除以1000换算为秒。
    5. //后续可以在这里配置通用参数等
    6. return GsonUtil.toJson(new HeartBeatMsgRequestModel(WssCommandTypeEnum.HEART_BEAT.getCode(),
    7. String.valueOf(timestamp / 1000)));
    8. }
    9. });

总结 

Okhttp的WebSocket使用比较简单,基本都是发起请求和配置回调2个步骤,再使用send()方法发送消息,简单使用可以不需要做封装,Java使用可以基于Rxjava做封装,而Kotlin使用可以用协程做封装,后续有时间我可能会填这个坑

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop】
推荐阅读
相关标签
  

闽ICP备14008679号