当前位置:   article > 正文

海康工业相机触发模式抓图和AI目标检测_海康相机自动触发程序

海康相机自动触发程序

简介

本文实现用海康工业相机,触发模式,开发调试用软件触发,实际用硬件触发抓图,调用目标检测接口

hikrobot工业相机抓图

安装驱动

去官网下载驱动,下载客户端,而不要选运行环境,客户端包含了运行环境,还有带示例,文档,里面有个安装文件大概如 MVS-2.1.2_x86_64_20231011.deb

# 安装
sudo dpkg -i MVS-2.1.2_x86_64_20231011.deb
# 删除用以下命令
sudo dpkg -r mvs
# 打开客户端
cd /opt/MVS/bin/
./MVS
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

文档在 /opt/MVS/doc/
示例在 /opt/MVS/Samples
python示例在 /opt/MVS/Samples/64/Python/

相机抓图

代码如下,使用2个相机,开起线程模拟循环调用软件触发。
sdk有callback传参数失效的坑,感觉是海康sdk的bug,现象是这样的,
使用MV_CC_RegisterImageCallBackEx(call_back_fun,user_data)
传给回调函数的参数user_data,user_data是个指针,运行开始传参是正确的,运行一段时间后传的地址正确,但是指针指向的数据内容错了,应该是被sdk修改了,所以起了2个回调函数,根据用户自定义的名称DeviceUserID区别是哪个相机,2个相机对应不同的目标检测业务

# -*- coding: utf-8 -*-
import sys
import time
import os

import numpy as np
import cv2
from ctypes import *
import termios
from datetime import datetime
import threading

sys.path.append("/opt/MVS/Samples/64/Python/MvImport")  # 导入相应SDK的库,实际安装位置绝对路径
from MvCameraControl_class import *

#以下需要配置
#抓图保存路径,提供给app
grab_dir="box-end/app/new_coming_images/"
DEVICE_USER_ID_FRONT = "user_id_002"
DEVICE_USER_ID_BACK = "user_id_001"

#其他全局默认值,不用配置
CAM_LIST = []
DEVICE_NUM = 0
GRAB_RUN = True
SERVICE_ID = "200002"
USER_DATA_PY = []
#DEVICE_IDX = None

# 打印设备详情
def printDeviceInfo(deviceList):
    for i in range(0, deviceList.nDeviceNum):
        mvcc_dev_info = cast(deviceList.pDeviceInfo[i], POINTER(MV_CC_DEVICE_INFO)).contents
        print("mvcc_dev_info",mvcc_dev_info)
        if mvcc_dev_info.nTLayerType == MV_GIGE_DEVICE:
            print ("\ngige device: [%d]" % i)
            strModeName = ""
            for per in mvcc_dev_info.SpecialInfo.stGigEInfo.chModelName:
                strModeName = strModeName + chr(per)
            print ("device model name: %s" % strModeName)

            nip1 = ((mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0xff000000) >> 24)
            nip2 = ((mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0x00ff0000) >> 16)
            nip3 = ((mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0x0000ff00) >> 8)
            nip4 = (mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0x000000ff)
            print ("current ip: %d.%d.%d.%d\n" % (nip1, nip2, nip3, nip4))
        elif mvcc_dev_info.nTLayerType == MV_USB_DEVICE:
            print ("\nu3v device: [%d]" % i)
            strModeName = ""
            for per in mvcc_dev_info.SpecialInfo.stUsb3VInfo.chModelName:
                if per == 0:
                    break
                strModeName = strModeName + chr(per)
            print ("device model name: %s" % strModeName)

            strSerialNumber = ""
            for per in mvcc_dev_info.SpecialInfo.stUsb3VInfo.chSerialNumber:
                if per == 0:
                    break
                strSerialNumber = strSerialNumber + chr(per)
            print ("user serial number: %s" % strSerialNumber)

