当前位置:   article > 正文

Python 调用海康机器人工业相机_python调用海康工业相机

python调用海康工业相机

一、前期准备

1、python环境搭建

2、安装MVS软件

3、网上博客参考
1)RTSP(Runtime Stream Protocol)协议方向
(很遗憾,此路不通!!!)
因为我们用的 海康机器人工业相机 MV-CU060-10GM 这款相机,不支持 RTSP 协议

如下博客适用于 海康威视摄像头,并不适用于 海康工业相机,如果是使用海康威视摄像头的小伙伴可以参考下。

参考博客:海康威视摄像头对接SDK实时预览功能和抓拍功能,懒癌福利,可直接CV

2)Java实现方向
(不能完全满足客户需求,此路不全通!!!)

使用Java目前参考官网的示例,实现了图片抓取,并上传的功能,但是没有实现视频流的实时获取和显示的功能。

如果需求只是获取图片,不要求视频流实时显示,可以通过Java就可以实现。

3)Python实现方向
(目前,网上没有直接能完全满足上述三个需求的,本人通过借鉴、整合,结合Flask框架实现的。)

参考博客如下:
python语言下使用opencv接口cv2.VideoCapture()接口调用海康机器人工业相机 (此篇博文可以重点看!!!)

通过python调用海康威视工业摄像头并进行图像存储,同时使用opencv实时图像显示(数据流问题已解决)

python调用海康工业相机并用opencv显示(整体实现)(此篇博文可以重点看!!!)

pyQT5 学习使用 笔记 六 pyQt5+opencv 显示海康GIGE相机动态视频流 (该方式虽然实现了视频的实时显示,但是,无法被给前端直接调用)

web实时显示摄像头图像(python) (此篇博文可以重点看!!!)

4)Flask 的相关教程和博客
https://www.w3cschool.cn/flask/
https://dormousehole.readthedocs.io/en/latest/
https://blog.csdn.net/weixin_44239541/article/details/89390139
https://zhuanlan.zhihu.com/p/104273184
https://blog.csdn.net/tulan_xiaoxin/article/details/79132214

二、Python 代码实现

1、测试代码

1)抓取图片测试代码

前提条件:将 *C:\Program Files (x86)\MVS\Development\Samples\Python* 目录下的 MvImport 目录,copy到自己的工程目录下,创建测试文件 TestGrabImage.py

代码如下:

  1. # -- coding: utf-8 --
  2. import cv2
  3. import sys
  4. import copy
  5. import msvcrt
  6. import numpy as np
  7. from ctypes import *
  8. sys.path.append("./MvImport")
  9. from MvCameraControl_class import *
  10. if __name__ == "__main__":
  11. deviceList = MV_CC_DEVICE_INFO_LIST()
  12. tlayerType = MV_GIGE_DEVICE | MV_USB_DEVICE
  13. # ch:枚举设备 | en:Enum device
  14. ret = MvCamera.MV_CC_EnumDevices(tlayerType, deviceList)
  15. if ret != 0:
  16. print ("enum devices fail! ret[0x%x]" % ret)
  17. sys.exit()
  18. if deviceList.nDeviceNum == 0:
  19. print ("find no device!")
  20. sys.exit()
  21. print ("find %d devices!" % deviceList.nDeviceNum)
  22. for i in range(0, deviceList.nDeviceNum):
  23. mvcc_dev_info = cast(deviceList.pDeviceInfo[i], POINTER(MV_CC_DEVICE_INFO)).contents
  24. if mvcc_dev_info.nTLayerType == MV_GIGE_DEVICE:
  25. print ("\ngige device: [%d]" % i)
  26. strModeName = ""
  27. for per in mvcc_dev_info.SpecialInfo.stGigEInfo.chModelName:
  28. strModeName = strModeName + chr(per)
  29. print ("device model name: %s" % strModeName)
  30. nip1 = ((mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0xff000000) >> 24)
  31. nip2 = ((mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0x00ff0000) >> 16)
  32. nip3 = ((mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0x0000ff00) >> 8)
  33. nip4 = (mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0x000000ff)
  34. print ("current ip: %d.%d.%d.%d\n" % (nip1, nip2, nip3, nip4))
  35. elif mvcc_dev_info.nTLayerType == MV_USB_DEVICE:
  36. print ("\nu3v device: [%d]" % i)
  37. strModeName = ""
  38. for per in mvcc_dev_info.SpecialInfo.stUsb3VInfo.chModelName:
  39. if per == 0:
  40. break
  41. strModeName = strModeName + chr(per)
  42. print ("device model name: %s" % strModeName)
  43. strSerialNumber = ""
  44. for per in mvcc_dev_info.SpecialInfo.stUsb3VInfo.chSerialNumber:
  45. if per == 0:
  46. break
  47. strSerialNumber = strSerialNumber + chr(per)
  48. print ("user serial number: %s" % strSerialNumber)
  49. nConnectionNum = 0
  50. if int(nConnectionNum) >= deviceList.nDeviceNum:
  51. print ("intput error!")
  52. sys.exit()
  53. # ch:创建相机实例 | en:Creat Camera Object
  54. cam = MvCamera()
  55. # ch:选择设备并创建句柄 | en:Select device and create handle
  56. stDeviceList = cast(deviceList.pDeviceInfo[int(nConnectionNum)], POINTER(MV_CC_DEVICE_INFO)).contents
  57. ret = cam.MV_CC_CreateHandle(stDeviceList)
  58. if ret != 0:
  59. print ("create handle fail! ret[0x%x]" % ret)
  60. sys.exit()
  61. # ch:打开设备 | en:Open device
  62. ret = cam.MV_CC_OpenDevice(MV_ACCESS_Exclusive, 0)
  63. if ret != 0:
  64. print ("open device fail! ret[0x%x]" % ret)
  65. sys.exit()
  66. # ch:探测网络最佳包大小(只对GigE相机有效) | en:Detection network optimal package size(It only works for the GigE camera)
  67. if stDeviceList.nTLayerType == MV_GIGE_DEVICE:
  68. nPacketSize = cam.MV_CC_GetOptimalPacketSize()
  69. if int(nPacketSize) > 0:
  70. ret = cam.MV_CC_SetIntValue("GevSCPSPacketSize", nPacketSize)
  71. if ret != 0:
  72. print ("Warning: Set Packet Size fail! ret[0x%x]" % ret)
  73. else:
  74. print ("Warning: Get Packet Size fail! ret[0x%x]" % nPacketSize)
  75. # ch:设置触发模式为off | en:Set trigger mode as off
  76. ret = cam.MV_CC_SetEnumValue("TriggerMode", MV_TRIGGER_MODE_OFF)
  77. if ret != 0:
  78. print ("set trigger mode fail! ret[0x%x]" % ret)
  79. sys.exit()
  80. # ch:获取数据包大小 | en:Get payload size
  81. stParam = MVCC_INTVALUE()
  82. memset(byref(stParam), 0, sizeof(MVCC_INTVALUE))
  83. ret = cam.MV_CC_GetIntValue("PayloadSize", stParam)
  84. if ret != 0:
  85. print ("get payload size fail! ret[0x%x]" % ret)
  86. sys.exit()
  87. nPayloadSize = stParam.nCurValue
  88. # ch:开始取流 | en:Start grab image
  89. ret = cam.MV_CC_StartGrabbing()
  90. if ret != 0:
  91. print ("start grabbing fail! ret[0x%x]" % ret)
  92. sys.exit()
  93. stDeviceList = MV_FRAME_OUT_INFO_EX()
  94. memset(byref(stDeviceList), 0, sizeof(stDeviceList))
  95. data_buf = (c_ubyte * nPayloadSize)()
  96. ret = cam.MV_CC_GetOneFrameTimeout(byref(data_buf), nPayloadSize, stDeviceList, 1000)
  97. if ret == 0:
  98. print ("get one frame: Width[%d], Height[%d], nFrameNum[%d]" % (stDeviceList.nWidth, stDeviceList.nHeight, stDeviceList.nFrameNum))
  99. nRGBSize = stDeviceList.nWidth * stDeviceList.nHeight * 3
  100. stConvertParam=MV_SAVE_IMAGE_PARAM_EX()
  101. stConvertParam.nWidth = stDeviceList.nWidth
  102. stConvertParam.nHeight = stDeviceList.nHeight
  103. stConvertParam.pData = data_buf
  104. stConvertParam.nDataLen = stDeviceList.nFrameLen
  105. stConvertParam.enPixelType = stDeviceList.enPixelType
  106. stConvertParam.nImageLen = stConvertParam.nDataLen
  107. stConvertParam.nJpgQuality = 70
  108. stConvertParam.enImageType = MV_Image_Jpeg
  109. stConvertParam.pImageBuffer = (c_ubyte * nRGBSize)()
  110. stConvertParam.nBufferSize = nRGBSize
  111. # ret = cam.MV_CC_ConvertPixelType(stConvertParam)
  112. print(stConvertParam.nImageLen)
  113. ret = cam.MV_CC_SaveImageEx2(stConvertParam)
  114. if ret != 0:
  115. print ("convert pixel fail ! ret[0x%x]" % ret)
  116. del data_buf
  117. sys.exit()
  118. file_path = "AfterConvert_RGB2.jpg"
  119. file_open = open(file_path.encode('ascii'), 'wb+')
  120. img_buff = (c_ubyte * stConvertParam.nImageLen)()
  121. cdll.msvcrt.memcpy(byref(img_buff), stConvertParam.pImageBuffer, stConvertParam.nImageLen)
  122. file_open.write(img_buff)
  123. print ("Save Image succeed!")
  124. # ch:停止取流 | en:Stop grab image
  125. ret = cam.MV_CC_StopGrabbing()
  126. if ret != 0:
  127. print ("stop grabbing fail! ret[0x%x]" % ret)
  128. del data_buf
  129. sys.exit()
  130. # ch:关闭设备 | Close device
  131. ret = cam.MV_CC_CloseDevice()
  132. if ret != 0:
  133. print ("close deivce fail! ret[0x%x]" % ret)
  134. del data_buf
  135. sys.exit()
  136. # ch:销毁句柄 | Destroy handle
  137. ret = cam.MV_CC_DestroyHandle()
  138. if ret != 0:
  139. print ("destroy handle fail! ret[0x%x]" % ret)
  140. del data_buf
  141. sys.exit()
  142. del data_buf

