赞
踩
APScheduler集成Flask项目是一个系列文章
,它不是一篇介绍工具的文章,他是在介绍开发API接口项目工程中使用APScheduler解决实际定时执行任务需求。
为什么用一个系列博客介绍APScheduler模块?
Flask_APScheduler与APScheduler区别
Flask将APScheduler模块作为一个扩展库添加到了他的框架中,本身还是使用APScheduler完成任务调度。Flask对APScheduler的使用提供了更加完善的接口,符合Flask使用习惯。
APScheduler
模块由四个部分组成,他们各尽其职,通过调度器协调工作。
任务是被执行对象,通过调用器提供的接口可以添加、修改、删除任务。
创建任务信息如下
id
:指定任务的唯一IDname
:指定任务的名字trigger
:apscheduler定义的触发器,用于确定Job的执行时间,根据设置的trigger规则,计算得到下次执行此job的executor
:apscheduler定义的执行器,job创建时设置执行器的名字,根据字符串你名字到scheduler获取到执行此max_instances
:执行此job的最大实例数,executor执行job时,根据job的id来计算执行次数,根据设置的最大实例数next_run_time
:Job下次的执行时间,创建Job时可以指定一个时间[datetime],不指定的话则默认根据trigger获取触misfire_grace_time
:Job的延迟执行时间,例如Job的计划执行时间是21:00:00,但因服务重启或其他原因导致coalesce
:Job是否合并执行,是一个bool值。例如scheduler停止20s后重新启动,而job的触发器设置为5s执行func
:Job执行的函数args
:Job执行函数需要的位置参数kwargs
:Job执行函数需要的关键字参数决定任务运行条件,每个任务都可以配置自己的触发器,用于决定接下来哪一个作业会运行。除了它们自己初始配置以外,触发器完全是无状态的。
APScheduler 有三种内建的 Trigger
:
APScheduler任务默认存储在内存中,如果应用重启已创建的任务就会消失。如果你的应用在每次启动的时候都会重新创建任务,那么使用默认的任务存储器(MemoryJobStore)即可,但是如果你需要应用重启后任务存储器中任然保留任务,你应该根据你的应用环境来选择具体的任务存储器。例如:使用Mongo或者SQLAlchemy JobStore (用于支持大多数RDBMS)。
Executor执行器在scheduler中初始化,另外也可通过scheduler的add_executor动态添加Executor执行器。
每个executor都会绑定一个alias,这个作为唯一标识绑定到Job,在实际执行时会根据Job绑定的executor。找到实际的执行器对象,然后根据执行器对象执行Job。
Executor执行器有七种可根据业务需要选择
Scheduler是APScheduler的核心,任务调度器是属于整个调度的总指挥官。它会合理安排任务存储器、执行器、触发器进行工作,并进行添加、删除、暂停任务等。
下面通过一些示例介绍下不同类型的执行器和触发器使用,在以后开发项目环境中根据需求选择适合的执行器和触发器完成定时任务调度。
pip install flask_apscheduler
# 如果使用pipenv管理环境,使用pipenv安装
pipenv install flask_apscheduler
BlockingScheduler : Blocking解释为阻塞,该调度器在当前进程的主线程中运行,调用start函数后会阻塞当前线程。当调度器是你应用中唯一要运行的东西时使用。
from apscheduler.schedulers.blocking import BlockingScheduler
import time
def main_thread():
# 主线程
while (True):
print('主线程每隔1s运行一次')
time.sleep(1)
def job():
print('定时任务每隔3s运行一次')
if __name__ == '__main__':
# 创建后台执行器类型调度器
sched = BlockingScheduler()
# 添加任务
sched.add_job(job, 'interval', id='3_second_job', seconds=3)
# 启动调度器
sched.start()
# 启动主线程
main_thread()
BlockingScheduler调用start函数后会阻塞应用的当前线程,导致main_thread()
函数不会运行。这里用main_thread()
函数表示我们应用主线程。
定时任务每隔3s运行一次
定时任务每隔3s运行一次
定时任务每隔3s运行一次
BackgroundScheduler : Background解释为后台,该调度器在后台线程中运行,不会阻塞当前线程。
from apscheduler.schedulers.background import BackgroundScheduler
import time
def main_thread():
# 主线程
while (True):
print('主线程每隔1s运行一次')
time.sleep(1)
def job():
print('定时任务每隔3s运行一次')
if __name__ == '__main__':
# 创建后台执行器类型调度器
sched = BackgroundScheduler()
# 添加任务
sched.add_job(job, 'interval', id='3_second_job', seconds=3)
# 启动调度器
sched.start()
# 启动主线程
main_thread()
BackgroundScheduler调用start函数后并不会阻塞当前线程,所以可以继续执行主程序中main_thread()
函数
主线程每隔1s运行一次
主线程每隔1s运行一次
主线程每隔1s运行一次
定时任务每隔3s运行一次
主线程每隔1s运行一次
主线程每隔1s运行一次
主线程每隔1s运行一次
定时任务每隔3s运行一次
date: 特定的时间点只触发一次,例如为发送右键设置一个定时发送任务
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
def job():
print('date触发器运行任务')
if __name__ == '__main__':
# 创建后台执行器类型调度器
sched = BlockingScheduler()
# 添加任务,到达指定时间才会运行且只运行1次
sched.add_job(func=job, trigger='date', run_date=datetime.datetime(2023 ,2 , 21, 10, 43, 30), id='3_second_job')
# 或者用next_run_time指定运行时间
# sched.add_job(func=job, trigger='date', next_run_time=datetime.datetime(2023, 2, 21, 10, 43, 30), id='3_second_job')
# 启动调度器
sched.start()
运行代码后,没有立即执行等到达设置时间后才会运行,且运行了一次。
date触发器运行任务
interval: 每次间隔一段实际周期性的触发,例如每隔1小时提醒喝水。
interval可以设置的参数:
from apscheduler.schedulers.blocking import BlockingScheduler
def job():
print('interval间隔性循环触发器运行任务')
if __name__ == '__main__':
# 创建后台执行器类型调度器
sched = BlockingScheduler()
# 添加任务,循环运行
sched.add_job(func=job, trigger='interval', seconds=3 , id='3_second_job')
# 启动调度器
sched.start()
设置触发器每隔3秒运行一次任务
interval间隔性循环触发器运行任务
interval间隔性循环触发器运行任务
interval间隔性循环触发器运行任务
interval间隔性循环触发器运行任务
crontab触发器:在特定时间周期性地触发,如每天,周循环等场景,它是功能最强大的触发器。
参数范围只适用于int类型,str类型有无限可能
from apscheduler.schedulers.blocking import BlockingScheduler
def job():
print('cron特定时间周期性运行任务')
# 将cron表达式转换为 秒、分、周、月、年格式,传给cron触发器
def change_cron(expression):
args = {}
# 以空格为分隔符拆分字符串输出列表,拆分结果 ['0/2', '*', '*', '*', '*', '?']
expression = expression.split(' ')
if expression[0] != '?':
args['second'] = expression[0]
if expression[1] != '?':
args['minute'] = expression[1]
if expression[2] != '?':
args['hour'] = expression[2]
if expression[3] != '?':
args['day'] = expression[3]
if expression[4] != '?':
args['month'] = expression[4]
if expression[5] != '?':
args['day_of_week'] = expression[5]
return args
if __name__ == '__main__':
# 创建后台执行器类型调度器
sched = BlockingScheduler()
# 添加任务,循环运行
# 每周一早晨9点30分执行func任务
sched.add_job(func=job, trigger="cron", day_of_week=0, hour=9, minute=30)
# 间隔1分钟执行一次,与interval触发器使用功能相同
sched.add_job(func=job, trigger="cron", minute="*/1")
# 如果要接收cron表达式,需要对格式做个转换。通过change_cron函数转换格式。
sched.add_job(func=job, trigger="cron", **change_cron('0/2 * * * * ?'))
# 启动调度器
sched.start()
使用cron触发器,设置运行时间默认接收关键字类型参数,例如day_of_week=0, hour=9, minute=30。如果要接收一个cron表达式,需要通过一个函数将它转为关键字参数类型。上面的示例中change_cron()
函数将corn表达式0/2 * * * * ?
转为关键字参数。
cron特定时间周期性运行任务
cron特定时间周期性运行任务
cron特定时间周期性运行任务
存储被调度的任务,默认的任务存储是保存在内存中。同时支持任务存储在数据库中,一个任务的数据将在保存到持久化作业存储时被序列化,并在加载时被反序列化。
使用调度器调用add_job()
方法添加任务
from apscheduler.schedulers.blocking import BlockingScheduler
def job():
print('interval间隔性循环触发器运行任务')
if __name__ == '__main__':
# 创建后台执行器类型调度器
sched = BlockingScheduler()
# 添加任务,循环运行
sched.add_job(func=job, trigger='interval', seconds=3 , id='3_second_job')
# 启动调度器
sched.start()
通过主线程运行edit_job()
函数删除job任务,是任务停止运行。
import time
from apscheduler.schedulers.background import BackgroundScheduler
def job():
print('interval间隔性循环触发器运行任务')
def edit_job():
time.sleep(7)
sched.remove_job(job_id='3_second_job')
print('删除job任务')
if __name__ == '__main__':
# 创建后台执行器类型调度器
sched = BackgroundScheduler()
# 添加任务,循环运行
sched.add_job(func=job, trigger='interval', seconds=3 , id='3_second_job')
# 启动调度器
sched.start()
# 启动主线程
edit_job()
job循环运行了两次,运行删除任务后,任务不再运行。
interval间隔性循环触发器运行任务
interval间隔性循环触发器运行任务
删除job任务
import time
from apscheduler.schedulers.background import BackgroundScheduler
def job():
print('interval间隔性循环触发器运行任务')
def my_pause_job():
time.sleep(7)
sched.pause_job(job_id='3_second_job')
print('暂停job任务')
def my_resume_job():
time.sleep(1)
sched.resume_job(job_id='3_second_job')
print('恢复job任务')
time.sleep(4)
if __name__ == '__main__':
# 创建后台执行器类型调度器
sched = BackgroundScheduler()
# 添加任务,循环运行
sched.add_job(func=job, trigger='interval', seconds=3 , id='3_second_job')
# 启动调度器
sched.start()
# 启动主线程
my_pause_job()
my_resume_job()
恢复任务后又重新开始运行
interval间隔性循环触发器运行任务
interval间隔性循环触发器运行任务
暂停job任务
恢复job任务
interval间隔性循环触发器运行任务
interval间隔性循环触发器运行任务
修改任务可以有两个函数,modify_job()
和 reschedule_job()
,他们区别如下
modify_job()
函数修改任务时需要我们自己通过调度器创建一个触发器sched._create_trigger()
然后用这个触发器替换任务中原有触发器信息。reschedule_job()
函数帮我们做了创建触发器的工作,然后调用的modify_job()
函数修改任务,因此使用这个函数修改任务操作更加方便。modify_job()函数修改任务示例
import time
from apscheduler.schedulers.background import BackgroundScheduler
# 定时器运行的任务
def job():
print('interval间隔性循环触发器运行任务')
# 修改任务信息
def edit_job():
temp_dict = {"seconds": 1}
# 创建一个触发器
temp_trigger = sched._create_trigger(trigger='interval', trigger_args=temp_dict)
# 修改job信息
sched.modify_job(job_id='3_second_job', trigger=temp_trigger)
print('修改任务每隔1s运行一次')
time.sleep(9)
if __name__ == '__main__':
# 创建后台执行器类型调度器
sched = BackgroundScheduler()
# 添加任务,循环运行
sched.add_job(func=job, trigger='interval', seconds=3, id='3_second_job')
# 启动调度器
sched.start()
# 启动主线程
edit_job()
通过创建的触发器修改任务后由3秒运行一次变为1秒运行一次
修改任务每隔1s运行一次
interval间隔性循环触发器运行任务
interval间隔性循环触发器运行任务
interval间隔性循环触发器运行任务
reschedule_job函数修改任务示例
import time
from apscheduler.schedulers.background import BackgroundScheduler
def job():
print('interval间隔性循环触发器运行任务')
def edit_reschedule_job():
param = {"trigger": "interval", "seconds": 1}
sched.reschedule_job(job_id='3_second_job', **param)
print('修改任务每隔1s运行一次')
time.sleep(9)
if __name__ == '__main__':
# 创建后台执行器类型调度器
sched = BackgroundScheduler()
# 添加任务,循环运行
sched.add_job(func=job, trigger='interval', seconds=3, id='3_second_job')
# 启动调度器
sched.start()
# 启动主线程
edit_reschedule_job()
reschedule_job函数修改任务更加简单,不需要单独创建触发器,直接传入修改的参数即可。
修改任务每隔1s运行一次
interval间隔性循环触发器运行任务
interval间隔性循环触发器运行任务
reschedule_job函数源码
def reschedule_job(self, job_id, jobstore=None, trigger=None, **trigger_args):
"""
Constructs a new trigger for a job and updates its next run time.
Extra keyword arguments are passed directly to the trigger's constructor.
:param str|unicode job_id: the identifier of the job
:param str|unicode jobstore: alias of the job store that contains the job
:param trigger: alias of the trigger type or a trigger instance
:return Job: the relevant job instance
"""
# 创建一个触发器
trigger = self._create_trigger(trigger, trigger_args)
# 获取当前时间
now = datetime.now(self.timezone)
# 获取下次运行时间
next_run_time = trigger.get_next_fire_time(None, now)
# 调用modify_job函数修改任务
return self.modify_job(job_id, jobstore, trigger=trigger, next_run_time=next_run_time)
默认情况下,调度器会先把正在执行的任务处理完,再关闭任务储存器和执行器。但是,如果你就直接关闭,你可以添加参数:
scheduler.shutdown(wait=False)不管有没有任务在执行,会强制关闭调度器。
import time
from apscheduler.schedulers.background import BackgroundScheduler
def job():
print('interval间隔性循环触发器运行任务')
def my_get_job():
jobinfo = sched.get_job(job_id='3_second_job')
jobs = sched.get_jobs()
print(f'获取指定任务信息: {jobinfo}')
print(f'获取所有任务信息: {jobs}')
time.sleep(4)
if __name__ == '__main__':
# 创建后台执行器类型调度器
sched = BackgroundScheduler()
# 添加任务,循环运行
sched.add_job(func=job, trigger='interval', seconds=3, id='3_second_job')
# 启动调度器
sched.start()
# 启动主线程
my_get_job()
APScheduler默认将任务存储在了内存中,我们应用重启后任务就会失效。将任务存入数据库可以实现任务持久化
配置Mongodb数据库为存储任务源
from datetime import datetime
from pymongo import MongoClient
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
# MongoDB 参数
host = '127.0.0.1'
port = 27017
client = MongoClient(host, port)
# 输出时间
def job():
print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
# 存储方式
jobstores = {
'mongo': MongoDBJobStore(collection='job', database='test', client=client),
'default': MemoryJobStore()
}
executors = {
'default': ThreadPoolExecutor(10),
'processpool': ProcessPoolExecutor(3)
}
job_defaults = {
'coalesce': False,
'max_instances': 3
}
# 初始化调度器
scheduler = BlockingScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults)
scheduler.add_job(job, 'interval', seconds=5, jobstore='mongo')
scheduler.start()
可以为 scheduler 绑定事件监听器(event listen)。Scheduler 事件在某些情况下会被触发,而且它可能携带有关特定事件的细节信息。为add_listener()函数提供适当的掩码参数(mask argument)或者是将不同的常数组合到一起,可以监听特定类型的事件。可调用的listener可以通过event object作为参数而被调用。
事件 | 对应枚举值 | 描述 | 归属类 |
---|---|---|---|
EVENT_SCHEDULER_STARTED | 1 | 调度程序启动 | SchedulerEvent |
EVENT_SCHEDULER_SHUTDOWN | 2 | 调度程序关闭 | SchedulerEvent |
EVENT_SCHEDULER_PAUSED | 4 | 调度程序中任务处理暂停 | SchedulerEvent |
EVENT_SCHEDULER_RESUMED | 8 | 调度程序中任务处理恢复 | SchedulerEvent |
EVENT_EXECUTOR_ADDED | 16 | 将执行器添加到调度程序中 | SchedulerEvent |
EVENT_EXECUTOR_REMOVED | 32 | 执行器从调度程序中删除 | SchedulerEvent |
EVENT_JOBSTORE_ADDED | 64 | 将任务存储添加到调度程序中 | SchedulerEvent |
EVENT_JOBSTORE_REMOVED | 128 | 任务存储从调度程序中删除 | SchedulerEvent |
EVENT_ALL_JOBS_REMOVED | 256 | 所有任务从所有任务存储中删除或从一个特定的任务存储中删除 | SchedulerEvent |
EVENT_JOB_ADDED | 512 | 任务添加到任务存储中 | JobEvent |
EVENT_JOB_REMOVED | 1024 | 从任务存储中删除了任务 | JobEvent |
EVENT_JOB_MODIFIED | 2048 | 从调度程序外部修改了任务 | JobEvent |
EVENT_JOB_EXECUTED | 4096 | 任务被成功执行 | JobExecutionEvent |
EVENT_JOB_ERROR | 8192 | 任务在执行期间引发异常 | JobExecutionEvent |
EVENT_JOB_MISSED | 16384 | 错过了任务执行 | JobExecutionEvent |
EVENT_JOB_SUBMITTED | 32768 | 任务已经提交到执行器中执行 | JobSubmissionEvent |
EVENT_JOB_MAX_INSTANCES | 65536 | 任务因为达到最大并发执行时,触发的事件 | JobSubmissionEvent |
EVENT_ALL | 包含以上的所有事件 |
# 导入调度器,此处使用BackgroundScheduler阻塞调度器
from apscheduler.schedulers.background import BackgroundScheduler
# 导入触发器,此处使用IntervalTrigger特定时间间隔触发
from apscheduler.triggers.interval import IntervalTrigger
# 导入事件类
from apscheduler.events import EVENT_ALL
# 导入日志记录器
from loguru import logger
import time
# 定时任务执行函数
def my_task():
logger.info("执行task任务")
# 事件监听函数
def my_listener(event):
match event.code:
case 4096:
logger.info("任务被成功执行")
case 32768:
logger.info("任务已经提交到执行器中执行")
case _:
logger.info(event.code)
if __name__ == '__main__':
try:
# 实例化调度器对象
scheduler = BackgroundScheduler()
# 添加定时任务,指定任务函数和触发器
my_job = scheduler.add_job(my_task, trigger=IntervalTrigger(seconds=2))
logger.error("开始定时任务")
# 开始执行定时任务调度器
scheduler.start()
# 添加监听器
scheduler.add_listener(my_listener, mask=EVENT_ALL)
time.sleep(4)
except (KeyboardInterrupt, SystemExit):
logger.error("进程已结束运行")
该示例通过监听事件ID号,做出对应的操作。
2023-02-21 17:25:48.561 | ERROR | __main__:<module>:41 - 开始定时任务
2023-02-21 17:25:50.565 | INFO | __main__:my_task:21 - 执行task任务
2023-02-21 17:25:50.566 | INFO | __main__:my_listener:28 - 任务被成功执行
2023-02-21 17:25:50.566 | INFO | __main__:my_listener:30 - 任务已经提交到执行器中执行
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。