赞
踩
1.引入相关插件库
- # websocket
- web_socket_channel: ^2.4.0
- # 引入rxdart 解决Bad state: Stream has already been listened to.的报错问题
- rxdart: ^0.27.7
- # 状态管理*
- provider: ^6.0.5
2.代码编写及封装
- import 'dart:async';
-
- import 'package:rxdart/subjects.dart';
- import 'package:web_socket_channel/web_socket_channel.dart';
- import 'package:web_socket_channel/status.dart' as status;
-
- typedef WebsocketMessageCallback = void Function(dynamic message);
-
- /// 注册流控制器需要在哪些页面使用
- ///
- /// 目前分三种类型:
- ///
- /// 1.[customerLoginPage]游客模式下也就是在未登录时候(用户处于登录相关页面)
- /// 2.[customerMainPage]用户已登录,处于主页及其他登录后的页面下
- /// 3.[chatRoomPage]用户处在聊天室里(游客下的在线客服聊天室、用户已登录下的在线客服聊天室、买卖用户之间的聊天室)
- enum StreamControllerNameEnum {
- customerLoginPage,
- customerMainPage,
- chatRoomPage;
- }
-
- class WebsocketHelper {
- WebsocketHelper._();
-
- static WebsocketHelper? _singleton;
-
- factory WebsocketHelper() => _singleton ??= WebsocketHelper._();
-
- /// 用于连接websocket的链接uri
- Uri? wsUri;
-
- /// websocket连接后的对象
- WebSocketChannel? _webSocketChannel;
-
- /// 指定的stream流控制器存放map
- Map<String, BehaviorSubject<String>>? streamControllerList;
-
- /// 是否开启心跳
- bool isOpenHeartBeat = true;
-
- /// 用于控制心跳轮询
- StreamSubscription<String>? _subscription;
-
- /// 是否是用户主动触发关闭连接
- bool isDisconnectByUser = false;
-
- /// 另辟一个单独的消息回调函数
- WebsocketMessageCallback? messageCallback;
-
- /// 连接断开回调
- Function()? onDone;
-
- /// 连接出错回调
- Function? onError;
-
- /// step one - ex: ws://localhost:1234
- initSocket({required String wsPath, bool isOpenHeartBeat = true}) {
- if (_webSocketChannel != null) {
- print("socket实例已存在, 请勿重复创建");
- return;
- }
-
- // 自己项目中后端需要前端拼一个登录令牌用于控制后端逻辑处理,这里使用的是登录后的token
- var authorization = "登录后的token";
- wsUri = Uri.tryParse("$wsPath?Authorization=$authorization");
- // wsUri = Uri.tryParse(wsPath);
- if (wsUri == null) return;
- this.isOpenHeartBeat = isOpenHeartBeat;
- _connectWebsocket();
- }
-
- /// [isRunForReConnect] 是否是由重连机制触发的此方法
- void _connectWebsocket({bool isRunForReConnect = false}) {
- _webSocketChannel = WebSocketChannel.connect(wsUri!);
- if (!isRunForReConnect) {
- isDisconnectByUser = false;
- }
- }
-
- /// step two - listen
- void listen(
- {WebsocketMessageCallback? messageCallback,
- Function()? onDone,
- Function? onError}) {
- this.messageCallback = messageCallback;
- this.onDone = onDone;
- this.onError = onError;
- streamControllerList ??= <String, BehaviorSubject<String>>{
- // StreamControllerNameEnum.customerLoginPage.name: BehaviorSubject(),
- // StreamControllerNameEnum.customerMainPage.name: BehaviorSubject(),
- // StreamControllerNameEnum.chatRoomPage.name: BehaviorSubject()
- };
-
- // 监听一系列连接情况(如收到消息、onDone:连接关闭、onError:接连异常)
- _webSocketChannel?.stream.listen((message) {
- print(
- "websocket onData message = ${message.toString()}, type = ${message.runtimeType}");
- if (message is String && message.isEmpty) {
- // 消息为空(可能得情况:心跳 or another)
- return;
- }
- // 通过流控制器把消息分发出去,在需要的页面监听此流的消息
- streamControllerList?.forEach((key, value) {
- // print("key = $key, value.isClosed = ${value.isClosed}");
- if (!value.isClosed) {
- value.sink.add(message);
- }
- });
- this.messageCallback?.call(message);
- }, onDone: () {
- print("websocket onDone ...");
- this.onDone?.call();
- // 掉线重连
- reConnect();
- }, onError: (Object error, StackTrace stackTrace) {
- print(
- "websocket onError error = ${error.toString()}, stackTrace = ${stackTrace.toString()}");
- showToast(msg: "连接服务器失败!");
- this.onError?.call(error, stackTrace);
- }, cancelOnError: false);
- // 连接建立成功时的回调通知,可在此做心跳操作
- _webSocketChannel?.ready.then((value) {
- print("webSocket ready");
- isDisconnectByUser = false;
- if (isOpenHeartBeat) {
- // 收到连接成功的回馈,开始执行心跳操作
- _startHeartBeat();
- }
- });
- }
-
- /// 掉线重连
- void reConnect() {
- if (isDisconnectByUser) return;
- Future.delayed(
- const Duration(seconds: 3),
- () {
- // disconnect();
- _subscription?.cancel();
- _subscription = null;
- _webSocketChannel?.sink.close(status.abnormalClosure, "掉线重连");
- _webSocketChannel = null;
- _connectWebsocket();
- listen(
- messageCallback: messageCallback, onDone: onDone, onError: onError);
- },
- );
- }
-
- /// 发送消息
- void sendMessage({required String message, bool needDisplayMsg = true}) {
- print("websocket sendMessage message = $message");
- if (needDisplayMsg) {
- streamControllerList?.forEach((key, value) {
- if (!value.isClosed) {
- value.sink.add(message);
- }
- });
- }
-
- _webSocketChannel?.sink.add(message);
- }
-
- /// 开启心跳
- void _startHeartBeat() {
- if (_subscription != null) {
- print("websocket startHeartBeat _subscription != null");
- return;
- }
- Future.delayed(
- const Duration(seconds: 30),
- () {
- var pollingStream = StreamTool().timedPolling(
- const Duration(seconds: 30), () => Future(() => ""), 100000000);
- //进行流内容监听
- _subscription = pollingStream.listen((result) {
- sendMessage(message: "heart beat", needDisplayMsg: false);
- });
- },
- );
- }
-
- /// 断开连接并销毁
- void disconnect({bool isDisconnectByUser = false}) {
- this.isDisconnectByUser = isDisconnectByUser;
- _subscription?.cancel();
- _subscription = null;
- streamControllerList?.forEach((key, value) {
- value.close();
- });
- streamControllerList?.clear();
- _webSocketChannel?.sink.close(status.normalClosure, "用户退出聊天界面,聊天关闭");
- _webSocketChannel = null;
- }
-
- /// 新建指定stream流控制器进行消息流回调
- setNewStreamController(StreamControllerNameEnum streamControllerName) {
- if (streamControllerList?.containsKey(streamControllerName.name) ?? false) {
- streamControllerList?[streamControllerName.name]?.close();
- }
- streamControllerList?[streamControllerName.name] = BehaviorSubject();
- }
- }
3.提供一个轮询工具类StreamTool
- import 'dart:async';
-
- typedef FutureGenerator<T> = Future<T> Function();
-
- class StreamTool {
- /// interval 轮询时间间隔
- /// maxCount 最大轮询数
- Stream<T> timedPolling<T>(Duration interval, FutureGenerator<T> future,
- [int maxCount = 1]) {
- StreamController<T>? controller;
- int counter = 0;
- bool polling = true;
-
- void stopTimer() {
- polling = false;
- }
-
- void tick() async {
- counter++;
- T result = await future();
- if (controller != null && !controller.isClosed) {
- controller.add(result);
- }
- if (counter == maxCount) {
- stopTimer();
- controller?.close();
- } else if (polling) {
- Future.delayed(interval, tick);
- }
- }
-
- void startTimer() {
- polling = true;
- tick();
- }
-
- //StreamSubscription调用pause,cancel时,stream里面的轮询也能响应暂停或取消
- controller = StreamController<T>(
- onListen: startTimer,
- onPause: stopTimer,
- onResume: startTimer,
- onCancel: stopTimer,
- );
-
- return controller.stream;
- }
- }
4.新建全局的ChangeNotifier -> GlobalWebsocketVM
- class GlobalWebsocketVM extends ChangeNotifier {
- void startWebSocket() {
- WebsocketHelper()
- ..initSocket(wsPath: Api.wsUrlPath, isOpenHeartBeat: false)
- ..listen(
- messageCallback: (message) {
- // 延迟500毫秒,使listview进行滑动到底部
- // gotoListBottom();
- },
- onDone: () {},
- );
- }
-
- /// 获取socket实时数据流
- ///
- /// 每次都需要新绑定一个StreamController,避免数据流出现错乱情况
- Stream<String>? getMessageStream(
- StreamControllerNameEnum streamControllerName) =>
- (WebsocketHelper()..setNewStreamController(streamControllerName))
- .streamControllerList?[streamControllerName.name]
- ?.stream;
- }
5.在入口类main.dart中MaterialApp中使用全局GlobalWebsocketVM
- late GlobalWebsocketVM socketVM;
-
- @override
- void initState() {
- socketVM = GlobalWebsocketVM();
- }
-
- MaterialApp.router(
- debugShowCheckedModeBanner: false,
- onGenerateTitle: (context) => S.current.appName,
- theme: ThemeData(
- useMaterial3: true,
- colorScheme: ColorScheme.fromSeed(seedColor: Colors.white),
- appBarTheme: const AppBarTheme(
- color: Colors.white, surfaceTintColor: Colors.white),
- bottomAppBarTheme: BottomAppBarTheme.of(context)
- .copyWith(color: Colors.white, surfaceTintColor: Colors.white),
- scaffoldBackgroundColor: Colors.grey[200],
- cardTheme: const CardTheme(
- color: Colors.white, surfaceTintColor: Colors.white),
- progressIndicatorTheme:
- const ProgressIndicatorThemeData(color: AppColor.appThemeColor),
- // 统一修改输入框光标颜色、文本选中颜色
- textSelectionTheme: const TextSelectionThemeData(
- cursorColor: AppColor.appThemeColor,
- selectionColor: AppColor.appThemeColor,
- selectionHandleColor: AppColor.appThemeColor,
- ),
- // ios主题色设置
- cupertinoOverrideTheme:
- const CupertinoThemeData(primaryColor: AppColor.appThemeColor),
- iconButtonTheme: IconButtonThemeData(
- style: AppButtonStyle.stGlobalDefaultBtn,
- ),
- textButtonTheme: TextButtonThemeData(
- style: AppButtonStyle.stGlobalDefaultBtn,
- ),
- // primarySwatch: themeVM.theme,
- ),
- // locale: localeVM.getLocale(),
- builder: FlutterSmartDialog.init(
- builder: (context, mChild) {
- return MultiProvider(
- providers: [
- ChangeNotifierProvider<UserVM>(
- create: (_) => userVM,
- ),
- // 在这里为每个页面添加GlobalWebsocketVM绑定
- ChangeNotifierProvider<GlobalWebsocketVM>(
- create: (_) => socketVM,
- ),
- ],
- builder: (context, child) => mChild ?? const SizedBox.shrink(),
- );
- /*return ChangeNotifierProvider<UserVM>(
- create: (_) => userVM,
- builder: (context, child) =>
- mChild ?? const SizedBox.shrink(),
- );*/
- },
- ),
- localizationsDelegates: const [
- S.delegate,
- GlobalMaterialLocalizations.delegate,
- GlobalWidgetsLocalizations.delegate,
- GlobalCupertinoLocalizations.delegate,
- ],
- supportedLocales: S.delegate.supportedLocales,
- // localeResolutionCallback: (_locale, supportedLocales) {
- // if (localeVM.getLocale() != null) {
- // //如果已经选定语言,则不跟随系统
- // return localeVM.getLocale();
- // } else {
- // //跟随系统
- // Locale locale;
- // if (supportedLocales.contains(_locale)) {
- // locale = _locale!;
- // } else {
- // //如果系统语言不是中文简体或美国英语,则默认使用美国英语
- // locale = const Locale('en', 'US');
- // }
- // return locale;
- // }
- // },
- routerConfig: RouterHelper.router,
- );
6.页面中调用,在initState方法中建立连接,在build中使用StreamBuilder进行消息监听
- @override
- void initState() {
- super.initState();
- // 建立连接ws global init
- context.read<GlobalWebsocketVM>().startWebSocket();
-
- }
-
- @override
- Widget build(BuildContext context) {
- return Scaffold(
- resizeToAvoidBottomInset: false,
- appBar: TitleBar.build(title: "正与${model.titleContent}沟通"),
- // 监听聊天消息并刷新聊天列表
- body: StreamBuilder<String>(
- stream: context
- .read<GlobalWebsocketVM>()
- .getMessageStream(StreamControllerNameEnum.chatRoomPage),
- builder: (context, snapshot) {
- if (snapshot.connectionState == ConnectionState.active) {
- if (snapshot.data?.isEmpty ?? true) {
- return const SizedBox.shrink();
- }
- addMessageAndRefreshUI("orderNo", snapshot.data!);
- return const SizedBox.shrink();
- }
- return const SizedBox.shrink();
- },
- // catchError: (context, error) => error.toString(),
- ),,
- );
- }
-
- void addMessageAndRefreshUI(String tag, String message) {
- print("收到聊天消息:" + message);
- }
7.发送消息
- /// 在合适的地方(比如发送按钮点击发送聊天消息)
- void sendChatMessage() {
- WebsocketHelper().sendMessage(
- message: "我发送一条消息",
- needDisplayMsg: false,
- );
- }
8.退出app断开websocket清理内存(可以在任何想断开websocket的地方调用销毁)
- /// 通常在dispose中调用销毁,可以在任何想断开websocket的地方调用销毁
- @override
- void dispose() {
- ScanHelper().dispose();
- WebsocketHelper().disconnect(isDisconnectByUser: true);
- super.dispose();
- }
赞
踩
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。