赞
踩
问题:web项目运行多个实例时,定时任务会被执行多次的问题
举例来说
我使用库APScheduler排定了一个定时任务taskA
在每天的晚上9点需要执行一次,我的web服务使用分布式运行了8个实例,于是在每天晚上的9点,我的8个服务都会接收到APScheduler发布的这个定时任务,然后每个服务都会执行一次这个定时任务,这与我预期的只执行1次是不符的,我想要解决这个问题,让定时任务只被执行一次
设想的解决方式:
使用redis分布式锁的方式(redis.setnx(key, value))让实例只能运行一次定时任务,从而解决定时任务会被执行多次的问题
setnx(key, value)方法很简单,就是当key不存在时,setnx会成功,否则会失败
def setnx(self, name: KeyT, value: EncodableT) -> Awaitable:
"""Set the value of key ``name`` to ``value`` if key doesn't exist"""
return self.execute_command("SETNX", name, value)
setnx(key, value)方法会在key不存在时设置value,当多个线程同时接到排期准备运行同一个任务时,只有第一个线程setnx会成功(返回True),于是第一个setnx成功的线程运行了定时任务,其他线程在setnx时由于key已经存在会失败(返回False),从而让它们跳过定时任务的执行
仍然存在的问题:定时任务一般会执行多次,在其下一次执行时,setnx相同key的这条记录应该被删除掉,因为这是一次新的任务,否则之后的任务执行都会因setnx时key已存在而失败导致任务无法执行
i. 第一种方案:在setnx成功的线程1任务执行完成后删除这个key在redis中存储的记录,从而让下一次任务第一次运行时又可以成功setnx(key, value)而执行
但这种方案存在一定的风险:如果存在线程2因为一些原因阻塞了,在线程1执行完任务才开始接收到运行定时任务的指令,那么线程2会在key被删除后开始尝试setnx,那必然会成功,然后重复了运行任务
ii. 基于第一种方案的考虑,确定了第二种方案,只需要给每次的定时任务添加唯一标识即可避免第一种方案的问题:设置此次任务运行的唯一key_x,在setnx成功的线程1任务执行完成之后不对这次定时任务的key_x执行删除
此次定时任务唯一key_x的设置很容易想到的方案是在这次定时任务id上添加运行的排期时间,这样就可以让这一次的定时任务是唯一且可识别了,只要运行了一次其值就永久设置为True,不会在执行第二次(考虑到资源占用,实际应该设置一个较长的过期时间也完全可以避免方案1的风险)
设置有过期时间的方法应该使用redis.set(key, value, nx=True, ex=10)
方法,这里nx=True表名使用命令SETNX
,而ex=10则是过期的时间,单位为秒
第一种方案的可重复运行的小案例:
# -*- coding: utf-8 -*- import asyncio import time import aioredis from aioredis import Redis loop = asyncio.get_event_loop() redis_coro = aioredis.Redis() def redis_distributed_lock(cache_key, cache_value="locked"): def decorator(func): async def wrapper(*args, **kwargs): redis_instance = await redis_coro # 这里设置了10小时的过期时间,完全可以避免重复运行的风险了 locked = await redis_instance.set(cache_key, cache_value, nx=True, ex=60 * 60 * 10) if locked: # 第一个线程设置成功值会运行任务,否则不会运行任务 print("success") return await func(*args, **kwargs) print(f"failed") return wrapper return decorator async def ntasks(): t_time = ["9点", "10点", "11点"] # 模拟任务在三个时间点被执行 redis: Redis = await redis_coro t_id = 1 async def task_func(tid): print(f"{tid=}, executing...") return tid # 为了可重复运行这个示例,先执行删除之前设置的key ret = await redis.delete(str(t_id)) for t_t in t_time: redis_key = f"{t_id}" # 这里本来预期是直接放到函数头上装饰,但是不方便控制redis_key参数,所以使用了原始的方式装饰task_func task_f = redis_distributed_lock(redis_key)(task_func) # 假设启动分布式服务8个,会执行8次定时任务,这里创建了8个任务,按照先执行完先返回的顺序处理 for f in asyncio.as_completed([task_f(redis_key) for _ in range(8)], loop=loop): res = await f # print(f"{res=}") print(f"=" * 80) time.sleep(5) # 模拟一个定时任务在多个时间点执行,下一次执行时,时间参数(t_t)会发生变化 # break if __name__ == '__main__': try: loop.run_until_complete(ntasks()) finally: loop.stop() loop.close()
运行结果
success tid='1', executing... failed failed failed failed failed failed failed ================================================================================ failed failed failed failed failed failed failed failed ================================================================================ failed failed failed failed failed failed
只有第一个时间点是按照预期执行,之后的时间点执行都总是失败,因为每个时间点的该任务设置的key都是一样的
第二种方案的可重复运行的小案例:
# -*- coding: utf-8 -*- import asyncio import time import aioredis from aioredis import Redis loop = asyncio.get_event_loop() redis_coro = aioredis.Redis() def redis_distributed_lock(cache_key, cache_value="locked"): def decorator(func): async def wrapper(*args, **kwargs): redis_instance = await redis_coro # 这里设置了10小时的过期时间,完全可以避免重复运行的风险了 locked = await redis_instance.set(cache_key, cache_value, nx=True, ex=60 * 60 * 10) if locked: # 第一个线程设置成功值会运行任务,否则不会运行任务 print("success") return await func(*args, **kwargs) print(f"failed") return wrapper return decorator async def ntasks(): t_time = ["9点", "10点", "11点"] # 模拟任务在三个时间点被执行 redis: Redis = await redis_coro t_id = 1 async def task_func(tid): print(f"{tid=}, executing...") return tid for t_t in t_time: redis_key = f"{t_id}_{t_t}" # set的key用定时任务的id+时间点来作为此次定时任务的唯一标识 # 为了可重复运行这个示例,先执行删除之前设置的key ret = await redis.delete(redis_key) print(f"key deleted? {ret}") # 这里本来预期是直接放到函数头上装饰,但是不方便控制redis_key参数,所以使用了原始的方式装饰task_func task_f = redis_distributed_lock(redis_key)(task_func) # 假设启动分布式服务8个,会执行8次定时任务,这里创建了8个任务,按照先执行完先返回的顺序处理 for f in asyncio.as_completed([task_f(redis_key) for _ in range(8)], loop=loop): res = await f # print(f"{res=}") print(f"=" * 80) time.sleep(5) # 模拟一个定时任务在多个时间点执行,下一次执行时,时间参数(t_t)会发生变化 # break if __name__ == '__main__': try: loop.run_until_complete(ntasks()) finally: loop.stop() loop.close()
输出
key deleted? 1 success tid='1_9点', executing... failed failed failed failed failed failed failed ================================================================================ key deleted? 1 success tid='1_10点', executing... failed failed failed failed failed failed failed ================================================================================ key deleted? 1 success tid='1_11点', executing... failed failed failed failed failed failed failed ================================================================================
可以看到task_func任务的每个时间点的执行都只有一次成功,而且不会出现只有第一个时间点执行成功而之后的时间点执行都全是失败的情况
有用的参考:
起初总是尝试在协程方法中使用多线程threading.Thread老是碰壁,看了这个回答后读了这篇文章,感觉豁然开朗
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。