赞
踩
pip install "fastapi[all]" websocket-client -i https://pypi.doubanio.com/simple
from typing import List

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse

app = FastAPI()

# Demo chat page served at "/". The browser opens a WebSocket back to this
# server and appends every received message to the list.
# NOTE: the WebSocket URL below is hard-coded; adjust the host/port to match
# the machine this server actually runs on.
html = """
<!DOCTYPE html>
<html>
    <head>
        <title>Chat</title>
    </head>
    <body>
        <h1>WebSocket Chat</h1>
        <h2>Your ID: <span id="ws-id"></span></h2>
        <form action="" onsubmit="sendMessage(event)">
            <input type="text" id="messageText" autocomplete="off"/>
            <button>Send</button>
        </form>
        <ul id='messages'>
        </ul>
        <script>
            var client_id = Date.now()
            document.querySelector("#ws-id").textContent = client_id;
            var ws = new WebSocket(`ws://192.168.144.81:8000/ws/${client_id}`);
            ws.onmessage = function(event) {
                var messages = document.getElementById('messages')
                var message = document.createElement('li')
                var content = document.createTextNode(event.data)
                message.appendChild(content)
                messages.appendChild(message)
            };
            function sendMessage(event) {
                var input = document.getElementById("messageText")
                ws.send(input.value)
                input.value = ''
                event.preventDefault()
            }
        </script>
    </body>
</html>
"""


class ConnectionManager:
    """Track the currently open WebSocket connections and send text to them."""

    def __init__(self):
        # Every accepted socket, in connection order.
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        """Accept the handshake on *websocket* and register the connection."""
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        """Unregister *websocket*; raises ValueError if it is not registered."""
        self.active_connections.remove(websocket)

    async def send_personal_message(self, message: str, websocket: WebSocket):
        """Send *message* to a single connection."""
        await websocket.send_text(message)

    async def broadcast(self, message: str):
        """Send *message* to every registered connection."""
        # Iterate over a snapshot: each await yields to the event loop, during
        # which another handler may call disconnect() and mutate the list,
        # which would otherwise make this loop skip a connection.
        for connection in list(self.active_connections):
            await connection.send_text(message)


manager = ConnectionManager()


@app.get("/")
async def get():
    """Serve the demo chat page."""
    return HTMLResponse(html)


@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: int):
    """Echo each received text to its sender and broadcast it to all clients.

    When the peer disconnects, the socket is unregistered and the remaining
    clients are told that this client left.
    """
    await manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            await manager.send_personal_message(f"You wrote: {data}", websocket)
            await manager.broadcast(f"Client #{client_id} says: {data}")
    except WebSocketDisconnect:
        manager.disconnect(websocket)
        await manager.broadcast(f"Client #{client_id} left the chat")


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(
        app='main:app',
        host="0.0.0.0",
        log_level="debug",
        port=8000,
        loop="asyncio",
        # workers=4,
    )
from fastapi import FastAPI,BackgroundTasks
from starlette.concurrency import run_in_threadpool


def create_app():
    """Build and return a new FastAPI application instance."""
    return FastAPI()


app = create_app()


@app.on_event("startup")
async def init_scheduler():
    """Startup hook: import ``run`` from the ``task`` module and call it once.

    The import is kept inside the function so that ``task`` is only loaded
    when the application actually starts.
    """
    from task import run

    run()


if __name__ == '__main__':
    import uvicorn

    # Launch options for running this module directly as a script.
    server_options = dict(
        app='client:app',
        host="0.0.0.0",
        port=8888,
        log_level="debug",
        loop="asyncio",
        # workers=4,
    )
    uvicorn.run(**server_options)
import threading
import time

import websocket


def on_close(ws, close_status_code=None, close_msg=None):
    """Retry hook: wait two seconds after a close, then reconnect.

    Current websocket-client releases invoke this callback with three
    arguments (the app object, the close status code and the close message).
    The last two default to ``None`` so the callback also works with a
    caller that passes only the app object; with a one-argument signature
    the three-argument call raises TypeError and the retry never runs.
    """
    print("Retry : %s" % time.ctime())
    time.sleep(2)
    connect_websocket()  # retry once every 2 seconds


def on_open(ws):
    """Print a notice once the connection has been established."""
    print('connection established')


def on_message(wsapp, message):
    """Print a message received from the WebSocket server."""
    print(message)


def connect_websocket():
    """Try to connect to the WebSocket server from a daemon thread.

    ``run_forever`` is started in a background thread marked as a daemon, so
    this call returns immediately.
    """
    ws = websocket.WebSocketApp(
        "ws://192.168.144.81:8000/ws/2323",
        on_open=on_open,
        on_close=on_close,
        cookie="chocolate",
        on_message=on_message,
    )
    wst = threading.Thread(target=ws.run_forever)
    wst.daemon = True
    wst.start()


def run():
    """Start the WebSocket client and report any error raised while starting.

    Intended to be called when the host FastAPI application starts; after
    that, ``on_close`` re-attempts the connection every two seconds.
    """
    try:
        connect_websocket()
    except Exception as err:
        print(err)
        print("connect failed")
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。