赞
踩
本文实现:使用海康工业相机的触发模式抓图(开发调试时用软件触发,实际部署时用硬件触发),抓到图片后调用目标检测接口。
去官网下载驱动时,请下载客户端,而不要选运行环境:客户端已经包含了运行环境,还附带示例和文档。下载包里有个安装文件,名称大概如 MVS-2.1.2_x86_64_20231011.deb
# 安装
sudo dpkg -i MVS-2.1.2_x86_64_20231011.deb
# 删除用以下命令
sudo dpkg -r mvs
# 打开客户端
cd /opt/MVS/bin/
./MVS
文档在 /opt/MVS/doc/
示例在 /opt/MVS/Samples
python示例在 /opt/MVS/Samples/64/Python/
代码如下,使用2个相机,开启线程模拟循环调用软件触发。
sdk有callback传参数失效的坑,感觉是海康sdk的bug,现象是这样的,
使用MV_CC_RegisterImageCallBackEx(call_back_fun,user_data)
传给回调函数的参数user_data是个指针。运行开始时传参是正确的;运行一段时间后,传入的地址仍然正确,但是指针指向的数据内容错了,应该是被sdk修改了(另一种可能:Python 端用 ctypes 创建的临时对象没有被一直引用,被垃圾回收后指针悬空,需要保证该对象在回调注册期间始终存活)。所以起了2个回调函数,根据用户自定义的名称DeviceUserID区别是哪个相机,2个相机对应不同的目标检测业务。
# -*- coding: utf-8 -*-
"""Trigger-mode image grabbing for Hikvision industrial cameras (MVS SDK).

The script enumerates the attached GigE/USB cameras, switches each one to
trigger mode, registers an image callback per camera and starts one thread
per camera that periodically fires a software trigger.  Every received frame
is converted to Mono8 or BGR8, downscaled by a factor of two and written as
a JPEG file into ``grab_dir``.
"""
import sys
import time
import os
import numpy as np
import cv2
from ctypes import *
import termios
from datetime import datetime
import threading

# Absolute install path of the MVS SDK Python bindings.
sys.path.append("/opt/MVS/Samples/64/Python/MvImport")
from MvCameraControl_class import *

# ---- Settings that must be configured --------------------------------------
# Directory the grabbed images are saved to (consumed by the app).
grab_dir = "box-end/app/new_coming_images/"
DEVICE_USER_ID_FRONT = "user_id_002"
DEVICE_USER_ID_BACK = "user_id_001"

# ---- Other global defaults, no configuration needed ------------------------
CAM_LIST = []
DEVICE_NUM = 0
GRAB_RUN = True
SERVICE_ID = "200002"
USER_DATA_PY = []

# Seconds between two software triggers issued by a trigger thread.
TRIGGER_INTERVAL_S = 2
# Camera handle per callback index (0 = front, 1 = back).  The callbacks look
# the handle up here because the enumeration order of the cameras is not
# guaranteed to match the front/back order.
CAM_BY_CALLBACK_IDX = {}
# Trigger threads started by start(); joined by stop().
TRIGGER_THREADS = []
# Set by stop() so that sleeping trigger threads wake up immediately.
_STOP_EVENT = threading.Event()

_COLOR_PIXEL_TYPES = (
    PixelType_Gvsp_RGB8_Packed, PixelType_Gvsp_BGR8_Packed,
    PixelType_Gvsp_YUV422_Packed, PixelType_Gvsp_YUV422_YUYV_Packed,
    PixelType_Gvsp_BayerGR8, PixelType_Gvsp_BayerRG8,
    PixelType_Gvsp_BayerGB8, PixelType_Gvsp_BayerBG8,
    PixelType_Gvsp_BayerGB10, PixelType_Gvsp_BayerGB10_Packed,
    PixelType_Gvsp_BayerBG10, PixelType_Gvsp_BayerBG10_Packed,
    PixelType_Gvsp_BayerRG10, PixelType_Gvsp_BayerRG10_Packed,
    PixelType_Gvsp_BayerGR10, PixelType_Gvsp_BayerGR10_Packed,
    PixelType_Gvsp_BayerGB12, PixelType_Gvsp_BayerGB12_Packed,
    PixelType_Gvsp_BayerBG12, PixelType_Gvsp_BayerBG12_Packed,
    PixelType_Gvsp_BayerRG12, PixelType_Gvsp_BayerRG12_Packed,
    PixelType_Gvsp_BayerGR12, PixelType_Gvsp_BayerGR12_Packed,
)
_MONO_PIXEL_TYPES = (
    PixelType_Gvsp_Mono8, PixelType_Gvsp_Mono10, PixelType_Gvsp_Mono10_Packed,
    PixelType_Gvsp_Mono12, PixelType_Gvsp_Mono12_Packed,
)
# Built once instead of on every frame.
_PIXEL_KIND = {
    **dict.fromkeys(_COLOR_PIXEL_TYPES, 'color'),
    **dict.fromkeys(_MONO_PIXEL_TYPES, 'mono'),
}


