当前位置:   article > 正文

flutter开发实战-长链接WebSocket使用stomp协议stomp_dart_client_stomp 连接状态

stomp 连接状态

flutter开发实战-长链接WebSocket使用stomp协议stomp_dart_client

在app中经常会使用长连接进行消息通信,这里记录一下基于websocket使用stomp协议的使用。
在这里插入图片描述

一、stomp:流文本定向消息协议

1.1 stomp介绍

stomp,Streaming Text Orientated Message Protocol,是流文本定向消息协议,是一种为MOM(Message Oriented Middleware,面向消息的中间件)设计的简单文本协议。
它提供了一个可互操作的连接格式,允许STOMP客户端与任意STOMP消息代理(Broker)进行交互,类似于OpenWire(一种二进制协议)。

1.2 协议支持

stomp 1.0
stomp 1.1 (including heart-beating)
  • 1
  • 2

1.3 stomp frame(帧)

stomp frame(帧)对象包括command、headers、body

command和headers属性始终会被定义,若头部信息时,headers参数可为{},body也可能为空

二、flutter上使用stomp

2.1 引入库stomp_dart_client

flutter上使用stomp时,需要在pubspec.yaml引入库如下

# stomp协议长链接
  stomp_dart_client: ^0.4.4
  stomp: ^0.8.0
  • 1
  • 2
  • 3

2.2 实现websocketmanager封装stomp

// 管理长链接socket, stomp协议

import 'package:stomp_dart_client/stomp.dart';
import 'package:stomp_dart_client/stomp_config.dart';
import 'package:stomp_dart_client/stomp_frame.dart';

// 接收到stomp协议的frame的callback
typedef OnFrameCallback = void Function(StompFrame);

enum StompState {
  IDLE,
  CREATED,
  CONNECTING,
  CONNECTED,
  RECONNECTING,
  DISCONNECTED,
  ERROR,
}

class WebSocketStompManager {
  //私有构造函数
  WebSocketStompManager._internal();

  //保存单例
  static WebSocketStompManager _singleton = WebSocketStompManager._internal();

  //工厂构造函数
  factory WebSocketStompManager() => _singleton;

  // 订阅的Subscription
  // 保存订阅, id: dynamic
  Map _subscriptions = Map<String, dynamic>();

  // stomp的headers信息
  Map<String, String>? _headers = Map<String, String>();

  // 是否连接
  StompState _stompState = StompState.IDLE;

  // 当前连接的Url
  String _urlString = '';

  // StompClient client
  StompClient? _client;

  // 创建连接
  void createConnect(String urlString, Map<String, String> headers) {
    _urlString = urlString;
    _headers = _headers;

    _client?.deactivate();
    _client = null;

    _client = StompClient(
        config: StompConfig(
      url: urlString,
      // connectionTimeout: Duration(seconds: 10),
      // stompConnectHeaders: {
      //   'upgraded': 'websocket',
      // },
      // webSocketConnectHeaders: {
      //   'upgraded': 'websocket',
      // },
      // 连接
      beforeConnect: beforeConnectCallback,
      onConnect: onConnectCallback,
      onDisconnect: onDisconnectCallback,
      onStompError: onStompErrorCallback,
      onUnhandledFrame: onUnhandledFrameCallback,
      onUnhandledMessage: onUnhandledMessageCallback,
      onUnhandledReceipt: onUnhandledReceiptCallback,
      onWebSocketError: onWebSocketErrorCallback,
      onWebSocketDone: onWebSocketDoneCallback,
      onDebugMessage: onDebugMessageCallback,
    ));
  }

  /// beforeConnect:未来	在建立连接之前将等待的异步函数。
  Future<void> beforeConnectCallback() async {
    // 在建立连接之前将等待的异步函数。
    print("beforeConnectCallback 在建立连接之前将等待的异步函数。");
    print('waiting to connect...');
    // await Future.delayed(Duration(milliseconds: 200));
    print('connecting...');
  }

  /// onClientNotCreateCallback, client未创建
  void onClientNotCreateCallback() {
    // client未创建
    print("onClientNotCreateCallback client未创建");
  }

  /// onConnect:函数(StompFrame)	客户端连接成功调用的函数
  void onConnectCallback(StompFrame connectFrame) {
    // client is connected and ready
    // 如果连接成功
    print(
        "onConnectCallback 客户端连接成功调用的函数:"
            "${connectFrame.toString()},"
            "${connectFrame.command},"
            "${connectFrame.headers},"
            "${connectFrame.body}"
    );
  }

