赞
踩
____tz_zs
本次,我将从主流的三方框架使用出发,带大家熟悉和使用 Python 中常见的 websocket 库。
websocket-client 库是一个简单好用的同步的 websocket 的客户端的库,基于回调的方式使用。
pypi地址: https://pypi.org/project/websocket-client/
GitHub地址: https://github.com/websocket-client/websocket-client
文档地址: https://websocket-client.readthedocs.io/en/latest/
websocket-client 库也是我们诸多项目中正在使用的 websocket 库,这个库开箱即用,非常的方便,其 WebSocketApp 适合于建立长期连接。
我们使用对象的函数作为 WebSocketApp 的回调函数,可以在全局变量中缓存和共享数据,持有 websocket 引用以便使用和关闭,维护长链接和心跳等,这种方式在构建应用程序时具有更大的灵活性。
下方为 WebSocketApp 的一个使用例子,创建 WebSocketApp
对象,传入 url
地址,指定 on_open
等回调函数,调用 run_forever 运行。
tips:
1、websocket.enableTrace(True)
可以开启运行状态追踪。debug 的时候最好打开他,便于追踪定位问题。
2、也可以先创建对象,之后再使用如self.ws.on_open = self.on_open
来指定回调函数。只需在 run_forever() 之前指定回调函数即可。
3、控制台输出的send: b'\x8a\x80\xf4-\xd9\x8b'
等形式的信息,是框架的开启运行状态追踪的输出信息(框架的输出log)
# -*- coding:utf-8 -*- """ @author: tz_zs """ import websocket from websocket import WebSocketApp try: import thread except ImportError: import _thread as thread import time class Test(object): def __init__(self): super(Test, self).__init__() self.url = "ws://echo.websocket.org/" self.ws = None def on_message(self, message): print("####### on_message #######") print("message:%s" % message) def on_error(self, error): print("####### on_error #######") print("error:%s" % error) def on_close(self): print("####### on_close #######") def on_ping(self, message): print("####### on_ping #######") print("ping message:%s" % message) def on_pong(self, message): print("####### on_pong #######") print("pong message:%s" % message) def on_open(self): print("####### on_open #######") thread.start_new_thread(self.run, ()) def run(self, *args): while True: time.sleep(1) input_msg = input("输入要发送的消息(ps:输入关键词 close 结束程序):\n") if input_msg == "close": self.ws.close() # 关闭 print("thread terminating...") break else: self.ws.send(input_msg) def start(self): websocket.enableTrace(True) # 开启运行状态追踪。debug 的时候最好打开他,便于追踪定位问题。 self.ws = WebSocketApp(self.url, on_open=self.on_open, on_message=self.on_message, on_error=self.on_error, on_close=self.on_close) # self.ws.on_open = self.on_open # 也可以先创建对象再这样指定回调函数。run_forever 之前指定回调函数即可。 self.ws.run_forever() if __name__ == '__main__': Test().start() """ --- request header --- GET / HTTP/1.1 Upgrade: websocket Host: echo.websocket.org Origin: http://echo.websocket.org Sec-WebSocket-Key: AXR9yvs3Ucn9LE35KkhXfw== Sec-WebSocket-Version: 13 Connection: upgrade ----------------------- --- response header --- HTTP/1.1 101 Web Socket Protocol Handshake Connection: Upgrade Date: Wed, 04 Aug 2021 06:29:05 GMT Sec-WebSocket-Accept: WoOPLeAQpWaV2Bqd4sDOFkSpUuw= Server: Kaazing Gateway Upgrade: websocket ----------------------- ####### on_open ####### 输入要发送的消息(ps:输入关键词 close 结束程序): aaadbbbbb send: b'\x81\x89\x82-\xdfj\xe3L\xbe\x0e\xe0O\xbd\x08\xe0' ####### on_message ####### message:aaadbbbbb 输入要发送的消息(ps:输入关键词 close 结束程序): sakdnakjf send: b'\x81\x89\xa8\xe0g\x8b\xdb\x81\x0c\xef\xc6\x81\x0c\xe1\xce' ####### on_message ####### message:sakdnakjf 输入要发送的消息(ps:输入关键词 close 结束程序): 123456 send: b'\x81\x86(\x84>\xb7\x19\xb6\r\x83\x1d\xb2' ####### on_message ####### message:123456 输入要发送的消息(ps:输入关键词 close 结束程序): send: b'\x8a\x80.\xf3`+' send: b'\x8a\x80P\x0c\xc6W' send: b'\x8a\x807j\x03l' send: b'\x8a\x80\xd0\xac%v' send: b'\x8a\x80\xb9\x9do\x08' send: b'\x8a\x80s\xbb\xad\x8f' send: b'\x8a\x80\xf4-\xd9\x8b' close send: b'\x88\x82\xf5L>\xc4\xf6\xa4' ####### on_close ####### Process finished with exit code 0 """
很多时候,特别是在线下测试环境中时,我们常常需要代理去访问某些服务器。如下 demo 中分别使用 http 和 socks5。
http_proxy_host
参数传入代理的 host 地址。
http_proxy_port
参数传入代理的端口号。
proxy_type
参数如果不填,默认为 "http"
,可选参数值有 'http'
, 'socks4'
, 'socks5'
, 'socks5h'
。
使用 socks5 可能会少包
PySocks module not found
,注意不要下错包了pip install PySocks
(No module named ‘socks’)
# -*- coding:utf-8 -*- """ @author: tz_zs """ import websocket from websocket import WebSocketApp try: import thread except ImportError: import _thread as thread import time class Test(object): def __init__(self): super(Test, self).__init__() self.url = "ws://echo.websocket.org/" self.ws = None def on_message(self, message): print("####### on_message #######") print("message:%s" % message) def on_error(self, error): print("####### on_error #######") print("error:%s" % error) def on_close(self): print("####### on_close #######") def on_ping(self, message): print("####### on_ping #######") print("ping message:%s" % message) def on_pong(self, message): print("####### on_pong #######") print("pong message:%s" % message) def on_open(self): print("####### on_open #######") thread.start_new_thread(self.run, ()) def run(self, *args): # for i in range(3): # time.sleep(1) # self.ws.send("Hello %d" % i) while True: time.sleep(1) input_msg = input("输入要发送的消息(ps:输入关键词 close 结束程序):\n") if input_msg == "close": self.ws.close() # 关闭 print("thread terminating...") break else: self.ws.send(input_msg) def start(self): websocket.enableTrace(True) # 开启运行状态追踪。debug 的时候最好打开他,便于追踪定位问题。 self.ws = WebSocketApp(self.url, on_open=self.on_open, on_message=self.on_message, on_error=self.on_error, on_close=self.on_close) # self.ws.on_open = self.on_open # 也可以先创建对象再这样指定回调函数。run_forever 之前指定回调函数即可。 self.ws.run_forever(http_proxy_host="192.168.1.110", http_proxy_port=8123, proxy_type='http') # self.ws.run_forever(http_proxy_host="192.168.1.110", http_proxy_port=1080, proxy_type='socks5') if __name__ == '__main__': Test().start() """ Connecting proxy... --- request header --- CONNECT echo.websocket.org:80 HTTP/1.0 ----------------------- --- response header --- HTTP/1.1 200 Connection established ----------------------- --- request header --- GET / HTTP/1.1 Upgrade: websocket Host: echo.websocket.org Origin: http://echo.websocket.org Sec-WebSocket-Key: JvORoQC9F6tb639eFo5s+Q== Sec-WebSocket-Version: 13 Connection: upgrade ----------------------- --- response header --- HTTP/1.1 101 Web Socket Protocol Handshake Connection: Upgrade Date: Wed, 04 Aug 2021 07:23:32 GMT Sec-WebSocket-Accept: LyeriRX6MoTWDOUIwu3T1AhurSQ= Server: Kaazing Gateway Upgrade: websocket ----------------------- ####### on_open ####### 输入要发送的消息(ps:输入关键词 close 结束程序): 124 send: b'\x81\x83b\x83c\xd5S\xb1W' ####### on_message ####### message:124 输入要发送的消息(ps:输入关键词 close 结束程序): 743 send: b'\x81\x83*\\\xe8d\x1dh\xdb' ####### on_message ####### message:743 输入要发送的消息(ps:输入关键词 close 结束程序): close send: b'\x88\x82\xaeS\xd6\x94\xad\xbb' ####### on_close #######thread terminating... Process finished with exit code 0 """
源码 site-packages/websocket/_http.py
class proxy_info(object):
def __init__(self, **options):
self.type = options.get("proxy_type") or "http"
if not(self.type in ['http', 'socks4', 'socks5', 'socks5h']):
raise ValueError("proxy_type must be 'http', 'socks4', 'socks5' or 'socks5h'")
self.host = options.get("http_proxy_host", None)
if self.host:
self.port = options.get("http_proxy_port", 0)
self.auth = options.get("http_proxy_auth", None)
self.no_proxy = options.get("http_no_proxy", None)
else:
self.port = 0
self.auth = None
self.no_proxy = None
WebSocket 规范将 ping 和 pong 消息操作码定义为协议的一部分。使即使服务器和客户端之间没有传输数据,也可以保持长期连接处于活动状态。
框架接收到服务器发来的 ping 帧时,会立刻自动调用下方的函数,将数据使用 pong 帧原样发送给服务器,然后才回调 on_ping 将数据给使用者。所以,服务器发送到 ping 帧,我们一般不需要处理,框架自动回应了。
源码 site-packages/websocket/_core.py 297行
def pong(self, payload):
"""
send pong data.
payload: data payload to send server.
"""
if isinstance(payload, six.text_type):
payload = payload.encode("utf-8")
self.send(payload, ABNF.OPCODE_PONG)
设置参数 ping_interval
框架将每间隔时间后自动发送空内容的 ping 帧。如果不设置参数(默认参数为 0),则不自动发送。
源码 site-packages/websocket/_core.py 287行
def ping(self,payload=""):
"""
send ping data.
payload: data payload to send server.
"""
if isinstance(payload, six.text_type):
payload = payload.encode("utf-8")
self.send(payload, ABNF.OPCODE_PING)
但是,因为框架不是异步的,如果发生阻塞事件,ping/pong 可能会出现一些问题。所以一般需要设置 ping_timeout
超时时间。
注意,当 ping_interval 和 ping_timeout 参数都设置了的时候,框架要求参数 ping_interval 的值需大于参数 ping_timeout。
if ping_timeout and ping_interval and ping_interval <= ping_timeout:
raise WebSocketException("Ensure ping_interval > ping_timeout")
源码中发送空 ping 帧时,会保存发送时间 self.last_ping_tm
,接收到服务器返回的 pong 帧时,会保存接收时间 self.last_pong_tm
,如果同时满足(1)当前的时间距离上次发送 ping 的时间间隔大于参数 ping_timeout,(2)上次发送 ping 之后没有收到 pong,或接收到 pong 的时间距离上次发送 ping 的时间间隔大于参数 ping_timeout。
源码 site-packages/websocket/_app.py 294行
def check():
if (ping_timeout):
has_timeout_expired = time.time() - self.last_ping_tm > ping_timeout
has_pong_not_arrived_after_last_ping = self.last_pong_tm - self.last_ping_tm < 0
has_pong_arrived_too_late = self.last_pong_tm - self.last_ping_tm > ping_timeout
if (self.last_ping_tm
and has_timeout_expired
and (has_pong_not_arrived_after_last_ping or has_pong_arrived_too_late)):
raise WebSocketTimeoutException("ping/pong timed out")
return True
如下 demo 中,设置 ping_interval 为 20, ping_timeout 为 10。
# -*- coding:utf-8 -*- """ @author: tz_zs """ import websocket from websocket import WebSocketApp, ABNF try: import thread except ImportError: import _thread as thread import time class Test(object): def __init__(self): super(Test, self).__init__() self.url = "ws://echo.websocket.org/" self.ws = None def on_message(self, message): print("####### on_message #######") print("message:%s" % message) def on_error(self, error): print("####### on_error #######") print("error:%s" % error) def on_close(self): print("####### on_close #######") def on_ping(self, message): print("####### on_ping #######") print("ping time:%s" % time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())) print("ping message:%s" % message) def on_pong(self, message): print("####### on_pong #######") print("pong time:%s" % time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())) print("pong message:%s" % message) def on_open(self): print("####### on_open #######") thread.start_new_thread(self.run, ()) def run(self, *args): # for i in range(3): # time.sleep(1) # self.ws.send("Hello %d" % i) while True: time.sleep(1) input_msg = input("输入要发送的ping消息(ps:输入关键词 close 结束程序):\n") if input_msg == "close": self.ws.close() # 关闭 print("thread terminating...") break else: self.ws.send(input_msg, ABNF.OPCODE_PING) # self.ws.send(input_msg) def start(self): websocket.enableTrace(True) # 开启运行状态追踪。debug 的时候最好打开他,便于追踪定位问题。 self.ws = WebSocketApp(self.url, on_open=self.on_open, on_message=self.on_message, on_error=self.on_error, on_close=self.on_close, on_ping=self.on_ping, on_pong=self.on_pong) self.ws.run_forever(ping_interval=20, ping_timeout=10) if __name__ == '__main__': Test().start()
框架每 20 秒发送空内容的 ping,控制台输出如下:
""" --- request header --- GET / HTTP/1.1 Upgrade: websocket Host: echo.websocket.org Origin: http://echo.websocket.org Sec-WebSocket-Key: MDsoARDQEI8/4YILiLGwHw== Sec-WebSocket-Version: 13 Connection: upgrade ----------------------- --- response header --- HTTP/1.1 101 Web Socket Protocol Handshake Connection: Upgrade Date: Wed, 04 Aug 2021 08:20:42 GMT Sec-WebSocket-Accept: QiNgZdxqfcpJIeKlV6Byezls2Gw= Server: Kaazing Gateway Upgrade: websocket ----------------------- ####### on_open ####### 输入要发送的ping消息(ps:输入关键词 close 结束程序): send: b'\x89\x80J\x1cn\x8c' ####### on_pong ####### pong time:2021-08-04 16:21:12 pong message:b'' send: b'\x89\x80_\xad\xa1\xd9' ####### on_pong ####### pong time:2021-08-04 16:21:32 pong message:b'' send: b'\x89\x80\xb6$\xc9\x9d' ####### on_pong ####### pong time:2021-08-04 16:21:52 pong message:b'' send: b'\x89\x80)\xc3\x1f\xdc' ####### on_pong ####### pong time:2021-08-04 16:22:12 pong message:b'' send: b'\x89\x80\xcf|\xfa&' ####### on_pong ####### pong time:2021-08-04 16:22:32 pong message:b'' send: b'\x89\x80\xe5\x19/\xf9' ####### on_pong ####### pong time:2021-08-04 16:22:52 pong message:b'' send: b'\x89\x80\xfb\x9bA8' ####### on_pong ####### pong time:2021-08-04 16:23:12 pong message:b'' send: b'\x89\x80\xfc\xaa\xa6}' ####### on_pong ####### pong time:2021-08-04 16:23:32 pong message:b'' send: b'\x89\x80O\xa0\x0e\xb6' ####### on_pong ####### pong time:2021-08-04 16:23:52 pong message:b'' """
手动发送有内容的 ping,控制台输出如下:
""" --- request header --- GET / HTTP/1.1 Upgrade: websocket Host: echo.websocket.org Origin: http://echo.websocket.org Sec-WebSocket-Key: Ovizd9gdVze4BCLhymQ92Q== Sec-WebSocket-Version: 13 Connection: upgrade ----------------------- --- response header --- ####### on_open ####### HTTP/1.1 101 Web Socket Protocol Handshake Connection: Upgrade Date: Wed, 04 Aug 2021 08:26:07 GMT Sec-WebSocket-Accept: vTtPyDWKViUA88UgBaLep+qi+CI= Server: Kaazing Gateway Upgrade: websocket ----------------------- 输入要发送的ping消息(ps:输入关键词 close 结束程序): ping_test send: b'\x89\x89+\x81\x19$[\xe8wCt\xf5|W_' ####### on_pong ####### pong time:2021-08-04 16:26:24 pong message:b'ping_test' 输入要发送的ping消息(ps:输入关键词 close 结束程序): ping1111111111 send: b'\x89\x8e\xe3\x05\xbck\x93l\xd2\x0c\xd24\x8dZ\xd24\x8dZ\xd24' ####### on_pong ####### pong time:2021-08-04 16:26:32 pong message:b'ping1111111111' 输入要发送的ping消息(ps:输入关键词 close 结束程序): send: b'\x89\x80#\x0e\xa8\xdd' ####### on_pong ####### pong time:2021-08-04 16:26:37 pong message:b'' close send: b'\x88\x82-0\xd6\xbc.\xd8' ####### on_close ####### Process finished with exit code 0 """
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。