赞
踩
Python实战项目:实现一个基于多线程/多进程的任务调度系统
在本文中,我们将创建一个简单的任务调度系统,该系统能够使用多线程或多进程来执行一系列任务。我们将使用Python的标准库来实现这个系统,不依赖任何第三方库。
首先,让我们了解一下基础知识。
1.多线程与多进程
多线程和多进程是两种不同的并行处理方法。多线程是在单个进程内创建多个线程,共享进程的资源。而多进程则是创建多个独立的进程,每个进程有自己的内存空间和资源。
线程之间的通信和同步比较简单,因为它们共享进程的资源。相比之下,进程之间的通信和同步更加复杂,因为它们是独立的实体。
2.任务调度系统
任务调度系统是一种用于管理、调度和执行任务的软件。它可以自动或手动安排任务的执行顺序,并监控任务的执行状态。在本文中,我们将实现一个简单的任务调度系统,该系统可以接收任务列表,并使用多线程或多进程来执行这些任务。
项目实现
首先,我们需要一个任务类来描述每个任务。任务类应该包含任务的描述和其他相关信息。
class Task:
def __init__(self, description):
self.description = description
self.is_completed = False
def complete(self):
self.is_completed = True
接下来,我们定义任务调度器类。这个类将负责管理任务列表,并根据需要启动多线程或多进程来执行任务。
import threading class ThreadTaskScheduler: def __init__(self): self.tasks = [] self.threads = [] def add_task(self, task): self.tasks.append(task) def start(self): for task in self.tasks: thread = threading.Thread(target=task.complete) thread.start() self.threads.append(thread) for thread in self.threads: thread.join()
为了使用多进程,我们需要导入multiprocessing
模块。注意,由于GIL(全局解释器锁)的存在,多进程在执行CPU密集型任务时可能并不会提高效率。但对于IO密集型任务,多进程可以充分利用多核CPU的性能。
# 导入multiprocessing模块,用于创建和管理多进程 import multiprocessing # 导入queue模块,提供线程安全的队列实现,用于进程间通信 import queue # 定义一个名为ProcessTaskScheduler的类,用于管理多进程任务调度 class ProcessTaskScheduler: # 初始化方法,用于初始化类的实例 def __init__(self): # 存储添加的任务,初始为空列表 self.tasks = [] # 存储创建的进程,初始为空列表 self.processes = [] # 创建一个线程安全的队列对象,用于存储任务,可以在多个进程之间共享数据 self.task_queue = multiprocessing.Queue() # 添加任务方法,接受一个任务作为参数,将其添加到tasks列表中 def add_task(self, task): self.tasks.append(task) # 开始执行方法,启动所有添加的任务并执行它们 def start(self): # 定义一个内部函数worker,该函数将在每个进程中执行 def worker(): while True: # 从task_queue队列中获取任务 task = self.task_queue.get() # 如果任务是None,则跳出循环,结束工作进程 if task is None: break # 执行任务,假设每个任务都有一个complete()方法来执行任务 task.complete() # 根据CPU的核心数量创建相应数量的进程,每个进程执行worker函数 for _ in range(multiprocessing.cpu_count()): # 根据CPU核心数创建相应数量的进程 process = multiprocessing.Process(target=worker) # 创建一个新的进程,并将worker函数作为目标函数传递给该进程 process.start() # 启动进程 self.processes.append(process) # 将新进程添加到processes列表中 # 将所有任务添加到task_queue队列中,以便工作进程可以获取并执行它们 for task in self.tasks: self.task_queue.put(task) # 将None放入队列中,通知工作进程它们已经完成所有任务并应该退出循环 for _ in range(len(self.tasks)): # 将None放入队列,通知工作进程结束工作 self.task_queue.put(None) # 等待所有进程完成工作,join()方法将阻塞当前进程,直到该进程完成执行 for process in self.processes: # 等待所有进程结束工作 process.join() ``
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。