当前位置:   article > 正文

使用FastAPI输入RTSP地址返回图片_fastapi 返回二进制图片

fastapi 返回二进制图片

1. 安装依赖

pip install fastapi fastapi-cdn-host uvicorn opencv-python loguru

2. 主要代码

  1. #!/usr/bin/env python
  2. import os
  3. import subprocess
  4. import sys
  5. from contextlib import closing
  6. from datetime import datetime
  7. from functools import lru_cache
  8. from pathlib import Path
  9. from typing import Annotated
  10. import cv2
  11. import fastapi_cdn_host
  12. import numpy as np
  13. from anyio import to_process
  14. from fastapi import FastAPI, HTTPException, Query, Request
  15. from fastapi.responses import RedirectResponse, Response
  16. from loguru import logger
  17. app = FastAPI()
  18. fastapi_cdn_host.patch_docs(app)
  19. class ValidationError(HTTPException):
  20. def __init__(self, detail: str, status_code=400) -> None:
  21. super().__init__(detail=detail, status_code=status_code)
  22. class ImageResponse(Response):
  23. media_type = "image/jpeg"
  24. def __init__(self, content: bytes, status_code=200, **kw) -> None:
  25. super().__init__(content=content, status_code=status_code, **kw)
  26. @classmethod
  27. def docs_schema(cls) -> dict:
  28. example = b"\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x06f..."
  29. return {
  30. "content": {cls.media_type: {"example": str(example)}},
  31. "description": "返回二进制JPEG图片.",
  32. }
  33. @app.get(
  34. "/capture",
  35. summary="从rtsp地址中抽一帧截个图",
  36. response_class=ImageResponse,
  37. responses={200: ImageResponse.docs_schema()},
  38. )
  39. async def capture_picture(
  40. url: Annotated[
  41. str,
  42. Query(description="RTSP URL", example="rtsp://127.0.0.1:50009/xxx?a=01&b=..."),
  43. ],
  44. frame_index: Annotated[int, Query(description="第几帧", lt=100, gte=0)] = 0,
  45. ) -> Response:
  46. """截个图,返回二进制的JPEG图片
  47. 注:当frame_index为0时,会从第1帧开始自动跳过花屏和质量低的画面
  48. """
  49. image_bytes = await capture_one(url, frame_index)
  50. return ImageResponse(image_bytes)
  51. def run_shell(cmd: str, verbose=False, **kw) -> subprocess.CompletedProcess:
  52. if verbose:
  53. logger.info(f"{cmd = }")
  54. return subprocess.run(cmd, shell=True, **kw)
  55. def capture_output(cmd: str) -> str:
  56. r = run_shell(cmd, capture_output=True)
  57. return r.stdout.strip().decode()
  58. @lru_cache
  59. def has_gpu() -> bool:
  60. try:
  61. return torch.cuda.is_available() # type:ignore
  62. except NameError:
  63. return bool(capture_output("nvidia-smi -L"))
  64. RgbType = Annotated[tuple[int, int, int], "RGB value of one point"]
  65. RgbRangeType = Annotated[tuple[RgbType, RgbType], "Range of RGB value"]
  66. class PictureHouse:
  67. min_score = 250
  68. delta_score = 50
  69. def __init__(self, last_score: int = 0) -> None:
  70. self.last_score = last_score
  71. @staticmethod
  72. def check_rgb(
  73. frame: np.ndarray, rgb_range: RgbRangeType | None = None, thresthold=1000
  74. ) -> bool:
  75. """检测符合RGB区域值的点数,是否在阈值范围内"""
  76. if rgb_range is None:
  77. rgb_range = ((0, 130, 0), (5, 140, 5)) # Green Point
  78. pixels = cv2.countNonZero(cv2.inRange(frame, *rgb_range)) # type:ignore
  79. return pixels < thresthold
  80. @staticmethod
  81. def lap_score(frame: np.ndarray) -> int:
  82. """拉普拉斯法对图片进行评分"""
  83. img2gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) # 将图片压缩为单通道的灰度图
  84. score = cv2.Laplacian(img2gray, cv2.CV_64F).var() # type:ignore[attr-defined]
  85. return int(score)
  86. def is_avaliable(self, frame: np.ndarray) -> bool:
  87. """排除花屏、无画面的图片"""
  88. result = False
  89. score = self.lap_score(frame)
  90. if (last := self.last_score) and score >= self.min_score:
  91. result = abs(score - last) <= self.delta_score and self.check_rgb(frame)
  92. self.last_score = score
  93. return result
  94. class RtspCapture(cv2.VideoCapture):
  95. def __init__(self, url: str, timeout=10) -> None:
  96. # 使用GPU加速https://www.jianshu.com/p/733d7311c509
  97. # https://blog.csdn.net/aggs1990/article/details/124448658
  98. gpu_args = [cv2.CAP_PROP_HW_ACCELERATION, cv2.VIDEO_ACCELERATION_ANY]
  99. args = (gpu_args,) if has_gpu() else ()
  100. super().__init__(url, cv2.CAP_FFMPEG, *args)
  101. self.set(cv2.CAP_PROP_POS_MSEC, timeout * 1000)
  102. self._url = url
  103. def close(self) -> None:
  104. self.release()
  105. def pick_out(self, total=30) -> np.ndarray:
  106. image_filter = PictureHouse()
  107. for index in range(total):
  108. success, frame = self.read()
  109. if success and image_filter.is_avaliable(frame):
  110. logger.info(f"Get avaliable frame at {index=}")
  111. break
  112. else:
  113. raise ValidationError(f"Invalid {total} frames ({self._url})")
  114. return frame
  115. def screenshot(self, frame_index=0) -> np.ndarray:
  116. with closing(self) as cap:
  117. if not cap.isOpened():
  118. raise ValidationError(f"Failed to open stream: {self._url}")
  119. if frame_index >= 1:
  120. for _ in range(frame_index):
  121. cap.grab()
  122. retval, frame = cap.retrieve()
  123. if not retval:
  124. raise ValidationError(f"Failed to capture {frame_index=}")
  125. return frame
  126. return cap.pick_out()
  127. def _do_capture(rtsp_url: str, frame_index: int, timeout: int) -> bytes:
  128. frame = RtspCapture(rtsp_url, timeout).screenshot(frame_index)
  129. return cv2.imencode(".jpg", frame)[1].tobytes()
  130. async def capture_one(rtsp_url: str, frame_index: int, timeout=10) -> bytes:
  131. # Ref: https://anyio.readthedocs.io/en/stable/subprocesses.html#running-functions-in-worker-processes
  132. return await to_process.run_sync(_do_capture, rtsp_url, frame_index, timeout)
  133. @app.get("/app")
  134. async def app_info(request: Request) -> dict[str, str | dict | datetime]:
  135. headers = dict(request.headers)
  136. ip = getattr(request.client, "host", "")
  137. url = {
  138. f"request.url.{attr}": v
  139. for attr in dir(request.url)
  140. if not attr.startswith("__")
  141. and isinstance(v := getattr(request.url, attr), (str, bool, float, int))
  142. }
  143. return {
  144. "your ip": ip,
  145. "now": datetime.now(),
  146. "headers": headers,
  147. "url": url,
  148. }
  149. @app.get("/", include_in_schema=False)
  150. async def to_docs():
  151. return RedirectResponse("/docs")
  152. def runserver() -> None:
  153. """This is for debug mode to start server. For prod, use supervisor+gunicorn instead."""
  154. import uvicorn # type:ignore
  155. root_app = Path(__file__).stem + ":app"
  156. auto_reload = "PYCHARM_HOSTED" not in os.environ
  157. host = "0.0.0.0"
  158. port = 9000
  159. if sys.argv[1:]:
  160. port = int(sys.argv[1])
  161. if sys.platform == "darwin" or sys.platform.lower().startswith("win"):
  162. tool = "open" if Path("/usr/bin/open").exists() else "explorer"
  163. os.system(f"{tool} http://127.0.0.1:{port}") # Auto open browser
  164. uvicorn.run(root_app, host=host, port=port, reload=auto_reload)
  165. if __name__ == "__main__":
  166. runserver()

3. 页面效果

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/煮酒与君饮/article/detail/916785
推荐阅读
相关标签
  

闽ICP备14008679号