赞
踩
Python的threading模块是Python标准库中用于多线程编程的模块。它提供了一个Thread类,可以用于创建和管理线程。
import threading
import time
class MyThread(threading.Thread):
def run(self):
print('start working')
time.sleep(1)
print('end work')
my_thread = MyThread()
my_thread.start()
my_thread.join()
import threading
import time
def run():
print('start working')
time.sleep(1)
print('end work')
my_thread = threading.Thread(target=run)
my_thread.start()
my_thread.join()
threading.Lock 是 Python 的 threading 模块中的一个线程同步原语。。
Lock 有两种状态:locked 和 unlocked,acquire() 方法用于获取锁,如果锁是空闲的则立即返回,如果锁已经被其它线程占用了则阻塞等待,release() 方法用于释放锁,唤醒等待该锁的线程。
import threading lock = threading.Lock() def my_function(): lock.acquire() try: # 需要保护的代码 print("Hello from thread") finally: lock.release() thread = threading.Thread(target=my_function) thread.start()
此处RLock
并非读写锁,表示重入锁,同一个线程可以多次获取锁。
对比Lock
的示例代码,RLock
经常使用在一个API既需要内部使用,又需要开放外部访问保证线程安全时特别有用。
import threading class Fruits: list: [str] = [] lock: threading.RLock = threading.RLock() def add(self, fruit: str): with self.lock: index = self.get_index(fruit) if index == -1: # 存在更新 self.list.append(fruit) def update(self, old: str, new: str): with self.lock: index = self.get_index(old) if index != -1: # 存在更新 self.list[index] = new def get_index(self, fruit: str) -> int: with self.lock: for index in range(len(self.list)): if self.list[index] == fruit: return index return -1 fruits = Fruits() fruits.add('Banana') fruits.update('Banana', 'Apple') print(fruits.get_index('Banana')) print(fruits.get_index('Apple'))
threading.Timer是threading.Thread的一个派生类,用于在指定的时间后执行一个函数。
import threading
import datetime
def work():
print(f'hello {datetime.datetime.now()}')
timer = threading.Timer(2, work)
# 2秒过后调用work
timer.start()
# timer.cancel()
threading.Semaphore是线程同步原语的一种,用于限制对共享资源的并发访问。
常用方法
semaphore = threading.Semaphore(value=2) # 总资源数量2
semaphore.acquire() # 请求资源
semaphore.release(n=1) # 释放资源
使用示例:中间件
import threading # 令牌桶,网站同时只允许1000人访问 tokenBucket = threading.Semaphore(value=1000) class HTTPRequest: pass def abort(req: HTTPRequest): pass def next_step(req: HTTPRequest): pass # 模拟HTTP中间件 def middleware(req: HTTPRequest): b: bool = False try: # 获取令牌 b = tokenBucket.acquire(blocking=False) if not b: # 失败,中止访问 abort(req) # 成功,继续下一步 next_step(req) finally: if b: # 归还令牌 tokenBucket.release()
与threading.Semaphore
不同的是,资源数量被限制不能超过初始资源数量。
import threading
semaphore = threading.BoundedSemaphore(value=10)
semaphore.acquire()
semaphore.release()
# Exception: Semaphore released too many times
semaphore.release()
threading.Barrier
是 Python 中 threading
模块中的同步原语,它用于在多个线程中进行同步,确保这些线程在达到指定的屏障点之前都会被阻塞,然后在所有线程都到达屏障点后同时继续执行。
threading.Barrier
适用于需要所有线程到达某个点之后再继续执行的场景,比如等待所有线程完成一定阶段的工作后再进行下一阶段的操作。
threading.Barrier
的常用方法是:
__init__(parties, action=None)
: 创建一个 Barrier 对象。parties
参数指定需要等待的线程数量,当有 parties
个线程都调用 wait()
方法后,所有线程将同时释放并继续执行。可选的 action
参数可以指定一个回调函数,当所有线程释放后,此回调函数将在释放线程中的一个线程中执行。wait(timeout=None)
: 阻塞线程,直到所有参与线程都调用了 wait()
方法并达到屏障点。可选的 timeout
参数用于设置等待的超时时间,如果超过此时间,线程将被解除阻塞。下面是 threading.Barrier
的一个简单示例:
import threading def worker(barrier, name): print(f"{name}: 执行任务前") barrier.wait() print(f"{name}: 执行任务后") # 创建 Barrier 对象,需要等待3个线程 barrier = threading.Barrier(3) # 创建3个工作线程 thread1 = threading.Thread(target=worker, args=(barrier, "线程1")) thread2 = threading.Thread(target=worker, args=(barrier, "线程2")) thread3 = threading.Thread(target=worker, args=(barrier, "线程3")) # 启动工作线程 thread1.start() thread2.start() thread3.start() # 等待所有线程完成 thread1.join() thread2.join() thread3.join() print("所有线程已完成。")
threading.Event
是 Python 中 threading
模块中的同步原语,它允许线程等待直到被其他线程设置为真的事件。它通常用于协调多个线程的活动,并促进它们之间的通信。
threading.Event
的主要作用是为线程提供一个简单的机制,使它们能够在特定条件满足之前暂停执行。与 threading.Event
相关的两个主要方法是:
set()
: 设置事件为真。正在使用 wait()
方法等待事件的线程将被释放,可以继续执行。
clear()
: 重置事件为假。随后调用 wait()
的线程将被阻塞,直到再次使用 set()
方法设置事件。
除了这两个方法之外,还有 wait(timeout=None)
方法。当线程调用 wait()
时,它将被阻塞,直到事件被设置或达到可选的 timeout
参数为止。如果提供了超时,并且事件在指定的时间内没有被设置,线程将继续执行,而不考虑事件状态。
下面是 threading.Event
的一个基本示例:
import random import threading import time event = threading.Event() runners = ['runner1', 'runner2', 'runner3'] threads = [] def start(name: str): print(f'{name} 准备就绪') event.wait() print(f'{name} 起跑') time.sleep(random.randint(1, 3)) print(f'{name} 到达终点') # 准备 for runner in runners: t = threading.Thread(target=start, args=(runner,)) t.start() threads.append(t) # 让所有跑步者都进入等待,此处处理并不优雅,但这里主要目的是为了演示event。 time.sleep(1) # 发令 event.set() for t in threads: t.join()
threading.Condition
是 Python 中 threading
模块中的同步原语,它用于在多个线程之间进行复杂的协调和通信。它提供了一个通用的条件变量,允许线程等待某个条件为真,或者在满足条件时通知其他等待的线程。
threading.Condition
主要用途是在多线程环境下实现线程间的协作,特别是用于生产者-消费者模式和线程间的消息传递。通过使用 threading.Condition
,我们可以让一个线程等待直到满足特定条件,然后另一个线程通知条件已经满足,从而实现线程间的同步。
threading.Condition
的常用方法有:
__init__(lock=None)
: 创建一个 Condition
对象。可选的 lock
参数指定一个锁对象,用于在内部同步条件的访问。如果不提供锁对象,Condition
会自动创建一个默认的锁对象。acquire()
: 获取底层关联的锁,用于保护共享资源或条件。release()
: 释放底层关联的锁。wait(timeout=None)
: 等待条件为真。调用此方法将释放关联的锁并阻塞线程,直到另一个线程调用 notify()
或 notify_all()
方法通知条件为真或超时。notify(n=1)
: 唤醒等待此条件的一个线程。默认情况下,唤醒一个等待的线程,如果指定 n
参数,将唤醒 n 个等待的线程。notify_all()
: 唤醒所有等待此条件的线程。下面是一个使用 threading.Condition
的简单示例:
import threading car_condition = threading.Condition() toll_collector_semaphore = threading.Semaphore(value=0) def waiting_for_release(name: str): print(f'{name} 到达,等待放行') # 记录当前有等待放行 toll_collector_semaphore.release() with car_condition: car_condition.wait() print(f'{name} 放行') def toll_collector(): while True: toll_collector_semaphore.acquire() with car_condition: # 放行一辆 car_condition.notify() cars = ['A', 'B', 'C', 'D'] # 启动收费员 threading.Thread(target=toll_collector).start() for car in cars: threading.Thread(target=waiting_for_release, args=(car,)).start()
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。