当前位置:   article > 正文

Python四 —— 多进程和多线程实际使用和对比_python多线程和多进程哪个快

python多线程和多进程哪个快

Python四 —— 多进程和多线程

进程和线程

进程和线程的区别是一个老生常见的问题了,一般情况下,进程和线程的区别有如下:

  • 进程是CPU资源分配的基本单位,线程是独立运行和独立调度的基本单位(CPU上真正运行的是线程)。
  • 进程拥有自己的资源空间,一个进程包含若干个线程,线程与CPU资源分配无关,多个线程共享同一进程内的资源。
  • 在大多数编程语言中因为切换消耗的资源更少,多线程比多进程效率更高

这里面有几个重要的点,需要单独提一下:

  • 单核CPU在任意时刻,只能执行单个线程,只有多核CPU才能真正做到多个线程同时运行。
  • 一个进程包含多个线程,这些线程可以分布在多个CPU上。
  • 多核CPU同时运行的线程,可以属于单个进程,也可以属于不同进程
  • 在大多数编程语言中因为切换消耗的资源更少,多线程比多进程效率更高

然而,Python是一个特例

GIL锁

Python开始于1991年,创立之初对运算的要求并不高,为了解决多线程共享内存的数据安全问题,引入了GIL锁,全称为:Global Interepter Lock,也就是全局解释器锁

GIL锁规定: 在一个进程中,每次只能有一个线程在运行。 这就使得GIL锁相当于是线程运行的资格证,某个线程想要运行,首先要获得GIL锁,然后遇到IO或者超时时,再释放GIL锁,给其余的线程去竞争,竞争成功的线程获得GIL锁获得下一次运行机会。

正因为有GIL锁的存在,Python中的多线程其实是假的,也因此有人说 Python的多线程十分鸡肋

多线程 & 多进程 速度对比

  • CPU密集型代码(比如循环计算,海量运算,机器学习等) : 多进程效率更高
  • IO密集型代码(比如文件操作,网络爬虫): 多线程效率更高。

其实也不难理解。对于IO密集型操作,大部分消耗时间其实是等待时间,在等待时间中CPU是不需要工作的,那你在此期间提供双CPU资源也是利用不上的。相反对于CPU密集型代码,2个CPU干活肯定比一个CPU快很多。

那么为什么多线程会对IO密集型代码有用呢?这时因为python碰到等待会释放GIL供新的线程使用,实现了线程间的切换,而线程切换比进程切换快很多。

串行执行 代码实现

在进行多进程和多线程操作之前,我们首先看看一下串行执行,也就是顺序执行情况下的效率。

首先创建一个函数,来表示执行,为了表示一个耗时任务,在函数中加入休眠,休眠 2 s 2s 2s,并在开始和结尾打印进程ID。

def func():
    print('Process {} starts'.format(os.getpid()))
    time.sleep(2)
    print('Process {} ends'.format(os.getpid()))
  • 1
  • 2
  • 3
  • 4

为了对比多线程和多进程结果,首先串行执行,看一下效果,代码如下

if __name__ == "__main__":
    print('Main process is {}'.format(os.getpid()))
    start_time = time.time()
    # ========= single process ========
    func()
    func()
    end_time = time.time()
    print('Total time is {}'.format(str(end_time - start_time)))
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

执行结果如下:

Main process is 7440
Process 7440 starts
Process 7440 ends
Process 7440 starts
Process 7440 ends
Total time is 4.014200687408447
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

可以看到,这里是单个进程(进程号就是主进程进程号,只有一个) 先后顺序执行了两遍函数,共耗时约4秒。

多进程 代码实现

多进程

从主进程创建新的进程,用的是Process类,该类在实例化时,通常接两个参数:

  • target—— 新地进程执行函数的函数名

  • args—— 函数的参数

执行代码如下

if __name__ == "__main__":
    print('Main process is {}'.format(os.getpid()))
    start_time = time.time()
    # ========= multiprocessing ========
    from multiprocessing import Process
    p1 = Process(target=func)   # 因为func没有参数,所以args默认
    p2 = Process(target=func)
    print('Process will start')
    p1.start()
    p2.start()
 
    print('Process will join')
    p1.join()
    p2.join()
 
    end_time = time.time()
    print('Total time is {}'.format(str(end_time - start_time)))
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