2)Python+Qt 实现视频流实时显示测试代码
前提条件:将 *C:\Program Files (x86)\MVS\Development\Samples\Python* 目录下的 MvImport 目录,copy到自己的工程目录下,创建测试文件 TestVideoStream.py

代码如下:

  1. # -- coding: utf-8 --
  2. import cv2
  3. from PyQt5 import QtCore, QtGui, QtWidgets
  4. from PyQt5.QtCore import pyqtSignal
  5. from PyQt5.QtGui import *
  6. import numpy as np
  7. #from CameraControl_header import MV_CC_DEVICE_INFO_LIST
  8. #from mainWindow import Ui_MainWindow # 导入创建的GUI类
  9. import sys
  10. import threading
  11. import msvcrt
  12. from ctypes import *
  13. sys.path.append("./MvImport")
  14. from MvCameraControl_class import *
  15. from Ui_MainWindow import *
  16. from CameraParams_header import *
  17. class mywindow(QtWidgets.QMainWindow, Ui_MainWindow):
  18. sendAddDeviceName = pyqtSignal() #定义一个添加设备列表的信号。
  19. deviceList = MV_CC_DEVICE_INFO_LIST()
  20. g_bExit = False
  21. # ch:创建相机实例 | en:Creat Camera Object
  22. cam = MvCamera()
  23. def connect_and_emit_sendAddDeviceName(self):
  24. # Connect the sendAddDeviceName signal to a slot.
  25. self.sendAddDeviceName.connect(self.SelectDevice)
  26. # Emit the signal.
  27. self.sendAddDeviceName.emit()
  28. def __init__(self):
  29. super(mywindow, self).__init__()
  30. self.setupUi(self)
  31. self.connect_and_emit_sendAddDeviceName()
  32. self.butopenCam.clicked.connect(lambda:self.openCam(self.camSelect.currentData()))
  33. self.butcloseCam.clicked.connect(self.closeCam)
  34. # setting main window geometry
  35. desktop_geometry = QtWidgets.QApplication.desktop() # 获取屏幕大小
  36. main_window_width = desktop_geometry.width() # 屏幕的宽
  37. main_window_height = desktop_geometry.height() # 屏幕的高
  38. rect = self.geometry() # 获取窗口界面大小
  39. window_width = rect.width() # 窗口界面的宽
  40. window_height = rect.height() # 窗口界面的高
  41. x = (main_window_width - window_width) // 2 # 计算窗口左上角点横坐标
  42. y = (main_window_height - window_height) // 2 # 计算窗口左上角点纵坐标
  43. self.setGeometry(x, y, window_width, window_height) # 设置窗口界面在屏幕上的位置
  44. # 无边框以及背景透明一般不会在主窗口中用到,一般使用在子窗口中,例如在子窗口中显示gif提示载入信息等等
  45. # self.setWindowFlags(Qt.FramelessWindowHint)
  46. # self.setAttribute(Qt.WA_TranslucentBackground)
  47. #打开摄像头。
  48. def openCam(self,camid):
  49. self.g_bExit = False
  50. # ch:选择设备并创建句柄 | en:Select device and create handle
  51. stDeviceList = cast(self.deviceList.pDeviceInfo[int(camid)], POINTER(MV_CC_DEVICE_INFO)).contents
  52. ret = self.cam.MV_CC_CreateHandle(stDeviceList)
  53. if ret != 0:
  54. print("create handle fail! ret[0x%x]" % ret)
  55. sys.exit()
  56. # ch:打开设备 | en:Open device
  57. ret = self.cam.MV_CC_OpenDevice(MV_ACCESS_Exclusive, 0)
  58. if ret != 0:
  59. print("open device fail! ret[0x%x]" % ret)
  60. sys.exit()
  61. # ch:探测网络最佳包大小(只对GigE相机有效) | en:Detection network optimal package size(It only works for the GigE camera)
  62. if stDeviceList.nTLayerType == MV_GIGE_DEVICE:
  63. nPacketSize = self.cam.MV_CC_GetOptimalPacketSize()
  64. if int(nPacketSize) > 0:
  65. ret = self.cam.MV_CC_SetIntValue("GevSCPSPacketSize", nPacketSize)
  66. if ret != 0:
  67. print("Warning: Set Packet Size fail! ret[0x%x]" % ret)
  68. else:
  69. print("Warning: Get Packet Size fail! ret[0x%x]" % nPacketSize)
  70. # ch:设置触发模式为off | en:Set trigger mode as off
  71. ret = self.cam.MV_CC_SetEnumValue("TriggerMode", MV_TRIGGER_MODE_OFF)
  72. if ret != 0:
  73. print("set trigger mode fail! ret[0x%x]" % ret)
  74. sys.exit()
  75. # ch:获取数据包大小 | en:Get payload size
  76. stParam = MVCC_INTVALUE()
  77. memset(byref(stParam), 0, sizeof(MVCC_INTVALUE))
  78. ret = self.cam.MV_CC_GetIntValue("PayloadSize", stParam)
  79. if ret != 0:
  80. print("get payload size fail! ret[0x%x]" % ret)
  81. sys.exit()
  82. nPayloadSize = stParam.nCurValue
  83. # ch:开始取流 | en:Start grab image
  84. ret = self.cam.MV_CC_StartGrabbing()
  85. if ret != 0:
  86. print("start grabbing fail! ret[0x%x]" % ret)
  87. sys.exit()
  88. data_buf = (c_ubyte * nPayloadSize)()
  89. try:
  90. hThreadHandle = threading.Thread(target=self.work_thread, args=(self.cam, data_buf, nPayloadSize))
  91. hThreadHandle.start()
  92. except:
  93. print("error: unable to start thread")
  94. #关闭相机
  95. def closeCam(self):
  96. self.g_bExit=True
  97. # ch:停止取流 | en:Stop grab image
  98. ret = self.cam.MV_CC_StopGrabbing()
  99. if ret != 0:
  100. print("stop grabbing fail! ret[0x%x]" % ret)
  101. sys.exit()
  102. # ch:关闭设备 | Close device
  103. ret = self.cam.MV_CC_CloseDevice()
  104. if ret != 0:
  105. print("close deivce fail! ret[0x%x]" % ret)
  106. # ch:销毁句柄 | Destroy handle
  107. ret = self.cam.MV_CC_DestroyHandle()
  108. if ret != 0:
  109. print("destroy handle fail! ret[0x%x]" % ret)
  110. def work_thread(self,cam=0, pData=0, nDataSize=0):
  111. stFrameInfo = MV_FRAME_OUT_INFO_EX()
  112. memset(byref(stFrameInfo), 0, sizeof(stFrameInfo))
  113. while True:
  114. QIm = np.asarray(pData) # 将c_ubyte_Array转化成ndarray得到(3686400,)
  115. QIm = QIm.reshape((2048, 3072, 1)) # 根据自己分辨率进行转化
  116. # print(temp)
  117. # print(temp.shape)
  118. QIm = cv2.cvtColor(QIm, cv2.COLOR_BGR2RGB) # 这一步获取到的颜色不对,因为默认是BRG,要转化成RGB,颜色才正常
  119. pyrD1=cv2.pyrDown(QIm) #向下取样
  120. pyrD2 = cv2.pyrDown(pyrD1) # 向下取样
  121. image_height, image_width, image_depth = pyrD2.shape # 读取图像高宽深度
  122. pyrD3 = QImage(pyrD2, image_width, image_height, image_width * image_depth,QImage.Format_RGB888)
  123. self.label.setPixmap(QPixmap.fromImage(pyrD3))
  124. #cv2.namedWindow("result", cv2.WINDOW_AUTOSIZE)
  125. #cv2.imshow("result", temp)
  126. #if cv2.waitKey(1) & 0xFF == ord('q'):
  127. # break
  128. ret = cam.MV_CC_GetOneFrameTimeout(pData, nDataSize, stFrameInfo, 1000)
  129. if ret == 0:
  130. print("get one frame: Width[%d], Height[%d], nFrameNum[%d]" % (
  131. stFrameInfo.nWidth, stFrameInfo.nHeight, stFrameInfo.nFrameNum))
  132. else:
  133. print("no data[0x%x]" % ret)
  134. if self.g_bExit == True:
  135. del pData
  136. break
  137. #获得所有相机的列表存入cmbSelectDevice中
  138. def SelectDevice(self):
  139. '''选择所有能用的相机到列表中,
  140. gige相机需要配合 sdk 得到。
  141. '''
  142. #得到相机列表
  143. tlayerType = MV_GIGE_DEVICE | MV_USB_DEVICE
  144. # ch:枚举设备 | en:Enum device
  145. ret = MvCamera.MV_CC_EnumDevices(tlayerType, self.deviceList)
  146. if ret != 0:
  147. print("enum devices fail! ret[0x%x]" % ret)
  148. sys.exit()
  149. if self.deviceList.nDeviceNum == 0:
  150. print("find no device!")
  151. sys.exit()
  152. print("Find %d devices!" % self.deviceList.nDeviceNum)
  153. for i in range(0, self.deviceList.nDeviceNum):
  154. mvcc_dev_info = cast(self.deviceList.pDeviceInfo[i], POINTER(MV_CC_DEVICE_INFO)).contents
  155. if mvcc_dev_info.nTLayerType == MV_GIGE_DEVICE:
  156. print("\ngige device: [%d]" % i)
  157. strModeName = ""
  158. for per in mvcc_dev_info.SpecialInfo.stGigEInfo.chModelName:
  159. strModeName = strModeName + chr(per)
  160. print("device model name: %s" % strModeName)
  161. self.camSelect.addItem(strModeName,i) #写入设备列表。
  162. def pushbutton_function(self):
  163. #do some things
  164. Img=cv2.imread('JP1.JPG') #通过opencv读入一张图片
  165. image_height, image_width, image_depth=Img.shape #读取图像高宽深度
  166. QIm=cv2.cvtColor(Img,cv2.COLOR_BGR2RGB)
  167. QIm=QImage(QIm.data, image_width, image_height, image_width * image_depth,QImage.Format_RGB888)
  168. self.label.setPixmap(QPixmap.fromImage(QIm))
  169. if __name__ == '__main__':
  170. app = QtWidgets.QApplication(sys.argv)
  171. window = mywindow()
  172. window.show()
  173. sys.exit(app.exec_())

