赞
踩
环境:win11、python 3.9.2、django 4.2.11、celery 4.4.7、MySQL 8.1、redis 3.0
背景:基于django框架异步处理大量任务,并将任务结果保存到数据库
时间:20240409
说明:异步爬取小说,并将其保存到数据库
1、创建django项目,并创建app,测试调通
- # 创建目录GetFiction
- pip install django==4.2.11 pymysql==1.1.0
- django-admin startproject getfiction .
- django-admin startapp getsection
配置MySQL连接、应用注册、日志、其他配置
- # getfiction/__init__.py filepath
-
- import pymysql
-
- pymysql.install_as_MySQLdb()
-
- # getfiction/settings.py filepath
-
- # 数据库连接
-
- DATABASES = {
- 'default': {
- 'ENGINE': 'django.db.backends.mysql',
- 'NAME': 'fictions',
- 'USER': '****',
- 'PASSWORD': '****',
- 'HOST': '127.0.0.1',
- 'PORT': '3306',
- }
- }
-
- # 应用注册
-
- INSTALLED_APPS = [
- 'django.contrib.admin',
- 'django.contrib.auth',
- 'django.contrib.contenttypes',
- 'django.contrib.sessions',
- 'django.contrib.messages',
- 'django.contrib.staticfiles',
- 'getsection',
- ]
-
- # 其他配置
-
- LANGUAGE_CODE = 'zh-Hans'
-
- TIME_ZONE = 'Asia/Shanghai'
-
- # 日志配置
-
- import logging
-
- # 创建日志器
- logger = logging.getLogger("test")
- # 为日志器设置日志等级,如果这里不设置,将会使用其父级日志器的日志等级
- # 这里它的父日志器是root,root的默认日志级别是 logging.WARNING
- logger.setLevel(logging.INFO)
-
- # 创建文件处理程序
- fh = logging.FileHandler(filename="./test.log",encoding="utf8")
- # 创建流处理程序
- sh = logging.StreamHandler()
-
- # 为文件处理程序设置日志等级
- fh.setLevel(logging.ERROR)
- # 为流处理程序设置日志等级
- sh.setLevel(logging.DEBUG)
-
- # 创建格式化程序
- ffmt = logging.Formatter(
- fmt = "%(asctime)s - %(levelname)s - %(name)s - %(filename)s:%(lineno)d - %(message)s",
- datefmt = "%Y/%m/%d %H:%M:%S"
- )
- # 创建格式化程序
- sfmt = logging.Formatter(
- fmt = "%(asctime)s - %(levelname)s - %(name)s - %(filename)s:%(lineno)d - %(message)s",
- )
-
- # 将 ffmt 格式化程序应用到 fh 文件处理程序
- fh.setFormatter(ffmt)
- # 将 sfmt 格式化程序应用到 sh 流处理程序
- sh.setFormatter(sfmt)
-
- # 将文件处理程序应用到logger日志器
- logger.addHandler(fh)
- # 将流处理程序应用到logger日志器
- logger.addHandler(sh)
配置首页的路由以及视图函数
- # getfiction/urls.py
-
- from getsection.views import index
-
- urlpatterns = [
- path('admin/', admin.site.urls),
- path('', index),
- ]
-
- # getsection/views.py
-
- from django.shortcuts import HttpResponse
-
def index(request):
    """Return a fixed "hello" response, used to confirm that routing works.

    Args:
        request: The incoming Django request object (not read).

    Returns:
        HttpResponse whose body is the text "hello".
    """
    # "hello" is already a str, so wrapping it in str() is unnecessary.
    return HttpResponse("hello")
