当前位置:   article > 正文

Python并发与并行_python并发和并行

python并发和并行

python的多线程因为GIL锁的原因是一个伪多线程

  • python2:100字节码或I/O阻塞进行切换
  • python3:I/O阻塞进行切换,移除了100字节码切换

1、并发与并行

并行:多个程序同时运行

并发:伪并行,看起来是同时并行,其实质是利用了多道技术

无论是并行还是并发,在用户眼里看起来都是同时运行的,不管是线程还是进程,都是只是一个任务,真正干活的是CPU,而同一个CPU在同一时刻只能执行一个任务。

2、多进程

  • 进程是操作系统分配资源的最小单元,线程是操作系统调度的最小单元

  • 一个应用程序最少包括一个进程,而一个进程包括1个或多个线程,线程的尺度更小

  • 每个进程在执行过程中都有独立的内存单元,而每一个进程中的多个线程在执行过程中共享内存

2.1 进程之 Process

  1. # -*- coding: utf-8 -*-
  2. # Created by Xue Jian on 3/10/23
  3. import time
  4. import os
  5. def long_time_task():
  6. print('current_process: {}'.format(os.getpid()))
  7. time.sleep(2)
  8. print('result: {}'.format(8 ** 20))
  9. if __name__ == '__main__':
  10. print('current parent process: {}'.format(os.getpid()))
  11. start = time.time()
  12. for i in range(2):
  13. long_time_task()
  14. end = time.time()
  15. print('sec: {}'.format(end - start))

输出结果如下:

总耗时4秒,自始至终只有一个进程在执行。看来电脑计算8的20次方基本不费时。

开启多进程

  1. # -*- coding: utf-8 -*-
  2. # Created by Xue Jian on 3/10/23
  3. import time
  4. import os
  5. from multiprocessing import Process
  6. def long_time_task(i):
  7. print('子进程: {} - 任务{}'.format(os.getpid(), i))
  8. time.sleep(2)
  9. print("结果: {}".format(8 ** 20))
  10. if __name__ == '__main__':
  11. print('当前母进程: {}'.format(os.getpid()))
  12. start = time.time()
  13. p1 = Process(target=long_time_task, args=(1,))
  14. p2 = Process(target=long_time_task, args=(2,))
  15. print('等待所有子进程完成。')
  16. p1.start()
  17. p2.start()
  18. p1.join()
  19. p2.join()
  20. end = time.time()
  21. print("总共用时{}秒".format((end - start)))

输出结果如下所示,耗时变为2秒,时间减了一半,可见并发执行的时间明显比顺序执行要快很多。你还可以看到尽管我们只创建了两个进程,可实际运行中却包含里1个母进程和2个子进程。之所以我们使用join()方法就是为了让母进程阻塞,等待子进程都完成后才打印出总共耗时,否则输出时间只是母进程执行的时间。

知识点:

  • 新创建的进程与进程的切换都是需要消耗资源的,所以平时工作中进程数不能开太大

  • 同时可以运行的进程数,一般受制于CPU的核数

  • 除了使用Process方法,我们还可以使用Pool方法创建多进程

2.2 进程之Pool类

很多时候系统都需要创建多个进程以提高CPU的利用率,当数量较少时,可以手动生成一个个Process实例。

当进程数量很多时,或许可以利用循环,但是这需要程序员手动管理系统中并发进程的数量,有时会很麻烦。这时进程池Pool就可以发挥其功效了。可以通过传递参数限制并发进程的数量,默认值为CPU的核数。

Pool类可以提供指定数量的进程供用户调用,当有新的请求提交到Pool中时,如果进程池还没有满,就会创建一个新的进程来执行请求。如果池满,请求就会告知先等待,直到池中有进程结束,才会创建新的进程来执行这些请求。

下面介绍一下multiprocessing 模块下的Pool类的几个方法:

1.apply_async

函数原型:apply_async(func[, args=()[, kwds={}[, callback=None]]])