def _c_chars_to_str(chars):
    """Convert an SDK character array to ``str``, stopping at the first NUL."""
    collected = []
    for per in chars:
        if per == 0:
            break
        collected.append(chr(per))
    return "".join(collected)


def printDeviceInfo(deviceList):
    """Print model name and IP / serial number of every enumerated device.

    Args:
        deviceList: ``MV_CC_DEVICE_INFO_LIST`` filled by ``MV_CC_EnumDevices``.
    """
    for i in range(0, deviceList.nDeviceNum):
        mvcc_dev_info = cast(deviceList.pDeviceInfo[i], POINTER(MV_CC_DEVICE_INFO)).contents
        print("mvcc_dev_info", mvcc_dev_info)
        if mvcc_dev_info.nTLayerType == MV_GIGE_DEVICE:
            gige_info = mvcc_dev_info.SpecialInfo.stGigEInfo
            print("\ngige device: [%d]" % i)
            print("device model name: %s" % _c_chars_to_str(gige_info.chModelName))

            # The IP address is packed into one integer, one octet per byte.
            ip = gige_info.nCurrentIp
            nip1 = (ip & 0xff000000) >> 24
            nip2 = (ip & 0x00ff0000) >> 16
            nip3 = (ip & 0x0000ff00) >> 8
            nip4 = ip & 0x000000ff
            print("current ip: %d.%d.%d.%d\n" % (nip1, nip2, nip3, nip4))
        elif mvcc_dev_info.nTLayerType == MV_USB_DEVICE:
            usb_info = mvcc_dev_info.SpecialInfo.stUsb3VInfo
            print("\nu3v device: [%d]" % i)
            print("device model name: %s" % _c_chars_to_str(usb_info.chModelName))
            print("user serial number: %s" % _c_chars_to_str(usb_info.chSerialNumber))


def IsImageColor(enType):
    """Classify a pixel format.

    Args:
        enType: SDK pixel type constant (``PixelType_Gvsp_*``).

    Returns:
        ``'color'``, ``'mono'`` or ``'未知'`` for a format not in the table.
    """
    return _PIXEL_KIND.get(enType, '未知')


