当前位置:   article > 正文

fastapi 多次传输数据 yield socket_fastapi 一个接口持续发送消息 应如何实现

fastapi 一个接口持续发送消息 应如何实现

使用yield

服务器:

# -*- coding: utf-8 -*-
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
import uvicorn as uvicorn
from starlette.responses import StreamingResponse
app = FastAPI()


def generate_batches():
    for i in range(5):
        yield "ans" + str(i)


@app.get("/test")
async def get_data():
    async def stream_data():
        for batch in generate_batches():
            yield batch.encode('utf-8')

    return StreamingResponse(stream_data(), media_type='text/plain')


# 添加跨域中间件
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # 在这里设置允许的域名列表,如果是 * 则是允许所有域名
    allow_methods=["*"],  # 允许的 HTTP 请求方法
    allow_headers=["*"],  # 允许的 HTTP 请求头
)

if __name__ == '__main__':
    uvicorn.run(app=app, host="0.0.0.0", port=8087, workers=1)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

客户端:

fetchData() {
      console.log("click")
      const eventSource = new EventSource('/test');
      eventSource.onmessage = function(event) {
        const data = event.data;
        console.log(data);
      };
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

失败,无法获得数据。

使用WebSocket

无参数测试

服务器

from fastapi import FastAPI
from fastapi.websockets import WebSocket
from fastapi.middleware.cors import CORSMiddleware
import uvicorn as uvicorn

app = FastAPI()


async def generate_batches(websocket: WebSocket):
    for i in range(5):
        await websocket.send_json({"data": "ans" + str(i)})


@app.websocket("/test")
async def get_data(websocket: WebSocket):
    await websocket.accept()
    await generate_batches(websocket)


# 添加跨域中间件
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # 在这里设置允许的域名列表,如果是 * 则是允许所有域名
    allow_methods=["*"],  # 允许的 HTTP 请求方法
    allow_headers=["*"],  # 允许的 HTTP 请求头
)

if __name__ == '__main__':
    uvicorn.run(app=app, host="0.0.0.0", port=8087, workers=1)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

客户端

fetchData() {
      console.log("click")
      const socket = new WebSocket(`ws://localhost:8087/test`);

      socket.onmessage = function(event) {
        const data = JSON.parse(event.data);
        console.log(data.data);
      };

      socket.onopen = function() {
        console.log('WebSocket连接已建立');
      };

      socket.onclose = function() {
        console.log('WebSocket连接已关闭');
      };
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

结果是:

WebSocket连接已建立
ans0
ans1
ans2
ans3
ans4
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

带参数测试

服务器

print(f"url: {websocket.app['url']}")
print(f"mode: {websocket.app['mode']}")
  • 1
  • 2

报错:

TypeError: 'FastAPI' object is not subscriptable
  • 1

于是修改代码:
服务器

from fastapi import FastAPI
from fastapi.websockets import WebSocket
from fastapi.middleware.cors import CORSMiddleware
import uvicorn as uvicorn
from urllib.parse import unquote
app = FastAPI()


async def generate_batches(websocket: WebSocket):
    for i in range(5):
        await websocket.send_json({"data": "ans" + str(i)})


@app.websocket("/test")
async def get_data(websocket: WebSocket):
    await websocket.accept()
    url = unquote(websocket.scope["query_string"].decode().split("url=")[1].split("&")[0])
    mode = websocket.scope["query_string"].decode().split("mode=")[1]
    print(f"url: {url}")
    print(f"mode: {mode}")
    await generate_batches(websocket)


# 添加跨域中间件
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # 在这里设置允许的域名列表,如果是 * 则是允许所有域名
    allow_methods=["*"],  # 允许的 HTTP 请求方法
    allow_headers=["*"],  # 允许的 HTTP 请求头
)

if __name__ == '__main__':
    uvicorn.run(app=app, host="0.0.0.0", port=8087, workers=1)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

客户端

const socket = new WebSocket(`ws://localhost:8087/test?url=${encodeURIComponent(this.url)}&mode=${1}`);
  • 1

随时结束连接

