当前位置:   article > 正文

Python实现WebSocket通讯与心跳控制详解_python websocket 心跳

python websocket 心跳

        为了使用Python实现WebSocket通讯和心跳控制,我们通常需要一个WebSocket客户端库和一个服务器端库。这里,我们将使用websockets库作为服务器和客户端的示例。
安装必要的库

        首先,你需要安装websockets库。可以使用pip进行安装:

pip install websockets

1.服务器端代码

        服务器端将处理WebSocket连接,发送和接收消息,并管理心跳检测。

  1. import asyncio  
  2. import websockets  
  3. import time  
  4.   
  5. HEARTBEAT_INTERVAL = 10  # 心跳间隔(秒)  
  6. HEARTBEAT_TIMEOUT = 30  # 心跳超时(秒)  
  7.   
  8. # 心跳检测字典,存储客户端和它们最后活动时间  
  9. clients = {}  
  10.   
  11. async def heartbeat_task(ws, client_id):  
  12.     while True:  
  13.         try:  
  14.             # 如果客户端在HEARTBEAT_TIMEOUT内没有发送消息,则关闭连接  
  15.             if time.time() - clients[client_id]['last_active'] > HEARTBEAT_TIMEOUT:  
  16.                 print(f"Client {client_id} timed out, closing connection.")  
  17.                 await ws.close()  
  18.                 break  
  19.             # 发送心跳消息  
  20.             await ws.send("heartbeat")  
  21.             print(f"Sent heartbeat to client {client_id}")  
  22.             await asyncio.sleep(HEARTBEAT_INTERVAL)  
  23.         except Exception as e:  
  24.             print(f"Heartbeat task for client {client_id} failed: {e}")  
  25.             break  
  26.   
  27. async def echo(websocket, path):  
  28.     client_id = id(websocket)  # 使用内存地址作为简单的客户端ID  
  29.     clients[client_id] = {'ws': websocket, 'last_active': time.time()}  
  30.   
  31.     # 启动心跳检测任务  
  32.     asyncio.create_task(heartbeat_task(websocket, client_id))  
  33.   
  34.     try:  
  35.         async for message in websocket:  
  36.             clients[client_id]['last_active'] = time.time()  # 更新最后活动时间  
  37.             if message == "ping":  
  38.                 print(f"Received ping from client {client_id}")  
  39.                 await websocket.send("pong")  
  40.             else:  
  41.                 print(f"Received '{message}' from client {client_id}")  
  42.                 await websocket.send(f"Echo: {message}")  
  43.     except websockets.exceptions.ConnectionClosed:  
  44.         print(f"Client {client_id} disconnected")  
  45.     finally:  
  46.         # 清理客户端信息  
  47.         del clients[client_id]  
  48.   
  49. start_server = websockets.serve(echo, "localhost", 8765)  
  50.   
  51. asyncio.get_event_loop().run_until_complete(start_server)  
  52. asyncio.get_event_loop().run_forever()

2.客户端代码

        客户端将连接到服务器,接收和发送消息,并响应心跳消息。

  1. import asyncio  
  2. import websockets  
  3. import time  
  4.   
  5. async def client():  
  6.     uri = "ws://localhost:8765"  
  7.     async with websockets.connect(uri) as websocket:  
  8.         while True:  
  9.             try:  
  10.                 message = input("Enter message to send (or 'exit' to quit): ")  
  11.                 if message == 'exit':  
  12.                     break  
  13.                 await websocket.send(message)  
  14.                 response = await websocket.recv()  
  15.                 print(f"Received: {response}")  
  16.                   
  17.                 # 发送pong消息以响应心跳消息  
  18.                 if response == "heartbeat":  
  19.                     await websocket.send("pong")  
  20.   
  21.                 # 模拟客户端工作,防止心跳超时  
  22.                 await asyncio.sleep(5)  
  23.             except websockets.exceptions.ConnectionClosed:  
  24.                 print("Connection closed by server.")  
  25.                 break  
  26.   
  27. asyncio.get_event_loop().run_until_complete(client())

3.运行和测试

        首先运行服务器端代码。
        然后运行客户端代码,并在提示时输入消息。
        观察服务器和客户端的输出,确保它们能够正常通讯,并且心跳控制按预期工作。

        注意:心跳检测的实现是基于一个简单的字典和内存地址作为客户端ID。在实际应用中,你可能需要使用更复杂的机制来跟踪客户端,如使用数据库或分布式缓存。此外,为了简化示例,心跳消息只是简单地发送字符串"heartbeat",并且客户端只是通过发送"pong"来响应。在真实场景中,你可能需要实现更复杂的逻辑来处理心跳。

关注公众号了解更多内容

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/酷酷是懒虫/article/detail/887709
推荐阅读
相关标签
  

闽ICP备14008679号