赞
踩
Xmodem、Ymodem和Zmodem协议是最常用的三种通信协议。
Xmodem协议是最早的,传输128字节信息块。
Ymodem支持128字节和1024字节进行传输。
工作中需要使用到Ymodem传输文件后进行启动,找了一圈没有找到合适的,简单了解一下协议以后自己通过Python的serial模块写了一个,目前使用128字节进行传输,目前已经进行过测试,使用128和1024传输没有存在bug。
协议部分不再细说,直接上代码,可能有部分地方考虑不足的地方,欢迎留言
import os from serial import Serial class YModem: def __init__(self): self.ser = None self.EOT = b'\x04' self.ACK = b'\x06' self.NAK = b'\x15' self.CAN = b'\x18' self.CRC = b'\x43' self.ACK_FLAG = False @staticmethod def crc16(x, invert): crc_in = 0x0000 poly = 0x1021 for b in x: if type(b) is str: # 这里做了个判断可以直接传入字符串,但存在意义不大 crc_in ^= (ord(b) << 8) else: crc_in ^= (b << 8) for i in range(8): if crc_in & 0x8000: crc_in = (crc_in << 1) ^ poly else: crc_in = (crc_in << 1) s = hex(crc_in).upper() # return (s[-2:], s[-4:-2]) if invert else (s[-4:-2], s[-2:]) return (s[-2:], s[-4:-2]) if invert else (s[-4:-2], s[-2:]) @staticmethod def str2hex(s): i_data = 0 su = s.upper() for c in su: tmp = ord(c) if tmp <= ord('9'): i_data = i_data << 4 i_data += tmp - ord('0') elif ord('A') <= tmp <= ord('F'): i_data = i_data << 4 i_data += tmp - ord('A') + 10 return i_data def open(self, port, baud): try: self.ser = Serial(port, baud, timeout=1.2) print(f'port {port} open ...') return True except Exception as e: print(f'serial {port} is not open {str(e)}') return False def send_first(self, mode, file_name, file_size): if mode == 128: x = 1 else: x = 2 print('start send YModem first package') space = bytearray() header = bytearray([x, 0, 255]) file_byte = bytearray([ord(i) for i in file_name]) size = bytearray([ord(i) for i in str(file_size)]) for i in range(mode - len(file_byte) - 1 - len(size)): space.append(0) a1, b1 = self.crc16(file_byte + bytearray([0]) + size + space, False) hex_a1 = self.str2hex(a1) hex_b1 = self.str2hex(b1) last = bytearray([hex_a1, hex_b1]) data = header + file_byte + bytearray([0]) + size + space + last self.ser.write(data) print('send first package success') def send_data(self, file_data, mode, package_count): print('start send file data') if mode == 128: x = 1 else: x = 2 for i in range(1, package_count + 1): header = bytearray() header.append(x) header.append(i % 256) header.append(255 - (i % 256)) if i == package_count: data = bytearray(file_data[(i - 1) * mode:]) for j in range(mode - len(data)): data.append(26) else: data = file_data[(i - 1) * mode: i * mode] a1, b1 = self.crc16(data, False) hex_a1 = self.str2hex(a1) hex_b1 = self.str2hex(b1) last = bytearray([hex_a1, hex_b1]) self.ser.write(header + data + last) # 判断ack for j in range(10): char = self.ser.read(1) if char == self.ACK: break elif char == self.NAK or j == 9: print('error') return False else: continue self.ser.write(self.EOT) is_ack = self.ser.read(1) if is_ack == self.ACK: return True print('send file data end') def send_last(self, mode): print('send last data') if mode == 128: x = 1 else: x = 2 last_package = bytearray([x, 0, 255]) for i in range(mode): last_package.append(0) self.ser.write(last_package) self.ser.write(bytearray([0, 0])) for i in range(10): is_ack = self.ser.read(1) if is_ack == self.ACK: print('send last ok') return True print('send last error') return False def send(self, file_path, mode=128): if not os.path.exists(file_path): print('file is not found') return False data_byte = bytes() with open(file_path, 'rb') as f: for i in f.readlines(): data_byte += i name = os.path.basename(file_path) file_size = os.path.getsize(file_path) if file_size % mode >= 1: package_count = int(file_size / mode) + 1 else: package_count = int(file_size / mode) count = 0 while True: count += 1 if count == 50: print('recv char is not crc') return False char = self.ser.read(1) if char == self.CRC: print('recv char is crc') self.send_first(mode, name, file_size) for i in range(30): is_ack = self.ser.read(1) if is_ack == self.ACK: self.ACK_FLAG = True break if self.ACK_FLAG: self.send_data(data_byte, mode, package_count) if self.send_last(mode): return True else: print('send data ok but not start app') continue def close(self): self.ser.close() self.ser = None if __name__ == '__main__': y_mode = YModem() y_mode.open('COM13', 921600) y_mode.send(r'.\test.bin') y_mode.close()
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。