赞
踩
- import requests
- import threading
-
- # 测试案例json报文
- d_data = {
-
- }
-
-
- # 调用接口的url
- s_url = 'http://(ip或域名)/v1/rule_threshold'
-
-
- # 调用接口的函数
- def response_function(i, s_url, d_data):
- p_response = requests.post(s_url, json=d_data)
- # d_result = p_response.json()
- return p_response
-
- # 重构多线程
- class mythread(threading.Thread):
- def __init__(self, func, args=()):
- super(mythread, self).__init__()
- self.func = func
- self.args = args
- def run(self):
- self.result = self.func(*self.args)
- def get_result(self):
- try:
- return self.result
- except Exception:
- return None
-
- # 利用多线程方式调用接口
- # 创建线程列表
- threads = []
- # 创建并启动线程
- for i in range(20):
- thread = mythread(response_function, args=(i, s_url, d_data))
- thread.start()
- threads.append(thread)
-
-
- # 等待所有线程执行完毕,拿到执行结果
- for thread in threads:
- thread.join()
- print(thread.get_result().json())
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。