赞
踩
与其他测试类型相比,性能测试的技术复杂度更高,需要掌握的技术栈除了常规的性能测试工具外还需要掌握编程语言、系统架构、数据库与MQ等知识。是不是说必须需要掌握了这些知识就可以做性能测试而没有掌握就无法做性能呢?很显然这个答案它是否定的。
在企业级里面最常见的一个场景是开发给测试同学一个接口,让测试下这个接口的吞吐量以及响应时间等,当然还有该接口的最大承载能力。针对这样的需求简单的理解就是测试这个接口它的最大承载边界是什么,比如最大并发量是多少,吞吐量是多少,以及响应时间是多少等等。它的前提条件是不能出错,客户端发送的所有请求服务端都是能够处理的,一旦发送的请求服务端无法处理了,那么就可以找到它的最大边界了。针对这样的需求,给大家推荐一个使用Python语言编写的轻量级性能测试工具。
它的设计思想就是专门用于测试高并发场景下验证服务端的系统吞吐能力与响应时间等,同时结合测试即服务让该工具成为一个服务来进行使用。涉及到的源码如下:
#!/usr/bin/env python
#!coding:utf-8
from flask import Flask,jsonify,request
from flask_restful import Api,Resource
from flask import Flask
import requests
import matplotlib.pyplot as plt
from threading import Thread
import numpy as np
import datetime
app=Flask(__name__)
api=Api(app=app)
class OlapThread(Thread):
def __init__(self,func,args=()):
'''
:param func: 被测试的函数
:param args: 被测试的函数的返回值
'''
super(OlapThread,self).__init__()
self.func=func
self.args=args
def run(self) -> None:
self.result=self.func(*self.args)
def getResult(self):
try:
return self.result
except BaseException as e:
return e.args[0]
def targetURL(code,seconds,text,requestUrl,method=None,data=None,headers=None):
'''
高并发请求目标服务器
:param code:协议状态码
:param seconds:响应时间
:param text:响应时间
:param requestUrl: 请求地址
:param requestData:请求参数
:param method: 请求方法
:return:
'''
if method=='GET':
r=requests.get(url=requestUrl)
print('输出信息昨状态码:{0},响应结果:{1}'.format(r.status_code,r.text))
code=r.status_code
seconds=r.elapsed.total_seconds()
text=r.text
return code,seconds,text
elif method=='POST':
r=requests.post(url=requestUrl,json=data,headers=headers)
print('输出信息昨状态码:{0},响应结果:{1}'.format(r.status_code, r.text))
code=r.status_code
seconds=r.elapsed.total_seconds()
text=r.text
return code,seconds,text
def calculationTime(startTime,endTime):
'''计算两个时间之差,单位是秒'''
return (endTime-startTime).seconds
def getResult(seconds):
'''获取服务端的响应时间信息'''
data={
'Max':sorted(seconds)[-1],
'Min':sorted(seconds)[0],
'Median':np.median(seconds),
'99%Line':np.percentile(seconds,99),
'95%Line':np.percentile(seconds,95),
'90%Line':np.percentile(seconds,90)
}
return data
# def show(i,j):
# '''
# :param i: 请求总数
# :param j: 请求响应时间列表
# :return:
# '''
# fig,ax=plt.subplots()
# ax.plot(list_count,seconds)
# ax.set(xlabel='number of times', ylabel='Request time-consuming',
# title='olap continuous request response time (seconds)')
# ax.grid()
# fig.savefig('target.png')
# plt.show()
def highConcurrent(count,requestUrl,method,data,headers):
'''
对服务端发送高并发的请求
:param count: 并发数
:param requestData:请求参数
:param requestUrl: 请求地址
:param method:请求方法
:param data:请求参数
:param headers:请求头
:return:
'''
startTime=datetime.datetime.now()
sum=0
list_count=list()
tasks=list()
results = list()
#失败的信息
fails=[]
#成功任务数
success=[]
codes = list()
seconds = list()
texts=[]
for i in range(0,count):
t=OlapThread(targetURL,args=(i,i,i,requestUrl,method,data,headers))
tasks.append(t)
t.start()
print('测试中:{0}'.format(i))
for t in tasks:
t.join()
if t.getResult()[0]!=200:
fails.append(t.getResult())
results.append(t.getResult())
for item in fails:
print('请求失败的信息:\n',item[2])
endTime=datetime.datetime.now()
for item in results:
codes.append(item[0])
seconds.append(item[1])
texts.append(item[2])
for i in range(len(codes)):
list_count.append(i)
#生成可视化的趋势图
fig,ax=plt.subplots()
ax.plot(list_count,seconds)
ax.set(xlabel='number of times', ylabel='Request time-consuming',
title=' request response time (seconds)')
ax.grid()
fig.savefig('rs.png')
plt.show()
for i in seconds:
sum+=i
rate=sum/len(list_count)
# print('\n总共持续时间:\n',endTime-startTime)
totalTime=calculationTime(startTime=startTime,endTime=endTime)
if totalTime<1:
totalTime=1
#吞吐量的计算
try:
throughput=int(len(list_count)/totalTime)
except Exception as e:
print(e.args[0])
getResult(seconds=seconds)
errorRate=0
if len(fails)==0:
errorRate=0.00
else:
errorRate=len(fails)/len(tasks)*100
throughput=str(throughput)+'/S'
timeData=getResult(seconds=seconds)
# print('总耗时时间:',(endTime-startTime))
timeConsuming=(endTime-startTime)
return timeConsuming,throughput,rate,timeData,errorRate,len(list_count),len(fails)
class Interface(Resource):
def get(self):
return {'status':0,'msg':'ok','datas':[]}
def post(self):
if not request.json:
return jsonify({'status':1001,'msg':'请求参数不是JSON的数据,请检查,谢谢!'})
else:
try:
data={
'count':request.json.get('count'),
'method':request.json.get('method'),
'requestUrl':request.json.get('requestUrl'),
'data':request.json.get('data'),
'headers':request.json.get('headers')
}
timeConsuming,throughput,rate,timeData,errorRate,sum,fails=highConcurrent(
count=data['count'],
requestUrl=data['requestUrl'],
method=data['method'],
data=data['data'],
headers=data['headers']
)
print('执行总耗时:',timeConsuming)
return jsonify({'status':0,'msg': '请求成功','datas':[{
'吞吐量':throughput,
'平均响应时间':rate,
'响应时间信息':timeData,
'错误率':errorRate,
'请求总数':sum,
'失败数':fails
}]}, 200)
except Exception as e:
return e.args[0]
api.add_resource(Interface,'/v1/interface')
if __name__ == '__main__':
app.run(debug=True,port=5001,host='0.0.0.0')
涉及到的Dockerfile文件内容如下(这样也可以容器化运行):
FROM centos:7.8.2003
MAINTAINER 无涯
#下载yum
RUN curl -o /etc/yum.repos.d/CentOS-Base.repo https://mirrors.aliyun.com/repo/Centos-7.repo;
RUN curl -o /etc/yum.repos.d/epel.repo http://mirrors.aliyun.com/repo/epel-7.repo;
#安装Python环境
RUN yum install python3-devel python3-pip -y
#安装flask库
RUN pip3 install -i https://pypi.douban.com/simple flask
RUN pip3 install -i https://pypi.douban.com/simple flask_restful
RUN pip3 install -i https://pypi.douban.com/simple requests
RUN pip3 install matplotlib
RUN pip3 install -i https://pypi.douban.com/simple numpy
#复制文件到容器目录
COPY app.py /opt
#切换目录
WORKDIR /opt
#启动服务
EXPOSE 5001
CMD ["python3","app.py"]
下面来说下它的使用方法,前面已经阐述到它是一个服务,其实从代码里面可以看到使用Flask编写了一个轻量级的服务,端口是5001。服务提供了GET与POST请求方法的入口,下面主要是针对GET与POST请求方法来说明它的基本使用。
启动成功后,在PostMan中进行访问,如针对百度首页发送请求,每秒发送2000个请求,具体如下图所示:
点击Send开始测试,测试结果信息如下所示:
如上显示了发送请求后返回的吞吐量与响应时间以及请求响应时间的趋势图。
下面再以POST请求为案例,如测试登录请求地址来验证登录服务的吞吐能力,它的请求如下图所示:
发送请求后返回的信息如下:
这是我整理的《2024最新Python自动化测试全套教程》,以及配套的接口文档/项目实战【网盘资源】,需要的朋友可以下方视频的置顶评论获取。肯定会给你带来帮助和方向。
【已更新】B站讲的最详细的Python接口自动化测试实战教程全集(实战最新版)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。