  /// onDisconnect:函数(StompFrame)	客户端预期断开连接时调用的函数
  void onDisconnectCallback(StompFrame p1) {
    // 客户端预期断开连接时调用的函数
    print("onDisconnectCallback 客户端预期断开连接时调用的函数:${p1.toString()}");
  }

  /// onStompError:函数(StompFrame)	当 stomp 服务器发送错误帧时要调用的函数
  void onStompErrorCallback(StompFrame p1) {
    // 当 stomp 服务器发送错误帧时要调用的函数
    print("onStompErrorCallback 当 stomp 服务器发送错误帧时要调用的函数:${p1.toString()}");
  }

  /// onUnhandledFrame:函数(StompFrame)	服务器发送无法识别的帧时调用的函数
  void onUnhandledFrameCallback(StompFrame p1) {
    // 服务器发送无法识别的帧时调用的函数
    print("onUnhandledFrameCallback 服务器发送无法识别的帧时调用的函数:${p1.toString()}");
  }

  /// onUnhandledMessage:函数(StompFrame)	当订阅消息没有处理程序时要调用的函数
  void onUnhandledMessageCallback(StompFrame p1) {
    // 当订阅消息没有处理程序时要调用的函数
    print("onUnhandledMessageCallback 当订阅消息没有处理程序时要调用的函数:${p1.toString()}");
  }

  /// onUnhandledReceipt:函数(StompFrame)	当接收消息没有注册观察者时调用的函数
  void onUnhandledReceiptCallback(StompFrame p1) {
    // 当接收消息没有注册观察者时调用的函数
    print("onUnhandledReceiptCallback 当接收消息没有注册观察者时调用的函数:${p1.toString()}");
  }

  /// onWebSocketError:函数(动态)	当底层 WebSocket 抛出错误时要调用的函数
  void onWebSocketErrorCallback(dynamic error) {
    // 当底层 WebSocket 抛出错误时要调用的函数
    print(
        "onWebSocketErrorCallback 当底层 WebSocket 抛出错误时要调用的函数:${error.toString()}");
  }

  /// onWebSocketDone:函数()	当底层 WebSocket 完成/断开连接时要调用的函数
  void onWebSocketDoneCallback() {
    // 当底层 WebSocket 完成/断开连接时要调用的函数
    print("onWebSocketDoneCallback 当底层 WebSocket 完成/断开连接时要调用的函数");
  }

  /// onDebugMessage:函数(字符串)	为内部消息处理程序生成的调试消息调用的函数
  void onDebugMessageCallback(String p1) {
    // 为内部消息处理程序生成的调试消息调用的函数
    print("onDebugMessageCallback 为内部消息处理程序生成的调试消息调用的函数:${p1}");
  }

  // 连接
  void connect() {
    // connect连接
    if (_client != null) {
      _client?.activate();
    } else {
      // 未创建client
      onClientNotCreateCallback();
    }
  }

  // Subscribe
  void subscribe(String destination, OnFrameCallback? onFrameCallback) {
    if (_client != null) {
      dynamic unsubscribeFn = _client?.subscribe(
          destination: destination,
          headers: _headers,
          callback: (frame) {
            // Received a frame for this subscription
            print(frame.body);
            if (onFrameCallback != null) {
              onFrameCallback(frame);
            }
          });
      _subscriptions.putIfAbsent(destination, () => unsubscribeFn);
    } else {
      // 未创建client
      onClientNotCreateCallback();
    }
  }

  // client.subscribe(...) returns a function which can be called with an optional map of headers
  void unsubscribe(String destination) {
    if (_client != null) {
      dynamic unsubscribeFn = _subscriptions[destination];
      unsubscribeFn(unsubscribeHeaders: {});
    } else {
      // 未创建client
      onClientNotCreateCallback();
    }
  }

  // client.subscribe(...) returns a function which can be called with an optional map of headers
  void unsubscribeAll() {
    // 退订所有
    // 调用 Map 对象的 keys 成员 , 返回一个由 键 Key 组成的数组
    for (var destination in _subscriptions.keys){
      unsubscribe(destination);
    }
  }

  void send(String destination, String? message) {
    if (_client != null) {
      _client?.send(destination: destination, body: message, headers: _headers);
    } else {
      // 未创建client
      onClientNotCreateCallback();
    }
  }