其作用是向进程池提交需要执行的函数及参数, 各个进程采用非阻塞(异步)的调用方式,即每个子进程只管运行自己的,不管其它进程是否已经完成。这是默认方式。

2.map()

函数原型:map(func, iterable[, chunksize=None])

Pool类中的map方法,与内置的map函数用法行为基本一致,它会使进程阻塞直到结果返回。 注意:虽然第二个参数是一个迭代器,但在实际使用中,必须在整个队列都就绪后,程序才会运行子进程。

3.map_async()

函数原型:map_async(func, iterable[, chunksize[, callback]])与map用法一致,但是它是非阻塞的。其有关事项见apply_async。

4.close()

关闭进程池(pool),使其不在接受新的任务。

5.terminate()

结束工作进程,不在处理未处理的任务。

6.join()

主进程阻塞等待子进程的退出, join方法要在close或terminate之后使用。

下例是一个简单的multiprocessing.Pool类的实例。

进程池会根据我的CPU的核数进行,本次使用的虚拟机是4核的,这里会创建一个容量为4的进程池,4个进程需要执行5个任务,会有一个在排队等待

  1. from multiprocessing import Pool, cpu_count, Process
  2. import os
  3. import time
  4. def long_time_task(i):
  5. print('子进程: {} - 任务{}'.format(os.getpid(), i))
  6. time.sleep(2)
  7. print("结果: {}".format(8 ** 20))
  8. if __name__ == '__main__':
  9. print("CPU内核数:{}".format(cpu_count()))
  10. print('当前母进程: {}'.format(os.getpid()))
  11. start = time.time()
  12. p = Pool(4)
  13. for i in range(5):
  14. p.apply_async(long_time_task, args=(i,))
  15. print('等待所有子进程完成。')
  16. p.close()
  17. p.join()
  18. end = time.time()
  19. print("总共用时{}秒".format((end - start)))

2.3 多进程之间的数据共享和通信

通常,进程之间是相互独立的,每个进程都有独立的内存。通过共享内存(nmap模块),进程之间可以共享对象,使多个进程可以访问同一个变量(地址相同,变量名可能不同)。

多进程共享资源必然会导致进程间相互竞争,所以应该尽最大可能防止使用共享状态。还有一种方式就是使用队列queue来实现不同进程间的通信或数据共享,这一点和多线程编程类似。

下例这段代码中中创建了2个独立进程,一个负责写(pw), 一个负责读(pr), 实现了共享一个队列queue

  1. from multiprocessing import Process, Queue
  2. import os, time, random
  3. # 写数据进程执行的代码:
  4. def write(q):
  5. print('Process to write: {}'.format(os.getpid()))
  6. for value in ['A', 'B', 'C']:
  7. print('Put %s to queue...' % value)
  8. q.put(value)
  9. time.sleep(random.random())
  10. # 读数据进程执行的代码:
  11. def read(q):
  12. print('Process to read:{}'.format(os.getpid()))
  13. while True:
  14. value = q.get(True)
  15. print('Get %s from queue.' % value)
  16. if __name__ == '__main__':
  17. # 父进程创建Queue,并传给各个子进程:
  18. q = Queue()
  19. pw = Process(target=write, args=(q,))
  20. pr = Process(target=read, args=(q,))
  21. # 启动子进程pw,写入:
  22. pw.start()
  23. # 启动子进程pr,读取:
  24. pr.start()
  25. # 等待pw结束:
  26. pw.join()
  27. # pr进程里是死循环,无法等待其结束,只能强行终止:
  28. pr.terminate()

3. 线程

threading.Thread方法可以接收两个参数, 第一个是target,一般指向函数名,第二个是args,需要向函数传递的参数。

