当前位置:   article > 正文

Python调用HCNetSDK实现低延迟取流,云台控制以及其它功能(项目没用到所以没加进去)_hk sdk laliu python

hk sdk laliu python

最近有个需要调用监控摄像头二次开发的项目,记录一下

在网上查了很多资料,看到很多人都是在用rstp等形式取流。我最开始也是用rtsp,无奈法力匮乏,获取的摄像头延迟经过优化还有2s左右的延迟。然后我又上网查看了很多资料,看到很多人都在说海康没有针对python的sdk。实际上,海康提供了接口,只是说针对python的demo很少而已。
首先,在海康官网下载HCNetSDK包。根据自己的操作系统选对应的包。
在这里插入图片描述
然后解压sdk包后,将其中的库文件copy到/demo示例/5- Python开发示例/1-预览取流解码Demo/lib/win/里面。
接下来的就是我在取流demo中做的二次开发,根据开发文档增加的云台控制,预置点设置,零方位设置等等功能。
我修改了以下两个程序:
在这里插入图片描述

话不多说,直接上代码:

1、PlayCtrl.py:

# coding=utf-8

from ctypes import *
import sys

# 回调函数类型定义
if 'linux' in sys.platform:
    fun_ctype = CFUNCTYPE
else:
    fun_ctype = WINFUNCTYPE

# 定义预览参数结构体
class FRAME_INFO(Structure):
    pass
LPFRAME_INFO = POINTER(FRAME_INFO)
FRAME_INFO._fields_ = [
    ('nWidth', c_uint32),
    ('nHeight', c_uint32),
    ('nStamp', c_uint32),
    ('nType', c_uint32),
    ('nFrameRate', c_uint32),
    ('dwFrameNum', c_uint32)
]

# 显示回调函数
DISPLAYCBFUN = fun_ctype(None, c_long, c_char_p, c_long, c_long, c_long, c_long, c_long, c_long)
# 解码回调函数
DECCBFUNWIN = fun_ctype(None, c_long, POINTER(c_char), c_long, POINTER(FRAME_INFO), c_void_p, c_void_p)

class NET_DVR_INITIALPOSITIONCTRL(Structure):
    _fields_ = [
        ("dwSize", c_ulong),  # Use c_ulong for DWORD
        ("dwChan", c_ulong),  # Use c_ulong for DWORD
        ("byWorkMode", c_byte),
        ("byRes", c_byte * 127),
    ]
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

2、将test_main.py重命名为video_control.py:

import os
import cv2
import time
import numpy as np
from HCNetSDK import *
from PlayCtrl import *

