The previous post covered model training with yolov5; this post covers real-time image inference with the ZED 2 camera and a yolov5 model.
Dev board: Jetson Xavier NX
Camera: ZED 2
YOLO version: yolov5-5.0
OS: Ubuntu 18.04
This post is adapted from yolov5-5.0; source code:
yolov5 GitHub address: https://github.com/ultralytics/yolov5/tree/v5.0
First, we wrap the detection code into a class:
# -*- coding: utf-8 -*-
import time
import numpy as np
import torch
from models.experimental import attempt_load
from utils.datasets import letterbox
from utils.general import non_max_suppression, scale_coords
from utils.plots import plot_one_box  # in yolov5 v5.0 plot_one_box lives in utils.plots; older trees had it in utils.general
from utils.torch_utils import select_device, time_synchronized
from numpy import random
class Detector:
    def __init__(self):
        self.img_size = 640  # input size; must be divisible by stride=32
        # self.stride = int(model.stride.max())  # like FPN, yolov5 upsamples and stacks the last three
        #   feature maps of the backbone, so the input resolution must be an integer multiple of 32
        # self.img_size = check_img_size(self.img_size, s=self.stride)  # verify img_size is divisible by 32;
        #   if not, round it up to the next multiple of 32
        self.threshold = 0.6  # confidence threshold
        self.hide_labels = False  # hide class labels? False: show labels
        self.hide_conf = False  # hide confidence scores?
        self.weights = 'weights/yolov5s.pt'  # weight file to load
        # self.other_weights = other_weights  # extra weight file for a second detector
        # self.device = '0' if torch.cuda.is_available() else 'cpu'
        self.device = ''  # device type, cpu or cuda; empty lets select_device decide
        self.device = select_device(self.device)  # resolve to CPU or CUDA
        # yolov5s
        model = attempt_load(self.weights, map_location=self.device)  # load the weights
        model.to(self.device).eval()  # move the model to the device; model.eval() for inference, model.train() for training
        model.half()  # half precision is GPU-only; on CPU replace .half() with .float()
        self.m = model
        self.names = model.module.names if hasattr(model, 'module') else model.names  # all class labels the model can detect
        self.colors = [[random.randint(0, 255) for _ in range(3)] for _ in self.names]  # box colors: 3 random values in 0-255 per class
        # # other_weights
        # other_model = attempt_load(self.other_weights, map_location=self.device)
        # other_model.to(self.device).eval()
        # other_model.half()
        # self.om = other_model
        # self.other_names = other_model.module.names if hasattr(other_model, 'module') else other_model.names
        # self.other_colors = [[random.randint(0, 255) for _ in range(3)] for _ in self.other_names]

    def detect(self, im):
        if im is None:  # bail out if there is no image
            exit(1)
        im0 = im.copy()  # keep a copy of the original image
        t0 = time.time()  # start of inference
        # preprocessing: [1080,1920,3] -> [384,640,3]
        img = letterbox(im, new_shape=self.img_size)[0]
        img = img[:, :, ::-1].transpose(2, 0, 1)  # BGR -> RGB, [384,640,3] -> [3,384,640]
        img = np.ascontiguousarray(img)  # store the image contiguously in memory
        img = torch.from_numpy(img).to(self.device)  # numpy array -> torch Tensor, moved to the device
        img = img.half()  # half precision; on CPU use img.float() instead
        img /= 255.0  # normalize pixel values to [0,1]
        if img.ndimension() == 3:
            img = img.unsqueeze(0)  # [3,384,640] -> [1,3,384,640]
        t1 = time.time()
        pred = self.m(img, augment=False)[0]  # raw model output
        pred = pred.float()
        pred = non_max_suppression(pred, self.threshold, 0.4)  # NMS; returns the filtered boxes
        # # other_weights
        # other_pred = self.om(img, augment=False)[0]
        # other_pred = other_pred.float()
        # other_pred = non_max_suppression(other_pred, self.threshold, 0.4)
        t2 = time_synchronized()
        boxes = []  # results to return
        # yolov5s
        for det in pred:
            if det is not None and len(det):  # if there are any boxes
                # Rescale boxes from img_size to im0 size, i.e. map the predictions back onto the original image
                det[:, :4] = scale_coords(img.shape[2:], det[:, :4], im0.shape).round()
                for *x, conf, cls_id in det:  # coordinates, confidence, class id
                    # lbl = self.names[int(cls_id)]  # look up the label name by class id
                    lbl = None if self.hide_labels else (self.names[int(cls_id)] if self.hide_conf else f'{self.names[int(cls_id)]} {conf:.2f}')  # label name + confidence (2 decimals)
                    # # keep only the classes of interest and skip everything else
                    # if not lbl in ['person', 'car']:
                    #     continue
                    x1, y1 = int(x[0]), int(x[1])  # top-left corner
                    x2, y2 = int(x[2]), int(x[3])  # bottom-right corner
                    boxes.append((x1, y1, x2, y2, lbl, conf))  # box (top-left, bottom-right), label, confidence
                    plot_one_box(x, im0, label=lbl, color=self.colors[int(cls_id)], line_thickness=3)  # draw box + label on the original image, color picked by class id, line width 3
        # # other_weights
        # for det in other_pred:
        #     if det is not None and len(det):
        #         # Rescale boxes from img_size to im0 size
        #         det[:, :4] = scale_coords(img.shape[2:], det[:, :4], im0.shape).round()
        #         for *x, conf, cls_id in det:
        #             # lbl = self.other_names[int(cls_id)]
        #             # lbl = None if self.hide_labels else (self.other_names[int(cls_id)] if self.hide_conf else f'{self.other_names[int(cls_id)]} {conf:.2f}')
        #             lbl = None if self.hide_labels else self.other_names[int(cls_id)]  # appending the confidence sometimes raises an error, so it is left out for now
        #             x1, y1 = int(x[0]), int(x[1])
        #             x2, y2 = int(x[2]), int(x[3])
        #             boxes.append((x1, y1, x2, y2, lbl, conf))
        #             plot_one_box(x, im0, label=lbl, color=self.other_colors[int(cls_id)], line_thickness=3)
        return im0  # return the annotated image
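As a quick sanity check of the stride-32 comments above, the sketch below (assuming the standard yolov5 v5.0 utilities are on the path) shows the rounding rule and the [1080,1920,3] -> [384,640,3] mapping mentioned in detect():
import numpy as np
from utils.datasets import letterbox
from utils.general import check_img_size

print(check_img_size(640, s=32))  # 640 is already a multiple of 32 -> unchanged
print(check_img_size(630, s=32))  # rounded up to the next multiple of 32 -> 640, with a warning

frame = np.zeros((1080, 1920, 3), dtype=np.uint8)  # dummy HD1080 frame
resized = letterbox(frame, new_shape=640)[0]
print(resized.shape)  # (384, 640, 3): the long side becomes 640, the short side is padded up to 384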
Notes on the plot_one_box() function (the snippet below assumes a variant of detect() that returns the boxes list instead of the annotated image; the class above returns im0, see the sketch after these notes):
bboxes = detector.detect(im)
if len(bboxes) > 0:
    for (x1, y1, x2, y2, lbl, conf) in bboxes:  # box, label, confidence
        x = (x1, y1, x2, y2)
        plot_one_box(x, image, label=lbl, color=(detector.colors[int(conf)]), line_thickness=3)
x: four integers; x1, y1, x2, y2 = int(x[0]), int(x[1]), int(x[2]), int(x[3])
image: the original image, im.copy(), with im = im[:, :, 0:3]
color: int(conf), with conf standing in for the class id cls_id; note that since conf < 1, int(conf) is always 0, so every box gets the first color
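A minimal way to get that behavior (an assumption about how you adapt the class, not part of the original code) is to have detect() return both values and unpack them at the call site:
# at the end of Detector.detect(), return the boxes alongside the image:
#     return im0, boxes

im0, bboxes = detector.detect(im)  # annotated image + list of (x1, y1, x2, y2, lbl, conf)
for (x1, y1, x2, y2, lbl, conf) in bboxes:
    print(lbl, float(conf), (x1, y1, x2, y2))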
Drawing the boxes and labels without plot_one_box():
for (x1, y1, x2, y2, lbl, conf) in boxes:
    color = (0, 255, 0)
    c1, c2 = (x1, y1), (x2, y2)
    cv2.rectangle(view, c1, c2, color, thickness=2, lineType=cv2.LINE_AA)
    line_thickness = round(0.002 * (view.shape[0] + view.shape[1]) / 2) + 1
    font_thickness = max(line_thickness - 1, 1)
    cv2.putText(view, '{}:{:.2f}'.format(lbl, conf), (c1[0], c1[1]), 0, line_thickness / 2, [225, 255, 255], thickness=font_thickness, lineType=cv2.LINE_AA)
Create detect_cv.py:
import time
import cv2
import torch
from utils.general import set_logging
from detect import Detector


@torch.no_grad()
def detect_cv(cap=0):  # camera index 0
    cap = cv2.VideoCapture(cap)
    set_logging()
    # instantiate the Detector class from detect.py
    dect = Detector()
    ##### detection starts here #####
    while True:
        t0 = time.time()
        ref, img0 = cap.read()
        im0 = dect.detect(img0)  # run detection
        print(f'time: ({time.time() - t0:.3f}s)')
        cv2.imshow("im0", im0)
        key = cv2.waitKey(20)
        if key == 27:  # Esc quits
            break
    cv2.destroyAllWindows()


if __name__ == "__main__":
    detect_cv()
The ZED version below is adapted from the code above, but it additionally has to import the residual-network (ReID) module first. This is presumably CUDA related; the exact cause is still unclear and will be revisited in a later post as I keep learning.
Create a new detect_zed.py file:
# workaround for the CUDA error: RuntimeError: cuDNN error: CUDNN_STATUS_MAPPING_ERROR
import os
from deep.feature_extractor import Extractor
model_path = os.getcwd() + '/deep/checkpoint/ckpt.t7'
extractor = Extractor(model_path, use_cuda=True)

import time
import cv2
import torch
# from utils.general import set_logging
import pyzed.sl as sl
from detect import Detector


@torch.no_grad()
def detect_zed():
    # cap = cv2.VideoCapture(cap)
    zed = sl.Camera()
    # set the camera resolution to 1080p and the capture rate to 30 fps
    init_params = sl.InitParameters()
    init_params.camera_resolution = sl.RESOLUTION.HD1080  # Use HD1080 video mode
    init_params.camera_fps = 30  # supported fps: 15, 30, 60, 100
    err = zed.open(init_params)  # open the camera with these parameters
    if err != sl.ERROR_CODE.SUCCESS:
        exit(1)
    runtime_parameters = sl.RuntimeParameters()  # per-grab runtime parameters
    runtime_parameters.sensing_mode = sl.SENSING_MODE.STANDARD
    i = 0
    # create an sl.Mat to hold the image; Mat handles many matrix formats with 1 to 4 channels
    image = sl.Mat()
    # # query the resolution
    # resolution = zed.get_camera_information().camera_resolution
    # w, h = resolution.width, resolution.height
    # x, y = int(w/2), int(h/2)  # image center
    dect = Detector()
    while True:
        # grab the latest images, rectify them, and compute measurements (depth, point cloud, tracking, ...)
        # according to the provided RuntimeParameters
        if zed.grab(runtime_parameters) == sl.ERROR_CODE.SUCCESS:  # frame grabbed successfully
            timestamp = zed.get_timestamp(sl.TIME_REFERENCE.CURRENT)  # time at which the image was captured
            zed.retrieve_image(image, sl.VIEW.LEFT)  # image: the container, sl.VIEW.LEFT: the left lens
            view = image.get_data()  # convert to a numpy array for display or storage
            # img0 = cv2.resize(view, (384, 384))
            # img0 = cv2.cvtColor(img0, cv2.COLOR_BGR2RGB)
            im = view[:, :, 0:3]  # drop the alpha channel (ZED frames are BGRA)
            # object detection
            t0 = time.time()
            im0 = dect.detect(im)
            print(f'time: ({time.time() - t0:.3f}s)')
            cv2.namedWindow('im0', cv2.WINDOW_NORMAL)
            cv2.resizeWindow('im0', 800, 600)
            cv2.imshow("im0", im0)
            key = cv2.waitKey(20)
            if key == 27:
                break
    cv2.destroyAllWindows()


if __name__ == "__main__":
    detect_zed()
The deep folder contains feature_extractor.py and model.py.
feature_extractor.py:
import torch
import torchvision.transforms as transforms
import numpy as np
import cv2
import logging

from .model import Net


class Extractor(object):
    def __init__(self, model_path, use_cuda=True):
        self.net = Net(reid=True)
        self.device = "cuda" if torch.cuda.is_available() and use_cuda else "cpu"
        state_dict = torch.load(model_path, map_location=lambda storage, loc: storage)['net_dict']
        self.net.load_state_dict(state_dict)
        # logger = logging.getLogger("root.tracker")
        # logger.info("Loading weights from {}... Done!".format(model_path))
        self.net.to(self.device)
        self.size = (64, 128)
        self.norm = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
        ])

    def _preprocess(self, im_crops):
        """
        TODO:
        1. to float with scale from 0 to 1
        2. resize to (64, 128) as the Market-1501 dataset did
        3. concatenate to a numpy array
        4. to torch Tensor
        5. normalize
        """
        def _resize(im, size):
            return cv2.resize(im.astype(np.float32) / 255., size)

        im_batch = torch.cat([self.norm(_resize(im, self.size)).unsqueeze(0) for im in im_crops], dim=0).float()
        return im_batch

    def __call__(self, im_crops):
        im_batch = self._preprocess(im_crops)
        with torch.no_grad():
            im_batch = im_batch.to(self.device)
            features = self.net(im_batch)
        return features.cpu().numpy()


if __name__ == '__main__':
    img = cv2.imread("demo.jpg")[:, :, (2, 1, 0)]
    extr = Extractor("checkpoint/ckpt.t7")
    feature = extr([img])  # __call__ expects a list of crops, not a single image
    print(feature.shape)
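The Extractor is meant to be fed crops cut out of a frame. A minimal sketch of wiring it to the (x1, y1, x2, y2, lbl, conf) tuples collected by Detector.detect() looks like this; frame and boxes are placeholders, and with this Net the ReID feature is 512-dimensional:
import cv2
from deep.feature_extractor import Extractor

extractor = Extractor('deep/checkpoint/ckpt.t7', use_cuda=True)

frame = cv2.imread('demo.jpg')               # any BGR image
boxes = [(50, 80, 120, 260, 'person', 0.9)]  # pretend detection output
crops = [frame[y1:y2, x1:x2] for (x1, y1, x2, y2, lbl, conf) in boxes]
if crops:
    features = extractor(crops)  # one feature vector per crop
    print(features.shape)        # (num_crops, 512), L2-normalized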
model.py:
import torch
import torch.nn as nn
import torch.nn.functional as F


class BasicBlock(nn.Module):
    def __init__(self, c_in, c_out, is_downsample=False):
        super(BasicBlock, self).__init__()
        self.is_downsample = is_downsample
        if is_downsample:
            self.conv1 = nn.Conv2d(c_in, c_out, 3, stride=2, padding=1, bias=False)
        else:
            self.conv1 = nn.Conv2d(c_in, c_out, 3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(c_out)
        self.relu = nn.ReLU(True)
        self.conv2 = nn.Conv2d(c_out, c_out, 3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(c_out)
        if is_downsample:
            self.downsample = nn.Sequential(
                nn.Conv2d(c_in, c_out, 1, stride=2, bias=False),
                nn.BatchNorm2d(c_out)
            )
        elif c_in != c_out:
            self.downsample = nn.Sequential(
                nn.Conv2d(c_in, c_out, 1, stride=1, bias=False),
                nn.BatchNorm2d(c_out)
            )
            self.is_downsample = True

    def forward(self, x):
        y = self.conv1(x)
        y = self.bn1(y)
        y = self.relu(y)
        y = self.conv2(y)
        y = self.bn2(y)
        if self.is_downsample:
            x = self.downsample(x)
        return F.relu(x.add(y), True)


def make_layers(c_in, c_out, repeat_times, is_downsample=False):
    blocks = []
    for i in range(repeat_times):
        if i == 0:
            blocks += [BasicBlock(c_in, c_out, is_downsample=is_downsample)]
        else:
            blocks += [BasicBlock(c_out, c_out)]
    return nn.Sequential(*blocks)


class Net(nn.Module):
    def __init__(self, num_classes=751, reid=False):
        super(Net, self).__init__()
        # input: 3 x 128 x 64
        self.conv = nn.Sequential(
            nn.Conv2d(3, 64, 3, stride=1, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            # nn.Conv2d(32, 32, 3, stride=1, padding=1),
            # nn.BatchNorm2d(32),
            # nn.ReLU(inplace=True),
            nn.MaxPool2d(3, 2, padding=1),
        )
        # 64 x 64 x 32
        self.layer1 = make_layers(64, 64, 2, False)
        # 64 x 64 x 32
        self.layer2 = make_layers(64, 128, 2, True)
        # 128 x 32 x 16
        self.layer3 = make_layers(128, 256, 2, True)
        # 256 x 16 x 8
        self.layer4 = make_layers(256, 512, 2, True)
        # 512 x 8 x 4
        self.avgpool = nn.AvgPool2d((8, 4), 1)
        # 512 x 1 x 1
        self.reid = reid
        self.classifier = nn.Sequential(
            nn.Linear(512, 256),
            nn.BatchNorm1d(256),
            nn.ReLU(inplace=True),
            nn.Dropout(),
            nn.Linear(256, num_classes),
        )

    def forward(self, x):
        x = self.conv(x)
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        x = self.avgpool(x)
        x = x.view(x.size(0), -1)
        # B x 512
        if self.reid:
            x = x.div(x.norm(p=2, dim=1, keepdim=True))  # L2-normalize the ReID feature
            return x
        # classifier
        x = self.classifier(x)
        return x


if __name__ == '__main__':
    net = Net()
    x = torch.randn(4, 3, 128, 64)
    y = net(x)
    print(y.shape)  # torch.Size([4, 751]) with the default classifier head
Download the deepsort weights ckpt.t7 yourself.
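To confirm the downloaded checkpoint matches this Net before wiring it into detect_zed.py, a quick load test can be run (the paths are assumptions, adjust them to your layout):
import torch
from deep.model import Net

net = Net(reid=True)
state_dict = torch.load('deep/checkpoint/ckpt.t7', map_location='cpu')['net_dict']
net.load_state_dict(state_dict)  # raises an error if the checkpoint doesn't match this architecture
print('ckpt.t7 loaded:', sum(p.numel() for p in net.parameters()), 'parameters')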
When we have trained a new weight file and want to add its detections on top of the existing ones, we can extend detect.py directly. Using other_weights as an example, the changes fall into the four parts below (a consolidated helper is sketched after step 4):
##### 1. Add the weight file
self.other_weights = other_weights  # extra weight file for a second detector
##### 2. Load the model onto the chosen device
other_model = attempt_load(self.other_weights, map_location=self.device)
other_model.to(self.device).eval()
other_model.half()
self.om = other_model
self.other_names = other_model.module.names if hasattr(other_model, 'module') else other_model.names
self.other_colors = [[random.randint(0, 255) for _ in range(3)] for _ in self.other_names]
##### 3. Run inference
other_pred = self.om(img, augment=False)[0]
other_pred = other_pred.float()
other_pred = non_max_suppression(other_pred, self.threshold, 0.4)
##### 4. Draw the boxes and collect the results
for det in other_pred:
    if det is not None and len(det):
        # Rescale boxes from img_size to im0 size
        det[:, :4] = scale_coords(img.shape[2:], det[:, :4], im0.shape).round()
        for *x, conf, cls_id in det:
            # lbl = self.other_names[int(cls_id)]
            # lbl = None if self.hide_labels else (self.other_names[int(cls_id)] if self.hide_conf else f'{self.other_names[int(cls_id)]} {conf:.2f}')
            lbl = None if self.hide_labels else self.other_names[int(cls_id)]  # appending the confidence sometimes raises an error, so it is left out for now
            x1, y1 = int(x[0]), int(x[1])
            x2, y2 = int(x[2]), int(x[3])
            boxes.append((x1, y1, x2, y2, lbl, conf))
            plot_one_box(x, im0, label=lbl, color=self.other_colors[int(cls_id)], line_thickness=3)
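Since steps 3 and 4 are identical for every model, they can be folded into one helper. The sketch below is a refactor suggestion, not part of the original code; the name run_model is made up here, and it reuses the yolov5 utilities already imported in detect.py:
from utils.general import non_max_suppression, scale_coords
from utils.plots import plot_one_box  # or utils.general, depending on your yolov5 tree

def run_model(model, names, colors, img, im0, boxes, threshold=0.6,
              hide_labels=False, hide_conf=False):
    """Run one loaded yolov5 model on the preprocessed tensor img,
    draw its detections on im0, and append them to boxes."""
    pred = model(img, augment=False)[0].float()
    pred = non_max_suppression(pred, threshold, 0.4)
    for det in pred:
        if det is not None and len(det):
            det[:, :4] = scale_coords(img.shape[2:], det[:, :4], im0.shape).round()
            for *x, conf, cls_id in det:
                lbl = None if hide_labels else (names[int(cls_id)] if hide_conf
                                                else f'{names[int(cls_id)]} {conf:.2f}')
                boxes.append((int(x[0]), int(x[1]), int(x[2]), int(x[3]), lbl, conf))
                plot_one_box(x, im0, label=lbl, color=colors[int(cls_id)], line_thickness=3)

# inside Detector.detect(), after preprocessing:
#     run_model(self.m, self.names, self.colors, img, im0, boxes, self.threshold)
#     run_model(self.om, self.other_names, self.other_colors, img, im0, boxes, self.threshold)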
The annotated frames can also be pushed to a browser with Flask:
import cv2
import pyzed.sl as sl
import threading
from flask import Flask, render_template, Response
from detect import Detector

# same cuDNN workaround as in detect_zed.py
import os
from deep.feature_extractor import Extractor
model_path = os.getcwd() + '/deep/checkpoint/ckpt.t7'
extractor = Extractor(model_path, use_cuda=True)

outputFrame = None
lock = threading.Lock()
app = Flask(__name__)


@app.route("/bdlf")
def bdlf():
    return render_template("bdlf.html")


def detect_zed():
    global outputFrame, lock
    zed = sl.Camera()
    init_params = sl.InitParameters()
    init_params.camera_resolution = sl.RESOLUTION.HD720
    init_params.camera_fps = 60
    err = zed.open(init_params)
    if err != sl.ERROR_CODE.SUCCESS:
        exit(1)
    image = sl.Mat()
    runtime_parameters = sl.RuntimeParameters()
    detector = Detector()
    while True:
        if zed.grab(runtime_parameters) == sl.ERROR_CODE.SUCCESS:
            zed.retrieve_image(image, sl.VIEW.RIGHT)
            image_cv = image.get_data()
            im = image_cv[:, :, 0:3]
            if im is None:
                break
            dect_img = detector.detect(im)
            with lock:  # write under the lock so generate() never reads a half-written frame
                outputFrame = dect_img


def generate():
    global outputFrame, lock
    # loop over frames from the output stream
    while True:
        # wait until the lock is acquired
        with lock:
            if outputFrame is None:
                continue
            (flag, encodedImage) = cv2.imencode(".jpg", outputFrame)  # encode the frame as JPEG
            if not flag:
                continue
        # yield the output frame in the byte format
        yield(b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' +
              bytearray(encodedImage) + b'\r\n')


@app.route("/video_feed")
def video_feed():
    # returning frame after frame gives the effect of watching a video. multipart/x-mixed-replace is a
    # one-shot HTTP request/response mode: if the network drops, the stream ends and the client has to reconnect
    return Response(generate(),
                    mimetype="multipart/x-mixed-replace; boundary=frame")  # a media stream delimited by the "frame" boundary


if __name__ == "__main__":
    host = "0.0.0.0"
    port = 7000
    debug = False
    t1 = threading.Thread(target=detect_zed, args=(), daemon=True)  # run detection in its own thread so the page stays responsive
    t2 = threading.Thread(target=app.run, args=(host, port, debug))
    t1.start()
    t2.start()
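For a quick check that the MJPEG stream works, OpenCV itself can act as a client, since cv2.VideoCapture can consume an HTTP MJPEG stream directly (depending on your build's ffmpeg support); the host and port below match the script above, adjust them for your network:
import cv2

cap = cv2.VideoCapture("http://127.0.0.1:7000/video_feed")
while True:
    ok, frame = cap.read()
    if not ok:
        break
    cv2.imshow("stream", frame)
    if cv2.waitKey(1) == 27:  # Esc quits
        break
cap.release()
cv2.destroyAllWindows()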
Splitting the code across several files may add video latency. Loading the detection model in the main file (or keeping everything in one file) lets it load as early as possible and reduces the lag.
Notes:
(1) darknet is a neural-network framework, while PyTorch/Torch are deep-learning frameworks; yolov4 is built on the Darknet framework, whereas yolov5 is implemented in PyTorch.
(2) A color image is a 3-D array. In OpenCV the shape is (height, width, channels): the first axis is the image height, the second the image width, and the third the depth. Also note that OpenCV uses BGR channel order rather than RGB.
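A two-line check makes this concrete (any image file will do; demo.jpg is a placeholder):
import cv2

img = cv2.imread("demo.jpg")  # BGR, shape (height, width, 3)
print(img.shape)              # e.g. (1080, 1920, 3)
rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)  # reorder the channels when a library expects RGB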
(3) frame = torch.from_numpy(frame.transpose((2, 0, 1))).float()
Frames read with cv2 are numpy arrays laid out as height x width x channels (H×W×C), while PyTorch expects input tensors laid out as channels x height x width (C×H×W), so the frame's axes have to be reordered from (H×W×C) to (C×H×W).
This line converts frame into a PyTorch tensor with the channel axis first: numpy's transpose() moves the channel axis (the last one by default) to the front, torch.from_numpy() turns the numpy.ndarray into a torch tensor, and .float() casts it to float.
Its purpose is to turn OpenCV video frames into PyTorch tensors, ready for training or inference.
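Putting the pieces together, a minimal end-to-end conversion looks like this (the normalization mirrors what Detector.detect() does above; the random array stands in for a cv2 frame):
import numpy as np
import torch

frame = np.random.randint(0, 256, (384, 640, 3), dtype=np.uint8)  # stand-in for a cv2 frame, H x W x C
tensor = torch.from_numpy(frame.transpose((2, 0, 1))).float()     # -> C x H x W, float32
tensor /= 255.0                                                   # scale pixel values to [0, 1]
tensor = tensor.unsqueeze(0)                                      # add a batch axis -> 1 x C x H x W
print(tensor.shape)  # torch.Size([1, 3, 384, 640])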