当前位置:   article > 正文

python脚本测试——接口高并发场景_python怎么模拟高并发访问接口达到测试目的

python怎么模拟高并发访问接口达到测试目的

一、环境准备。

1、下载安装python

https://www.python.org/

2,安装教程请自行百度

https://jingyan.baidu.com/article/cd4c29792c36c6756f6e604b.html

3、IDEA在file–settings–Plugins中安装Python

二、复用需要修改的地方:

1、url路径

2、token信息

3、连接数据库的信息

4、sql语句

5、请求方式:get、post、delete、put

6、用户username的特殊性规则

7、并发线程数:concurrent_thread_count

8、请求数据在RequestBody中还是在RequestParam中

三、添加学生。

1、设计思路

设置线程数,作为循环次数,特殊值+i作为学号,性别取1、2的随机数

2、实现代码如下

# 并发测试框架//添加学生
import pymysql
import requests,threading
import random
# 接口路径和token
data = {
    "url": "http://。。。/base/add",
    "header": {
        "X-Access-Token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9."
    },
}
# 并发线程数
concurrent_thread_count = 1000
# 每个线程中循环测试接口的次数
ONE_WORKER_NUM = 1
# 测试代码
def test(username):
    random.randint(1,2)
    # 获取试题接口
    # 参数在请求体中时,使用bodys
    bodys =  {
        # 请求体参数
        "num": username,
        "name": "随删数据",
        "gender": random.randint(1, 2)
    }
    print(bodys)
    response = requests.post(data["url"], json=bodys, headers=data["header"])
    # response = requests.get(data["url"]+'?userId='+resultSql[0], headers=data["header"])
    if response.status_code == 200:
        result = response.content.decode('utf-8')
    else:
        result = "访问失败"
    print(threading.current_thread(), result)

# 单个线程,可以设置一个线程访问几次接口
def working(username):
    global ONE_WORKER_NUM
    for i in range(0, ONE_WORKER_NUM):
        test(username)

def t():
    Threads = []
    # 根据数据的数量当作循环次数,每次创建一个线程
    for i in range(concurrent_thread_count):
        username = 111111000+i
        t = threading.Thread(target=working, args=(username, ))
        t.setDaemon(True)
        Threads.append(t)
    for t in Threads:
        t.start()
    for t in Threads:
        t.join()

if __name__=="__main__":
    t()

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57

四、获取试题。

1、设计思路如下

1、第一步:先执行查询数据库操作,查询出来一共有多少条符合条件的数据;数据总数作为循环的次数,每次循环起一个线程,线程中带有参数username,根据添加用户时账号的规律设置username = “”"’%s’"""%(111111000+i)当作参数传递

2、第二步:每个线程中可以设置循环次数,每次调用具体方法

3、第三步:测试代码部分,根据传递的username作为条件查询用户id;获取到用户id后当作参数去访问接口,打印返回值

2、实现代码如下(使用多个不同的用户id做参数)

# 并发测试框架
import pymysql
import requests,threading
# 接口路径和token
data = {
    "url": "http://。。。/mobile/getAll",
    "header": {
        "X-Access-Token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9"
    },
}
# 每个线程中循环测试接口的次数
ONE_WORKER_NUM = 1
# 测试代码
def test(username):
# 根据username查询id
    conn = pymysql.connect(
        host = "192.168...",
        port = 3306,
        user = "root",
        passwd = "...",
        db="jeecg"
    )
    cur = conn.cursor()

    sql = "SELECT id FROM user WHERE username ="+username
    # print(sql)
    cur.execute(sql)
    resultSql = cur.fetchone()
    # 获取试题接口
    # 参数在请求体中时,使用bodys
    # bodys =  {
    #     # 请求体参数
    #     "userId": resultSql[0]
    # }
    # response = requests.get(data["url"]+'?userId='+resultSql[0], json=bodys, headers=data["header"])
    print(data["url"]+'?userId='+resultSql[0])
    response = requests.get(data["url"]+'?userId='+resultSql[0], headers=data["header"])
    if response.status_code == 200:
        result = response.content.decode('utf-8')
    else:
        result = "访问失败"
    print(threading.current_thread(), result)

# 单个线程,可以设置一个线程访问几次接口
def working(username):
    global ONE_WORKER_NUM
    for i in range(0, ONE_WORKER_NUM):
        test(username)

def t():
    # 查询符合条件的数据总数
    conn = pymysql.connect(
        host = "192.168...",
        port = 3306,
        user = "root",
        passwd = "...",
        db="jeecg"
    )
    cur = conn.cursor()
    sql = "SELECT COUNT(*) FROM user WHERE username LIKE '11111%' "
    # 打印SQL
    # print(sql)
    cur.execute(sql)
    result = cur.fetchone()
    # 打印循环次数
    print(result[0])

    Threads = []
    # 根据数据的数量当作循环次数,每次创建一个线程
    for i in range(result[0]):
        username = """'%s'"""%(111111000+i)
        t = threading.Thread(target=working, args=(username, ))
        # 使用setDaemon()和守护线程这方面知识有关, 比如在启动线程前设置thread.setDaemon(True),就是设置该线程为守护线程,
        # 表示该线程是不重要的,进程退出时不需要等待这个线程执行完成。
        # 这样做的意义在于:避免子线程无限死循环,导致退不出程序,也就是避免楼上说的孤儿进程。
        #
        # thread.setDaemon()设置为True, 则设为true的话 则主线程执行完毕后会将子线程回收掉,
        # 设置为false,主进程执行结束时不会回收子线程
        # setDaemon()说明:
        # setDaemon():设置此线程是否被主线程守护回收。默认False不回收,需要在 start 方法前调用;
        # 设为True相当于像主线程中注册守护,主线程结束时会将其一并回收。
        t.setDaemon(True)
        # python中的append()方法用于在列表末尾添加新的对象。
        Threads.append(t)
    for t in Threads:
        # 开始线程活动。
        # 对每一个线程对象来说它只能被调用一次,它安排对象在一个另外的单独线程中调用run()方法(而非当前所处线程)。
        # 当该方法在同一个线程对象中被调用超过一次时,会引入RuntimeError(运行时错误)。
        t.start()
    for t in Threads:
        t.join()

