当前位置:   article > 正文

python实现websocket的stomp客户端_python stomp websocket

python stomp websocket
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# author: jenny
# datetime: 2021/5/6 15:53 
# File :stomp_ws.py
import websocket
import time
from threading import Thread

# Control characters used when building and parsing STOMP frames.
BYTE = dict(
    LF='\x0A',    # line feed: ends the command line and every header line
    NULL='\x00',  # null octet: terminates a frame
)

# Protocol versions offered in the CONNECT frame's accept-version header.
VERSIONS = '1.0,1.1'


class Stomp:
    """
    High level STOMP-over-WebSocket client.

    Owns the subscription callback registry and delegates all network I/O
    and frame handling to a Dispatcher.
    """

    def __init__(self, host, sockjs=False, wss=True):
        """
        Initialize STOMP communication. This is the high level API that is exposed to clients.

        Note that creating the Dispatcher opens the WebSocket, so this
        constructor blocks until the socket is open.

        Args:
            host: Hostname (without scheme)
            sockjs: True if the STOMP server is sockjs ("/websocket" is appended to the host)
            wss: True if communication is over SSL
        """
        # websocket.enableTrace(True)
        ws_host = host if sockjs is False else host + "/websocket"
        protocol = "ws://" if wss is False else "wss://"

        self.url = protocol + ws_host

        # Both attributes are touched by the dispatcher's I/O thread, so they
        # must exist before the dispatcher (and its thread) is created.
        self.connected = False

        # maintain callback registry for subscriptions -> topic (str) vs callback (func)
        self.callback_registry = {}

        self.dispatcher = Dispatcher(self)

    def connect(self, timeout=None):
        """
        Connect to the remote STOMP server

        Sends a CONNECT frame and polls until the dispatcher flags the
        session as connected.

        Args:
            timeout: Maximum number of seconds to wait for the CONNECTED
                frame. None (the default) waits indefinitely.

        Returns:
            True once the server answered with CONNECTED, False if the
            timeout expired first.
        """
        # set flag to false
        self.connected = False

        # attempt to connect
        self.dispatcher.connect()

        # monotonic clock: immune to wall-clock adjustments while waiting
        deadline = None if timeout is None else time.monotonic() + timeout

        # wait until connected (or until the deadline passes)
        while self.connected is False:
            if deadline is None:
                time.sleep(.50)
                continue

            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            time.sleep(min(.50, remaining))

        return self.connected

    def subscribe(self, destination, callback):
        """
        Subscribe to a destination and supply a callback that should be executed when a message is received on that destination

        Args:
            destination: Destination to subscribe to
            callback: Callable invoked with the message body (None when the
                frame has no body) for every MESSAGE on that destination
        """
        # create entry in registry against destination; registered before the
        # SUBSCRIBE frame goes out so no message can arrive without a callback
        self.callback_registry[destination] = callback
        self.dispatcher.subscribe(destination)

    def send(self, destination, message):
        """
        Send a message to a destination

        Args:
            destination: Destination to send to
            message: Message body (str)
        """
        self.dispatcher.send(destination, message)