上述代码中用到的 Ui_MainWindow.py,放到 MvImport 目录下。

代码如下:

  1. # -*- coding: utf-8 -*-
  2. # Form implementation generated from reading ui file 'mainWindow.ui'
  3. #
  4. # Created by: PyQt5 UI code generator 5.15.0
  5. #
  6. # WARNING: Any manual changes made to this file will be lost when pyuic5 is
  7. # run again. Do not edit this file unless you know what you are doing.
  8. from PyQt5 import QtCore, QtGui, QtWidgets
  9. class Ui_MainWindow(object):
  10. def setupUi(self, MainWindow):
  11. MainWindow.setObjectName("MainWindow")
  12. MainWindow.resize(589, 530)
  13. self.centralwidget = QtWidgets.QWidget(MainWindow)
  14. self.centralwidget.setObjectName("centralwidget")
  15. self.verticalLayout = QtWidgets.QVBoxLayout(self.centralwidget)
  16. self.verticalLayout.setObjectName("verticalLayout")
  17. self.camSelect = QtWidgets.QComboBox(self.centralwidget)
  18. self.camSelect.setObjectName("camSelect")
  19. self.verticalLayout.addWidget(self.camSelect)
  20. self.label = QtWidgets.QLabel(self.centralwidget)
  21. self.label.setObjectName("label")
  22. self.verticalLayout.addWidget(self.label)
  23. self.butopenCam = QtWidgets.QPushButton(self.centralwidget)
  24. self.butopenCam.setObjectName("butopenCam")
  25. self.verticalLayout.addWidget(self.butopenCam)
  26. self.butcloseCam = QtWidgets.QPushButton(self.centralwidget)
  27. self.butcloseCam.setObjectName("butcloseCam")
  28. self.verticalLayout.addWidget(self.butcloseCam)
  29. MainWindow.setCentralWidget(self.centralwidget)
  30. self.menubar = QtWidgets.QMenuBar(MainWindow)
  31. self.menubar.setGeometry(QtCore.QRect(0, 0, 589, 23))
  32. self.menubar.setObjectName("menubar")
  33. MainWindow.setMenuBar(self.menubar)
  34. self.statusbar = QtWidgets.QStatusBar(MainWindow)
  35. self.statusbar.setObjectName("statusbar")
  36. MainWindow.setStatusBar(self.statusbar)
  37. self.retranslateUi(MainWindow)
  38. QtCore.QMetaObject.connectSlotsByName(MainWindow)
  39. def retranslateUi(self, MainWindow):
  40. _translate = QtCore.QCoreApplication.translate
  41. MainWindow.setWindowTitle(_translate("MainWindow", "MainWindow"))
  42. self.label.setText(_translate("MainWindow", "TextLabel"))
  43. self.butopenCam.setText(_translate("MainWindow", "打开相机"))
  44. self.butcloseCam.setText(_translate("MainWindow", "关闭相机"))

