赞
踩
from flask import Flask import time import multiprocessing app = Flask(__name__) num = multiprocessing.Value("d", 1) # d表示数值,主进程与子进程共享这个value。(主进程与子进程都是用的同一个value) def func(num): # 测试是否异常,有异常直接结束子进程 # print('子线程值:', num.value) # if 1: # num.value = 1 # 子进程改变数值的值,主进程跟着改变 # print('重置num值:', num.value) # quit('有异常退出') print('子线程1:', num.value) time.sleep(5) print('睡眠完成') a=1/0 num.value = 1 # 子进程改变数值的值,主进程跟着改变 print('子线程2:',num.value) # return '子线程处理完成' @app.route('/') def hello_world(): if num.value: num.value=0 p = multiprocessing.Process(target=func, args=(num,)) p.start() print('提交成功,请等待处理') return '提交成功,请等待处理' if not num.value: print('正在执行,请等待') return '正在执行,请等待' return 'Hello World!' if __name__ == '__main__': app.run(debug=True, port=6006)
from multiprocessing import Pool,Manager import os, time, random manager=Manager() value1=manager.Value('i',1) def work1(msg): print('值1:',msg.value) time.sleep(5) #将数据交给,处理程序 print('睡眠完成') # return {"a":a,"b":b,"c":c} # print("循环任务%d由进程号%d进程执行" % (msg, os.getpid())) # time.sleep(random.random()) # 随机生产0-1的浮点数 # end_time = time.time() # 结束时间 # print(msg, "执行完毕,耗时%0.2f" % (end_time - start_time)) msg.value=1 print('值2:',msg.value) @app.route('/') def work(): if value1.value: value1.value=0 print('子进程开始执行') pool = Pool(3) # 定义一个进程池,最大进程数为3 pool.apply_async(func=work1, args=(value1,)) pool.close() # 关闭进程池,关闭后pool不再接收新的请求任务 return '子进程开始执行!' else: print('有任务正在执行') return '有任务正在执行
def func(a,b,c): time.sleep(5) #将数据交给,处理程序 print('睡眠完成') return {"a":a,"b":b,"c":c} @app.route('/') def func2(): ctx = torch.multiprocessing.get_context("spawn") print(torch.multiprocessing.cpu_count()) pool = ctx.Pool(5) # 7.7G i=100 pool.apply_async(func, args=(f'a{i}',f'b{i}',f'c{i}')) pool.close() # i=100 # pool.apply_async(func, args=(f'a{i}',f'b{i}',f'c{i}')) pool_list = [1,2,3] for i in range(0,5): pool.apply_async(func, args=(f'a{i}',f'b{i}',f'c{i}')) # print('zhi:',res.get()) # pool_list.append(res) pool.close() # 关闭进程池,不再接受新的进程 # pool.join() # 主进程阻塞等待子进程的退出 for i in pool_list: # data = i.get() print('数据:',i) return 'Hello World!'
spawn在pool中可以,process直接卡住
# ctx = torch.multiprocessing
ctx = torch.multiprocessing.get_context("spawn")
print(torch.multiprocessing.cpu_count())
pool = ctx.Pool(2) # 7.7G
num=1
pool.apply_async(taskModel_execute, args=(start_model, execut_data),error_callback=throw_error)
pool.close()
ctx = torch.multiprocessing.get_context("spawn")
p = ctx.Process(target=taskModel_execute, args=(start_model, execut_data))
p.start()
Value、Array是通过共享内存的方式共享数据
Manager是通过共享进程的方式共享数据。
'spawn’开启多进程程数据拷贝
num=multiprocessing.Value('d',1.0)#num=0
arr=multiprocessing.Array('i',range(10))#arr=range(10)
p=multiprocessing.Process(target=func1,args=(num,arr))
manager=Manager()
list1=manager.list([1,2,3,4,5])
dict1=manager.dict()
array1=manager.Array('i',range(10))
value1=manager.Value('i',1)
进程间共享变量
如果希望不同进程读写同一个变量,需要做特殊的声明。multiprocessing提供了两种实现方式,一种是共享内存,一种是使用服务进程。共享内存只支持两种数据结构Value和Array。
子进程和主进程访问的count在内存中的地址是相同的。这里有两点需要注意:
4、进程异常捕获(非阻塞)
1.multiprocessing.Process捕获不了异常
p = multiprocessing.Process(target=taskimg_execute, args=(start_img,))
p.start()
2.pool捕获不了,只能通过函数接收
def throw_error(e):
print("error回调方法: ", e)
return {'code': 440, 'msg': '进程创建异常'+str(e), 'data': {'status': 400}}
pool = multiprocessing.Pool(processes = 3)
pool.apply_async(taskimg_execute, (start_img,),rror_callback=throw_error) pool.close()
import threading from flask import Flask import time app = Flask(__name__) g_num = 1 def func(data): global g_num print('子线程1:', g_num) time.sleep(5) print('睡眠完成') # a=1/0 g_num = 1 # 子进程改变数值的值,主进程跟着改变 print('子线程2:',g_num) # return '子线程处理完成' @app.route('/') def hello_world(): global g_num print('主进程:',g_num) if g_num: g_num=0 # t1 = threading.Thread(target=func) t1 = threading.Thread(target=func, args=(6,)) t1.start() # time.sleep(1) print('提交成功,请等待处理') return '提交成功,请等待处理' if not g_num: print('正在执行,请等待') return '正在执行,请等待' return 'Hello World!' if __name__ == '__main__': app.run(debug=True, port=6006)
import threading from flask import Flask import time import multiprocessing app = Flask(__name__) g_num = 1 def func2(num): print('进程1:', num.value) time.sleep(2) print('睡眠完成') # a=1/0 # num.value = 1 # 子进程改变数值的值,主进程跟着改变 print('这是子进程处理') print('进程2:',num.value) # return '子线程处理完成' def func(data): global g_num print('子线程1:', g_num) # time.sleep(5) # print('睡眠完成') num = multiprocessing.Value("d", 1) p = multiprocessing.Process(target=func2, args=(num,)) p.start() print('子线程提交任务') # a=1/0 g_num = 1 # 子进程改变数值的值,主进程跟着改变 print('子线程2:',g_num) # return '子线程处理完成' @app.route('/') def hello_world(): global g_num print('主进程:',g_num) if g_num: g_num=0 # t1 = threading.Thread(target=func) t1 = threading.Thread(target=func, args=(6,)) t1.start() # time.sleep(1) print('提交成功,请等待处理') return '提交成功,请等待处理' if not g_num: print('正在执行,请等待') return '正在执行,请等待' return 'Hello World!' @app.route('/thread') def mytest(): from threading import Thread from time import sleep, ctime def funcx(name, sec): print('---开始---', name, '时间', ctime()) # sleep(sec) time.sleep(3) print('***结束***', name, '时间', ctime()) # 创建 Thread 实例 t1 = Thread(target=funcx, args=('第一个线程', 3)) t2 = Thread(target=funcx, args=('第二个线程', 2)) t3 = Thread(target=funcx, args=('第一个线程',3)) t4 = Thread(target=funcx, args=('第二个线程', 2)) # 启动线程运行 t1.start() t2.start() t3.start() t4.start() return 'i am a thread test!' if __name__ == '__main__': app.run(debug=True, port=6006)
可以在任务管理器中查看,进程的各个数
raise ValueError(‘参数错误’)抛出的异常可以捕获
Python 多进程编程:创建进程的三种模式之spawn、fork、forkserver
首先fork和spawn都是构建子进程的不同方式,区别在于:
fork:除了必要的启动资源外,其他变量,包,数据等都继承自父进程,并且是copy-on-write的,也就是共享了父进程的一些内存页,因此启动较快,但是由于大部分都用的父进程数据,所以是不安全的进程
spawn:从头构建一个子进程,父进程的数据等拷贝到子进程空间内,拥有自己的Python解释器,所以需要重新加载一遍父进程的包,因此启动较慢,由于数据都是自己的,安全性较高
raise ValueError(‘参数错误’)抛出的异常可以捕获
多进程调用训练模型
报错:Cannot re-initialize CUDA in forked subprocess. To use CUDA with multiprocessing, you must use the ‘spawn’ start method
import torch
torch.multiprocessing.set_start_method('spawn',force=True)
#解决不了
# print(multiprocessing.get_start_method())
# multiprocessing.set_start_method('spawn', force=True)
try:
print('subprocess执行命令')
core.testtrain('我是主线程')
print('sub执行结束,没有异常')
except Exception as e:
print("Couldn't parse")
print('捕获异常Reason:', e)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。