赞
踩
什么是进程
进程(Process)是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。
进程的特征:
动态性:进程的实质是程序在多道程序系统中的一次执行过程,进程是动态产生,动态消亡的。
并发性:任何进程都可以同其他进程一起并发执行
独立性:进程是一个能独立运行的基本单位,同时也是系统分配资源和调度的独立单位;
异步性:由于进程间的相互制约,使进程具有执行的间断性,即进程按各自独立的、不可预知的速度向前推进
结构特征:进程由程序、数据和进程控制块三部分组成。
多个不同的进程可以包含相同的程序:一个程序在不同的数据集里就构成不同的进程,能得到不同的结果;但是执行过程中,程序不能发生改变。
进程与程序的区别:
程序是指令和数据的有序集合,其本身没有任何运行的含义,是一个静态的概念。
而进程是程序在处理机上的一次执行过程,它是一个动态的概念。
并发与并行
并发:伪并行,看着像同时运行,其实是任务之间的切换(遇到IO切换的会提高代码效率),任务切换+报错状态(保存现场)
并行:真正同时运行,应用的是多核技术(多个cpu)
同步\异步\阻塞\非阻塞
进程的三状态:就绪(等待操作系统调度去cpu里面执行) 执行 阻塞
同步:多个任务提交出去要排队一个任务执行完后才执行下一个是串行的
异步 :任务的提交方式,多个任务提交出去,同时执行
阻塞与非阻塞:
阻塞和非阻塞这两个概念与程序(线程)等待消息通知(无所谓同步或者异步)时的状态有关。也就是说阻塞与非阻塞主要是程序(线程)等待消息通知时的状态角度来说的
同步阻塞
效率最低
异步阻塞
异步操作是可以被阻塞住的,只不过它不是处理消息时阻塞,而是等待消息通知时阻塞
同步非阻塞形式
实际上效率低下
程序要在两种不同的行为之间来回切换
异步非阻塞形式
效率高
程序不需要在两种不同的操作之间来回切换
multiprocess模块
Process类中参数的介绍
group参数未使用,值始终为None
target表示调用对象,即子进程要执行的任务
args表示调用对象的位置参数元组,args=(1,2,'egon',)
kwargs表示调用对象的字典,kwargs={'name':'egon','age':18}
name为子进程的名称
from multiprocessing import Process
import time
def f1():
time.sleep(2)
print("xxx")
def f2():
time.sleep(2)
print("kkk")
#windows系统下必须写main,因为windows系统创建子进程的方式决定的,开启一个子进程,
#这个子进程 会copy一份主进程的所有代码,并且机制类似于import引入,这样就容易导致引入代码的时候,
#被引入的代码中的可执行程序被执行,导致递归开始进程,会报错
if __name__ == '__main__':
p1 = Process(target=f1,)
p2 = Process(target=f2,)
p1.start()
p2.start()
Process类中方法介绍:
p.start():启动进程,并调用该进程中的p.run()
p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类中一定要实现这方法
p.termintate(): 强制终止进程p,不会进行任何清理操作
p.is_alive(): 如果p仍然运行,返回True
p.join(): 主进程等待子进程运行完才继续执行,p.join只能join住start开启的进程,而不能join住run开启的进程
#-*- coding:utf-8 -*-
importtimefrom multiprocessing importProcessdeff1():
time.sleep(2)print("xxx")deff2():
time.sleep(2)print("sss")if __name__ == '__main__':
p1= Process(target=f1,)
p1.start()
p1.join()#主进程等待子进程运行完才继续执行
print("开始")
p2= Process(target=f2,)
p2.start()
p2.join()print("我是主进程!!!")
join的用法
from multiprocessing importProcessdeff1(n):print(n)if __name__ == '__main__':#p1 = Process(target=f1,args=("alex",))
p1 = Process(target=f1,kwargs={'n':'alex'})
p1.start()
进程传参
from multiprocessing importProcessclassMyProcess(Process):def __init__(self,n):
super().__init__() #执行父类的init
self.n =ndefrun(self):print("wusir和%s在一起"%self.n)if __name__ == '__main__':
p1= MyProcess("alex")
p1.start()
进程传参2
Process类中自带封装的各属性
p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置
p.name:进程的名称
p.pid:进程的pid
windows中的Process()必须放到if __name__ == '__main__':下
没有写就会不断的递归调用模块,导致报错
创建进程的第二种方法(继承)
from multiprocessing import Process
class MyProcess(Process):
def __init__(self,n):
super().__init__()
self.n = n
def run(self):
print("%s和alex不可告人的密码"%self.n)
if __name__ == '__main__':
p1= MyProcess("wusir")
p1.start()
from multiprocessing importProcess
n= 100
defwork():globaln
n=0print('子进程:',n)#0
if __name__ == '__main__':
p= Process(target=work,)
p.start()
p.join()print("主进程内",n)#100
进程间数据是隔离的
Process对象其他方法或属性
from multiprocessing importProcessimporttimeimportrandomclassPiao(Process):def __init__(self,name):
self.name=name
super().__init__()defrun(self):print("%s is piaoing" %self.name)
time.sleep(random.randrange(1,5))print("%s is piaoing" %self.name)
p1= Piao("alex")
p1.start()print(p1.is_alive()) #True
p1.terminate() #关闭进程,不会立即关闭,所以is_alive立刻查看的结果可能还是存活
time.sleep(0.2)print("开始")print(p1.is_alive()) #False
terminate和is_live
from multiprocessing importProcessimporttimeimportrandomclassPiao(Process):def __init__(self,name):
super().__init__()
self.name=namedefrun(self):print("%s is piaoing"%self.name)
time.sleep(random.randrange(1,3))print("%s is paioing" %self.name)if __name__ == '__main__':
p= Piao('alex')
p.start()print("开始")print(p.pid)
pid用法
僵尸进程:当子进程比父进程先结束,而父进程又没有回收子进程,释放子进程占用的资源,此时子进程将成为一个僵尸进程
孤儿进程:一个父进程退出,而它的一个或多个子进程还在运行,那么那些子进程将成为孤儿进程。孤儿进程将被init进程(进程号为1)所收养,并由init进程对它们完成状态收集工作。
守护进程
一.守护进程会在主进程代码执行结束后终止
二.守护进程无法再开启子进程
p.deamon = True #一定要在p.start()前添加
#-*- coding:utf-8 -*-
importtimefrom multiprocessing importProcessdeff1():
time.sleep(3)print('守护进程的代码')deff2():
time.sleep(5)print("普通进程的代码")if __name__ == '__main__':
p= Process(target=f1,)
p.daemon= True #设置为守护进程
p.start()
p2= Process(target=f2,)
p2.start()#等待p2进程结束,才进行执行下面的代码
#p2.join()
#守护进程会跟着父进程代码的结束而结束
print("主程序结束")
守护进程
进程同步锁
加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,串行的修改,牺牲了速度确保证了数据安全
用文件共享数据实现进程间通信存在的问题:
1.效率低(数据在硬盘上)
2.需要自己加锁处理
from multiprocessing importProcess,Lockimporttimedeff1(i,lic):
lic.acquire()
time.sleep(1)print(i)
lic.release()if __name__ == '__main__':
lic=Lock()for i in range(20):
p= Process(target=f1,args=(i,lic))
p.start()
互斥锁
加锁模拟抢票
#-*- coding:utf-8 -*-#先创建一个ticket.txt文件,写入{'count':1}
importtimefrom multiprocessing importProcess,Lockdefshow_t(i):
with open('ticket','r',encoding='utf-8') as f:
ticket_data=f.read()
t_data=eval(ticket_data)print("%s查询剩余票数为%s"%(i,t_data['count']))defget_t(i,l1):
l1.acquire()
with open('ticket','r',encoding='utf-8') as f:
ticket_data=f.read()
t_data=eval(ticket_data)if t_data['count'] >0:
t_data['count'] -= 1
print("%s抢票成功"%i)
time.sleep(0.2)
with open('ticket','w') as f:
f.write(str(t_data))else:print('没票了!!!')
l1.release()if __name__ == '__main__':
l1=Lock()for i in range(10):
p1= Process(target=show_t,args=(i,))
p1.start()for i in range(10):
p2= Process(target=get_t,args=(i,l1))
p2.start()
抢票程序
队列
进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的。队列就像一个特殊的列表,但是可以设置固定长度,并且从前面插入数据,从后面取出数据,先进先出。
1 队列和管道都是将数据存放于内存中
2 队列基于(管道+锁)实现,
from multiprocessing importProcess ,Queue
q= Queue(3)
q.put(1)print(">>>>>:",q.qsize())#返回当前队列的内容长度
print(q.full())
q.put(2)print(">>>>>:",q.qsize())
q.put(3)print(q.full())try:
q.put_nowait(4) #queue.Full
except:print("满了")print(q.get())print(q.get())print(q.get())print("是不是不空",q.empty())try:
q.get_nowait()#queue.Empty
except:print("队列空了")print("那多了")
其他
q =Queue([maxsize]) 创建共享的进程队列。maxsize是队列中允许的最大项数。
q.get() 返回q中的一个项目。如果q为空,此方法将阻塞,直到队列中有项目可用为止
q.get_nowait( ) 不等待,会报错
q.put_nowait( ) 不等待,会报错
q.put() 将item放入队列。如果队列已满,此方法将阻塞至有空间可用为止。
q.qsize() 返回队列中目前项目的正确数量。
q.empty() 如果调用此方法时 q为空,返回True。
q.full() 如果q已满,返回为True. 由于线程的存在,结果也可能是不可靠的。
from multiprocessing importProcess,Queuedeff1(q):
q.put("约吗")if __name__ == '__main__':
q=Queue(3)
p= Process(target=f1,args=(q,))
p.start()
son_p_msg=q.get()print("来自进程的消息:",son_p_msg)
基于队列的通信
Manager模块
#-*- coding:utf-8 -*-
importtimefrom multiprocessing importProcess,Lock,Managerdeff1(m_d,l1):
with l1:
tmp= m_d['num']
tmp-= 1time.sleep(0.1)
m_d['num'] =tmpif __name__ == '__main__':
m=Manager()
l1=Lock()
m_d= m.dict({'num':100})
p_list=[]for i in range(10):
p= Process(target=f1,args=(m_d,l1))
p.start()
p_list.append(p)
[pp.join()for pp inp_list]print(m_d['num'])
修改进程共享数据
生产者消费者模型
在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。
#-*- coding:utf-8 -*-
from multiprocessing importQueue,Processimporttimedefproducer(q):for i in range(10):
time.sleep(0.7)
s= '大包子%s号'%iprint(s+'新鲜出炉')
q.put(s)defconsumer(q):while 1:
time.sleep(1)
baozi=q.get()print(baozi+"被吃了")if __name__ == '__main__':
q= Queue(10)
pro_p= Process(target=producer,args=(q,))
con_p= Process(target=consumer,args=(q,))
pro_p.start()
con_p.start()
low版,不能推迟
#-*- coding:utf-8 -*-
from multiprocessing importQueue,Processimporttimedefproducer(q):for i in range(10):
time.sleep(0.7)
s= '大包子%s号'%iprint(s+'新鲜出炉')
q.put(s)
q.put(None)defconsumer(q):while 1:
time.sleep(1)
baozi=q.get()if baozi ==None:print("都吃完了")break
print(baozi+"被吃了")if __name__ == '__main__':
q= Queue(10)
pro_p= Process(target=producer,args=(q,))
con_p= Process(target=consumer,args=(q,))
pro_p.start()
con_p.start()
None判断版
#-*- coding:utf-8 -*-
from multiprocessing importQueue,Processimporttimedefproducer(q):for i in range(10):
time.sleep(0.7)
s= '大包子%s号'%iprint(s+'新鲜出炉')
q.put(s)defconsumer(q):while 1:
time.sleep(1)try:
baozi= q.get_nowait()#不合适,因为无法确定做的块,还是吃的块,如果按照这样的写法,你吃的快的话,那么这个消费者的程序就直接结束了,不能满足需求
exceptException:break
print(baozi+"被吃了")if __name__ == '__main__':
q= Queue(10)
pro_p= Process(target=producer,args=(q,))
con_p= Process(target=consumer,args=(q,))
pro_p.start()
con_p.start()
升级版
#-*- coding:utf-8 -*-
importtimefrom multiprocessing importProcess,Queue,JoinableQueuedefproducer(q):for i in range(10):
time.sleep(0.2)
s= "大包子%s号"%iprint(s+"新鲜出炉")
q.put(s)
q.join()#就等着task_done()信号的数量,和我put进去的数量相同时,才继续执行
print("所有数据处理完毕")defconsumer(q):while 1:
time.sleep(0.3)
baozi=q.get()print(baozi+"被吃了")
q.task_done()#给队列发送一个取出的这个任务已经处理完毕的信号
if __name__ == '__main__':
q= JoinableQueue(30)
pro_p= Process(target=producer,args=(q,))
con_p= Process(target=consumer, args=(q,))
pro_p.start()
con_p.daemon=True
con_p.start()
pro_p.join()print("主进程结束")
终极版
JoinableQueue([maxsize])
maxsize是队列中允许最大项数,缩略无大小限制
q.task_done() 消费者使用此方法发出信号,表示q.get()的返回项目已经被处理了,如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常
q.join() 生产者调用此方法进行阻塞,直到队列中所有项目被处理.阻塞将持续到队列中的每个项目均调用q.task_done()方法为止
from multiprocessing importProcess,JoinableQueueimporttime,random,osdefconsumer(q):whileTrue:
res=q.get()
time.sleep(random.randrange(1,3))print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))
q.task_done()defproducer(name,q):for i in range(10):
time.sleep(random.randrange(1,3))
res='%s%s' %(name,i)
q.put(res)print('\033[44m%s 生成了 %s\033[0m' %(os.getpid(),res))
q.join()if __name__ == '__main__':
q=JoinableQueue()
p1= Process(target=producer, args=('包子',q))
p2= Process(target=producer, args=('馒头', q))
p3= Process(target=producer, args=('alex', q))
c1= Process(target=consumer,args=(q,))
c2= Process(target=consumer,args=(q,))
c1.deamon=True
c2.deamon=True
p_1=[p1,p2,p3,c1,c2]for p inp_1:
p.start()
p1.join()
p2.join()
p3.join()print("主程序")
View Code
管道
进程间的通信
Pipe([duplex]) 在进程之间创建一条通道,管道默认全双工的
conn1.recv() 接收conn2.send(obj)发送的对象.
conn1.send(obj) 通过连接发送对象
conn1.close() 关闭连接
from multiprocessing importProcess,Pipedeff1(conn):
from_qian=conn.recv()print("我是子进程")print("来自主进程的消息:",from_qian)if __name__ == '__main__':
conn1,conn2= Pipe()#创建一个管道对象,全双工,返回管道的两端,但是一端发送的消息,只能另外一端接收,自己这一端是不能接收的
#可以将一端或者两端发送给其他的进程,那么多个进程之间就可以通过这一个管道进行通信了
p1 = Process(target=f1,args=(conn2,))
p1.start()
conn1.send('小宝贝')print("主进程")
管道通信
信号量
importtimeimportrandomfrom multiprocessing importProcess,Semaphoredeff1(i,s):
s.acquire()print("%s男嘉宾到了"%i)
time.sleep(random.randint(1,3))
s.release()if __name__ == '__main__':
s= Semaphore(4) #计数器4,acquire一次减一,为0,其他人等待,release加一
for i in range(10):
p= Process(target=f1,args=(i,s))
p.start()
Semaphore
事件
#-*- coding:utf-8 -*-
from multiprocessing importProcess,Event
e=Event()print("e的状态",e.is_set())
e.set()#将e的状态改为True
print("e的状态",e.is_set())
e.clear()#将e的转态改为False
e.wait() #e这个事件对象如果为False,就在我加wait的地方等待
print("进程过了wait")
Event
进程池
p.apply() #同步方法
p.apply_async() #异步方法
p.close() 关闭进程池,防止进一步操作
p.join() 等待所有工作进程退出
importtimefrom multiprocessing importProcess,Pool#def f1(n):#time.sleep(1)#print(n)#对比多进程和进程池的效率
deff1(n):for i in range(5):
n= n +iif __name__ == '__main__':#统计进程池进行100个任务的时间
s_time =time.time()
pool= Pool(4) #里面这个参数是指定进程池中有多少个进程用的,4表示4个进程,如果不传参数,默认开启的进程数一般是cpu的个数
#pool.map(f1,[1,2]) #参数必须是可迭代的
pool.map(f1,range(100)) #参数数据必须是可迭代的,异步提交任务时自带join功能
e_time =time.time()
dif_time= e_time -s_time#统计100个进程,来执行100个任务的时间
p_s_t = time.time()#多进程起始时间
p_list =[]for i in range(100):
p= Process(target=f1,args=(i,))
p.start()
p_list.append(p)
[pp.join()for pp inp_list]
p_e_t=time.time()
p_dif_t= p_e_t -p_s_tprint("进程池的时间:",dif_time)print('多进程的执行时间:',p_dif_t)
进程池的map方法
#-*- coding:utf-8 -*-
importtimefrom multiprocessing importProcess,Pooldeff1(n):
time.sleep(1)return n*nif __name__ == '__main__':
pool= Pool(4)for i in range(10):print("xxxx")
res= pool.apply(f1,args=(i,))print(res)
进程池的同步方法
#-*- coding:utf-8 -*-
importtimefrom multiprocessing importProcess,Pooldeff1(n):
time.sleep(0.5)return n*nif __name__ == '__main__':
pool= Pool(4)
res_list=[]for i in range(10):print("xxxx")#异步给进程池提交任务
res = pool.apply_async(f1,args=(i,))
res_list.append(res)#print("等待所有任务执行完")
#pool.close() #锁住进程池,意思就是不让其他的程序再往这个进程池里面提交任务了
#pool.join()
#打印结果,如果异步提交之后的结果对象
for i inres_list:print(i.get())
进程池的异步方法
回调函数
#-*- coding:utf-8 -*-
importosfrom multiprocessing importPool,Processdeff1(n):print('进程池里面的进程id',os.getpid())print('>>>>',n)return n*ndefcall_back_func(f):print(">>>>>>>>>>>",os.getpid())print("回调函数的结果",f)if __name__ == '__main__':
pool= Pool(4)
res= pool.apply_async(f1,args=(5,),callback=call_back_func)
pool.close()
pool.join()print('子进程的id',os.getpid())
callback
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。