调试(postman或是浏览器都行)
python manage.py runserver 0.0.0.0:8000
postman测试结果,如下:
2、django中集成celery,异步处理任务,并将任务存储到MySQL
celery配置:broker使用redis,backend使用django-db(由django-celery-results提供,通过django的ORM将任务结果写入数据库),并注册celery相关应用
- # getfiction/celery.py
-
- from __future__ import absolute_import, unicode_literals
- from celery import Celery
- import os
-
- os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'getfiction.settings') # 设置django环境
- app = Celery('djcelery', broker='redis://127.0.0.1:6379/0', backend='django-db')
- app.autodiscover_tasks() # 自动发现任务文件:每个app下的tasks.py
-
- # getfiction/__init__.py
-
- from __future__ import absolute_import, unicode_literals
- from .celery import app as celery_app
- import pymysql
-
- __all__ = ['celery_app']
- pymysql.install_as_MySQLdb()
-
- # getfiction/settings.py
-
- INSTALLED_APPS = [
- 'django.contrib.admin',
- 'django.contrib.auth',
- 'django.contrib.contenttypes',
- 'django.contrib.sessions',
- 'django.contrib.messages',
- 'django.contrib.staticfiles',
- 'django_celery_results',
- 'getsection',
- 'djcelery',
- ]
安装redis并启动:Release 3.0.504 · microsoftarchive/redis (github.com)
安装redis、eventlet模块:pip install redis eventlet
安装celery相关模块
pip install celery django-celery django-celery-results
启动测试报错:
1、cannot import name 'ugettext_lazy' from 'django.utils.translation'
“ugettext_lazy”在 Django 3.0 中被弃用,并在 Django 4.0 中被移除,所以需要修改报错的第三方库(而非 django 本身)的源码,如下:
将ugettext_lazy使用gettext_lazy替代,修改如下:
- # from django.utils.translation import ugettext_lazy as _
- from django.utils.translation import gettext_lazy as _
重启项目,继续测试
2、ModuleNotFoundError: No module named 'celery.five'
版本不兼容,将celery的版本降到 4.4.7
pip install celery==4.4.7
重启项目,继续测试
3、cannot import name 'force_unicode' from 'django.utils.encoding'
处理方式同上述1, 将 force_unicode替换为 force_str
- # from django.utils.encoding import force_unicode as force_text # noqa
- from django.utils.encoding import force_str as force_text # noqa
重启继续
4、except self._encode_error, exc:
anyjson不兼容python3,所以产生报错,下面会连续修改该模块相关源码,如下:
- # GetFiction\venv\lib\site-packages\anyjson\__init__.py
-
- 88 except self._encode_error as exc:
- 89 raise TypeError(*exc.args).with_traceback(sys.exc_info()[2])
-
- 100 except self._decode_error as exc:
- 101 raise ValueError(*exc.args).with_traceback(sys.exc_info()[2])
-
- 120 print("Running anyjson as a stand alone script is not supported")
-
- 67 if isinstance(modinfo["encerror"], str):
- 69 if isinstance(modinfo["decerror"], str):
5、from django.utils.translation import ugettext_lazy as _ 错误同 1,
6、from django.utils.translation import ungettext, ugettext as _ 与1类似
- from django.utils.translation import ngettext as ungettext, gettext as _
- # from django.utils.translation import ungettext, ugettext as _
7、cannot import name 'force_unicode' from 'django.utils.encoding'
- # venv/Lib/site-packages/djcelery/admin.py
- from django.utils.encoding import force_str as force_text # noqa
至此,修改源码部分完成
3、添加任务,执行,写入到数据库
数据库迁移
- python manage.py makemigrations #生成迁移文件
- python manage.py migrate #执行迁移,生成数据表
创建发送任务视图函数:
- # getsection/views.py
-
- from django.shortcuts import HttpResponse
- from getfiction.settings import logger
- from getsection.tasks import getfictioninfo
- from playwright.sync_api import sync_playwright
-
-
def index(request):
    """Scrape the book's chapter list and queue one Celery task per chapter.

    Opens the book index page in headless Chromium, reads the first five
    links matching "#list dl a", and calls ``getfictioninfo.delay`` once
    for each link that parses cleanly.

    Args:
        request: The incoming Django request object (not read).

    Returns:
        HttpResponse with body "OK" once all tasks have been queued.
    """
    base_url = "https://www.83ks.org"
    # https://www.83ks.org/read/196719/2535054.html is the content of one chapter
    novel_href_dic = {}
    with sync_playwright() as p:
        browser = p.chromium.launch()
        try:
            page = browser.new_page()
            page.goto(f"{base_url}/book/196719/")
            # Only the first five links are processed.
            for link in page.query_selector_all("#list dl a")[:5]:
                href = link.get_attribute("href")
                # A missing title attribute becomes "", which then fails the
                # length check below instead of raising AttributeError.
                title = link.get_attribute("title") or ""
                # The title is split on spaces, reversed, and its first token
                # dropped, so c == [href, <last token>, ..., <second token>].
                c = [href] + title.split(" ")[:0:-1]
                if href is None or len(c) < 3:
                    logger.error(str(c) + "该章节存在错误")
                    continue
                # Some titles carry an extra "lwxs.com" token; drop it only
                # when it is actually present so remove() cannot raise.
                if len(c) == 4 and "lwxs.com" in c:
                    c.remove("lwxs.com")
                # Key: last element (chapter index); value: [href, chapter name].
                novel_href_dic[c[-1]] = c[:2]
        finally:
            # Always release the browser, even if navigation or parsing fails.
            browser.close()
    for secindex, url_and_secname in novel_href_dic.items():
        getfictioninfo.delay(secindex, url_and_secname)
    return HttpResponse("OK")
安装playwright模块:pip install playwright
- # getsection/tasks.py
-
- from __future__ import absolute_import
- from celery import shared_task
- from playwright.sync_api import sync_playwright
- from getfiction.settings import logger
-
-
-
@shared_task
def getfictioninfo(secindex, url_and_secname):
    """Fetch one chapter page and return its heading plus paragraph text.

    Example arguments: ``"第68章"`` and
    ``['/read/196719/1660838.html', '势不可挡']``.

    Args:
        secindex: Chapter index label. It is concatenated into the heading,
            so it must be a str.
        url_and_secname: Two-item sequence of [relative chapter URL,
            chapter name].

    Returns:
        str: "<secindex> <chapter name>" and a newline, followed by the
        inner text of every "#content p" element, each followed by a space.
    """
    section_url = "https://www.83ks.org" + url_and_secname[0]
    heading = secindex + " " + url_and_secname[1] + "\n"
    with sync_playwright() as p:
        browser = p.chromium.launch()
        try:
            section_page = browser.new_page()
            try:
                section_page.goto(section_url)
            except Exception as e:
                # Best effort: log the failure and still read whatever the
                # page currently contains.
                logger.error(str(secindex) + str(e))
            paragraphs = section_page.query_selector_all("#content p")
            logger.info(heading)
            body = "".join(par.inner_text() + " " for par in paragraphs)
        finally:
            # Close the browser so a long-running worker does not leak one
            # Chromium process per task.
            browser.close()
    return heading + body
启动celery:celery -A getfiction worker -l info -P eventlet
启动django:python manage.py runserver 0.0.0.0:8000
访问首页,进行测试:
数据存储,如下:
django_celery_results_taskresult
存在的问题:各依赖库之间存在版本不兼容(需要手动修改第三方库源码),尚需优化
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。