对于创建的新线程,调用start()方法即可让其开始。我们还可以使用current_thread().name打印出当前线程的名字。 下例中我们使用多线程技术重构之前的计算代码

  1. import threading
  2. import time
  3. def long_time_task(i):
  4. print('当前子线程: {} - 任务{}'.format(threading.current_thread().name, i))
  5. time.sleep(2)
  6. print("结果: {}".format(8 ** 20))
  7. if __name__=='__main__':
  8. start = time.time()
  9. print('这是主线程:{}'.format(threading.current_thread().name))
  10. t1 = threading.Thread(target=long_time_task, args=(1,))
  11. t2 = threading.Thread(target=long_time_task, args=(2,))
  12. t1.start()
  13. t2.start()
  14. end = time.time()
  15. print("总共用时{}秒".format((end - start)))

当我们设置多线程时,主线程会创建多个子线程,在python中,默认情况下主线程和子线程独立运行互不干涉。

如果希望让主线程等待子线程实现线程的同步,我们需要使用join()方法。

如果我们希望一个主线程结束时不再执行子线程,我们应该怎么办呢? 我们可以使用t.setDaemon(True),代码如下所示。

  1. import threading
  2. import time
  3. def long_time_task():
  4. print('当子线程: {}'.format(threading.current_thread().name))
  5. time.sleep(2)
  6. print("结果: {}".format(8 ** 20))
  7. if __name__ == '__main__':
  8. start = time.time()
  9. print('这是主线程:{}'.format(threading.current_thread().name))
  10. for i in range(5):
  11. t = threading.Thread(target=long_time_task, args=())
  12. t.setDaemon(True)
  13. t.start()
  14. end = time.time()
  15. print("总共用时{}秒".format((end - start)))

结果如下:

3.1 通过继承Thread类重写run方法创建新进程

除了使用Thread()方法创建新的线程外,我们还可以通过继承Thread类重写run方法创建新的线程,这种方法更灵活。下例中我们自定义的类为MyThread, 随后我们通过该类的实例化创建了2个子线程。

  1. import threading
  2. import time
  3. def long_time_task(i):
  4. time.sleep(2)
  5. return 8 ** 20
  6. class MyThread(threading.Thread):
  7. def __init__(self, func, args, name='', ):
  8. threading.Thread.__init__(self)
  9. self.func = func
  10. self.args = args
  11. self.name = name
  12. self.result = None
  13. def run(self):
  14. print('开始子进程{}'.format(self.name))
  15. self.result = self.func(self.args[0], )
  16. print("结果: {}".format(self.result))
  17. print('结束子进程{}'.format(self.name))
  18. if __name__ == '__main__':
  19. start = time.time()
  20. threads = []
  21. for i in range(1, 3):
  22. t = MyThread(long_time_task, (i,), str(i))
  23. threads.append(t)
  24. for t in threads:
  25. t.start()
  26. for t in threads:
  27. t.join()
  28. end = time.time()
  29. print("总共用时{}秒".format((end - start)))

结果如下:

3.2 不同线程间的数据共享

一个进程中的不同线程之间是共享内存的,这就意味着任何一个变量都可以被任何一个线程修改,因此线程之间共享数据最大的危险在于多个线程同时修改同一个变量,把内容给改乱了。

如果不同线程之间有共享的变量,其中一个方法就是在修改前给加上一把锁lock,确保一次只有一个线程能修改它。

threading.Lock()方法可以轻易实现对一个共享变量的锁定,修改后release,以供其他线程使用。

比如在下例中 余额 balance是一个共享变量,使用lock可以使其不被改变

  1. import threading
  2. class Account:
  3. def __init__(self):
  4. self.balance = 0
  5. def add(self, lock):
  6. # 获得锁
  7. # lock.acquire()
  8. for i in range(0, 100000):
  9. self.balance += 1
  10. # 释放锁
  11. # lock.release()
  12. def delete(self, lock):
  13. # 获得锁
  14. # lock.acquire()
  15. for i in range(0, 100000):
  16. self.balance -= 1
  17. # 释放锁
  18. # lock.release()
  19. if __name__ == "__main__":
  20. account = Account()
  21. lock = threading.Lock()
  22. # 创建线程
  23. thread_add = threading.Thread(target=account.add, args=(lock,), name='Add')
  24. thread_delete = threading.Thread(target=account.delete, args=(lock,), name='Delete')
  25. # 启动线程
  26. thread_add.start()
  27. thread_delete.start()
  28. # 等待线程结束
  29. thread_add.join()
  30. thread_delete.join()
  31. print('The final balance is: {}'.format(account.balance))

