为了解决以上问题,基于OpenCV和ORB的多摄像头拼接行人检测系统应运而生。OpenCV是一个开源的计算机视觉库,提供了丰富的图像处理和计算机视觉算法,可以方便地进行图像的处理、特征提取和目标检测等操作。ORB(Oriented FAST and Rotated BRIEF)是一种快速的特征提取和匹配算法,具有旋转不变性和尺度不变性,适用于多摄像头拼接行人检测系统中的特征匹配和跟踪。
为了学习行人图像的局部特征描述(Local Feature Representations)来提高网络模型重识别的能力,设计了该局部特征分支。不同与上面的全局特征分支用两个尺寸为1x1的池化层进行池化操作,在局部特征分支中仅使用一个输出尺寸为6×1的全局平均池化GAP来对主干网络输出的特征图进行池化操作。池化操作后会得到一个尺寸为[1,2048,6,1]的特征图,然后对其第三个维度进行分割,可以得到6个不同的尺寸为[1,2048,1,1]特征图。相当于将一张行人图像水平划分为6块,拆分后的每一个特征图分别对应其中的一块。然后对上面的6个特征图进行降维操作,得到6个2048维的特征向量。同样的将这6个特征向量分别连接一个BN层、LeakyReLU层、256维的全连接层、第二个BN层,一个维度为行人ID数的全连接层,最后使用6个交叉嫡损失函数分别对6个输出进行训练。
class ImageStitcher: def __init__(self, src, des): self.src = src self.des = des self.GOOD_POINTS_LIMITED = 0.99 def stitch_images(self): img1_3 = cv.imread(self.src, 1) # 基准图像 img2_3 = cv.imread(self.des, 1) # 拼接图像 orb = cv.ORB_create() kp1, des1 = orb.detectAndCompute(img1_3, None) kp2, des2 = orb.detectAndCompute(img2_3, None) bf = cv.BFMatcher.create() matches = bf.match(des1, des2) matches = sorted(matches, key=lambda x: x.distance) goodPoints = [] for i in range(len(matches) - 1): if matches[i].distance < self.GOOD_POINTS_LIMITED * matches[i + 1].distance: goodPoints.append(matches[i]) src_pts = np.float32([kp1[m.queryIdx].pt for m in goodPoints]).reshape(-1, 1, 2) dst_pts = np.float32([kp2[m.trainIdx].pt for m in goodPoints]).reshape(-1, 1, 2) M, mask = cv.findHomography(dst_pts, src_pts, cv.RHO) h1, w1, p1 = img2_3.shape h2, w2, p2 = img1_3.shape h = np.maximum(h1, h2) w = np.maximum(w1, w2) ... ... dst = cv.add(dst1, imageTransform) dst_no = np.copy(dst) dst_target = np.maximum(dst1, imageTransform) return dst_target
class Stitcher: def __init__(self): global model model = ['ORB', 'SIFT', 'SURF', 'BRISK', 'AKAZE'] # determine if we are using OpenCV v3.X self.isv3 = imutils.is_cv3() def stitch(self, images, ratio=0.75, reprojThresh=4.0, showMatches=False): (imageB, imageA) = images start = time.time() (kpsA, featuresA) = self.detectAndDescribe(imageA) end = time.time() print('%.5f s' % (end - start)) (kpsB, featuresB) = self.detectAndDescribe(imageB) start = time.time() M = self.matchKeypoints(kpsA, kpsB, featuresA, featuresB, ratio, reprojThresh) end = time.time() print('%.5f s' % (end - start)) if M is None: return None if showMatches: start = time.time() vis = self.drawMatches(imageA, imageB, kpsA, kpsB, matches, status) end = time.time() print('%.5f s' % (end - start)) return (result, vis) return result def detectAndDescribe(self, image): global model gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) if self.isv3: descriptor = cv2.xfeatures2d.SIFT_create() (kps, features) = descriptor.detectAndCompute(image, None) else: detector = cv2.FeatureDetector_create(model[0]) kps = detector.detect(gray) extractor = cv2.DescriptorExtractor_create(model[0]) (kps, features) = extractor.compute(gray, kps) kps = np.float32([kp.pt for kp in kps]) return (kps, features) def matchKeypoints(self, kpsA, kpsB, featuresA, featuresB, ratio, reprojThresh): matcher = cv2.DescriptorMatcher_create("BruteForce") rawMatches = matcher.knnMatch(featuresA, featuresB, 2) matches = [] for m in rawMatches: if len(m) == 2 and m[0].distance < m[1].distance * ratio: matches.append((m[0].trainIdx, m[0].queryIdx)) ... ... return None def drawMatches(self, imageA, imageB, kpsA, kpsB, matches, status): (hA, wA) = imageA.shape[:2] (hB, wB) = imageB.shape[:2] vis = np.zeros((max(hA, hB), wA + wB, 3), dtype="uint8") vis[0:hA, 0:wA] = imageA vis[0:hB, wA:] = imageB for ((trainIdx, queryIdx), s) in zip(matches, status): if s == 1: ptA = (int(kpsA[queryIdx][0]), int(kpsA[queryIdx][1])) ptB = (int(kpsB[trainIdx][0]) + wA, int(kpsB[trainIdx][1])) cv2.line(vis, ptA, ptB, (0, 255, 0), 1) return vis
class Stitcher: def __init__(self): self.isv3 = imutils.is_cv3() def stitch(self, images, ratio=0.75, reprojThresh=4.0, showMatches=False): (imageB, imageA) = images start = time.time() (kpsA, featuresA) = self.detectAndDescribe(imageA) end = time.time() print('%.5f s' % (end - start)) (kpsB, featuresB) = self.detectAndDescribe(imageB) start = time.time() M = self.matchKeypoints(kpsA, kpsB, featuresA, featuresB, ratio, reprojThresh) end = time.time() print('%.5f s' % (end - start)) if M is None: return None (matches, H, status) = M start = time.time() result = cv2.warpPerspective(imageA, H, (imageA.shape[1] + imageB.shape[1], imageA.shape[0])) result[0:imageB.shape[0], 0:imageB.shape[1]] = imageB end = time.time() print('%.5f s' % (end - start)) if showMatches: start = time.time() vis = self.drawMatches(imageA, imageB, kpsA, kpsB, matches, status) end = time.time() print('%.5f s' % (end - start)) return (result, vis) return result def matchKeypoints(self, kpsA, kpsB, featuresA, featuresB, ratio, reprojThresh): matcher = cv2.DescriptorMatcher_create("BruteForce") rawMatches = matcher.knnMatch(featuresA, featuresB, 2) matches = [] for m in rawMatches: if len(m) == 2 and m[0].distance < m[1].distance * ratio: matches.append((m[0].trainIdx, m[0].queryIdx)) if len(matches) > 4: ptsA = np.float32([kpsA[i] for (_, i) in matches]) ptsB = np.float32([kpsB[i] for (i, _) in matches]) (H, status) = cv2.findHomography(ptsA, ptsB, cv2.RANSAC, reprojThresh) return (matches, H, status) return None def drawMatches(self, imageA, imageB, kpsA, kpsB, matches, status): (hA, wA) = imageA.shape[:2] (hB, wB) = imageB.shape[:2] vis = np.zeros((max(hA, hB), wA + wB, 3), dtype="uint8") vis[0:hA, 0:wA] = imageA vis[0:hB, wA:] = imageB for ((trainIdx, queryIdx), s) in zip(matches, status): if s == 1: ptA = (int(kpsA[queryIdx][0]), int(kpsA[queryIdx][1])) ptB = (int(kpsB[trainIdx][0]) + wA, int(kpsB[trainIdx][1])) cv2.line(vis, ptA, ptB, (0, 255, 0), 1) return vis
class CrossConv(nn.Module): # Cross Convolution Downsample def __init__(self, c1, c2, k=3, s=1, g=1, e=1.0, shortcut=False): # ch_in, ch_out, kernel, stride, groups, expansion, shortcut super().__init__() c_ = int(c2 * e) # hidden channels self.cv1 = Conv(c1, c_, (1, k), (1, s)) self.cv2 = Conv(c_, c2, (k, 1), (s, 1), g=g) self.add = shortcut and c1 == c2 def forward(self, x): return x + self.cv2(self.cv1(x)) if self.add else self.cv2(self.cv1(x)) class Sum(nn.Module): # Weighted sum of 2 or more layers https://arxiv.org/abs/1911.09070 def __init__(self, n, weight=False): # n: number of inputs super().__init__() self.weight = weight # apply weights boolean self.iter = range(n - 1) # iter object if weight: self.w = nn.Parameter(-torch.arange(1.0, n) / 2, requires_grad=True) # layer weights def forward(self, x): y = x[0] # no weight if self.weight: w = torch.sigmoid(self.w) * 2 for i in self.iter: y = y + x[i + 1] * w[i] else: for i in self.iter: y = y + x[i + 1] return y class MixConv2d(nn.Module): # Mixed Depth-wise Conv https://arxiv.org/abs/1907.09595 def __init__(self, c1, c2, k=(1, 3), s=1, equal_ch=True): # ch_in, ch_out, kernel, stride, ch_strategy super().__init__() n = len(k) # number of convolutions if equal_ch: # equal c_ per group i = torch.linspace(0, n - 1E-6, c2).floor() # c2 indices c_ = [(i == g).sum() for g in range(n)] # intermediate channels else: # equal weight.numel() per group b = [c2] + [0] * n a = np.eye(n + 1, n, k=-1) a -= np.roll(a, 1, axis=1) a *= np.array(k) ** 2 a[0] = 1 c_ = np.linalg.lstsq(a, b, rcond=None)[0].round() # solve for equal weight indices, ax = b self.m = nn.ModuleList( [nn.Conv2d(c1, int(c_), k, s, k // 2, groups=math.gcd(c1, int(c_)), bias=False) for k, c_ in zip(k, c_)]) self.bn = nn.BatchNorm2d(c2) self.act = nn.SiLU() def forward(self, x): return self.act(self.bn(torch.cat([m(x) for m in self.m], 1))) class Ensemble(nn.ModuleList): # Ensemble of models def __init__(self): super().__init__() def forward(self, x, augment=False, profile=False, visualize=False): y = [] for module in self: y.append(module(x, augment, profile, visualize)[0]) # y = torch.stack(y).max(0)[0] # max ensemble # y = torch.stack(y).mean(0) # mean ensemble y = torch.cat(y, 1) # nms ensemble return y, None # inference, train output def attempt_load(weights, map_location=None, inplace=True, fuse=True): from models.yolo import Detect, Model # Loads an ensemble of models weights=[a,b,c] or a single model weights=[a] or weights=a model = Ensemble() for w in weights if isinstance(weights, list) else [weights]: ckpt = torch.load(attempt_download(w), map_location=map_location) # load if fuse: model.append(ckpt['ema' if ckpt.get('ema') else 'model'].float().fuse().eval()) # FP32 model else: model.append(ckpt['ema' if ckpt.get('ema') else 'model'].float().eval()) # without layer fuse # Compatibility updates for m in model.modules(): if type(m) in [nn.Hardswish, nn.LeakyReLU, nn.ReLU, nn.ReLU6, nn.SiLU, Detect, Model]: m.inplace = inplace # pytorch 1.7.0 compatibility if type(m) is Detect: if not isinstance(m.anchor_grid, list): # new Detect Layer compatibility delattr(m, 'anchor_grid') setattr(m, 'anchor_grid', [torch.zeros(1)] * m.nl) elif type(m) is Conv: m._non_persistent_buffers_set = set() # pytorch 1.6.0 compatibility if len(model) == 1: return model[-1] # return model else: print(f'Ensemble created with {weights}\n') for k in ['names']: setattr(model, k, getattr(model[-1], k)) model.stride = model[torch.argmax(torch.tensor([m.stride.max() for m in model])).int()].stride # max stride return model # return ensemble
文件路径 | 功能概述 |
code.py | 主程序文件,实现图像处理和目标检测的整体流程 |
match.py | 图像匹配和拼接的功能实现 |
sift.py | 图像拼接的实验模块 |
test.py | 图像拼接工具,将两张图像拼接成一张全景图 |
ui.py | 用户界面模块,提供图形界面交互 |
models\common.py | 公共模型组件和函数 |
models\experimental.py | 实验性模型和功能 |
models_init_.py | 模型模块的初始化文件 |
utils\activations.py | 激活函数的实现 |
utils\augmentations.py | 数据增强函数的实现 |
utils\autoanchor.py | 自动锚框生成的功能实现 |
utils\autobatch.py | 自动批处理的功能实现 |
utils\callbacks.py | 回调函数的实现 |
utils\datasets.py | 数据集处理的功能实现 |
utils\downloads.py | 下载功能的实现 |
utils\general.py | 通用工具函数的实现 |
utils\loss.py | 损失函数的实现 |
utils\metrics.py | 评估指标的实现 |
utils\plots.py | 绘图函数的实现 |
utils\torch_utils.py | PyTorch工具函数的实现 |
utils_init_.py | 工具模块的初始化文件 |
utils\aws\resume.py | AWS平台的模型恢复功能实现 |
utils\aws_init_.py | AWS模块的初始化文件 |
utils\flask_rest_api\example_request.py | Flask REST API的示例请求 |
utils\flask_rest_api\restapi.py | Flask REST API的实现 |
utils\loggers_init_.py | 日志记录模块的初始化文件 |
utils\loggers\wandb\log_dataset.py | 使用WandB记录数据集的功能实现 |
utils\loggers\wandb\sweep.py | 使用WandB进行超参数搜索的功能实现 |
utils\loggers\wandb\wandb_utils.py | WandB工具函数的实现 |
utils\loggers\wandb_init_.py | WandB模块的初始化文件 |
