赞
踩
~网站优化第一定律:缓存 (空间换时间)
-热数据(热点数据):体量不大、访问频率极高、更新不频繁
~网站优化第二定律:削峰 (能不马上做的事情都不要马上做)、消息队列
-遇到耗时间的任务(用户请求),用户不需要马上得到结果、异步化、推迟执行
-多线程
-异步消息队列 -celery
#在django中使用celery模块提供消息队列服务 import os import celery from django.conf import settings os.environ.setdefault('DJANGO_SETTINGS_MODULE','izufang.settings') #找到django的配置文件 #创建Celery对象,指定模块名,消息队列和持久化方式 app = celery.Celery( main = 'izufang', broker='redis://用户名:密码@ip:端口/1', backend='redis://地址:端口/那个数据库' #数据持久化 ) #main参数指定存放位置,一般放在主项目底下__init__自动执行,如果单独放一个模块,比如hello.py 这里写hello #broker使用消息队列的服务,这里使用Redis来做消息队列服务 #从项目的配置文件读取Celery配置信息 app.config_from_object('django.conf:settings') #从指定的文件(例如celery_config.py)中读取Celery配置信息 #app.config_from_object('celery_config') #自动从注册的应用中找带有@app.task的函数 app.autodiscover_tasks(settings.INSTALLED_APPS) #自动从指定的应用中发现任务 #app.autodiscover_tasks((('common',)))
使用celery
#给短信验证码的接口加上消息队列,延迟执行 @app.task #加上这个装饰器 def send_sms_by_luosimao(tel, message): """发送短信(调用螺丝帽短信网关)""" resp = requests.post( url='http://sms-api.luosimao.com/v1/send.json', auth=('api', 'key-d752503b8db92317a2642771cec1d9b0'), data={ 'mobile': tel, 'message': message }, timeout=10, verify=False ) return resp.json() #将需要异步执行的函数加上@app.task装饰器 #待会执行函数的时候可以通过调用apply_asunc / delay方法来执行函数 #调用方式(例如短信验证码) code = gen_mobile_code() #获取验证码 message = f'您的短信验证码是{code},打死也不能告诉别人哟!【Python小课】' send_sms_by_luosimao.apply_async((tel, message), queue='queue1', #使用那么消息队列 countdown=random.random() * 5, #延迟执行时间 retry_policy={'max_reties':3}, #如果执行失败,最大执行次数 compression='zlib', #压缩方式 ) #send_sms_by_luosimao.delay(tel,message) #这个函数不会马上执行,而是放置消息队列,需要启消息消费者才行 或 task = send_sms_by_luosimao.s(countdown=10,expires=60) task.delay(tel,message) #10秒延迟执行,超时时间60秒,如果60秒没响应不执行
执行消息队列
#在当前虚拟环境中输入命令
celery -A izufang worker -c 1 -l debug #-Q queue1 # 消费那个消息队列(可以启多个)
celery -A izufang worker -l debug #-Q queue2
#-A 指定模块名
#worker 表示这是一个消费者
# -l 表示日志级别
#debug 表示最详细的日志
# -c 启动几个进程
celery -A phone_recycling worker -c 1 -l info 一个进程
实例
celery配置定时任务
#假设User用户表有一个字段为记录用户是否活跃,True为活跃 在项目中新建一个tasks.py文件 专门用来处理定时任务 from common.models import User @app.task def check_inacive_user(): last_30_days = timezone.now() - timedalta(days=30) User.objects.filter(lastvisit__lt=last_30_days).update(is_inacive=False) #函数已经写好 #配置celery定时任务 在配置celery的文件中这里在common/__init__.py app.conf.update( timezone=settings.TIME_ZONE, #校正时间 enable_utc=True, beat_schedule={ 'task1':{'common.tasks.check_inactive_user', #执行的函数路径 'schedule:crontab('*','*','*','*','*') #克隆表达式指定执行的时间,不指定每分钟执行 'args':(), #参数 }, }, ) app.autodiscover_tasks(('common',)) #自动从指定的应用中发现任务 #终端中输入命令 celery -A izufang beat -l debug #消息生产者,时间到了自动进入消息队列 #如果没有指定消息消费者,那么队列会积压 #终端中输入命令 celery -A izufang worker -l debug #消息消费者 处理进入消息队列的功能
celery 消息日志放入数据库
pip install django-celery-results #修改settings.py配置 INSTALLED_APPS = [ ..., 'django_celery_results', ] python manage.py migrate django_celery_results #数据迁移 (如果报safe update...错误) #这步如果报错,去数据库删除django_migrations和django_celery_results两张表 #show variables like '%safe%'; #查询这个变量 set session sql_safe_updates=off; #set sql_safe_updates=off; #这句话只能保证当前会话关闭 set global sql_safe_updates=off; #全局配置关闭 app = celery.Celery('izufang', broker=消息队列的数据库, backend='django-db' ) #配置完成后,执行消息队列就可以保存至数据库中
打包执行异步任务
from celery import group
task_group = group(任务1.s(),任务2.s(),...)
results = task_group() #得到一个列表,每一个元素就是每个任务的执行结果
链式执行异步任务(需要将上一个任务执行的结果传入下一个任务当做参数)
from celery import chain
task = chain(任务1.s() | 任务2.s() | 任务3.s())
result = task() #这里返回最后一个任务的执行结果
异步化时候序列化的问题(图片上传的问题)
方法1 需要在django中的配置文件添加配置(全局配置) CELERY_TASK_SERIALIZER = 'pickle' #上传时用到的序列化类型 CELERY_RESULT_SERIALIZER = 'pickle' #返回结果时用的序列化类型 CELERY_ACCEPT_CONTENT = ['pickle','json'] #celery接收的数据类型 调用的时候需要读取配置文件 app.config_from_object('django.conf:settings') 方法2 #或者在调用的时候写上 app.conf.update( accept_content = ['json','pickle','msgpack'], task_serializer = 'pickle', result_serializer = 'json' #'pickle' timezone=settings.TIME_ZONE, #校正时间 enable_utc=True, beat_schedule={ 'task1':{'common.tasks.check_inactive_user', #执行的函数路径 'schedule:crontab('*','*','*','*','*') #克隆表达式指定执行的时间,不指定每分钟执行 'args':(), #参数 }, }, ) app.autodiscover_tasks(('common',)) #自动从指定的应用中发现任务
消息队列的监控
pip install flower
celery flower --broker=消息队列的数据库 #这里是个命令
celery -A 项目名 flower --address=0.0.0.0 --port=8080
项目主文件夹下__init__.py文件
import os
import celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'ddd.settings') # (..., 项目名.settings)
app = celery.Celery(
main='ddd', # 项目名
broker='redis://:密码@ip:6379/0', # redis
)
app.autodiscover_tasks((('backstage',))) # 自动从backstage文件发现任务
写视图
from rest_framework.response import Response from ddd import app from datetime import timedelta from celery import Task class CustomTask(Task): def on_success(self, retval, task_id, args, kwargs): '''异步任务执行成功时,会执行这个回调方法。''' print('异步任务成功' + '-->' + str(retval)) # 返回值 return super(CustomTask, self).on_success(retval, task_id, args, kwargs) def on_failure(self, exc, task_id, args, kwargs, einfo): '''异步任务执行失败时,会执行这个回调方法。如果你想实现一个 异步任务执行失败时,需要发出告警邮件通知的功能,重写这个函数就可实现''' print('异步任务失败', exc) return super(CustomTask, self).on_failure(exc, task_id, args, kwargs, einfo) def after_return(self, status, retval, task_id, args, kwargs, einfo): '''异步任务尝试重试时,会执行这个回调方法。''' print(retval) return super(CustomTask, self).after_return(status, retval, task_id, args, kwargs, einfo) @app.task(ignore_result=True, base=CustomTask) # 异步函数 def res(n): '''参数解析 name : 可以显式指定任务的名字;默认是模块的命名空间中本函数的名字。 serializer : 指定本任务的序列化的方法; bind : 一个bool值,设置是否绑定一个task的实例,如果绑定,task实例会作为参数传递到任务方法中,可以访问task实例的所有的属性,即前面反序列化中那些属性 base : 定义任务的基类,可以以此来定义回调函数,默认是Task类,我们也可以定义自己的Task类 default_retry_delay : 设置该任务重试的延迟时间,当任务执行失败后,会自动重试,单位是秒,默认3分钟; autoretry_for : 设置在特定异常时重试任务,默认False即不重试; retry_backoff : 默认False,设置重试时的延迟时间间隔策略; retry_backoff_max : 设置最大延迟重试时间,默认10分钟,如果失败则不再重试; retry_jitter : 默认True,即引入抖动,避免重试任务集中执行; ''' for i in range(int(n)): print(i) time.sleep(1) return '您输入的数字是 %s' % n @api_view(['GET', ]) def test(request): v1 = datetime.today() + timedelta(seconds=20) # 任务执行时间(北京时间)datetime类型 print('中国时间:{}'.format(v1)) v2 = datetime.utcfromtimestamp(v1.timestamp()) # 把时间转为UTC print('UTC:{}'.format(v2)) n = request.GET.get('n') #r = res.delay(n) r = res.apply_async(args=[n, ], eta=v2) # args为参数,eta为执行时间 print(r.id) return Response({'code': 0, 'msg': 'succ'})
项目主文件夹下__init.py文件
import os import celery from celery.schedules import crontab from datetime import timedelta os.environ.setdefault('DJANGO_SETTINGS_MODULE', '项目名.settings') app = celery.Celery( main='ddd', broker='redis://:密码@101.37.163.0:6379/0', ) app.conf.timezone = "Asia/Shanghai" app.conf.beat_schedule = { # 名字随意起 # 'add-every-10-seconds': { # 'task': 'backstage.views.res', # task: ’项目文件.文件.计划任务函数‘ # 'schedule': timedelta(seconds=2), # 计划时间,支持克隆表达式 # 'args': (888,) # 参数 # }, 'add-every-11-seconds': { 'task': 'backstage.views.res', # task: ’项目文件.文件.计划任务函数‘ # 'schedule': crontab('0', '23', '*', '*', '*'), # 每天23:00点执行 'schedule': crontab('17', '10', '*', '*', '*'), # 每天10:17点执行 'args': (595858585858585858585,) # 参数 } } 启动消费者: celery -A ddd worker -l info # ddd换成项目名 启动定时任务生产者: celery -A ddd beat -l info # ddd换成项目名
# 方法一:这是apply_async方法的别名,但接受的参数较为简单;
task.delay()
# 方法二:可以接受复杂的参数
task.apply_async(args=[arg1, arg2], kwargs={key:value, key:value})
# 方法三:可以发送未被注册的异步任务,即没有被celery.task装饰的任务;
send_task()
方法一:app.send_task
注意: send_task 在发送的时候是不会检查 tasks.add 函数是否存在的,即使为空也会发送成功,所以 celery 执行是可能找不到该函数报错;
from celery import Celery
app = Celery()
def add(x, y):
return x+y
app.send_task('tasks.add',args=[3,4]) # 参数基本和apply_async函数一样
方法二:Task.delay
delay 方法是 apply_async 方法的简化版,不支持执行选项,只能传递任务的参数。
from celery import Celery
app = Celery()
@app.task
def add(x, y, z=0):
return x + y
add.delay(30, 40, z=5) # 包括位置参数和关键字参数
方法三:Task.apply_async
apply_async 支持执行选项,它会覆盖全局的默认参数和定义该任务时指定的执行选项,本质上还是调用了 send_task 方法;
from celery import Celery app = Celery() @app.task def add(x, y, z=0): return x + y add.apply_async(args=[30,40], kwargs={'z':5}) # 其他参数 task_id : 为任务分配唯一id,默认是uuid; countdown : 设置该任务等待一段时间再执行,单位为s; eta : 定义任务的开始时间;eta=time.time()+10; expires : 设置任务时间,任务在过期时间后还没有执行则被丢弃; retry : 如果任务失败后, 是否重试;使用true或false,默认为true shadow : 重新指定任务的名字str,覆盖其在日志中使用的任务名称; retry_policy : {},重试策略.如下: ----max_retries : 最大重试次数, 默认为 3 次. ----interval_start : 重试等待的时间间隔秒数, 默认为 0 , 表示直接重试不等待. ----interval_step : 每次重试让重试间隔增加的秒数, 可以是数字或浮点数, 默认为 0.2 ----interval_max : 重试间隔最大的秒数, 即 通过 interval_step 增大到多少秒之后, 就不在增加了, 可以是数字或者浮点数, 默认为 0.2 . routing_key : 自定义路由键; queue : 指定发送到哪个队列; exchange : 指定发送到哪个交换机; priority : 任务队列的优先级,0到255之间,对于rabbitmq来说0是最高优先级; serializer :任务序列化方法;通常不设置; compression : 压缩方案,通常有zlib, bzip2 headers : 为任务添加额外的消息; link : 任务成功执行后的回调方法;是一个signature对象;可以用作关联任务; link_error : 任务失败后的回调方法,是一个signature对象; # 其他参数参考用法如下: add.apply_async((2, 2), retry=True, retry_policy={ 'max_retries': 3, 'interval_start': 0, 'interval_step': 0.2, 'interval_max': 0.2, })
自定义发布者、交换机、路由键、队列、优先级、序列方案和压缩方法:
task.apply_async((2,2),
compression='zlib',
serialize='json',
queue='priority.high',
routing_key='web.add',
priority=0,
exchange='web_exchange')
获取任务结果和状态
由于 celery 发送的都是去其他进程执行的任务,如果需要在客户端监控任务的状态,有如下方法:
r = task.apply_async()
r.ready() # 查看任务状态,返回布尔值, 任务执行完成, 返回 True, 否则返回 False.
r.wait() # 会阻塞等待任务完成, 返回任务执行结果,很少使用;
r.get(timeout=1) # 获取任务执行结果,可以设置等待时间,如果超时但任务未完成返回None;
r.result # 任务执行结果,未完成返回None;
r.state # PENDING, START, SUCCESS,任务当前的状态
r.status # PENDING, START, SUCCESS,任务当前的状态
r.successful # 任务成功返回true
r.traceback # 如果任务抛出了一个异常,可以获取原始的回溯信息
但是一般业务中很少用到,因为获取任务执行的结果需要阻塞,celery使用场景一般是不关心结果的。
执行池概念
当运行类似 如下命令启动一个celery进程时,其实启动的是一个管理进程,此进程不处理实际的任务,而是产生的子进程或线程 去处理具体任务;那么这些 子进程或线程 在一起就叫做 执行池;
celery worker --app=worker.app
执行池的大小(子进程或线程的个数) 决定了 celery可以并发执行任务的个数;如果想尽可能快和多的执行任务,那么 增加执行池的大小 是个可以考虑的解决方案;
执行池工作方式
Prefork
celery默认的执行池工作方式,是多进程的执行池,一般在 计算密集型任务中使用, 能充分利用cpu多核; 如果不指定–concurrency(并发进程个数)参数,则无论什么执行池都是尽可能多的使用cpu的核数;
Solo
这种执行池有点特殊,因为此模式在处理任务时 直接在 管理进程中进行; 不是基于进程或线程的工作模式; 也就是一直只有一个 消费者 在处理任务;上个任务不结束,则下个任务就会阻塞; 但是在微服务中 比如 在使用k8s 进行 docker部署时,可以使用此模式,这样 k8s直接通过观察启动多少个docker容器 就能知道启动了多少个celery消费者;
Eventlet/Gevent
2个都是基于协程 的执行池,一般在 IO密集型任务中使用,如频繁的网络请求,数据库操作等等;
gevent是对eventlet的高级封装,一般使用时 用 gevent,因为此包有monkey.patch_all()方法将 所有能转为协程的地方都转为协程,从而增加处理能力;
在使用celery==5.1.2版本时定时任务 works执行后总是报keyerror,目前不知道问题是什么;
如下处理后即恢复正常;
import os import celery from celery.schedules import crontab from game_resources import settings os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'game_resources.settings') app = celery.Celery( main='game_resources', broker='redis://***@***:54321/8', include=['backstage.task'], # 加了这句,指定task的文件; ) app.conf.timezone = "Asia/Shanghai" app.conf.beat_schedule = { 'add-every-11-seconds': { 'task': 'backstage.task.test_down_url', 'schedule': crontab(minute=59, hour=8, day_of_week=2), # 每周二 9:00点执行 'args': () # 参数 } } # app.autodiscover_tasks((('backstage',))) # 自动从backstage文件发现任务 app.autodiscover_tasks(settings.INSTALLED_APPS) backstage/task.py中 @app.task(base=CustomTask) # 异步函数 def test_down_url(): print('触发定时任务') url_error_path = [] url = Games.objects.filter(download_url__isnull=False).values('download_url') for i in url: try: logger.success(f'正在识别地址:{i}') result = ''.join(re.findall(r'<title>(.*?)</title>', requests.get(i.get('download_url')).text)) if '不存在' in result: url_error_path.append(i.get('download_url')) except Exception as ext: logger.error(f'请求失败:{str(ext)}') continue logger.error(f'失效url:{url_error_path}') send_mail('【Chi_Tao_Wan】:', f'检测提取地址是否有效触发;所有失效地址为:{",".join(url_error_path)}', EMAIL_ADDRESS, ['595118627@qq.com'], fail_silently=False)
pip install frozenlist==1.3.1 geopy==2.2.0 humanize==4.3.0 idna==3.3 importlib-metadata==4.12.0 jsonschema==4.9.0 korean_lunar_calendar==0.2.1 marshmallow==3.17.0 pyOpenSSL==22.0.0 pyrsistent==0.18.1 python-dotenv==0.20.0 pytz==2022.2.1 selenium==4.4.0 simplejson==3.17.6 sniffio==1.2.0 trio==0.21.0 urllib3==1.26.11 wsproto==1.1.0 zipp==3.8.1
# 如果上述安装仍未解决, 则执行以下安装(笔者执行完上述安装即能解决问题)
pip install backoff==2.1.2 colorama==0.4.5 croniter==1.3.5 cryptography==37.0.4 email-validator==1.2.1 flask-compress==1.12 flask-migrate==3.1.0 aiohttp==3.8.1 aiosignal==1.2.0 Mako==1.2.1 Babel==2.10.3
此时再次执行 celery ;运行成功
但在消费任务时又报错了
解决方案
pip install eventlet
并在启动 celery 消费者时携带参数 -P eventlet(大写 P)
celery -A celery_tasks.main worker -l INFO -P eventlet
解决办法:安装 importlib-metadata, 版本必须小于5
pip install importlib-metadata==4.8.3
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。