# 判读图像格式是彩色还是黑白
def IsImageColor(enType):
    dates = {
        PixelType_Gvsp_RGB8_Packed: 'color',
        PixelType_Gvsp_BGR8_Packed: 'color',
        PixelType_Gvsp_YUV422_Packed: 'color',
        PixelType_Gvsp_YUV422_YUYV_Packed: 'color',
        PixelType_Gvsp_BayerGR8: 'color',
        PixelType_Gvsp_BayerRG8: 'color',
        PixelType_Gvsp_BayerGB8: 'color',
        PixelType_Gvsp_BayerBG8: 'color',
        PixelType_Gvsp_BayerGB10: 'color',
        PixelType_Gvsp_BayerGB10_Packed: 'color',
        PixelType_Gvsp_BayerBG10: 'color',
        PixelType_Gvsp_BayerBG10_Packed: 'color',
        PixelType_Gvsp_BayerRG10: 'color',
        PixelType_Gvsp_BayerRG10_Packed: 'color',
        PixelType_Gvsp_BayerGR10: 'color',
        PixelType_Gvsp_BayerGR10_Packed: 'color',
        PixelType_Gvsp_BayerGB12: 'color',
        PixelType_Gvsp_BayerGB12_Packed: 'color',
        PixelType_Gvsp_BayerBG12: 'color',
        PixelType_Gvsp_BayerBG12_Packed: 'color',
        PixelType_Gvsp_BayerRG12: 'color',
        PixelType_Gvsp_BayerRG12_Packed: 'color',
        PixelType_Gvsp_BayerGR12: 'color',
        PixelType_Gvsp_BayerGR12_Packed: 'color',
        PixelType_Gvsp_Mono8: 'mono',
        PixelType_Gvsp_Mono10: 'mono',
        PixelType_Gvsp_Mono10_Packed: 'mono',
        PixelType_Gvsp_Mono12: 'mono',
        PixelType_Gvsp_Mono12_Packed: 'mono'}
    return dates.get(enType, '未知')

