赞
踩
服务器:
# -*- coding: utf-8 -*- from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware import uvicorn as uvicorn from starlette.responses import StreamingResponse app = FastAPI() def generate_batches(): for i in range(5): yield "ans" + str(i) @app.get("/test") async def get_data(): async def stream_data(): for batch in generate_batches(): yield batch.encode('utf-8') return StreamingResponse(stream_data(), media_type='text/plain') # 添加跨域中间件 app.add_middleware( CORSMiddleware, allow_origins=["*"], # 在这里设置允许的域名列表,如果是 * 则是允许所有域名 allow_methods=["*"], # 允许的 HTTP 请求方法 allow_headers=["*"], # 允许的 HTTP 请求头 ) if __name__ == '__main__': uvicorn.run(app=app, host="0.0.0.0", port=8087, workers=1)
客户端:
fetchData() {
console.log("click")
const eventSource = new EventSource('/test');
eventSource.onmessage = function(event) {
const data = event.data;
console.log(data);
};
}
失败,无法获得数据。
服务器
from fastapi import FastAPI from fastapi.websockets import WebSocket from fastapi.middleware.cors import CORSMiddleware import uvicorn as uvicorn app = FastAPI() async def generate_batches(websocket: WebSocket): for i in range(5): await websocket.send_json({"data": "ans" + str(i)}) @app.websocket("/test") async def get_data(websocket: WebSocket): await websocket.accept() await generate_batches(websocket) # 添加跨域中间件 app.add_middleware( CORSMiddleware, allow_origins=["*"], # 在这里设置允许的域名列表,如果是 * 则是允许所有域名 allow_methods=["*"], # 允许的 HTTP 请求方法 allow_headers=["*"], # 允许的 HTTP 请求头 ) if __name__ == '__main__': uvicorn.run(app=app, host="0.0.0.0", port=8087, workers=1)
客户端
fetchData() { console.log("click") const socket = new WebSocket(`ws://localhost:8087/test`); socket.onmessage = function(event) { const data = JSON.parse(event.data); console.log(data.data); }; socket.onopen = function() { console.log('WebSocket连接已建立'); }; socket.onclose = function() { console.log('WebSocket连接已关闭'); }; }
结果是:
WebSocket连接已建立
ans0
ans1
ans2
ans3
ans4
服务器
print(f"url: {websocket.app['url']}")
print(f"mode: {websocket.app['mode']}")
报错:
TypeError: 'FastAPI' object is not subscriptable
于是修改代码:
服务器
from fastapi import FastAPI from fastapi.websockets import WebSocket from fastapi.middleware.cors import CORSMiddleware import uvicorn as uvicorn from urllib.parse import unquote app = FastAPI() async def generate_batches(websocket: WebSocket): for i in range(5): await websocket.send_json({"data": "ans" + str(i)}) @app.websocket("/test") async def get_data(websocket: WebSocket): await websocket.accept() url = unquote(websocket.scope["query_string"].decode().split("url=")[1].split("&")[0]) mode = websocket.scope["query_string"].decode().split("mode=")[1] print(f"url: {url}") print(f"mode: {mode}") await generate_batches(websocket) # 添加跨域中间件 app.add_middleware( CORSMiddleware, allow_origins=["*"], # 在这里设置允许的域名列表,如果是 * 则是允许所有域名 allow_methods=["*"], # 允许的 HTTP 请求方法 allow_headers=["*"], # 允许的 HTTP 请求头 ) if __name__ == '__main__': uvicorn.run(app=app, host="0.0.0.0", port=8087, workers=1)
客户端
const socket = new WebSocket(`ws://localhost:8087/test?url=${encodeURIComponent(this.url)}&mode=${1}`);
export default { name: "my-test", data() { return { url: 'https://akavjht.top/image/0027/left.mp4', imageData: '', types: '', socket: null } }, methods: { fetchData() { console.log("click") this.socket = new WebSocket(`ws://localhost:8087/test`); this.socket.onmessage = function(event) { const data = JSON.parse(event.data); console.log(data.data); }; this.socket.onopen = function() { console.log('WebSocket连接已建立'); }; this.socket.onclose = function() { console.log('WebSocket连接已关闭'); }; }, over() { this.socket.close(); } } }
出现一个问题,就是加入服务器发出100次请求。
前端会打印出第一个请求,然后在服务器发出结束信号的时候,一口气打印后面99次请求。不
查无原因。真愁人。
上面是vue进行测试,下面用airpost里面的websocket进行测试。传递参数使用text型。
from fastapi import FastAPI from fastapi.websockets import WebSocket from fastapi.middleware.cors import CORSMiddleware import uvicorn as uvicorn from Code.p1_Main_Predict2 import predict app = FastAPI() @app.websocket("/test") async def get_data(websocket: WebSocket): await websocket.accept() await predict(websocket) # 添加跨域中间件 app.add_middleware( CORSMiddleware, allow_origins=["*"], # 在这里设置允许的域名列表,如果是 * 则是允许所有域名 allow_methods=["*"], # 允许的 HTTP 请求方法 allow_headers=["*"], # 允许的 HTTP 请求头 ) if __name__ == '__main__': uvicorn.run(app=app, host="0.0.0.0", port=8087, workers=1)
async def predict(websocket):
url = await websocket.receive_text()
……
离谱是真的离谱,接收websocket的字符串竟然是包含双引号的。所以导致后面通过url读取的信息一直失败。
url = str(await websocket.receive_text())[1:-1]
问题本质在于:所有的await
语句会在最后一个await
语句执行时才会执行,而不是一条一条执行。
原因:在异步编程中,使用await
关键字可以将某个操作标记为异步操作,并挂起当前的协程(Coroutine),等待操作完成后再继续执行下面的代码。在给定的代码中,当遇到await关键字时,协程会挂起当前的执行,并等待对应的操作完成。只有在先前的await
操作完成后,后续的await操作才会执行。
在await语句下,添加await asyncio.sleep(0)
import cv2 import numpy as np from PIL import Image from Code.p2_yolo import YOLO from Code.p4_draw import D from Code.p0_data_param import DataParameter import asyncio import websockets dataParameter = DataParameter() a0_yolo = YOLO(dataParameter) drawing = D(dataParameter) async def send_message(websocket, message): await websocket.send(message) async def process_frame(frame, websocket): frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) frame = Image.fromarray(np.uint8(frame)) ship_boxes, ship_conf, ship_label, ship_type = a0_yolo.detect_image(frame) await send_message(websocket, str(ship_boxes) + "|" + str(ship_type)) async def handle_client(websocket, path): try: while True: url = str(await websocket.recv()) capture = cv2.VideoCapture(url) while True: ref, frame = capture.read() if not ref: print("检测画面失败") await send_message(websocket, "摄像头检测画面失败") break print(i) asyncio.create_task(process_frame(frame, websocket)) await asyncio.sleep(0) # 使用asyncio.sleep()进行延迟 except websockets.exceptions.ConnectionClosedOK: print("客户端已断开连接") start_server = websockets.serve(handle_client, "localhost", 8000) asyncio.get_event_loop().run_until_complete(start_server) asyncio.get_event_loop().run_forever()
现在的问题,要让请求随时能打断检测过程。即:
while True:
ref, frame = capture.read()
if not ref:
print("检测画面失败")
await send_message(websocket, "摄像头检测画面失败")
break
print(i)
asyncio.create_task(process_frame(frame, websocket))
await asyncio.sleep(1) # 使用asyncio.sleep()进行延迟
每次循环添加一个检测请求的过程,并且该过程是检测是否有请求,而不是直接接受请求,否则会导致。一次循环就停止。
async def receive_message(websocket):
try:
url = await asyncio.wait_for(websocket.recv(), timeout=0.1) # 设置超时时间为5秒
return str(url)
except asyncio.TimeoutError:
return None
给0.1秒的时间检测是否有请求,如果没有就继续,如果有就接受该请求。
服务端:
import cv2 import numpy as np from PIL import Image from Code.p2_yolo import YOLO from Code.p4_draw import D from Code.p0_data_param import DataParameter dataParameter = DataParameter() a0_yolo = YOLO(dataParameter) drawing = D(dataParameter) import asyncio import websockets async def send_message(websocket, message): await websocket.send(message) async def receive_message(websocket): try: url = await asyncio.wait_for(websocket.recv(), timeout=0.1) # 设置超时时间为5秒 return str(url) except asyncio.TimeoutError: return None async def handle_client(websocket, path): try: capture = None # 用于存储视频捕获对象 url = await websocket.recv() await asyncio.sleep(0) while True: print("url:", url) # 停止旧的请求 if capture is not None: capture.release() # 创建新的视频捕获对象 capture = cv2.VideoCapture(url) while True: url = await receive_message(websocket) if url: break # 读取视频帧 ref, frame = capture.read() if not ref: print("检测画面失败") await send_message(websocket, "检测画面失败 或 视频流已结束") url = await websocket.recv() break width = int(capture.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(capture.get(cv2.CAP_PROP_FRAME_HEIGHT)) frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) frame = Image.fromarray(np.uint8(frame)) ship_boxes, ship_conf, ship_label, ship_type = a0_yolo.detect_image(frame) print(ship_boxes) ship_boxes = [[i[0] / width, i[1] / height, i[2] / width, i[3] / height] for i in ship_boxes] await send_message(websocket, str(ship_boxes) + "|" + str(ship_type)) await asyncio.sleep(0) except websockets.exceptions.ConnectionClosedOK: print("客户端已断开连接") finally: if capture is not None: capture.release() start_server = websockets.serve(handle_client, "localhost", 8000) asyncio.get_event_loop().run_until_complete(start_server) asyncio.get_event_loop().run_forever()
客户端:
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <meta name="viewport" content="width=device-width, initial-scale=1.0"> <title>WebSocket Demo</title> </head> <body> <input type="text" id="urlInput" placeholder="输入视频流url"> <button onclick="sendMessage()">发送</button> <script> const socket = new WebSocket('ws://localhost:8000'); socket.onopen = function() { console.log('连接已建立'); }; socket.onmessage = function(event) { console.log('接收到消息:', event.data); } function sendMessage() { const urlInput = document.getElementById('urlInput'); socket.send(urlInput.value); } </script> </body> </html>
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。