当前位置:   article > 正文

Flutter: Websocket的使用与封装_flutter web_socket_channel

flutter web_socket_channel

1.引入相关插件库

  1. # websocket
  2. web_socket_channel: ^2.4.0
  3. # 引入rxdart 解决Bad state: Stream has already been listened to.的报错问题
  4. rxdart: ^0.27.7
  5. # 状态管理*
  6. provider: ^6.0.5

2.代码编写及封装

  1. import 'dart:async';
  2. import 'package:rxdart/subjects.dart';
  3. import 'package:web_socket_channel/web_socket_channel.dart';
  4. import 'package:web_socket_channel/status.dart' as status;
  5. typedef WebsocketMessageCallback = void Function(dynamic message);
  6. /// 注册流控制器需要在哪些页面使用
  7. ///
  8. /// 目前分三种类型:
  9. ///
  10. /// 1.[customerLoginPage]游客模式下也就是在未登录时候(用户处于登录相关页面)
  11. /// 2.[customerMainPage]用户已登录,处于主页及其他登录后的页面下
  12. /// 3.[chatRoomPage]用户处在聊天室里(游客下的在线客服聊天室、用户已登录下的在线客服聊天室、买卖用户之间的聊天室)
  13. enum StreamControllerNameEnum {
  14. customerLoginPage,
  15. customerMainPage,
  16. chatRoomPage;
  17. }
  18. class WebsocketHelper {
  19. WebsocketHelper._();
  20. static WebsocketHelper? _singleton;
  21. factory WebsocketHelper() => _singleton ??= WebsocketHelper._();
  22. /// 用于连接websocket的链接uri
  23. Uri? wsUri;
  24. /// websocket连接后的对象
  25. WebSocketChannel? _webSocketChannel;
  26. /// 指定的stream流控制器存放map
  27. Map<String, BehaviorSubject<String>>? streamControllerList;
  28. /// 是否开启心跳
  29. bool isOpenHeartBeat = true;
  30. /// 用于控制心跳轮询
  31. StreamSubscription<String>? _subscription;
  32. /// 是否是用户主动触发关闭连接
  33. bool isDisconnectByUser = false;
  34. /// 另辟一个单独的消息回调函数
  35. WebsocketMessageCallback? messageCallback;
  36. /// 连接断开回调
  37. Function()? onDone;
  38. /// 连接出错回调
  39. Function? onError;
  40. /// step one - ex: ws://localhost:1234
  41. initSocket({required String wsPath, bool isOpenHeartBeat = true}) {
  42. if (_webSocketChannel != null) {
  43. print("socket实例已存在, 请勿重复创建");
  44. return;
  45. }
  46. // 自己项目中后端需要前端拼一个登录令牌用于控制后端逻辑处理,这里使用的是登录后的token
  47. var authorization = "登录后的token";
  48. wsUri = Uri.tryParse("$wsPath?Authorization=$authorization");
  49. // wsUri = Uri.tryParse(wsPath);
  50. if (wsUri == null) return;
  51. this.isOpenHeartBeat = isOpenHeartBeat;
  52. _connectWebsocket();
  53. }
  54. /// [isRunForReConnect] 是否是由重连机制触发的此方法
  55. void _connectWebsocket({bool isRunForReConnect = false}) {
  56. _webSocketChannel = WebSocketChannel.connect(wsUri!);
  57. if (!isRunForReConnect) {
  58. isDisconnectByUser = false;
  59. }
  60. }
  61. /// step two - listen
  62. void listen(
  63. {WebsocketMessageCallback? messageCallback,
  64. Function()? onDone,
  65. Function? onError}) {
  66. this.messageCallback = messageCallback;
  67. this.onDone = onDone;
  68. this.onError = onError;
  69. streamControllerList ??= <String, BehaviorSubject<String>>{
  70. // StreamControllerNameEnum.customerLoginPage.name: BehaviorSubject(),
  71. // StreamControllerNameEnum.customerMainPage.name: BehaviorSubject(),
  72. // StreamControllerNameEnum.chatRoomPage.name: BehaviorSubject()
  73. };
  74. // 监听一系列连接情况(如收到消息、onDone:连接关闭、onError:接连异常)
  75. _webSocketChannel?.stream.listen((message) {
  76. print(
  77. "websocket onData message = ${message.toString()}, type = ${message.runtimeType}");
  78. if (message is String && message.isEmpty) {
  79. // 消息为空(可能得情况:心跳 or another)
  80. return;
  81. }
  82. // 通过流控制器把消息分发出去,在需要的页面监听此流的消息
  83. streamControllerList?.forEach((key, value) {
  84. // print("key = $key, value.isClosed = ${value.isClosed}");
  85. if (!value.isClosed) {
  86. value.sink.add(message);
  87. }
  88. });
  89. this.messageCallback?.call(message);
  90. }, onDone: () {
  91. print("websocket onDone ...");
  92. this.onDone?.call();
  93. // 掉线重连
  94. reConnect();
  95. }, onError: (Object error, StackTrace stackTrace) {
  96. print(
  97. "websocket onError error = ${error.toString()}, stackTrace = ${stackTrace.toString()}");
  98. showToast(msg: "连接服务器失败!");
  99. this.onError?.call(error, stackTrace);
  100. }, cancelOnError: false);
  101. // 连接建立成功时的回调通知,可在此做心跳操作
  102. _webSocketChannel?.ready.then((value) {
  103. print("webSocket ready");
  104. isDisconnectByUser = false;
  105. if (isOpenHeartBeat) {
  106. // 收到连接成功的回馈,开始执行心跳操作
  107. _startHeartBeat();
  108. }
  109. });
  110. }
  111. /// 掉线重连
  112. void reConnect() {
  113. if (isDisconnectByUser) return;
  114. Future.delayed(
  115. const Duration(seconds: 3),
  116. () {
  117. // disconnect();
  118. _subscription?.cancel();
  119. _subscription = null;
  120. _webSocketChannel?.sink.close(status.abnormalClosure, "掉线重连");
  121. _webSocketChannel = null;
  122. _connectWebsocket();
  123. listen(
  124. messageCallback: messageCallback, onDone: onDone, onError: onError);
  125. },
  126. );
  127. }
  128. /// 发送消息
  129. void sendMessage({required String message, bool needDisplayMsg = true}) {
  130. print("websocket sendMessage message = $message");
  131. if (needDisplayMsg) {
  132. streamControllerList?.forEach((key, value) {
  133. if (!value.isClosed) {
  134. value.sink.add(message);
  135. }
  136. });
  137. }
  138. _webSocketChannel?.sink.add(message);
  139. }
  140. /// 开启心跳
  141. void _startHeartBeat() {
  142. if (_subscription != null) {
  143. print("websocket startHeartBeat _subscription != null");
  144. return;
  145. }
  146. Future.delayed(
  147. const Duration(seconds: 30),
  148. () {
  149. var pollingStream = StreamTool().timedPolling(
  150. const Duration(seconds: 30), () => Future(() => ""), 100000000);
  151. //进行流内容监听
  152. _subscription = pollingStream.listen((result) {
  153. sendMessage(message: "heart beat", needDisplayMsg: false);
  154. });
  155. },
  156. );
  157. }
  158. /// 断开连接并销毁
  159. void disconnect({bool isDisconnectByUser = false}) {
  160. this.isDisconnectByUser = isDisconnectByUser;
  161. _subscription?.cancel();
  162. _subscription = null;
  163. streamControllerList?.forEach((key, value) {
  164. value.close();
  165. });
  166. streamControllerList?.clear();
  167. _webSocketChannel?.sink.close(status.normalClosure, "用户退出聊天界面,聊天关闭");
  168. _webSocketChannel = null;
  169. }
  170. /// 新建指定stream流控制器进行消息流回调
  171. setNewStreamController(StreamControllerNameEnum streamControllerName) {
  172. if (streamControllerList?.containsKey(streamControllerName.name) ?? false) {
  173. streamControllerList?[streamControllerName.name]?.close();
  174. }
  175. streamControllerList?[streamControllerName.name] = BehaviorSubject();
  176. }
  177. }