# 回调取图采集
def image_callback_main(pData, pFrameInfo, device_idx):
    global USER_DATA_PY    
    stFrameInfo = cast(pFrameInfo, POINTER(MV_FRAME_OUT_INFO_EX)).contents

    #print("pUser",pUser,"cast",cast(pUser, POINTER(py_object)))
    # user_data_py = cast(pUser, POINTER(py_object)).contents.value
    # print("user_data_py",user_data_py)
    # cam = user_data_py["cam"]
    # device_idx = user_data_py["device_idx"]

    # print("pUser",pUser)
    # DEVICE_IDX = cast(pUser, POINTER(py_object)).contents.value
    # print("DEVICE_IDX",DEVICE_IDX)
    # print("USER_DATA_PY",USER_DATA_PY,"DEVICE_IDX",DEVICE_IDX)
    # cam = USER_DATA_PY[DEVICE_IDX]["cam"]

    DEVICE_IDX = device_idx
    cam = USER_DATA_PY[DEVICE_IDX]["cam"]

    if stFrameInfo:
        print("get one frame: Width[%d], Height[%d], nFrameNum[%d], enPixelType[%s]" % (stFrameInfo.nWidth, stFrameInfo.nHeight, stFrameInfo.nFrameNum, stFrameInfo.enPixelType))
        time_start = time.time()
        #图片名格式 image-200001-4-2022-01-18-10-57-03-1642474623702.jpg
        img_name = "image-"+SERVICE_ID+"-"+str(DEVICE_IDX+1)+"-"+datetime.now().strftime('%Y-%m-%d-%H-%M-%S-%f')
        img_name_tmp = img_name + ".jpeg"
        img_name_jpg = img_name + ".jpg"
        img_path_tmp = os.path.join(grab_dir, img_name_tmp)
        img_path_jpg = os.path.join(grab_dir, img_name_jpg)
        stConvertParam = MV_CC_PIXEL_CONVERT_PARAM()
        memset(byref(stConvertParam), 0, sizeof(stConvertParam))
        if IsImageColor(stFrameInfo.enPixelType) == 'mono':
            print("mono!")
            stConvertParam.enDstPixelType = PixelType_Gvsp_Mono8
            nConvertSize = stFrameInfo.nWidth * stFrameInfo.nHeight
        elif IsImageColor(stFrameInfo.enPixelType) == 'color':
            print("color!")
            stConvertParam.enDstPixelType = PixelType_Gvsp_BGR8_Packed  # opecv要用BGR,不能使用RGB
            nConvertSize = stFrameInfo.nWidth * stFrameInfo.nHeight* 3
        else:
            print("not support!!!")

        img_buffer = (c_ubyte * stFrameInfo.nFrameLen)()
        
        stConvertParam.nWidth = stFrameInfo.nWidth
        stConvertParam.nHeight = stFrameInfo.nHeight
        stConvertParam.pSrcData = cast(pData, POINTER(c_ubyte))
        stConvertParam.nSrcDataLen = stFrameInfo.nFrameLen
        stConvertParam.enSrcPixelType = stFrameInfo.enPixelType
        stConvertParam.pDstBuffer = (c_ubyte * nConvertSize)()
        stConvertParam.nDstBufferSize = nConvertSize
        print('time cos 1:', time.time() - time_start, 's') 
        ret = cam.MV_CC_ConvertPixelType(stConvertParam)
        print('time cos 2:', time.time() - time_start, 's') 
        if ret != 0:
            print("convert pixel fail! ret[0x%x]" % ret)
            del stConvertParam.pSrcData
            sys.exit()
        else:
            #print("convert ok!!")
            # 转OpenCV
            # 黑白处理
            if IsImageColor(stFrameInfo.enPixelType) == 'mono':
                img_buffer = (c_ubyte * stConvertParam.nDstLen)()
                memmove(byref(img_buffer), stConvertParam.pDstBuffer, stConvertParam.nDstLen)
                img_buffer = np.frombuffer(img_buffer,count=int(stConvertParam.nDstLen), dtype=np.uint8)
                img_buffer = img_buffer.reshape((stFrameInfo.nHeight, stFrameInfo.nWidth))
                #print("mono ok!!")
            # 彩色处理
            if IsImageColor(stFrameInfo.enPixelType) == 'color':
                img_buffer = (c_ubyte * stConvertParam.nDstLen)()
                memmove(byref(img_buffer), stConvertParam.pDstBuffer, stConvertParam.nDstLen)
                img_buffer = np.frombuffer(img_buffer, count=int(stConvertParam.nDstBufferSize), dtype=np.uint8)
                img_buffer = img_buffer.reshape(stFrameInfo.nHeight,stFrameInfo.nWidth,3)
                #print("color ok!!")
            print('time cos 3:', time.time() - time_start, 's')

            height, width = img_buffer.shape[0:2]
            img_buffer = cv2.resize(img_buffer, (int(width/2), int(height/2)), interpolation=cv2.INTER_AREA)
            print("img_path",img_path_tmp)
            cv2.imwrite(img_path_tmp, img_buffer)
            os.rename(img_path_tmp, img_path_jpg)
            #cv2.imshow('img', img_buffer)
            #cv2.waitKey(10)

            #下面是模拟摄像机2
            # img_name = "image-"+SERVICE_ID+"-"+str(DEVICE_IDX+2)+"-"+datetime.now().strftime('%Y-%m-%d-%H-%M-%S-%f')
            # img_name_tmp = img_name + ".jpeg"
            # img_name_jpg = img_name + ".jpg"
            # img_path_tmp = os.path.join(grab_dir, img_name_tmp)
            # img_path_jpg = os.path.join(grab_dir, img_name_jpg)
            # cv2.imwrite(img_path_tmp, img_buffer)
            # os.rename(img_path_tmp, img_path_jpg)
        print("")

g_winfun_ctype = CFUNCTYPE
g_st_frame_info = POINTER(MV_FRAME_OUT_INFO_EX)
g_p_data = POINTER(c_ubyte)
FrameInfoCallBack = g_winfun_ctype(None, g_p_data, g_st_frame_info, c_void_p)
def image_callback_0(pData, pFrameInfo, pUser):
    image_callback_main(pData, pFrameInfo, 0)

def image_callback_1(pData, pFrameInfo, pUser):
    image_callback_main(pData, pFrameInfo, 1)


# 因为回调传值会变化,暂时只能用2个回调区别那个摄像头
CALL_BACK_FUN_0 = FrameInfoCallBack(image_callback_0)
CALL_BACK_FUN_1 = FrameInfoCallBack(image_callback_1)

def press_any_key_exit():
    fd = sys.stdin.fileno()
    old_ttyinfo = termios.tcgetattr(fd)
    new_ttyinfo = old_ttyinfo[:]
    new_ttyinfo[3] &= ~termios.ICANON
    new_ttyinfo[3] &= ~termios.ECHO
    #sys.stdout.write(msg)
    #sys.stdout.flush()
    termios.tcsetattr(fd, termios.TCSANOW, new_ttyinfo)
    try:
        os.read(fd, 7)
    except:
        pass
    finally:
        termios.tcsetattr(fd, termios.TCSANOW, old_ttyinfo)

