当前位置:   article > 正文

Python多进程、多线程编程_python 进程 编程

python 进程 编程

1. 进程、线程、协程

从教科书上我们知道,进程process是资源分配的基本单位,线程thread是资源调度的最小单位

线程与资源分配无关,它属于某一个进程,并与进程内的其他线程一起共享进程的资源,线程是比进程更细粒度的单位。

对单核CPU,同一时刻(而不是时间段)只有一个进程会被CPU执行,尽管能看见后台有多个进程在运行,这只是(例如时间片轮转引起的)“并发”而不是真正意义上的“并行”。
只有硬件才能决定能否并行,单CPU单物理核心,只能并发,没法并行。多核CPU(或多CPU)才能达到并行,每个核心执行一个进程。即只有多核CPU才能实现多进程

多进程的使命是提高多核利用率。
(多)线程的使命是提高CPU的利用率,IO占用时不让CPU闲置;
线程是资源调度(使用CPU)的基本单位,当执行中的线程遇到IO等操作时,可以让位给其他线程来占用CPU(例如只有一个进程时,不至于使该进程的所有线程都不再使用CPU),最大化CPU利用率。
展开:例如单核CPU、一个进程。如果进程是资源调度的基本单位,即整个进程要么在CPU上执行,要么闲置,这样,如果遇到IO,整个进程就得等待IO,使CPU空闲。如果资源调度的基本单位是线程,即在CPU上执行的不再是进程而是比进程更细粒度的线程,当遇到IO,可以让位给其他不依赖IO的线程占用CPU,不至于使CPU空闲。

线程切换开销大,用协程。

2. Python多线程

GIL全局解释器锁

GIL全局解释器锁(英语:Global Interpreter Lock,缩写GIL),并不是Python的特性,它是在实现CPython(Python最常见的解释器)时所引入的一个概念。
GIL的作用是保证同一时刻CPU上只有一个线程被执行,无论CPU有多少核心。这么设计的目的????
注意,只有Python或更具体CPython解释器才有这个GIL,其他的例如C、Java的多线程是真正的多线程,可以有多个线程在CPU上执行(多个核心)。

在python2.x中,ticks技术会很快达到阈值,触发GIL的释放与再竞争,线程切换需要消耗资源。GIL的释放逻辑是当前线程遇到IO操作或者ticks(python自身的计数器)计数达到100后就进行释放。在python3.x中,替换ticks的100计数改为时间阈值,即当前线程的执行时间达到阈值后进行释放。

“多核多线程比单核多线程更差,原因是单核下多线程,每次释放GIL,唤醒的那个线程都能获取到GIL锁,所以能够无缝执行,但多核下,CPU0释放GIL后,其他CPU上的线程都会进行竞争,但GIL可能会马上又被CPU0拿到,导致其他几个CPU上被唤醒后的线程会醒着等待到切换时间后又进入待调度状态,这样会造成线程颠簸(thrashing),导致效率更低。”

由于GIL的机制,Python的多线程相当于“伪多线程”,多核心多线程等价于多核心单线程,还会增加线程调度的颠簸开销。
既然如此,Python的伪多线程有什么用?
多线程首先是线程,有着线程的通用作用或功能,即可以解决CPU利用率问题,

补充GIL的意义
Python 是一门解释型的语言,这就意味着代码是解释一行,运行一行,它并不清楚代码全局;
GIL的意义,CPython官方说CPython的内存管理不是安全的,GIL是必要的,防止对多线程同时对共享数据修改,产生数据不一致性。如果继续问,关于为什么不设计成内存安全,进而去除GIL,我理解使用GIL设计简单吧,就工作量更小,例如多线程也存在线程竞争、调度

CPython科普

CPython指C语言实现的Python解释器,是官方的且使用最广的Python解释器。同样,还存在各种各样Python的实现版本,IronPython、Jython和PyPy。除了 CPython 以外,还有用 JAVA 实现的 Jython 、用.NET 实现的 IronPython、实验性的 Python 解释器比如 PyPy。具体参见
https://wiki.python.org/moin/PythonImplementations?action=show&redirect=implementation