class Dispatcher:
    """
    Handles the WebSocket connection and STOMP frame marshalling/unmarshalling
    on behalf of a Stomp instance.
    """

    def __init__(self, stomp):
        """
        The Dispatcher handles all network I/O and frame marshalling/unmarshalling

        Opens the WebSocket on a background thread and blocks until it is open.

        Args:
            stomp: Owning Stomp instance; its url, connected flag and
                callback_registry are used.

        Raises:
            ConnectionError: if the WebSocket event loop ends before the
                connection was ever opened.
        """
        self.stomp = stomp

        # number of SUBSCRIBE frames sent so far; used to build unique ids
        self._subscription_count = 0

        # Must be set BEFORE the I/O thread starts: _on_open runs on that
        # thread, and resetting the flag afterwards could overwrite its True
        # and leave the wait loop below spinning forever.
        self.opened = False

        self.ws = websocket.WebSocketApp(self.stomp.url)

        # register websocket callbacks
        self.ws.on_open = self._on_open
        self.ws.on_message = self._on_message
        self.ws.on_error = self._on_error
        self.ws.on_close = self._on_close

        # run event loop on separate thread
        self._thread = Thread(target=self.ws.run_forever)
        self._thread.start()

        # wait until connected
        while self.opened is False:
            # If the event loop has already returned without ever opening the
            # socket, waiting longer cannot succeed.
            if not self._thread.is_alive() and self.opened is False:
                raise ConnectionError(
                    "WebSocket connection to " + self.stomp.url + " could not be opened")
            time.sleep(.50)

    def _on_message(self, ws, message):
        """
        Executed when messages is received on WS

        Flags the owning Stomp as connected on a CONNECTED frame and invokes
        the registered callback with the body on a MESSAGE frame. Any other
        frame (including heart-beats) is only logged.
        """
        # binary WebSocket frames are delivered as bytes; frames are handled as text
        if isinstance(message, bytes):
            message = message.decode('utf-8')

        print("<<< " + message)

        command, headers, body = self._parse_message(message)

        # if connected, let Stomp know
        if command == "CONNECTED":
            self.stomp.connected = True

        # if message received, call appropriate callback
        elif command == "MESSAGE":
            destination = headers.get('destination')
            callback = self.stomp.callback_registry.get(destination)
            if callback is None:
                # report instead of raising KeyError inside the I/O thread
                print("no callback registered for destination: " + str(destination))
            else:
                callback(body)

    def _on_error(self, ws, error):
        """
        Executed when WS connection errors out
        """
        print(error)

    def _on_close(self, ws, close_status_code=None, close_msg=None):
        """
        Executed when WS connection is closed

        The two optional arguments accept the close status and message that
        newer websocket-client releases pass to on_close; the defaults keep
        the one-argument call form working.
        """
        print("### closed ###")

    def _on_open(self, ws):
        """
        Executed when WS connection is opened
        """
        self.opened = True

    def _transmit(self, command, headers, msg=None):
        """
        Marshalls and transmits the frame

        Args:
            command: STOMP command, e.g. "SEND"
            headers: dict of header name -> value (both str)
            msg: optional body (str)
        """
        # Construct the frame: command line, header lines, blank line, body, NULL
        lines = [command + BYTE['LF']]

        # add headers
        for key, value in headers.items():
            lines.append(key + ":" + value + BYTE['LF'])

        # blank line separates headers from body
        lines.append(BYTE['LF'])

        # add message, if any
        if msg is not None:
            lines.append(msg)

        # terminate with null octet
        lines.append(BYTE['NULL'])

        frame = ''.join(lines)

        # transmit over ws
        print(">>>" + frame)
        self.ws.send(frame)

    def _parse_message(self, frame):
        """
        Split a raw frame into its parts.

        Header escape sequences are not decoded. A frame without a
        header/body separator (such as a heart-beat, which is a bare line
        feed) yields an empty command, no headers and no body.

        Returns:
            command
            headers
            body (None if there is no body)

        Args:
            frame: raw frame string
        """
        lines = frame.split(BYTE['LF'])

        command = lines[0].strip()
        headers = {}

        # get all headers (stop at the blank separator line or end of frame)
        i = 1
        while i < len(lines) and lines[i] != '':
            # split on the FIRST colon only: values may contain colons
            key, _, value = lines[i].partition(':')
            # for repeated headers the first occurrence is the one that counts
            headers.setdefault(key, value)
            i += 1

        # everything after the blank line is the body, which may span lines
        body = BYTE['LF'].join(lines[i + 1:])

        # drop the terminating null octet and anything (trailing EOLs) after it
        terminator = body.rfind(BYTE['NULL'])
        if terminator != -1:
            body = body[:terminator]

        # set body to None if there is no body
        return command, headers, body if body != '' else None

    def connect(self):
        """
        Transmit a CONNECT frame
        """
        headers = {}

        # NOTE(review): the full ws(s):// URL is sent as the host header;
        # confirm this is what the target broker expects.
        headers['host'] = self.stomp.url
        headers['accept-version'] = VERSIONS
        headers['heart-beat'] = '10000,10000'

        self._transmit('CONNECT', headers)

    def subscribe(self, destination):
        """
        Transmit a SUBSCRIBE frame

        Each call uses a fresh subscription id ("sub-1", "sub-2", ...).
        """
        headers = {}

        # getattr keeps this working on instances created without __init__
        self._subscription_count = getattr(self, '_subscription_count', 0) + 1

        headers['id'] = 'sub-' + str(self._subscription_count)
        # NOTE(review): ack mode is 'client' but this class never sends ACK
        # frames; confirm the broker tolerates unacknowledged messages.
        headers['ack'] = 'client'
        headers['destination'] = destination

        self._transmit('SUBSCRIBE', headers)

    def send(self, destination, message):
        """
        Transmit a SEND frame

        Args:
            destination: destination to send to
            message: body (str)
        """
        headers = {}

        headers['destination'] = destination
        # content-length counts octets, not characters
        headers['content-length'] = str(len(message.encode('utf-8')))

        self._transmit('SEND', headers, msg=message)


def do_thing_a(msg):
    """
    Example subscription callback: print the body of a received message.

    Args:
        msg: Message body; may be None when the frame carried no body, so it
            is formatted rather than concatenated.
    """
    print("MESSAGE: {}".format(msg))


def main(url, *sub_topic, **send_topic):
    """
    Demo driver: connect, subscribe to every given destination, then send.

    Args:
        url: Host of the STOMP WebSocket endpoint, without scheme (the
            connection is made with wss and without the sockjs suffix).
        *sub_topic: Destinations to subscribe to; received bodies are printed
            by do_thing_a.
        **send_topic: Mapping of destination -> message body to send.
            Destinations that are not valid identifiers can be supplied via
            unpacking, e.g.
            main(host, "/topic/a", **{"/app/hello": '{"name":"akshaye"}'}).
    """
    stomp = Stomp(url, sockjs=False, wss=True)
    stomp.connect()

    # each positional argument is its own destination
    for topic in sub_topic:
        stomp.subscribe(topic, do_thing_a)

    # pause between subscribing and sending (presumably to let the
    # subscriptions settle on the server -- TODO confirm)
    time.sleep(2)

    for destination, message in send_topic.items():
        stomp.send(destination, message)


if __name__ == "__main__":
    import sys

    # main() needs at least the host; remaining arguments are destinations
    # to subscribe to.
    if len(sys.argv) < 2:
        sys.exit("usage: python stomp_ws.py HOST [DESTINATION ...]")
    main(*sys.argv[1:])
# (line-number gutter "1" to "231" from the web page's code listing removed)
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/不正经/article/detail/85438
推荐阅读
相关标签
  

闽ICP备14008679号