赞
踩
PyQt5应用程序可以利用APScheduler任务调度库处理定期或周期性的调度任务,在调度任务执行的过程中,往往希望调度任务能够与应用程序窗口进行交互,比如实现将调度任务的处理结果或者某些信息发送给主窗口或者子窗口进行界面显示或者弹窗提示,本文详细介绍实现该类功能需求的一种方法。
1、关于调度器的选择:
APScheduler中的QtScheduler和BackgroundScheduler都是后台线程调度器,两者的主要区别在于使用的底层库不同。
2、关于作业/任务(job)序列化的问题:
根据以下APScheduler官方文档的原文描述:
It is important to note that if you use an executor or job store that serializes the job, it will add a couple requirements on your job:
The target callable must be globally accessible
Any arguments to the callable must be serializable
Of the builtin job stores, only MemoryJobStore doesn’t serialize jobs. Of the builtin executors, only ProcessPoolExecutor will serialize jobs.
可知,当使用了序列化任务的执行器(executor)或者持久化存储(job store)时,必须满足两个条件:
而APScheduler库内置的任务存储器(job store)仅MemoryJobStore不对作业任务进行序列化,内置的执行器(executor)仅ProcessPoolExecutor会对作业任务进行序列化。
为此,当对作业/任务进行持久化存储时,由于窗口对象无法被序列化,无法将窗口对象通过可调用参数的方式传递给调度任务函数,否则会报类似TypeError: can't pickle xxx objects
的错误,而我们一般不应该在调度任务中去创建窗口对象,因为大多数情况下在调度器启动之前或任务执行之前应用程序主窗口对象就已经存在了。此外,在添加调度任务时,如果以普通对象方法的方式定义调度任务,由于对象无法被序列化,也无法通过对象去引用任务函数(obj.func
)方式添加调度任务,否则,同样会报TypeError错误。如果无需对作业任务进行持久化存储,即采用MemoryJobStore且使用ThreadPoolExecutor执行器,则无需对对象进行序列化操作,因为所有数据都保存在内存中,以上操作不会报错。
根据以上难点分析,如果确实需要序列化任务,如使用SQLAlchemyJobStore对任务进行存储,以便实现程序重启后未执行的调度任务可以接着执行,同时可通过信号与槽的方式在任务处理过程中给主窗口发送消息,解决的办法是确保信号实例所属对象的全局可访问,以下示例项目代码描述了具体的实现过程:
# -*- coding: utf-8 -*-
# import config
from config import Config
def job():
"""
调度任务
:return:
"""
Config.SCHEDULER_TASK_SIGNALS.task_signal.emit("任务执行中...")
# 执行任务逻辑
pass
# -*- coding: utf-8 -*- from PyQt5.QtCore import QObject, pyqtSignal class SchedulerTaskSignals(QObject): """ 自定义调度任务中可能使用的各种自定义信号集合类,将该类定义在配置模块中,防止出现循环引用错误 自定义的信号只能在QObject类型的对象上定义为类成员,在此通过继承QObject类间接实现各种信号的创建 """ # 根据实际需求自定义各种信号 finished = pyqtSignal(str) # 调度任务已完成 task_signal = pyqtSignal(str) # 调度任务消息 # 方式一、config模块文件中实例化一个SchedulerTaskSignals对象,其它地方调用该对象 # 要实现在其它模块文件中调用同一个该对象,则在模块文件中先:import config,随后采用config.SCHEDULER_TASK_SIGNALS方式调用 # SCHEDULER_TASK_SIGNALS = SchedulerTaskSignals() # 方式二、config模块文件中以配置类属性的方式实例化一个SchedulerTaskSignals对象,其它地方通过配置类来调用该对象 class Config: # 应用程序启动时实例化一个SchedulerTaskSignals对象,用于调度任务中调用,放到配置类中定义的目的是为了全局可调用同一个对象 # apscheduler调度任务如果采用持久化存储时无法序列化对象,无法将对象进行传参数,只能全局调用 SCHEDULER_TASK_SIGNALS = SchedulerTaskSignals()
# -*- coding: utf-8 -*- # import config import sys from PyQt5.QtWidgets import QApplication, QMainWindow, QMessageBox from apscheduler.jobstores.memory import MemoryJobStore from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor from apscheduler.schedulers.qt import QtScheduler from config import Config class MyWindow(QMainWindow): def __init__(self): super().__init__() # 任务信号事件注册 Config.SCHEDULER_TASK_SIGNALS.task_signal.connect(self.process_task_msg_slot) def process_task_msg_slot(self, msg): """ 在此处理接收到的调度任务信号 :return: """ print(f"主窗口接收到任务消息: {msg}") # 在消息框中显示消息 self.msg_box = QMessageBox() self.msg_box.setText(msg) self.msg_box.show() if __name__ == '__main__': app = QApplication(sys.argv) jobstores = { 'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite'), 'memory': MemoryJobStore() } executors = { 'default': ThreadPoolExecutor(10), # 多线程,可指定线程数 'processpool': ProcessPoolExecutor(5) # 多进程,可指定进程数,负载为CPU密集型操作时可考虑使用它来利用多核CPU } job_defaults = { 'coalesce': True, # 是否合并任务,只运行一次 'max_instances': 3, # 并发执行数 'misfire_grace_time': 300, # 设置300秒的任务超时容错, } # 在此采用QT事件循环中运行的调度器 scheduler = QtScheduler( timezone='Asia/Shanghai', # 指定时区 jobstores=jobstores, executors=executors, job_defaults=job_defaults ) my_window = MyWindow() scheduler.add_job( func='utils.tasks:job', id='job-1', trigger='interval', seconds=5, replace_existing=True ) scheduler.start() my_window.show() sys.exit(app.exec_())
以上介绍了PyQt5与APScheduler集成进行任务调度过程中关于如何利用Qt信号与槽机制实现调度任务与窗口对象之间的消息传递,其关键在于绕过调度任务可调用参数进行对象传参的方式实现任务的定义和添加以及如何确保在任务中的消息对象全局可调用。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。