当前位置:   article > 正文

python的flask框架里怎么起定时任务&执行异步任务_flask 定时任务

flask 定时任务

python的flask框架里怎么起定时任务&执行异步任务

在工作场景里,我一开始知道定时任务和异步,就是celery.后来起一个小项目,懒得装celery,才去找到了flask-apscheduler这个扩展.

比较下来flask-apscheduler肯定更小巧方便.

celery

介绍

中文文档:https://www.celerycn.io/

另一个:Celery中文文档

它是一个分布系统,用来处理批量的消息.

任务队列

任务队列一般用于线程或计算机之间分配工作的一种机制。

名词解释:

Broker:中间人

Worker:职程,其实就是做任务的东西

beat:如果你用celery做定时任务,需要用到beat

客户端向队列发送消息,Broker把消息传递给Worker,最后由Worker执行

我这边用的Broker时RabbitMQ

也就是说客户端有很多任务要执行,不能直接交给Worker来做,需要一个Broker在中间调节.

优点

主要还是灵活,它的自定义连接池序列化方式压缩方式日志记录方式任务调度生产者消费者中间人(Broker)等都可以自定义定义.

Broker

有RabbitMQ,Redis,Amazon SQS

我这边用的是RabbitMQ

安装RabbitMQ要注意,安装路径不能有中文,你的电脑,用户,起名千万不要起中文名.不然就会和我一样,重装.或者永远不装RabbitMQ.

还有就是安装RabbitMQ要装erlang,要注意好erlang和RabbitMQ的版本对应.

启动就是打开文件夹->sbin->server.bat就启动了

使用方法

当你需要异步执行一个任务时:

可以初始化一个tasks

  1. app = Celery('tasks', broker='amqp://guest@localhost//')  # 指定broker为rabbitmq
  2. @app.task()
  3. def print_task(x, y):  # 一个任务
  4.    print(x + y)

启动命令:(打在terminal)

  1. celery -A tasks worker -l INFO
  2. celery -A tasks worker -l INFO -P solo -c 2
  3. # 这个的意思是启动tasks示例 打印info日志 -P solo是pool solo执行池指定为solo -c 2是2个进程
  4. # -P / --pool prefork(默认使用)、solo、eventlet、gevent
  1. # 参数
  2. # -A / --app 要使用的应用程序实例
  3. # -n / --hostname 设置自定义主机名
  4. # -Q / --queues 指定一个消息队列,该进程只接受此队列的任务
  5. # –max-tasks-per-child 配置工作单元子进程在被一个新进程取代之前可以执行的最大任务数量
  6. # –max-memory-per-child 设置工作单元子进程被替换之前可以使用的最大内存
  7. # -l / --loglevel 定义打印log的等级 DEBUG, INFO, WARNING, ERROR, CRITICAL, FATAL
  8. # –autoscale 池的进程的最大数量和最小数量
  9. # -c / --concurrency 同时处理任务的工作进程数量,默认值是系统上可用的cpu数量
  10. # -B / --beat 定义运行celery打周期任务调度程序
  11. # -h / --help help!help!help!
  12. # 也就是说这一行的意思是 启动tasks这个实例,打印info日志
  13. # control + c可以退出

此时就可以异步执行print_task这个方法了,只需要

  1. print_task.delay(x, y)
  2. # 或者
  3. print_task.apply_async([x, y])
  4. delay和apply_async的应用差不多,是apply_async的简便版.
  5. # .delay(_args, *_kwargs)等价于调用 .apply_async(args, kwargs)
  6. task.delay(arg1, arg2, kwarg1='x', kwarg2='y')
  7. # 等同于
  8. task.apply_async(args=[arg1, arg2], kwargs={'kwarg1': 'x', 'kwarg2': 'y'})
  9. # 但是delay不支持其他参数,apply_async支持很多
  10. task.apply_async(countdown=10)  # 现在开始10s后执行
  11. task.apply_async(eta=now + timedelta(seconds=10))  # 10s后 指明eta
  12. task.apply_async(countdown=60, expires=120)  # 60s后, 120s后过期
  13. task.apply_async(expires=now + timedelta(days=2))  # 指定2天后过期