3.提供一个轮询工具类StreamTool

  1. import 'dart:async';
  2. typedef FutureGenerator<T> = Future<T> Function();
  3. class StreamTool {
  4. /// interval 轮询时间间隔
  5. /// maxCount 最大轮询数
  6. Stream<T> timedPolling<T>(Duration interval, FutureGenerator<T> future,
  7. [int maxCount = 1]) {
  8. StreamController<T>? controller;
  9. int counter = 0;
  10. bool polling = true;
  11. void stopTimer() {
  12. polling = false;
  13. }
  14. void tick() async {
  15. counter++;
  16. T result = await future();
  17. if (controller != null && !controller.isClosed) {
  18. controller.add(result);
  19. }
  20. if (counter == maxCount) {
  21. stopTimer();
  22. controller?.close();
  23. } else if (polling) {
  24. Future.delayed(interval, tick);
  25. }
  26. }
  27. void startTimer() {
  28. polling = true;
  29. tick();
  30. }
  31. //StreamSubscription调用pause,cancel时,stream里面的轮询也能响应暂停或取消
  32. controller = StreamController<T>(
  33. onListen: startTimer,
  34. onPause: stopTimer,
  35. onResume: startTimer,
  36. onCancel: stopTimer,
  37. );
  38. return controller.stream;
  39. }
  40. }

