赞
踩
主要实现了在部分房间内进行聊天的功能,以及在房间内执行某种操作之后的处理。
注:目前还不是很完善,后续将进行补充。
import json
import os
from select import select
from typing import Dict, List
from http import HTTPStatus

from fastapi import APIRouter, Depends, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
from sqlalchemy.orm import Session
from starlette.requests import Request

from models.users import Room
from settings.settings import get_db
from crud.users import get_users, get_user, get_mobile_user, create_user, delete_user
from schemas import users as schemas_user
# from schemas.users import *
from utils.jwt import *
from utils.redis_utils import redis_get_msg, redis_set_msg, redis
from config.databases import REDIS_DATABASES_URL

# Demo page used to display the chat in a browser.
# The form attribute must be plain ASCII ``onsubmit``; a look-alike character
# there means the browser never registers the submit handler.
html = """
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>websocket</title>
</head>
<body>
<h1>User1 Chat</h1>
<form action="" onsubmit="sendMessage(event)">
    <input type="text" id="messageText" autocomplete="off"/>
    <button>Send</button>
</form>
<ul id='messages'>
</ul>
<script>
    var ws = new WebSocket("ws://127.0.0.1:8888/v1/test/");
    ws.onmessage = function(event) {
        var messages = document.getElementById('messages')
        var message = document.createElement('li')
        var content = document.createTextNode(event.data)
        message.appendChild(content)
        messages.appendChild(message)
    };
    function sendMessage(event) {
        var input = document.getElementById("messageText")
        ws.send(input.value)
        input.value = ''
        event.preventDefault()
    }
</script>
</body>
</html>
"""

# Register the sub-router.
router = APIRouter()


class ConnectionManager:
    """Keep track of accepted WebSocket connections and send text to them."""

    def __init__(self):
        """Start with no tracked connections and an empty session mapping."""
        # Every connection accepted through ``connect``, in acceptance order.
        self.active_connections: List[WebSocket] = []
        # NOTE(review): nothing in the visible code reads or writes this
        # mapping; its intended key/value layout cannot be told from here.
        self.sessions = dict()

    def _untrack(self, ws: WebSocket) -> None:
        """Stop tracking ``ws``; do nothing if it is not currently tracked."""
        try:
            self.active_connections.remove(ws)
        except ValueError:
            # Already removed (e.g. ``disconnect`` followed by ``on_close``).
            # Untracking is meant to be idempotent, so this is not an error.
            pass

    async def connect(self, ws: WebSocket) -> None:
        """Accept ``ws`` and add it to the tracked connections."""
        await ws.accept()
        self.active_connections.append(ws)

    def disconnect(self, ws: WebSocket) -> None:
        """Stop tracking ``ws`` without closing it."""
        self._untrack(ws)

    async def on_close(self, websocket: WebSocket, close_code: int) -> None:
        """Close ``websocket`` with ``close_code`` and stop tracking it.

        The connection is untracked even when ``close`` raises, so a failed
        close cannot leave a dead socket in the broadcast list. The exception
        itself still propagates to the caller.
        """
        try:
            await websocket.close(close_code)
        finally:
            self._untrack(websocket)

    @staticmethod
    async def send_personal_message(message: str, ws: WebSocket) -> None:
        """Send ``message`` as text to the single connection ``ws``."""
        await ws.send_text(message)

    @staticmethod
    async def send_user_message(message: str, ws: WebSocket) -> None:
        """Send ``message`` as text to ``ws`` (point-to-point delivery)."""
        await ws.send_text(message)

    async def broadcast(self, message: str) -> None:
        """Send ``message`` as text to every tracked connection."""
        # Iterate over a snapshot: each ``await`` yields to the event loop,
        # and a connect/disconnect running meanwhile would otherwise mutate
        # the list being iterated and cause entries to be skipped.
        for connection in list(self.active_connections):
            await connection.send_text(message)

    async def filter_message(self, message: str):
        """Parse keywords out of ``message``.

        NOTE(review): only the first statement of this method is visible in
        the available source; the remaining logic is missing and must be
        restored from the original before this method is relied upon.
        """
        json_object = None
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。