当前位置:   article > 正文

Python并发编程:多线程

Python并发编程:多线程

前序博客中已经介绍了基于多进程的并发编程,本篇主要介绍基于多线程的并发编程。

1 全局解释锁

1.1 定义

  全局解释锁(Global Interpreter Lock,简称GIL)是Python(特别是CPython)解释器中的一个机制,这个机制会限制同一时间只有一个线程执行Python字节码。GIL的好处主要有以下两点:

  • 保护解释器的全局状态:在多线程环境中,Python解释器内部的许多数据结构是全局共享的,GIL通过确保同一时刻只有一个线程执行字节码,防止了多个线程同时修改这些数据结构而导致的数据不一致或崩溃。
  • 简化内存管理:Python的内存管理系统需要在对象分配和释放时更新引用计数。GIL使得这一操作变得线程安全,而无需使用复杂的锁机制。

  但GIL也会带来一些负面影响。比如使Python多线程无法充分利用多核CPU的优势。所以本篇要介绍的Python多线程并没有实现真正的并行(Parallel)执行,而是并发(Concurrent)执行。
Tips: 并发是指在同一时间段内多个任务交替执行,可能是单核处理器通过时间片轮转实现,也可能是在多核处理器上通过操作系统的调度实现。虽然多个任务看起来是同时进行的,但实际上每个任务在某个时间点上并没有真正同时执行。并行是指在同一时刻多个任务真正同时执行。并行通常需要多核处理器或多个处理器核心,每个任务在不同的处理器核心上运行,从而实现真正的同时执行。

2 多线程

  Python中的threading模块可以实现多线程。本篇主要基于该模块介绍python中多线程的一般用法。

2.1 创建和启动线程

  Python中可以通过两种方法创建线程。一种是通过重写threading.Thread中的run方法;而是直接把目标函数传递给threading.Thread类。具体代码举例:

import threading
import time
import datetime

# 方式1: 继承Thread类
class MyThread(threading.Thread):
    def run(self):
        print(f"Thread {self.name} is running:")
        time.sleep(2)
        print(f"Thread {self.name} is completed.")

# 方式2: 直接传递目标函数
def thread_function(name):
    print(f"Thread {name} is running:")
    time.sleep(2)
    print(f"Thread {name} is completed.")

# 创建和启动线程
threads = []
for i in range(3):
    t1 = MyThread(name=f"Thread-{i+1}")
    t2 = threading.Thread(target=thread_function, args=(f"Thread-{i+4}",))
    threads.append(t1)
    threads.append(t2)
print("Start all threads:",datetime.datetime.now())
# 启动所有线程
for t in threads:
    t.start()
# 等待所有线程完成
for t in threads:
    t.join()

print("All threads completed:",datetime.datetime.now())
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
'
运行

其代码执行结果如下:

Start all threads: 2024-05-24 17:18:28.677731
Thread Thread-1 is running:
Thread Thread-4 is running:
Thread Thread-2 is running:
Thread Thread-5 is running:
Thread Thread-3 is running:
Thread Thread-6 is running:
All threads completed: 2024-05-24 17:18:30.682527
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

从代码运行结果中可以看到:6个线程总共运行了2s,这似乎与前文说的“Python多线程只能并发无法真正并行执行”的结论违背。出现这种情况是因为Python线程在执行time.sleep时释放了GIL,使得其他线程可以继续执行。在这种情况下,所有6个线程几乎同时启动并进入休眠状态,而不是一个接一个地进行,所以总的执行时间将是最长的单个线程的执行时间,即2秒,而不是所有线程执行时间的总和。
将上述代码中的每个线程的任务改成CPU密集型任务再来看代码运行结果:

import threading
import datetime

def target_function():
    sum=0
    for i in range(10000):
        for j in range(10000):
            sum+=i*j

# 创建和启动线程
threads = []
for i in range(3):
    t1 = threading.Thread(target=target_function)
    threads.append(t1)
    
print("验证执行一次target_function的时间")
print("验证开始:",datetime.datetime.now())
target_function()
print("验证结束:",datetime.datetime.now())
print("3个线程执行开始:",datetime.datetime.now())
# 启动所有线程
for t in threads:
    t.start()