创建完Process对象以后通过start()方法来启动该进程,同时如果想让某个进程阻塞主进程,可以执行该进程的join()方法。

正常情况下创建完子进程以后主进程会继续向下执行直到结束,如果有子进程阻塞了主进程则主进程会等待该子进程执行完以后才向下执行。这里主进程会等待p1p2两个子进程都执行完毕才计算结束时间。

执行结果如下:

Main process is 5616
Process will start
Process will join
Process 22412 starts
Process 20256 starts
Process 22412 ends
Process 20256 ends
Total time is 2.2408976554870605
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

可以看到,创建的子进程和主进程的进程ID是不一样的,说明此时一共有 3 3 3个进程在同时跑。最后的用时为 2.24 s 2.24s 2.24s,几乎降到了顺序执行一半的程度,当然比单个函数执行的时间还是慢了点,说明进程的创建和停止还是需要耗时的

进程池

从上述例子中,可以看出,进程的创建和停止也是需要耗时的,因此进程绝不是越多越好。因为单个CPU核某时刻只能执行单个进程,所以最好的情况就是让最大进程数等于CPU核数,这样可以最大化利用CPU

这时就会有一个问题出现了,如果进程数少还比较容易,但是如果进程数特别多,又需要维持一个固定数量的进程,这时候就需要用到进程池了。

线程池创建使用方法如下:

if __name__ == "__main__":
    print('Main process is {}'.format(os.getpid()))
    from multiprocessing import Process, Pool, cpu_count
    print("core number is {}".format(cpu_count()))  # 16
    start_time = time.time()
    # ========= multiprocessing Pool ========
    p = Pool(16)
    for i in range(18):
        p.apply_async(func)
    p.close()
    p.join()
 
    end_time = time.time()
    print('Total time is {}'.format(str(end_time - start_time)))
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

这里我首先利用cpu_count()方法计算了一下电脑的CPU核数,16核,所以进程池的最大进程数目设定为16。

PS:我电脑物理上是单CPU,8核。但是因为intel有超线程技术,一个核可以当作两个核来跑,所以逻辑上相当于16核
  • 1

这里利用Pool类来创建进程池,传递一个参数是最大进程数。利用Pool对象的apply_async()方法往进程池中添加待执行的任务(注意不是进程,只是任务),这里也可以利用map_async(func, iterable)来添加,用来类似于内建的map()方法,不过需要待执行的函数带参数,类似下面这样

if __name__ == "__main__":
    print('Main process is {}'.format(os.getpid()))
    from multiprocessing import Process, Pool, cpu_count
    print("Core number is {}".format(cpu_count()))  # 16
    start_time = time.time()
    # ========= multiprocessing Pool ========
    p = Pool(16)
    p.map_async(func, range(18))
    # for i in range(18):
    #     p.apply_async(func)
    p.close()
    p.join()
 
    end_time = time.time()
    print('Total time is {}'.format(str(end_time - start_time)))
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

然后是close()方法,进程池不再接受新的任务(注意不是进程)以及terminate()方法,关闭主进程,此时未开始的子进程都不会执行了。同样的,想要让进程池去阻塞主进程可以用join()方法。注意join()一定要在close()或者terminate()之后

运行结果如下:

Main process is 26964
Core number is 16
Process 20264 starts
Process 26256 starts
Process 29268 starts
Process 6816 starts
Process 9276 starts
Process 2428 starts
Process 8216 starts
Process 30588 starts
Process 4516 starts
Process 14672 starts
Process 30404 starts
Process 24232 starts
Process 9376 starts
Process 30096 starts
Process 23436 starts
Process 28660 starts
Process 26256 ends
Process 20264 ends
Process 26256 starts
Process 20264 starts
Process 29268 ends
Process 6816 ends
Process 9276 ends
Process 8216 ends
Process 2428 ends
Process 14672 ends
Process 4516 ends
Process 30588 ends
Process 30404 ends
Process 30096 ends
Process 9376 ends
Process 24232 ends
Process 23436 ends
Process 28660 ends
Process 26256 ends
Process 20264 ends
Total time is 6.874840021133423
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39

一共 18 18 18个任务,在最大数目为 16 16 16的进程池里面至少要执行两轮,同时加上进程启动和停止的消耗,最后用时 6.874 s 6.874s 6.874s

