赞
踩
近日做了一个自动关灯的小东西,放在宿舍里可以避免断电后忘记关灯导致第二天"怀民亦未寝.jpg"。不过有一个问题,这东西是粘在墙上的,想要调试的话总不能搬个电脑蹲在灯旁边debug一个下午吧。正当笔者苦恼于又要买一个3m超长数据线的时候,灵光一现,想到python作为一种脚本语言,是否可以在运行时更新代码呢?
说干就干,先想想怎么写出一个可以自己更新自己的python代码。
诶,上网一查,已经有前人想出这鬼点子了,并且还给出了部分的代码(
https://www.zhihu.com/question/626768033/answer/3255532727)。
- # option begin
- max = 200
- # option end
-
- def updateOption(max):
- with open(__file__, 'r+') as f:
- arrLines = f.readlines()
- idxOptionBegin = arrLines.index('# option begin\n')
- idxOptionEnd = arrLines.index('# option end\n')
- for idx, optionLine in enumerate(arrLines):
- if optionLine.startswith('max = ') and idxOptionBegin < idx < idxOptionEnd:
- arrLines[idx] = 'max = ' + str(max) + '\n'
- f.seek(0)
- f.writelines(arrLines)
- """
- Author: 中等难度的贪吃蛇
- """
我们来copy一小段学习一下,不得不说这位答主的码风很让人赏心悦目,把python的语法糖用的恰到好处。话归正题,这段代码用注释`#option begin/end`标注了可修改的区间,`__file__`是python内置变量,即为文件自身的名字,总体上先读取所有代码行,把待修改行进行修改后全部输出到源文件中去。
复制粘贴好是好,但是不实操总有东西弄不懂。比方如果在执行过程中把后面的语句删除了,那么他还会执行吗?
简单写下代码
- with open(__file__, 'w+') as f:
- f.write("111\n" * 3)
- f.write("222")
运行一下会发现原文件变成如下
- 111
- 111
- 111
- 222
可见python在运行之初,会将所有的代码纳入内存之中,即使修改代码文件,也不会对运行结果造成影响。
那现在再来看一下esp32上面的micropython是否支持这一特性吧。毕竟micropython有些库不太稳定,让笔者曾一度怀疑它的实力。简单修改一下上面”贪吃蛇“的代码,写了一段更好看的小段代码。
- # option begin
- param = 0
- # option end
-
- def updateOption(**kwargs):
- with open(__file__, 'r+') as f:
- arrLines = f.readlines()
- idxOptionBegin = arrLines.index('# option begin\n')
- idxOptionEnd = arrLines.index('# option end\n')
- for idx, optionLine in enumerate(arrLines):
- if not idxOptionBegin < idx < idxOptionEnd:
- continue
-
- for key in kwargs:
- if optionLine.startswith(key + ' = '):
- arrLines[idx] = key + ' = ' + str(kwargs[key]) + '\n'
-
- with open(__file__, "w+") as f:
- f.writelines(arrLines)
-
- updateOption(param = param + 1)
- print(param)
这段代码在pc上运行的结果是每次运行param都会加1,将其改名为“main.py”,放在esp32之中尝试运行后重新打开文件,`param = 1`!看来mipy是支持这种热修改的,太棒了。
那么一切顺利,我们现在可以考虑如何让电脑端传出的待更新代码传到esp32上面去。
有几种方法,一是把代码放到公网上,然后esp32再从公网上将代码取下来;二是局域网通信,esp32和pc端直接进行通信。乍一看似乎法一好些,毕竟可以身处天涯而心系esp32,在哪里都可以更新,但其实有很大的弊端:首先存放代码的公网服务器的延迟都不低,效率和稳定性不如局域网,再者局域网可以实现指令的即时传递,进而实现对esp32文件系统更加便捷的控制,最后就是那个代码桶的网站是境外的,小esp32翻不了墙。
局域网通信的话就又有问题要考虑了。公网上的ip总是固定的,但局域网内的ip每次断电重连都会变化。总不能从127.0.0.0到255.255.255.255挨个试吧。诶,255.255.255.255?广播!我们可以用udp把自己的地址广播出去(应该不会收到二向箔吧),然后等待另一方的连接。
好,那么我们就可以初步利用udp的广播和tcp协议实现局域网未知ip匹配并进行消息传递的功能。
下面给出的是esp32端的核心代码
- # 在之前需要让esp32连接wifi,获取本地ip,并定义好通信的PORT
- # 至于如何连接wifi可以见我之前发的博客
-
- udp_addr = ('255.255.255.255', PORT)
- udp_sock = socket(AF_INET, SOCK_DGRAM)
-
- tcp_addr = (ip, PORT)
- tcp_listen_sock = socket(AF_INET, SOCK_STREAM)
- tcp_listen_sock.settimeout(1) # 这里由于esp32在连接上位机之外还要进行其他操作,所以设置了超时
- tcp_listen_sock.bind(tcp_addr)
- tcp_listen_sock.listen(1)
-
- conn, addr = None, None
- print("waiting for connect...")
- while True:
- # 广播
- # 其实没必要把自己的ip放进数据中,广播的接收方可以接收到发送方的ip的
- message = "[HI]"
- udp_sock.sendto(message, udp_addr)
-
- # 接收tcp连接
- try:
- conn, addr = tcp_listen_sock.accept()
- print(conn, addr, "connected")
- break
- except Exception as exc:
- # etimedout是阻塞超时异常,eagain是非阻塞没接收到信息抛出的异常
- if str(exc) == "[Errno 116] ETIMEDOUT" or str(exc) == "[Errno 11] EAGAIN":
- pass
- else:
- raise OSError(exc)
-
- if (conn, addr) == (None, None):
- udp_sock.close()
- sys.exit()
-
- # 连接成功
-
- conn.settimeout(5.0)
- while True:
- data = conn.recv(1024)
- if len(data) == 0: #判断客户端是否断开连接
- print("close socket")
- conn.close()
- break
- print(data)
- ret = conn.send(data)
-
- udp_sock.close()
接下来是pc端的核心代码:
- address = ('', PORT)
- s = socket(AF_INET, SOCK_DGRAM)
- s.setsockopt(SOL_SOCKET, SO_BROADCAST, 1)
-
- s.bind(address)
-
- # udp sock等待接收广播
- while True:
- print(' wait recv...')
- data, address = s.recvfrom(1024)
- print(' [recv form %s:%d]:%s' % (address[0], address[1], data))
-
- s.close()
-
- if data.decode() == "[HI]":
- break
-
- IPADDR = address[0]
-
- # tcp连接
- tcp_client_socket = socket(AF_INET, SOCK_STREAM)
-
- server_ip, server_port = IPADDR, PORT
- tcp_client_socket.connect((server_ip, server_port))
-
- # 连接成功
- send_data = input("请输入要发送的数据:")
- tcp_client_socket.send(send_data.encode())
-
- recvData = tcp_client_socket.recv(1024)
- print('接收到的数据为:', recvData.decode())
-
- tcp_client_socket.close()
在成功实现广播连接之后,实现OTA在线更新的大厦已经落成,所剩只是一点修饰的工作了。两小朵乌云之一是定义好各个指令的协议,并将这个esp32上位机模块和esp32OTA模块封装起来,之二且待后文详说。
首先我们要将recv、send等等函数封装一下,因为发送接收时会出现各种问题,不是掉链接,没回应,超时,就是回应的很奇怪,不符合预期。而这些问题的重要性也都不同,掉链接的话可以直接结束ota程序了,其他的话有的可以再试试,有的根本不用在意。
所以我们先定义了几种返回值,然后对两个关键函数封装了一下。
- SUCCESS = 0
- NOTICE = 1
- WARNING = 2
- ERROR = 3
- CRITICAL = 4
-
- class UpperComputer(object):
-
- def recv_signal(self, description_of_this_time, expected_dat = None):
- try:
- dat = self.conn.recv(1024).decode()
- if len(dat) == 0:
- log.print_only("upper comp lost connection when {}".format(description_of_this_time))
- return "", CRITICAL
-
- if expected_dat != None and dat != expected_dat:
- log.error("Response Invalid when {}. (res: {})".format(description_of_this_time, dat))
- return dat, ERROR
-
- return dat, SUCCESS
-
- except Exception as exc:
- log.error("No Response when {}.(exc: {})".format(description_of_this_time, exc))
- return "", ERROR
-
- def send_signal(self, dat):
- try:
- self.conn.send(dat.encode())
- return True, SUCCESS
- except Exception as exc:
- log.error("Send Failed {}. {}".format(dat, exc))
- return False, CRITICAL
这样封装的好处是每次不需要进行繁复的try-except,只需要判断返回值是否正常即可。同时还避免了重复的log语句。
接下来是初始化函数,对udpsock和tcpsock分别进行初始化;广播连接函数已经介绍过了,之后是指令处理函数。需要注意tcp协议下如果连续发送消息,那么接收方可能一次recv收到对方两次send的内容,就可能会导致invalid response或者no reponse。为了避免这种情况,应该双方一唱一和,A发完消息B要回复收到。所以对于上位机发送的有后续补充信息的指令,要回复“ready”,没有补充信息的,就只需要回复"finish"或者查询的值即可。下面是esp32上用于和上位机通信的模块,pc端用于通信的模块类似,只是没有operation_handler()。
- class UpperComputer (object):
- def __init__(self, self_ip, port):
- self.port = port
- self.ip = self_ip
-
- self.usock = socket(..)
- self.tsock = socket(..)
-
- def broadcast_and_connect(self):
- # udp broadcast
- # tcp connect
- return True
-
- def operation_handler(self):
-
- dat, ret = self.recv_signal("ready to recv operation")
- if ret >= WARNING:
- return ret
-
- if dat == "[UPDATE]":
- self.send_signal("[READY]")
- return self.update()
- # ...
- elif dat == '[GETFILELIST]':
- return self.get_file_list()
- else:
- self.conn.send(dat.encode()) # echo
- return NOTICE
经过这样的抽象处理,OTA的功能就已经实现了,并且使用起来很简便。
- FILE_ROOT = "D:\\我的文档\\学习\\ESP32\\update\\"
- SEND_FILE_DICT = {"下位机程序.py": "main.py"} # key代表pc端的文件名称,value代表需要保存到esp32中的位置
- SAVE_PATH = "D:\\我的文档\\学习\\ESP32\\download\\"
- DOWNLOAD_LIST = ['log.txt', 'main.py']
-
- if __name__ == "__main__":
- esp32 = ESP32Ctrl(PORT)
- esp32.connect()
- # esp32.update(FILE_ROOT, SEND_FILE_DICT)
- # esp32.reboot()
- # esp32.download(DOWNLOAD_LIST, SAVE_PATH)
- file_list, _ = esp32.get_file_list()
- print(file_list)
- esp32.delete("log.txt")
-
- esp32.close()
但是每次更新都需要去改pc端的代码,比方需要上传或下载什么文件、想要获取文件列表还要重新改代码运行一次,颇有些麻烦。笔者想到初次接触到esp32时,用到了一个模块叫做ampy。这个模块利用命令行实现了对esp32上放置、移除文件、运行代码等操作,我们是否也可以把我们的上位机功能做成命令行呢?
python在实现命令行中os模块非常有用,os.path可以对文件路径进行诸多操作,os.chdir还可以自动记录当前所在文件夹,不用手动判断文件夹是否存在了。
- class COMMANDER(object):
- def command_handler(self, command):
- arg_list = command.split(" ")
- op = arg_list [0].lower()
-
- # cd 进入文件夹
- if op in ['cd']:
- try:
- command = command[len(op + " ") : ]
- command.strip("\"\'")
- os.chdir( './' + command)
-
- self.update_cwd()
- except Exception as exc:
- print(Fore.RED + f"ERROR: {exc}" + Fore.RESET)
- return
-
- # elif ..:
- # else:
我们现将本模块称为espremote,调用指令为espremote command [args]。下面给出了espremote的实现方法和效果展示。
- elif op in ['espremote']:
- if len(arg_list) == 1 or arg_list[1] == 'help':
- print(help_info['help'])
- return
-
- op2 = arg_list[1].lower()
- arg_list = arg_list[2:]
-
- if op2 in ['conn']:
- esp32.connect()
- return
- # elif ..:
- # else:
Espremote模块是一个能够给予特定协议远程控制esp32开发板的实用工具。利用Espremote可以远程连接板子、管理文件系统以及上传、下载文件。
该模块的pc端、esp32端模块和示例程序已经放在本人的仓库里了,欢迎大家取用。
本文首发于个人博客,欢迎访问。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。