赞
踩
前序博客中已经介绍了基于多进程的并发编程,本篇主要介绍基于多线程的并发编程。
全局解释锁(Global Interpreter Lock,简称GIL)是Python(特别是CPython)解释器中的一个机制,这个机制会限制同一时间只有一个线程执行Python字节码。GIL的好处主要有以下两点:
但GIL也会带来一些负面影响。比如使Python多线程无法充分利用多核CPU的优势。所以本篇要介绍的Python多线程并没有实现真正的并行(Parallel)执行,而是并发(Concurrent)执行。
Tips: 并发是指在同一时间段内多个任务交替执行,可能是单核处理器通过时间片轮转实现,也可能是在多核处理器上通过操作系统的调度实现。虽然多个任务看起来是同时进行的,但实际上每个任务在某个时间点上并没有真正同时执行。并行是指在同一时刻多个任务真正同时执行。并行通常需要多核处理器或多个处理器核心,每个任务在不同的处理器核心上运行,从而实现真正的同时执行。
Python中的threading
模块可以实现多线程。本篇主要基于该模块介绍python中多线程的一般用法。
Python中可以通过两种方法创建线程。一种是通过重写threading.Thread
中的run
方法;而是直接把目标函数传递给threading.Thread
类。具体代码举例:
import threading import time import datetime # 方式1: 继承Thread类 class MyThread(threading.Thread): def run(self): print(f"Thread {self.name} is running:") time.sleep(2) print(f"Thread {self.name} is completed.") # 方式2: 直接传递目标函数 def thread_function(name): print(f"Thread {name} is running:") time.sleep(2) print(f"Thread {name} is completed.") # 创建和启动线程 threads = [] for i in range(3): t1 = MyThread(name=f"Thread-{i+1}") t2 = threading.Thread(target=thread_function, args=(f"Thread-{i+4}",)) threads.append(t1) threads.append(t2) print("Start all threads:",datetime.datetime.now()) # 启动所有线程 for t in threads: t.start() # 等待所有线程完成 for t in threads: t.join() print("All threads completed:",datetime.datetime.now())
其代码执行结果如下:
Start all threads: 2024-05-24 17:18:28.677731
Thread Thread-1 is running:
Thread Thread-4 is running:
Thread Thread-2 is running:
Thread Thread-5 is running:
Thread Thread-3 is running:
Thread Thread-6 is running:
All threads completed: 2024-05-24 17:18:30.682527
从代码运行结果中可以看到:6个线程总共运行了2s,这似乎与前文说的“Python多线程只能并发无法真正并行执行”的结论违背。出现这种情况是因为Python线程在执行time.sleep
时释放了GIL,使得其他线程可以继续执行。在这种情况下,所有6个线程几乎同时启动并进入休眠状态,而不是一个接一个地进行,所以总的执行时间将是最长的单个线程的执行时间,即2秒,而不是所有线程执行时间的总和。
将上述代码中的每个线程的任务改成CPU密集型任务再来看代码运行结果:
import threading import datetime def target_function(): sum=0 for i in range(10000): for j in range(10000): sum+=i*j # 创建和启动线程 threads = [] for i in range(3): t1 = threading.Thread(target=target_function) threads.append(t1) print("验证执行一次target_function的时间") print("验证开始:",datetime.datetime.now()) target_function() print("验证结束:",datetime.datetime.now()) print("3个线程执行开始:",datetime.datetime.now()) # 启动所有线程 for t in threads: t.start() # 等待所有线程完成 for t in threads: t.join() print("3个线程执行结束:",datetime.datetime.now())
其执行结果如下:
验证执行一次target_function的时间
验证开始: 2024-05-24 18:14:40.580747
验证结束: 2024-05-24 18:14:46.259962
3个线程执行开始: 2024-05-24 18:14:46.260006
3个线程执行结束: 2024-05-24 18:15:02.237260
从运行结果中可以看到,单次target_function的执行时间不到6s,而3个线程总的执行时间大致16s,和3个线程顺序执行的时间差不多。
线程同步可以防止多个线程在同一时间访问或修改共享数据,从而导致数据错误或程序行为异常。Python提供了多种线程同步机制,常见的包括锁、条件变量、信号量和事件等。
在Python多线程编程中,锁可以用于同步对多种共享资源的访问,确保线程安全和数据一致性。这些资源包括全局变量和实例变量、数据结构(如列表、字典等)、文件和I/O资源、数据库连接、线程之间的通信资源以及网络资源。目前threading
模块中有两种锁类:
具体代码举例如下:
import threading import queue counter = 0 def increment_counter(): #累加器 global counter for _ in range(1000): with counter_lock: # 使用with语句简化锁的获取和释放 counter += 1 def producer(): #生产者线程 for i in range(10): #手动获取或释放锁 queue_lock.acquire() shared_queue.put(i) queue_lock.release() def consumer(): #消费者线程 while True: with queue_lock: if not shared_queue.empty(): item = shared_queue.get() else: break class SharedResource: def __init__(self): self.value = 0 def increment(self): with rlock: self.value += 1 self.double() def double(self): with rlock: self.value *= 2 def task(): for _ in range(10): resource.increment() if __name__=="__main__": #1.全局变量锁 counter_lock = threading.Lock() threads = [threading.Thread(target=increment_counter) for _ in range(10)] for thread in threads: thread.start() for thread in threads: thread.join() print("全局变量的最终值:",counter) #2.共享队列锁 shared_queue = queue.Queue() queue_lock = threading.Lock() producer_thread = threading.Thread(target=producer) consumer_thread = threading.Thread(target=consumer) producer_thread.start() producer_thread.join() consumer_thread.start() consumer_thread.join() #3.可重入锁 rlock = threading.RLock() resource = SharedResource() threads = [threading.Thread(target=task, name=f'Thread-{i+1}') for i in range(3)] for thread in threads: thread.start() for thread in threads: thread.join() print(f"Final value: {resource.value}")
其代码运行结果如下:
全局变量的最终值: 10000
Final value: 2147483646
信号量(Semaphore)是用于控制资源访问数量的同步原语。信号量内部有一个计数器,每次调用acquire()方法时计数器减1,每次调用release()方法时计数器加1。如果计数器为0,调用acquire()的线程将被阻塞,直到其他线程调用release()。其用法举例如下
import threading import time import datetime semaphore = threading.Semaphore(2) # 最多允许两个线程同时执行 def task(name): semaphore.acquire() print(f"Thread {name} starting:",datetime.datetime.now()) time.sleep(5) print(f"Thread {name} finished:",datetime.datetime.now()) semaphore.release() threads = [threading.Thread(target=task, args=(f'Thread-{i+1}',)) for i in range(5)] for thread in threads: thread.start() for thread in threads: thread.join() print("All tasks completed")
其执行结果如下:
Thread Thread-1 starting: 2024-05-25 16:23:12.605692
Thread Thread-2 starting: 2024-05-25 16:23:12.605832
Thread Thread-1 finished: 2024-05-25 16:23:17.608979
Thread Thread-2 finished: 2024-05-25 16:23:17.608901
Thread Thread-3 starting: 2024-05-25 16:23:17.609524
Thread Thread-4 starting: 2024-05-25 16:23:17.609970
Thread Thread-3 finished: 2024-05-25 16:23:22.613643
Thread Thread-4 finished: 2024-05-25 16:23:22.613916
Thread Thread-5 starting: 2024-05-25 16:23:22.615044
Thread Thread-5 finished: 2024-05-25 16:23:27.618141
All tasks completed
从代码运行结果可以看到,限定资源数量为2之后,线程3、4是等到线程1、2结束才开始执行的。
线程通信指的是线程之间传递信息和协调行为的机制。线程通信可以通过多种方式实现,包括队列、事件、条件变量、信号量等。这里仅介绍事件和的条件变量。
事件(Event)是一种用于线程间通信和同步的机制。事件对象允许一个或多个线程等待某个事件的发生,并在事件发生时被唤醒。threading
类中的Event
类主要包括以下几个方法:
其用法举例如下:
import threading import time def waiter(event): print("Waiter: Waiting for the event to be set...") event.wait() # 等待事件被设置为True print("Waiter: Event has been set, proceeding...") def setter(event): time.sleep(3) # 模拟一些工作 print("Setter: Setting the event") event.set() # 将事件设置为True # 创建事件对象 event = threading.Event() # 创建并启动线程 waiter_thread = threading.Thread(target=waiter, args=(event,)) setter_thread = threading.Thread(target=setter, args=(event,)) waiter_thread.start() setter_thread.start() waiter_thread.join() setter_thread.join()
其执行结果如下:
Waiter: Waiting for the event to be set...
Setter: Setting the event
Waiter: Event has been set, proceeding...
条件变量(Condition)允许一个线程等待特定条件的发生,并在条件满足时通知其他线程。它通常与锁一起使用。threading
类中的Condition
类一般包括以下方法:
其具体用法举例如下:
import threading import time import random condition = threading.Condition() queue = [] class Producer(threading.Thread): def run(self): global queue while True: item = random.randint(1, 10) with condition: queue.append(item) print(f"Produced {item}") condition.notify() # 唤醒消费者线程 time.sleep(random.random()) class Consumer(threading.Thread): def run(self): global queue while True: with condition: while not queue: condition.wait() # 等待生产者线程的通知 item = queue.pop(0) print(f"Consumed {item}") time.sleep(random.random()) # 创建并启动线程 producer = Producer() consumer = Consumer() producer.start() consumer.start() producer.join() consumer.join()
目前,Python中会导致线程释放GIL的操作主要包括以下几种:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。