四、使用 Python + OpenCV + Flask 实现,可以满足视频实时获取,并返回通过GET请求返回给前端进行实时显示,也可以抓取图片,保存上传。

1、添加依赖文件
将 *C:\Program Files (x86)\MVS\Development\Samples\Python* 目录下的 MvImport 目录,copy到自己的工程目录下。

2、安装插件 DirectShow
1)进入第三方插件 DirectShow 路径

cd C:\Program Files    (x86)\MVS\Development\ThirdPartyPlatformAdapter\DirectShow\x64\MvDSS2
 


2)安装 DirectShow
使用 管理员权限 运行 InstallDSSvc_x64.bat

在这里插入图片描述

0)index.html

在工程中创建 templates 目录,用于存放 index.hml

<html>
  <head>
  </head>
  <body>
      <h1>拍照预览</h1>
    <img src="{{ url_for('startPreview') }}" width="50%">
  </body>
</html>

1)JsonResponse.py

用于规范返回给前端的数据类型,代码如下:

  1. # -- coding: utf-8 --
  2. class JsonResponse(object):
  3. """
  4. 统一的json返回格式
  5. """
  6. def __init__(self, code, msg, data):
  7. self.code = code
  8. self.msg = msg
  9. self.data = data
  10. @classmethod
  11. def success(cls, code=0, msg='success', data=None):
  12. return cls(code, msg, data)
  13. @classmethod
  14. def error(cls, code=-1, msg='error', data=None):
  15. return cls(code, msg, data)
  16. def to_dict(self):
  17. return {
  18. "code": self.code,
  19. "msg": self.msg,
  20. "data": self.data
  21. }

2)JsonFlask.py

用于重定义数据返回格式,代码如下:

  1. # -- coding: utf-8 --
  2. from flask import Flask, jsonify
  3. from JsonResponse import *
  4. class JsonFlask(Flask):
  5. def make_response(self, rv):
  6. """视图函数可以直接返回: list、dict、None"""
  7. if rv is None or isinstance(rv, (list, dict)):
  8. rv = JsonResponse.success(rv)
  9. if isinstance(rv, JsonResponse):
  10. rv = jsonify(rv.to_dict())
  11. return super().make_response(rv)

3)HikRobotCamera.py

核心代码文件,实现如下功能:
开始预览视频流停止预览视频流获取图片记录日志等功能。

