赞
踩
直接上代码
- import random
- import sys
- import os
- import time
- from socket import *
- from locust import User, events, task, TaskSet, between,constant_pacing
- import datetime
-
- class UserBehavior(TaskSet):
- # wait_time = between(1, 1)
- def on_start(self):
- self.tcp_client_socket = socket(AF_INET, SOCK_STREAM)
- server_addr = ("10.87.13.12", 514)
- self.tcp_client_socket.connect(server_addr)
-
- @task
- def buy(self):
- try:
- # 发送数据
- # print("ready to sending")
- start_time = time.time()
- date = (datetime.datetime.now()).strftime("%Y-%m-%d %H:%M:%S")
- data = f'<46>USM |1|{date}|1|1|dfs1|10.5.68.136|登录系统: 密码登录||\n'
- data1 = f'<46>USM |1|{date}|1|1|dsf1|10.5.68.136|登录系统: 密码登录||\n'
- _tmp = random.randint(1, 2)
- if _tmp == 1:
- self.tcp_client_socket.send(data.encode("utf-8"))
- else:
- self.tcp_client_socket.send(data1.encode("utf-8"))
- total_time = int((time.time() - start_time) * 1000)
- # print("success to send---")
- events.request_success.fire(request_type="TCP_bon", name="new_face", response_time=total_time,
- response_length=0)
- except Exception as e:
- events.request_failure.fire(request_type="TCP_bon", name="new_face", response_time=0, exception=e,
- response_length=0)
-
- def on_stop(self):
- # 释放socket
- self.tcp_client_socket.close()
-
- class ApiUser(User):
-
- wait_time = constant_pacing(1)
-
- tasks = [UserBehavior]
-
- if __name__ == "__main__":
- wokersSum = int(sys.argv[1])
- os.system('ulimit -n 65535')
- # 正式开始发送日志
- print(f"开始起服务准备测试日志性能————————————————————————————————————————————————————")
- os.system('ps auxf | grep locust | grep -v grep | awk \'{print$2}\'|xargs kill -15')
- for i in range(wokersSum):
- os.system(f'locust -f main50percent.py --master-port=11111 --worker &')
- time.sleep(1)
- print(f"开启中{i}")
- os.system(f'locust -f main50percent.py --master --expect-workers={wokersSum} --master-bind-port=11111')

运行方法:
pyhon3 main50percent.py 5
后面这个5表示用5个工作进程来跑
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。