赞
踩
1、下载安装python
https://www.python.org/
2,安装教程请自行百度
https://jingyan.baidu.com/article/cd4c29792c36c6756f6e604b.html
3、IDEA在file–settings–Plugins中安装Python
1、url路径
2、token信息
3、连接数据库的信息
4、sql语句
5、请求方式:get、post、delete、put
6、用户username的特殊性规则
7、并发线程数:concurrent_thread_count
8、请求数据在RequestBody中还是在RequestParam中
设置线程数,作为循环次数,特殊值+i作为学号,性别取1、2的随机数
# 并发测试框架//添加学生 import pymysql import requests,threading import random # 接口路径和token data = { "url": "http://。。。/base/add", "header": { "X-Access-Token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9." }, } # 并发线程数 concurrent_thread_count = 1000 # 每个线程中循环测试接口的次数 ONE_WORKER_NUM = 1 # 测试代码 def test(username): random.randint(1,2) # 获取试题接口 # 参数在请求体中时,使用bodys bodys = { # 请求体参数 "num": username, "name": "随删数据", "gender": random.randint(1, 2) } print(bodys) response = requests.post(data["url"], json=bodys, headers=data["header"]) # response = requests.get(data["url"]+'?userId='+resultSql[0], headers=data["header"]) if response.status_code == 200: result = response.content.decode('utf-8') else: result = "访问失败" print(threading.current_thread(), result) # 单个线程,可以设置一个线程访问几次接口 def working(username): global ONE_WORKER_NUM for i in range(0, ONE_WORKER_NUM): test(username) def t(): Threads = [] # 根据数据的数量当作循环次数,每次创建一个线程 for i in range(concurrent_thread_count): username = 111111000+i t = threading.Thread(target=working, args=(username, )) t.setDaemon(True) Threads.append(t) for t in Threads: t.start() for t in Threads: t.join() if __name__=="__main__": t()
1、第一步:先执行查询数据库操作,查询出来一共有多少条符合条件的数据;数据总数作为循环的次数,每次循环起一个线程,线程中带有参数username,根据添加用户时账号的规律设置username = “”"’%s’"""%(111111000+i)当作参数传递
2、第二步:每个线程中可以设置循环次数,每次调用具体方法
3、第三步:测试代码部分,根据传递的username作为条件查询用户id;获取到用户id后当作参数去访问接口,打印返回值
# 并发测试框架 import pymysql import requests,threading # 接口路径和token data = { "url": "http://。。。/mobile/getAll", "header": { "X-Access-Token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9" }, } # 每个线程中循环测试接口的次数 ONE_WORKER_NUM = 1 # 测试代码 def test(username): # 根据username查询id conn = pymysql.connect( host = "192.168...", port = 3306, user = "root", passwd = "...", db="jeecg" ) cur = conn.cursor() sql = "SELECT id FROM user WHERE username ="+username # print(sql) cur.execute(sql) resultSql = cur.fetchone() # 获取试题接口 # 参数在请求体中时,使用bodys # bodys = { # # 请求体参数 # "userId": resultSql[0] # } # response = requests.get(data["url"]+'?userId='+resultSql[0], json=bodys, headers=data["header"]) print(data["url"]+'?userId='+resultSql[0]) response = requests.get(data["url"]+'?userId='+resultSql[0], headers=data["header"]) if response.status_code == 200: result = response.content.decode('utf-8') else: result = "访问失败" print(threading.current_thread(), result) # 单个线程,可以设置一个线程访问几次接口 def working(username): global ONE_WORKER_NUM for i in range(0, ONE_WORKER_NUM): test(username) def t(): # 查询符合条件的数据总数 conn = pymysql.connect( host = "192.168...", port = 3306, user = "root", passwd = "...", db="jeecg" ) cur = conn.cursor() sql = "SELECT COUNT(*) FROM user WHERE username LIKE '11111%' " # 打印SQL # print(sql) cur.execute(sql) result = cur.fetchone() # 打印循环次数 print(result[0]) Threads = [] # 根据数据的数量当作循环次数,每次创建一个线程 for i in range(result[0]): username = """'%s'"""%(111111000+i) t = threading.Thread(target=working, args=(username, )) # 使用setDaemon()和守护线程这方面知识有关, 比如在启动线程前设置thread.setDaemon(True),就是设置该线程为守护线程, # 表示该线程是不重要的,进程退出时不需要等待这个线程执行完成。 # 这样做的意义在于:避免子线程无限死循环,导致退不出程序,也就是避免楼上说的孤儿进程。 # # thread.setDaemon()设置为True, 则设为true的话 则主线程执行完毕后会将子线程回收掉, # 设置为false,主进程执行结束时不会回收子线程 # setDaemon()说明: # setDaemon():设置此线程是否被主线程守护回收。默认False不回收,需要在 start 方法前调用; # 设为True相当于像主线程中注册守护,主线程结束时会将其一并回收。 t.setDaemon(True) # python中的append()方法用于在列表末尾添加新的对象。 Threads.append(t) for t in Threads: # 开始线程活动。 # 对每一个线程对象来说它只能被调用一次,它安排对象在一个另外的单独线程中调用run()方法(而非当前所处线程)。 # 当该方法在同一个线程对象中被调用超过一次时,会引入RuntimeError(运行时错误)。 t.start() for t in Threads: t.join() if __name__=="__main__": t()
1、先查询数据库中有多少个需要删除的数据,作为循环次数,特殊值+i作为学号传到创建的线程里面
2、再根据学号查学生的id,查到id后去请求删除接口
# 并发测试框架//删除学生 import pymysql import requests,threading # 接口路径和token data = { "url": "http://。。。/base/delete", "header": { "X-Access-Token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9" }, } # 每个线程中循环测试接口的次数 ONE_WORKER_NUM = 1 # 测试代码 def test(username): # 根据username查询id conn = pymysql.connect( host = "。。。", port = 3306, user = "root", passwd = "。。。", db="jeecg" ) cur = conn.cursor() sql = "SELECT id FROM mes_stu WHERE num ="+username # print(sql) cur.execute(sql) resultSql = cur.fetchone() # 获取试题接口 # 参数在请求体中时,使用bodys # bodys = { # # 请求体参数 # "userId": resultSql[0] # } # response = requests.get(data["url"]+'?userId='+resultSql[0], json=bodys, headers=data["header"]) print(data["url"]+'?userId='+resultSql[0]) response = requests.delete(data["url"]+'?id='+resultSql[0], headers=data["header"]) if response.status_code == 200: result = response.content.decode('utf-8') else: result = "访问失败" print(threading.current_thread(), result) # 单个线程,可以设置一个线程访问几次接口 def working(username): global ONE_WORKER_NUM for i in range(0, ONE_WORKER_NUM): test(username) def t(): # 查询符合条件的数据总数 conn = pymysql.connect( host = "。。。", port = 3306, user = "root", passwd = "。。。", db="jeecg" ) cur = conn.cursor() sql = "SELECT COUNT(*) FROM mes_stu WHERE num LIKE '11111%' " # 打印SQL # print(sql) cur.execute(sql) result = cur.fetchone() # 打印循环次数 print(result[0]) Threads = [] for i in range(result[0]): # 根据学号规律拼接 username = """'%s'"""%(111111000+i) t = threading.Thread(target=working, args=(username, )) t.setDaemon(True) Threads.append(t) for t in Threads: t.start() for t in Threads: t.join() if __name__=="__main__": t()
import requests from multiprocessing import Pool data = { "times": 10, # 并发量 "method": "GET", "url": "http://。。。/list", "header": { # "X-Access-Token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9" }, "body": { # 参数 } } def run_task(idx): response = requests.get(data["url"], json=data["body"], headers=data["header"]) if response.status_code == 200: result = response.content.decode('utf-8') else: result = "访问失败" print("第 %s 次执行:%s \n" % (idx, result)) if __name__ == '__main__': p = Pool(data["times"]) for index in range(data["times"]): p.apply_async(run_task, args=(index + 1,)) p.close() p.join() print("执行结束.")
import threading import time import requests # 这段脚本是本机执行, 请求QA环境, 100个线程并行, 每个线程内串行执行100次, 一共1W 次 接口调用 data = { "url": "http://。。。/list", "header": { # "X-Access-Token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9" }, "body": { # 参数 } } # concurrent request number concurrent_thread_count = 5 # loop in every thread iterate_count = 1 class RequestThread(threading.Thread): def __init__(self, thread_name): threading.Thread.__init__(self) self.test_success_count = 0 self.test_fail_count = 0 def run(self): i = 0 while i < iterate_count: self.test() i += 1 def test(self): try: conn = requests.get(data["url"], json=data["body"], headers=data["header"]) if conn.status_code == 200: result = conn.content.decode('utf-8') self.test_success_count += 1 else: self.test_fail_count += 1 print("第 %s 次执行:%s \n" % (self, result)) except Exception as e: print(e) finally: conn.close() start_time = time.time() threads = [] i = 0 while i < concurrent_thread_count: t = RequestThread("thread" + str(i)) threads.append(t) t.start() i += 1 for thr in threads: thr.join() print("test finished.") time_span = time.time() - start_time all_count = total_success = total_fail = 0 for t in threads: total_success += t.test_success_count total_fail += t.test_fail_count all_count += total_success + total_fail print("total %s Requests. " % all_count) print("%s Requests success. " % total_success) print("%s Requests fail. " % total_fail) print("total %s seconds." % time_span)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。