def add_trigger_event_thread(cam=0, idx=None):
    print("add_trigger_event_thread")
    while True:
        if GRAB_RUN==False:
            break
        ret = cam.MV_CC_SetCommandValue("TriggerSoftware");
        if ret != 0:
            print("TriggerSoftware fail! ret[0x%x]" % ret)
            break
        time.sleep(2)

def start():
    global GRAB_RUN
    global CAM_LIST
    global DEVICE_NUM
    global USER_DATA_PY

    deviceList = MV_CC_DEVICE_INFO_LIST()
    tlayerType = MV_GIGE_DEVICE | MV_USB_DEVICE

    # 1 枚举设备 | en:Enum device
    ret = MvCamera.MV_CC_EnumDevices(tlayerType, deviceList)
    if ret != 0:
        print("enum devices fail! ret[0x%x]" % ret)
        GRAB_RUN = False
        return
    if deviceList.nDeviceNum == 0:
        print("find no device!")
        GRAB_RUN = False
        return
    print("Find %d devices!" % deviceList.nDeviceNum)
    # 打印设备详情
    printDeviceInfo(deviceList)

    # 2 打开
    # 2.1 创建相机实例 | en:Creat Camera Object
    DEVICE_NUM = deviceList.nDeviceNum
    for i in range(0, DEVICE_NUM):
        CAM_LIST.append(MvCamera())
        # ch:选择设备并创建句柄| en:Select device and create handle
        stDeviceList = cast(deviceList.pDeviceInfo[int(i)], POINTER(MV_CC_DEVICE_INFO)).contents
        CAM_LIST[i].MV_CC_CreateHandle(stDeviceList)
        if ret != 0:
            print("create handle fail! ret[0x%x]" % ret)
            GRAB_RUN = False
            return

        # 2.2 打开设备 | en:Open device
        ret = CAM_LIST[i].MV_CC_OpenDevice(MV_ACCESS_Exclusive, 0)
        if ret != 0:
            print("open device fail! ret[0x%x]" % ret)
            GRAB_RUN = False
            return

        ret = CAM_LIST[i].MV_CC_SetEnumValue("ExposureAuto", MV_EXPOSURE_AUTO_MODE_CONTINUOUS)
        if ret != 0:
            print("set ExposureAuto fail! ret[0x%x]" % ret)
            GRAB_RUN = False
            return

        ret = CAM_LIST[i].MV_CC_SetEnumValue("GainAuto",MV_GAIN_MODE_CONTINUOUS)
        if ret != 0:
            print("set GainAuto fail! ret[0x%x]" % ret)
            GRAB_RUN = False
            return

        ret = CAM_LIST[i].MV_CC_SetEnumValue("TriggerMode", MV_TRIGGER_MODE_ON)
        #ret = CAM_LIST[i].MV_CC_SetEnumValue("TriggerMode", MV_TRIGGER_MODE_OFF)
        if ret != 0:
            print('Enable trigger failed! [{0:#X}]'.format(ret))
            GRAB_RUN = False
            return False

        ret = CAM_LIST[i].MV_CC_SetEnumValue('TriggerSource', MV_TRIGGER_SOURCE_SOFTWARE)
        #ret = CAM_LIST[i].MV_CC_SetEnumValue('TriggerSource', MV_TRIGGER_SOURCE_LINE0)
        if ret != 0:
            print('Set trigger source failed! [{0:#X}]'.format(ret))
            GRAB_RUN = False
            return False

        # ret = CAM_LIST[i].MV_CC_SetEnumValue('TriggerActivation', 3)
        # if ret != 0:
        #     print('Set trigger source failed! [{0:#X}]'.format(ret))
        #     GRAB_RUN = False
        #     return False

        stStringValue = MVCC_STRINGVALUE()
        memset(byref(stStringValue), 0, sizeof(MVCC_STRINGVALUE))
        ret = CAM_LIST[i].MV_CC_GetStringValue("DeviceUserID", stStringValue)
        if ret != 0:
            print("获取 string 型数据 %s 失败 ! 报错码 ret[0x%x]" % ("DeviceUserID", ret))
            GRAB_RUN = False
            return False
        device_user_id = bytes.decode(stStringValue.chCurValue)
        print("device_user_id", device_user_id)

        USER_DATA_PY.append({
            "cam":CAM_LIST[i],
            "device_idx":i
        })
        #user_data = cast(pointer(py_object(i)), c_void_p)
        #user_data = cast(pointer(c_int(i)), c_void_p)
        #print("pass callback arg,i:",i)
        #user_data = pointer(c_int(i))

        #device_idx = (c_int)()
        #device_idx = i
        #print("pass callback arg,device_idx:",device_idx)
        #user_data = cast(pointer(c_int(device_idx)), c_void_p)
        user_data = cast(pointer(py_object(i)), c_void_p)
        if device_user_id == DEVICE_USER_ID_FRONT:
            call_back_fun = CALL_BACK_FUN_0
        elif device_user_id == DEVICE_USER_ID_BACK:
            call_back_fun = CALL_BACK_FUN_1
        ret = CAM_LIST[i].MV_CC_RegisterImageCallBackEx(call_back_fun,user_data)
        if ret != 0:
            print('Register callback failed! [{0:#X}]'.format(ret))
            GRAB_RUN = False
            return False


        hThreadHandle = threading.Thread(target=add_trigger_event_thread, args=(CAM_LIST[i],i))
        hThreadHandle.start()

        # try:
        # except:
        #     print ("error: unable to start thread")

        ret = CAM_LIST[i].MV_CC_StartGrabbing()
        if ret != 0:
            print('Start grabbing failed! [{0:#X}]'.format(ret))
            GRAB_RUN = False
            return False