  void disconnect() {
    if (_client != null) {
      _client?.deactivate();
    } else {
      // 未创建client
      onClientNotCreateCallback();
    }
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
  • 194
  • 195
  • 196
  • 197
  • 198
  • 199
  • 200
  • 201
  • 202
  • 203
  • 204
  • 205
  • 206
  • 207
  • 208
  • 209
  • 210
  • 211
  • 212
  • 213
  • 214
  • 215
  • 216
  • 217
  • 218
  • 219
  • 220
  • 221

2.3 使用websocketmanager收发消息

创建页面进行消息收发

class MyHomePage extends StatefulWidget {
  MyHomePage({Key? key, required this.title}) : super(key: key);

  final String title;

  
  _MyHomePageState createState() => _MyHomePageState();
}

class _MyHomePageState extends State<MyHomePage> {

  
  void initState() {
    // TODO: implement initState
    super.initState();
  }

  
  void dispose() {
    // TODO: implement dispose
    super.dispose();
  }


  
  Widget build(BuildContext context) {
          return Scaffold(
            appBar: AppBar(
              // Here we take the value from the MyHomePage object that was created by
              // the App.build method, and use it to set our appbar title.
              title: Text(widget.title),
            ),
            floatingActionButton: FloatingActionButton(
              onPressed: () {
                _incrementCounter(model);
              },
              tooltip: 'Increment',
              child: Icon(Icons.add),
            ),
            body: Center(
              child: Column(
                mainAxisAlignment: MainAxisAlignment.center,
                children: <Widget>[
                  Wrap(
                    spacing: 8.0, // 主轴(水平)方向间距
                    runSpacing: 4.0, // 纵轴(垂直)方向间距
                    alignment: WrapAlignment.center, //沿主轴方向居中
                    children: [
                      TextButton(
                        onPressed: stompCreate,
                        child: Container(
                          color: Colors.black26,
                          child: Text(
                            'stomp创建',
                            style: Theme.of(context).textTheme.bodyMedium,
                          ),
                        ),
                      ),
                      TextButton(
                        onPressed: stompConnect,
                        child: Container(
                          color: Colors.black26,
                          child: Text(
                            'stomp连接',
                            style: Theme.of(context).textTheme.bodyMedium,
                          ),
                        ),
                      ),
                      TextButton(
                        onPressed: stompSubscribe,
                        child: Container(
                          color: Colors.black26,
                          child: Text(
                            'stomp订阅',
                            style: Theme.of(context).textTheme.bodyMedium,
                          ),
                        ),
                      ),
                      TextButton(
                        onPressed: stompUnSubscribe,
                        child: Container(
                          color: Colors.black26,
                          child: Text(
                            'stomp退订',
                            style: Theme.of(context).textTheme.bodyMedium,
                          ),
                        ),
                      ),
                      TextButton(
                        onPressed: stompSendMessage,
                        child: Container(
                          color: Colors.black26,
                          child: Text(
                            'stomp发送消息',
                            style: Theme.of(context).textTheme.bodyMedium,
                          ),
                        ),
                      )
                    ],
                  ),
                ],
              ),
            ),
          );
  }

  // 测试stomp长链接
  void stompCreate() {
    // 创建stompClint
    WebSocketStompManager().createConnect("ws://192.168.100.25:8080/test-endpoint/websocket", {});
  }

  void stompConnect() {
    WebSocketStompManager().connect();
  }

  void stompSubscribe() {
    WebSocketStompManager()
        .subscribe("/topic/echo", (p0) {
      print("stompSubscribe 1:$p0");
    });

    WebSocketStompManager()
        .subscribe("/topic/echo", (p0) {
      print("stompSubscribe 2:$p0");
    });
  }

  void stompUnSubscribe() {
    WebSocketStompManager().unsubscribeAll();
  }

  void stompSendMessage() {
    WebSocketStompManager().send("/app/echo", "haha message from dart");
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136

至此实现了flutter开发实战-长链接WebSocket 使用stomp协议,进行消息发送、消息接收。

2.4 注意事项

由于stomp_dart_client不支持https,如果使用WebSocketStompManager().createConnect(“ws://192.168.100.25:8080/test-endpoint/websocket”, {});
会报告错误“Not support Https shceme”,所以这里要使用ws或者wss。

三、小结

至此实现了flutter开发实战-长链接WebSocket 使用stomp协议,进行消息发送、消息接收。stomp实现的库stomp_dart_client来实现该功能。

学习记录,每天不停进步。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/268671
推荐阅读
相关标签
  

闽ICP备14008679号