if __name__=="__main__":
    t()

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95

五、删除学生。

1、设计思路

1、先查询数据库中有多少个需要删除的数据,作为循环次数,特殊值+i作为学号传到创建的线程里面

2、再根据学号查学生的id,查到id后去请求删除接口

2、代码实现如下

# 并发测试框架//删除学生
import pymysql
import requests,threading
# 接口路径和token
data = {
    "url": "http://。。。/base/delete",
    "header": {
        "X-Access-Token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9"
    },
}
# 每个线程中循环测试接口的次数
ONE_WORKER_NUM = 1
# 测试代码
def test(username):
    # 根据username查询id
    conn = pymysql.connect(
        host = "。。。",
        port = 3306,
        user = "root",
        passwd = "。。。",
        db="jeecg"
    )
    cur = conn.cursor()

    sql = "SELECT id FROM mes_stu WHERE num ="+username
    # print(sql)
    cur.execute(sql)
    resultSql = cur.fetchone()
    # 获取试题接口
    # 参数在请求体中时,使用bodys
    # bodys =  {
    #     # 请求体参数
    #     "userId": resultSql[0]
    # }
    # response = requests.get(data["url"]+'?userId='+resultSql[0], json=bodys, headers=data["header"])
    print(data["url"]+'?userId='+resultSql[0])
    response = requests.delete(data["url"]+'?id='+resultSql[0], headers=data["header"])
    if response.status_code == 200:
        result = response.content.decode('utf-8')
    else:
        result = "访问失败"
    print(threading.current_thread(), result)

# 单个线程,可以设置一个线程访问几次接口
def working(username):
    global ONE_WORKER_NUM
    for i in range(0, ONE_WORKER_NUM):
        test(username)

def t():
    # 查询符合条件的数据总数
    conn = pymysql.connect(
        host = "。。。",
        port = 3306,
        user = "root",
        passwd = "。。。",
        db="jeecg"
    )
    cur = conn.cursor()
    sql = "SELECT COUNT(*) FROM mes_stu WHERE num LIKE '11111%' "
    # 打印SQL
    # print(sql)
    cur.execute(sql)
    result = cur.fetchone()
    # 打印循环次数
    print(result[0])

    Threads = []
    for i in range(result[0]):
        # 根据学号规律拼接
        username = """'%s'"""%(111111000+i)
        t = threading.Thread(target=working, args=(username, ))
        t.setDaemon(True)
        Threads.append(t)
    for t in Threads:
        t.start()
    for t in Threads:
        t.join()

if __name__=="__main__":
    t()

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82

六、直接并发(参数需要自己写)

import requests
from multiprocessing import Pool

data = {
    "times": 10, # 并发量
    "method": "GET",
    "url": "http://。。。/list",
    "header": {
        # "X-Access-Token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9"
    },
    "body": {
        # 参数
    }
}

def run_task(idx):
    response = requests.get(data["url"], json=data["body"], headers=data["header"])
    if response.status_code == 200:
        result = response.content.decode('utf-8')
    else:
        result = "访问失败"
    print("第 %s 次执行:%s \n" % (idx, result))

if __name__ == '__main__':
    p = Pool(data["times"])
    for index in range(data["times"]):
        p.apply_async(run_task, args=(index + 1,))

    p.close()
    p.join()
    print("执行结束.")

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

七、多个线程并行, 每个线程内串行执行多次(可一次),参数需要自己写

import threading
import time
import requests
# 这段脚本是本机执行, 请求QA环境, 100个线程并行, 每个线程内串行执行100次, 一共1W 次 接口调用
data = {
    "url": "http://。。。/list",
    "header": {
        # "X-Access-Token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9"
    },
    "body": {
        # 参数
    }
}
# concurrent request number
concurrent_thread_count = 5

# loop in every thread
iterate_count = 1

class RequestThread(threading.Thread):

    def __init__(self, thread_name):
        threading.Thread.__init__(self)
        self.test_success_count = 0
        self.test_fail_count = 0

    def run(self):
        i = 0
        while i < iterate_count:
            self.test()
            i += 1

    def test(self):
        try:
            conn  = requests.get(data["url"], json=data["body"], headers=data["header"])
            if conn.status_code == 200:
                result = conn.content.decode('utf-8')
                self.test_success_count += 1
            else:
                self.test_fail_count += 1
            print("第 %s 次执行:%s \n" % (self, result))
        except Exception as e:
            print(e)
        finally:
            conn.close()


start_time = time.time()
threads = []
i = 0
while i < concurrent_thread_count:
    t = RequestThread("thread" + str(i))
    threads.append(t)
    t.start()
    i += 1

for thr in threads:
    thr.join()

print("test finished.")
time_span = time.time() - start_time
all_count = total_success = total_fail = 0
for t in threads:
    total_success += t.test_success_count
    total_fail += t.test_fail_count
all_count += total_success + total_fail

print("total %s Requests. " % all_count)
print("%s Requests success. " % total_success)
print("%s Requests fail. " % total_fail)
print("total %s seconds." % time_span)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/凡人多烦事01/article/detail/172545
推荐阅读
相关标签
  

闽ICP备14008679号