export default {
  name: "my-test",
  data()
  {
    return {
      url: 'https://akavjht.top/image/0027/left.mp4',
      imageData: '',
      types: '',
      socket: null
    }
  },
  methods: {
    fetchData()
    {
      console.log("click")
      this.socket = new WebSocket(`ws://localhost:8087/test`);

      this.socket.onmessage = function(event) {
        const data = JSON.parse(event.data);
        console.log(data.data);
      };

      this.socket.onopen = function() {
        console.log('WebSocket连接已建立');
      };

      this.socket.onclose = function() {
        console.log('WebSocket连接已关闭');
      };
    },
    over()
    {
      this.socket.close();
    }
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

出现一个问题,就是加入服务器发出100次请求。
前端会打印出第一个请求,然后在服务器发出结束信号的时候,一口气打印后面99次请求。不
查无原因。真愁人。


上面是vue进行测试,下面用airpost里面的websocket进行测试。传递参数使用text型。

from fastapi import FastAPI
from fastapi.websockets import WebSocket
from fastapi.middleware.cors import CORSMiddleware
import uvicorn as uvicorn

from Code.p1_Main_Predict2 import predict

app = FastAPI()


@app.websocket("/test")
async def get_data(websocket: WebSocket):
    await websocket.accept()
    await predict(websocket)


# 添加跨域中间件
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # 在这里设置允许的域名列表,如果是 * 则是允许所有域名
    allow_methods=["*"],  # 允许的 HTTP 请求方法
    allow_headers=["*"],  # 允许的 HTTP 请求头
)

if __name__ == '__main__':
    uvicorn.run(app=app, host="0.0.0.0", port=8087, workers=1)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
async def predict(websocket):
    url = await websocket.receive_text()
	……
  • 1
  • 2
  • 3

离谱是真的离谱,接收websocket的字符串竟然是包含双引号的。所以导致后面通过url读取的信息一直失败。

url = str(await websocket.receive_text())[1:-1]
  • 1
  1. 先转为str,然后去掉第一个字符和最后一个字符
  2. str要将await包括在内。

问题解决

问题剖析

问题本质在于:所有的await语句会在最后一个await语句执行时才会执行,而不是一条一条执行。
原因:在异步编程中,使用await关键字可以将某个操作标记为异步操作,并挂起当前的协程(Coroutine),等待操作完成后再继续执行下面的代码。在给定的代码中,当遇到await关键字时,协程会挂起当前的执行,并等待对应的操作完成。只有在先前的await操作完成后,后续的await操作才会执行。

解决方法

在await语句下,添加await asyncio.sleep(0)

import cv2
import numpy as np
from PIL import Image
from Code.p2_yolo import YOLO
from Code.p4_draw import D
from Code.p0_data_param import DataParameter

import asyncio
import websockets
dataParameter = DataParameter()
a0_yolo = YOLO(dataParameter)
drawing = D(dataParameter)

async def send_message(websocket, message):
    await websocket.send(message)

async def process_frame(frame, websocket):
    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    frame = Image.fromarray(np.uint8(frame))
    ship_boxes, ship_conf, ship_label, ship_type = a0_yolo.detect_image(frame)
    await send_message(websocket, str(ship_boxes) + "|" + str(ship_type))

async def handle_client(websocket, path):
    try:
        while True:
            url = str(await websocket.recv())
            capture = cv2.VideoCapture(url)
            while True:
                ref, frame = capture.read()
                if not ref:
                    print("检测画面失败")
                    await send_message(websocket, "摄像头检测画面失败")
                    break
                print(i)
                asyncio.create_task(process_frame(frame, websocket))
                await asyncio.sleep(0)  # 使用asyncio.sleep()进行延迟
    except websockets.exceptions.ConnectionClosedOK:
        print("客户端已断开连接")

start_server = websockets.serve(handle_client, "localhost", 8000)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
增加请求

现在的问题,要让请求随时能打断检测过程。即:

while True:
	ref, frame = capture.read()
	if not ref:
		print("检测画面失败")
		await send_message(websocket, "摄像头检测画面失败")
		break
	print(i)
	asyncio.create_task(process_frame(frame, websocket))
	await asyncio.sleep(1)  # 使用asyncio.sleep()进行延迟
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

每次循环添加一个检测请求的过程,并且该过程是检测是否有请求,而不是直接接受请求,否则会导致。一次循环就停止。

async def receive_message(websocket):
    try:
        url = await asyncio.wait_for(websocket.recv(), timeout=0.1)  # 设置超时时间为5秒
        return str(url)
    except asyncio.TimeoutError:
        return None
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

给0.1秒的时间检测是否有请求,如果没有就继续,如果有就接受该请求。

完整程序

服务端:

import cv2
import numpy as np
from PIL import Image
from Code.p2_yolo import YOLO
from Code.p4_draw import D
from Code.p0_data_param import DataParameter

dataParameter = DataParameter()
a0_yolo = YOLO(dataParameter)
drawing = D(dataParameter)

import asyncio
import websockets


async def send_message(websocket, message):
    await websocket.send(message)


async def receive_message(websocket):
    try:
        url = await asyncio.wait_for(websocket.recv(), timeout=0.1)  # 设置超时时间为5秒
        return str(url)
    except asyncio.TimeoutError:
        return None


async def handle_client(websocket, path):
    try:
        capture = None  # 用于存储视频捕获对象
        url = await websocket.recv()
        await asyncio.sleep(0)

        while True:
            print("url:", url)

            # 停止旧的请求
            if capture is not None:
                capture.release()

            # 创建新的视频捕获对象
            capture = cv2.VideoCapture(url)

            while True:
                url = await receive_message(websocket)
                if url:
                    break

                # 读取视频帧
                ref, frame = capture.read()
                if not ref:
                    print("检测画面失败")
                    await send_message(websocket, "检测画面失败 或 视频流已结束")
                    url = await websocket.recv()
                    break
                width = int(capture.get(cv2.CAP_PROP_FRAME_WIDTH))
                height = int(capture.get(cv2.CAP_PROP_FRAME_HEIGHT))
                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                frame = Image.fromarray(np.uint8(frame))
                ship_boxes, ship_conf, ship_label, ship_type = a0_yolo.detect_image(frame)
                print(ship_boxes)
                ship_boxes = [[i[0] / width, i[1] / height, i[2] / width, i[3] / height] for i in ship_boxes]
                await send_message(websocket, str(ship_boxes) + "|" + str(ship_type))
                await asyncio.sleep(0)
    except websockets.exceptions.ConnectionClosedOK:
        print("客户端已断开连接")
    finally:
        if capture is not None:
            capture.release()


start_server = websockets.serve(handle_client, "localhost", 8000)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74

客户端:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>WebSocket Demo</title>
</head>
<body>
<input type="text" id="urlInput" placeholder="输入视频流url">
<button onclick="sendMessage()">发送</button>

<script>
    const socket = new WebSocket('ws://localhost:8000');

    socket.onopen = function() {
        console.log('连接已建立');
    };

    socket.onmessage = function(event) {
        console.log('接收到消息:', event.data);
    }

    function sendMessage() {
        const urlInput = document.getElementById('urlInput');
        socket.send(urlInput.value);
    }
</script>
</body>
</html>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/863455
推荐阅读
相关标签
  

闽ICP备14008679号