赞
踩
本文的文字及图片来源于网络,仅供学习、交流使用,不具有任何商业用途,版权归原作者所有,如有问题请及时联系我们以作处理
以下文章来源于腾讯云,作者:无涯WuYa
( 想要学习Python?Python学习交流群:1039649593,满足你的需求,资料都已经上传群文件流,可以自行下载!还有海量最新2020python学习资料。 )
在服务端的测试中,除了考虑服务端的业务功能和API的各个兼容性外,还需要考虑的就是服务端的稳定性以及高并发请求下服务端的承载能力。关于并发多少的数量以及具体的响应时间要求,其实每个产品的形态都是不一样的,很难使用标准的说法来进行统一。这具体看被测试的组件它所面对的业务形态,如果业务形态是是很少使用的产品,其实对性能也就没什么要求了。所以关于这点还是得根据被测组件的架构设计,承载的量以及业务目标。本文章主要分享使用Python语言编写一个简单的并发请求的测试代码。
在Python的并发编程模式中,主要涉及的点是线程以及进程,还有对应的协程。而locust主要是基于协程来进行设计,协程我们可以把它理解为微线程。在IO密集性和CPU密集性中,如果是IO密集性的,建议使用多线程的方式更加高效,如果是CPU密集性,建议使用多进程的方式更加高效。本文章主要分享基于IO密集性,也就是多线程的方式。开启一个线程的方式是非常简单,我们可以通过函数式的编程方式,也可以使用面向对象的编程方式,见如下的具体案例代码,函数式的方式:
from threading import Thread
import time as t
import random
def job(name):
print('我是{0},我要开始工作啦'.format(name))
if __name__ == '__main__':
t=Thread(target=job,args=('李四',))
t.start()
print('主线程执行结束')
面向对象的方式:
from threading import Thread import time as t import random class Job(Thread): def __init__(self,name): super().__init__() self.name=name def run(self) -> None: print('我是{0},我要开始工作啦'.format(self.name)) if __name__ == '__main__': t=Job('李四') t.start() print('主线程程序执行结束')
其实在Thread的类中,并没有返回被测函数的返回值,也就是说我们在测试API接口的时候,需要拿到被测接口的状态码,请求响应时间,和响应数据,那么我们就需要重新Thread类继承后对run()的方法进行重写,拿到被测函数中我们所期望的数据,具体案例代码如下:
#!coding:utf-8 from threading import Thread class ThreadTest(Thread): def __init__(self,func,args=()): ''' :param func: 被测试的函数 :param args: 被测试的函数的返回值 ''' super(ThreadTest,self).__init__() self.func=func self.args=args def run(self) -> None: self.result=self.func(*self.args) def getResult(self): try: return self.result except BaseException as e: return e.args[0]
这里我们以测试百度首页作为案例,来并发请求后,拿到并发请求后响应时间,状态码,然后依据响应时间拿到中位数以及其他的数据,具体完整案例代码如下:
#!/usr/bin/env python #!coding:utf-8 from threading import Thread import requests import matplotlib.pyplot as plt import datetime import time import numpy as np import json class ThreadTest(Thread): def __init__(self,func,args=()): ''' :param func: 被测试的函数 :param args: 被测试的函数的返回值 ''' super(ThreadTest,self).__init__() self.func=func self.args=args def run(self) -> None: self.result=self.func(*self.args) def getResult(self): try: return self.result except BaseException as e: return e.args[0] def baiDu(code,seconds): ''' :param code: 状态码 :param seconds: 请求响应时间 :return: ''' r=requests.get(url='http://www.baidu.com/') code=r.status_code seconds=r.elapsed.total_seconds() return code,seconds def calculationTime(startTime,endTime): '''计算两个时间之差,单位是秒''' return (endTime-startTime).seconds def getResult(seconds): '''获取服务端的响应时间信息''' data={ 'Max':sorted(seconds)[-1], 'Min':sorted(seconds)[0], 'Median':np.median(seconds), '99%Line':np.percentile(seconds,99), '95%Line':np.percentile(seconds,95), '90%Line':np.percentile(seconds,90) } return data def highConcurrent(count): ''' 对服务端发送高并发的请求 :param cout: 并发数 :return: ''' startTime=datetime.datetime.now() sum=0 list_count=list() tasks=list() results = list() #失败的信息 fails=[] #成功任务数 success=[] codes = list() seconds = list() for i in range(1,count): t=ThreadTest(baiDu,args=(i,i)) tasks.append(t) t.start() for t in tasks: t.join() if t.getResult()[0]!=200: fails.append(t.getResult()) results.append(t.getResult()) endTime=datetime.datetime.now() for item in results: codes.append(item[0]) seconds.append(item[1]) for i in range(len(codes)): list_count.append(i) #生成可视化的趋势图 fig,ax=plt.subplots() ax.plot(list_count,seconds) ax.set(xlabel='number of times', ylabel='Request time-consuming', title='olap continuous request response time (seconds)') ax.grid() fig.savefig('olap.png') plt.show() for i in seconds: sum+=i rate=sum/len(list_count) # print('\n总共持续时间:\n',endTime-startTime) totalTime=calculationTime(startTime=startTime,endTime=endTime) if totalTime<1: totalTime=1 #吞吐量的计算 try: throughput=int(len(list_count)/totalTime) except Exception as e: print(e.args[0]) getResult(seconds=seconds) errorRate=0 if len(fails)==0: errorRate=0.00 else: errorRate=len(fails)/len(tasks)*100 throughput=str(throughput)+'/S' timeData=getResult(seconds=seconds) dict1={ '吞吐量':throughput, '平均响应时间':rate, '响应时间':timeData, '错误率':errorRate, '请求总数':len(list_count), '失败数':len(fails) } return json.dumps(dict1,indent=True,ensure_ascii=False) if __name__ == '__main__': print(highConcurrent(count=1000))
上面的代码执行后,就会形成可视化的请求响应时间以及其他的信息,执行后显示如下的信息:
{
"吞吐量": "500/S",
"平均响应时间": 0.08835436199999998,
"响应时间": {
"Max": 1.5547,
"Min": 0.068293,
"Median": 0.0806955,
"99%Line": 0.12070111,
"95%Line": 0.10141509999999998,
"90%Line": 0.0940216
},
"错误率": 0.0,
"请求总数": 1000,
"失败数": 0
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。