赞
踩
官方文档: https://websockets.readthedocs.io/
(Version: 8.1)
pip install websockets
服务端:
不断监听客户端连接,当客户端发送一个name字符串后,返回一条 ‘hello’ + name 的问候消息。
# WS server example import asyncio import websockets async def hello(websocket, path): # path标识请求路径,可以来自定义需求 name = await websocket.recv() print(f"< {name}") greeting = f"Hello {name}!" await websocket.send(greeting) print(f"> {greeting}") start_server = websockets.serve(hello, "localhost", 8765) asyncio.get_event_loop().run_until_complete(start_server) asyncio.get_event_loop().run_forever()
客户端:
客户端连接到server,执行协程处理器 handler,当协程退出后与server断开连接。
# WS client example import asyncio import websockets async def hello(): uri = "ws://localhost:8765" async with websockets.connect(uri) as websocket: name = input("What's your name? ") await websocket.send(name) print(f"> {name}") greeting = await websocket.recv() print(f"< {greeting}") asyncio.get_event_loop().run_until_complete(hello())
connect()通过 上下文管理器保证了协程退出之前关闭socket连接。
安全的WebSocket连接提高了保密性和可靠性,降低了了使用不安全的proxy代理服务的风险。
WSS协议之于WS就像HTTPS之于HTTP: 连接是用传输层安全(TLS)加密的,TLS通常被称为安全套接字层(SSL)。WSS需要类似HTTPS的TLS证书。
下面介绍如改写上面的服务器示例以提供安全连接。请参阅ssl
模块的文档配置上下文。
# WSS (WS over TLS) server example, with a self-signed certificate import asyncio import pathlib import ssl import websockets async def hello(websocket, path): name = await websocket.recv() print(f"< {name}") greeting = f"Hello {name}!" await websocket.send(greeting) print(f"> {greeting}") ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) localhost_pem = pathlib.Path(__file__).with_name("localhost.pem") ssl_context.load_cert_chain(localhost_pem) start_server = websockets.serve( hello, "localhost", 8765, ssl=ssl_context ) asyncio.get_event_loop().run_until_complete(start_server) asyncio.get_event_loop().run_forever()
改写相应的客户端安全连接实现:
# WSS (WS over TLS) client example, with a self-signed certificate import asyncio import pathlib import ssl import websockets ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) localhost_pem = pathlib.Path(__file__).with_name("localhost.pem") ssl_context.load_verify_locations(localhost_pem) async def hello(): uri = "wss://localhost:8765" async with websockets.connect( uri, ssl=ssl_context ) as websocket: name = input("What's your name? ") await websocket.send(name) print(f"> {name}") greeting = await websocket.recv() print(f"< {greeting}") asyncio.get_event_loop().run_until_complete(hello())
注意:
此处客户端需要一个ssl上下文,因为此处使用的是一个 自签名的CA证书。
如果使用有效的证书(即由Python安装信任的CA签名)连接到安全的WebSocket服务器的客户端可以简单地将ssl=True传递给connect(),而不必构建ssl上下文。
从浏览器连接到我们自己实现的服务器:
从console里运行以下程序:
# WS server that sends messages at random intervals import asyncio import datetime import random import websockets async def time(websocket, path): while True: now = datetime.datetime.utcnow().isoformat() + "Z" await websocket.send(now) await asyncio.sleep(random.random() * 3) start_server = websockets.serve(time, "127.0.0.1", 5678) asyncio.get_event_loop().run_until_complete(start_server) asyncio.get_event_loop().run_forever()
将以下代码写入html文件,并从浏览器里打开
<!DOCTYPE html> <html> <head> <title>WebSocket demo</title> </head> <body> <script> var ws = new WebSocket("ws://127.0.0.1:5678/"), messages = document.createElement('ul'); ws.onmessage = function (event) { var messages = document.getElementsByTagName('ul')[0], message = document.createElement('li'), content = document.createTextNode(event.data); message.appendChild(content); messages.appendChild(message); }; document.body.appendChild(messages); </script> </body> </html>
一个websocket服务器可以从多个客户端接收消息,处理消息并同步状态到所有客户端。
以下示例展示了任意一客户端将一变量counter进行增加或减少后,所有已连接的客户端都能同步更新counter的实时值。
协程模块 asyncio
保证了变量的更新是按先后顺序的。
服务端代码:
# WS server example that synchronizes state across clients import asyncio import json import logging import websockets logging.basicConfig() STATE = {"value": 0} # 保存所有在线客户端 USERS = set() def state_event(): return json.dumps({"type": "state", **STATE}) def users_event(): return json.dumps({"type": "users", "count": len(USERS)}) # 更新所有客户端显示的counter值 async def notify_state(): if USERS: # asyncio.wait doesn't accept an empty list message = state_event() await asyncio.wait([user.send(message) for user in USERS]) # 通知客户端在线数量 async def notify_users(): if USERS: # asyncio.wait doesn't accept an empty list message = users_event() await asyncio.wait([user.send(message) for user in USERS]) # 注册客户端 async def register(websocket): USERS.add(websocket) await notify_users() # 注销客户端 async def unregister(websocket): USERS.remove(websocket) await notify_users() async def counter(websocket, path): # register(websocket) sends user_event() to websocket await register(websocket) try: await websocket.send(state_event()) # 迭代websocket以不断接收消息,此处要求对象实现了 __iter__()、__await__()、 __aenter__()、 __aexit__() 方法。 async for message in websocket: data = json.loads(message) if data["action"] == "minus": STATE["value"] -= 1 await notify_state() elif data["action"] == "plus": STATE["value"] += 1 await notify_state() else: logging.error("unsupported event: {}", data) finally: # 客户端断开后,退出上面的 for 循环,即客户端协程退出后 await unregister(websocket) start_server = websockets.serve(counter, "localhost", 6789) asyncio.get_event_loop().run_until_complete(start_server) asyncio.get_event_loop().run_forever()
将以下代码写入html文件,并用浏览器打开多个页面模拟多个ws客户端
<html> <head> <title>WebSocket demo</title> <style type="text/css"> body { font-family: "Courier New", sans-serif; text-align: center; } .buttons { font-size: 4em; display: flex; justify-content: center; } .button, .value { line-height: 1; padding: 2rem; margin: 2rem; border: medium solid; min-height: 1em; min-width: 1em; } .button { cursor: pointer; user-select: none; } .minus { color: red; } .plus { color: green; } .value { min-width: 2em; } .state { font-size: 2em; } </style> </head> <body> <div class="buttons"> <div class="minus button">-</div> <div class="value">?</div> <div class="plus button">+</div> </div> <div class="state"> <span class="users">?</span> online </div> <script> var minus = document.querySelector('.minus'), plus = document.querySelector('.plus'), value = document.querySelector('.value'), users = document.querySelector('.users'), websocket = new WebSocket("ws://127.0.0.1:6789/"); minus.onclick = function (event) { websocket.send(JSON.stringify({action: 'minus'})); } plus.onclick = function (event) { websocket.send(JSON.stringify({action: 'plus'})); } websocket.onmessage = function (event) { data = JSON.parse(event.data); switch (data.type) { case 'state': value.textContent = data.value; break; case 'users': users.textContent = ( data.count.toString() + " user" + (data.count == 1 ? "" : "s")); break; default: console.error( "unsupported event", data); } }; </script> </body> </html>
你要在连接的生命周期里处理多条消息,你必须实现一个loop,以下为你提供了一个构建Websocket Server的基础模板。
接收消息并且传递到消费者协程中。
async def consumer_handler(websocket, path):
async for message in websocket:
await consumer(message)
当客户端断开连接后终止迭代。
从生产者协程生成消息,并且发送出去
async def producer_handler(websocket, path):
while True:
message = await producer()
await websocket.send(message)
此处生产者代表你的产生消息的业务逻辑。
注意:当客户端断开连接后,send() 会引发 ConnectionClosed
异常,从而从 while True
的 loop 中 退出。
你可以将上面两种模式结合起来,两个协程任务并行。
async def handler(websocket, path):
consumer_task = asyncio.ensure_future(
consumer_handler(websocket, path))
producer_task = asyncio.ensure_future(
producer_handler(websocket, path))
done, pending = await asyncio.wait(
[consumer_task, producer_task],
return_when=asyncio.FIRST_COMPLETED,
)
for task in pending:
task.cancel()
参考上面多客户端同步的代码,你需要记录所有已连接的客户端,当他们连接到server时进行注册,断开时注销。
connected = set()
async def handler(websocket, path):
# Register.
connected.add(websocket)
try:
# Implement logic here.
await asyncio.wait([ws.send("Hello!") for ws in connected])
await asyncio.sleep(10)
finally:
# Unregister.
connected.remove(websocket)
这个简单的示例展示了如何在内存中跟踪连接的客户端,这只在运行单个进程时有效。在实际应用程序中,handler可以注册到消息代理broker 上的某些channels。
handler处理器: 实现一个协程处理单个连接,接收两个参数(WebSocket协议实例和url 路径参数)
recv()
和send()
来接收和发送消息。recv()
或send()
引发ConnectionClosed
时,清除并退出。如果您启动了其他异步任务,在退出之前终止它们。recv()
,可以考虑awaiting wait_closed()
,以便在连接关闭时快速检测。ping()
或pong()
,但一般不需要使用serve()
创建一个服务器,它类似于asyncio
中loop的create_server()
。您还可以将它用作异步上下文管理器。
WebSocketServerProtocol
的子类,并将这个子类或工厂函数作为 create_protocol
参数传递。使用connect()
创建一个客户端,它类似于asyncio
中loop的create_connection()
。您还可以将它用作异步上下文管理器。
WebSocketClientProtocol
的子类,并将这个子类或工厂函数作为 create_protocol
参数传递。随时调用recv()和send()来接收和发送消息。
如果你愿意,你可以ping()或pong(),但一般不需要。
如果没有使用connect()
作为上下文管理器,请调用close()
来终止连接。
如果你不了解websocket的工作原理,请打开日志调试
import logging
logger = logging.getLogger('websockets')
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())
如果你不了解asyncio
库, 建议查看官方文档develop with asyncio.
import asyncio
import functools
import websockets
async def handler(websocket, path, extra_argument):
...
bound_handler = functools.partial(handler, extra_argument='spam')
start_server = websockets.serve(bound_handler, '127.0.0.1', 8765)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
实现此结果的另一种方法是在存在extra_argument
变量的范围内定义handler
协程,而不是通过参数注入它。
close()
方法,然后等待它自身的wait_closed()
方法执行结束。在Unix系统上(windows就不要试了),退出通常是通过发送一个信号来触发的。
import asyncio import signal import websockets async def echo(websocket, path): async for message in websocket: await websocket.send(message) async def echo_server(stop): async with websockets.serve(echo, "localhost", 8765): await stop loop = asyncio.get_event_loop() # The stop condition is set when receiving SIGTERM. stop = loop.create_future() loop.add_signal_handler(signal.SIGTERM, stop.set_result, None) # Run the server until the stop condition is met. loop.run_until_complete(echo_server(stop))
如果你的server不是运行在主线程上,可以使用 call_soon_threadsafe()
.
Websocket是HTTP/1.1.的扩展,在同一个端口上同时提供HTTP和WebSocket是ok的。
WebSocket的作者并不认为这是一个好主意,因为HTTP和WebSocket的操作特性有很大的不同。
websockets
使用process_request
参数钩子,为响应HTTP请求提供了最低限度的支持。典型的用例包括健康检查。这里有一个例子
# WS echo server with HTTP endpoint at /health/ import asyncio import http import websockets async def health_check(path, request_headers): if path == "/health/": return http.HTTPStatus.OK, [], b"OK\n" async def echo(websocket, path): async for message in websocket: await websocket.send(message) start_server = websockets.serve( echo, "localhost", 8765, process_request=health_check ) asyncio.get_event_loop().run_until_complete(start_server) asyncio.get_event_loop().run_forever()
websockets 支持扩展
serve()
and connect()
默认支持Per-Message Deflate, 你可以
通过参数禁用compression=None
.
如果希望自定义Per-Message Deflate参数,还可以显式配置每个消息的Deflate扩展。
import websockets
from websockets.extensions import permessage_deflate
websockets.serve(
...,
extensions=[
permessage_deflate.ServerPerMessageDeflateFactory(
server_max_window_bits=11,
client_max_window_bits=11,
compress_settings={'memLevel': 4},
),
],
)
from websockets.extensions import permessage_deflate
websockets.connect(
...,
extensions=[
permessage_deflate.ClientPerMessageDeflateFactory(
server_max_window_bits=11,
client_max_window_bits=11,
compress_settings={'memLevel': 4},
),
],
)
参考API文档ServerPerMessageDeflateFactory 和 ClientPerMessageDeflateFactory 了解更多细节。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。