4.新建全局的ChangeNotifier -> GlobalWebsocketVM

  1. class GlobalWebsocketVM extends ChangeNotifier {
  2. void startWebSocket() {
  3. WebsocketHelper()
  4. ..initSocket(wsPath: Api.wsUrlPath, isOpenHeartBeat: false)
  5. ..listen(
  6. messageCallback: (message) {
  7. // 延迟500毫秒,使listview进行滑动到底部
  8. // gotoListBottom();
  9. },
  10. onDone: () {},
  11. );
  12. }
  13. /// 获取socket实时数据流
  14. ///
  15. /// 每次都需要新绑定一个StreamController,避免数据流出现错乱情况
  16. Stream<String>? getMessageStream(
  17. StreamControllerNameEnum streamControllerName) =>
  18. (WebsocketHelper()..setNewStreamController(streamControllerName))
  19. .streamControllerList?[streamControllerName.name]
  20. ?.stream;
  21. }

5.在入口类main.dart中MaterialApp中使用全局GlobalWebsocketVM

  1. late GlobalWebsocketVM socketVM;
  2. @override
  3. void initState() {
  4. socketVM = GlobalWebsocketVM();
  5. }
  6. MaterialApp.router(
  7. debugShowCheckedModeBanner: false,
  8. onGenerateTitle: (context) => S.current.appName,
  9. theme: ThemeData(
  10. useMaterial3: true,
  11. colorScheme: ColorScheme.fromSeed(seedColor: Colors.white),
  12. appBarTheme: const AppBarTheme(
  13. color: Colors.white, surfaceTintColor: Colors.white),
  14. bottomAppBarTheme: BottomAppBarTheme.of(context)
  15. .copyWith(color: Colors.white, surfaceTintColor: Colors.white),
  16. scaffoldBackgroundColor: Colors.grey[200],
  17. cardTheme: const CardTheme(
  18. color: Colors.white, surfaceTintColor: Colors.white),
  19. progressIndicatorTheme:
  20. const ProgressIndicatorThemeData(color: AppColor.appThemeColor),
  21. // 统一修改输入框光标颜色、文本选中颜色
  22. textSelectionTheme: const TextSelectionThemeData(
  23. cursorColor: AppColor.appThemeColor,
  24. selectionColor: AppColor.appThemeColor,
  25. selectionHandleColor: AppColor.appThemeColor,
  26. ),
  27. // ios主题色设置
  28. cupertinoOverrideTheme:
  29. const CupertinoThemeData(primaryColor: AppColor.appThemeColor),
  30. iconButtonTheme: IconButtonThemeData(
  31. style: AppButtonStyle.stGlobalDefaultBtn,
  32. ),
  33. textButtonTheme: TextButtonThemeData(
  34. style: AppButtonStyle.stGlobalDefaultBtn,
  35. ),
  36. // primarySwatch: themeVM.theme,
  37. ),
  38. // locale: localeVM.getLocale(),
  39. builder: FlutterSmartDialog.init(
  40. builder: (context, mChild) {
  41. return MultiProvider(
  42. providers: [
  43. ChangeNotifierProvider<UserVM>(
  44. create: (_) => userVM,
  45. ),
  46. // 在这里为每个页面添加GlobalWebsocketVM绑定
  47. ChangeNotifierProvider<GlobalWebsocketVM>(
  48. create: (_) => socketVM,
  49. ),
  50. ],
  51. builder: (context, child) => mChild ?? const SizedBox.shrink(),
  52. );
  53. /*return ChangeNotifierProvider<UserVM>(
  54. create: (_) => userVM,
  55. builder: (context, child) =>
  56. mChild ?? const SizedBox.shrink(),
  57. );*/
  58. },
  59. ),
  60. localizationsDelegates: const [
  61. S.delegate,
  62. GlobalMaterialLocalizations.delegate,
  63. GlobalWidgetsLocalizations.delegate,
  64. GlobalCupertinoLocalizations.delegate,
  65. ],
  66. supportedLocales: S.delegate.supportedLocales,
  67. // localeResolutionCallback: (_locale, supportedLocales) {
  68. // if (localeVM.getLocale() != null) {
  69. // //如果已经选定语言,则不跟随系统
  70. // return localeVM.getLocale();
  71. // } else {
  72. // //跟随系统
  73. // Locale locale;
  74. // if (supportedLocales.contains(_locale)) {
  75. // locale = _locale!;
  76. // } else {
  77. // //如果系统语言不是中文简体或美国英语,则默认使用美国英语
  78. // locale = const Locale('en', 'US');
  79. // }
  80. // return locale;
  81. // }
  82. // },
  83. routerConfig: RouterHelper.router,
  84. );

