赞
踩
承接本项目系列的上一篇文章 【基于Xilinx Zynq7000的PYNQ框架项目】02 PYNQ镜像制作 ,使用 PYNQ 提供的 jupyter notebook 来写服务器端代码(意思是开发板作为服务器端):
import socket import struct import threading import cv2 import numpy import os class Server: def __init__(self): # 设置tcp服务端的socket self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 设置重复使用 self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True) # 绑定地址和端口,此处的IP地址为开发板的IP,端口号可以随意设置,只要不跟已占用的端口号冲突 self.server.bind(('192.168.137.80', 11000)) # 设d置被动监听 self.server.listen(128) def run(self): while True: # 等待客户端连接 print('等待客户端连接') client, addr = self.server.accept() ProcessClient(client).start() class ProcessClient(threading.Thread): def __init__(self, client): super().__init__() self.client = client # 以下内容作用是根据文件夹的图片序号规定'i'的初值,方便下一次保存图片时使用 self.i=0 path = "home/xilinx/jupyter_notebooks/VideoFlows" # 设置传输过来的视频流在开发板上的存储路径 file_list = [] for file in file_list: file = int(file.split('.')[0]) file_list.append(file) if len(file_list) == 0: self.i = 0 else: file_list.sort() self.i = file_list[-1] + 1 def run(self): while True: data = self.client.recv(8) # 此处的接收大小 " 8 " 是跟客户端(电脑)传输的数据包大小对应的 if not data: break length, width, height = struct.unpack('ihh', data) # 图片的长、宽、高 # 接收二进制形式的图片数据 imgg = b'' while length: temp_size = self.client.recv(length) length -= len(temp_size) imgg += temp_size # 还原二进制图片数据成一维数组形式 data = numpy.fromstring(imgg, dtype='uint8') # 最终还原成矩阵形式 image = cv2.imdecode(data, cv2.IMREAD_UNCHANGED) # 存储每帧图片,以10张为一组进行循环存储,编号为 0 到 9 cv2.imwrite(r"/home/xilinx/jupyter_notebooks/VideoFlows/" + str(self.i) + ".jpg", image) self.i = self.i + 1 if self.i == 10: self.i = 0 # time.sleep(5) if __name__ == '__main__': server = Server() server.run()
注释已经写得比较详细,这里就不再讲述代码了。
在自己的笔记本电脑上写客户端代码:
import socket import struct import time import traceback import cv2 import numpy import sys class Client(object): def __init__(self, addr_port=('192.168.137.80', 11000)): # 连接的服务器的地址和端口号 self.addr_port = addr_port # 创建套接字 self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 地址端口可以复用 self.client.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # 视频的分辨率 self.resolution = (640, 480) def connect(self): try: self.client.connect(self.addr_port) return True except Exception as e: traceback.print_exc() # 打印原始的异常信息 print('连接失败') return False def send2server(self): camera = cv2.VideoCapture(0) # 获取摄像头 print('isOpened:', camera.isOpened()) while camera.isOpened(): try: # 读取摄像头的数据 ret, frame = camera.read() # 对每一帧图片做大小的处理和压缩 frame = cv2.resize(frame, self.resolution) # cv2.imencode(图片后缀名,原图片的数据,图片质量 可选范围为0-100 ) # 图片质量需合理设置,太大传得慢,太小画面不清晰 ret, img = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 20]) print(img) print('-------------') print(img.tobytes()) # 转换为numpy格式数据 img_code = numpy.array(img) # 转为二进制数据 img = img_code.tobytes() # 获取数据长度 length = len(img) # 发送的数据 大小 宽 高 图片数据 # 数据打包变为二进制 # pack方法参数1 指定打包数据的数据大小 i 4字节 h代表2字节 # 【ihh】数据包的大小,i代表4字节,h代表2字节;【length】数据长度;【】后两个为长、宽;【img】图片数据 all_data = struct.pack('ihh', length, self.resolution[0], self.resolution[1]) + img self.client.send(all_data) # 传输间隔需合理设置,太小时延太高,太大帧数太低 time.sleep(0.5) except: camera.release() # 释放摄像头 traceback.print_exc() return if __name__ == '__main__': client = Client() if client.connect(): client.send2server()
注释已经写得比较详细,这里就不再讲述代码了。
在 MobaXterm 串口终端上运行服务器端代码 server.py :
在笔记本电脑上运行客户端代码 main.py ,显示的打印内容如下:
在MobaXterm 上打开自己设置的视频流存放目录(在 server.py 代码里输入的那个路径):
可以看到各图片的最后修改时间会实时发生变化
,说明成功接收到笔记本电脑的视频流。所谓视频流,其实就是足够多的图片在足够短的时间内播放。
本文章实现了将笔记本摄像头视频流传输到开发板上,开发板接收并存储到本地目录下。下一步就是导入每帧图片进行人脸检测啦~
有不清楚的地方,欢迎大家在评论区交流~
您的关注、点赞与收藏,是我持续创作优质博客的最大动力!
本项目系列文章:
【基于Xilinx ZYNQ7000的PYNQ框架项目】01人脸识别项目介绍与展示
【基于Xilinx Zynq7000的PYNQ框架项目】02 PYNQ镜像制作
【基于Xilinx ZYNQ7000的PYNQ框架项目】04开发板上运行人脸识别模型
【基于Xilinx ZYNQ7000的PYNQ框架项目】05使用Overlay库和python父子进程实现开发板按键控制LED灯流水或熄灭
更多优质博客:小黄能吃辣的CSDN主页
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。