
Django + Celery: asynchronous tasks and asynchronous result storage


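Environment: Windows 11, Python 3.9.2, Django 4.2.11, Celery 4.4.7, MySQL 8.1, Redis 3.0

Background: running a large number of tasks in a Django project, with the results saved to the database

Date: 2024-04-09

Goal: crawl a novel asynchronously and save it to the database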

1. Create the Django project and app, and verify that they work

    # create the project directory GetFiction and cd into it first
    pip install django==4.2.11 pymysql==1.1.0
    django-admin startproject getfiction .
    django-admin startapp getsection

Configure the MySQL connection, app registration, logging, and other settings:

    # getfiction/__init__.py
    import pymysql
    pymysql.install_as_MySQLdb()  # let Django's MySQL backend run on PyMySQL

    # getfiction/settings.py
    # database connection
    DATABASES = {
        'default': {
            'ENGINE': 'django.db.backends.mysql',
            'NAME': 'fictions',
            'USER': '****',
            'PASSWORD': '****',
            'HOST': '127.0.0.1',
            'PORT': '3306',
        }
    }
    # app registration
    INSTALLED_APPS = [
        'django.contrib.admin',
        'django.contrib.auth',
        'django.contrib.contenttypes',
        'django.contrib.sessions',
        'django.contrib.messages',
        'django.contrib.staticfiles',
        'getsection',
    ]
    # other settings
    LANGUAGE_CODE = 'zh-Hans'
    TIME_ZONE = 'Asia/Shanghai'
    # logging
    import logging
    # create the logger
    logger = logging.getLogger("test")
    # set the logger's level; if unset, it uses the effective level of its parent,
    # here the root logger, whose default level is logging.WARNING
    logger.setLevel(logging.INFO)
    # file handler
    fh = logging.FileHandler(filename="./test.log", encoding="utf8")
    # stream (console) handler
    sh = logging.StreamHandler()
    # the file handler only records ERROR and above
    fh.setLevel(logging.ERROR)
    # the stream handler accepts DEBUG and above (the logger's INFO level still applies first)
    sh.setLevel(logging.DEBUG)
    # formatter for the file handler
    ffmt = logging.Formatter(
        fmt="%(asctime)s - %(levelname)s - %(name)s - %(filename)s:%(lineno)d - %(message)s",
        datefmt="%Y/%m/%d %H:%M:%S",
    )
    # formatter for the stream handler (default date format)
    sfmt = logging.Formatter(
        fmt="%(asctime)s - %(levelname)s - %(name)s - %(filename)s:%(lineno)d - %(message)s",
    )
    # attach ffmt to the file handler
    fh.setFormatter(ffmt)
    # attach sfmt to the stream handler
    sh.setFormatter(sfmt)
    # attach both handlers to the logger
    logger.addHandler(fh)
    logger.addHandler(sh)
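The logging block combines a logger level (INFO) with two handler levels (ERROR for the file, DEBUG for the console), and the logger level is applied before any handler sees a record. A minimal, standard-library-only sketch of that interaction, with in-memory StringIO streams standing in for test.log and the console so the output can be inspected (build_logger and the "levels-demo" logger name are hypothetical):

```python
import io
import logging


def build_logger(name, file_stream, console_stream):
    """Recreate the settings.py setup, writing to the given streams instead of test.log/stderr."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)   # records below INFO are dropped before any handler sees them
    logger.propagate = False        # demo only: keep records away from the root logger

    fh = logging.StreamHandler(file_stream)      # stands in for FileHandler("./test.log")
    fh.setLevel(logging.ERROR)                   # the "file" keeps ERROR and above
    sh = logging.StreamHandler(console_stream)   # stands in for the console handler
    sh.setLevel(logging.DEBUG)                   # would accept DEBUG, but the logger level filters first

    fmt = logging.Formatter("%(levelname)s - %(name)s - %(message)s")
    fh.setFormatter(fmt)
    sh.setFormatter(fmt)
    logger.addHandler(fh)
    logger.addHandler(sh)
    return logger


file_buf, console_buf = io.StringIO(), io.StringIO()
log = build_logger("levels-demo", file_buf, console_buf)
log.debug("dropped by the logger level")
log.info("console only")
log.error("console and file")

print(console_buf.getvalue(), end="")
print(file_buf.getvalue(), end="")
```

Django's documented alternative is the LOGGING dictionary in settings.py, which Django applies through logging.config.dictConfig.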

Configure the home page route and view function:

    # getfiction/urls.py
    from django.contrib import admin
    from django.urls import path
    from getsection.views import index

    urlpatterns = [
        path('admin/', admin.site.urls),
        path('', index),
    ]

    # getsection/views.py
    from django.http import HttpResponse

    def index(request):
        # test home page
        return HttpResponse("hello")

Test it (with Postman or a browser):

python manage.py runserver 0.0.0.0:8000

In Postman, the response body is hello.

2. Integrate Celery into Django to process tasks asynchronously and store the task results in MySQL

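Celery configuration: Redis as the broker, the Django ORM (the django-db backend provided by django-celery-results) as the result backend, and the Celery-related apps registered: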

    # getfiction/celery.py
    from __future__ import absolute_import, unicode_literals
    import os
    from celery import Celery

    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'getfiction.settings')  # point Celery at the Django settings
    app = Celery('djcelery', broker='redis://127.0.0.1:6379/0', backend='django-db')
    app.autodiscover_tasks()  # find the tasks.py module of every installed app

    # getfiction/__init__.py
    from __future__ import absolute_import, unicode_literals
    from .celery import app as celery_app  # load the Celery app whenever Django starts
    import pymysql

    __all__ = ['celery_app']
    pymysql.install_as_MySQLdb()

    # getfiction/settings.py
    INSTALLED_APPS = [
        'django.contrib.admin',
        'django.contrib.auth',
        'django.contrib.contenttypes',
        'django.contrib.sessions',
        'django.contrib.messages',
        'django.contrib.staticfiles',
        'django_celery_results',  # provides the django-db result backend and its table
        'getsection',
        'djcelery',
    ]
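To make the broker/backend split concrete, here is a toy, standard-library-only model of the flow. It does not use Celery, and the message format and table layout are simplified assumptions, not the real ones used by Celery or django_celery_results: delay() pushes a serialized message onto a queue that stands in for Redis, a worker thread pops it, runs the registered function, and writes the status and return value into an SQLite table that stands in for django_celery_results_taskresult in MySQL. The names task, delay, worker and REGISTRY are hypothetical.

```python
import json
import queue
import sqlite3
import threading
import uuid

# Toy stand-ins: a Queue for the Redis broker, an in-memory SQLite table for the django-db backend
broker = queue.Queue()
backend = sqlite3.connect(":memory:", check_same_thread=False)
backend.execute("CREATE TABLE taskresult (task_id TEXT PRIMARY KEY, status TEXT, result TEXT)")

REGISTRY = {}  # task name -> function, loosely like the tasks autodiscover_tasks() finds


def task(fn):
    """Register fn under its name (hypothetical, not Celery's decorator)."""
    REGISTRY[fn.__name__] = fn
    return fn


@task
def add(x, y):
    return x + y


def delay(name, *args):
    """Producer: serialize a message onto the broker and return its id without waiting."""
    task_id = str(uuid.uuid4())
    broker.put(json.dumps({"id": task_id, "task": name, "args": list(args)}))
    return task_id


def worker():
    """Consumer: run each message and write its outcome to the result table."""
    while True:
        raw = broker.get()
        if raw is None:  # sentinel: stop the worker
            break
        msg = json.loads(raw)
        try:
            status, result = "SUCCESS", REGISTRY[msg["task"]](*msg["args"])
        except Exception as exc:
            status, result = "FAILURE", repr(exc)
        backend.execute(
            "INSERT INTO taskresult VALUES (?, ?, ?)",
            (msg["id"], status, json.dumps(result)),
        )
        backend.commit()


t = threading.Thread(target=worker)
t.start()
ids = [delay("add", i, i) for i in range(3)]
broker.put(None)
t.join()

rows = dict(backend.execute("SELECT task_id, result FROM taskresult").fetchall())
print([json.loads(rows[i]) for i in ids])  # [0, 2, 4]
```

The point of the split is that the caller only touches the broker and gets an id back immediately; results appear in the backend whenever a worker gets to them.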

Install and start Redis (Windows build): Release 3.0.504 · microsoftarchive/redis (github.com)

Install the redis and eventlet Python packages: pip install redis eventlet

Install the Celery-related packages:

pip install celery django-celery django-celery-results

Errors hit when starting the project, and their fixes:

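Error 1: cannot import name 'ugettext_lazy' from 'django.utils.translation'

ugettext_lazy was deprecated in Django 3.0 and removed in Django 4.0, so the installed third-party package that still imports it (the file named in the traceback) has to be patched in site-packages.

Replace ugettext_lazy with gettext_lazy: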

    # from django.utils.translation import ugettext_lazy as _
    from django.utils.translation import gettext_lazy as _

Restart the project and test again.

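Error 2: ModuleNotFoundError: No module named 'celery.five'

This is a version incompatibility: celery.five was removed in Celery 5.0, but the older packages installed above still import it, so downgrade Celery to 4.4.7: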

pip install celery==4.4.7
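Because this setup only works while Celery stays on 4.x, a quick check before starting the worker can save a round of restarts. A minimal sketch using the standard-library importlib.metadata module (Python 3.8+); installed_version, major and check_celery are hypothetical helper names:

```python
from importlib import metadata


def installed_version(dist_name):
    """Installed version string of a distribution, or None if it is not installed."""
    try:
        return metadata.version(dist_name)
    except metadata.PackageNotFoundError:
        return None


def major(version):
    """Major version number of a string such as '4.4.7'."""
    return int(version.split(".")[0])


def check_celery(version):
    """celery.five exists in 4.x but was removed in 5.0."""
    if version is None:
        return "celery is not installed"
    if major(version) >= 5:
        return f"celery {version} has no celery.five; try: pip install celery==4.4.7"
    return f"celery {version} still provides celery.five"


for name in ("django", "celery", "django-celery", "django-celery-results"):
    print(name, installed_version(name))
print(check_celery(installed_version("celery")))
```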

Restart the project and test again.

Error 3: cannot import name 'force_unicode' from 'django.utils.encoding'

Fix it the same way as error 1, replacing force_unicode with force_str:

    # from django.utils.encoding import force_unicode as force_text  # noqa
    from django.utils.encoding import force_str as force_text  # noqa

Restart and continue.

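Error 4: except self._encode_error, exc: (SyntaxError)

anyjson is not compatible with Python 3; this is Python 2 syntax. Several lines of its source need to change, listed below by their line numbers in that file: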

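    # GetFiction\venv\lib\site-packages\anyjson\__init__.py
    # line 67: basestring -> str
    if isinstance(modinfo["encerror"], str):
    # line 69: the matching check for the decode error, basestring -> str
    if isinstance(modinfo["decerror"], str):
    # line 88: "except E, exc" -> "except E as exc"
    except self._encode_error as exc:
    # line 89: the Python 2 three-argument raise -> with_traceback()
    raise TypeError(*exc.args).with_traceback(sys.exc_info()[2])
    # line 100
    except self._decode_error as exc:
    # line 101
    raise ValueError(*exc.args).with_traceback(sys.exc_info()[2])
    # line 120: print statement -> print() function
    print("Running anyjson as a stand alone script is not supported")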
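In Python 3, the Python 2 form raise TypeError, TypeError(*exc.args), sys.exc_info()[2] is a syntax error, and wrapping it in parentheses does not help: raising a tuple fails with "TypeError: exceptions must derive from BaseException", so loads() would surface a TypeError instead of the intended ValueError. The Python 3 equivalent is ExcType(*args).with_traceback(tb). A minimal sketch that checks both idioms against the standard json module (JsonImplementation is a hypothetical, reduced stand-in for anyjson's wrapper class, not its real code):

```python
import json
import sys


class JsonImplementation:
    """Hypothetical stand-in for anyjson's wrapper, reduced to dumps/loads."""

    def __init__(self, encode, decode, encode_error, decode_error):
        self._encode = encode
        self._decode = decode
        self._encode_error = encode_error
        self._decode_error = decode_error

    def dumps(self, data):
        try:
            return self._encode(data)
        except self._encode_error as exc:  # Python 3 form of "except E, exc"
            # Python 3 form of "raise TypeError, TypeError(*exc.args), tb"
            raise TypeError(*exc.args).with_traceback(sys.exc_info()[2])

    def loads(self, s):
        try:
            return self._decode(s)
        except self._decode_error as exc:
            raise ValueError(*exc.args).with_traceback(sys.exc_info()[2])


impl = JsonImplementation(json.dumps, json.loads, TypeError, ValueError)
print(impl.dumps({"a": 1}))  # {"a": 1}
try:
    impl.loads("{not json")
except ValueError as e:
    print("ValueError:", e)
```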

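Error 5: from django.utils.translation import ugettext_lazy as _; same fix as error 1.

Error 6: from django.utils.translation import ungettext, ugettext as _; similar to error 1 (ungettext was likewise removed in Django 4.0, and its replacement is ngettext):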

    # from django.utils.translation import ungettext, ugettext as _
    # alias the new names to the old ones so the rest of the file keeps working
    from django.utils.translation import ngettext as ungettext, gettext as _

Error 7: cannot import name 'force_unicode' from 'django.utils.encoding'

    # venv/Lib/site-packages/djcelery/admin.py
    from django.utils.encoding import force_str as force_text  # noqa

At this point, the source patches are complete.
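Each of the errors above only appeared after a restart, one at a time. A standard-library-only sketch that scans a directory tree for the removed names and the old except syntax in one pass; the REPLACEMENTS table is an assumption assembled from the errors in this article, not an exhaustive list, and scan() is a hypothetical helper:

```python
import re
import tempfile
from pathlib import Path

# Python 2 / pre-4.0 Django names hit by the errors in this article, with their replacements
REPLACEMENTS = {
    "ugettext_lazy": "gettext_lazy",
    "ugettext": "gettext",
    "ungettext": "ngettext",
    "force_unicode": "force_str",
    "basestring": "str",
}
NAME_RE = re.compile(r"\b(" + "|".join(map(re.escape, REPLACEMENTS)) + r")\b")
OLD_EXCEPT_RE = re.compile(r"^\s*except\s+[\w.]+\s*,\s*\w+\s*:")  # Python 2 "except E, e:"


def scan(root):
    """Yield (path, line_no, old, suggestion) for each hit in the .py files under root."""
    for path in sorted(Path(root).rglob("*.py")):
        lines = path.read_text(encoding="utf-8", errors="replace").splitlines()
        for no, line in enumerate(lines, start=1):
            if line.lstrip().startswith("#"):
                continue  # skip commented-out lines, such as originals kept for reference
            for m in NAME_RE.finditer(line):
                yield str(path), no, m.group(1), REPLACEMENTS[m.group(1)]
            if OLD_EXCEPT_RE.search(line):
                yield str(path), no, "except E, e:", "except E as e:"


with tempfile.TemporaryDirectory() as tmp:
    Path(tmp, "demo.py").write_text(
        "from django.utils.translation import ungettext, ugettext as _\n"
        "# from django.utils.translation import ugettext_lazy as _\n"
        "try:\n"
        "    pass\n"
        "except ValueError, exc:\n"
        "    pass\n",
        encoding="utf-8",
    )
    hits = [(no, old, new) for _, no, old, new in scan(tmp)]

print(hits)
```

Pointed at the real package, for example for hit in scan(r"venv\Lib\site-packages\djcelery"): print(*hit), it lists the remaining lines to patch; the path follows the venv layout used in this article.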

3. Add tasks, run them, and write the results to the database

Database migration:

    python manage.py makemigrations  # generate migration files
    python manage.py migrate  # apply migrations and create the tables, including django_celery_results'

Create the view function that dispatches the tasks:

    # getsection/views.py
    from django.http import HttpResponse
    from playwright.sync_api import sync_playwright

    from getfiction.settings import logger
    from getsection.tasks import getfictioninfo

    def index(request):
        base_url = "https://www.83ks.org"
        novel_href_dic = {}
        with sync_playwright() as p:
            browser = p.chromium.launch()
            page = browser.new_page()
            # a single chapter lives at e.g. https://www.83ks.org/read/196719/2535054.html
            page.goto(f"{base_url}/book/196719/")
            element_href = page.query_selector_all("#list dl a")
            for i in element_href[:5]:  # only the first 5 chapters
                # href + the title's words after the first one, reversed
                c = [i.get_attribute('href')] + i.get_attribute('title').split(" ")[:0:-1]
                if len(c) < 3:
                    logger.error(str(c) + " this chapter entry is malformed")
                else:
                    if len(c) == 4:
                        c.remove("lwxs.com")  # drop the extra "lwxs.com" token
                    novel_href_dic[c[-1]] = c[:2]  # e.g. {"第68章": [href, "势不可挡"]}
            browser.close()
        # queue one Celery task per chapter; delay() returns immediately
        for secindex, url_and_secname in novel_href_dic.items():
            getfictioninfo.delay(secindex, url_and_secname)
        return HttpResponse("OK")
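The least obvious part of the view is the slicing: title.split(" ")[:0:-1] drops the first word of the title and reverses the rest. Pulled out into a pure function, assuming chapter titles look like "BookName 第68章 势不可挡" (an inferred format based on the example in the tasks.py comment; parse_chapter is hypothetical):

```python
def parse_chapter(href, title, noise="lwxs.com"):
    """Return (chapter_index, [href, chapter_name]) or None, mirroring the loop in index()."""
    # ["BookName", "第68章", "势不可挡"][:0:-1] -> ["势不可挡", "第68章"]
    c = [href] + title.split(" ")[:0:-1]
    if len(c) < 3:
        return None          # index() logs this case as an error
    if len(c) == 4:
        c.remove(noise)      # raises ValueError if the extra token is something else
    return c[-1], c[:2]


print(parse_chapter("/read/196719/1660838.html", "BookName 第68章 势不可挡"))
# ('第68章', ['/read/196719/1660838.html', '势不可挡'])
```

Keeping task arguments to plain strings and lists, as the view does, matters because Celery 4 serializes task arguments as JSON by default.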

Install Playwright: pip install playwright, then download the browser binaries with playwright install chromium

    # getsection/tasks.py
    from __future__ import absolute_import
    from celery import shared_task
    from playwright.sync_api import sync_playwright

    from getfiction.settings import logger

    @shared_task
    def getfictioninfo(secindex, url_and_secname):
        # example arguments: "第68章", ['/read/196719/1660838.html', '势不可挡']
        with sync_playwright() as p:
            browser = p.chromium.launch()
            section_page = browser.new_page()
            section_url = "https://www.83ks.org" + url_and_secname[0]
            try:
                section_page.goto(section_url)
            except Exception as e:
                logger.error(str(secindex) + " " + str(e))
            section_page_element = section_page.query_selector_all("#content p")
            section_name = secindex + " " + url_and_secname[1] + "\n"
            logger.info(section_name)
            for i in section_page_element:
                section_name += i.inner_text() + " "
            browser.close()
        # the return value is what the django-db backend stores as the task result
        return section_name
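In getfictioninfo a failed page.goto() is only logged, and the task then stores whatever the blank page yields. A minimal sketch of a retry helper with exponential backoff (with_retries is hypothetical plain Python, not part of Celery or Playwright); inside the task it could wrap the navigation as with_retries(lambda: section_page.goto(section_url)). Celery's own alternative is declaring the task with bind=True and calling self.retry(exc=e, countdown=...), which re-queues the task instead of sleeping inside the worker.

```python
import time


def with_retries(fn, attempts=3, base_delay=1.0, exceptions=(Exception,)):
    """Call fn(); between failed attempts wait base_delay, 2*base_delay, 4*base_delay, ... seconds."""
    for n in range(attempts):
        try:
            return fn()
        except exceptions:
            if n == attempts - 1:
                raise  # out of attempts: let the caller (and Celery) see the error
            time.sleep(base_delay * 2 ** n)


calls = []


def flaky():
    """Fails twice, then succeeds, to simulate a page that loads on the third try."""
    calls.append(1)
    if len(calls) < 3:
        raise ConnectionError("temporary failure")
    return "page loaded"


print(with_retries(flaky, base_delay=0.01), len(calls))  # page loaded 3
```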

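Start Celery (with the eventlet pool, since Celery 4 no longer officially supports Windows and its default prefork pool is known to fail there): celery -A getfiction worker -l info -P eventlet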

Start Django: python manage.py runserver 0.0.0.0:8000

Visit the home page to trigger the tasks; the view responds with OK once they have been queued.

The task results are stored in the django_celery_results_taskresult table.

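Remaining issue: the package versions are not fully compatible with each other, so this setup still needs work. Most of the patches above are in django-celery (djcelery) and the Python 2-era anyjson library, which were written for Celery 3.x and Python 2; with Celery 4 and later, django-celery-results alone provides the django-db result backend, so leaving djcelery out of the installation and INSTALLED_APPS would likely avoid most of them.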

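Disclaimer: this article was contributed by a user and does not represent the position of the wpsshop blog; copyright belongs to the original author, and the site assumes no legal liability. If you find infringing content, please contact the site. When reposting, please credit the source: https://www.wpsshop.cn/w/我家小花儿/article/detail/442666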