def image_callback_main(pData, pFrameInfo, device_idx):
    """Convert one grabbed frame and save it as a half-size JPEG.

    Called from the SDK callbacks.  Errors are reported with ``print`` and the
    frame is dropped; nothing is raised, because an exception cannot usefully
    propagate out of a ctypes callback.

    Args:
        pData: Pointer to the raw frame data.
        pFrameInfo: Pointer to the ``MV_FRAME_OUT_INFO_EX`` of the frame.
        device_idx: Callback index of the camera (0 = front, 1 = back); also
            used (plus one) as the camera number in the file name.
    """
    if not pFrameInfo:
        # A NULL pointer cannot be dereferenced.
        return
    stFrameInfo = cast(pFrameInfo, POINTER(MV_FRAME_OUT_INFO_EX)).contents

    cam = CAM_BY_CALLBACK_IDX.get(device_idx)
    if cam is None:
        print("no camera registered for callback index %d" % device_idx)
        return
    DEVICE_IDX = device_idx

    print("get one frame: Width[%d], Height[%d], nFrameNum[%d], enPixelType[%s]"
          % (stFrameInfo.nWidth, stFrameInfo.nHeight, stFrameInfo.nFrameNum, stFrameInfo.enPixelType))
    time_start = time.time()

    # File name format: image-200001-4-2022-01-18-10-57-03-1642474623702.jpg
    img_name = ("image-" + SERVICE_ID + "-" + str(DEVICE_IDX + 1) + "-"
                + datetime.now().strftime('%Y-%m-%d-%H-%M-%S-%f'))
    # The image is written under a temporary ".jpeg" name and renamed to
    # ".jpg" afterwards -- presumably so that the consumer never picks up a
    # partially written file (confirm against the app that reads grab_dir).
    img_path_tmp = os.path.join(grab_dir, img_name + ".jpeg")
    img_path_jpg = os.path.join(grab_dir, img_name + ".jpg")

    pixel_kind = IsImageColor(stFrameInfo.enPixelType)
    if pixel_kind == 'mono':
        print("mono!")
        dst_pixel_type = PixelType_Gvsp_Mono8
        image_shape = (stFrameInfo.nHeight, stFrameInfo.nWidth)
        nConvertSize = stFrameInfo.nWidth * stFrameInfo.nHeight
    elif pixel_kind == 'color':
        print("color!")
        # OpenCV expects BGR channel order, not RGB.
        dst_pixel_type = PixelType_Gvsp_BGR8_Packed
        image_shape = (stFrameInfo.nHeight, stFrameInfo.nWidth, 3)
        nConvertSize = stFrameInfo.nWidth * stFrameInfo.nHeight * 3
    else:
        # Without a destination format there is nothing to convert to.
        print("not support!!!")
        return

    # Keep our own reference to the destination buffer for the whole call.
    dst_buffer = (c_ubyte * nConvertSize)()
    stConvertParam = MV_CC_PIXEL_CONVERT_PARAM()
    memset(byref(stConvertParam), 0, sizeof(stConvertParam))
    stConvertParam.nWidth = stFrameInfo.nWidth
    stConvertParam.nHeight = stFrameInfo.nHeight
    stConvertParam.pSrcData = cast(pData, POINTER(c_ubyte))
    stConvertParam.nSrcDataLen = stFrameInfo.nFrameLen
    stConvertParam.enSrcPixelType = stFrameInfo.enPixelType
    stConvertParam.enDstPixelType = dst_pixel_type
    stConvertParam.pDstBuffer = dst_buffer
    stConvertParam.nDstBufferSize = nConvertSize

    print('time cos 1:', time.time() - time_start, 's')
    ret = cam.MV_CC_ConvertPixelType(stConvertParam)
    print('time cos 2:', time.time() - time_start, 's')
    if ret != 0:
        print("convert pixel fail! ret[0x%x]" % ret)
        return
    if stConvertParam.nDstLen != nConvertSize:
        print("unexpected converted size %d, expected %d" % (stConvertParam.nDstLen, nConvertSize))
        return

    # View the converted bytes as an OpenCV image (no extra copy needed;
    # cv2.resize below produces a new array).
    img_buffer = np.frombuffer(dst_buffer, dtype=np.uint8, count=nConvertSize).reshape(image_shape)
    print('time cos 3:', time.time() - time_start, 's')

    height, width = img_buffer.shape[0:2]
    img_buffer = cv2.resize(img_buffer, (width // 2, height // 2), interpolation=cv2.INTER_AREA)
    print("img_path", img_path_tmp)
    if not cv2.imwrite(img_path_tmp, img_buffer):
        print("save image fail! path[%s]" % img_path_tmp)
        return
    os.rename(img_path_tmp, img_path_jpg)
    print("")


g_winfun_ctype = CFUNCTYPE
g_st_frame_info = POINTER(MV_FRAME_OUT_INFO_EX)
g_p_data = POINTER(c_ubyte)
FrameInfoCallBack = g_winfun_ctype(None, g_p_data, g_st_frame_info, c_void_p)


def image_callback_0(pData, pFrameInfo, pUser):
    """SDK callback of the front camera (callback index 0)."""
    image_callback_main(pData, pFrameInfo, 0)


def image_callback_1(pData, pFrameInfo, pUser):
    """SDK callback of the back camera (callback index 1)."""
    image_callback_main(pData, pFrameInfo, 1)


# The value passed through pUser was observed to become invalid at runtime, so
# two separate callbacks are used to tell the cameras apart.  The callback
# objects are kept at module level so they stay alive while the SDK uses them.
CALL_BACK_FUN_0 = FrameInfoCallBack(image_callback_0)
CALL_BACK_FUN_1 = FrameInfoCallBack(image_callback_1)

# DeviceUserID -> (callback index, callback object)
_CALLBACK_BY_USER_ID = {
    DEVICE_USER_ID_FRONT: (0, CALL_BACK_FUN_0),
    DEVICE_USER_ID_BACK: (1, CALL_BACK_FUN_1),
}


def press_any_key_exit():
    """Block until a key is pressed on stdin (no echo, no Enter required)."""
    fd = sys.stdin.fileno()
    old_ttyinfo = termios.tcgetattr(fd)
    new_ttyinfo = old_ttyinfo[:]
    new_ttyinfo[3] &= ~termios.ICANON
    new_ttyinfo[3] &= ~termios.ECHO
    termios.tcsetattr(fd, termios.TCSANOW, new_ttyinfo)
    try:
        os.read(fd, 7)
    except (OSError, KeyboardInterrupt):
        # Either way the wait is over; the caller goes on to shut down.
        pass
    finally:
        # Always restore the original terminal settings.
        termios.tcsetattr(fd, termios.TCSANOW, old_ttyinfo)


def add_trigger_event_thread(cam=0, idx=None):
    """Thread body: fire a software trigger every TRIGGER_INTERVAL_S seconds.

    Runs until ``GRAB_RUN`` becomes false, stop() signals the stop event, or
    a trigger command fails.

    Args:
        cam: Opened ``MvCamera`` that is already grabbing.
        idx: Index of the camera in ``CAM_LIST`` (informational only).
    """
    print("add_trigger_event_thread")
    while GRAB_RUN and not _STOP_EVENT.is_set():
        ret = cam.MV_CC_SetCommandValue("TriggerSoftware")
        if ret != 0:
            print("TriggerSoftware fail! ret[0x%x]" % ret)
            break
        # Sleeps for the interval but returns at once when stop() is called.
        _STOP_EVENT.wait(TRIGGER_INTERVAL_S)


def _fail(message):
    """Print *message*, flag grabbing as stopped and return ``False``."""
    global GRAB_RUN
    print(message)
    GRAB_RUN = False
    return False


def start():
    """Open and configure every camera, then start grabbing and triggering.

    Returns:
        ``True`` when all cameras were started, ``False`` on the first
        failure (``GRAB_RUN`` is then set to ``False``).  Cameras opened
        before the failure stay in ``CAM_LIST`` and are released by stop().
    """
    global DEVICE_NUM

    os.makedirs(grab_dir, exist_ok=True)

    deviceList = MV_CC_DEVICE_INFO_LIST()
    tlayerType = MV_GIGE_DEVICE | MV_USB_DEVICE

    # 1. Enumerate devices
    ret = MvCamera.MV_CC_EnumDevices(tlayerType, deviceList)
    if ret != 0:
        return _fail("enum devices fail! ret[0x%x]" % ret)
    if deviceList.nDeviceNum == 0:
        return _fail("find no device!")
    print("Find %d devices!" % deviceList.nDeviceNum)
    printDeviceInfo(deviceList)

    # 2. Open
    DEVICE_NUM = deviceList.nDeviceNum
    for i in range(0, DEVICE_NUM):
        # 2.1 Create the camera object and its handle
        cam = MvCamera()
        stDeviceList = cast(deviceList.pDeviceInfo[int(i)], POINTER(MV_CC_DEVICE_INFO)).contents
        ret = cam.MV_CC_CreateHandle(stDeviceList)
        if ret != 0:
            return _fail("create handle fail! ret[0x%x]" % ret)
        CAM_LIST.append(cam)

        # 2.2 Open the device
        ret = cam.MV_CC_OpenDevice(MV_ACCESS_Exclusive, 0)
        if ret != 0:
            return _fail("open device fail! ret[0x%x]" % ret)

        ret = cam.MV_CC_SetEnumValue("ExposureAuto", MV_EXPOSURE_AUTO_MODE_CONTINUOUS)
        if ret != 0:
            return _fail("set ExposureAuto fail! ret[0x%x]" % ret)

        ret = cam.MV_CC_SetEnumValue("GainAuto", MV_GAIN_MODE_CONTINUOUS)
        if ret != 0:
            return _fail("set GainAuto fail! ret[0x%x]" % ret)

        ret = cam.MV_CC_SetEnumValue("TriggerMode", MV_TRIGGER_MODE_ON)
        if ret != 0:
            return _fail('Enable trigger failed! [{0:#X}]'.format(ret))

        # For hardware triggering use MV_TRIGGER_SOURCE_LINE0 here instead.
        ret = cam.MV_CC_SetEnumValue('TriggerSource', MV_TRIGGER_SOURCE_SOFTWARE)
        if ret != 0:
            return _fail('Set trigger source failed! [{0:#X}]'.format(ret))

        stStringValue = MVCC_STRINGVALUE()
        memset(byref(stStringValue), 0, sizeof(MVCC_STRINGVALUE))
        ret = cam.MV_CC_GetStringValue("DeviceUserID", stStringValue)
        if ret != 0:
            return _fail("获取 string 型数据 %s 失败 ! 报错码 ret[0x%x]" % ("DeviceUserID", ret))
        device_user_id = bytes.decode(stStringValue.chCurValue)
        print("device_user_id", device_user_id)

        if device_user_id not in _CALLBACK_BY_USER_ID:
            return _fail("unknown DeviceUserID %r, expected %r or %r"
                         % (device_user_id, DEVICE_USER_ID_FRONT, DEVICE_USER_ID_BACK))
        callback_idx, call_back_fun = _CALLBACK_BY_USER_ID[device_user_id]
        if callback_idx in CAM_BY_CALLBACK_IDX:
            return _fail("duplicate DeviceUserID %r" % device_user_id)
        CAM_BY_CALLBACK_IDX[callback_idx] = cam

        # The ctypes objects behind the user-data pointer are stored next to
        # the camera so they are not garbage collected while the SDK still
        # holds the raw address (a freed object leaves a dangling pointer,
        # which is a likely cause of the corrupted callback argument).
        index_obj = py_object(i)
        index_ptr = pointer(index_obj)
        user_data = cast(index_ptr, c_void_p)
        USER_DATA_PY.append({
            "cam": cam,
            "device_idx": i,
            "user_data_refs": (index_obj, index_ptr, user_data),
        })

        ret = cam.MV_CC_RegisterImageCallBackEx(call_back_fun, user_data)
        if ret != 0:
            return _fail('Register callback failed! [{0:#X}]'.format(ret))

        # Start grabbing before the trigger thread, so that the first
        # software trigger cannot be sent to a camera that is not grabbing.
        ret = cam.MV_CC_StartGrabbing()
        if ret != 0:
            return _fail('Start grabbing failed! [{0:#X}]'.format(ret))

        hThreadHandle = threading.Thread(target=add_trigger_event_thread, args=(cam, i))
        TRIGGER_THREADS.append(hThreadHandle)
        hThreadHandle.start()

    return True


def stop():
    """Stop the trigger threads and release every camera.

    Clean-up is best effort: a failing step is reported and the remaining
    steps and cameras are still processed, so no handle is left open.
    """
    global GRAB_RUN

    # Stop triggering first so no command is sent to a closing camera.
    GRAB_RUN = False
    _STOP_EVENT.set()
    for trigger_thread in TRIGGER_THREADS:
        trigger_thread.join(timeout=TRIGGER_INTERVAL_S + 1)

    for i, cam in enumerate(CAM_LIST):
        # 5. Close
        # 5.1 Stop grabbing
        print("stop grabbing device index[%d]" % i)
        ret = cam.MV_CC_StopGrabbing()
        if ret != 0:
            print("stop grabbing fail! ret[0x%x]" % ret)

        # 5.2 Close the device
        ret = cam.MV_CC_CloseDevice()
        if ret != 0:
            print("close deivce fail! ret[0x%x]" % ret)

        # 6. Destroy the handle
        ret = cam.MV_CC_DestroyHandle()
        if ret != 0:
            print("destroy handle fail! ret[0x%x]" % ret)


if start():
    print("press a key to stop grabbing.")
    press_any_key_exit()
stop()
改成硬件触发只需要修改一句
TriggerSource的MV_TRIGGER_SOURCE_SOFTWARE改为MV_TRIGGER_SOURCE_LINE0
ret = CAM_LIST[i].MV_CC_SetEnumValue('TriggerSource', MV_TRIGGER_SOURCE_LINE0)
1. 相机I/O管脚接口定义:线缆的颜色可能不同,但是功能是一样的
管脚 | 信号 | I/O信号源 | 说明 | 线缆颜色 |
---|---|---|---|---|
1 | DC_PWR | – | 相机电源 | 橙 |
2 | OPTO_IN | Line 0+ | 光耦隔离输入 | 黄 |
3 | GPIO | Line 2 | 可配置输入或输出 | 紫 |
4 | OPTO_OUT | Line 1+ | 光耦隔离输出 | 蓝 |
5 | OPTO_GND | Line 0/1- | 光耦隔离信号地 | 绿 |
6 | GND | Line 2- | 相机电源地 | 灰 |
触发硬件用光电开关,NPN设备,网上的接线图没有画出电阻的接线位置,实际电阻位置如下
# -*- coding: utf-8 -*-
# Import the required libraries
import os
import sys
from pathlib import Path

import numpy as np
import cv2
import torch
import torch.backends.cudnn as cudnn
from tqdm import tqdm
import shutil
import io

from models.common_2 import DetectMultiBackend
from utils.dataloaders import IMG_FORMATS, VID_FORMATS, LoadImages, LoadStreams
from utils.general import (LOGGER, check_file, check_img_size, check_imshow, check_requirements, colorstr,
                           increment_path, non_max_suppression, print_args, scale_boxes, strip_optimizer, xyxy2xywh)
from utils.plots import Annotator, colors, save_one_box
from utils.torch_utils import select_device, time_sync
# letterbox is used for the padded resize
from utils.augmentations import Albumentations, augment_hsv, copy_paste, letterbox, mixup, random_perspective


class U5():
    """YOLOv5 detector: loads the weights once, then runs detect() per image."""

    def __init__(self, weights="/weights/last.pt", conf_thres=0.25):
        """Load the model and warm it up.

        Args:
            weights: Path of the ``.pt`` weights file (or a list whose first
                element is that path).
            conf_thres: Object confidence threshold used by NMS.

        Raises:
            OSError: If the weights file cannot be read.
        """
        self.weights = weights
        self.imgsz = (640, 640)       # inference size in pixels
        self.conf_thres = conf_thres  # confidence threshold, used by NMS
        self.iou_thres = 0.45         # IoU threshold, used by NMS
        self.max_det = 1000           # maximum detections per image, used by NMS
        device = ''                   # cuda device, i.e. 0 or 0,1,2,3 or cpu
        self.classes = None           # keep only these classes in NMS; None keeps all
        self.agnostic_nms = False     # class-agnostic NMS
        self.augment = False          # test-time augmentation
        self.visualize = False        # feature-map visualisation
        self.half = False             # FP16 inference
        self.dnn = False              # OpenCV DNN for ONNX inference

        self.device = select_device(device)

        # Load the model from an in-memory copy of the weights file.  The
        # context managers make sure the file and the buffer are closed even
        # when loading fails.
        w = str(weights[0] if isinstance(weights, list) else weights)
        print(type(w), w)
        with open(w, "rb") as source_file:
            content = source_file.read()
        with io.BytesIO(content) as weights_bytes:
            self.model = DetectMultiBackend(weights_bytes, model_type="pt", device=self.device, dnn=self.dnn)

        self.stride, self.names, self.pt = self.model.stride, self.model.names, self.model.pt
        jit, onnx, engine = self.model.jit, self.model.onnx, self.model.engine

        # Use the checked size for warmup and inference instead of discarding
        # it.  NOTE(review): assumes check_img_size returns a sequence for a
        # tuple input, as upstream YOLOv5 does -- confirm for this fork.
        self.imgsz = tuple(check_img_size(self.imgsz, s=self.stride))
        print("names", self.names)

        # FP16 is supported on limited backends with CUDA
        self.half &= (self.pt or jit or onnx or engine) and self.device.type != 'cpu'
        if self.pt or jit:
            self.model.model.half() if self.half else self.model.model.float()

        self.model.warmup(imgsz=(1, 3, *self.imgsz))

    def detect(self, img):
        """Run detection on one image.

        The detection boxes are drawn through an ``Annotator`` built on
        *img*.  NOTE(review): this presumably modifies *img* in place -- the
        caller appears to rely on that; confirm against ``utils.plots``.

        Args:
            img: Decoded image as a numpy array in H x W x C, BGR order.

        Returns:
            A list with one dict per detection, holding ``'class'`` (name),
            ``'conf'`` (float), ``'position'`` (the ``xyxy2xywh`` result
            divided by image width/height) and ``'index'`` (class index).

        Raises:
            ValueError: If *img* is ``None``.
        """
        if img is None:
            raise ValueError("img must be a decoded image, got None")
        im0 = img

        # Padded resize
        im = letterbox(im0, self.imgsz, self.stride, auto=self.pt)[0]
        # HWC to CHW, BGR to RGB
        im = im.transpose((2, 0, 1))[::-1]
        im = np.ascontiguousarray(im)

        # Pure inference: no autograd bookkeeping is needed.
        with torch.no_grad():
            im = torch.from_numpy(im).to(self.device)
            im = im.half() if self.half else im.float()  # uint8 to fp16/32
            im /= 255  # 0 - 255 to 0.0 - 1.0
            if len(im.shape) == 3:
                im = im[None]  # expand for batch dim
            t2 = time_sync()

            pred = self.model(im, augment=self.augment, visualize=self.visualize)
            t3 = time_sync()

            pred = non_max_suppression(pred, self.conf_thres, self.iou_thres, self.classes,
                                       self.agnostic_nms, max_det=self.max_det)

        detections = []
        line_thickness = 3
        annotator = Annotator(im0, line_width=line_thickness, example=str(self.names))
        # Normalisation gain (w, h, w, h)
        gn = torch.tensor(im0.shape)[[1, 0, 1, 0]]

        for det in pred:  # per image
            if not len(det):
                continue
            # Rescale boxes from the inference size to the original image size
            det[:, :4] = scale_boxes(im.shape[2:], det[:, :4], im0.shape).round()

            for *xyxy, conf, index in reversed(det):
                xywh = (xyxy2xywh(torch.tensor(xyxy).view(1, 4)) / gn).view(-1).tolist()
                cls = self.names[int(index)]
                conf = float(conf)
                detections.append({'class': cls, 'conf': conf, 'position': xywh, 'index': int(index)})
                annotator.box_label(xyxy, cls, color=colors(int(index), True))

        # Inference time
        print(f'({t3 - t2:.3f}s)')
        return detections
# -*- coding: utf-8 -*-
# +
"""Flask service that checks a grabbed image against the expected part counts.

The client posts a base64 image; the service runs the YOLOv5 detector on it,
compares the number of detections per class with the configuration selected
by the last scanned code and answers with an OK/NG verdict plus the image
with the verdict drawn on it.
"""
try:
    # Unused here; kept because other code may expect the name.  distutils
    # was removed in Python 3.12, so a missing module must not stop the app.
    from distutils.command.config import config  # noqa: F401
except ImportError:
    config = None
import sys
sys.path.append('yolov5_s/')
from flask import Flask, g, jsonify, make_response, request, render_template
import base64
import numpy as np
import cv2
from sdk import U5
import time

app = Flask(__name__)

# ---- Settings that must be configured --------------------------------------
u5 = U5(weights='yolov5_s/runs/train/exp/weights/last.pt')
# File holding the code read by the barcode scanner; it selects the door type.
SCAN_CODE_FILE = "/home/wai/hik/code/scan_code.txt"
# Expected number of detections per class, per door type and side.
CFG_CORRECT_ANSWER = [
    {
        "no": "000001",
        "cfg": {
            "front": {
                "screw_3": 2,
                "lines": 5,
                "block_1": 1,
                "block_2": 2
            },
            "back": {
                "screw_1": 7,
                "screw_2": 29,
                "wang": 0
            }
        }
    },
    {
        "no": "000002",
        "cfg": {
            "front": {
                "screw_1": 7,
                "screw_2": 29,
                "wang": 0
            },
            "back": {
                "screw_3": 2,
                "lines": 5,
                "block_1": 1,
                "block_2": 2
            }
        }
    },
]

# Camera id taken from the image id -> side of the door.
_SIDE_BY_CAMERA_ID = {"1": "front", "2": "back"}


def detections_count(detections, key):
    """Return how many entries of *detections* have ``'class' == key``."""
    return sum(1 for item in detections if item["class"] == key)


def _decode_image(image_text):
    """Decode a base64 image, with or without a ``data:...;base64,`` prefix.

    Returns:
        The decoded 3-channel BGR image, or ``None`` when the text is not
        valid base64 or not a decodable image.
    """
    if image_text.startswith("data:") and "," in image_text:
        # Drop the data-URI header regardless of its MIME type and length.
        image_text = image_text.split(",", 1)[1]
    try:
        img_data = base64.b64decode(image_text)
    except ValueError:
        return None
    img_array = np.frombuffer(img_data, np.uint8)
    if img_array.size == 0:
        return None
    # IMREAD_COLOR always yields 3 channels, which the detector requires.
    return cv2.imdecode(img_array, cv2.IMREAD_COLOR)


def _read_scan_code():
    """Return the scanned code without surrounding whitespace ('' on error)."""
    try:
        with open(SCAN_CODE_FILE, "r") as scan_file:
            # A trailing newline would otherwise never match a configured code.
            return scan_file.read().strip()
    except OSError as err:
        print("read scan code fail:", err)
        return ""


@app.route('/api/ai_detection', methods=['POST'])
def ai_detection():
    """Detect parts in the posted image and judge them against the config.

    Expects a JSON body with ``images`` (list, first element is the base64
    image) and ``images_id`` (list, first element formatted like
    ``image-<service>-<camera>-...``).

    Returns:
        JSON with ``boxes``, ``image`` and ``image_id``; plus ``good``
        (1 = OK, 0 = NG) when a configuration matched the scanned code, or
        ``scene_name`` when none matched.  Invalid input is answered with
        HTTP 400 and a ``code``/``msg`` body.
    """
    print("==> ai_detection start")
    time_start = time.time()

    payload = request.json or {}
    images = payload.get('images')
    images_id = payload.get('images_id')
    if not images or not images_id:
        return jsonify({'code': 400, 'msg': 'images and images_id are required'}), 400

    img = _decode_image(images[0])
    if img is None:
        return jsonify({'code': 400, 'msg': 'images[0] is not a decodable base64 image'}), 400

    print('time cos detect 1:', time.time() - time_start, 's')
    # NOTE(review): detect() is expected to draw the boxes onto img in place;
    # the image returned below relies on that -- confirm in sdk.U5.
    detections = u5.detect(img)
    print('time cos detect 2:', time.time() - time_start, 's')
    print("detections", detections)

    # Read the code written by the barcode scanner
    scan_code = _read_scan_code()
    print("scan_code", scan_code)

    # Select the expected counts for the scanned code
    cfg = next((item["cfg"] for item in CFG_CORRECT_ANSWER if item["no"] == scan_code), None)

    if cfg is None:
        result = {
            'boxes': [],
            'image': images[0],
            'image_id': images_id[0],
            'scene_name': 'detect'
        }
    else:
        # The camera id decides the side: 1 = front, 2 = back
        id_parts = images_id[0].split("-")
        camera_id = id_parts[2] if len(id_parts) > 2 else None
        print("==> camera_id", camera_id)
        side = _SIDE_BY_CAMERA_ID.get(camera_id)
        if side is None:
            return jsonify({'code': 400, 'msg': 'unknown camera id in images_id[0]'}), 400

        part_num = cfg[side]
        print("correct_cfg", part_num)

        good = 1
        for key, value in part_num.items():
            dc = detections_count(detections, key)
            print("==> ", key, "ai_count", dc, "correct_count", value)
            if value != dc:
                good = 0
                break

        # Draw the verdict onto the image
        height, width = img.shape[0:2]
        result_pos = (width - 300, 200) if camera_id == "1" else (100, 200)
        if good == 0:
            result_text = "NG"
            result_color = (0, 0, 255)
        else:
            result_text = "OK"
            result_color = (0, 128, 0)
        cv2.putText(img, result_text, result_pos, cv2.FONT_HERSHEY_COMPLEX, 5.0, result_color, 10)

        encoded = cv2.imencode('.jpg', img)[1]
        image_result_base64 = 'data:image/jpeg;base64,' + base64.b64encode(encoded).decode('ascii')
        result = {
            'boxes': [],
            'image': image_result_base64,
            'image_id': images_id[0],
            'good': good
        }

    print('time cos detect 3:', time.time() - time_start, 's')
    print("")
    return jsonify(result)


# Run the service
if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。