进程间通信

队列通信

前面说到进程间是相互独立的,不共享内存空间,所以在一个进程中声明的变量在另一个进程中是看不到的。这时候就要借助一些工具来在两个进程间进行数据传输了,其中最常见的就是队列了。

队列(queue)生产消费者模型中很常见,生产者进程在队列一端写入,消费者进程在队列另一端读取。

首先创建两个函数,分别扮演生产者和消费者

def write_to_queue(queue):
    for index in range(5):
        print('Write {} to {}'.format(str(index), queue))
        queue.put(index)
        time.sleep(1)
 
 
def read_from_queue(queue):
    while True:
        result = queue.get(True)
        print('Get {} from {}'.format(str(result), queue))
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

这两个函数都接受一个队列作为参数然后利用put()方法往其中写入或者get()方法来读取。生产者会连续写入 5 5 5个数字,每次间隔 1 1 1秒,消费者则会一直尝试读取。

主程序如下

if __name__ == "__main__":
    print('Main process is {}'.format(os.getpid()))
    from multiprocessing import Process, Pool, cpu_count, Queue
 
    print("Core number is {}".format(cpu_count()))  # 16
    start_time = time.time()
    # ========= multiprocessing queue ========
    queue = Queue()
    pw = Process(target=write_to_queue, args=(queue,))  # 注意输入args是元组,如果是单个元素需要加,
    pr = Process(target=read_from_queue, args=(queue,))
    pw.start()
    pr.start()
    pw.join()
    pr.terminate()  # 停止消费者进程
 
    end_time = time.time()
    print('Total time is {}'.format(str(end_time - start_time)))
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

注意这里在创建子进程的时候就用元组的形式传递了参数,如果元组只有一个元素,记住添加逗号,否则会被认为是单个元素而不是元组。同时这里因为消费者是死循环,所以只是将生产者加入了阻塞,生产者进程执行完毕以后停止消费者进程。

最终输出结果如下:

Main process is 25200
Core number is 16
Write 0 to <multiprocessing.queues.Queue object at 0x0000029EA3722B80>
Get 0 from <multiprocessing.queues.Queue object at 0x0000025B1CF82B80>
Write 1 to <multiprocessing.queues.Queue object at 0x0000029EA3722B80>
Get 1 from <multiprocessing.queues.Queue object at 0x0000025B1CF82B80>
Write 2 to <multiprocessing.queues.Queue object at 0x0000029EA3722B80>
Get 2 from <multiprocessing.queues.Queue object at 0x0000025B1CF82B80>
Write 3 to <multiprocessing.queues.Queue object at 0x0000029EA3722B80>
Get 3 from <multiprocessing.queues.Queue object at 0x0000025B1CF82B80>
Write 4 to <multiprocessing.queues.Queue object at 0x0000029EA3722B80>
Get 4 from <multiprocessing.queues.Queue object at 0x0000025B1CF82B80>
Total time is 6.263579607009888
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
进程间共享变量

不同的进程默认有自己独立的内存空间,互相之间不能直接访问。所以,通常的全局变量或者传递参数,在创建新进程时,都是拷贝一份到新进程里使用。

如果希望不同进程读写同一个变量,需要做特殊的声明multiprocessing提供了两种实现方式,一种是共享内存,一种是使用服务进程共享内存只支持两种数据结构ValueArray

共享内存

class Test:
    count = multiprocessing.Value('i', 0)
 
    @classmethod
    def fun(cls):
        cls.count.value += 1
        print('cls id: {}, cls.count id: {}, count: {}'.format(id(cls), id(cls.count), cls.count.value))
 
    @classmethod
    def test(cls):
        p = multiprocessing.Process(target=cls.fun)
        p.start()
        p.join()
        print('cls id: {}, cls.count id: {}, count: {}'.format(id(cls), id(cls.count), cls.count.value))
 
if __name__ == '__main__':
    Test.test()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

执行结果如下:

cls id: 140731472827480, cls.count id: 4554919384, count: 1
cls id: 140731472827480, cls.count id: 4554919384, count: 1
  • 1
  • 2

