赞
踩
最近要实现一个在线聊天功能,基于fastapi的websocket实现了这个功能。下面介绍一下遇到的技术问题
在线上环境部署时,一般是多进程的方式进行部署启动fastapi服务,而每个启动的进程都有自己的独立存储空间。导致存储的连接对象分布在不同的进程中,当进行通信时,可能无法找到已连接的连接对象。
使用使用redis的订阅发布机制,使所有的进程都能进行消息订阅。这样能保证每个进程收到消息后都会进行相关的信息处理了。
①在服务启动时,进行消息订阅,并一直监听消息通道。当有消息发布时,进行消息处理。
# 初始化app app = FastAPI(title="Ws Chat", description="测试", version="1.0.0") app.openapi_version = "3.0.0" app.include_router(chat.app, prefix='/api/chat', tags=['Chat']) @app.on_event('startup') async def on_startup(): print(f"订阅初始化:{os.getpid()}") # 执行消息订阅机制https://aioredis.readthedocs.io/en/latest/examples/ loop = asyncio.get_event_loop() loop.create_task(register_pubsub()) async def reader(channel): # 进行消息的消费 async for msg in channel.listen(): # 监听通道 # print(msg) msg_data = msg.get("data") if msg_data and isinstance(msg_data, str): msg_data_dict = json.loads(msg_data) print(f"chat:{msg_data_dict}") sender = msg_data_dict.get("sender") # 进行消息处理 await chat.cm.handle_websocket_message(msg_data_dict, sender) async def register_pubsub(): pool = aioredis.from_url( "redis://{}".format(host), db=db, password=password, port=port, encoding="utf-8", decode_responses=True ) psub = pool.pubsub() async with psub as p: # 消息订阅 await p.subscribe("chat") await reader(p) await p.unsubscribe("chat")
②websocket处理类
from fastapi import WebSocket, WebSocketDisconnect class ConnectionManager: def __init__(self): # 保存当前所有的链接的websocket对象 self.websocket_connections = {} async def connect(self, websocket: WebSocket, client_id): # 添加连接并发送欢迎消息 await websocket.accept() self.websocket_connections[client_id] = websocket await websocket.send_json({"type": "system", "msg": "Welcome to the chat app!", "sender": "system", "recipient": client_id}) try: # 处理消息 while True: # 获取信息 message = await websocket.receive_json() # 处理发送信息 await self.handle_websocket_message(message, client_id) except WebSocketDisconnect: # 连接断开时移除连接 del self.websocket_connections[client_id] async def handle_websocket_message(self, message: dict, client_id): # 处理私聊消息 if message.get("type") == "private_message": recipient = message.get("recipient") msg = message.get("msg") recipient_conn = self.websocket_connections.get(recipient) if recipient_conn: # 在线 await recipient_conn.send_json({"type": "private_message", "sender": client_id, "msg": msg, "recipient": recipient}) async def broadcast(self, message: dict): # 循环变量给所有在线激活的链接发送消息-全局广播 for connection in self.websocket_connections: await connection.send_text(message) async def close(self, websocket: WebSocket, client_id): # 断开客户端的链接 await websocket.close() del self.websocket_connections[client_id] async def disconnect(self, user_id): websocket: WebSocket = self.websocket_connections[user_id] await websocket.close() del self.websocket_connections[user_id]
from app.chat_manager.server import ConnectionManager
cm = ConnectionManager()
@app.websocket("/connect_chat")
async def connect_chat(websocket: WebSocket, user_code: str):
try:
await cm.connect(websocket, user_code)
except WebSocketDisconnect:
# 连接断开时移除连接
del cm.websocket_connections[user_code]
④http请求进行消息发布
@app.post("/create_chat", summary="发起聊天")
async def create_chat(param: DiagnosisChatSch, r=Depends(get_redis)):
"""
"""
ws_param = {"type": "private_message",
"msg": param.msg,
"sender": param.sender,
"recipient": param.recipient}
# 进行消息发布
await r.publish('diagnosis_chat', json.dumps(ws_param))
return {'code': 200, 'msg': '成功', 'data': ''}
github源码地址:https://github.com/zhangyukuo/fastapi_ws_chat
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。