当前位置:   article > 正文

python 常用知识点介绍2_python serial库

python serial库

1. serial库

安装:pip3 install pyserial

  1. import threading
  2. import serial
  3. import time
  4. def logPrinter(mesg='Nothing to log.', log_obs=1):
  5. if log_obs:
  6. mesg = time.strftime('[%Y.%m.%d %H:%M:%S] ', time.localtime(time.time())) + str(mesg)
  7. print(mesg)
  8. class SerialCtrl():
  9. '''
  10. 串口控制类 ,执行串口命令,并且通过多线程记录串口log
  11. '''
  12. def __init__(self, port, logfife='', baudrate=115200, log_obs=1):
  13. self.ignore = False # 是否忽略串口log的检测
  14. self.threadLock = threading.Lock() # 多线程锁
  15. self.logfile = logfife
  16. super().__init__()
  17. self.log_obs = log_obs
  18. self.port = port
  19. self.console = serial.Serial(port=self.port, baudrate=baudrate, timeout=0.1)
  20. self.serRunning = True
  21. if self.logfile == '':
  22. self.logfile = 'E:/log/%s_%s.log' % (time.strftime('%Y%m%d_%H%M%S', time.localtime(time.time())), self.port)
  23. self.crashlogFile = self.logfile.replace('.log', '') + '_crash.log' # 串口log异常文件名
  24. logPrinter(mesg='Create Log File "%s"' % self.logfile, log_obs=self.log_obs)
  25. # 守护线程,记录串口log
  26. self.T1 = threading.Thread(target=self.LogtoFile)
  27. self.T1.daemon = True
  28. self.T1.start()
  29. def Analysis_log(self, line):
  30. # 分析串口log是否有异常
  31. keywords = ['crash',
  32. 'oom']
  33. excludes = [
  34. ': "crash_syslog"',
  35. ]
  36. for k in keywords:
  37. if k in line:
  38. for exekey in excludes:
  39. if exekey in line:
  40. return ''
  41. line = time.strftime('[%Y.%m.%d %H:%M:%S]', time.localtime(time.time())) + line.replace('\n','').replace(
  42. '\r', '') + '\n'
  43. with open(self.crashlogFile, 'a') as f:
  44. line = '[warning from test]' + line
  45. f.write(line) # log异常
  46. return '[warning from test]'
  47. return ''
  48. def LogtoFile(self):
  49. # 调用子线程,记录串口log
  50. while self.serRunning:
  51. self.threadLock.acquire()
  52. line = self.console.readline()
  53. self.threadLock.release()
  54. try:
  55. line = line.decode('ascii')
  56. self.writeline(line)
  57. except Exception as e:
  58. print(e)
  59. def writeline(self, line):
  60. # 将串口log写进文件中, 同时进行log的分析
  61. if not self.ignore:
  62. # 检测串口log
  63. ii = self.Analysis_log(line)
  64. else:
  65. ii = '[Ignore]'
  66. with open(self.logfile, 'a+') as f:
  67. if line and line != '\r\n' and line != '\r':
  68. line = line.replace('\n', '').replace('\r', '') + '\n'
  69. line = time.strftime('[%Y.%m.%d %H:%M:%S]', time.localtime(time.time())) + ii + line
  70. f.writelines(line)
  71. def command(self, com):
  72. # 执行串口命令
  73. com = com + '\n'
  74. com = com.encode('utf-8')
  75. self.threadLock.acquire()
  76. self.console.write(com)
  77. time.sleep(1)
  78. output = self.console.readall()
  79. self.threadLock.release()
  80. # 返回输出结果
  81. try:
  82. output = output.decode('utf-8')
  83. logPrinter(log_obs=self.log_obs, mesg='[COM %s] %s' % (self.port, output))
  84. output_list = output.split('\n')
  85. for line in output_list:
  86. self.writeline(line)
  87. for k in range(len(output_list)):
  88. output_list[k] = output_list[k].replace('\r', '')
  89. return output_list
  90. except Exception as e:
  91. logPrinter(mesg=str(e))
  92. return ''
  93. def close(self):
  94. logPrinter(log_obs=self.log_obs, mesg='[%s] Close SerialCtrl of DUT, COM %s' % (self.__class__.__name__, self.port))
  95. self.serRunning = False
  96. self.console.close()
  97. if __name__ =='__main__':
  98. ser = SerialCtrl(port='com19')
  99. ser.command('reboot')
  100. time.sleep(100)
  101. ser.command("\004") # 模拟输入ctrl + D
  102. # https://blog.51cto.com/u_19261/6599846