子进程和主进程访问的count在内存中的地址是相同的。这里有两点需要注意:

  1. multiprocessing.Value对象和Process一起使用的时候,可以像上面那样作为全局变量使用,也可以作为传入参数使用。但是和Pool一起使用的时候,只能作为全局变量使用,作为传入参数使用会报错RuntimeError: Synchronized objects should only be shared between processes through inheritance
  2. 多个进程读写共享变量的时候,要注意操作是否是进程安全的。对于前面的累加计数器,虽然是一个语句,但是涉及到读和写,和进程的局域临时变量,这个操作不是进程安全的。多进程的累加的时候,会出现不正确的结果。需要给cls.count += 1加上锁。加锁的方式,可以使用外部的锁,也可以直接使用get_lock()方法。
# 使用外部的锁
class Test:
    lock = multiprocessing.Lock()
        ...
    def fun(cls):
        cls.lock.acquire()
        cls.count.value += 1
        cls.lock.release()
 
# 使用get_lock()方法
    def fun(cls):
        with cls.count.get_lock():
            cls.count += 1
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

共享内存支持的数据结构有限,另一种共享变量的方式是使用服务进程管理需要共享的变量,其他进程操作共享变量的时候通过和服务进程的交互实现。

这种方式支持列表、字典等类型,而且可以实现多台机器之间共享变量。但是速度要比共享内存的方式。另外,这种方式可以用作Pool的传入参数。同样的,对于非进程安全的操作,也需要加锁

import multiprocessing
import os
 
class Test:
    lock = multiprocessing.Lock()
 
    @classmethod
    def fun(cls, count):
        cls.lock.acquire()
        count.value += 1
        if count.value % 500 == 0:
            print('pid: {}, cls id: {}, cls.count id: {}, count: {}'.format(os.getpid(), id(cls), id(count), count.value))
        cls.lock.release()
 
    @classmethod
    def test(cls):
        manager = multiprocessing.Manager()
        count = manager.Value('i', 0)
        print('main-cls id: {}, count id: {}, count: {}'.format(id(cls), id(count), count.value))
 
        pool = multiprocessing.Pool(2)
        rs = []
        for i in range(1000):
            p = pool.apply_async(func=cls.fun, kwds={'count': count})
            rs.append(p)
        pool.close()
        pool.join()
        for i in rs:
            i.get()
        print('main-cls id: {}, count id: {}, count: {}'.format(id(cls), id(count), count.value))
 
if __name__ == '__main__':
    Test.test()
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

执行结果如下:

main-cls id: 140298063822200, count id: 4347985312, count: 0
pid: 47895, cls id: 140298063822200, count id: 4348083224, count: 500
pid: 47895, cls id: 140298063822200, count id: 4348082776, count: 1000
main-cls id: 140298063822200, count id: 4347985312, count: 1000
  • 1
  • 2
  • 3
  • 4

另外一种常用的共享对象就是队列multiprocessing.Queue,这个对象本身提供的方法都是进程安全的,所以使用时不必再加锁。

多线程 代码实现

多线程实现

首先创建一个函数,用于测试:

import threading
def func2(n):
    print('Thread {} starts'.format(threading.current_thread().name))
    time.sleep(2)
    print('Thread {} ends'.format(threading.current_thread().name))
    return n
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

多线程使用的是threading.Thread