def stop():
    global GRAB_RUN
    global CAM_LIST
    global DEVICE_NUM
    for i in range(0, DEVICE_NUM):
        # 5 关闭
        # 5.1 停止取流 | en:Stop grab image
        print("stop grabbing device index[%d]" % i)
        ret = CAM_LIST[i].MV_CC_StopGrabbing()
        if ret != 0:
            print("stop grabbing fail! ret[0x%x]" % ret)
            break
            #sys.exit()

        # 5.2 关闭设备 | Close device
        ret = CAM_LIST[i].MV_CC_CloseDevice()
        if ret != 0:
            print("close deivce fail! ret[0x%x]" % ret)
            break
            #sys.exit()

        # 6 销毁句柄 | Destroy handle
        ret = CAM_LIST[i].MV_CC_DestroyHandle()
        if ret != 0:
            print("destroy handle fail! ret[0x%x]" % ret)
            break
            #sys.exit()
    GRAB_RUN = False
start()
print ("press a key to stop grabbing.")
press_any_key_exit()
stop()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
  • 194
  • 195
  • 196
  • 197
  • 198
  • 199
  • 200
  • 201
  • 202
  • 203
  • 204
  • 205
  • 206
  • 207
  • 208
  • 209
  • 210
  • 211
  • 212
  • 213
  • 214
  • 215
  • 216
  • 217
  • 218
  • 219
  • 220
  • 221
  • 222
  • 223
  • 224
  • 225
  • 226
  • 227
  • 228
  • 229
  • 230
  • 231
  • 232
  • 233
  • 234
  • 235
  • 236
  • 237
  • 238
  • 239
  • 240
  • 241
  • 242
  • 243
  • 244
  • 245
  • 246
  • 247
  • 248
  • 249
  • 250
  • 251
  • 252
  • 253
  • 254
  • 255
  • 256
  • 257
  • 258
  • 259
  • 260
  • 261
  • 262
  • 263
  • 264
  • 265
  • 266
  • 267
  • 268
  • 269
  • 270
  • 271
  • 272
  • 273
  • 274
  • 275
  • 276
  • 277
  • 278
  • 279
  • 280
  • 281
  • 282
  • 283
  • 284
  • 285
  • 286
  • 287
  • 288
  • 289
  • 290
  • 291
  • 292
  • 293
  • 294
  • 295
  • 296
  • 297
  • 298
  • 299
  • 300
  • 301
  • 302
  • 303
  • 304
  • 305
  • 306
  • 307
  • 308
  • 309
  • 310
  • 311
  • 312
  • 313
  • 314
  • 315
  • 316
  • 317
  • 318
  • 319
  • 320
  • 321
  • 322
  • 323
  • 324
  • 325
  • 326
  • 327
  • 328
  • 329
  • 330
  • 331
  • 332
  • 333
  • 334
  • 335
  • 336
  • 337
  • 338
  • 339
  • 340
  • 341
  • 342
  • 343
  • 344
  • 345
  • 346
  • 347
  • 348
  • 349
  • 350
  • 351
  • 352
  • 353
  • 354
  • 355
  • 356
  • 357
  • 358
  • 359
  • 360
  • 361
  • 362
  • 363
  • 364
  • 365
  • 366
  • 367
  • 368
  • 369
  • 370
  • 371
  • 372
  • 373
  • 374
  • 375
  • 376
  • 377
  • 378
  • 379
  • 380
  • 381
  • 382
  • 383
  • 384
  • 385
  • 386
  • 387
  • 388

