赞
踩
在Python中多任务的实现方式为:
1、多进程
2、多线程
3、协程
4、多进程+多线程
一、多任务原理
多任务:操作系统可以同时完成多个任务
单核CPU多任务原理:单个cpu核心轮流的执行各个程序,但是切换的速度非常快导致表面上看是同时在执行
多核CPU多任务原理:真正的实现多任务,但是任务的数量会远远超过CPU核心的数量,因此操作系统会将各个任务轮流调度到各个核心上执行
并发:任务看上去一起执行,实际上任务书多余CPU核心数
并行:任务真正的一起执行,实际任务小于CPU核心数
二、进程、线程的概念
(1)进程:一个执行中的程序,操作系统管理其上所有进程的执行(比如QQ、微信等就相当于一个进程)
(2)线程:可称为轻量级进程;任何进程都会默认启动一个线程,即为主线程,主线程可以启动新的线程,称其为子线程。一个进程的各个线程与主线程共享一个数据空间,相比于独立的进程而言,线程间共享数据和通信更加容易(比如QQ的多个聊天窗口就相当于多个线程)
进程、线程优缺点比较:
优点 | 缺点 | |
---|---|---|
进程 | 稳定性高:一个子进程崩溃了,不会影响主进程 | 创建进程的开销大、操作系统运行的进程数有限 |
线程 | 比多进程快,但是也快不到哪去 | 任何线程的崩溃都会影响到整个进程 |
三、Python实现多进程 - - multiprocessing
multiprocessing 是一个用于产生进程的包,在 multiprocessing 中,通过创建一个 Process 对象然后调用它的 start() 方法来生成进程
示例一:用Process创建子进程
#父子进程的先后执行顺序 from multiprocessing import Process import time def run(): print('子进程启动') time.sleep(3) print('子进程结束') if __name__ == '__main__': print('父进程启动') p = Process(target=run) #父进程结束不影响子进程 p.start() #jion()方法让子进程结束在执行父进程,阻塞父进程,等待所有子进程执行完毕 p.join() print('父进程结束') """ 运行结果: 父进程启动 子进程启动 子进程结束 父进程结束 """
示例二:通过进程池Pool创建大量子进程
from multiprocessing import Pool import time,os,random def runing(name): #os.getpid 可以获得进程号,打印子进程的名称和进程号 print('子进程 %d 启动-%s'%(name,os.getpid())) #进程开始的时间 start = time.time() #随机选择进程休息时间 time.sleep(random.choice([1,2,3])) #进程结束时间 end = time.time() #打印进程的名称、进程号和运行时间 print('子进程 %d 结束-%s-耗时%.2f'%(name,os.getpid(),end-start)) if __name__ =='__main__': print('父进程启动') #创建多个进程 #pool默认大小是cpu核心数量 #创建两个进程,会同时运行两个进程 pp = Pool(2) for i in range(8): #创建进程,放入进程池同时管理 pp.apply_async(runing,args=(i,)) #在调用join之前必须先调用close,调用close之后就不能在继续添加新的进程了 pp.close() #等待子进程结束 pp.join() print('父进程结束') """ 运行结果: 父进程启动 子进程 0 启动-15996 子进程 1 启动-16016 子进程 0 结束-15996-耗时2.00 子进程 2 启动-15996 子进程 1 结束-16016-耗时2.00 子进程 3 启动-16016 子进程 2 结束-15996-耗时1.00 子进程 4 启动-15996 子进程 4 结束-15996-耗时1.00 子进程 5 启动-15996 子进程 3 结束-16016-耗时2.00 子进程 6 启动-16016 子进程 6 结束-16016-耗时2.00 子进程 7 启动-16016 子进程 5 结束-15996-耗时3.00 子进程 7 结束-16016-耗时2.00 父进程结束 """
示例三:全局变量在多个进程中不能共享
from multiprocessing import Process import os num = 100 def run(): print('子进程启动') global num num += 1 print(num) print('子进程结束') if __name__ =='__main__': print('父进程启动') p = Process(target=run) p.start() #在子进程中修改全局变量对父进程中的全局变量没有影响 #在创建子进程时对全局变量做了一个备份,父进程与子进程中的num是完全不同的两个变量 p.join() print('父进程结束%s'%num) """ 运行结果: 父进程启动 子进程启动 101 子进程结束 父进程结束100 """
示例四:进程间的通信
进程间的通信可以通过消息队列(queue)来实现
通过消息队列实现两个进程间通信,一个进程从终端输入(put)数据,通过消息队列发送,另一个进程通过消息队列接收(get)数据
from multiprocessing import Process,Queue import time,os def write(q): print("启动写子进程%s"%(os.getpid())) for chr in ["A","B","C","D"]: #put方法将信息写入消息队列 q.put(chr) time.sleep(1) print("结束子进程%s"%(os.getpid())) def read(q): print("启动读子进程%s"%(os.getpid())) while True: #get方法将信息读取 value = q.get(True) print("value = "+ value) print("结束读子进程%s"%(os.getpid())) if __name__ =="__main__": #父进程建立队列,并传递给子进程 #建立消息队列的对象实现进程间的通信 q = Queue() #进程的创建 pw = Process(target=write, args=(q,)) pr = Process(target=read, args=(q,)) #进程的启动 pw.start() pr.start() #等待子进程的执行 pw.join() #pr进程是个死循环,无法等待其结束,只能强行结束 pr.terminate() print("父进程结束") """ 运行结果: 启动写子进程7668 启动读子进程15268 value = A value = B value = C value = D 结束子进程7668 父进程结束 """
四、Python实现多线程 - - threading
通过threading模块创建线程
示例一:启动一个线程
threading.Thread()会创建一个线程对象
current_thread() 方法返回当前对应调用者的控制线程的 Thread 对象。如果调用者的控制线程不是利用 threading 创建,会返回一个功能受限的虚拟线程对象。
mport threading,time def run(): print("子线程(%s)开始"%(threading.current_thread().name)) #实现线程的功能 time.sleep(2) print("打印") time.sleep(2) print("子线程(%s)结束"%(threading.current_thread().name)) if __name__ == "__main__": #任何进程默认就会启动一个线程,称为主线程,主线程可以启动新的线程 #current_threed():返回当前线程实例 print("主线程(%s)启动"%(threading.current_thread().name)) #创建子线程 t = threading.Thread(target=run,name = "rooThread") #子线程启动 t.start() #等待线程结束 t.join() print("主线程(%s)结束"%(threading.current_thread().name)) """ 运行结果: 主线程(MainThread)启动 子线程(rooThread)开始 打印 子线程(rooThread)结束 主线程(MainThread)结束 """
示例二:定时线程 - - threading.Time()
threading.Time() 会创建一个定时器对象,会等待到一定的时间再执行线程
import threading
def run():
print("i love you")
#延时执行线程 5s
t = threading.Timer(5,run)
t.start()
t.join()
print("父进程结束")
示例三:信号量控制线程数量 - - threading.Semaphore( )
一个信号量管理一个内部计数器,该计数器因 acquire() 方法(获取一个信号量)的调用而递减,因 release() (释放一个信号量)方法的调用而递增。 计数器的值永远不会小于零;当 acquire() 方法发现计数器为零时,将会阻塞,直到其它线程调用 release() 方法。
import threading,time
sem = threading.Semaphore(2)
def run():
#通过with自动的获取和释放信号量
with sem:
for i in range(10):
print("%s-%d"%(threading.current_thread().name,i))
time.sleep(1)
if __name__ =="__main__":
for i in range(5):
threading.Thread(target=run).start()
示例四:线程间的通信 - - threading.Event
threading.Event 对象是线程之间通信的最简单机制之一:一个线程发出事件信号,而其他线程等待该信号
wait():阻塞线程直到内部变量为true。如果调用时内部标志为true,将立即返回。否则将阻塞线程,直到调用 set() 方法将标志设置为true或者发生可选的超时
set() :将内部标志设置为true。所有正在等待这个事件的线程将被唤醒。当标志为true时,调用 wait() 方法的线程不会被被阻塞。
clear()将内部标志设置为false。之后调用 wait() 方法的线程将会被阻塞,直到调用 set() 方法将内部标志再次设置为true。
import threading,time def func(): event = threading.Event() def run(): for i in range(5): #阻塞,等待事件触发,即等待内部变量变为True event.wait() #重置,将内部变量设置为False event.clear() print("执行 %d 次"%i) t = threading.Thread(target=run).start() return event e = func() #触发事件 for i in range(5): time.sleep(2) #将内部变量设置为True e.set()
示例五:线程间的数据共享 - - threading.Lock
threading.Lock 对象会阻塞随后尝试获得锁的线程,直到它被释放;任何线程都可以释放它。
acquire()可以阻塞或非阻塞地获得锁。
release()释放一个锁
import threading #锁对象 lock = threading.Lock() num = 0 def run(n): global num for i in range(100000): #确保这段代码只能由一个线程从头到尾的执行 #阻止了多线程的并发执行,包含锁的某段代码只能以单线程执行,所以效率大大降低 #由于可以存在多个锁,不同线程持有不同的锁,并试图获取其他的锁,可能造成死锁,导致多个线程挂起,只能操作系统强制结束 try: #上锁,其他线程无法修改num的值,每次只允许一个线程对num的值进行修改 lock.acquire() num -= n num += n #修改完一定要释放锁,其他线程才能修改num的值 finally: lock.release() if __name__ =="__main__": #创建另个子线程 t1 = threading.Thread(target=run,args=(6,)) t2 = threading.Thread(target=run,args=(9,)) #子线程启动 t1.start() t2.start() #等待子线程完成 t1.join() t2.join() print("num:",num)
示例六:凑够一定数量再执行 (栅栏对象) - - threading.Barrier
threading.Barrie用于应对固定数量的线程需要彼此相互等待的情况;线程调用 wait() 方法后将阻塞,直到所有线程都调用了 wait() 方法。此时所有线程将被同时释放。
import threading,time
#阻塞的线程数量达到三个就执行
bar = threading.Barrier(3)
def run():
print("%s-start"%(threading.current_thread().name))
time.sleep(2)
bar.wait()
print("%s-end"%(threading.current_thread().name))
if __name__ =="__main__":
for i in range(6):
threading.Thread(target=run).start()
time.sleep(2)
示例七:生产者与消费者模型
import threading,time,queue,random #生产者 def product(id,q): while True: #随机产生数据 num = random.randint(0,10000) #将数据放入到消息队列中 q.put(num) #显示数据已经被生成 print("生产者%d产生了%d数据放入队列"%(id,num)) time.sleep(3) #任务完成 q.task_done() #消费者 def customer(id,q): while True: #获得消息队列的数据 item = q.get() if item is None: break #显示数据已经被消费 print("消费者%d消费%d数据"%(id,item)) time.sleep(2) #re任务完成 q.task_done() if __name__ =="__main__": #消息队列 q = queue.Queue() #创建两个线程 #生产者线程 for i in range(4): threading.Thread(target=product,args=(i,q)).start() #消费者线程 for i in range(3): threading.Thread(target=customer, args=(i, q)).start()
五、Python 协程
协程:可以用单线程实现多任务,协程其实是更小切分;进程是系统调度,协程是用户态调度。因为是单线程执行多任务所以协程不需要不需要加锁
在Python中通过yield生成器来实现协程
协程优点:
1、无需上下文切换的开销,避免无意义调度,提升性能
2、无需加锁及同步开销
3、方便切换控制流,简化编程模型
4、高并发、高扩展、低成本
协程缺点:
1、无法利用多核资源
2、进行阻塞操作会阻塞掉整个程序
示例一:协程实现生产者与消费者
#消费者 def consumer(): a = "" print('c1') # 只有第一次会执行(启动生成器), 之后再调用生成器就会从yield处执行 while True: #首先返回一个空字符串,然后被阻塞,等待生产者发出信号 # 再次执行时从这里的yield继续执行, 将把produce传入的参数 n 赋给局部变量 n . 下轮循环再次遇到yield就会就将 a 返回给produce函数 n = yield a # 由于生成器在启动的时候遇到上面的yield就返回了, 所以第一次不会执行这条语句. 之后每次都会被执行 print('c %d'%n) if not n: return print('[CONSUMER] Consuming %s' % n) #在下一次执行到yield a 时,将值返回给生产者,即是200 ok a = '200 OK' #b = 'fake 200 OK' # 返回的值与a无关 #生产者 def produce(c): #第一次发送None 与消费者建立关系 c.send(None) #执行标记 print('p1') n = 0 while n < 5: n = n + 1 #生产的数据 print('[PRODUCER] Producing %s' % n) #生产的数据发送给消费者 r = c.send(n) # 获取生成器consumer中由yield语句返回的下一个值,即200 ok print('[PRODUCER] Consumer return: %s' % r) c.close() c = consumer() # 并不会启动生成器, 只是将c变为一个生成器 produce(c)
图解:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。