赞
踩
为了使用Python实现WebSocket通讯和心跳控制,我们通常需要一个WebSocket客户端库和一个服务器端库。这里,我们将使用websockets库作为服务器和客户端的示例。
安装必要的库
首先,你需要安装websockets库。可以使用pip进行安装:
pip install websockets
服务器端将处理WebSocket连接,发送和接收消息,并管理心跳检测。
- import asyncio
- import websockets
- import time
-
- HEARTBEAT_INTERVAL = 10 # 心跳间隔(秒)
- HEARTBEAT_TIMEOUT = 30 # 心跳超时(秒)
-
- # 心跳检测字典,存储客户端和它们最后活动时间
- clients = {}
-
- async def heartbeat_task(ws, client_id):
- while True:
- try:
- # 如果客户端在HEARTBEAT_TIMEOUT内没有发送消息,则关闭连接
- if time.time() - clients[client_id]['last_active'] > HEARTBEAT_TIMEOUT:
- print(f"Client {client_id} timed out, closing connection.")
- await ws.close()
- break
- # 发送心跳消息
- await ws.send("heartbeat")
- print(f"Sent heartbeat to client {client_id}")
- await asyncio.sleep(HEARTBEAT_INTERVAL)
- except Exception as e:
- print(f"Heartbeat task for client {client_id} failed: {e}")
- break
-
- async def echo(websocket, path):
- client_id = id(websocket) # 使用内存地址作为简单的客户端ID
- clients[client_id] = {'ws': websocket, 'last_active': time.time()}
-
- # 启动心跳检测任务
- asyncio.create_task(heartbeat_task(websocket, client_id))
-
- try:
- async for message in websocket:
- clients[client_id]['last_active'] = time.time() # 更新最后活动时间
- if message == "ping":
- print(f"Received ping from client {client_id}")
- await websocket.send("pong")
- else:
- print(f"Received '{message}' from client {client_id}")
- await websocket.send(f"Echo: {message}")
- except websockets.exceptions.ConnectionClosed:
- print(f"Client {client_id} disconnected")
- finally:
- # 清理客户端信息
- del clients[client_id]
-
- start_server = websockets.serve(echo, "localhost", 8765)
-
- asyncio.get_event_loop().run_until_complete(start_server)
-
- asyncio.get_event_loop().run_forever()
客户端将连接到服务器,接收和发送消息,并响应心跳消息。
- import asyncio
- import websockets
- import time
-
- async def client():
- uri = "ws://localhost:8765"
- async with websockets.connect(uri) as websocket:
- while True:
- try:
- message = input("Enter message to send (or 'exit' to quit): ")
- if message == 'exit':
- break
- await websocket.send(message)
- response = await websocket.recv()
- print(f"Received: {response}")
-
- # 发送pong消息以响应心跳消息
- if response == "heartbeat":
- await websocket.send("pong")
-
- # 模拟客户端工作,防止心跳超时
- await asyncio.sleep(5)
- except websockets.exceptions.ConnectionClosed:
- print("Connection closed by server.")
- break
-
-
- asyncio.get_event_loop().run_until_complete(client())
首先运行服务器端代码。
然后运行客户端代码,并在提示时输入消息。
观察服务器和客户端的输出,确保它们能够正常通讯,并且心跳控制按预期工作。
注意:心跳检测的实现是基于一个简单的字典和内存地址作为客户端ID。在实际应用中,你可能需要使用更复杂的机制来跟踪客户端,如使用数据库或分布式缓存。此外,为了简化示例,心跳消息只是简单地发送字符串"heartbeat",并且客户端只是通过发送"pong"来响应。在真实场景中,你可能需要实现更复杂的逻辑来处理心跳。
关注公众号了解更多内容
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。