硬件触发

改成硬件触发只需要修改一句
TriggerSource的MV_TRIGGER_SOURCE_SOFTWARE改为MV_TRIGGER_SOURCE_LINE0

ret = CAM_LIST[i].MV_CC_SetEnumValue('TriggerSource', MV_TRIGGER_SOURCE_LINE0)
  • 1

硬件触发接线图

1相机I/O管脚接口定义,线的颜色可能不同,但是功能是一样的

管脚信号I/O信号源说明线缆颜色
1DC_PWR相机电源
2OPTO_INLine 0+光耦隔离输入
3GPIOLine 2可配置输入或输出
4OPTO_OUTLine 1+光耦隔离输出
5OPTO_GNDLine 0/1-光耦隔离信号地绿
6GNDLine 2-相机电源地

触发硬件用光电开关,NPN设备,网上的接线图没有画出电阻的接线位置,实际电阻位置如下
在这里插入图片描述

AI目标检测接口

对yolov5的封装

# -*- coding: utf-8 -*-
#导入需要的库
import os
import sys
from pathlib import Path
import numpy as np
import cv2
import torch
import torch.backends.cudnn as cudnn
from tqdm import tqdm
import shutil
import io 

from models.common_2 import DetectMultiBackend
from utils.dataloaders import IMG_FORMATS, VID_FORMATS, LoadImages, LoadStreams
from utils.general import (LOGGER, check_file, check_img_size, check_imshow, check_requirements, colorstr,
                           increment_path, non_max_suppression, print_args, scale_boxes, strip_optimizer, xyxy2xywh)
from utils.plots import Annotator, colors, save_one_box
from utils.torch_utils import select_device, time_sync
#导入letterbox
from utils.augmentations import Albumentations, augment_hsv, copy_paste, letterbox, mixup, random_perspective