6.页面中调用,在initState方法中建立连接,在build中使用StreamBuilder进行消息监听

  1. @override
  2. void initState() {
  3. super.initState();
  4. // 建立连接ws global init
  5. context.read<GlobalWebsocketVM>().startWebSocket();
  6. }
  7. @override
  8. Widget build(BuildContext context) {
  9. return Scaffold(
  10. resizeToAvoidBottomInset: false,
  11. appBar: TitleBar.build(title: "正与${model.titleContent}沟通"),
  12. // 监听聊天消息并刷新聊天列表
  13. body: StreamBuilder<String>(
  14. stream: context
  15. .read<GlobalWebsocketVM>()
  16. .getMessageStream(StreamControllerNameEnum.chatRoomPage),
  17. builder: (context, snapshot) {
  18. if (snapshot.connectionState == ConnectionState.active) {
  19. if (snapshot.data?.isEmpty ?? true) {
  20. return const SizedBox.shrink();
  21. }
  22. addMessageAndRefreshUI("orderNo", snapshot.data!);
  23. return const SizedBox.shrink();
  24. }
  25. return const SizedBox.shrink();
  26. },
  27. // catchError: (context, error) => error.toString(),
  28. ),,
  29. );
  30. }
  31. void addMessageAndRefreshUI(String tag, String message) {
  32. print("收到聊天消息:" + message);
  33. }

7.发送消息

  1. /// 在合适的地方(比如发送按钮点击发送聊天消息)
  2. void sendChatMessage() {
  3. WebsocketHelper().sendMessage(
  4. message: "我发送一条消息",
  5. needDisplayMsg: false,
  6. );
  7. }

8.退出app断开websocket清理内存(可以在任何想断开websocket的地方调用销毁)

  1. /// 通常在dispose中调用销毁,可以在任何想断开websocket的地方调用销毁
  2. @override
  3. void dispose() {
  4. ScanHelper().dispose();
  5. WebsocketHelper().disconnect(isDisconnectByUser: true);
  6. super.dispose();
  7. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/菜鸟追梦旅行/article/detail/268650
推荐阅读
相关标签
  

闽ICP备14008679号