# 等待所有线程完成
for t in threads:
    t.join()

print("3个线程执行结束:",datetime.datetime.now())
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

其执行结果如下:

验证执行一次target_function的时间
验证开始: 2024-05-24 18:14:40.580747
验证结束: 2024-05-24 18:14:46.259962
3个线程执行开始: 2024-05-24 18:14:46.260006
3个线程执行结束: 2024-05-24 18:15:02.237260
  • 1
  • 2
  • 3
  • 4
  • 5

从运行结果中可以看到,单次target_function的执行时间不到6s,而3个线程总的执行时间大致16s,和3个线程顺序执行的时间差不多。

2.2 线程同步

  线程同步可以防止多个线程在同一时间访问或修改共享数据,从而导致数据错误或程序行为异常。Python提供了多种线程同步机制,常见的包括锁、条件变量、信号量和事件等。

2.2.1 锁

  在Python多线程编程中,锁可以用于同步对多种共享资源的访问,确保线程安全和数据一致性。这些资源包括全局变量和实例变量、数据结构(如列表、字典等)、文件和I/O资源、数据库连接、线程之间的通信资源以及网络资源。目前threading模块中有两种锁类:

  • Lock:Lock对象是最基本的同步原语,有两种状态:锁定和非锁定。线程可以通过acquire()方法获得锁,如果锁已经被其他线程获得,则调用线程会被阻塞,直到锁被释放。线程通过release()方法释放锁,允许其他线程获得锁。
  • RLock(可重入锁):RLock对象允许同一个线程多次获得同一个锁而不会引起死锁。每次调用acquire()方法都会增加锁的计数,直到调用相应次数的release()方法,锁才会被释放。

具体代码举例如下:

import threading
import queue

counter = 0
def increment_counter():
    #累加器
    global counter
    for _ in range(1000):
        with counter_lock: # 使用with语句简化锁的获取和释放
            counter += 1

def producer():
    #生产者线程
    for i in range(10):
        #手动获取或释放锁
        queue_lock.acquire()
        shared_queue.put(i)
        queue_lock.release()

def consumer():
    #消费者线程
    while True:
        with queue_lock:
            if not shared_queue.empty():
                item = shared_queue.get()
            else:
                break

class SharedResource:
    def __init__(self):
        self.value = 0
    def increment(self):
        with rlock:
            self.value += 1
            self.double()
    def double(self):
        with rlock:
            self.value *= 2
def task():
    for _ in range(10):
        resource.increment()