3. Python:多进程 or 多线程

结论:
I/O密集型任务,使用多线程(并发):
计算密集型任务,使用多进程(并行):CPU不够用,多核/多进程起作用。此时,如果使用Python多线程,ticks技术会很快达到阈值,触发GIL的释放与再竞争,线程切换需要消耗资源。

计算密集型、I/O密集型科普

计算密集型任务,需要进行大量的计算,消耗大量CPU资源,例如计算圆周率、高维for循环的数值计算。

I/O密集型任务,例如磁盘、网络IO,大量时间在等待IO操作完成(IO速度远远低于CPU和内存的速度),CPU资源消耗的少。这种情况下整体的速度瓶颈在IO,CPU大部分时间空闲,多核/多进程也没用。

4. 编程实战

https://docs.python.org/3.10/library/multiprocessing.html?highlight=multiprocessing
守护进程(线程)
四个核心函数
apply_asyncmap_asyncapplymap
后缀async意味着异步即非阻塞其他进程,反之,没有后缀是同步阻塞方式。
同步方式类似串行,一次往线程池放一个进程,一个进程使用时会阻塞其他进程,无法并行加速。
一般就用apply_async,无论并行还是串行,都是处理数据,有返回值的,并行能保证保证返回的先后顺序,进而可以复现结果吗,这取决于代码设计方式。

实现一:使用callback,进程间的返回值始终随机

import multiprocessing as mp
import numpy as np
import time

def call_back(res):
    results.append(res)

def onecpu(iter):
    a = iter
    time.sleep(1)
    print(a)
    return a

if __name__ == '__main__':
    results = []
    # print(mp.cpu_count())  # 8
    pool = mp.Pool(mp.cpu_count() // 2)
    t0 = time.time()
    for iter in range(10):
        # 后缀async意味着异步即非阻塞其他进程,反之是同步阻塞方式,
        # pool.map_async(onecpu, (iter,), callback=call_back)

        pool.apply_async(onecpu, (iter,), callback=call_back)

        # a = pool.apply(onecpu, (iter,))  # 同步方式无法加速,使其他进程阻塞只至其结束
        # print(a)

        # pool.map(onecpu, (iter,))  # 同步方式无法加速
    pool.close()
    pool.join()  # 阻塞主进程,之前必须close,

    print('time:{:.2f}'.format(time.time()-t0))
    print(results)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

实现2:不用callback,能保证有序

import multiprocessing as mp
import numpy as np
import time

def onecpu(iter):
    a = iter
    time.sleep(1)
    print(a)
    return a

if __name__ == '__main__':
    results = []
    # print(mp.cpu_count())
    pool = mp.Pool(mp.cpu_count() // 2)
    t0 = time.time()
    for iter in range(10):
        # 后缀async意味着异步即非阻塞其他进程,反之是同步阻塞方式,
        # pool.map_async(onecpu, (iter,), callback=call_back)

        # a = pool.apply_async(onecpu, (iter,), callback=call_back)
        a = pool.apply_async(onecpu, (iter,))
        results.append(a)

        # a = pool.apply(onecpu, (iter,))  # 同步方式无法加速,使其他进程阻塞只至其结束
        # print(a)

        # pool.map(onecpu, (iter,))  # 同步方式无法加速
    pool.close()
    pool.join()  # 阻塞主进程,之前必须close,

    print('time:{:.2f}'.format(time.time()-t0))

    for res in results:
        print(res.get(), end=' ')
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

注意,实现2,不能在for循环内改为results.append(a.get()),这样将变为同步(从运行时间上这样猜测的)。

实验2中print的顺序是随机的,但results中是顺序是定的,机理还不清楚;
实验1中print的顺序和results的顺序都是随机的,而且不相同。大量实验得到的结果。

mp.cpu_count()是cpu的逻辑核

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/143625
推荐阅读
相关标签
  

闽ICP备14008679号