if __name__ == '__main__':
    print('Main thread is {}'.format(threading.current_thread().name))
    start_time = time.time()
    # ======= multithread =========
    t1 = threading.Thread(target=func2, args=(1,))
    t2 = threading.Thread(target=func2, args=(2,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
 
    end_time = time.time()
    print('Total time is {}'.format(str(end_time - start_time)))
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

基本用法和上面进程的Process差不多,执行结果如下:

Main thread is MainThread
Thread Thread-1 starts
Thread Thread-2 starts
Thread Thread-2 ends
Thread Thread-1 ends
Total time is 2.0166513919830322
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

对比前面多进程的 2.24 s 2.24s 2.24s,这里还是快了不少的。

线程池

和进程一样,通常是使用线程池来完成自动控制线程数量的目的。但是这里就没有一个推荐的上限数量了,毕竟因为GIL锁的存在不管怎么样每次都只有一个线程在跑。

同时threading模块是不支持线程池的,python3.4以后官方推出了concurrent.futures模块来统一进程池和线程池的接口,这里关注一下线程池。

from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED
 
if __name__ == '__main__':
    print('Main thread is {}'.format(threading.current_thread().name))
    start_time = time.time()
    # ========= thread pool ===========
    executor = ThreadPoolExecutor(16)  # 创建一个最大容量为16的线程池
    all_tasks = [executor.submit(func2, i) for i in range(18)]  # 将任务添加到线程池, 并返回一个future对象列表
    # wait函数阻塞主线程, 该函数接受3个参数
    # 第一个参数是future对象列表
    # 第二个参数是超时时间
    # 第三个参数是在什么时候结束阻塞, 默认是ALL_COMPLETED表示全部任务结束之后,也可以设定为FIRST_COMPLETED表示第一个任务结束以后
    wait(all_tasks, return_when=ALL_COMPLETED)
 
    end_time = time.time()
    print('Total time is {}'.format(str(end_time - start_time)))
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

这里利用ThreadPoolExecutor()创建一个线程池,最大上限为 5 5 5,然后利用submit()方法往线程池中添加任务(注意是任务,不是线程),submit()方法会返回一个future对象,注意这里我将创建的任务放进了一个列表中。

如果要阻塞主线程,不能用join()方法了,需要用到wait()方法,该方法接受三个参数

  • 第一个参数是一个future对象的列表
  • 第二个参数是超时时间,这里放空
  • 第三个参数是在什么时候结束阻塞,默认是ALL_COMPLETED表示全部任务结束之后,也可以设定为FIRST_COMPLETED表示第一个任务结束以后。

执行结果如下:

Main thread is MainThread
Thread ThreadPoolExecutor-0_0 starts
Thread ThreadPoolExecutor-0_1 starts
Thread ThreadPoolExecutor-0_2 starts
Thread ThreadPoolExecutor-0_3 starts
Thread ThreadPoolExecutor-0_4 starts
Thread ThreadPoolExecutor-0_5 starts
Thread ThreadPoolExecutor-0_6 starts
Thread ThreadPoolExecutor-0_7 starts
Thread ThreadPoolExecutor-0_8 starts
Thread ThreadPoolExecutor-0_9 starts
Thread ThreadPoolExecutor-0_10 starts
Thread ThreadPoolExecutor-0_11 starts
Thread ThreadPoolExecutor-0_12 starts
Thread ThreadPoolExecutor-0_13 starts
Thread ThreadPoolExecutor-0_14 starts
Thread ThreadPoolExecutor-0_15 starts
Thread ThreadPoolExecutor-0_15 ends
Thread ThreadPoolExecutor-0_12 ends
Thread ThreadPoolExecutor-0_10 ends
Thread ThreadPoolExecutor-0_10 starts
Thread ThreadPoolExecutor-0_12 starts
Thread ThreadPoolExecutor-0_8 ends
Thread ThreadPoolExecutor-0_4 ends
Thread ThreadPoolExecutor-0_0 ends
Thread ThreadPoolExecutor-0_2 ends
Thread ThreadPoolExecutor-0_14 ends
Thread ThreadPoolExecutor-0_7 ends
Thread ThreadPoolExecutor-0_5 ends
Thread ThreadPoolExecutor-0_6 ends
Thread ThreadPoolExecutor-0_3 ends
Thread ThreadPoolExecutor-0_1 ends
Thread ThreadPoolExecutor-0_11 ends
Thread ThreadPoolExecutor-0_9 ends
Thread ThreadPoolExecutor-0_13 ends
Thread ThreadPoolExecutor-0_12 ends
Thread ThreadPoolExecutor-0_10 ends
Total time is 4.019853830337524
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39

最后的结果也是接近两倍的函数耗时 4 4 4秒,比进程池快了不止一点点。

map

这里需要额外提一下多线程中的map()方法。

多进程中的map_async()方法和多线程中的map()方法除了将任务加入线程池,还会按添加的顺序返回每个线程的执行结果,这个执行结果也很特殊,是一个生成器

from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED
 
if __name__ == '__main__':
    print('Main thread is {}'.format(threading.current_thread().name))
    start_time = time.time()
    # ========= thread pool ===========
    executor = ThreadPoolExecutor(16)  # 创建一个最大容量为16的线程池
    all_result = executor.map(func2, range(18))  # 通过线程池的map方法,将任务提交给线程池, map返回的是线程执行的结果的生成器对象
    for res in all_result:
        print(res)
 
    end_time = time.time()
    print('Total time is {}'.format(str(end_time - start_time)))
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

这里的all_results是一个生成器,可以通过for循环来按顺序获取每个线程的返回结果。同时值得注意的是map方法并不会阻塞主线程,也没法使用wait方法,只能通过获取生成器的结果来阻塞主线程了。

执行结果如下:

Main thread is MainThread
Thread ThreadPoolExecutor-0_0 starts
Thread ThreadPoolExecutor-0_1 starts
Thread ThreadPoolExecutor-0_2 starts
Thread ThreadPoolExecutor-0_3 starts
Thread ThreadPoolExecutor-0_4 starts
Thread ThreadPoolExecutor-0_5 starts
Thread ThreadPoolExecutor-0_6 starts
Thread ThreadPoolExecutor-0_7 starts
Thread ThreadPoolExecutor-0_8 starts
Thread ThreadPoolExecutor-0_9 starts
Thread ThreadPoolExecutor-0_10 starts
Thread ThreadPoolExecutor-0_11 starts
Thread ThreadPoolExecutor-0_12 starts
Thread ThreadPoolExecutor-0_13 starts
Thread ThreadPoolExecutor-0_14 starts
Thread ThreadPoolExecutor-0_15 starts
Thread ThreadPoolExecutor-0_3 ends
Thread ThreadPoolExecutor-0_3 starts
Thread ThreadPoolExecutor-0_9 ends
Thread ThreadPoolExecutor-0_9 starts
Thread ThreadPoolExecutor-0_12 ends
Thread ThreadPoolExecutor-0_8 ends
Thread ThreadPoolExecutor-0_10 ends
Thread ThreadPoolExecutor-0_0 ends
Thread ThreadPoolExecutor-0_11 ends
Thread ThreadPoolExecutor-0_15 ends
Thread ThreadPoolExecutor-0_2 ends
Thread ThreadPoolExecutor-0_4 ends
Thread ThreadPoolExecutor-0_14 ends
Thread ThreadPoolExecutor-0_1 ends
Thread ThreadPoolExecutor-0_13 ends
Thread ThreadPoolExecutor-0_5 ends
Thread ThreadPoolExecutor-0_6 ends
Thread ThreadPoolExecutor-0_7 ends
0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Thread ThreadPoolExecutor-0_3 ends
Thread ThreadPoolExecutor-0_9 ends
16
17
Total time is 4.027376174926758
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57

可以看出线程结果是按顺序返回的。

异步

想要不用map()方法又要异步获取线程的返回值,还可以用as_completed()方法

from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED, as_completed
 
if __name__ == '__main__':
    print('Main thread is {}'.format(threading.current_thread().name))
    start_time = time.time()
    # ========= thread pool ===========
    executor = ThreadPoolExecutor(16)  # 创建一个最大容量为16的线程池
    all_tasks = [executor.submit(func2, (i)) for i in range(18)]  # 将任务提交到线程池
    for future in as_completed(all_tasks):
        print(future.result())
 
    end_time = time.time()
    print('Total time is {}'.format(str(end_time - start_time)))
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

as_completed()接受一个任务列表做为参数,返回一个生成器,所以主线程依然会被阻塞,等所有线程执行完毕打印出结果再继续执行主线程。

执行结果如下:

Main thread is MainThread
Thread ThreadPoolExecutor-0_0 starts
Thread ThreadPoolExecutor-0_1 starts
Thread ThreadPoolExecutor-0_2 starts
Thread ThreadPoolExecutor-0_3 starts
Thread ThreadPoolExecutor-0_4 starts
Thread ThreadPoolExecutor-0_5 starts
Thread ThreadPoolExecutor-0_6 starts
Thread ThreadPoolExecutor-0_7 starts
Thread ThreadPoolExecutor-0_8 starts
Thread ThreadPoolExecutor-0_9 starts
Thread ThreadPoolExecutor-0_10 starts
Thread ThreadPoolExecutor-0_11 starts
Thread ThreadPoolExecutor-0_12 starts
Thread ThreadPoolExecutor-0_13 starts
Thread ThreadPoolExecutor-0_14 starts
Thread ThreadPoolExecutor-0_15 starts
Thread ThreadPoolExecutor-0_12 endsThread ThreadPoolExecutor-0_15 ends
Thread ThreadPoolExecutor-0_12 starts
12Thread ThreadPoolExecutor-0_4 ends
Thread ThreadPoolExecutor-0_4 starts
Thread ThreadPoolExecutor-0_14 ends
Thread ThreadPoolExecutor-0_5 ends
Thread ThreadPoolExecutor-0_9 ends
Thread ThreadPoolExecutor-0_11 ends
Thread ThreadPoolExecutor-0_3 ends
Thread ThreadPoolExecutor-0_8 ends
Thread ThreadPoolExecutor-0_2 ends
Thread ThreadPoolExecutor-0_7 ends
Thread ThreadPoolExecutor-0_13 ends
 
Thread ThreadPoolExecutor-0_1 ends
Thread ThreadPoolExecutor-0_0 ends
Thread ThreadPoolExecutor-0_6 ends
Thread ThreadPoolExecutor-0_10 ends
4
14
5
9
11
3
 
8
2
7
13
15
1
0
6
10
Thread ThreadPoolExecutor-0_12 endsThread ThreadPoolExecutor-0_4 ends
 
17
16
Total time is 4.024051904678345
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57

这里的线程结果就不是按照就不是按照添加任务的顺序,而是按照返回的先后顺序打印的

所以,想要获取多线程的返回结果,按照添加顺序就用map方法,按照返回的先后顺序就用as_completed方法

线程间通讯

互斥锁的必要性

与多进程的内存独立不同,多线程间可以共享内存,所以同一个变量是可以被多个线程共享的,不需要额外的插件。

想要让多个线程能同时操作某变量,要么将该变量作为参数传递到线程中(必须是可变变量,例如list和dict),要么作为全局变量在线程中用global关键字进行声明。

因为有GIL锁的存在,每次只能有一个线程在对变量进行操作,有人就认为python不需要互斥锁了。但是实际情况却和我们想的相差很远,先看下面这个例子

import time
import threading
 
 
def increase(var):
    global total_increase_times
    for i in range(1000000):
        var[0] += 1
        total_increase_times += 1
 
 
def decrease(var):
    global total_decrease_times
    for i in range(1000000):
        var[0] -= 1
        total_decrease_times += 1
 
 
if __name__ == '__main__':
    print('Main thread is {}'.format(threading.current_thread().name))
    start_time = time.time()
    var = [5]
    total_increase_times = 0
    total_decrease_times = 0
    t1 = threading.Thread(target=increase, args=(var,))
    t2 = threading.Thread(target=decrease, args=(var,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print(var)
    print('Total increase times: {}'.format(str(total_increase_times)))
    print('Total decrease times: {}'.format(str(total_decrease_times)))
    end_time = time.time()
    print('Total time is {}'.format(str(end_time - start_time)))
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

这里首先定义了两个函数,分别对传进来的list的第一个元素进行加一和减一操作,重复多遍

然后在主线程中创建两个子线程分别运行,同时创建两个全局变量total_increase_timestotal_decrease_times分别来统计对变量进行加值和减值的次数,为了防止可能由于操作次数不一致导致的错误。

执行结果如下:

Main thread is MainThread
[-487906]
Total increase times: 1000000
Total decrease times: 1000000
Total time is 0.37512660026550293
  • 1
  • 2
  • 3
  • 4
  • 5

很奇怪,对变量值增加和减少同样的次数,最后的结果却和原先的值不一致。而且如果将该程序重复运行多次,每次得到的最终值都不同,有正有负。

这是为什么呢?

这是因为某些在我们看来是原子操作的,例如+或者-,在python看来不是的。例如执行a+=1操作,在python看来其实是三步:获取a的值,将值加1,将新的值赋给a。在这三步中的任意位置,该线程都有可能被暂停,然后让别的线程先运行。这时候就有可能出现如下的局面

线程1获取了a的值为10,被暂停
线程2获取了a的值为10
线程2将a的值赋值为9,被暂停
线程1将a的值赋值为11,被暂停
线程2获取了a的值为11
...
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

这样线程1就将线程2的操作全部覆盖了,这也就是为什么最后的结果有正有负。

那么如何处理这种情况呢?

需要用到互斥锁。

互斥锁

线程1在操作变量a的时候就给a上一把锁,别的线程看到变量有锁就不会去操作该变量,一直到线程1再次获得GIL之后继续操作将锁释放,别的线程才有机会对该变量进行操作。

修改下上面的代码

import time
import threading
 
 
def increase(var, lock):
    global total_increase_times
    for i in range(1000000):
        if lock.acquire():
            var[0] += 1
            lock.release()
            total_increase_times += 1
 
 
def decrease(var, lock):
    global total_decrease_times
    for i in range(1000000):
        if lock.acquire():
            var[0] -= 1
            lock.release()
            total_decrease_times += 1
 
 
if __name__ == '__main__':
    print('Main thread is {}'.format(threading.current_thread().name))
    start_time = time.time()
    lock = threading.Lock()  # 创建互斥锁
    var = [5]
    total_increase_times = 0
    total_decrease_times = 0
    t1 = threading.Thread(target=increase, args=(var, lock))  # 参数加入互斥锁
    t2 = threading.Thread(target=decrease, args=(var, lock))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print(var)
    print('Total increase times: {}'.format(str(total_increase_times)))
    print('Total decrease times: {}'.format(str(total_decrease_times)))
    end_time = time.time()
    print('Total time is {}'.format(str(end_time - start_time)))
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40

这里创建了一个全局锁lock并传递给两个线程,利用acquire()方法获取锁,如果没有获取到锁该线程会一直卡在这,并不会继续循环,操作完毕用release()方法释放锁。

执行结果如下:

Main thread is MainThread
[5]
Total increase times: 1000000
Total decrease times: 1000000
Total time is 1.314643144607544
  • 1
  • 2
  • 3
  • 4
  • 5

最终的结果不管执行多少次都没有问题,但是因为前面说的等待锁的过程会造成大量时间的浪费,这里耗时 1.31 1.31 1.31秒比前面的 0.37 0.37 0.37秒要慢了接近 4 4 4倍。

队列

多线程间通讯也可以用queue,因为queue是对线程安全的,不需要额外加锁了

import time
import threading
 
 
def write_to_queue(queue):
    for index in range(5):
        print('Write {} to {}'.format(str(index), queue))
        queue.put(index)
        time.sleep(1)
 
 
def read_from_queue(queue):
    while True:
        result = queue.get(True)
        print('Get {} from {}'.format(str(result), queue))
 
 
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED, as_completed
 
if __name__ == '__main__':
    print('Main thread is {}'.format(threading.current_thread().name))
    start_time = time.time()
    # ========== multithread queue ===============
    from queue import Queue
 
    queue = Queue()
    tw = threading.Thread(target=write_to_queue, args=(queue,))
    tr = threading.Thread(target=read_from_queue, args=(queue,))
    tr.setDaemon(True)  # 停止主线程,子线程也会停止
    tw.start()
    tr.start()
    tw.join()
    end_time = time.time()
    print('Total time is {}'.format(str(end_time - start_time)))
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

这里不能像进程中那样用terminate方法停止一个线程,需要用setDaemon方法。

Main thread is MainThread
Write 0 to <queue.Queue object at 0x000001ED97C986D0>
Get 0 from <queue.Queue object at 0x000001ED97C986D0>
Write 1 to <queue.Queue object at 0x000001ED97C986D0>
Get 1 from <queue.Queue object at 0x000001ED97C986D0>
Write 2 to <queue.Queue object at 0x000001ED97C986D0>
Get 2 from <queue.Queue object at 0x000001ED97C986D0>
Write 3 to <queue.Queue object at 0x000001ED97C986D0>
Get 3 from <queue.Queue object at 0x000001ED97C986D0>
Write 4 to <queue.Queue object at 0x000001ED97C986D0>
Get 4 from <queue.Queue object at 0x000001ED97C986D0>
Total time is 5.057354688644409
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

总结

  • CPU密集型使用多进程,IO密集型使用多线程

  • 查看进程ID和线程ID的命令分别是os.getpid()threading.current_thread()

  • 多进程使用multiprocessing就可以了,通常使用进程池来完成操作,阻塞主进程使用join()方法

  • 多线程使用threading模块,线程池使用concurrent.futures模块,同时主线程的阻塞方法有多种

  • 不管多进程还是多线程,生产消费模型都可以用队列来完成,如果要用多线程操作同一变量记得加互斥锁

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/很楠不爱3/article/detail/151577
推荐阅读
相关标签
  

闽ICP备14008679号