结果如下:

下图进行加锁,就可以避免这个情况

  1. import threading
  2. class Account:
  3. def __init__(self):
  4. self.balance = 0
  5. def add(self, lock):
  6. # 获得锁
  7. lock.acquire()
  8. for i in range(0, 100000):
  9. self.balance += 1
  10. # 释放锁
  11. lock.release()
  12. def delete(self, lock):
  13. # 获得锁
  14. lock.acquire()
  15. for i in range(0, 100000):
  16. self.balance -= 1
  17. # 释放锁
  18. lock.release()
  19. if __name__ == "__main__":
  20. account = Account()
  21. lock = threading.Lock()
  22. # 创建线程
  23. thread_add = threading.Thread(target=account.add, args=(lock,), name='Add')
  24. thread_delete = threading.Thread(target=account.delete, args=(lock,), name='Delete')
  25. # 启动线程
  26. thread_add.start()
  27. thread_delete.start()
  28. # 等待线程结束
  29. thread_add.join()
  30. thread_delete.join()
  31. print('The final balance is: {}'.format(account.balance))

加锁后输出结果如下:

另一种实现不同线程间数据共享的方法就是使用消息队列queue。不像列表,queue是线程安全的,可以放心使用

使用queue队列通信-经典的生产者和消费者模型

下例创建两个线程,一个负责生产,一个负责消费,所生产的产品存放在queue里,实现了不同线程的沟通

  1. from queue import Queue
  2. import random, threading, time
  3. # 生产者类
  4. class Producer(threading.Thread):
  5. def __init__(self, name, queue):
  6. threading.Thread.__init__(self, name=name)
  7. self.queue = queue
  8. def run(self):
  9. for i in range(1, 5):
  10. print("{} is producing {} to the queue!".format(self.getName(), i))
  11. self.queue.put(i)
  12. time.sleep(random.randrange(10) / 5)
  13. print("%s finished!" % self.getName())
  14. # 消费者类
  15. class Consumer(threading.Thread):
  16. def __init__(self, name, queue):
  17. threading.Thread.__init__(self, name=name)
  18. self.queue = queue
  19. def run(self):
  20. for i in range(1, 5):
  21. val = self.queue.get()
  22. print("{} is consuming {} in the queue.".format(self.getName(), val))
  23. time.sleep(random.randrange(10))
  24. print("%s finished!" % self.getName())
  25. def main():
  26. queue = Queue()
  27. producer = Producer('Producer', queue)
  28. consumer = Consumer('Consumer', queue)
  29. producer.start()
  30. consumer.start()
  31. producer.join()
  32. consumer.join()
  33. print('All threads finished!')
  34. if __name__ == '__main__':
  35. main()

结果如下:

5. 线程池

但从Python3.2开始,标准库为我们提供了concurrent.futures模块,它提供了ThreadPoolExecutor和ProcessPoolExecutor两个类,实现了对threading和multiprocessing的进一步抽象,对编写线程池/进程池提供了直接的支持。

5.1 参数详解

  1. ProcessPoolExecutor(n):n表示池里面存放多少个进程,之后的连接最大就是n的值
  2. submit(fn,*args,**kwargs) 异步提交任务
  3. map(func, *iterables, timeout=None, chunksize=1) 取代for循环submit的操作
  4. shutdown(wait=True) 相当于进程池的pool.close()+pool.join()操作
  5. wait=True,等待池内所有任务执行完毕回收完资源后才继续,--------》默认
  6. wait=False,立即返回,并不会等待池内的任务执行完毕
  7. 但不管wait参数为何值,整个程序都会等到所有任务执行完毕
  8. submit和map必须在shutdown之前
  9. result(timeout=None) #取得结果
  10. add_done_callback(fn) #回调函数