定时任务

要启用beat

celery -A tasks beat 

代码:

  1. @app.on_after_configure.connect
  2. def setup_periodic_tasks(sender, **kwargs):
  3.    # 每天早上7点00 执行print_task 参数为1, 2
  4.    sender.add_periodic_task(crontab(hour="7", minute="0"), print_task.s(1, 2),
  5.                             name='print_task')
  6. # minute="*/5" 每五分钟

感受

用下来感觉功能非常全面,但是启动真的很麻烦.

我要先打开rabbitmq

再打开监听

再打开beat

再写好任务

如果定时任务要等待时间

如果是异步我要delay

而且要装一堆东西

所以我查到了下面这个

flask-apscheduler

安装

pip install flask-apscheduler

介绍

适配于Flask框架 是APScheduler扩展 支持flask配置类加载定时任务调度器配置和任务配置明细 提供内部restful风格API监控管理定时任务、认证机制、host白名单访问机制 高度与flask蓝图集成

  1. """add_job(func, trigger=None, args=None, kwargs=None, id=None, \
  2.           name=None, misfire_grace_time=undefined, coalesce=undefined, \
  3.           max_instances=undefined, next_run_time=undefined, \
  4.           jobstore='default', executor='default', \
  5.           replace_existing=False, **trigger_args)
  6.       Adds the given job to the job list and wakes up the scheduler if it's already running.
  7.       Any option that defaults to ``undefined`` will be replaced with the corresponding default
  8.       value when the job is scheduled (which happens when the scheduler is started, or
  9.       immediately if the scheduler is already running).
  10.       The ``func`` argument can be given either as a callable object or a textual reference in
  11.       the ``package.module:some.object`` format, where the first half (separated by ``:``) is an
  12.       importable module and the second half is a reference to the callable object, relative to
  13.       the module.
  14.       The ``trigger`` argument can either be:
  15.         #. the alias name of the trigger (e.g. ``date``, ``interval`` or ``cron``), in which case
  16.           any extra keyword arguments to this method are passed on to the trigger's constructor
  17.         #. an instance of a trigger class
  18.       :param func: callable (or a textual reference to one) to run at the given time
  19.       :param str|apscheduler.triggers.base.BaseTrigger trigger: trigger that determines when
  20.           ``func`` is called
  21.       :param list|tuple args: list of positional arguments to call func with
  22.       :param dict kwargs: dict of keyword arguments to call func with
  23.       :param str|unicode id: explicit identifier for the job (for modifying it later)
  24.       :param str|unicode name: textual description of the job
  25.       :param int misfire_grace_time: seconds after the designated runtime that the job is still
  26.           allowed to be run (or ``None`` to allow the job to run no matter how late it is)
  27.       :param bool coalesce: run once instead of many times if the scheduler determines that the
  28.           job should be run more than once in succession
  29.       :param int max_instances: maximum number of concurrently running instances allowed for this
  30.           job
  31.       :param datetime next_run_time: when to first run the job, regardless of the trigger (pass
  32.           ``None`` to add the job as paused)
  33.       :param str|unicode jobstore: alias of the job store to store the job in
  34.       :param str|unicode executor: alias of the executor to run the job with
  35.       :param bool replace_existing: ``True`` to replace an existing job with the same ``id``
  36.           (but retain the number of runs from the existing one)
  37.       :rtype: Job
  38.       """
  39.    """
  40. 将给定的任务添加到任务列表中,并在调度程序已在运行时唤醒它。
  41. 当调度作业时,任何默认为“未定义”的选项都将替换为相应的默认值(当调度程序启动时,或当调度程序已在运行时立即发生)。
  42. “func”参数可以作为中的可调用对象或文本引用给出
  43. `package.module:some.object`格式,其中前半部分(用`:``分隔)是可导入模块,后半部分是对可调用对象的引用,相对于模块。
  44. “触发器”参数可以是:
  45. #.触发器的别名(例如“date”、“interval”或“cron”),在这种情况下
  46. 该方法的任何额外关键字参数都会传递给触发器的构造函数
  47. #.触发器类的实例
  48. :param func:可调用(或对函数的文本引用)以在给定时间运行
  49. :param str | apscheduler.triggers.base.BaseTrigger trigger:确定何时的触发器
  50. ``调用了func ``
  51. :param list | tuple args:用于调用func的位置参数列表
  52. :param dict kwargs:用于调用func的关键字实参的dict
  53. :param str | unicode id:作业的显式标识符(用于以后修改)
  54. :param str | unicode name:作业的文本描述
  55. :param int miscelle_grace_time:指定运行时后作业仍然存在的秒数 允许运行(或“无”以允许作业无论多晚都能运行)
  56. :param bool coalize:如果调度程序确定作业应连续运行多次
  57. :param int max_instances:允许同时运行的最大实例数
  58. 工作
  59. :param datetime next_run_time:第一次运行作业的时间,与触发器无关(通过
  60. ``无``以将作业添加为暂停)
  61. :param str | unicode jobstore:用于存储作业的作业存储的别名
  62. :param str | unicode executor:用于运行作业的执行器的别名
  63. :param bool replace_existing:`True``用相同的``id替换现有作业``
  64. (但保留现有执行次数)
  65. :r类型:作业"""
  1. # func 要执行的函数
  2. # trigger 触发方式 (1)date: 特定的时间点触发(2)interval: 固定时间间隔触发(3)cron: 在特定时间周期性地触发
  3. # args 参数 tuple
  4. # kwargs 参数 dict
  5. # id 唯一值 unicode
  6. # name 名字
  7. # misfire_grace_time 延迟执行时间 int 空的话代表永久 例如Job的计划执行时间是21:00:00,但因服务重启或其他原因导致21:00:31才执行,如果设置40,则该job会继续执行
  8. # coalesce Job是否合并执行 例如scheduler停止20s后重新启动,而job的触发器设置为5s执行一次,因此此job错过了4个执行时间,如果设置为是,则会合并到一次执行,否则会逐个执行 bool
  9. # max_instances 此job允许同时运行的最大实例数 int
  10. # next_run_time 什么时候第一次运行 不考虑trigger datetime
  11. # jobstore 任务存储 str|unicode 存储被调度的任务,默认的任务存储是保存在内存中。同时支持任务存储在数据库中,一个任务的数据将在保存到持久化作业存储时被序列化,并在加载时被反序列化。
  12. # executor 执行器 str|unicode job创建时设置执行器的名字,根据执行器的名字发送给scheduler获取到执行此job的执行器,执行job指定的函数
  13. # replace_existing true:替换已经有的同样id的作业 bool

使用

  1. from apscheduler.schedulers.background import BackgroundScheduler
  2. def my_task():
  3.    pass
  4. scheduler = BackgroundScheduler()  # 初始化
  5. scheduler.add_job(my_task, trigger='interval', id='my_task', minutes=1)  # 意思就是一分钟执行一次my_task
  6. scheduler.add_job(my_task, trigger='interval', id='my_task', minutes=1, start_date='2024-01-01 00:00:00' , end_date='2025-01-01 00:00:00')  # 意思是在2024-01-01 00:00:00 ~ 2025-01-01 00:00:00时间内 每一分钟执行一次my_task
  7. scheduler.start()

感受

方便 装个包就行 但是功能不齐全 没有测过大的并发量下的执行情况

一开始只是为了一个小项目

还有一个注意事项,如果debug=True的话,任务会执行两次的

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/2023面试高手/article/detail/670960
推荐阅读
相关标签
  

闽ICP备14008679号