if __name__=="__main__":
    #1.全局变量锁
    counter_lock = threading.Lock()
    threads = [threading.Thread(target=increment_counter) for _ in range(10)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    print("全局变量的最终值:",counter)

    #2.共享队列锁
    shared_queue = queue.Queue()
    queue_lock = threading.Lock()
    producer_thread = threading.Thread(target=producer)
    consumer_thread = threading.Thread(target=consumer)
    producer_thread.start()
    producer_thread.join()
    consumer_thread.start()
    consumer_thread.join()

    #3.可重入锁
    rlock = threading.RLock()
    resource = SharedResource()
    threads = [threading.Thread(target=task, name=f'Thread-{i+1}') for i in range(3)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    print(f"Final value: {resource.value}")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
'
运行

其代码运行结果如下:

全局变量的最终值: 10000
Final value: 2147483646
  • 1
  • 2
2.2.2 信号量

  信号量(Semaphore)是用于控制资源访问数量的同步原语。信号量内部有一个计数器,每次调用acquire()方法时计数器减1,每次调用release()方法时计数器加1。如果计数器为0,调用acquire()的线程将被阻塞,直到其他线程调用release()。其用法举例如下

import threading
import time
import datetime
semaphore = threading.Semaphore(2)  # 最多允许两个线程同时执行
def task(name):
    semaphore.acquire()
    print(f"Thread {name} starting:",datetime.datetime.now())
    time.sleep(5)
    print(f"Thread {name} finished:",datetime.datetime.now())
    semaphore.release()

threads = [threading.Thread(target=task, args=(f'Thread-{i+1}',)) for i in range(5)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
print("All tasks completed")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

其执行结果如下:

Thread Thread-1 starting: 2024-05-25 16:23:12.605692
Thread Thread-2 starting: 2024-05-25 16:23:12.605832
Thread Thread-1 finished: 2024-05-25 16:23:17.608979
Thread Thread-2 finished: 2024-05-25 16:23:17.608901
Thread Thread-3 starting: 2024-05-25 16:23:17.609524
Thread Thread-4 starting: 2024-05-25 16:23:17.609970
Thread Thread-3 finished: 2024-05-25 16:23:22.613643
Thread Thread-4 finished: 2024-05-25 16:23:22.613916
Thread Thread-5 starting: 2024-05-25 16:23:22.615044
Thread Thread-5 finished: 2024-05-25 16:23:27.618141
All tasks completed
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

从代码运行结果可以看到,限定资源数量为2之后,线程3、4是等到线程1、2结束才开始执行的。

2.3 线程通信

  线程通信指的是线程之间传递信息和协调行为的机制。线程通信可以通过多种方式实现,包括队列、事件、条件变量、信号量等。这里仅介绍事件和的条件变量。

2.3.1 事件

事件(Event)是一种用于线程间通信和同步的机制。事件对象允许一个或多个线程等待某个事件的发生,并在事件发生时被唤醒。threading类中的Event类主要包括以下几个方法:

  • set():将内部标志设置为True,并唤醒所有等待的线程。
  • clear():将内部标志设置为False。
  • wait(timeout=None):如果内部标志为True,则立即返回;否则阻塞线程,直到内部标志被设置为True或超时。
  • is_set():返回内部标志的当前状态。

其用法举例如下:

import threading
import time

def waiter(event):
    print("Waiter: Waiting for the event to be set...")
    event.wait()  # 等待事件被设置为True
    print("Waiter: Event has been set, proceeding...")

def setter(event):
    time.sleep(3)  # 模拟一些工作
    print("Setter: Setting the event")
    event.set()  # 将事件设置为True

# 创建事件对象
event = threading.Event()

# 创建并启动线程
waiter_thread = threading.Thread(target=waiter, args=(event,))
setter_thread = threading.Thread(target=setter, args=(event,))

waiter_thread.start()
setter_thread.start()

waiter_thread.join()
setter_thread.join()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
'
运行

其执行结果如下:

Waiter: Waiting for the event to be set...
Setter: Setting the event
Waiter: Event has been set, proceeding...
  • 1
  • 2
  • 3
2.3.2 条件变量

  条件变量(Condition)允许一个线程等待特定条件的发生,并在条件满足时通知其他线程。它通常与锁一起使用。threading类中的Condition类一般包括以下方法:

  • wait():当前线程等待,直到其他线程调用notify()或notify_all()方法唤醒它。
  • notify():唤醒一个等待中的线程。
  • notify_all():唤醒所有等待中的线程。

其具体用法举例如下:

import threading
import time
import random

condition = threading.Condition()
queue = []

class Producer(threading.Thread):
    def run(self):
        global queue
        while True:
            item = random.randint(1, 10)
            with condition:
                queue.append(item)
                print(f"Produced {item}")
                condition.notify()  # 唤醒消费者线程
            time.sleep(random.random())

class Consumer(threading.Thread):
    def run(self):
        global queue
        while True:
            with condition:
                while not queue:
                    condition.wait()  # 等待生产者线程的通知
                item = queue.pop(0)
                print(f"Consumed {item}")
            time.sleep(random.random())

# 创建并启动线程
producer = Producer()
consumer = Consumer()

producer.start()
consumer.start()

producer.join()
consumer.join()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

补充

补充1: 释放GIL的操作

目前,Python中会导致线程释放GIL的操作主要包括以下几种:

  • I/O操作,例如文件读写、网络请求等。
  • 某些内置函数和标准库函数,例如 time.sleep()、select.select()、socket 模块中的操作。
  • 使用C语言编写的扩展模块可以显式地释放和重新获取GIL。
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/酷酷是懒虫/article/detail/871028
推荐阅读
相关标签
  

闽ICP备14008679号