使用submit 来操作线程池/进程池

  1. from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
  2. import time
  3. # 模拟网络请求的网络延迟
  4. def get_html(times):
  5. time.sleep(times)
  6. print("get page {}s finished".format(times))
  7. return times
  8. # 创建一个大小为2的线程池
  9. pool = ThreadPoolExecutor(max_workers=2)
  10. # 将上个任务提交到线程池,因为线程池的大小是2,所以必须等task1和task2中有一个完成之后才会将第三个任务提交到线程池
  11. task1 = pool.submit(get_html, 3)
  12. task2 = pool.submit(get_html, 2)
  13. task3 = pool.submit(get_html, 4)
  14. # 打印该任务是否执行完毕
  15. print(task1.done())
  16. # 只有未被提交的到线程池(在等待提交的队列中)的任务才能够取消
  17. print(task3.cancel())
  18. time.sleep(4) # 休眠4秒钟之后,线程池中的任务全部执行完毕,可以打印状态
  19. print(task1.done())
  20. print(task1.result()) # 该任务的return 返回值 该方法是阻塞的。

 结果如下:

  1. ThreadPoolExecutor构造实例的时候,传入max_workers参数来设置线程池中最多能同时运行的线程数目。

  2. 使用submit函数来提交线程需要执行的任务(函数名和参数)到线程池中,并返回该任务的句柄(类似于文件、画图),注意submit()不是阻塞的,而是立即返回。

  3. 通过submit函数返回的任务句柄,能够使用done()方法判断该任务是否结束。上面的例子可以看出,由于任务有2s的延时,在task1提交后立刻判断,task1还未完成,而在延时4s之后判断,task1就完成了。

  4. 使用cancel()方法可以取消提交的任务,如果任务已经在线程池中运行了,就取消不了。这个例子中,线程池的大小设置为2,任务已经在运行了,所以取消失败。如果改变线程池的大小为1,那么先提交的是task1,task2还在排队等候,这是时候就可以成功取消。

  5. 使用result()方法可以获取任务的返回值。查看内部代码,发现这个方法是阻塞的。

as_completed

上面虽然提供了判断任务是否结束的方法,但是不能在主线程中一直判断啊。有时候我们是得知某个任务结束了,就去获取结果,而不是一直判断每个任务有没有结束。这是就可以使用as_completed方法一次取出所有任务的结果。

  1. import time
  2. from concurrent.futures import ThreadPoolExecutor, as_completed
  3. # 模拟网络请求的网络延迟
  4. def get_html(times):
  5. time.sleep(times)
  6. print("get page {}s finished".format(times))
  7. return times
  8. pool = ThreadPoolExecutor(max_workers=2)
  9. urls = [2, 3, 4]
  10. all_task = [pool.submit(get_html, url) for url in urls]
  11. for future in as_completed(all_task):
  12. data = future.result()
  13. print("in main: get page {}s success".format(data))

结果如下:

as_completed()方法是一个生成器,在没有任务完成的时候,会阻塞,在有某个任务完成的时候,会yield这个任务,就能执行for循环下面的语句,然后继续阻塞住,循环到所有的任务结束。从结果也可以看出,先完成的任务会先通知主线程。

map除了上面的as_completed方法,还可以使用executor.map方法,但是有一点不同。

  1. import time
  2. from concurrent.futures import ThreadPoolExecutor, as_completed
  3. # 模拟网络请求的网络延迟
  4. def get_html(times):
  5. time.sleep(times)
  6. print("get page {}s finished".format(times))
  7. return times
  8. pool = ThreadPoolExecutor(max_workers=2)
  9. urls = [2, 3, 4]
  10. for data in pool.map(get_html, urls):
  11. print("in main: get page {}s success".format(data))

