赞
踩
爬虫系统包括爬虫任务管理系统和数据爬取系统。
爬虫任务管理系统包括爬虫任务的 crud、爬虫任务执行的启动和停止功能。
数据爬取系统用于数据的爬取和入库。
使用 xxl-job 框架构建爬虫任务管理系统;
使用 Pyhon 的 django 框架构建数据爬取系统;
将数据爬取系统注册到 xxl-job 系统中,通过管理定时任务的方式来管理爬虫任务。
配置xxl-job 服务端信息和 xxl-job 执行器信息。举例如下。
- # xxl-job 服务端
- # xxl-admin服务端暴露的restful接口url(如http://localhost:8080/xxl-job-admin/api/)
- XXL_ADMIN_BASEURL: str = 'http://xxl-job-test.com/api/'
- # 请求令牌
- XXL_JOB_ACCESS_TOKEN: str = 'test_token'
-
- # xxl-job 执行器信息
- EXECUTOR_APP_NAME: str = 'test-spider-web'
- EXECUTOR_PORT: int = 9999
数据爬取系统通过调用 xxl-job 的 registry 接口来注册执行器到 xxl-job 服务端。举例如下所示。
- # 项目启动时,异步执行注册执行器到xxl-job服务端
- register_async()
-
- # 异步执行注册执行器到xxl-job服务端
- def register_async():
- logger.info("register_async 注册执行器到xxl-job服务端")
-
- p = Pool(1)
- p.apply_async(register_node, ())
-
- logger.info("register_async 注册执行器到xxl-job服务端成功")
-
-
- # 注册执行器到xxl-job服务端
- def register_node():
- # 必须循环去注册,不然会显示为离线
- try:
- while True:
- registry()
- time.sleep(10)
- finally:
- logger.error("Register node is exit.")
-
-
- # 注册
- def registry():
- payload = {
- "registryGroup": "EXECUTOR",
- "registryKey": settings.EXECUTOR_APP_NAME,
- "registryValue": executor_baseurl()
- }
- try:
- headers = {"XXL-JOB-ACCESS-TOKEN": settings.XXL_JOB_ACCESS_TOKEN}
- response = post_simple(settings.XXL_ADMIN_BASEURL + "registry", payload, headers)
- if response.get("code") != 200:
- logger.error("registry error. {}".format(str(response)))
- return False
- return True
- except BaseException as e:
- logger.error("registry error. {}".format(str(e)))
-
-
- def executor_baseurl() -> str:
- return "http://{host}:{port}".format(host=get_network_ip(), port=settings.EXECUTOR_PORT)
- def post_simple(url, data, headers):
- times = 0
- while times < settings.REQUEST_RETRY_TIMES:
- try:
- response = requests.post(url, json=data, headers=headers)
- if response.status_code != 200:
- logger.error("post 请求失败. url:{}, data:{}. response:{}".format(url, data, response.text))
- raise BusinessError("请求失败")
- else:
- return response.json()
-
- except BaseException as e:
- times += 1
- logger.warning(
- "post 请求连接失败. times:{}, retry after:{}, url:{}. data:{}.error:{}"
- .format(times, settings.REQUEST_RETRY_INTERVAL, url, str(data), str(e))
- )
- time.sleep(settings.REQUEST_RETRY_INTERVAL)
-
- logger.error("post 请求连接失败. url:{}, data:{}".format(url, data))
- raise BusinessError("请求连接失败")
-
-
- def get_network_ip() -> str:
- """获取本机地址,会获取首个网络地址"""
- _, _, ipaddrlist = socket.gethostbyname_ex(socket.gethostname())
- return ipaddrlist[0]
XXL-JOB调度中心通过HTTP的方式,调用执行器的任务。具体流程如下:
如 http://localhost:9999/run 。 爬取逻辑全部放在 run 接口中。举例如下。
- # (1)接口暴露 urls.py 文件
- urlpatterns = [
- path('run', spider.run, name='run'),
- ]
-
-
- # (2)run 接口 spider.py 文件
- @api_view(['POST'])
- def run(request):
- """
- 爬虫执行入口
- """
- logger.info("============ run 执行定时任务 start")
-
- # 入参
- request_data = JSONParser().parse(request)
- executor_handler = request_data['executorHandler']
- executor_params = request_data['executorParams']
- executor_params_dict = json.loads(executor_params)
-
- # 任务异步执行
- if executor_handler == 'task01':
- submit_task(TaskService01.doXxlJobTask, (executor_params_dict, request_data))
- if executor_handler == 'task02':
- submit_task(TaskService02.doXxlJobTask, (executor_params_dict, request_data))
-
-
- logger.info("============ run 执行定时任务 finish")
- return HttpResponse(json.dumps(dict(code=200, msg='成功')))
-
-
- # (3)线程池工具类 thread_pool_util.py 文件
-
- import logging
- from multiprocessing.dummy import Pool
- # 通用线程池
- pool = Pool(10000)
- logger = logging.getLogger(__name__)
-
-
- # 异步执行任务
- def submit_task(func, args) -> None:
- try:
- pool.apply_async(func, args)
- except BaseException as e:
- logger.error("submit_task:执行失败. error:{}".format(str(e)))
xxl-job 调度中心通过HTTP的方式,调用执行器的任务。
具体来说是指 xxl-job 通过调用执行器的 “run” 接口来执行定时任务。
任务执行完成后,数据爬取系统通过调用 xxl-job 的 callback 接口来进行定时任务执行状态上报。
- # 任务状态上报. code:200-表示任务执行正常,500-表示失败,100-执行中
- def callback(log_id, timestamp, code):
- if log_id is None or timestamp is None:
- return False
-
- payload = [
- {
- "logId": log_id, # 本次调度日志id
- "logDateTim": timestamp, # 本次调度时间
- "handleCode": code # 任务状态
- }
- ]
-
- try:
- headers = {"XXL-JOB-ACCESS-TOKEN": settings.XXL_JOB_ACCESS_TOKEN}
- response = post_simple(settings.XXL_ADMIN_BASEURL + "callback", payload, headers)
- if response.get("code") != 200:
- logger.error("callback error.log_id:{}.response{}".format(log_id, str(response)))
- return False
- return True
- except BaseException as e:
- logger.error("callback error. {}".format(str(e)))
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。