赞
踩
____tz_zs
websocket 库在 0.48.0 版本后对回调进行了修改。
新版本中,当我们将一个实例对象的方法作为 WebSocketApp 的回调时,WebSocketApp 将不再会返回他自己作为回调的第一个参数。
以下为官方示例的长连接用法 Long-lived connection,此种方式在新老版本中均能正常使用。
# -*- coding:utf-8 -*- """ @author: tz_zs """ import websocket try: import thread except ImportError: import _thread as thread import time url = "ws://echo.websocket.org/" def on_message(ws, message): print("####### on_message #######") print(ws) print(message) def on_error(ws, error): print("####### on_error #######") print(ws) print(error) def on_close(ws): print("####### on_close #######") print(ws) print("####### closed #######") def on_open(ws): print("####### on_open #######") def run(*args): for i in range(3): time.sleep(1) ws.send("Hello %d" % i) time.sleep(1) ws.close() print("thread terminating...") thread.start_new_thread(run, ()) if __name__ == '__main__': ws = websocket.WebSocketApp(url, on_message=on_message, on_error=on_error, on_close=on_close) ws.on_open = on_open ws.run_forever(http_proxy_host="127.0.0.1", http_proxy_port=8118) """ ####### on_open ####### ####### on_message ####### <websocket._app.WebSocketApp object at 0x7f8ffe73eb38> Hello 0 ####### on_message ####### <websocket._app.WebSocketApp object at 0x7f8ffe73eb38> Hello 1 ####### on_message ####### <websocket._app.WebSocketApp object at 0x7f8ffe73eb38> Hello 2 ####### on_close ####### <websocket._app.WebSocketApp object at 0x7f8ffe73eb38> thread terminating... ####### closed ####### """
注意,下方代码中的 Test 不是 WebSocketApp 的子类。
版本 0.48.0 之前能如下方式使用,这种方式比较灵活,
# -*- coding:utf-8 -*- """ @author: tz_zs """ import websocket try: import thread except ImportError: import _thread as thread import time class Test(object): def __init__(self): self.url = "ws://echo.websocket.org/" def on_message(self, ws, message): print("on_message") print(self) print(ws) print(message) def on_error(self, ws, error): print("on_error") print(self) print(ws) print(error) def on_close(self, ws): print("on_close") print(self) print(ws) print("### closed ###") def on_open(self, ws): def run(*args): for i in range(3): time.sleep(1) ws.send("Hello %d" % i) time.sleep(1) ws.close() print("thread terminating...") thread.start_new_thread(run, ()) def start(self): ws = websocket.WebSocketApp(self.url, on_message=self.on_message, on_error=self.on_error, on_close=self.on_close) ws.on_open = self.on_open ws.run_forever(http_proxy_host="127.0.0.1", http_proxy_port=8118) if __name__ == '__main__': Test().start() """ on_message <__main__.Test object at 0x7ffa6409e908> <websocket._app.WebSocketApp object at 0x7ffa6409eb70> Hello 0 on_message <__main__.Test object at 0x7ffa6409e908> <websocket._app.WebSocketApp object at 0x7ffa6409eb70> Hello 1 on_message <__main__.Test object at 0x7ffa6409e908> <websocket._app.WebSocketApp object at 0x7ffa6409eb70> Hello 2 on_close thread terminating... <__main__.Test object at 0x7ffa6409e908> <websocket._app.WebSocketApp object at 0x7ffa6409eb70> ### closed ### """
但当升级为新版本后(0.48.0 版之后),这种方式不再兼容,具体原因:
新版本中,当我们将一个实例对象的方法作为 WebSocketApp 的回调时,WebSocketApp 将不再会返回他自己作为回调的第一个参数。
如果设置 log 等级为 DEBUG,可看到以下信息
# -*- coding:utf-8 -*- """ @author: tz_zs """ from websocket import WebSocketApp try: import thread except ImportError: import _thread as thread import time import logging import sys logging.basicConfig(level=logging.DEBUG, format='asctime: %(asctime)s \n' # 时间 'filename_line: %(filename)s_[line:%(lineno)d] \n' # 文件名_行号 'level: %(levelname)s \n' # log级别 'message: %(message)s \n', # log信息 datefmt='%a, %d %b %Y %H:%M:%S', stream=sys.stdout, filemode='w') class Test(object): def __init__(self): self.url = "ws://echo.websocket.org/" def on_message(self, ws, message): print("on_message") print(self) print(ws) print(message) def on_error(self, ws, error): print("on_error") print(self) print(ws) print(error) def on_close(self, ws): print("on_close") print(self) print(ws) print("### closed ###") def on_open(self, ws): def run(*args): for i in range(3): time.sleep(1) ws.send("Hello %d" % i) time.sleep(1) ws.close() print("thread terminating...") thread.start_new_thread(run, ()) def start(self): ws = WebSocketApp(self.url, on_message=self.on_message, on_error=self.on_error, on_close=self.on_close) ws.on_open = self.on_open ws.run_forever(http_proxy_host="127.0.0.1", http_proxy_port=8118) if __name__ == '__main__': Test().start() """ asctime: Thu, 11 Jul 2019 15:22:16 filename_line: _logging.py_[line:69] level: DEBUG message: Connecting proxy... File "/usr/local/lib/python3.5/dist-packages/websocket/_app.py", line 343, in _callback callback(*args) asctime: Thu, 11 Jul 2019 15:22:17 filename_line: _logging.py_[line:61] level: ERROR message: error from callback <bound method Test.on_open of <__main__.Test object at 0x7fb68a72ec88>>: on_open() missing 1 required positional argument: 'ws' asctime: Thu, 11 Jul 2019 15:23:00 filename_line: _logging.py_[line:61] level: ERROR message: error from callback <bound method Test.on_error of <__main__.Test object at 0x7fb68a72ec88>>: on_error() missing 1 required positional argument: 'error' File "/usr/local/lib/python3.5/dist-packages/websocket/_app.py", line 343, in _callback callback(*args) asctime: Thu, 11 Jul 2019 15:23:00 filename_line: _logging.py_[line:61] level: ERROR message: error from callback <bound method Test.on_close of <__main__.Test object at 0x7fb68a72ec88>>: on_close() missing 1 required positional argument: 'ws' File "/usr/local/lib/python3.5/dist-packages/websocket/_app.py", line 343, in _callback callback(*args) """
对于新版本,我们可以采用以下几种方法
因为新版库不再返回 WebSocketApp 本身,所以参数不再包括 ws,我们保存 WebSocketApp 对象作为实例的一个参数 self.ws,如此,仍可在类中的任意位置使用。
# -*- coding:utf-8 -*- """ @author: tz_zs """ import websocket from websocket import WebSocketApp try: import thread except ImportError: import _thread as thread import time class Test(object): def __init__(self): super(Test, self).__init__() self.url = "ws://echo.websocket.org/" self.ws = None def on_message(self, message): print("####### on_message #######") print(self) print(message) def on_error(self, error): print("####### on_error #######") print(self) print(error) def on_close(self): print("####### on_close #######") print(self) print("####### closed #######") def on_open(self): print(self) def run(*args): for i in range(3): time.sleep(1) self.ws.send("Hello %d" % i) time.sleep(1) self.ws.close() print("thread terminating...") thread.start_new_thread(run, ()) def start(self): self.ws = WebSocketApp(self.url, on_message=self.on_message, on_error=self.on_error, on_close=self.on_close) self.ws.on_open = self.on_open self.ws.run_forever(http_proxy_host="127.0.0.1", http_proxy_port=8118) if __name__ == '__main__': Test().start() """ <__main__.Test object at 0x7fb4e855cb70> ####### on_message ####### <__main__.Test object at 0x7fb4e855cb70> Hello 0 ####### on_message ####### <__main__.Test object at 0x7fb4e855cb70> Hello 1 ####### on_message ####### <__main__.Test object at 0x7fb4e855cb70> Hello 2 thread terminating... ####### on_close ####### <__main__.Test object at 0x7fb4e855cb70> ####### closed ####### """
缺点是回调方法中无法获得 self
# -*- coding:utf-8 -*- """ @author: tz_zs """ import websocket from websocket import WebSocketApp try: import thread except ImportError: import _thread as thread import time class Test(object): def __init__(self): super(Test, self).__init__() self.url = "ws://echo.websocket.org/" self.ws = None @staticmethod def on_message(ws, message): print("####### on_message #######") print(ws) print(message) @staticmethod def on_error(ws, error): print("####### on_error #######") print(ws) print(error) @staticmethod def on_close(ws): print("####### on_close #######") print(ws) print("####### closed #######") @staticmethod def on_open(ws): print(ws) def run(*args): for i in range(3): time.sleep(1) ws.send("Hello %d" % i) time.sleep(1) ws.close() print("thread terminating...") thread.start_new_thread(run, ()) def start(self): self.ws = WebSocketApp(self.url, on_message=self.on_message, on_error=self.on_error, on_close=self.on_close) self.ws.on_open = self.on_open self.ws.run_forever(http_proxy_host="127.0.0.1", http_proxy_port=8118) if __name__ == '__main__': Test().start() """ <websocket._app.WebSocketApp object at 0x7f05f2a580f0> ####### on_message ####### <websocket._app.WebSocketApp object at 0x7f05f2a580f0> Hello 0 ####### on_message ####### <websocket._app.WebSocketApp object at 0x7f05f2a580f0> Hello 1 ####### on_message ####### <websocket._app.WebSocketApp object at 0x7f05f2a580f0> Hello 2 ####### on_close ####### <websocket._app.WebSocketApp object at 0x7f05f2a580f0> ####### closed ####### """
class 继承 WebSocketApp,作为其子类。但这种方法不够灵活。
# -*- coding:utf-8 -*- """ @author: tz_zs """ import websocket from websocket import WebSocketApp try: import thread except ImportError: import _thread as thread import time class Test(WebSocketApp): def __init__(self): self.url = "ws://echo.websocket.org/" super(Test, self).__init__(url=self.url, on_open=self.on_open, on_message=self.on_message, on_error=self.on_error, on_close=self.on_close) def on_message(self, message): print("####### on_message #######") print(self) print(message) def on_error(self, error): print("####### on_error #######") print(self) print(error) def on_close(self): print("####### on_close #######") print(self) print("####### closed #######") def on_open(self): print(self) def run(*args): for i in range(3): time.sleep(1) self.send("Hello %d" % i) time.sleep(1) self.close() print("thread terminating...") thread.start_new_thread(run, ()) # def start(self): # ws = websocket.WebSocketApp(self.url, # on_message=self.on_message, # on_error=self.on_error, # on_close=self.on_close) # ws.on_open = self.on_open # ws.run_forever(http_proxy_host="127.0.0.1", http_proxy_port=8118) if __name__ == '__main__': obj = Test() obj.run_forever(http_proxy_host="127.0.0.1", http_proxy_port=8118) """ <__main__.Test object at 0x7f8a3d7e2908> ####### on_message ####### <__main__.Test object at 0x7f8a3d7e2908> Hello 0 ####### on_message ####### <__main__.Test object at 0x7f8a3d7e2908> Hello 1 ####### on_message ####### <__main__.Test object at 0x7f8a3d7e2908> Hello 2 ####### on_close ####### <__main__.Test object at 0x7f8a3d7e2908> ####### closed ####### """
从网站下载低版本
https://launchpad.net/ubuntu/+source/websocket-client/0.48.0-1
解压提取,使用以下命令安装
sudo python3 setup.py install
版本 websocket-client 0.44.0
""" websocket - WebSocket client library for Python Copyright (C) 2010 Hiroki Ohtani(liris) This library is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. You should have received a copy of the GNU Lesser General Public License along with this library; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA """ """ WebSocketApp provides higher level APIs. """ import select import sys import threading import time import traceback import six from ._abnf import ABNF from ._core import WebSocket, getdefaulttimeout from ._exceptions import * from . import _logging __all__ = ["WebSocketApp"] class WebSocketApp(object): """ Higher level of APIs are provided. The interface is like JavaScript WebSocket object. """ def __init__(self, url, header=None, on_open=None, on_message=None, on_error=None, on_close=None, on_ping=None, on_pong=None, on_cont_message=None, keep_running=True, get_mask_key=None, cookie=None, subprotocols=None, on_data=None): """ url: websocket url. header: custom header for websocket handshake. on_open: callable object which is called at opening websocket. this function has one argument. The argument is this class object. on_message: callable object which is called when received data. on_message has 2 arguments. The 1st argument is this class object. The 2nd argument is utf-8 string which we get from the server. on_error: callable object which is called when we get error. on_error has 2 arguments. The 1st argument is this class object. The 2nd argument is exception object. on_close: callable object which is called when closed the connection. this function has one argument. The argument is this class object. on_cont_message: callback object which is called when receive continued frame data. on_cont_message has 3 arguments. The 1st argument is this class object. The 2nd argument is utf-8 string which we get from the server. The 3rd argument is continue flag. if 0, the data continue to next frame data on_data: callback object which is called when a message received. This is called before on_message or on_cont_message, and then on_message or on_cont_message is called. on_data has 4 argument. The 1st argument is this class object. The 2nd argument is utf-8 string which we get from the server. The 3rd argument is data type. ABNF.OPCODE_TEXT or ABNF.OPCODE_BINARY will be came. The 4th argument is continue flag. if 0, the data continue keep_running: a boolean flag indicating whether the app's main loop should keep running, defaults to True get_mask_key: a callable to produce new mask keys, see the WebSocket.set_mask_key's docstring for more information subprotocols: array of available sub protocols. default is None. """ self.url = url self.header = header if header is not None else [] self.cookie = cookie self.on_open = on_open self.on_message = on_message self.on_data = on_data self.on_error = on_error self.on_close = on_close self.on_ping = on_ping self.on_pong = on_pong self.on_cont_message = on_cont_message self.keep_running = keep_running self.get_mask_key = get_mask_key self.sock = None self.last_ping_tm = 0 self.last_pong_tm = 0 self.subprotocols = subprotocols def send(self, data, opcode=ABNF.OPCODE_TEXT): """ send message. data: message to send. If you set opcode to OPCODE_TEXT, data must be utf-8 string or unicode. opcode: operation code of data. default is OPCODE_TEXT. """ if not self.sock or self.sock.send(data, opcode) == 0: raise WebSocketConnectionClosedException( "Connection is already closed.") def close(self, **kwargs): """ close websocket connection. """ self.keep_running = False if self.sock: self.sock.close(**kwargs) def _send_ping(self, interval, event): while not event.wait(interval): self.last_ping_tm = time.time() if self.sock: try: self.sock.ping() except Exception as ex: _logging.warning("send_ping routine terminated: {}".format(ex)) break def run_forever(self, sockopt=None, sslopt=None, ping_interval=0, ping_timeout=None, http_proxy_host=None, http_proxy_port=None, http_no_proxy=None, http_proxy_auth=None, skip_utf8_validation=False, host=None, origin=None): """ run event loop for WebSocket framework. This loop is infinite loop and is alive during websocket is available. sockopt: values for socket.setsockopt. sockopt must be tuple and each element is argument of sock.setsockopt. sslopt: ssl socket optional dict. ping_interval: automatically send "ping" command every specified period(second) if set to 0, not send automatically. ping_timeout: timeout(second) if the pong message is not received. http_proxy_host: http proxy host name. http_proxy_port: http proxy port. If not set, set to 80. http_no_proxy: host names, which doesn't use proxy. skip_utf8_validation: skip utf8 validation. host: update host header. origin: update origin header. """ if not ping_timeout or ping_timeout <= 0: ping_timeout = None if ping_timeout and ping_interval and ping_interval <= ping_timeout: raise WebSocketException("Ensure ping_interval > ping_timeout") if sockopt is None: sockopt = [] if sslopt is None: sslopt = {} if self.sock: raise WebSocketException("socket is already opened") thread = None close_frame = None try: self.sock = WebSocket( self.get_mask_key, sockopt=sockopt, sslopt=sslopt, fire_cont_frame=self.on_cont_message and True or False, skip_utf8_validation=skip_utf8_validation) self.sock.settimeout(getdefaulttimeout()) self.sock.connect( self.url, header=self.header, cookie=self.cookie, http_proxy_host=http_proxy_host, http_proxy_port=http_proxy_port, http_no_proxy=http_no_proxy, http_proxy_auth=http_proxy_auth, subprotocols=self.subprotocols, host=host, origin=origin) self._callback(self.on_open) if ping_interval: event = threading.Event() thread = threading.Thread( target=self._send_ping, args=(ping_interval, event)) thread.setDaemon(True) thread.start() while self.sock.connected: r, w, e = select.select( (self.sock.sock, ), (), (), ping_timeout or 10) # Use a 10 second timeout to avoid to wait forever on close if not self.keep_running: break if r: op_code, frame = self.sock.recv_data_frame(True) if op_code == ABNF.OPCODE_CLOSE: close_frame = frame break elif op_code == ABNF.OPCODE_PING: self._callback(self.on_ping, frame.data) elif op_code == ABNF.OPCODE_PONG: self.last_pong_tm = time.time() self._callback(self.on_pong, frame.data) elif op_code == ABNF.OPCODE_CONT and self.on_cont_message: self._callback(self.on_data, data, frame.opcode, frame.fin) self._callback(self.on_cont_message, frame.data, frame.fin) else: data = frame.data if six.PY3 and op_code == ABNF.OPCODE_TEXT: data = data.decode("utf-8") self._callback(self.on_data, data, frame.opcode, True) self._callback(self.on_message, data) if ping_timeout and self.last_ping_tm \ and time.time() - self.last_ping_tm > ping_timeout \ and self.last_ping_tm - self.last_pong_tm > ping_timeout: raise WebSocketTimeoutException("ping/pong timed out") except (Exception, KeyboardInterrupt, SystemExit) as e: self._callback(self.on_error, e) if isinstance(e, SystemExit): # propagate SystemExit further raise finally: if thread and thread.isAlive(): event.set() thread.join() self.keep_running = False self.sock.close() close_args = self._get_close_args( close_frame.data if close_frame else None) self._callback(self.on_close, *close_args) self.sock = None def _get_close_args(self, data): """ this functions extracts the code, reason from the close body if they exists, and if the self.on_close except three arguments """ import inspect # if the on_close callback is "old", just return empty list if sys.version_info < (3, 0): if not self.on_close or len(inspect.getargspec(self.on_close).args) != 3: return [] else: if not self.on_close or len(inspect.getfullargspec(self.on_close).args) != 3: return [] if data and len(data) >= 2: code = 256 * six.byte2int(data[0:1]) + six.byte2int(data[1:2]) reason = data[2:].decode('utf-8') return [code, reason] return [None, None] def _callback(self, callback, *args): if callback: try: callback(self, *args) except Exception as e: _logging.error("error from callback {}: {}".format(callback, e)) if _logging.isEnabledForDebug(): _, _, tb = sys.exc_info() traceback.print_tb(tb)
版本 websocket-client 0.56.0
""" websocket - WebSocket client library for Python Copyright (C) 2010 Hiroki Ohtani(liris) This library is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. You should have received a copy of the GNU Lesser General Public License along with this library; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA """ """ WebSocketApp provides higher level APIs. """ import inspect import select import sys import threading import time import traceback import six from ._abnf import ABNF from ._core import WebSocket, getdefaulttimeout from ._exceptions import * from . import _logging __all__ = ["WebSocketApp"] class Dispatcher: def __init__(self, app, ping_timeout): self.app = app self.ping_timeout = ping_timeout def read(self, sock, read_callback, check_callback): while self.app.sock.connected: r, w, e = select.select( (self.app.sock.sock, ), (), (), self.ping_timeout) if r: if not read_callback(): break check_callback() class SSLDispacther: def __init__(self, app, ping_timeout): self.app = app self.ping_timeout = ping_timeout def read(self, sock, read_callback, check_callback): while self.app.sock.connected: r = self.select() if r: if not read_callback(): break check_callback() def select(self): sock = self.app.sock.sock if sock.pending(): return [sock,] r, w, e = select.select((sock, ), (), (), self.ping_timeout) return r class WebSocketApp(object): """ Higher level of APIs are provided. The interface is like JavaScript WebSocket object. """ def __init__(self, url, header=None, on_open=None, on_message=None, on_error=None, on_close=None, on_ping=None, on_pong=None, on_cont_message=None, keep_running=True, get_mask_key=None, cookie=None, subprotocols=None, on_data=None): """ url: websocket url. header: custom header for websocket handshake. on_open: callable object which is called at opening websocket. this function has one argument. The argument is this class object. on_message: callable object which is called when received data. on_message has 2 arguments. The 1st argument is this class object. The 2nd argument is utf-8 string which we get from the server. on_error: callable object which is called when we get error. on_error has 2 arguments. The 1st argument is this class object. The 2nd argument is exception object. on_close: callable object which is called when closed the connection. this function has one argument. The argument is this class object. on_cont_message: callback object which is called when receive continued frame data. on_cont_message has 3 arguments. The 1st argument is this class object. The 2nd argument is utf-8 string which we get from the server. The 3rd argument is continue flag. if 0, the data continue to next frame data on_data: callback object which is called when a message received. This is called before on_message or on_cont_message, and then on_message or on_cont_message is called. on_data has 4 argument. The 1st argument is this class object. The 2nd argument is utf-8 string which we get from the server. The 3rd argument is data type. ABNF.OPCODE_TEXT or ABNF.OPCODE_BINARY will be came. The 4th argument is continue flag. if 0, the data continue keep_running: this parameter is obsolete and ignored. get_mask_key: a callable to produce new mask keys, see the WebSocket.set_mask_key's docstring for more information subprotocols: array of available sub protocols. default is None. """ self.url = url self.header = header if header is not None else [] self.cookie = cookie self.on_open = on_open self.on_message = on_message self.on_data = on_data self.on_error = on_error self.on_close = on_close self.on_ping = on_ping self.on_pong = on_pong self.on_cont_message = on_cont_message self.keep_running = False self.get_mask_key = get_mask_key self.sock = None self.last_ping_tm = 0 self.last_pong_tm = 0 self.subprotocols = subprotocols def send(self, data, opcode=ABNF.OPCODE_TEXT): """ send message. data: message to send. If you set opcode to OPCODE_TEXT, data must be utf-8 string or unicode. opcode: operation code of data. default is OPCODE_TEXT. """ if not self.sock or self.sock.send(data, opcode) == 0: raise WebSocketConnectionClosedException( "Connection is already closed.") def close(self, **kwargs): """ close websocket connection. """ self.keep_running = False if self.sock: self.sock.close(**kwargs) self.sock = None def _send_ping(self, interval, event): while not event.wait(interval): self.last_ping_tm = time.time() if self.sock: try: self.sock.ping() except Exception as ex: _logging.warning("send_ping routine terminated: {}".format(ex)) break def run_forever(self, sockopt=None, sslopt=None, ping_interval=0, ping_timeout=None, http_proxy_host=None, http_proxy_port=None, http_no_proxy=None, http_proxy_auth=None, skip_utf8_validation=False, host=None, origin=None, dispatcher=None, suppress_origin = False, proxy_type=None): """ run event loop for WebSocket framework. This loop is infinite loop and is alive during websocket is available. sockopt: values for socket.setsockopt. sockopt must be tuple and each element is argument of sock.setsockopt. sslopt: ssl socket optional dict. ping_interval: automatically send "ping" command every specified period(second) if set to 0, not send automatically. ping_timeout: timeout(second) if the pong message is not received. http_proxy_host: http proxy host name. http_proxy_port: http proxy port. If not set, set to 80. http_no_proxy: host names, which doesn't use proxy. skip_utf8_validation: skip utf8 validation. host: update host header. origin: update origin header. dispatcher: customize reading data from socket. suppress_origin: suppress outputting origin header. Returns ------- False if caught KeyboardInterrupt True if other exception was raised during a loop """ if ping_timeout is not None and ping_timeout <= 0: ping_timeout = None if ping_timeout and ping_interval and ping_interval <= ping_timeout: raise WebSocketException("Ensure ping_interval > ping_timeout") if not sockopt: sockopt = [] if not sslopt: sslopt = {} if self.sock: raise WebSocketException("socket is already opened") thread = None self.keep_running = True self.last_ping_tm = 0 self.last_pong_tm = 0 def teardown(close_frame=None): """ Tears down the connection. If close_frame is set, we will invoke the on_close handler with the statusCode and reason from there. """ if thread and thread.isAlive(): event.set() thread.join() self.keep_running = False if self.sock: self.sock.close() close_args = self._get_close_args( close_frame.data if close_frame else None) self._callback(self.on_close, *close_args) self.sock = None try: self.sock = WebSocket( self.get_mask_key, sockopt=sockopt, sslopt=sslopt, fire_cont_frame=self.on_cont_message is not None, skip_utf8_validation=skip_utf8_validation, enable_multithread=True if ping_interval else False) self.sock.settimeout(getdefaulttimeout()) self.sock.connect( self.url, header=self.header, cookie=self.cookie, http_proxy_host=http_proxy_host, http_proxy_port=http_proxy_port, http_no_proxy=http_no_proxy, http_proxy_auth=http_proxy_auth, subprotocols=self.subprotocols, host=host, origin=origin, suppress_origin=suppress_origin, proxy_type=proxy_type) if not dispatcher: dispatcher = self.create_dispatcher(ping_timeout) self._callback(self.on_open) if ping_interval: event = threading.Event() thread = threading.Thread( target=self._send_ping, args=(ping_interval, event)) thread.setDaemon(True) thread.start() def read(): if not self.keep_running: return teardown() op_code, frame = self.sock.recv_data_frame(True) if op_code == ABNF.OPCODE_CLOSE: return teardown(frame) elif op_code == ABNF.OPCODE_PING: self._callback(self.on_ping, frame.data) elif op_code == ABNF.OPCODE_PONG: self.last_pong_tm = time.time() self._callback(self.on_pong, frame.data) elif op_code == ABNF.OPCODE_CONT and self.on_cont_message: self._callback(self.on_data, frame.data, frame.opcode, frame.fin) self._callback(self.on_cont_message, frame.data, frame.fin) else: data = frame.data if six.PY3 and op_code == ABNF.OPCODE_TEXT: data = data.decode("utf-8") self._callback(self.on_data, data, frame.opcode, True) self._callback(self.on_message, data) return True def check(): if (ping_timeout): has_timeout_expired = time.time() - self.last_ping_tm > ping_timeout has_pong_not_arrived_after_last_ping = self.last_pong_tm - self.last_ping_tm < 0 has_pong_arrived_too_late = self.last_pong_tm - self.last_ping_tm > ping_timeout if (self.last_ping_tm and has_timeout_expired and (has_pong_not_arrived_after_last_ping or has_pong_arrived_too_late)): raise WebSocketTimeoutException("ping/pong timed out") return True dispatcher.read(self.sock.sock, read, check) except (Exception, KeyboardInterrupt, SystemExit) as e: self._callback(self.on_error, e) if isinstance(e, SystemExit): # propagate SystemExit further raise teardown() return not isinstance(e, KeyboardInterrupt) def create_dispatcher(self, ping_timeout): timeout = ping_timeout or 10 if self.sock.is_ssl(): return SSLDispacther(self, timeout) return Dispatcher(self, timeout) def _get_close_args(self, data): """ this functions extracts the code, reason from the close body if they exists, and if the self.on_close except three arguments """ # if the on_close callback is "old", just return empty list if sys.version_info < (3, 0): if not self.on_close or len(inspect.getargspec(self.on_close).args) != 3: return [] else: if not self.on_close or len(inspect.getfullargspec(self.on_close).args) != 3: return [] if data and len(data) >= 2: code = 256 * six.byte2int(data[0:1]) + six.byte2int(data[1:2]) reason = data[2:].decode('utf-8') return [code, reason] return [None, None] def _callback(self, callback, *args): if callback: try: if inspect.ismethod(callback): callback(*args) else: callback(self, *args) except Exception as e: _logging.error("error from callback {}: {}".format(callback, e)) if _logging.isEnabledForDebug(): _, _, tb = sys.exc_info() traceback.print_tb(tb)
https://stackoverflow.com/questions/26980966/using-a-websocket-client-as-a-class-in-python
Passing method of non-WebSocketApp object as callback does not receive the WebSocketApp object as an argument
why the function “WebSocketApp run_forever” doesn’t work in linux? But it’s OK in windows.
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。