结果如下:

shutdown

shutdown方法的功能类似于 join+close的集合

  1. import time
  2. from concurrent.futures import ThreadPoolExecutor, wait
  3. results = []
  4. def get_html(i):
  5. time.sleep(2)
  6. return 2 * i
  7. def handle(res):
  8. res = res.result()
  9. results.append(res)
  10. pool = ThreadPoolExecutor(max_workers=2)
  11. for i in range(4):
  12. pool.submit(get_html, i).add_done_callback(handle)
  13. pool.shutdown(wait=True) # 相当于 join + close
  14. print('main')
  15. print(results)

结果如下:

wait

wait方法可以让主线程阻塞,直到满足设定的要求。

当设置了wait后,主线程会一直等待子线程执行完毕才能执行

和 shutdown效果类似

  1. from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED, FIRST_COMPLETED
  2. import time
  3. # 参数times用来模拟网络请求的时间
  4. def get_html(times):
  5. time.sleep(times)
  6. print("get page {}s finished".format(times))
  7. return times
  8. executor = ThreadPoolExecutor(max_workers=2)
  9. urls = [3, 2, 4] # 并不是真的url
  10. all_task = [executor.submit(get_html, (url)) for url in urls]
  11. wait(all_task, return_when=ALL_COMPLETED)
  12. print("main")

结果如下:

6. 进程池

  1. from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
  2. import time,random,os
  3. def task(n):
  4. print('%s is running'% os.getpid())
  5. time.sleep(random.randint(1,3))
  6. return n
  7. def handle(res):
  8. res=res.result()
  9. print("handle res %s"%res)
  10. if __name__ == '__main__':
  11. #同步调用
  12. # pool=ProcessPoolExecutor(8)
  13. #
  14. # for i in range(13):
  15. # pool.submit(task, i).result() #变成同步调用,串行了,等待结果
  16. # # pool.shutdown(wait=True) #关门等待所有进程完成
  17. # pool.shutdown(wait=False)#默认wait就等于True
  18. # # pool.submit(task,3333) #shutdown后不能使用submit命令
  19. #
  20. # print('主')
  21. #异步调用
  22. pool=ProcessPoolExecutor(8)
  23. for i in range(13):
  24. obj=pool.submit(task,i)
  25. obj.add_done_callback(handle) #这里用到了回调函数
  26. pool.shutdown(wait=True) #关门等待所有进程完成
  27. print('主')
  28. ##注意,创建进程池必须在if __name__ == '__main__':中,否则会报错
  29. ##其他的用法和创建线程池的一样

回调函数

  1. from concurrent.futures import ThreadPoolExecutor
  2. from urllib import request
  3. from threading import current_thread
  4. import time
  5. def get(url):
  6. print('%s get %s'%(current_thread().getName(),url))
  7. response=request.urlopen(url)
  8. time.sleep(2)
  9. # print(response.read().decode('utf-8'))
  10. return{'url':url,'content':response.read().decode('utf-8')}
  11. def parse(res):
  12. res=res.result()
  13. print('parse:[%s] res:[%s]'%(res['url'],len(res['content'])))
  14. # get('http://www.baidu.com')
  15. if __name__ == '__main__':
  16. pool=ThreadPoolExecutor(2)
  17. urls=[
  18. 'https://www.baidu.com',
  19. 'https://www.python.org',
  20. 'https://www.openstack.org',
  21. 'https://www.openstack.org',
  22. 'https://www.openstack.org',
  23. 'https://www.openstack.org',
  24. 'https://www.openstack.org',
  25. 'https://www.openstack.org',
  26. ]
  27. for url in urls:
  28. pool.submit(get,url).add_done_callback(parse)

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/172555
推荐阅读
相关标签
  

闽ICP备14008679号