当前位置:   article > 正文

基于FastAPI实现视频流传输包含推流拉流(极简版)_fastapi 信号

fastapi 信号

FastAPI简易视频流传输

运行效果如下:
在这里插入图片描述

服务端

在服务器/本地运行以下代码:

运行后访问服务IP地址:8000可直接查看视频流.

import uvicorn
from fastapi import FastAPI, Request, WebSocket
from fastapi.responses import HTMLResponse, StreamingResponse
import cv2
import numpy as np

app = FastAPI()

# 用于存储客户端WebSocket连接的列表
client_websockets = []

# 客户端WebSocket连接的路由
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    client_websockets.append(websocket)
    try:
        while True:
            # 接收并忽略来自客户端的所有消息
            await websocket.receive()
    finally:
        # 从列表中删除已断开的WebSocket连接
        client_websockets.remove(websocket)

# 用于接收客户端上传的视频流并推送到所有WebSocket连接
@app.post("/upload")
async def upload_video(request: Request):
    # 从请求正文中读取视频流
    content = await request.body()
    nparr = np.frombuffer(content, np.uint8)
    frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR)

    # 将视频帧转换为JPEG格式
    ret, buffer = cv2.imencode('.jpg', frame)
    jpg_as_text = buffer.tobytes()

    # 推送视频帧到所有WebSocket连接
    for websocket in client_websockets:
        await websocket.send_bytes(jpg_as_text)

    # 返回成功响应
    return {"status": "ok"}

# 用于提供前端查看视频的HTML页面
@app.get("/")
async def video_viewer():
    return HTMLResponse("""
        <html>
            <head>
                <title>Video Viewer</title>
            </head>
            <body>
                <h1>Real-time Video Viewer</h1>
                <img id="video_frame" src="#" alt="Video Feed" width="640" height="480"/>
                <script>
                    var video_frame = document.getElementById("video_frame");
                    var ws = new WebSocket("ws://" + window.location.host + "/ws");
                    ws.binaryType = "arraybuffer";
                    ws.onmessage = function(event) {
                        var arrayBuffer = event.data;
                        var blob = new Blob([arrayBuffer], {type: "image/jpeg"});
                        var url = URL.createObjectURL(blob);
                        video_frame.src = url;
                    };
                </script>
            </body>
        </html>
    """)

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72

客户端

要在服务器开启之后运行, 记得改ip.

import requests
import cv2
import time
from requests import ReadTimeout, ConnectTimeout
from threading import Thread
# 使用场景多数是要根据其它信息控制是否开启摄像头, 又因为要在大循环里运行, 因此继承线程类方便管理
class VideoThread(Thread):
    def run(self) -> None:
        self.cap = cv2.VideoCapture(0)  # 从摄像头读取视频
        self.should_stop = False  # 标志位用于设置线程退出
        while not self.should_stop:
            ret, frame = self.cap.read()
            if not ret:
                break
            # 在客户端显示视频流
            cv2.imshow('Video Feed', frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
            _, img_encoded = cv2.imencode(".jpg", frame)  # 将帧转换为JPEG格式
            try:
                # 发送到服务器, 设置超时时间防止阻塞, 如果服务器性能一般般尽量不要发太快
                response = requests.post("http://你的服务端的ip:8000/upload", data=img_encoded.tostring(),
                                         timeout=0.05)  # 发送图像数据
                if response.status_code != 200:
                    print("接口名字是不是写错了")
            except ReadTimeout:
                # 因为设置了超时很容易网络原因超时, 丢几帧画面是可接受的
                pass
            except ConnectTimeout:
                print("服务端是不是还没开起来?")
                time.sleep(2)
        self.cap.release()
        cv2.destroyAllWindows()
    def quit(self):
        try:
            self.cap.release()
            cv2.destroyAllWindows()
        except Exception:
            # 万一出了什么异常情况也清理掉占用的资源
            pass
    def stop(self):
        self.should_stop = True

a=VideoThread()  #开启传视频
a.start()
time.sleep(60)
a.stop()

a=VideoThread()
a.start()
time.sleep(30)
a.stop()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52

总结

这是一个非常简单但是非常丑陋的方式来展示视频流, 适合自己DIY个摄像头啥的, 只能单个查看, 单个上传, 传的太快服务器反应不过来会崩, 如何客户端接着就崩了.

好处就在于非常非常的简单,而且非常非常的方便, 只要把代码挂到服务器上就有这个功能了. 客户端在主线程里如果收到什么信号就可以用a=VideoThread()然后 a.start(),想要关闭了就a.stop().

这个代码也可以用堆屎山的方式加权限认证之类的来保证一个时间只有一个人在看, 然后发的话也可以加权限认证保证只有一个发, 甚至能再堆一些屎山搞成多个同时发多个同时收.

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/280050?site
推荐阅读
相关标签
  

闽ICP备14008679号