赞
踩
模板基于 schedule 库实现定时任务,使用前需先安装这个第三方库 pip install schedule
。 schedule的配置参考文档:https://schedule.readthedocs.io/en/stable/
转自 chat什么T:
优雅退出是指在程序运行过程中,以一种平稳、有序的方式终止程序的执行。这种退出方式可以确保程序在退出前完成必要的清理操作,释放资源,并在关闭过程中避免可能引发错误或数据损失的情况发生。优雅退出对于长时间运行的程序、服务器应用和后台任务尤其重要。通过使用适当的退出策略,可以保证程序在终止时不会留下脏数据、未完成的操作或资源泄漏等问题。
本脚本实现优雅退出均是指把已开始但未完成的任务执行完、 接收到SIGINT(Ctrl+C) 或 SIGTERM(发送的终止信号: kill -15
) 信号的退出。其中不包括 kill -9
的这种退出方式,因此如果不需要优雅退出时候可以使用kill -9 PID
的方式进行强制退出。
线程池/进程池基于python内置库 concurrent.futures 的 ThreadPoolExecutor, ProcessPoolExecutor 实现。通过--pool
选择使用线程还是进程,--max-workers
设置并发数。
#!/usr/bin/python3 # -*- coding: utf-8 -*- import os, sys # """ # 切换跟根目录 如果需的话 # """ # # 获取当前脚本的目录路径 # current_dir = os.path.dirname(os.path.abspath(__file__)) # # 切换到上一级目录 # base_dir = os.path.dirname(current_dir) # # 添加上一级目录为项目的根目录 # sys.path.append(base_dir) # print(f"> base dir path is {base_dir}") import signal import time import schedule # pip install schedule import argparse from datetime import datetime, timedelta from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed, wait def getConfigs(): """ get_configs 获取命令行参数 """ parser = argparse.ArgumentParser() # 检查时间间隔 parser.add_argument("--interval", "-i", type=int, default=int(timedelta(seconds=1).total_seconds()), # 默认间隔时间 help="Run interval. Unit: second. Default: %(default)s") # 异步运行方式 (线程池/进程池) parser.add_argument("--pool", '-p', type=str, choices=['thread', 'process'], default="thread", # 默认 线程/进程 运行 help="Run script by thread or Process. Default: %(default)s") # 最大异步worker parser.add_argument("--max-workers", type=int, default=1, # 默认线程池容量 help="Max number of concurrent executions. =0 no limit, Default: %(default)s") # TODO: other arguments arguments = parser.parse_args() print(f"DEBUG: Args{ str(arguments)[len('Namespace'):] }") return arguments def run(arguments): pool = (ThreadPoolExecutor if arguments.pool.lower().startswith("thread") else ProcessPoolExecutor)(max_workers=arguments.max_workers) def exitHandler(sig, frame): """ exitHandler 优雅退出处理函数 """ print(f"WARN: Catch single:{sig}, shutdown pool and waited running workers now ...") pool.shutdown(wait=True, cancel_futures=True) # 停止线程池,wait=True:等待正在运行的线程/进程结束,cancel_futures=True:取消正在等待的线程/进程 print(f"WARN: All running threads was done, exit now") exit(0) def start(*args, **kwargs): # TODO: 要执行的异步代码 print(f"start: args:{args} kwargs:{kwargs}") time.sleep(10) # 模拟耗时操作 print(f"end: args:{args} kwargs:{kwargs}") def do(): params_list = range(1, 10) # TODO 模拟要传递的参数列表 workers = [pool.submit(start, int(_id)) for _id in params_list] wait(workers) # 处理常用的退出信号 signal.signal(signal.SIGTERM, exitHandler) # SIGTERM 信号值:15 行为: supervisorctl stop / kill -15 signal.signal(signal.SIGINT, exitHandler) # SIGINT 信号值:2 行为: CTRL+C return do if __name__ == '__main__': arguments = getConfigs() runner = run(arguments) schedule.every(arguments.interval).seconds.do(runner) # 定时任务设置参考https://schedule.readthedocs.io/en/stable/ while True: schedule.run_pending() time.sleep(1)
[program: threads_process_script]
command=python threads_process_script_temp.py -i 120 --pool thread --max-workers 5 # 运行脚本
directory=/home/ubuntu/scripts # 根目录
autostart=true # 跟随程序自动启动
autorestart=true # 退出自动重启
startretries=5 # 重启重试次数
user=ubuntu # 用户运行
redirect_stderr=true # 重定向错误输出
stdout_logfile=/home/ubuntu/logs/supervisor/info.log # 输出日志文件
stdout_logfile_maxbytes=20MB # 日志文件最大容量,超出备份
stdout_logfile_backups=10 # 备份个数
在python 更推荐使用携程代替多线程, 尽管没有提升速度但是减少线程创建烧毁的开销。此处的池并非真正意义上的池,只是实现了类似池的效果。优雅退出的方式也改变成为标识符的现实,标记为退出后 直接略过剩余的任务。
github: https://github.com/JohnDoe1996/script-templates/blob/main/schedule_scripts/async_script_temp.py
#!/usr/bin/python3 # -*- coding: utf-8 -*- import os, sys # """ # 切换跟根目录 如果需的话 # """ # # 获取当前脚本的目录路径 # current_dir = os.path.dirname(os.path.abspath(__file__)) # # 切换到上一级目录 # base_dir = os.path.dirname(current_dir) # # 添加上一级目录为项目的根目录 # sys.path.append(base_dir) # print(f"> base dir path is {base_dir}") import signal import time import asyncio import argparse import schedule # pip install schedule from datetime import datetime, timedelta def getConfigs(): """ get_configs 获取命令行参数 """ parser = argparse.ArgumentParser() # 检查时间间隔 parser.add_argument("--interval", "-i", type=int, default=int(timedelta(seconds=1).total_seconds()), # 默认间隔时间 help="Run interval. Unit: second. Default: %(default)s") # 最大异步worker parser.add_argument("--max-workers", type=int, default=1, # 默认线程池容量 help="Max number of concurrent executions. =0 no limit, Default: %(default)s") # TODO: other arguments arguments = parser.parse_args() print(f"DEBUG: Args{ str(arguments)[len('Namespace'):] }") return arguments def run(arguments): is_shutdown = False def exitHandler(sig, frame): """ exitHandler 优雅退出处理函数 """ nonlocal is_shutdown is_shutdown = True print(f"WARN: Catch single:{sig}, shutdown pool and waited running workers now ...") async def start(*args, **kwargs): # TODO: 要执行的异步代码 print(f"start: args:{args} kwargs:{kwargs}") await asyncio.sleep(10) # 模拟耗时操作 print(f"end: args:{args} kwargs:{kwargs}") async def mainCoroutine(): """ mainCoroutine 主协程函数,用于多个运行子协程 """ pool = asyncio.Semaphore(arguments.max_workers) # 创建asyncio.Semaphore模拟pool, 必须写在async函数内 async def runFunc(fn, args, kwargs): """ runFunc 使用Semaphore运行异步函数 :param function fn: 要执行的async函数 :param tuple args: 函数参数 :param dict kwargs: 函数参数 """ async with pool: # 当asyncio.Semaphore.acquire() <= 0 也就是正在运行的协程数大于等于max_workers时会发生阻塞 if not is_shutdown: # 没有受到停止信号时执行函数。反之收到停止信号之后没有执行的函数不在执行直到队列结束,正在执行的函数继续执行,实现优雅退出 await fn(*args, **kwargs) params_list = range(1, 10) # TODO 模拟获取到的参数 tasks = [asyncio.ensure_future( runFunc(start ,args=(_id,), kwargs={}) # TODO 传入要执行的函数和参数 ) for _id in params_list] await asyncio.gather(*tasks) def do(): asyncio.run(mainCoroutine()) # 运行一轮主协程直到完成 if is_shutdown: # 接收到退出信号之后执行完这一轮后直接退出程序不执行下一轮。 print(f"WARN: All running threads was done, exit now") exit(0) # 处理常用的退出信号 signal.signal(signal.SIGTERM, exitHandler) # SIGTERM 信号值:15 行为: supervisorctl stop / kill -15 signal.signal(signal.SIGINT, exitHandler) # SIGINT 信号值:2 行为: CTRL+C return do if __name__ == '__main__': arguments = getConfigs() runner = run(arguments) schedule.every(arguments.interval).seconds.do(runner) # 定时任务设置参考https://schedule.readthedocs.io/en/stable/ while True: schedule.run_pending() time.sleep(1)
[program: async_script]
command=python async_script_temp.py -i 120 --max-workers 5 # 运行脚本
directory=/home/ubuntu/scripts # 根目录
autostart=true # 跟随程序自动启动
autorestart=true # 退出自动重启
startretries=5 # 重启重试次数
user=ubuntu # 用户运行
redirect_stderr=true # 重定向错误输出
stdout_logfile=/home/ubuntu/logs/supervisor/info.log # 输出日志文件
stdout_logfile_maxbytes=20MB # 日志文件最大容量,超出备份
stdout_logfile_backups=10 # 备份个数
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。