赞
踩
1. 安装依赖
pip install fastapi fastapi-cdn-host uvicorn opencv-python loguru
2. 主要代码
- #!/usr/bin/env python
- import os
- import subprocess
- import sys
- from contextlib import closing
- from datetime import datetime
- from functools import lru_cache
- from pathlib import Path
- from typing import Annotated
-
- import cv2
- import fastapi_cdn_host
- import numpy as np
- from anyio import to_process
- from fastapi import FastAPI, HTTPException, Query, Request
- from fastapi.responses import RedirectResponse, Response
- from loguru import logger
-
- app = FastAPI()
- fastapi_cdn_host.patch_docs(app)
-
-
- class ValidationError(HTTPException):
- def __init__(self, detail: str, status_code=400) -> None:
- super().__init__(detail=detail, status_code=status_code)
-
-
- class ImageResponse(Response):
- media_type = "image/jpeg"
-
- def __init__(self, content: bytes, status_code=200, **kw) -> None:
- super().__init__(content=content, status_code=status_code, **kw)
-
- @classmethod
- def docs_schema(cls) -> dict:
- example = b"\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x06f..."
- return {
- "content": {cls.media_type: {"example": str(example)}},
- "description": "返回二进制JPEG图片.",
- }
-
-
- @app.get(
- "/capture",
- summary="从rtsp地址中抽一帧截个图",
- response_class=ImageResponse,
- responses={200: ImageResponse.docs_schema()},
- )
- async def capture_picture(
- url: Annotated[
- str,
- Query(description="RTSP URL", example="rtsp://127.0.0.1:50009/xxx?a=01&b=..."),
- ],
- frame_index: Annotated[int, Query(description="第几帧", lt=100, gte=0)] = 0,
- ) -> Response:
- """截个图,返回二进制的JPEG图片
- 注:当frame_index为0时,会从第1帧开始自动跳过花屏和质量低的画面
- """
- image_bytes = await capture_one(url, frame_index)
- return ImageResponse(image_bytes)
-
-
- def run_shell(cmd: str, verbose=False, **kw) -> subprocess.CompletedProcess:
- if verbose:
- logger.info(f"{cmd = }")
- return subprocess.run(cmd, shell=True, **kw)
-
-
- def capture_output(cmd: str) -> str:
- r = run_shell(cmd, capture_output=True)
- return r.stdout.strip().decode()
-
-
- @lru_cache
- def has_gpu() -> bool:
- try:
- return torch.cuda.is_available() # type:ignore
- except NameError:
- return bool(capture_output("nvidia-smi -L"))
-
-
- RgbType = Annotated[tuple[int, int, int], "RGB value of one point"]
- RgbRangeType = Annotated[tuple[RgbType, RgbType], "Range of RGB value"]
-
-
- class PictureHouse:
- min_score = 250
- delta_score = 50
-
- def __init__(self, last_score: int = 0) -> None:
- self.last_score = last_score
-
- @staticmethod
- def check_rgb(
- frame: np.ndarray, rgb_range: RgbRangeType | None = None, thresthold=1000
- ) -> bool:
- """检测符合RGB区域值的点数,是否在阈值范围内"""
- if rgb_range is None:
- rgb_range = ((0, 130, 0), (5, 140, 5)) # Green Point
- pixels = cv2.countNonZero(cv2.inRange(frame, *rgb_range)) # type:ignore
- return pixels < thresthold
-
- @staticmethod
- def lap_score(frame: np.ndarray) -> int:
- """拉普拉斯法对图片进行评分"""
- img2gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) # 将图片压缩为单通道的灰度图
- score = cv2.Laplacian(img2gray, cv2.CV_64F).var() # type:ignore[attr-defined]
- return int(score)
-
- def is_avaliable(self, frame: np.ndarray) -> bool:
- """排除花屏、无画面的图片"""
- result = False
- score = self.lap_score(frame)
- if (last := self.last_score) and score >= self.min_score:
- result = abs(score - last) <= self.delta_score and self.check_rgb(frame)
- self.last_score = score
- return result
-
-
- class RtspCapture(cv2.VideoCapture):
- def __init__(self, url: str, timeout=10) -> None:
- # 使用GPU加速https://www.jianshu.com/p/733d7311c509
- # https://blog.csdn.net/aggs1990/article/details/124448658
- gpu_args = [cv2.CAP_PROP_HW_ACCELERATION, cv2.VIDEO_ACCELERATION_ANY]
- args = (gpu_args,) if has_gpu() else ()
- super().__init__(url, cv2.CAP_FFMPEG, *args)
- self.set(cv2.CAP_PROP_POS_MSEC, timeout * 1000)
- self._url = url
-
- def close(self) -> None:
- self.release()
-
- def pick_out(self, total=30) -> np.ndarray:
- image_filter = PictureHouse()
- for index in range(total):
- success, frame = self.read()
- if success and image_filter.is_avaliable(frame):
- logger.info(f"Get avaliable frame at {index=}")
- break
- else:
- raise ValidationError(f"Invalid {total} frames ({self._url})")
- return frame
-
- def screenshot(self, frame_index=0) -> np.ndarray:
- with closing(self) as cap:
- if not cap.isOpened():
- raise ValidationError(f"Failed to open stream: {self._url}")
- if frame_index >= 1:
- for _ in range(frame_index):
- cap.grab()
- retval, frame = cap.retrieve()
- if not retval:
- raise ValidationError(f"Failed to capture {frame_index=}")
- return frame
- return cap.pick_out()
-
-
- def _do_capture(rtsp_url: str, frame_index: int, timeout: int) -> bytes:
- frame = RtspCapture(rtsp_url, timeout).screenshot(frame_index)
- return cv2.imencode(".jpg", frame)[1].tobytes()
-
-
- async def capture_one(rtsp_url: str, frame_index: int, timeout=10) -> bytes:
- # Ref: https://anyio.readthedocs.io/en/stable/subprocesses.html#running-functions-in-worker-processes
- return await to_process.run_sync(_do_capture, rtsp_url, frame_index, timeout)
-
-
- @app.get("/app")
- async def app_info(request: Request) -> dict[str, str | dict | datetime]:
- headers = dict(request.headers)
- ip = getattr(request.client, "host", "")
- url = {
- f"request.url.{attr}": v
- for attr in dir(request.url)
- if not attr.startswith("__")
- and isinstance(v := getattr(request.url, attr), (str, bool, float, int))
- }
- return {
- "your ip": ip,
- "now": datetime.now(),
- "headers": headers,
- "url": url,
- }
-
-
- @app.get("/", include_in_schema=False)
- async def to_docs():
- return RedirectResponse("/docs")
-
-
- def runserver() -> None:
- """This is for debug mode to start server. For prod, use supervisor+gunicorn instead."""
- import uvicorn # type:ignore
-
- root_app = Path(__file__).stem + ":app"
- auto_reload = "PYCHARM_HOSTED" not in os.environ
- host = "0.0.0.0"
- port = 9000
- if sys.argv[1:]:
- port = int(sys.argv[1])
- if sys.platform == "darwin" or sys.platform.lower().startswith("win"):
- tool = "open" if Path("/usr/bin/open").exists() else "explorer"
- os.system(f"{tool} http://127.0.0.1:{port}") # Auto open browser
- uvicorn.run(root_app, host=host, port=port, reload=auto_reload)
-
-
- if __name__ == "__main__":
- runserver()
3. 页面效果
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。