赞
踩
from idna importunicodefrom locust importHttpLocust, TaskSet, task, User, HttpUser, betweenimportjsonimportuuidimportosimporttime
?
?classMyTasks(TaskSet):
?
header= {"Content-Type": "application/json", "systemid": "yanpan", "authcode": "7ef54175f9d97b7ed26aeb2b5f121f40"}
ygdatas= json.load(open(‘ygys.json‘, ‘r‘, encoding=‘UTF-8‘)) #原告要素表
bgdatas = json.load(open(‘bgys.json‘, ‘r‘, encoding=‘UTF-8‘)) #被告要素表
defget_bh(self):
bh= ‘‘.join(str(uuid.uuid4()).split(‘-‘)) #通过python生成uuid
returnbh
?defget_bhAJ(self):
bhAj= ‘‘.join(str(uuid.uuid4()).upper().split(‘-‘)) #通过python生成uuid
returnbhAj
?#获取要素填写的前端页面(为了使场景更真实一点)
defGetYsui(self):
self.client.get(‘http://xxx.xx.xx.xx:8000/lhjf-yg?ygsf=2&systemid=yanpan&authcode=7ef54175f9d97b7ed26aeb2b5f121f40&ay=9015&ssdw=1&jbfy=2400&bhAj=298C0D590F06D08DD773BAF9C7C49049&dsr=FD63B3C0D12A78A58DCF6D6A6C4D4A78&gldsr=&sfkbj=1&dc=1&dy=1&zc=1&bc=1&qrtj=1&gbgwdt=1&fymc=%E6%B9%96%E5%8D%97%E7%9C%81%E9%AB%98%E7%BA%A7%E4%BA%BA%E6%B0%91%E6%B3%95%E9%99%A2001&ah=%EF%BC%882018%EF%BC%89%E6%B9%98%E6%B0%91%E5%88%9D696%E5%8F%B7&time=1593599607909&ywlx=30100‘,headers=self.header)
?#填写原告要素表
defWriteygysb(self, ygdatas):#填写原告要素表
self.GetYsui()
time.sleep(1)
self.client.post("http://xxx.xx.xx.xx:8000/yaosu/api/v1/ys", headers=self.header, data=ygdatas)
?#填写被告要素表
defWritebgysb(self, bgdatas):#填写被告要素表
self.GetYsui()
time.sleep(1)
self.client.post("http://xxx.xx.xx.xx:8000/yaosu/api/v1/ys", headers=self.header, data=bgdatas)
?
?#清除库里由于压测添加的脏数据
defdelete(self, bhAJ):
params1= {"bhAj": bhAJ, "ssdw": "1"}
self.client.delete(f‘http://xxx.xx.xx.xx:8888/api/v1/ys?{bhAJ}&ssdw=1‘,params=params1, headers=self.header)
params2= {"bhAj": bhAJ, "ssdw": "2"}
self.client.delete(f‘http://xxx.xx.xx.xx:8888/api/v1/ys?{bhAJ}&ssdw=2‘,params=params2, headers=self.header)
?
@task#压测任务,将流程串联起来(获取前端页面-填写原被告要素表-清楚脏数据【这里清楚脏数据替代了同步的操作】)
defYsPressureTest(self):#填写原告要素表
bh =self.get_bh()
bhAj=self.get_bhAJ()
self.ygdatas["bhAj"] =bhAj
self.ygdatas["bh"] =bh
self.bgdatas["bhAj"] =bhAj
self.bgdatas["bh"] =bh
ygdatas=json.dumps(self.ygdatas)
bgdatas=json.dumps(self.bgdatas)
self.Writeygysb(ygdatas)
self.Writebgysb(bgdatas)
self.delete(bhAj)
?
?
?classRunLoadTests(HttpUser):‘‘‘创建运行类‘‘‘tasks=[MyTasks]
wait_time= between(5, 10) #每个用户5-10秒内执行一遍流程
?
?if __name__=="main":
os.system("locust -f test_load.py --web-host=127.0.0.1 --host=xxx.xx.xx.xx")
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。