2. python3中 str和bytes类型的相互转换

  1. s1='你好'
  2. print(type(s1)) # <class 'str'>
  3. # str --->bytes 通过encode()或者bytes()函数来进行
  4. s2=s1.encode(encoding='gbk')
  5. print(s2) #输出 b'\xc4\xe3\xba\xc3'
  6. s3=bytes(s1,encoding='utf-8')
  7. print(s3) # 输出 b'\xe4\xbd\xa0\xe5\xa5\xbd'
  8. # bytes--->str 通过decode()或者str()函数来进行
  9. print(str(s2,encoding='gbk')) # 输出 你好
  10. print(s3.decode(encoding='utf-8')) # 输出 你好

3, python 操作mysql数据库

  1. import pymysql
  2. #打开数据库连接
  3. conn = pymysql.connect(host='127.0.0.1', user ="root", passwd ="root", db ="test")
  4. cursor=conn.cursor()
  5. # 查询数据
  6. cursor.execute("select * from wifi where include='yes'") # 执行mysql
  7. res=cursor.fetchone() # 读取一行,返回的数据是元组
  8. while res:
  9. print(res)
  10. res = cursor.fetchone()
  11. # 插入一条记录
  12. cursor.execute("insert into newrper_mirouters(id,mi_routers) Values (%s, '%s')" % (id, k))
  13. conn.commit()
  14. # 断开连接
  15. conn.close()

4,json和字典区别

  • python中没有json这种数据类型
  • 在python,json中被识别为str类型,字典是dict类型
  • json的格式和dict有点相似,都是通过键值对的方式构成的,但是也有不同的地方:
  •     json的key只能是字符串,python的dict的key必须是不可变类型(str,数字,元组)
  •     json的字符串强制双引号,dict字符串可以单引号、双引号。

json和dict之间可以转换:

json.dumps()  可以将字典转化为json格式

json.loads() 可以将json转化为字典格式

  1. import json
  2. dict1={'A':-1,'B':8,'c':4}
  3. str1='{"ee":1,"rr":2}'
  4. # d1=json.dumps(dict1) # 将字典转化为json格式
  5. # print type(d1) # 输出str
  6. # print d1 # 输出{"A": -1, "c": 4, "B": 8}
  7. d2=json.loads(str1)
  8. print type(d2) # 输出<type 'dict'>
  9. print d2 # 输出{u'ee': 1, u'rr': 2}

5,通过python脚本执行cmd命令的方法

可以通过subprocess或者os模块中的方法来实现

subprocess的目的就是启动一个新的进程并且与之通信,可以用来来替代os.system, os.popen命令。

subprocess模块常用的函数如下:

1, subprocess.call()

# 执行命令,并且返回状态码,多个参数用列表进行传输,retcode = call(["ls", "-l"])

也可以用字符串作为参数运行命令(通过设置参数shell=True)

  1. a=subprocess.call(['ping','qq.com'])
  2. print(a) # 输出0
  3. print(type(a)) # int

2,subprocess.popen(args,shell=False,stdout=None)