代码如下

  1. # -- coding: utf-8 --
  2. import cv2
  3. from flask import Flask, render_template, Response
  4. import sys
  5. import msvcrt
  6. import base64
  7. import datetime
  8. import logging
  9. sys.path.append("./MvImport")
  10. from MvCameraControl_class import *
  11. from JsonResponse import *
  12. from JsonFlask import *
  13. logging.basicConfig(level=logging.DEBUG,#控制台打印的日志级别
  14. filename='hikrobot.log',
  15. filemode='a',##模式,有w和a,w就是写模式,每次都会重新写日志,覆盖之前的日志
  16. #a是追加模式,默认如果不写的话,就是追加模式
  17. format=
  18. '%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s'
  19. #日志格式
  20. )
  21. # 这里配置一下 template_folder为当前目录,不然可以找不到 index.html
  22. app = JsonFlask(__name__, template_folder='.')
  23. # index
  24. @app.route('/')
  25. def index():
  26. return render_template('./templates/index.html')
  27. # 获取码流
  28. def generate(cap):
  29. # 捕获异常信息
  30. try:
  31. while True:
  32. # 如果是关闭相机,先退出取视频流的循环
  33. global open
  34. if (not open):
  35. break;
  36. retgrab = cap.grab()
  37. if retgrab == True:
  38. logging.debug("Grab true")
  39. ret1, frame = cap.retrieve()
  40. # print(type(frame))
  41. if frame is None:
  42. logging.error("frame is None")
  43. continue
  44. ret1, jpeg = cv2.imencode('.jpg', frame)
  45. jpg_frame = jpeg.tobytes()
  46. yield (b'--frame\r\n'
  47. b'Content-Type: image/jpeg\r\n\r\n' + jpg_frame + b'\r\n')
  48. except Exception as e:
  49. logging.error("generate error: %s" % str(e))
  50. # 开始预览
  51. @app.route('/startPreview')
  52. def startPreview():
  53. logging.info("======================================")
  54. logging.info("start to preview video stream, current_time: " + str(datetime.datetime.now()))
  55. # 全局变量,用于控制获取视频流的开关状态
  56. global open
  57. open = True
  58. # 全局变量,获取视频连接
  59. global cap
  60. cap = cv2.VideoCapture(1)
  61. if False == cap.isOpened():
  62. logging.error("can't open camera")
  63. quit()
  64. else:
  65. logging.info("start to open camera")
  66. logging.info("open camera ok")
  67. # 分辨率设置 3072*2048(海康机器人工业相机 MV-CU060-10GM)
  68. cap.set(cv2.CAP_PROP_FRAME_WIDTH, 3072)
  69. cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 2048)
  70. # 帧率配置
  71. cap.set(cv2.CAP_PROP_FPS, 15)
  72. return Response(generate(cap), mimetype='multipart/x-mixed-replace;boundary=frame')
  73. # 停止预览
  74. @app.route('/stopPreview')
  75. def stopPreview():
  76. logging.info("======================================")
  77. logging.info("stop to preview video stream, current_time: " + str(datetime.datetime.now()))
  78. logging.info("start to close camera")
  79. # 全局变量,用于停止循环
  80. global open
  81. open = False
  82. logging.info("release resources start")
  83. # 全局变量,用于释放相机资源
  84. try:
  85. global cap
  86. cap.release()
  87. cv2.destroyAllWindows()
  88. except Exception as e:
  89. logging.error("stopPreview error: %s" % str(e))
  90. logging.info("release resources end")
  91. logging.info("camera closed successfully, current_time: " + str(datetime.datetime.now()))
  92. logging.info("======================================")
  93. return "stop to preview"
  94. @app.route('/openAndSave')
  95. def openAndSave():
  96. logging.info("======================================")
  97. logging.info("start to grab image, current_time: " + str(datetime.datetime.now()))
  98. code = 100000
  99. msg = "连接相机时发生错误"
  100. # img_base64 = None
  101. try:
  102. deviceList = MV_CC_DEVICE_INFO_LIST()
  103. tlayerType = MV_GIGE_DEVICE | MV_USB_DEVICE
  104. # ch:枚举设备 | en:Enum device
  105. ret = MvCamera.MV_CC_EnumDevices(tlayerType, deviceList)
  106. if ret != 0:
  107. logging.error("enum devices fail! ret[0x%x]" % ret)
  108. sys.exit()
  109. if deviceList.nDeviceNum == 0:
  110. logging.error("find no device!")
  111. sys.exit()
  112. logging.info("find %d devices!" % deviceList.nDeviceNum)
  113. for i in range(0, deviceList.nDeviceNum):
  114. mvcc_dev_info = cast(deviceList.pDeviceInfo[i], POINTER(MV_CC_DEVICE_INFO)).contents
  115. if mvcc_dev_info.nTLayerType == MV_GIGE_DEVICE:
  116. logging.info("\ngige device: [%d]" % i)
  117. strModeName = ""
  118. for per in mvcc_dev_info.SpecialInfo.stGigEInfo.chModelName:
  119. strModeName = strModeName + chr(per)
  120. logging.info("device model name: %s" % strModeName)
  121. nip1 = ((mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0xff000000) >> 24)
  122. nip2 = ((mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0x00ff0000) >> 16)
  123. nip3 = ((mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0x0000ff00) >> 8)
  124. nip4 = (mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0x000000ff)
  125. logging.info("current ip: %d.%d.%d.%d\n" % (nip1, nip2, nip3, nip4))
  126. elif mvcc_dev_info.nTLayerType == MV_USB_DEVICE:
  127. logging.info("\nu3v device: [%d]" % i)
  128. strModeName = ""
  129. for per in mvcc_dev_info.SpecialInfo.stUsb3VInfo.chModelName:
  130. if per == 0:
  131. break
  132. strModeName = strModeName + chr(per)
  133. logging.info("device model name: %s" % strModeName)
  134. strSerialNumber = ""
  135. for per in mvcc_dev_info.SpecialInfo.stUsb3VInfo.chSerialNumber:
  136. if per == 0:
  137. break
  138. strSerialNumber = strSerialNumber + chr(per)
  139. logging.info("user serial number: %s" % strSerialNumber)
  140. nConnectionNum = 0
  141. if int(nConnectionNum) >= deviceList.nDeviceNum:
  142. logging.error("intput error!")
  143. sys.exit()
  144. # ch:创建相机实例 | en:Creat Camera Object
  145. cam = MvCamera()
  146. # ch:选择设备并创建句柄 | en:Select device and create handle
  147. stDeviceList = cast(deviceList.pDeviceInfo[int(nConnectionNum)], POINTER(MV_CC_DEVICE_INFO)).contents
  148. ret = cam.MV_CC_CreateHandle(stDeviceList)
  149. if ret != 0:
  150. logging.error("create handle fail! ret[0x%x]" % ret)
  151. sys.exit()
  152. # ch:打开设备 | en:Open device
  153. ret = cam.MV_CC_OpenDevice(MV_ACCESS_Exclusive, 0)
  154. if ret != 0:
  155. logging.error("open device fail! ret[0x%x]" % ret)
  156. sys.exit()
  157. # ch:探测网络最佳包大小(只对GigE相机有效) | en:Detection network optimal package size(It only works for the GigE camera)
  158. if stDeviceList.nTLayerType == MV_GIGE_DEVICE:
  159. nPacketSize = cam.MV_CC_GetOptimalPacketSize()
  160. if int(nPacketSize) > 0:
  161. ret = cam.MV_CC_SetIntValue("GevSCPSPacketSize", nPacketSize)
  162. if ret != 0:
  163. logging.warn("Warning: Set Packet Size fail! ret[0x%x]" % ret)
  164. else:
  165. logging.warn("Warning: Get Packet Size fail! ret[0x%x]" % nPacketSize)
  166. # ch:设置触发模式为off | en:Set trigger mode as off
  167. ret = cam.MV_CC_SetEnumValue("TriggerMode", MV_TRIGGER_MODE_OFF)
  168. if ret != 0:
  169. logging.error("set trigger mode fail! ret[0x%x]" % ret)
  170. sys.exit()
  171. # ch:获取数据包大小 | en:Get payload size
  172. stParam = MVCC_INTVALUE()
  173. memset(byref(stParam), 0, sizeof(MVCC_INTVALUE))
  174. ret = cam.MV_CC_GetIntValue("PayloadSize", stParam)
  175. if ret != 0:
  176. logging.error("get payload size fail! ret[0x%x]" % ret)
  177. sys.exit()
  178. nPayloadSize = stParam.nCurValue
  179. # ch:开始取流 | en:Start grab image
  180. ret = cam.MV_CC_StartGrabbing()
  181. if ret != 0:
  182. logging.error("start grabbing fail! ret[0x%x]" % ret)
  183. sys.exit()
  184. stDeviceList = MV_FRAME_OUT_INFO_EX()
  185. memset(byref(stDeviceList), 0, sizeof(stDeviceList))
  186. data_buf = (c_ubyte * nPayloadSize)()
  187. ret = cam.MV_CC_GetOneFrameTimeout(byref(data_buf), nPayloadSize, stDeviceList, 1000)
  188. if ret == 0:
  189. logging.info("get one frame: Width[%d], Height[%d], nFrameNum[%d]" % (stDeviceList.nWidth, stDeviceList.nHeight, stDeviceList.nFrameNum))
  190. nRGBSize = stDeviceList.nWidth * stDeviceList.nHeight * 3
  191. stConvertParam=MV_SAVE_IMAGE_PARAM_EX()
  192. stConvertParam.nWidth = stDeviceList.nWidth
  193. stConvertParam.nHeight = stDeviceList.nHeight
  194. stConvertParam.pData = data_buf
  195. stConvertParam.nDataLen = stDeviceList.nFrameLen
  196. stConvertParam.enPixelType = stDeviceList.enPixelType
  197. stConvertParam.nImageLen = stConvertParam.nDataLen
  198. stConvertParam.nJpgQuality = 70
  199. stConvertParam.enImageType = MV_Image_Jpeg
  200. stConvertParam.pImageBuffer = (c_ubyte * nRGBSize)()
  201. stConvertParam.nBufferSize = nRGBSize
  202. # ret = cam.MV_CC_ConvertPixelType(stConvertParam)
  203. logging.info("nImageLen: %d" % stConvertParam.nImageLen)
  204. ret = cam.MV_CC_SaveImageEx2(stConvertParam)
  205. if ret != 0:
  206. logging.error("convert pixel fail ! ret[0x%x]" % ret)
  207. del data_buf
  208. sys.exit()
  209. #file_path = "AfterConvert_RGB2.jpg"
  210. #file_open = open(file_path, 'wb+')
  211. #file_open = open(file_path.encode('utf8'), 'wb')
  212. img_buff = (c_ubyte * stConvertParam.nImageLen)()
  213. cdll.msvcrt.memcpy(byref(img_buff), stConvertParam.pImageBuffer, stConvertParam.nImageLen)
  214. #file_open.write(img_buff)
  215. # 对返回的图片进行 base64 格式转换
  216. img_base64 = "data:image/jpg;base64," + str(base64.b64encode(img_buff)).split("'")[1]
  217. code = 200
  218. msg = "success"
  219. logging.info("Save Image succeed!")
  220. # ch:停止取流 | en:Stop grab image
  221. ret = cam.MV_CC_StopGrabbing()
  222. if ret != 0:
  223. logging.error("stop grabbing fail! ret[0x%x]" % ret)
  224. del data_buf
  225. sys.exit()
  226. # ch:关闭设备 | Close device
  227. ret = cam.MV_CC_CloseDevice()
  228. if ret != 0:
  229. logging.error("close deivce fail! ret[0x%x]" % ret)
  230. del data_buf
  231. sys.exit()
  232. # ch:销毁句柄 | Destroy handle
  233. ret = cam.MV_CC_DestroyHandle()
  234. if ret != 0:
  235. logging.error("destroy handle fail! ret[0x%x]" % ret)
  236. del data_buf
  237. sys.exit()
  238. del data_buf
  239. except Exception as e:
  240. logging.error("openAndSave error: %s" % str(e))
  241. # print("openAndSave finished, current_time: " + str(datetime.datetime.now()))
  242. return JsonResponse(code, msg, img_base64)
  243. # 执行web服务, 端口号可自行修订
  244. logging.info("start to run camera app, current_time: " + str(datetime.datetime.now()))
  245. app.run(host='0.0.0.0', port=65432, debug=True, threaded=True)
  1. # -- coding: utf-8 --
  2. import cv2
  3. from flask import Flask, render_template, Response
  4. import sys
  5. import msvcrt
  6. import base64
  7. import datetime
  8. import logging
  9. sys.path.append("./MvImport")
  10. from MvCameraControl_class import *
  11. from JsonResponse import *
  12. from JsonFlask import *
  13. logging.basicConfig(level=logging.DEBUG,#控制台打印的日志级别
  14. filename='hikrobot.log',
  15. filemode='a',##模式,有w和a,w就是写模式,每次都会重新写日志,覆盖之前的日志
  16. #a是追加模式,默认如果不写的话,就是追加模式
  17. format=
  18. '%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s'
  19. #日志格式
  20. )
  21. # 这里配置一下 template_folder为当前目录,不然可以找不到 index.html
  22. app = JsonFlask(__name__, template_folder='.')
  23. # index
  24. @app.route('/')
  25. def index():
  26. return render_template('./templates/index.html')
  27. # 获取码流
  28. def generate(cap):
  29. # 捕获异常信息
  30. try:
  31. while True:
  32. # 如果是关闭相机,先退出取视频流的循环
  33. global open
  34. if (not open):
  35. break;
  36. retgrab = cap.grab()
  37. if retgrab == True:
  38. logging.debug("Grab true")
  39. ret1, frame = cap.retrieve()
  40. # print(type(frame))
  41. if frame is None:
  42. logging.error("frame is None")
  43. continue
  44. ret1, jpeg = cv2.imencode('.jpg', frame)
  45. jpg_frame = jpeg.tobytes()
  46. yield (b'--frame\r\n'
  47. b'Content-Type: image/jpeg\r\n\r\n' + jpg_frame + b'\r\n')
  48. except Exception as e:
  49. logging.error("generate error: %s" % str(e))
  50. # 开始预览
  51. @app.route('/startPreview')
  52. def startPreview():
  53. logging.info("================== startPreview start ====================")
  54. logging.info("start to preview video stream, current_time: " + str(datetime.datetime.now()))
  55. # 全局变量,用于控制获取视频流的开关状态
  56. try:
  57. global open
  58. open = True
  59. # 全局变量,获取视频连接
  60. global cap
  61. cap = cv2.VideoCapture(1)
  62. if False == cap.isOpened():
  63. logging.error("startPreview -- can't open camera")
  64. quit()
  65. else:
  66. logging.info("startPreview -- start to open camera")
  67. logging.info("startPreview -- open camera ok")
  68. # 分辨率设置 3072*2048(海康机器人工业相机 MV-CU060-10GM)
  69. cap.set(cv2.CAP_PROP_FRAME_WIDTH, 3072)
  70. cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 2048)
  71. # 帧率配置
  72. cap.set(cv2.CAP_PROP_FPS, 15)
  73. response = Response(generate(cap), mimetype='multipart/x-mixed-replace;boundary=frame')
  74. except Exception as e:
  75. logging.error("startPreview error: %s" % str(e))
  76. return response
  77. # 停止预览
  78. @app.route('/stopPreview')
  79. def stopPreview():
  80. logging.info("================== stopPreview start ====================")
  81. logging.info("stop to preview video stream, current_time: " + str(datetime.datetime.now()))
  82. logging.info("start to close camera")
  83. # 全局变量,用于停止循环
  84. global open
  85. open = False
  86. logging.info("release resources start")
  87. # 全局变量,用于释放相机资源
  88. try:
  89. global cap
  90. cap.release()
  91. cv2.destroyAllWindows()
  92. except Exception as e:
  93. logging.error("stopPreview error: %s" % str(e))
  94. logging.info("release resources end")
  95. logging.info("camera closed successfully, current_time: " + str(datetime.datetime.now()))
  96. logging.info("=================== stopPreview end ===================")
  97. return "stop to preview"
  98. # 获取base64图片
  99. @app.route('/grabImage')
  100. def grabImage():
  101. code = 100000
  102. msg = "连接相机时发生错误"
  103. # 捕获异常信息
  104. try:
  105. print("grabImage -- stopPreview start")
  106. stopPreview()
  107. print("grabImage -- stopPreview end")
  108. # 全局变量,获取视频连接
  109. global cap
  110. cap = cv2.VideoCapture(1)
  111. if False == cap.isOpened():
  112. logging.error("can't open camera")
  113. quit()
  114. else:
  115. logging.info("grabImage -- start to open camera")
  116. logging.info("grabImage -- open camera ok")
  117. # 分辨率设置 3072*2048(海康机器人工业相机 MV-CU060-10GM)
  118. cap.set(cv2.CAP_PROP_FRAME_WIDTH, 3072)
  119. cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 2048)
  120. # 帧率配置
  121. cap.set(cv2.CAP_PROP_FPS, 15)
  122. retgrab1 = cap.grab()
  123. if retgrab1 == True:
  124. logging.debug("grabImage -- Grab true")
  125. ret1, frame = cap.retrieve()
  126. ret1, jpeg = cv2.imencode('.jpg', frame)
  127. img_buff = jpeg.tobytes()
  128. # 对返回的图片进行 base64 格式转换
  129. img_base64 = "data:image/jpg;base64," + str(base64.b64encode(img_buff)).split("'")[1]
  130. code = 200
  131. msg = "success"
  132. stopPreview()
  133. except Exception as e:
  134. logging.error("generate error: %s" % str(e))
  135. return JsonResponse(code, msg, img_base64)
  136. # 获取base64图片
  137. @app.route('/hik/openAndSave')
  138. def openAndSave():
  139. code = 100000
  140. msg = "连接相机时发生错误"
  141. # 捕获异常信息
  142. try:
  143. logging.info("================= openAndSave start =====================")
  144. # 拍照之前,先停止预览
  145. stopPreview()
  146. logging.info("start to grab image, current_time: " + str(datetime.datetime.now()))
  147. # 全局变量,获取视频连接
  148. global cap
  149. cap = cv2.VideoCapture(1)
  150. if False == cap.isOpened():
  151. logging.error("openAndSave -- can't open camera")
  152. quit()
  153. else:
  154. logging.info("openAndSave -- start to open camera")
  155. logging.info("openAndSave -- open camera ok")
  156. # 分辨率设置 3072*2048(海康机器人工业相机 MV-CU060-10GM)
  157. cap.set(cv2.CAP_PROP_FRAME_WIDTH, 3072)
  158. cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 2048)
  159. # 帧率配置
  160. cap.set(cv2.CAP_PROP_FPS, 15)
  161. retgrab1 = cap.grab()
  162. if retgrab1 == True:
  163. logging.debug("openAndSave -- Grab true")
  164. ret1, frame = cap.retrieve()
  165. ret1, jpeg = cv2.imencode('.jpg', frame)
  166. img_buff = jpeg.tobytes()
  167. # 对返回的图片进行 base64 格式转换
  168. img_base64 = "data:image/jpg;base64," + str(base64.b64encode(img_buff)).split("'")[1]
  169. code = 200
  170. msg = "success"
  171. stopPreview()
  172. logging.info("================= openAndSave end =====================")
  173. except Exception as e:
  174. logging.error("openAndSave error: %s" % str(e))
  175. return JsonResponse(code, msg, img_base64)
  176. @app.route('/hik/openAndSave2')
  177. def openAndSave2():
  178. # 拍照之前,先停止预览
  179. stopPreview()
  180. logging.info("======================================")
  181. logging.info("start to grab image, current_time: " + str(datetime.datetime.now()))
  182. code = 100000
  183. msg = "连接相机时发生错误"
  184. # img_base64 = None
  185. try:
  186. deviceList = MV_CC_DEVICE_INFO_LIST()
  187. tlayerType = MV_GIGE_DEVICE | MV_USB_DEVICE
  188. # ch:枚举设备 | en:Enum device
  189. ret = MvCamera.MV_CC_EnumDevices(tlayerType, deviceList)
  190. if ret != 0:
  191. logging.error("enum devices fail! ret[0x%x]" % ret)
  192. sys.exit()
  193. if deviceList.nDeviceNum == 0:
  194. logging.error("find no device!")
  195. sys.exit()
  196. logging.info("find %d devices!" % deviceList.nDeviceNum)
  197. for i in range(0, deviceList.nDeviceNum):
  198. mvcc_dev_info = cast(deviceList.pDeviceInfo[i], POINTER(MV_CC_DEVICE_INFO)).contents
  199. if mvcc_dev_info.nTLayerType == MV_GIGE_DEVICE:
  200. logging.info("\ngige device: [%d]" % i)
  201. strModeName = ""
  202. for per in mvcc_dev_info.SpecialInfo.stGigEInfo.chModelName:
  203. strModeName = strModeName + chr(per)
  204. logging.info("device model name: %s" % strModeName)
  205. nip1 = ((mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0xff000000) >> 24)
  206. nip2 = ((mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0x00ff0000) >> 16)
  207. nip3 = ((mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0x0000ff00) >> 8)
  208. nip4 = (mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0x000000ff)
  209. logging.info("current ip: %d.%d.%d.%d\n" % (nip1, nip2, nip3, nip4))
  210. elif mvcc_dev_info.nTLayerType == MV_USB_DEVICE:
  211. logging.info("\nu3v device: [%d]" % i)
  212. strModeName = ""
  213. for per in mvcc_dev_info.SpecialInfo.stUsb3VInfo.chModelName:
  214. if per == 0:
  215. break
  216. strModeName = strModeName + chr(per)
  217. logging.info("device model name: %s" % strModeName)
  218. strSerialNumber = ""
  219. for per in mvcc_dev_info.SpecialInfo.stUsb3VInfo.chSerialNumber:
  220. if per == 0:
  221. break
  222. strSerialNumber = strSerialNumber + chr(per)
  223. logging.info("user serial number: %s" % strSerialNumber)
  224. nConnectionNum = 0
  225. if int(nConnectionNum) >= deviceList.nDeviceNum:
  226. logging.error("intput error!")
  227. sys.exit()
  228. # ch:创建相机实例 | en:Creat Camera Object
  229. cam = MvCamera()
  230. # ch:选择设备并创建句柄 | en:Select device and create handle
  231. stDeviceList = cast(deviceList.pDeviceInfo[int(nConnectionNum)], POINTER(MV_CC_DEVICE_INFO)).contents
  232. ret = cam.MV_CC_CreateHandle(stDeviceList)
  233. if ret != 0:
  234. logging.error("create handle fail! ret[0x%x]" % ret)
  235. sys.exit()
  236. # ch:打开设备 | en:Open device
  237. ret = cam.MV_CC_OpenDevice(MV_ACCESS_Exclusive, 0)
  238. if ret != 0:
  239. logging.error("open device fail! ret[0x%x]" % ret)
  240. sys.exit()
  241. # ch:探测网络最佳包大小(只对GigE相机有效) | en:Detection network optimal package size(It only works for the GigE camera)
  242. if stDeviceList.nTLayerType == MV_GIGE_DEVICE:
  243. nPacketSize = cam.MV_CC_GetOptimalPacketSize()
  244. if int(nPacketSize) > 0:
  245. ret = cam.MV_CC_SetIntValue("GevSCPSPacketSize", nPacketSize)
  246. if ret != 0:
  247. logging.warn("Warning: Set Packet Size fail! ret[0x%x]" % ret)
  248. else:
  249. logging.warn("Warning: Get Packet Size fail! ret[0x%x]" % nPacketSize)
  250. # ch:设置触发模式为off | en:Set trigger mode as off
  251. ret = cam.MV_CC_SetEnumValue("TriggerMode", MV_TRIGGER_MODE_OFF)
  252. if ret != 0:
  253. logging.error("set trigger mode fail! ret[0x%x]" % ret)
  254. sys.exit()
  255. # ch:获取数据包大小 | en:Get payload size
  256. stParam = MVCC_INTVALUE()
  257. memset(byref(stParam), 0, sizeof(MVCC_INTVALUE))
  258. ret = cam.MV_CC_GetIntValue("PayloadSize", stParam)
  259. if ret != 0:
  260. logging.error("get payload size fail! ret[0x%x]" % ret)
  261. sys.exit()
  262. nPayloadSize = stParam.nCurValue
  263. # ch:开始取流 | en:Start grab image
  264. ret = cam.MV_CC_StartGrabbing()
  265. if ret != 0:
  266. logging.error("start grabbing fail! ret[0x%x]" % ret)
  267. sys.exit()
  268. stDeviceList = MV_FRAME_OUT_INFO_EX()
  269. memset(byref(stDeviceList), 0, sizeof(stDeviceList))
  270. data_buf = (c_ubyte * nPayloadSize)()
  271. ret = cam.MV_CC_GetOneFrameTimeout(byref(data_buf), nPayloadSize, stDeviceList, 1000)
  272. if ret == 0:
  273. logging.info("get one frame: Width[%d], Height[%d], nFrameNum[%d]" % (stDeviceList.nWidth, stDeviceList.nHeight, stDeviceList.nFrameNum))
  274. nRGBSize = stDeviceList.nWidth * stDeviceList.nHeight * 3
  275. stConvertParam=MV_SAVE_IMAGE_PARAM_EX()
  276. stConvertParam.nWidth = stDeviceList.nWidth
  277. stConvertParam.nHeight = stDeviceList.nHeight
  278. stConvertParam.pData = data_buf
  279. stConvertParam.nDataLen = stDeviceList.nFrameLen
  280. stConvertParam.enPixelType = stDeviceList.enPixelType
  281. stConvertParam.nImageLen = stConvertParam.nDataLen
  282. stConvertParam.nJpgQuality = 70
  283. stConvertParam.enImageType = MV_Image_Jpeg
  284. stConvertParam.pImageBuffer = (c_ubyte * nRGBSize)()
  285. stConvertParam.nBufferSize = nRGBSize
  286. # ret = cam.MV_CC_ConvertPixelType(stConvertParam)
  287. logging.info("nImageLen: %d" % stConvertParam.nImageLen)
  288. ret = cam.MV_CC_SaveImageEx2(stConvertParam)
  289. if ret != 0:
  290. logging.error("convert pixel fail ! ret[0x%x]" % ret)
  291. del data_buf
  292. sys.exit()
  293. #file_path = "AfterConvert_RGB2.jpg"
  294. #file_open = open(file_path, 'wb+')
  295. #file_open = open(file_path.encode('utf8'), 'wb')
  296. img_buff = (c_ubyte * stConvertParam.nImageLen)()
  297. cdll.msvcrt.memcpy(byref(img_buff), stConvertParam.pImageBuffer, stConvertParam.nImageLen)
  298. #file_open.write(img_buff)
  299. # 对返回的图片进行 base64 格式转换
  300. img_base64 = "data:image/jpg;base64," + str(base64.b64encode(img_buff)).split("'")[1]
  301. code = 200
  302. msg = "success"
  303. logging.info("Save Image succeed!")
  304. # ch:停止取流 | en:Stop grab image
  305. ret = cam.MV_CC_StopGrabbing()
  306. if ret != 0:
  307. logging.error("stop grabbing fail! ret[0x%x]" % ret)
  308. del data_buf
  309. sys.exit()
  310. # ch:关闭设备 | Close device
  311. ret = cam.MV_CC_CloseDevice()
  312. if ret != 0:
  313. logging.error("close deivce fail! ret[0x%x]" % ret)
  314. del data_buf
  315. sys.exit()
  316. # ch:销毁句柄 | Destroy handle
  317. ret = cam.MV_CC_DestroyHandle()
  318. if ret != 0:
  319. logging.error("destroy handle fail! ret[0x%x]" % ret)
  320. del data_buf
  321. sys.exit()
  322. del data_buf
  323. except Exception as e:
  324. logging.error("openAndSave error: %s" % str(e))
  325. # print("openAndSave finished, current_time: " + str(datetime.datetime.now()))
  326. return JsonResponse(code, msg, img_base64)
  327. # 执行web服务, 端口号可自行修订
  328. logging.info("start to run camera app, current_time: " + str(datetime.datetime.now()))
  329. if __name__ == '__main__':
  330. try:
  331. app.run(host='0.0.0.0', port=65432, debug=True, threaded=True)
  332. except Exception as e:
  333. logging.error("app error: %s" % str(e))

6、功能验证
1)开始预览
可以将 http://127.0.0.1:65432/startPreview 在前端用 img 标签 显示视频的实时预览效果。

http://127.0.0.1:65432/startPreview


2)停止预览
由于使用的这款海康机器人工业相机(MV-CU060-10GM)只能创建一个连接,所以,当预览完实时视频,需要调用该接口释放相机资源,避免资源被长期占用。

http://127.0.0.1:65432/stopPreview


3)获取图片
返回 base64 格式 的图片,前端可以直接接收显示,调用上传接口保存。

http://127.0.0.1:65432/openAndSave


版权声明:本文为CSDN博主「龙凌云」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/aikudexiaohai/article/details/130078959

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/357878
推荐阅读
相关标签
  

闽ICP备14008679号