赞
踩
在这个案例中,我们将OpenVINO的SSDLite MobileNetV2物体识别算法在视频流中进行推理。另外,如何通过多线程的方式进行视频读取,以及视频分析,这段代码是很值得一学。此案例涉及:
环境描述:
将Gitee代码下载下来后,进入13-realtime-objectdetection
,你需要新建一个虚拟环境,并下载相关的依赖(关于OpenVINO的安装和使用,这里就不再赘述,需要了解的同学可以直接打开Gitee的链接,在前几个章节或主路径下的README文件有介绍)。进入虚拟环境后,直接在terminal中运行python realtime-objectdetection.py
即可。如果你对于一步步调试感兴趣,可以看一下realtime-objectdetection.ipynb
。
我们一如既往地先看一下这个预训练模型。Intel的Open Model Zoo中提供了许多与训练模型,感兴趣的同学可以去看看。SSDLite MobileNetV2
模型一看便知是SSDLite+MobileNetV2组成的一个物体识别模型。概念我们就不赘述了,描述一下其IR模型的输入输出(这里我们需要先下载TensorFlow版本的原始模型,再转换成IR模型):
输入图像的大小是1, 300, 300, 3
,格式为[B, H, W, C]
,即[batch size,image height,image width,number of channels]
,图像为BGR格式;
模型输出的尺寸是1, 1, N, 7
,N为识别到的框框数量,7指的是[image_id, label, conf, x_min, y_min, x_max, y_max]
。
这里直接附上代码,这里对于模型的下载,转化和加载做具体解释,感兴趣的同学可以参见之前的博客或代码,比如4-model-optimizer-convert2IR
。
import collections
import os
import sys
import time
import cv2
import numpy as np
from IPython import display
from openvino.runtime import Core
import threading
'''
下载ssdlite_mobilenet_v2原模型,并将其放入model文件夹中。
'''
base_model_dir = "model"
# model name as named in Open Model Zoo
model_name = "ssdlite_mobilenet_v2"
download_command = f"omz_downloader " \
f"--name {model_name} " \
f"--output_dir {base_model_dir} " \
f"--cache_dir {base_model_dir}"
! $download_command
print("1 - Download ssdlite_mobilenet_v2 original TensorFlow model.")
'''
将TensorFlow模型转化为IR模型,这里我们的模型精度调整为FP16,默认是FP32
'''
precision = "FP16"
# output path for the conversion
converted_model_path = f"model/public/{model_name}/{precision}/{model_name}.xml"
if not os.path.exists(converted_model_path):
convert_command = f"omz_converter " \
f"--name {model_name} " \
f"--download_dir {base_model_dir} " \
f"--precisions {precision}"
! $convert_command
print("2 - Transform original model into IR format.")
'''
Load the Model
我们将模型下载下来并转换成IR模型后,加载模型
'''
# initialize inference engine
ie_core = Core()
# read the network and corresponding weights from file
model = ie_core.read_model(model=converted_model_path)
# compile the model for the CPU (you can choose manually CPU, GPU, MYRIAD etc.)
# or let the engine choose the best available device (AUTO)
compiled_model = ie_core.compile_model(model=model, device_name="CPU")
print("3 - Load model and compile model.")
# get input and output nodes
input_layer = compiled_model.input(0)
output_layer = compiled_model.output(0)
print("- Input layer info: ", input_layer)
print("- Output layer info: ", output_layer)
# get input size
height, width = list(input_layer.shape)[1:3]
Terminal中打印:
################|| Downloading ssdlite_mobilenet_v2 ||################
========== Retrieving model\public\ssdlite_mobilenet_v2\ssdlite_mobilenet_v2_coco_2018_05_09.tar.gz from the cache
========== Unpacking model\public\ssdlite_mobilenet_v2\ssdlite_mobilenet_v2_coco_2018_05_09.tar.gz
1 - Download ssdlite_mobilenet_v2 original TensorFlow model.
2 - Transform original model into IR format.
3 - Load model and compile model.
- Input layer info: <ConstOutput: names[image_tensor, image_tensor:0] shape{1,300,300,3} type: u8>
- Output layer info: <ConstOutput: names[detection_boxes, detection_boxes:0] shape{1,1,100,7} type: f32>
这一部分就是我觉得很值得一看的地方了。VideoPlayer
类中,我们通过 cv2.VideoCapture 读取每一帧视频数据,新建一个线程,这个线程负责按照指定的FPS读取视频数据。而主线程需要的时候,可以通过 next 函数调用下一帧图像。这里的代码写的非常好,因为如果我们将推理和读每一帧图像数据放在同一个线程,就可能会发生丢帧导致的各种问题,比如视频卡顿,延迟,甚至是延迟时间太长导致的程序“奔溃”。
三种视频输入方式在source的不同写法:
VIdeoPlayer
类相关的代码如下:
class VideoPlayer:
"""
Custom video player to fulfill FPS requirements. You can set target FPS and output size,
flip the video horizontally or skip first N frames.
:param source: Video source. It could be either camera device or video file. For rtsp camera, format should be something like: rtsp://192.168.1.2:8080/out.h264
:param size: Output frame size.
:param flip: Flip source horizontally.
:param fps: Target FPS.
:param skip_first_frames: Skip first N frames.
"""
def __init__(self, source, size=None, flip=False, fps=None, skip_first_frames=0):
self.__cap = cv2.VideoCapture(source)
if not self.__cap.isOpened():
raise RuntimeError(
f"Cannot open {'camera' if isinstance(source, int) else ''} {source}"
)
# skip first N frames
self.__cap.set(cv2.CAP_PROP_POS_FRAMES, skip_first_frames)
# fps of input file
self.__input_fps = self.__cap.get(cv2.CAP_PROP_FPS)
if self.__input_fps <= 0:
self.__input_fps = 60
# target fps given by user
self.__output_fps = fps if fps is not None else self.__input_fps
self.__flip = flip
self.__size = None
self.__interpolation = None
if size is not None:
self.__size = size
# AREA better for shrinking, LINEAR better for enlarging
self.__interpolation = (
cv2.INTER_AREA
if size[0] < self.__cap.get(cv2.CAP_PROP_FRAME_WIDTH)
else cv2.INTER_LINEAR
)
# first frame
_, self.__frame = self.__cap.read()
self.__lock = threading.Lock()
self.__thread = None
self.__stop = False
"""
Start playing.
"""
def start(self):
self.__stop = False
self.__thread = threading.Thread(target=self.__run, daemon=True)
self.__thread.start()
"""
Stop playing and release resources.
"""
def stop(self):
self.__stop = True
if self.__thread is not None:
self.__thread.join()
self.__cap.release()
def __run(self):
prev_time = 0
while not self.__stop:
t1 = time.time()
ret, frame = self.__cap.read()
if not ret:
break
# fulfill target fps
if 1 / self.__output_fps < time.time() - prev_time:
prev_time = time.time()
# replace by current frame
with self.__lock:
self.__frame = frame
t2 = time.time()
# time to wait [s] to fulfill input fps
wait_time = 1 / self.__input_fps - (t2 - t1)
# wait until
time.sleep(max(0, wait_time))
self.__frame = None
"""
Get current frame.
"""
def next(self):
with self.__lock:
if self.__frame is None:
return None
# need to copy frame, because can be cached and reused if fps is low
frame = self.__frame.copy()
if self.__size is not None:
frame = cv2.resize(frame, self.__size, interpolation=self.__interpolation)
if self.__flip:
frame = cv2.flip(frame, 1)
return frame
除此之外,还有一些可视化相关的函数。我们列出所有可用的类并为它们创建颜色。 然后,在后处理阶段,我们将归一化坐标为[0, 1]
的框转换为像素坐标为[0, image_size_in_px]
的框。之后,我们使用非最大抑制来删除重叠框以及低于阈值为0.5的框。最后,我们可以将剩下的绘制框和标签绘制在视频中。
# https://tech.amikelive.com/node-718/what-object-categories-labels-are-in-coco-dataset/
classes = [
"background", "person", "bicycle", "car", "motorcycle", "airplane", "bus", "train",
"truck", "boat", "traffic light", "fire hydrant", "street sign", "stop sign",
"parking meter", "bench", "bird", "cat", "dog", "horse", "sheep", "cow", "elephant",
"bear", "zebra", "giraffe", "hat", "backpack", "umbrella", "shoe", "eye glasses",
"handbag", "tie", "suitcase", "frisbee", "skis", "snowboard", "sports ball", "kite",
"baseball bat", "baseball glove", "skateboard", "surfboard", "tennis racket", "bottle",
"plate", "wine glass", "cup", "fork", "knife", "spoon", "bowl", "banana", "apple",
"sandwich", "orange", "broccoli", "carrot", "hot dog", "pizza", "donut", "cake", "chair",
"couch", "potted plant", "bed", "mirror", "dining table", "window", "desk", "toilet",
"door", "tv", "laptop", "mouse", "remote", "keyboard", "cell phone", "microwave", "oven",
"toaster", "sink", "refrigerator", "blender", "book", "clock", "vase", "scissors",
"teddy bear", "hair drier", "toothbrush", "hair brush"
]
# colors for above classes (Rainbow Color Map)
colors = cv2.applyColorMap(
src=np.arange(0, 255, 255 / len(classes), dtype=np.float32).astype(np.uint8),
colormap=cv2.COLORMAP_RAINBOW,
).squeeze()
print("4 - 我们列出所有可用的类并为它们创建颜色。")
print("5 - 我们使用非最大抑制来删除重叠框以及低于阈值为0.5的框。最后,我们可以将剩下的绘制框和标签绘制在视频中。")
def process_results(frame, results, thresh=0.6):
# size of the original frame
h, w = frame.shape[:2]
# results is a tensor [1, 1, 100, 7]
results = results.squeeze()
boxes = []
labels = []
scores = []
for _, label, score, xmin, ymin, xmax, ymax in results:
# create a box with pixels coordinates from the box with normalized coordinates [0,1]
boxes.append(
tuple(map(int, (xmin * w, ymin * h, (xmax - xmin) * w, (ymax - ymin) * h)))
)
labels.append(int(label))
scores.append(float(score))
# apply non-maximum suppression to get rid of many overlapping entities
# see https://paperswithcode.com/method/non-maximum-suppression
# this algorithm returns indices of objects to keep
indices = cv2.dnn.NMSBoxes(
bboxes=boxes, scores=scores, score_threshold=thresh, nms_threshold=0.6
)
# if there are no boxes
if len(indices) == 0:
return []
# filter detected objects
return [(labels[idx], scores[idx], boxes[idx]) for idx in indices.flatten()]
def draw_boxes(frame, boxes):
for label, score, box in boxes:
# choose color for the label
color = tuple(map(int, colors[label]))
# draw box
x2 = box[0] + box[2]
y2 = box[1] + box[3]
cv2.rectangle(img=frame, pt1=box[:2], pt2=(x2, y2), color=color, thickness=3)
# draw label name inside the box
cv2.putText(
img=frame,
text=f"{classes[label]} {score:.2f}",
org=(box[0] + 10, box[1] + 30),
fontFace=cv2.FONT_HERSHEY_COMPLEX,
fontScale=frame.shape[1] / 1000,
color=color,
thickness=1,
lineType=cv2.LINE_AA,
)
return frame
最后,我们附上主程序,即将物体识别放入视频流中处理并可视化的主要程序。
'''
主程序
- source: 这里支持三种视频输入方式:
- 视频文件:../201-vision-monodepth/data/Coco Walking in Berkeley.mp4
- USB摄像头:0(取决于接口的值,可能是0,或者1,或者其他)
- RTSP流:rtsp://192.168.1.2:8080/out.h264
- flip: 一些摄像头出来的图象是倒的,这里需要flip一下。
- use_popup: 如果我们是在.py下运行,需要弹窗显示视频结果,那么设置为True,如果我们是在notebook中运行,设置为false。
'''
def run_object_detection(source=0, flip=False, use_popup=False, skip_first_frames=0):
player = None
try:
# create video player to play with target fps
player = VideoPlayer(
source=source, flip=flip, fps=25, skip_first_frames=skip_first_frames
)
# start capturing
player.start()
if use_popup:
title = "Press ESC to Exit"
cv2.namedWindow(
winname=title, flags=cv2.WINDOW_GUI_NORMAL | cv2.WINDOW_AUTOSIZE
)
processing_times = collections.deque()
while True:
# grab the frame
frame = player.next()
if frame is None:
print("Source ended")
break
# if frame larger than full HD, reduce size to improve the performance
scale = 1280 / max(frame.shape)
if scale < 1:
frame = cv2.resize(
src=frame,
dsize=None,
fx=scale,
fy=scale,
interpolation=cv2.INTER_AREA,
)
# resize image and change dims to fit neural network input
input_img = cv2.resize(
src=frame, dsize=(width, height), interpolation=cv2.INTER_AREA
)
# create batch of images (size = 1)
input_img = input_img[np.newaxis, ...]
# measure processing time
start_time = time.time()
# get results
results = compiled_model([input_img])[output_layer]
stop_time = time.time()
# get poses from network results
boxes = process_results(frame=frame, results=results)
# draw boxes on a frame
frame = draw_boxes(frame=frame, boxes=boxes)
processing_times.append(stop_time - start_time)
# use processing times from last 200 frames
if len(processing_times) > 200:
processing_times.popleft()
_, f_width = frame.shape[:2]
# mean processing time [ms]
processing_time = np.mean(processing_times) * 1000
fps = 1000 / processing_time
cv2.putText(
img=frame,
text=f"Inference time: {processing_time:.1f}ms ({fps:.1f} FPS)",
org=(20, 40),
fontFace=cv2.FONT_HERSHEY_COMPLEX,
fontScale=f_width / 1000,
color=(0, 0, 255),
thickness=1,
lineType=cv2.LINE_AA,
)
# use this workaround if there is flickering
if use_popup:
cv2.imshow(winname=title, mat=frame)
key = cv2.waitKey(1)
# escape = 27
if key == 27:
break
else:
# encode numpy array to jpg
_, encoded_img = cv2.imencode(
ext=".jpg", img=frame, params=[cv2.IMWRITE_JPEG_QUALITY, 100]
)
# create IPython image
i = display.Image(data=encoded_img)
# display the image in this notebook
display.clear_output(wait=True)
display.display(i)
# ctrl-c
except KeyboardInterrupt:
print("Interrupted")
# any different error
except RuntimeError as e:
print(e)
finally:
if player is not None:
# stop capturing
player.stop()
if use_popup:
cv2.destroyAllWindows()
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。