args: 表示要执行的命令,可以是str类型,也可以是数组类型
shell=True ,表示在系统默认的shell环境中执行新的进程,此shell在windows表示为cmd.exe,在linux为/bin/sh,args是字符串类型的时候,shell需要为True
stdin, stdout and stderr: 用来指定标准输入、标准输出和错误输出 常用值 subprocess.PIPE,结合stdout.readline()可以得到输出结果

  1. a=subprocess.Popen(args='ping qq.com',shell=True,stdout=subprocess.PIPE)
  2. while a.poll() is None: # poll()方法用来检查进程是否还在之执行中,None表示正在执行中
  3. line = a.stdout.readline().decode('gbk')
  4. if line!='':
  5. print(line)
  6. print(a.poll()) # 输出0
  1. def mysuprocess(com, timeout):
  2. p = subprocess.Popen(com, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
  3. try:
  4. out, err = p.communicate(timeout=timeout)
  5. logPrinter("子进程的标准输出:\n" + out.decode())
  6. return out.decode()
  7. except TimeoutExpired:
  8. logPrinter('cmd命令超时')
  9. p.terminate()
  10. logPrinter("子进程超时退出!")
  11. return ''

3,subprocess.check_output(**kwargs)

执行命令并且将输出结果返回,如果命令执行出错,将会抛出一个CalledProcessError异常

  1. a=subprocess.check_output('ipconfig')
  2. print(a.decode('gbk')) #输出执行结果

4, communicate()方法

communicate()方法可以和子进程进行交互,发送数据到stdin,从stdout和stderr读取数据

  1. subp = subprocess.Popen('ping qq.com',shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
  2. print(subp.communicate('-n 2')) # 输出执行结果

注意:communicate 其实是循环读取管道中的数据(每次 32768 字节)并将其存在一个 list 里面,到最后将 list 中的所有数据连接起来(b''.join(list)) 返回给用户。

于是就出现了一个坑:如果子进程输出内容非常多甚至无限输出,则机器内存会被撑爆

解决办法:

将stdout重定向到文件中

  1. f=open('aa.txt','a+',encoding='gbk')
  2. subp = subprocess.Popen('ping qq.com -n 2',shell=True,stdout=f)
  3. f.close()

6,通过python对比文件的异同

(1)通过文件的md5值来比较文件的不同

利用hashlib模块的md5方法,生成固定为16字节长度的字符串(用32位的16进制字符串来表示),通过对比两个文件的MD5值,来比较文件是否发生变化
  1. import hashlib
  2. def get_content_mtd(self,file_name):
  3. md5 = hashlib.md5(open(file_name,'rb').read()).hexdigest()
  4. return md5

(2)比较两个文件最后被修改的时间戳

os.stat() 方法用于在给定的路径上执行一个系统 stat 的调用,st_mtime返回最后一次修改的时间

  1. def get_modify_time(self,file_name):
  2. mtime= os.stat(file_name).st_mtime
  3. print(mtime)
  4. return mtime

7,利用python识别简单的验证码

环境搭建:

(1)安装tesseract-OCR,下载地址:Index of /tesseract

(2)安装完成后,需要配置环境变量,在path中添加tesseract的安装目录:E:\softinstall\Tesseract-OCR;在系统变量中添加TESSDATA_PREFIX;

(3)pip3 install pytesseract

  1. from PIL import Image
  2. import pytesseract
  3. image = Image.open('aa.png')
  4. text = pytesseract.image_to_string(image)
  5. print(text)

注:这种方法只能是被简单,清晰地验证码;对于模糊的验证码,需要更复杂的处理(降噪、切割等)。

8, python时间戳相减

  1. # ---------------- 两个时间戳相减 ---------------------
  2. t1 = '2022-09-30 10:31:06'
  3. print('t1 ', t1)
  4. t_now = time.strftime('%Y-%m-%d %H:%M:%S') # str
  5. print('time now ', t_now)
  6. # 两个时间字符串相减
  7. td = datetime.datetime.strptime(t_now, '%Y-%m-%d %H:%M:%S') - datetime.datetime.strptime(t1, '%Y-%m-%d %H:%M:%S')
  8. print('时间相减得到的时间差 ',td) # datetime.timedelta
  9. print('时间差中的天数 ', td.days) # int
  10. print('时间差中的秒数', td.seconds) # int

输出:

t1  2022-09-30 10:31:06
time now  2022-09-30 15:34:19
时间相减得到的时间差  5:03:13
时间差中的天数  0
时间差中的秒数 18193

  1. # ----------------------- 时间移动 -----------------
  2. t1 = '2022-09-30 10:31:06'
  3. print('t1: ', t1)
  4. t_now = datetime.datetime.strptime(t1 ,'%Y-%m-%d %H:%M:%S') # <class 'datetime.datetime'>
  5. print('当前时间:', t_now)
  6. t2 = t_now + datetime.timedelta(days=-1, hours=1, seconds=1, minutes=2) # <class 'datetime.datetime'>
  7. print('移动后的时间:', t2)

输出:

t1:  2022-09-30 10:31:06
当前时间: 2022-09-30 10:31:06
移动后的时间: 2022-09-29 11:33:07

9,python快速生成大文件

  1. import time
  2. t1 = time.time()
  3. with open('2G', 'w') as f:
  4. f.seek(2*1024*1024*1024) # 移动文件读取指针到指定位置
  5. f.write('123')
  6. t2 = time.time()
  7. print(t2-t1) # 10s

10, BeautifulSoup分析html文件

  1. import requests
  2. import re
  3. from bs4 import BeautifulSoup
  4. res = requests.get('http://xx.xx.xx.xx:xx/file/test.html')
  5. html = res.text
  6. soup = BeautifulSoup(html, 'html.parser') # html.parser是解析器类型
  7. all_a = soup.find_all(name='a', string=re.compile("1")) # find_all 获取所有的标签, re.compile("1")过滤包含1的标签
  8. print(all_a)
  9. ele = all_a[0]
  10. print(ele.text) # 获取标签的文本值
  11. print(ele['href']) # 获取标签的href属性值

11. dir()

 print(dir('')) # 内置方法dir()用于列出对象的所有属性及方法

12. windows下获取网卡IPv4 /IPv6地址的方法

  1. import psutil
  2. def get_netcard_v6(ifname='office'):
  3. # 获取Ipv6地址
  4. info = psutil.net_if_addrs()
  5. for i in info[ifname]:
  6. print(i)
  7. if i[0] == 23 and 'fe80' not in i[1]:
  8. return i[1]
  9. raise Exception('获取Ipv6地址失败')
  10. def get_netcard_v4(ifname='office'):
  11. # 获取Ipv4地址
  12. info = psutil.net_if_addrs()
  13. for i in info[ifname]:
  14. if i[0] == 2:
  15. return i[1]
  16. raise Exception('获取Ipv4地址失败')

13. python修改图片的尺寸

pip install pillow

  1. from PIL import Image
  2. root = '8e6558c0b6cbf00c.png'
  3. pic = Image.open(root)
  4. pic = pic.resize((500, 1000)) # 单位是像素,宽,高
  5. pic.save('111.png)

14. python进行url编码和解码

  1. from urllib import parse
  2. option = {
  3. 'A': '123',
  4. 'B': '啦啦啦'
  5. }
  6. # urlencode进行编码,参数为dict类型,输出字符串,拼接键值对
  7. o1 = parse.urlencode(option)
  8. print(o1)
  9. # 输出 A=123&B=%E5%95%A6%E5%95%A6%E5%95%A6
  10. # quote进行编码, 参数为字符串, 输出为字符串,将&和=一起编码
  11. o2 = parse.quote(o1)
  12. print(o2)
  13. # 输出 A%3D123%26B%3D%25E5%2595%25A6%25E5%2595%25A6%25E5%2595%25A6
  14. # unquote进行url解码,输出字符串
  15. o3 = parse.unquote(o2)
  16. print(o3)
  17. # 输出 A=123&B=%E5%95%A6%E5%95%A6%E5%95%A6
  18. o4 = parse.quote(str(option))
  19. print(o4)
  20. # 输出%7B%27A%27%3A%20%27123%27%2C%20%27B%27%3A%20%27%E5%95%A6%E5%95%A6%E5%95%A6%27%7D
  21. o5 = parse.unquote(o4)
  22. print(o5)
  23. # 输出{'A': '123', 'B': '啦啦啦'}

14、python logging模块

1、logging模块默认定义了日志的6个级别,分别为 CRIRICAL>ERROR>WARNING>INFO>DEBUG>NOTEST,

默认的日志级别是warning,低于warning的日志将不会被打印。

  1. import logging
  2. logging.warning(11)
  3. logging.error(99)
  4. logging.info('88')

输出:

  1. WARNING:root:11
  2. ERROR:root:99

2、设置日志打印级别、日志的格式

basicConfig方法可以设置日志的级别、日志的格式

  1. import logging
  2. logging.basicConfig(
  3. level=logging.NOTSET,
  4. format="%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s"
  5. )
  6. logging.info(123)

输出

2024-03-21 15:29:36,626 - debug.py[line:14] - INFO: 123

3、进阶使用

日志库采用模块化方法,记录器、处理程序、格式化程序

  1. def setLogger():
  2. if data.logger == '':
  3. # 第一步,创建一个logger
  4. data.logger = logging.getLogger()
  5. data.logger.setLevel(logging.INFO) # Log等级总开关
  6. # 第二步,创建一个handler,用于写入日志文件
  7. logfile = data.result_log
  8. fh = logging.FileHandler(logfile, mode='a')
  9. fh.setLevel(logging.INFO) # 输出到file的log等级的开关
  10. # 第三步,定义handler的输出格式
  11. formatter = logging.Formatter("%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s")
  12. fh.setFormatter(formatter)
  13. # 创建一个handler,用于输出到控制台
  14. ch = logging.StreamHandler()
  15. ch.setLevel(logging.INFO) # 输出到console的log等级的开关
  16. ch.setFormatter(formatter)
  17. data.logger.addHandler(ch)
  18. # 第四步,将logger添加到handler里面
  19. data.logger.addHandler(fh)

15、adb命令控制安卓手机录屏方法

(1)screenrecord 命令可以进行录屏,最长3分钟

adb -s 9fa50772 shell screenrecord /sdcard/20240416130828.mp4

(2)停止录屏

在python代码中停止adb录屏的方法有两种:

方法一:

  1. import subprocess
  2. # 启动录屏
  3. process = subprocess.Popen('adb shell screenrecord /sdcard/123.mp4')
  4. time.sleep(5)
  5. # 结束录屏
  6. process.terminate()
  7. # 将文件传输到电脑
  8. res = subprocess.check_output('adb pull /sdcard/123.mp4 123.mp4').decode()
  9. print(res)

方法二:

  1. import subprocess
  2. # 启动录屏
  3. subprocess.Popen('adb shell screenrecord /sdcard/123.mp4')
  4. time.sleep(5)
  5. # 杀掉进程,结束录屏
  6. subprocess.check_output('adb shell pkill -2 screenrecord')

Kill -2 :功能类似于Ctrl + C 是程序在结束之前,能够保存相关数据,然后再退出。

Kill -9 :直接强制结束程序。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/神奇cpp/article/detail/771146
推荐阅读
相关标签
  

闽ICP备14008679号