赞
踩
import datetime import random import time from multiprocessing.dummy import Pool as ThreadPool # 修改为从数据库中读取用例列表 def get_case_list(): return [i for i in range(1, 20)] # 具体的执行方法, 记录测试结果和测试时间 def run_case(case_id): run_time = random.randrange(1, 10) time.sleep(run_time) print('run case: %d, result is pass, run time is : %d seconds\n' % (case_id, run_time)) # 设置并发数量 threadNum = 5 # 设置多进程执行 pool = ThreadPool(threadNum) # 格式化成2016-03-20 11:45:39形式 # print(f'并发数为: {threadNum}') start = datetime.datetime.now() print('Start:', start) pool.map(run_case, get_case_list()) end = datetime.datetime.now() print('End:', end) print(f'{threadNum}个进程并发耗时: {(end - start).seconds} 秒') pool.close() # 调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束 pool.join()
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。