class HKCam(object):
    def __init__(self,camIP,username,password,devport=8000,lib_dir=r'.\lib\win'):
        # 登录的设备信息
        self.DEV_IP = create_string_buffer(camIP.encode())
        self.DEV_PORT =devport
        self.DEV_USER_NAME = create_string_buffer(username.encode())
        self.DEV_PASSWORD = create_string_buffer(password.encode())
        self.WINDOWS_FLAG = True
        self.funcRealDataCallBack_V30 = None
        self.recent_img = None #最新帧
        self.n_stamp = None #帧时间戳
        self.last_stamp = None #上次时间戳
        self.login_success = False
                           
        os.chdir(lib_dir) # 加载库,先加载依赖库
        self.Objdll = ctypes.CDLL(r'./HCNetSDK.dll')  # 加载网络库
        self.Playctrldll = ctypes.CDLL(r'./PlayCtrl.dll')  # 加载播放库
        # 设置组件库和SSL库加载路径                                                              # 2 设置组件库和SSL库加载路径
        self.SetSDKInitCfg()
        # 初始化DLL
        self.Objdll.NET_DVR_Init()                                                               # 3 相机初始化
        # 启用SDK写日志
        self.Objdll.NET_DVR_SetLogToFile(3, bytes('./SdkLog_Python/', encoding="utf-8"), False)
        os.chdir(r'../../') # 切换工作路径到../../
        # 登录
        (self.lUserId, self.device_info) = self.LoginDev()                                       # 4 登录相机
        self.Playctrldll.PlayM4_ResetBuffer(self.lUserId,1)#清空指定缓冲区的剩余数据。这个地方传进来的是self.lUserId,为什么呢?
        # print(self.lUserId)
        if self.lUserId < 0:#登录失败
            err = self.Objdll.NET_DVR_GetLastError()
            print('Login device fail, error code is: %d' % self.Objdll.NET_DVR_GetLastError())
            # 释放资源
            self.Objdll.NET_DVR_Cleanup()
            exit()
        else:
            print(f'[INFO] 摄像头[{camIP}]登录成功!!')
            self.login_success = True
        self.start_play()                                                                         # 5 开始播放
        time.sleep(1)
    def start_play(self,):
        #global funcRealDataCallBack_V30                                                                        
        self.PlayCtrl_Port = c_long(-1)  # 播放句柄
        # 获取一个播放句柄 #wuzh获取未使用的通道号
        if not self.Playctrldll.PlayM4_GetPort(byref(self.PlayCtrl_Port)):
            print(u'获取播放库句柄失败')
        # 定义码流回调函数       
        self.funcRealDataCallBack_V30 = REALDATACALLBACK(self.RealDataCallBack_V30)
        # 开启预览
        self.preview_info = NET_DVR_PREVIEWINFO()
        self.preview_info.hPlayWnd = 0
        self.preview_info.lChannel = 1  # 通道号
        self.preview_info.dwStreamType = 0  # 0:主码流,1:子码流
        self.preview_info.dwLinkMode = 1  # 0:TCP方式,1:UDP方式
        self.preview_info.bBlocked = 1  # 阻塞取流
        # 开始预览并且设置回调函数回调获取实时流数据
        self.lRealPlayHandle = self.Objdll.NET_DVR_RealPlay_V40(self.lUserId, byref(self.preview_info), self.funcRealDataCallBack_V30, None)
        if self.lRealPlayHandle < 0:
            print ('Open preview fail, error code is: %d' %self. Objdll.NET_DVR_GetLastError())
            # 登出设备
            self.Objdll.NET_DVR_Logout(self.lUserId)
            # 释放资源
            self.Objdll.NET_DVR_Cleanup()
            exit()
    # 设置组件库和SSL库加载路径
    def SetSDKInitCfg(self,):
        # 设置SDK初始化依赖库路径
        # 设置HCNetSDKCom组件库和SSL库加载路径
        # print(os.getcwd())
        if self.WINDOWS_FLAG:
            strPath = os.getcwd().encode('gbk')
            sdk_ComPath = NET_DVR_LOCAL_SDK_PATH()
            sdk_ComPath.sPath = strPath
            self.Objdll.NET_DVR_SetSDKInitCfg(2, byref(sdk_ComPath))
            self.Objdll.NET_DVR_SetSDKInitCfg(3, create_string_buffer(strPath + b'\libcrypto-1_1-x64.dll'))
            self.Objdll.NET_DVR_SetSDKInitCfg(4, create_string_buffer(strPath + b'\libssl-1_1-x64.dll'))
        else:
            strPath = os.getcwd().encode('utf-8')
            sdk_ComPath = NET_DVR_LOCAL_SDK_PATH()
            sdk_ComPath.sPath = strPath
            self.Objdll.NET_DVR_SetSDKInitCfg(2, byref(sdk_ComPath))
            self.Objdll.NET_DVR_SetSDKInitCfg(3, create_string_buffer(strPath + b'/libcrypto.so.1.1'))
            self.Objdll.NET_DVR_SetSDKInitCfg(4, create_string_buffer(strPath + b'/libssl.so.1.1'))
    # 登录注册设备
    def LoginDev(self,):
        # 登录注册设备
        device_info = NET_DVR_DEVICEINFO_V30()
        lUserId = self.Objdll.NET_DVR_Login_V30(self.DEV_IP, self.DEV_PORT, self.DEV_USER_NAME, self.DEV_PASSWORD, byref(device_info))
        return (lUserId, device_info)
    # 读取摄像头数据
    def read(self,):
        while self.n_stamp==self.last_stamp:
            continue
        self.last_stamp=self.n_stamp
        return self.n_stamp,self.recent_img
    # 解码回调函数
    def DecCBFun(self,nPort, pBuf, nSize, pFrameInfo, nUser, nReserved2):
            if pFrameInfo.contents.nType == 3:
                t0 = time.time()
                # 解码返回视频YUV数据,将YUV数据转成jpg图片保存到本地
                # 如果有耗时处理,需要将解码数据拷贝到回调函数外面的其他线程里面处理,避免阻塞回调导致解码丢帧
                nWidth = pFrameInfo.contents.nWidth
                nHeight = pFrameInfo.contents.nHeight
                #nType = pFrameInfo.contents.nType
                dwFrameNum = pFrameInfo.contents.dwFrameNum
                nStamp = pFrameInfo.contents.nStamp
                #print(nWidth, nHeight, nType, dwFrameNum, nStamp, sFileName)
                YUV = np.frombuffer(pBuf[:nSize],dtype=np.uint8)
                YUV = np.reshape(YUV,[nHeight+nHeight//2,nWidth])
                img_rgb = cv2.cvtColor(YUV,cv2.COLOR_YUV2BGR_YV12)
                self.recent_img,self.n_stamp = img_rgb,nStamp
    # 码流回调
    def RealDataCallBack_V30(self,lPlayHandle, dwDataType, pBuffer, dwBufSize, pUser):
        # 码流回调函数
         if dwDataType == NET_DVR_SYSHEAD:
            # 设置流播放模式
            self.Playctrldll.PlayM4_SetStreamOpenMode(self.PlayCtrl_Port, 0)
            # 打开码流,送入40字节系统头数据
            if self.Playctrldll.PlayM4_OpenStream(self.PlayCtrl_Port, pBuffer, dwBufSize, 1024*1024):
                # 设置解码回调,可以返回解码后YUV视频数据
                #global FuncDecCB
                self.FuncDecCB = DECCBFUNWIN(self.DecCBFun)
                self.Playctrldll.PlayM4_SetDecCallBackExMend(self.PlayCtrl_Port, self.FuncDecCB, None, c_long(0), None)
                # 开始解码播放
                if not self.Playctrldll.PlayM4_Play(self.PlayCtrl_Port, None):
                    print(u'播放库播放失败')
                    
            else:
                print(u'播放库打开流失败')
         elif dwDataType == NET_DVR_STREAMDATA:
            self.Playctrldll.PlayM4_InputData(self.PlayCtrl_Port, pBuffer, dwBufSize)
         else:
            print (u'其他数据,长度:', dwBufSize)
    # 控制云台入口函数
    def ptz_control(self, command, duration=0.1):
        """
        参数:
            lUserID: [in] NET_DVR_Login_V40等登录接口的返回值
            lChannel: [in] 通道号
            command: [in] 云台控制命令
            dwStop:[in] 云台停止动作或开始动作:0-开始;1-停止
            dwSpeed [in] 云台控制的速度,用户按不同解码器的速度控制值设置。取值范围[1,7] 

        """
        if self.login_success:
            ret = self.Objdll.NET_DVR_PTZControlWithSpeed_Other(self.lUserId, 1, command, 0, 5)
            if ret == False:
                print('PTZ control failed, error code is:', self.Objdll.NET_DVR_GetLastError())
            else:
                # print(f'PTZ control command {command} executed successfully.')
                time.sleep(duration)
                self.Objdll.NET_DVR_PTZControl_Other(self.lUserId, 1, command, 1, 5)  # 停止命令
        else:
            print("Please log in to the device first.")
    # 云台控制指令
    def control_ptz(self, direction, duration=0.1):
        """
        参数:
            direction: 控制方向,可为 "up", "down", "left", "right", "zoom_in", "zoom_out", 
                       "focus_near", "focus_far", "iris_open", "iris_close"。
            duration: 控制持续时间,单位为秒。
        """
        if direction.lower() == "up":
            self.ptz_control(TILT_UP, duration)
        elif direction.lower() == "down":
            self.ptz_control(TILT_DOWN, duration)
        elif direction.lower() == "left":
            self.ptz_control(PAN_LEFT, duration)
        elif direction.lower() == "right":
            self.ptz_control(PAN_RIGHT, duration)
        elif direction.lower() == "zoom_in":
            self.ptz_control(ZOOM_IN, duration)
        elif direction.lower() == "zoom_out":
            self.ptz_control(ZOOM_OUT, duration)
        elif direction.lower() == "focus_near":
            self.ptz_control(FOCUS_NEAR, duration)
        elif direction.lower() == "focus_far":
            self.ptz_control(FOCUS_FAR, duration)
        elif direction.lower() == "iris_open":
            self.ptz_control(IRIS_OPEN, duration)
        elif direction.lower() == "iris_close":
            self.ptz_control(IRIS_CLOSE, duration)
        else:
            print("Invalid direction. Please use 'up', 'down', 'left', 'right', 'zoom_in', 'zoom_out', 'focus_near', 'focus_far', 'iris_open', 'iris_close'.")   
    # 设置预置点
    def set_preset(self, preset_index):
        """
        参数:
            lUserID: [in] NET_DVR_Login_V40等登录接口的返回值
            lChannel: [in] 通道号
            dwPTZPresetCmd: [in] 预设点命令。8表示设置预设点,9表示清除预设点,39表示移动到预设点。
            preset_index: [in] 预置点索引,从1开始,最多支持300个预置点。
        """
        if self.login_success:
            ret = self.Objdll.NET_DVR_PTZPreset_Other(self.lUserId, 1, 8, preset_index)
            if ret:
                print(f"预设点 {preset_index} 设置成功.")
            else:
                print("设置预设点失败")
        else:
            print("请先登录设备")
    # 移动到预置点
    def move_to_preset(self, preset_index):
        """
         参数:
            lUserID: [in] NET_DVR_Login_V40等登录接口的返回值
            lChannel: [in] 通道号
            dwPTZPresetCmd: [in] 预设点命令。8表示设置预设点,9表示清除预设点,39表示移动到预设点。
            preset_index: [in] 预置点索引,从1开始,最多支持300个预置点。
        """
        if self.login_success:
            ret = self.Objdll.NET_DVR_PTZPreset_Other(self.lUserId, 1, 39, preset_index)
            if ret:
                print(f"正在移动至预置点{preset_index}...")
                time.sleep(3)
                print("移动至预置点{preset_index}成功")
                return True
            else:
                print("移动至预置点失败")
                return False
        else:
            print("请先登录设备")
            return False
    # 零方位角控制
    def initialposition_ctrl(self, work_mode=1):
        """
         参数:
            lUserID:[in] NET_DVR_Login_V40等登录接口的返回值 
            dwCommand:[in] 控制命令. 零方位角控制:NET_DVR_PTZ_INITIALPOSITIONCTRL= 3283  
            lpInBuffer:[in] 输入参数,具体内容跟控制命令相关,详见列表 
            dwInBufferSize:[in] 输入参数长度 

        """
        if self.login_success:
            ctrl_struct = NET_DVR_INITIALPOSITIONCTRL()
            ctrl_struct.dwSize = sizeof(NET_DVR_INITIALPOSITIONCTRL)
            ctrl_struct.dwChan = 1  
            ctrl_struct.byWorkMode = work_mode

            ret = self.Objdll.NET_DVR_RemoteControl(
                self.lUserId, 3283,  # NET_DVR_PTZ_INITIALPOSITIONCTRL
                byref(ctrl_struct), sizeof(ctrl_struct)
            )
            if ret:
                print("移动至零方位成功")
                return True
            else:
                print("移动至零方位失败")
                return False
        else:
            print("请先登录设备")
            return False
    # 释放资源
    def release(self):
        self.Objdll.NET_DVR_StopRealPlay(self.lRealPlayHandle)
        if self.PlayCtrl_Port.value > -1:
            self.Playctrldll.PlayM4_Stop(self.PlayCtrl_Port)
            self.Playctrldll.PlayM4_CloseStream( self.PlayCtrl_Port)
            self.Playctrldll.PlayM4_FreePort( self.PlayCtrl_Port)
            self.PlayCtrl_Port = c_long(-1)
        self.Objdll.NET_DVR_Logout(self.lUserId)
        self.Objdll.NET_DVR_Cleanup()
        print('释放资源结束')
    # 上下文管理    
    def __enter__(self):
        return self
    # 释放资源
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.release()
    
def main():
    """
    主函数,负责创建 HKCam 对象并处理用户键盘输入。
    功能:w、s、a、d 键控制云台上下左右移动,
        e、r 键控制焦距,
        t、y 键控制焦点,
        u、i 键控制光圈。
    """
    # 创建 HKCam 对象
    cam = HKCam("192.123.123.123", "admin", "admin", 8000, "D:/Documents/project/lib/win") 
    # 移动至初始预置点
    # cam.move_to_preset(1)
    # 调用 initialposition_ctrl 方法,并设置 work_mode 为 1 (调用零方位):
    cam.initialposition_ctrl(work_mode=1)
    # 键盘控制云台 + opencv视频播放
    if cam.login_success:
            while True:
                try:
                    _, frame = cam.read()
                    if frame is not None:
                        cv2.imshow("HKCam", frame)
                        key = cv2.waitKey(10) & 0xFF
                        if key == 27:  # Esc 键退出
                            break
                        # 处理键盘输入
                        if key == ord('w'):  # 上
                            cam.control_ptz("up")
                        elif key == ord('s'):  # 下
                            cam.control_ptz("down")
                        elif key == ord('a'):  # 左
                            cam.control_ptz("left")
                        elif key == ord('d'):  # 右
                            cam.control_ptz("right")
                        elif key == ord('e'):  # 焦距变大
                            cam.control_ptz("zoom_in")
                        elif key == ord('r'):  # 焦距变小
                            cam.control_ptz("zoom_out")
                        elif key == ord('t'):  # 焦点前调
                            cam.control_ptz("focus_near")
                        elif key == ord('y'):  # 焦点后调
                            cam.control_ptz("focus_far")
                        elif key == ord('u'):  # 光圈扩大
                            cam.control_ptz("iris_open")
                        elif key == ord('i'):  # 光圈缩小
                            cam.control_ptz("iris_close")
                        elif key == 13:  # 回车键
                            preset_index = input("请输入预设点编号 (1-300): ")
                            try:
                                preset_index = int(preset_index)
                                if 1 <= preset_index <= 300:
                                    cam.set_preset(preset_index)
                                else:
                                    print("预设点编号应在 1-300 之间")
                            except ValueError:
                                print("请输入有效的数字")
                except Exception as e:
                    print(f"发生playback异常: {e}")
                    break
            cv2.destroyAllWindows()
    else:
        print("请先登录设备")
    # 释放资源
    cam.release()

if __name__ == "__main__":
    main()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
  • 194
  • 195
  • 196
  • 197
  • 198
  • 199
  • 200
  • 201
  • 202
  • 203
  • 204
  • 205
  • 206
  • 207
  • 208
  • 209
  • 210
  • 211
  • 212
  • 213
  • 214
  • 215
  • 216
  • 217
  • 218
  • 219
  • 220
  • 221
  • 222
  • 223
  • 224
  • 225
  • 226
  • 227
  • 228
  • 229
  • 230
  • 231
  • 232
  • 233
  • 234
  • 235
  • 236
  • 237
  • 238
  • 239
  • 240
  • 241
  • 242
  • 243
  • 244
  • 245
  • 246
  • 247
  • 248
  • 249
  • 250
  • 251
  • 252
  • 253
  • 254
  • 255
  • 256
  • 257
  • 258
  • 259
  • 260
  • 261
  • 262
  • 263
  • 264
  • 265
  • 266
  • 267
  • 268
  • 269
  • 270
  • 271
  • 272
  • 273
  • 274
  • 275
  • 276
  • 277
  • 278
  • 279
  • 280
  • 281
  • 282
  • 283
  • 284
  • 285
  • 286
  • 287
  • 288
  • 289
  • 290
  • 291
  • 292
  • 293
  • 294
  • 295
  • 296
  • 297
  • 298
  • 299
  • 300
  • 301
  • 302
  • 303
  • 304
  • 305
  • 306
  • 307
  • 308
  • 309
  • 310
  • 311
  • 312
  • 313
  • 314
  • 315
  • 316
  • 317
  • 318
  • 319
  • 320
  • 321
  • 322
  • 323
  • 324
  • 325
  • 326
  • 327
  • 328
  • 329
  • 330
  • 331
  • 332
  • 333
  • 334
  • 335
  • 336
  • 337
  • 338
  • 339
  • 340
  • 341
  • 342

效果

在这里插入图片描述
在这里插入图片描述

后续我还会加入多摄像头读取。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小惠珠哦/article/detail/738715
推荐阅读
相关标签
  

闽ICP备14008679号