赞
踩
Locust是一款易于使用的分布式负载测试工具,完全基于事件,即一个locust节点也可以在一个进程中支持数千并发用户,不使用回调,通过gevent使用轻量级过程(即在自己的进程内运行)。
min_wait/max_wait 模拟用户有在上边说的类中定义了好多任务/行为, 每个任务/行为间隔多久执行一次, 单位是毫秒, 默认1000, 也即隔一秒种后执行下一个任务
weight 权重: 模拟时, 同一段时间, 手机用户的访问量要比PC的访问量大, 那么对应的locust(或其子类)的weight值就大小不一
host 就是需要被压测的网站的域名(或域名前缀), 如果启动服务时没有通过参数**-host来指定域名, 那么就用使用该host**属性指定的值
通过在行为(回调函数)前加**@task(weight)**描述符来指定某一个行为被执行的频率
属性tasks来指定每一个行为被执行的频率 tasks=[fun1, fun2…] 或者 tasks={fun1:weight1, fun2:weight2…}
里边的行为或函数是被随机调用/执行的, 只是根据 weight 的不通, 随机到的频率不通而已
行为/任务可以嵌套执行, 先执行task1(也就是 fun1 下同), 然后执行task2 … 这样会更真实的模拟,其写法就是,,将这些有关联任务定义/封装到一个taskset子类中,,然后通过上边介绍的 tasks属性tasks={classname:weight}, 在另一个TaskSet子类中去关联该类以达到嵌套的目的
在执行子任务时, 通过 self.interrupt() 来终止子任务的执行, 来回到父任务类中执行, 否则子任务会一直执行
成员函数, on_start(), 如果定义的话, 就会在开始的时候执行
他可以发送http请求, 他有一个属性叫client(实例化的时候自动生成), 存储HttpSession类的实例(HttpSession类在实例化Locust的时候自动创建), 用来保存请求session
TaskSet类里也有属性client: self.client.get()或者self.client.post() ,这个client内部就是httplocust里的client
请求返回一个对象, 他有两个成员, response.status_code 和 response.content
如果因连接失败, 超时等等原因造成请求失败, 不会发出异常, 而是将上边的content置为空, status_code 置为0
可以对返回content内容自定义处理, 因为有的时候返回404是你希望得到的
import time import socket from locust import Locust, TaskSet, events, task class TcpSocketClient(socket.socket): def __init__(self, af_inet, socket_type): super(TcpSocketClient, self).__init__(af_inet, socket_type) def connect(self, addr): start_time = time.time() try: super(TcpSocketClient, self).connect(addr) except Exception as e: total_time = int((time.time() - start_time) * 1000) events.request_failure.fire(request_type="tcpsocket", name="connect", response_time=total_time, exception=e) else: total_time = int((time.time() - start_time) * 1000) events.request_success.fire(request_type="tcpsocket", name="connect", response_time=total_time, response_length=0) def send(self, msg): start_time = time.time() try: super(TcpSocketClient, self).send(msg) except Exception as e: total_time = int((time.time() - start_time) * 1000) events.request_failure.fire(request_type="tcpsocket", name="send", response_time=total_time, exception=e) else: total_time = int((time.time() - start_time) * 1000) events.request_success.fire(request_type="tcpsocket", name="send", response_time=total_time, response_length=0) def recv(self, bufsize): recv_data = '' start_time = time.time() try: recv_data = super(TcpSocketClient, self).recv(bufsize) except Exception as e: total_time = int((time.time() - start_time) * 1000) events.request_failure.fire(request_type="tcpsocket", name="recv", response_time=total_time, exception=e) else: total_time = int((time.time() - start_time) * 1000) events.request_success.fire(request_type="tcpsocket", name="recv", response_time=total_time, response_length=0) return recv_data class TcpSocketLocust(Locust): def __init__(self, *args, **kwargs): super(TcpSocketLocust, self).__init__(*args, **kwargs) self.client = TcpSocketClient(socket.AF_INET, socket.SOCK_STREAM) ADDR = (self.host, self.port) self.client.connect(ADDR) class TcpTestUser(TcpSocketLocust): host = "127.0.0.1" #连接的TCP服务的IP port = 12345 #连接的TCP服务的端口 min_wait = 100 max_wait = 1000 class task_set(TaskSet): @task def send_data(self): self.client.send("test") #发送的数据 data = self.client.recv(2048).decode() print(data) if __name__ == "__main__": user = TcpTestUser() user.run()
运行
$ locust -f locust_file.py --no-web -c10 -r10
PS:
self.client: locust协议入口实例,我们只要重写一个实例给client即可。
以下两个钩子事件,用来收集报告信息,否则写好后执行你会发现收集不到性能数据
events.request_failure.fire()
events.request_success.fire()
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。