当前位置:   article > 正文

locust能监控服务器性能吗,利用locust进行压测,检验一下之前搭建的性能监控平台...

locust进行压测时 需要对压测机进行监控还是服务器

from idna importunicodefrom locust importHttpLocust, TaskSet, task, User, HttpUser, betweenimportjsonimportuuidimportosimporttime

?

?classMyTasks(TaskSet):

?

header= {"Content-Type": "application/json", "systemid": "yanpan", "authcode": "7ef54175f9d97b7ed26aeb2b5f121f40"}

ygdatas= json.load(open(‘ygys.json‘, ‘r‘, encoding=‘UTF-8‘)) #原告要素表

bgdatas = json.load(open(‘bgys.json‘, ‘r‘, encoding=‘UTF-8‘)) #被告要素表

defget_bh(self):

bh= ‘‘.join(str(uuid.uuid4()).split(‘-‘)) #通过python生成uuid

returnbh

?defget_bhAJ(self):

bhAj= ‘‘.join(str(uuid.uuid4()).upper().split(‘-‘)) #通过python生成uuid

returnbhAj

?#获取要素填写的前端页面(为了使场景更真实一点)

defGetYsui(self):

self.client.get(‘http://xxx.xx.xx.xx:8000/lhjf-yg?ygsf=2&systemid=yanpan&authcode=7ef54175f9d97b7ed26aeb2b5f121f40&ay=9015&ssdw=1&jbfy=2400&bhAj=298C0D590F06D08DD773BAF9C7C49049&dsr=FD63B3C0D12A78A58DCF6D6A6C4D4A78&gldsr=&sfkbj=1&dc=1&dy=1&zc=1&bc=1&qrtj=1&gbgwdt=1&fymc=%E6%B9%96%E5%8D%97%E7%9C%81%E9%AB%98%E7%BA%A7%E4%BA%BA%E6%B0%91%E6%B3%95%E9%99%A2001&ah=%EF%BC%882018%EF%BC%89%E6%B9%98%E6%B0%91%E5%88%9D696%E5%8F%B7&time=1593599607909&ywlx=30100‘,headers=self.header)

?#填写原告要素表

defWriteygysb(self, ygdatas):#填写原告要素表

self.GetYsui()

time.sleep(1)

self.client.post("http://xxx.xx.xx.xx:8000/yaosu/api/v1/ys", headers=self.header, data=ygdatas)

?#填写被告要素表

defWritebgysb(self, bgdatas):#填写被告要素表

self.GetYsui()

time.sleep(1)

self.client.post("http://xxx.xx.xx.xx:8000/yaosu/api/v1/ys", headers=self.header, data=bgdatas)

?

?#清除库里由于压测添加的脏数据

defdelete(self, bhAJ):

params1= {"bhAj": bhAJ, "ssdw": "1"}

self.client.delete(f‘http://xxx.xx.xx.xx:8888/api/v1/ys?{bhAJ}&ssdw=1‘,params=params1, headers=self.header)

params2= {"bhAj": bhAJ, "ssdw": "2"}

self.client.delete(f‘http://xxx.xx.xx.xx:8888/api/v1/ys?{bhAJ}&ssdw=2‘,params=params2, headers=self.header)

?

@task#压测任务,将流程串联起来(获取前端页面-填写原被告要素表-清楚脏数据【这里清楚脏数据替代了同步的操作】)

defYsPressureTest(self):#填写原告要素表

bh =self.get_bh()

bhAj=self.get_bhAJ()

self.ygdatas["bhAj"] =bhAj

self.ygdatas["bh"] =bh

self.bgdatas["bhAj"] =bhAj

self.bgdatas["bh"] =bh

ygdatas=json.dumps(self.ygdatas)

bgdatas=json.dumps(self.bgdatas)

self.Writeygysb(ygdatas)

self.Writebgysb(bgdatas)

self.delete(bhAj)

?

?

?classRunLoadTests(HttpUser):‘‘‘创建运行类‘‘‘tasks=[MyTasks]

wait_time= between(5, 10) #每个用户5-10秒内执行一遍流程

?

?if __name__=="main":

os.system("locust -f test_load.py --web-host=127.0.0.1 --host=xxx.xx.xx.xx")

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/515074
推荐阅读
相关标签
  

闽ICP备14008679号