赞
踩
在网上查了很多资料,看到很多人都是在用rstp等形式取流。我最开始也是用rtsp,无奈法力匮乏,获取的摄像头延迟经过优化还有2s左右的延迟。然后我又上网查看了很多资料,看到很多人都在说海康没有针对python的sdk。实际上,海康提供了接口,只是说针对python的demo很少而已。
首先,在海康官网下载HCNetSDK包。根据自己的操作系统选对应的包。
然后解压sdk包后,将其中的库文件copy到/demo示例/5- Python开发示例/1-预览取流解码Demo/lib/win/里面。
接下来的就是我在取流demo中做的二次开发,根据开发文档增加的云台控制,预置点设置,零方位设置等等功能。
我修改了以下两个程序:
话不多说,直接上代码:
# coding=utf-8
from ctypes import *
import sys
# 回调函数类型定义
if 'linux' in sys.platform:
fun_ctype = CFUNCTYPE
else:
fun_ctype = WINFUNCTYPE
# 定义预览参数结构体
class FRAME_INFO(Structure):
pass
LPFRAME_INFO = POINTER(FRAME_INFO)
FRAME_INFO._fields_ = [
('nWidth', c_uint32),
('nHeight', c_uint32),
('nStamp', c_uint32),
('nType', c_uint32),
('nFrameRate', c_uint32),
('dwFrameNum', c_uint32)
]
# 显示回调函数
DISPLAYCBFUN = fun_ctype(None, c_long, c_char_p, c_long, c_long, c_long, c_long, c_long, c_long)
# 解码回调函数
DECCBFUNWIN = fun_ctype(None, c_long, POINTER(c_char), c_long, POINTER(FRAME_INFO), c_void_p, c_void_p)
class NET_DVR_INITIALPOSITIONCTRL(Structure):
_fields_ = [
("dwSize", c_ulong), # Use c_ulong for DWORD
("dwChan", c_ulong), # Use c_ulong for DWORD
("byWorkMode", c_byte),
("byRes", c_byte * 127),
]
import os
import cv2
import time
import numpy as np
from HCNetSDK import *
from PlayCtrl import *
class HKCam(object):
def __init__(self,camIP,username,password,devport=8000,lib_dir=r'.\lib\win'):
# 登录的设备信息
self.DEV_IP = create_string_buffer(camIP.encode())
self.DEV_PORT =devport
self.DEV_USER_NAME = create_string_buffer(username.encode())
self.DEV_PASSWORD = create_string_buffer(password.encode())
self.WINDOWS_FLAG = True
self.funcRealDataCallBack_V30 = None
self.recent_img = None #最新帧
self.n_stamp = None #帧时间戳
self.last_stamp = None #上次时间戳
self.login_success = False
os.chdir(lib_dir) # 加载库,先加载依赖库
self.Objdll = ctypes.CDLL(r'./HCNetSDK.dll') # 加载网络库
self.Playctrldll = ctypes.CDLL(r'./PlayCtrl.dll') # 加载播放库
# 设置组件库和SSL库加载路径 # 2 设置组件库和SSL库加载路径
self.SetSDKInitCfg()
# 初始化DLL
self.Objdll.NET_DVR_Init() # 3 相机初始化
# 启用SDK写日志
self.Objdll.NET_DVR_SetLogToFile(3, bytes('./SdkLog_Python/', encoding="utf-8"), False)
os.chdir(r'../../') # 切换工作路径到../../
# 登录
(self.lUserId, self.device_info) = self.LoginDev() # 4 登录相机
self.Playctrldll.PlayM4_ResetBuffer(self.lUserId,1)#清空指定缓冲区的剩余数据。这个地方传进来的是self.lUserId,为什么呢?
# print(self.lUserId)
if self.lUserId < 0:#登录失败
err = self.Objdll.NET_DVR_GetLastError()
print('Login device fail, error code is: %d' % self.Objdll.NET_DVR_GetLastError())
# 释放资源
self.Objdll.NET_DVR_Cleanup()
exit()
else:
print(f'[INFO] 摄像头[{camIP}]登录成功!!')
self.login_success = True
self.start_play() # 5 开始播放
time.sleep(1)
def start_play(self,):
#global funcRealDataCallBack_V30
self.PlayCtrl_Port = c_long(-1) # 播放句柄
# 获取一个播放句柄 #wuzh获取未使用的通道号
if not self.Playctrldll.PlayM4_GetPort(byref(self.PlayCtrl_Port)):
print(u'获取播放库句柄失败')
# 定义码流回调函数
self.funcRealDataCallBack_V30 = REALDATACALLBACK(self.RealDataCallBack_V30)
# 开启预览
self.preview_info = NET_DVR_PREVIEWINFO()
self.preview_info.hPlayWnd = 0
self.preview_info.lChannel = 1 # 通道号
self.preview_info.dwStreamType = 0 # 0:主码流,1:子码流
self.preview_info.dwLinkMode = 1 # 0:TCP方式,1:UDP方式
self.preview_info.bBlocked = 1 # 阻塞取流
# 开始预览并且设置回调函数回调获取实时流数据
self.lRealPlayHandle = self.Objdll.NET_DVR_RealPlay_V40(self.lUserId, byref(self.preview_info), self.funcRealDataCallBack_V30, None)
if self.lRealPlayHandle < 0:
print ('Open preview fail, error code is: %d' %self. Objdll.NET_DVR_GetLastError())
# 登出设备
self.Objdll.NET_DVR_Logout(self.lUserId)
# 释放资源
self.Objdll.NET_DVR_Cleanup()
exit()
# 设置组件库和SSL库加载路径
def SetSDKInitCfg(self,):
# 设置SDK初始化依赖库路径
# 设置HCNetSDKCom组件库和SSL库加载路径
# print(os.getcwd())
if self.WINDOWS_FLAG:
strPath = os.getcwd().encode('gbk')
sdk_ComPath = NET_DVR_LOCAL_SDK_PATH()
sdk_ComPath.sPath = strPath
self.Objdll.NET_DVR_SetSDKInitCfg(2, byref(sdk_ComPath))
self.Objdll.NET_DVR_SetSDKInitCfg(3, create_string_buffer(strPath + b'\libcrypto-1_1-x64.dll'))
self.Objdll.NET_DVR_SetSDKInitCfg(4, create_string_buffer(strPath + b'\libssl-1_1-x64.dll'))
else:
strPath = os.getcwd().encode('utf-8')
sdk_ComPath = NET_DVR_LOCAL_SDK_PATH()
sdk_ComPath.sPath = strPath
self.Objdll.NET_DVR_SetSDKInitCfg(2, byref(sdk_ComPath))
self.Objdll.NET_DVR_SetSDKInitCfg(3, create_string_buffer(strPath + b'/libcrypto.so.1.1'))
self.Objdll.NET_DVR_SetSDKInitCfg(4, create_string_buffer(strPath + b'/libssl.so.1.1'))
# 登录注册设备
def LoginDev(self,):
# 登录注册设备
device_info = NET_DVR_DEVICEINFO_V30()
lUserId = self.Objdll.NET_DVR_Login_V30(self.DEV_IP, self.DEV_PORT, self.DEV_USER_NAME, self.DEV_PASSWORD, byref(device_info))
return (lUserId, device_info)
# 读取摄像头数据
def read(self,):
while self.n_stamp==self.last_stamp:
continue
self.last_stamp=self.n_stamp
return self.n_stamp,self.recent_img
# 解码回调函数
def DecCBFun(self,nPort, pBuf, nSize, pFrameInfo, nUser, nReserved2):
if pFrameInfo.contents.nType == 3:
t0 = time.time()
# 解码返回视频YUV数据,将YUV数据转成jpg图片保存到本地
# 如果有耗时处理,需要将解码数据拷贝到回调函数外面的其他线程里面处理,避免阻塞回调导致解码丢帧
nWidth = pFrameInfo.contents.nWidth
nHeight = pFrameInfo.contents.nHeight
#nType = pFrameInfo.contents.nType
dwFrameNum = pFrameInfo.contents.dwFrameNum
nStamp = pFrameInfo.contents.nStamp
#print(nWidth, nHeight, nType, dwFrameNum, nStamp, sFileName)
YUV = np.frombuffer(pBuf[:nSize],dtype=np.uint8)
YUV = np.reshape(YUV,[nHeight+nHeight//2,nWidth])
img_rgb = cv2.cvtColor(YUV,cv2.COLOR_YUV2BGR_YV12)
self.recent_img,self.n_stamp = img_rgb,nStamp
# 码流回调
def RealDataCallBack_V30(self,lPlayHandle, dwDataType, pBuffer, dwBufSize, pUser):
# 码流回调函数
if dwDataType == NET_DVR_SYSHEAD:
# 设置流播放模式
self.Playctrldll.PlayM4_SetStreamOpenMode(self.PlayCtrl_Port, 0)
# 打开码流,送入40字节系统头数据
if self.Playctrldll.PlayM4_OpenStream(self.PlayCtrl_Port, pBuffer, dwBufSize, 1024*1024):
# 设置解码回调,可以返回解码后YUV视频数据
#global FuncDecCB
self.FuncDecCB = DECCBFUNWIN(self.DecCBFun)
self.Playctrldll.PlayM4_SetDecCallBackExMend(self.PlayCtrl_Port, self.FuncDecCB, None, c_long(0), None)
# 开始解码播放
if not self.Playctrldll.PlayM4_Play(self.PlayCtrl_Port, None):
print(u'播放库播放失败')
else:
print(u'播放库打开流失败')
elif dwDataType == NET_DVR_STREAMDATA:
self.Playctrldll.PlayM4_InputData(self.PlayCtrl_Port, pBuffer, dwBufSize)
else:
print (u'其他数据,长度:', dwBufSize)
# 控制云台入口函数
def ptz_control(self, command, duration=0.1):
"""
参数:
lUserID: [in] NET_DVR_Login_V40等登录接口的返回值
lChannel: [in] 通道号
command: [in] 云台控制命令
dwStop:[in] 云台停止动作或开始动作:0-开始;1-停止
dwSpeed [in] 云台控制的速度,用户按不同解码器的速度控制值设置。取值范围[1,7]
"""
if self.login_success:
ret = self.Objdll.NET_DVR_PTZControlWithSpeed_Other(self.lUserId, 1, command, 0, 5)
if ret == False:
print('PTZ control failed, error code is:', self.Objdll.NET_DVR_GetLastError())
else:
# print(f'PTZ control command {command} executed successfully.')
time.sleep(duration)
self.Objdll.NET_DVR_PTZControl_Other(self.lUserId, 1, command, 1, 5) # 停止命令
else:
print("Please log in to the device first.")
# 云台控制指令
def control_ptz(self, direction, duration=0.1):
"""
参数:
direction: 控制方向,可为 "up", "down", "left", "right", "zoom_in", "zoom_out",
"focus_near", "focus_far", "iris_open", "iris_close"。
duration: 控制持续时间,单位为秒。
"""
if direction.lower() == "up":
self.ptz_control(TILT_UP, duration)
elif direction.lower() == "down":
self.ptz_control(TILT_DOWN, duration)
elif direction.lower() == "left":
self.ptz_control(PAN_LEFT, duration)
elif direction.lower() == "right":
self.ptz_control(PAN_RIGHT, duration)
elif direction.lower() == "zoom_in":
self.ptz_control(ZOOM_IN, duration)
elif direction.lower() == "zoom_out":
self.ptz_control(ZOOM_OUT, duration)
elif direction.lower() == "focus_near":
self.ptz_control(FOCUS_NEAR, duration)
elif direction.lower() == "focus_far":
self.ptz_control(FOCUS_FAR, duration)
elif direction.lower() == "iris_open":
self.ptz_control(IRIS_OPEN, duration)
elif direction.lower() == "iris_close":
self.ptz_control(IRIS_CLOSE, duration)
else:
print("Invalid direction. Please use 'up', 'down', 'left', 'right', 'zoom_in', 'zoom_out', 'focus_near', 'focus_far', 'iris_open', 'iris_close'.")
# 设置预置点
def set_preset(self, preset_index):
"""
参数:
lUserID: [in] NET_DVR_Login_V40等登录接口的返回值
lChannel: [in] 通道号
dwPTZPresetCmd: [in] 预设点命令。8表示设置预设点,9表示清除预设点,39表示移动到预设点。
preset_index: [in] 预置点索引,从1开始,最多支持300个预置点。
"""
if self.login_success:
ret = self.Objdll.NET_DVR_PTZPreset_Other(self.lUserId, 1, 8, preset_index)
if ret:
print(f"预设点 {preset_index} 设置成功.")
else:
print("设置预设点失败")
else:
print("请先登录设备")
# 移动到预置点
def move_to_preset(self, preset_index):
"""
参数:
lUserID: [in] NET_DVR_Login_V40等登录接口的返回值
lChannel: [in] 通道号
dwPTZPresetCmd: [in] 预设点命令。8表示设置预设点,9表示清除预设点,39表示移动到预设点。
preset_index: [in] 预置点索引,从1开始,最多支持300个预置点。
"""
if self.login_success:
ret = self.Objdll.NET_DVR_PTZPreset_Other(self.lUserId, 1, 39, preset_index)
if ret:
print(f"正在移动至预置点{preset_index}...")
time.sleep(3)
print("移动至预置点{preset_index}成功")
return True
else:
print("移动至预置点失败")
return False
else:
print("请先登录设备")
return False
# 零方位角控制
def initialposition_ctrl(self, work_mode=1):
"""
参数:
lUserID:[in] NET_DVR_Login_V40等登录接口的返回值
dwCommand:[in] 控制命令. 零方位角控制:NET_DVR_PTZ_INITIALPOSITIONCTRL= 3283
lpInBuffer:[in] 输入参数,具体内容跟控制命令相关,详见列表
dwInBufferSize:[in] 输入参数长度
"""
if self.login_success:
ctrl_struct = NET_DVR_INITIALPOSITIONCTRL()
ctrl_struct.dwSize = sizeof(NET_DVR_INITIALPOSITIONCTRL)
ctrl_struct.dwChan = 1
ctrl_struct.byWorkMode = work_mode
ret = self.Objdll.NET_DVR_RemoteControl(
self.lUserId, 3283, # NET_DVR_PTZ_INITIALPOSITIONCTRL
byref(ctrl_struct), sizeof(ctrl_struct)
)
if ret:
print("移动至零方位成功")
return True
else:
print("移动至零方位失败")
return False
else:
print("请先登录设备")
return False
# 释放资源
def release(self):
self.Objdll.NET_DVR_StopRealPlay(self.lRealPlayHandle)
if self.PlayCtrl_Port.value > -1:
self.Playctrldll.PlayM4_Stop(self.PlayCtrl_Port)
self.Playctrldll.PlayM4_CloseStream( self.PlayCtrl_Port)
self.Playctrldll.PlayM4_FreePort( self.PlayCtrl_Port)
self.PlayCtrl_Port = c_long(-1)
self.Objdll.NET_DVR_Logout(self.lUserId)
self.Objdll.NET_DVR_Cleanup()
print('释放资源结束')
# 上下文管理
def __enter__(self):
return self
# 释放资源
def __exit__(self, exc_type, exc_val, exc_tb):
self.release()
def main():
"""
主函数,负责创建 HKCam 对象并处理用户键盘输入。
功能:w、s、a、d 键控制云台上下左右移动,
e、r 键控制焦距,
t、y 键控制焦点,
u、i 键控制光圈。
"""
# 创建 HKCam 对象
cam = HKCam("192.123.123.123", "admin", "admin", 8000, "D:/Documents/project/lib/win")
# 移动至初始预置点
# cam.move_to_preset(1)
# 调用 initialposition_ctrl 方法,并设置 work_mode 为 1 (调用零方位):
cam.initialposition_ctrl(work_mode=1)
# 键盘控制云台 + opencv视频播放
if cam.login_success:
while True:
try:
_, frame = cam.read()
if frame is not None:
cv2.imshow("HKCam", frame)
key = cv2.waitKey(10) & 0xFF
if key == 27: # Esc 键退出
break
# 处理键盘输入
if key == ord('w'): # 上
cam.control_ptz("up")
elif key == ord('s'): # 下
cam.control_ptz("down")
elif key == ord('a'): # 左
cam.control_ptz("left")
elif key == ord('d'): # 右
cam.control_ptz("right")
elif key == ord('e'): # 焦距变大
cam.control_ptz("zoom_in")
elif key == ord('r'): # 焦距变小
cam.control_ptz("zoom_out")
elif key == ord('t'): # 焦点前调
cam.control_ptz("focus_near")
elif key == ord('y'): # 焦点后调
cam.control_ptz("focus_far")
elif key == ord('u'): # 光圈扩大
cam.control_ptz("iris_open")
elif key == ord('i'): # 光圈缩小
cam.control_ptz("iris_close")
elif key == 13: # 回车键
preset_index = input("请输入预设点编号 (1-300): ")
try:
preset_index = int(preset_index)
if 1 <= preset_index <= 300:
cam.set_preset(preset_index)
else:
print("预设点编号应在 1-300 之间")
except ValueError:
print("请输入有效的数字")
except Exception as e:
print(f"发生playback异常: {e}")
break
cv2.destroyAllWindows()
else:
print("请先登录设备")
# 释放资源
cam.release()
if __name__ == "__main__":
main()
后续我还会加入多摄像头读取。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。