class U5():
    def __init__(self, weights="/weights/last.pt", conf_thres=0.25):
        #weights=ROOT / 'yolov5s.pt'  # 权重文件地址   .pt文件
        self.weights = weights
        #source=ROOT / 'data/images'  # 测试数据文件(图片或视频)的保存路径
        #data=ROOT / 'data/coco128.yaml'  # 标签文件地址   .yaml文件
        
        self.imgsz=(640, 640)  # 输入图片的大小 默认640(pixels)
        self.conf_thres=conf_thres  # object置信度阈值 默认0.25  用在nms中
        self.iou_thres=0.45  # 做nms的iou阈值 默认0.45   用在nms中
        self.max_det=1000  # 每张图片最多的目标数量  用在nms中
        device=''  # 设置代码执行的设备 cuda device, i.e. 0 or 0,1,2,3 or cpu
        self.classes=None  # 在nms中是否是只保留某些特定的类 默认是None 就是所有类只要满足条件都可以保留 --class 0, or --class 0 2 3
        self.agnostic_nms=False  # 进行nms是否也除去不同类别之间的框 默认False
        self.augment=False  # 预测是否也要采用数据增强 TTA 默认False
        self.visualize=False  # 特征图可视化 默认FALSE
        self.half=False  # 是否使用半精度 Float16 推理 可以缩短推理时间 但是默认是False
        self.dnn=False  # 使用OpenCV DNN进行ONNX推理

        # 获取设备
        self.device = select_device(device)


        # 载入模型
        # self.model = DetectMultiBackend(weights, device=device, dnn=self.dnn, data=data)
	# self.model = DetectMultiBackend(weights, device=self.device, dnn=self.dnn)
        w = str(weights[0] if isinstance(weights, list) else weights)
        print(type(w),w)
        source_file = open(w, "rb")
        content = source_file.read()
        weights_bytes = io.BytesIO(content)
        self.model = DetectMultiBackend(weights_bytes, model_type="pt", device=self.device, dnn=self.dnn)
        source_file.close()
        weights_bytes.close()        

        self.stride, self.names, self.pt, jit, onnx, engine = self.model.stride, self.model.names, self.model.pt, self.model.jit, self.model.onnx, self.model.engine
        imgsz = check_img_size(self.imgsz, s=self.stride)  # 检查图片尺寸
        print("names",self.names)

        # Half
        # 使用半精度 Float16 推理
        self.half &= (self.pt or jit or onnx or engine) and self.device.type != 'cpu'  # FP16 supported on limited backends with CUDA
        if self.pt or jit:
            self.model.model.half() if self.half else self.model.model.float()

        # 开始预测
        self.model.warmup(imgsz=(1, 3, *self.imgsz))  # warmup

    def detect(self, img):
        # Dataloader
        # 载入数据
        # dataset = LoadImages(source, img_size=imgsz, stride=stride, auto=pt)
    
        # Run inference
        dt, seen = [0.0, 0.0, 0.0], 0
    
        #对图片进行处理
        im0 = img
        # Padded resize
        im = letterbox(im0, self.imgsz, self.stride, auto=self.pt)[0]
        # Convert
        im = im.transpose((2, 0, 1))[::-1]  # HWC to CHW, BGR to RGB
        im = np.ascontiguousarray(im)
        t1 = time_sync()
        im = torch.from_numpy(im).to(self.device)
        im = im.half() if self.half else im.float()  # uint8 to fp16/32
        im /= 255  # 0 - 255 to 0.0 - 1.0
        if len(im.shape) == 3:
            im = im[None]  # expand for batch dim
        t2 = time_sync()
        dt[0] += t2 - t1
    
        # Inference
        # 预测
        pred = self.model(im, augment=self.augment, visualize=self.visualize)
        t3 = time_sync()
        dt[1] += t3 - t2
    
        # NMS
        pred = non_max_suppression(pred, self.conf_thres, self.iou_thres, self.classes, self.agnostic_nms, max_det=self.max_det)
        dt[2] += time_sync() - t3
        #print("pred:",pred)
    
        #用于存放结果
        detections=[]

        # 图片画框
        line_thickness = 3
        annotator = Annotator(im0, line_width=line_thickness, example=str(self.names))
        # Process predictions
        for i, det in enumerate(pred):  # per image 每张图片
            seen += 1
            # im0 = im0s.copy()
            gn = torch.tensor(im0.shape)[[1, 0, 1, 0]]  # normalization gain whwh
            if len(det):
                # Rescale boxes from img_size to im0 size
                det[:, :4] = scale_boxes(im.shape[2:], det[:, :4], im0.shape).round()
                # Write results
                # 写入结果
                for *xyxy, conf, index in reversed(det):
                    xywh = (xyxy2xywh(torch.tensor(xyxy).view(1, 4))/ gn).view(-1).tolist()
                    #line = (index, *xywh)
                    #print("line:",line)
                    #xywh = [round(x) for x in xywh]
                    #xywh = [xywh[0] - xywh[2] // 2, xywh[1] - xywh[3] // 2, xywh[2], xywh[3]]  # 检测到目标位置,格式:(left,top,w,h)

                    cls = self.names[int(index)]
                    conf = float(conf)
                    detections.append({'class': cls, 'conf': conf, 'position': xywh, 'index': int(index)})

                    #图片画框
                    annotator.box_label(xyxy, cls, color=colors(int(index), True))

            #cv2.imwrite("/home/wai/hik/code/test.jpg", im0)
        #输出结果
        #for i in detections:
        #    print(i)
    
        #推测的时间
        print(f'({t3 - t2:.3f}s)')
        return detections

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144

flask对http接口的实现

# -*- coding: utf-8 -*-
# +
from distutils.command.config import config
import sys
#sys.path.append('/home/wai/hik/yolo/yolov5_s/')
sys.path.append('yolov5_s/')

from flask import Flask, g, jsonify, make_response, request, render_template
import base64
import numpy as np
import cv2
from sdk import U5
import time

app = Flask(__name__)
#app.json.ensure_ascii = False


# 以下需要配置
#u5 = U5(weights = '/home/wai/hik/yolo/yolov5_s/runs/train/exp/weights/last.pt')
u5 = U5(weights = 'yolov5_s/runs/train/exp/weights/last.pt')
#根据车门类型获取正确的数量配置
SCAN_CODE_FILE = "/home/wai/hik/code/scan_code.txt"
CFG_CORRECT_ANSWER = [
    {
        "no":"000001",
        "cfg":{
            "front":{
                "screw_3":2,
                "lines":5,
                "block_1":1,
                "block_2":2
            },
            "back":{
                "screw_1":7,
                "screw_2":29,
                "wang":0
            }
        }
    },
    {
        "no":"000002",
        "cfg":{
            "front":{
                "screw_1":7,
                "screw_2":29,
                "wang":0
            },
            "back":{
                "screw_3":2,
                "lines":5,
                "block_1":1,
                "block_2":2
            }
        }
    },
]



def detections_count(detections,key):
    count = 0
    for item in detections:
        if item["class"] == key:
            count = count+1
    return count


@app.route('/api/ai_detection', methods=['POST'])
def ai_detection():
    print("==> ai_detection start")
    time_start = time.time()
    images = request.json.get('images')
    #print("type fileBase64",type(fileBase64),fileBase64)

    img_data = base64.b64decode(images[0][23:])
    img_array = np.frombuffer(img_data, np.uint8)
    img = cv2.imdecode(img_array, cv2.COLOR_RGB2BGR)
    #print("img",img)
    #cv2.imwrite("server_test_img.jpg", img)
    print('time cos detect 1:', time.time()-time_start, 's') 
    detections = u5.detect(img)
    print('time cos detect 2:', time.time()-time_start, 's') 
    print("detections",detections)
    images_id = request.json.get('images_id')

    # 读取扫码枪扫码的文件
    fileRead = open(SCAN_CODE_FILE, "r")
    scan_code = fileRead.read()
    fileRead.close()
    print("scan_code",scan_code)
    # 根据扫码的code获取正确配置
    cfg = None
    for item in CFG_CORRECT_ANSWER:
        if item["no"] == scan_code:
            cfg = item["cfg"]
            break
    #print("cfg",cfg)

    if cfg == None:
        result = {
            'boxes': [],
            'image': images[0],
            'image_id': images_id[0],
            'scene_name': 'detect'
        }
    else:
       
        # 根据camera_id判断正反面,1正面,2反面
        camera_id = images_id[0].split("-")[2]
        print("==> camera_id",camera_id)

        part_num = None
        if camera_id == "1":
            part_num = cfg["front"]
        elif camera_id == "2":
            part_num = cfg["back"]
        print("correct_cfg",part_num)

        good = 1
        if part_num != None:
            for key in part_num:
                value = part_num[key]
                dc = detections_count(detections,key)
                print("==> ",key,"ai_count",dc,"correct_count",value)
                if value!=dc:
                    good = 0
                    break

        # 图片中间画出结果
        height, width = img.shape[0:2]

        if camera_id == "1":
            result_pos = (width-300, 200)
        elif camera_id == "2":
            result_pos = (100, 200)

        if good==0:
            result_text = "NG"
            result_color = (0,0,255)
        elif good==1:
            result_text = "OK"
            result_color = (0,128,0)

        #print("height,width",height,width)
        cv2.putText(img, result_text, result_pos, cv2.FONT_HERSHEY_COMPLEX, 5.0, result_color, 10)
        # print("img",img)
        img = cv2.imencode('.jpg', img)[1]
        image_result_base64 = 'data:image/jpeg;base64,' + str(base64.b64encode(img))[2:-1]

        result = {
            'boxes': [],
            'image': image_result_base64,
            'image_id': images_id[0],
            'good': good
        }
    #return jsonify({'code': 200, 'msg': '检测成功', 'detections': detections})
    result_no_image = result.copy()
    result_no_image["image"]="base64"
    #print("result_no_image",result_no_image)
    print('time cos detect 3:', time.time()-time_start, 's') 
    print("")
    return jsonify(result)

# 运行代码
if __name__ == '__main__':
    app.run(host='0.0.0.0',port=5000)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/寸_铁/article/detail/754005
推荐阅读
相关标签
  

闽ICP备14008679号