当前位置:   article > 正文

使用 Python + xxl-job 构建爬虫系统_xxl-job python

xxl-job python

1 系统功能概述

爬虫系统包括爬虫任务管理系统数据爬取系统

爬虫任务管理系统包括爬虫任务的 crud、爬虫任务执行的启动和停止功能。

数据爬取系统用于数据的爬取和入库。

2 技术实现概述

使用 xxl-job 框架构建爬虫任务管理系统;

使用 Pyhon 的 django 框架构建数据爬取系统;

将数据爬取系统注册到 xxl-job 系统中,通过管理定时任务的方式来管理爬虫任务。

3 将数据爬取系统注册到 xxl-job 系统中

3.1 配置

配置xxl-job 服务端信息和 xxl-job 执行器信息。举例如下。

  1. # xxl-job 服务端
  2. # xxl-admin服务端暴露的restful接口url(如http://localhost:8080/xxl-job-admin/api/)
  3. XXL_ADMIN_BASEURL: str = 'http://xxl-job-test.com/api/'
  4. # 请求令牌
  5. XXL_JOB_ACCESS_TOKEN: str = 'test_token'
  6. # xxl-job 执行器信息
  7. EXECUTOR_APP_NAME: str = 'test-spider-web'
  8. EXECUTOR_PORT: int = 9999

3.2 代码

3.2.1 注册执行器到 xxl-job 服务端

数据爬取系统通过调用 xxl-job 的 registry 接口来注册执行器到 xxl-job 服务端。举例如下所示。

  1. # 项目启动时,异步执行注册执行器到xxl-job服务端
  2. register_async()
  3. # 异步执行注册执行器到xxl-job服务端
  4. def register_async():
  5.     logger.info("register_async 注册执行器到xxl-job服务端")
  6.     p = Pool(1)
  7.     p.apply_async(register_node, ())
  8.     logger.info("register_async 注册执行器到xxl-job服务端成功")
  9. # 注册执行器到xxl-job服务端
  10. def register_node():
  11.     # 必须循环去注册,不然会显示为离线
  12.     try:
  13.         while True:
  14.             registry()
  15.             time.sleep(10)
  16.     finally:
  17.         logger.error("Register node is exit.")
  18. # 注册
  19. def registry():
  20.     payload = {
  21.         "registryGroup": "EXECUTOR",
  22.         "registryKey": settings.EXECUTOR_APP_NAME,
  23.         "registryValue": executor_baseurl()
  24.     }
  25.     try:
  26.         headers = {"XXL-JOB-ACCESS-TOKEN": settings.XXL_JOB_ACCESS_TOKEN}
  27.         response = post_simple(settings.XXL_ADMIN_BASEURL + "registry", payload, headers)
  28.         if response.get("code") != 200:
  29.             logger.error("registry error. {}".format(str(response)))
  30.             return False
  31.         return True
  32.     except BaseException as e:
  33.         logger.error("registry error. {}".format(str(e)))
  34. def executor_baseurl() -> str:
  35.     return "http://{host}:{port}".format(host=get_network_ip(), port=settings.EXECUTOR_PORT)


3.2.2 接口调用工具类

  1. def post_simple(url, data, headers):
  2.     times = 0
  3.     while times < settings.REQUEST_RETRY_TIMES:
  4.         try:
  5.             response = requests.post(url, json=data, headers=headers)
  6.             if response.status_code != 200:
  7.                 logger.error("post 请求失败. url:{}, data:{}. response:{}".format(url, data, response.text))
  8.                 raise BusinessError("请求失败")
  9.             else:
  10.                 return response.json()
  11.         except BaseException as e:
  12.             times += 1
  13.             logger.warning(
  14.                 "post 请求连接失败. times:{}, retry after:{}, url:{}. data:{}.error:{}"
  15.                 .format(times, settings.REQUEST_RETRY_INTERVAL, url, str(data), str(e))
  16.             )
  17.             time.sleep(settings.REQUEST_RETRY_INTERVAL)
  18.     logger.error("post 请求连接失败. url:{}, data:{}".format(url, data))
  19.     raise BusinessError("请求连接失败")
  20. def get_network_ip() -> str:
  21.     """获取本机地址,会获取首个网络地址"""
  22.     _, _, ipaddrlist = socket.gethostbyname_ex(socket.gethostname())
  23.     return ipaddrlist[0]

4 定时任务执行

4.1 定时任务的执行流程

XXL-JOB调度中心通过HTTP的方式,调用执行器的任务。具体流程如下:

  • 调度中心将任务调度信息推送给执行器。这些任务调度信息主要包括:任务ID、本次调度日志id、本次调度日志时间、任务参数等。
  • 执行器在收到调度信息后,启动任务的执行。这一过程在执行器的机器上进行。
  • 任务执行完毕后,执行器将执行结果返回给调度中心。这些执行结果主要包括:执行成功或失败、执行日志等。

4.2 触发任务执行

4.2.1 数据爬取系统暴露名称为“run” 的 http 接口

如 http://localhost:9999/run 。 爬取逻辑全部放在 run 接口中。举例如下。

  1. # (1)接口暴露 urls.py 文件
  2. urlpatterns = [
  3.     path('run', spider.run, name='run'),
  4. ]
  5. # (2)run 接口 spider.py 文件
  6. @api_view(['POST'])
  7. def run(request):
  8.     """
  9.     爬虫执行入口
  10.     """
  11.     logger.info("============ run 执行定时任务 start")
  12.     # 入参
  13.     request_data = JSONParser().parse(request)
  14.     executor_handler = request_data['executorHandler']
  15.     executor_params = request_data['executorParams']
  16.     executor_params_dict = json.loads(executor_params)
  17.     # 任务异步执行
  18.     if executor_handler == 'task01':
  19.         submit_task(TaskService01.doXxlJobTask, (executor_params_dict, request_data))
  20.     if executor_handler == 'task02':
  21.         submit_task(TaskService02.doXxlJobTask, (executor_params_dict, request_data))
  22.     logger.info("============ run 执行定时任务 finish")
  23.     return HttpResponse(json.dumps(dict(code=200, msg='成功')))
  24.     
  25. # (3)线程池工具类 thread_pool_util.py 文件
  26. import logging
  27. from multiprocessing.dummy import Pool
  28. # 通用线程池
  29. pool = Pool(10000)
  30. logger = logging.getLogger(__name__)
  31. # 异步执行任务
  32. def submit_task(func, args) -> None:
  33.     try:
  34.         pool.apply_async(func, args)
  35.     except BaseException as e:
  36.         logger.error("submit_task:执行失败. error:{}".format(str(e)))

4.2.2 xxl-job 定时调用上述 “run” 接口

xxl-job 调度中心通过HTTP的方式,调用执行器的任务。

具体来说是指 xxl-job 通过调用执行器的 “run” 接口来执行定时任务。

4.3 任务状态上报

任务执行完成后,数据爬取系统通过调用 xxl-job 的 callback 接口来进行定时任务执行状态上报。

  1. # 任务状态上报. code:200-表示任务执行正常,500-表示失败,100-执行中
  2. def callback(log_id, timestamp, code):
  3.     if log_id is None or timestamp is None:
  4.         return False
  5.     payload = [
  6.         {
  7.             "logId": log_id, # 本次调度日志id
  8.             "logDateTim": timestamp, # 本次调度时间
  9.             "handleCode": code # 任务状态
  10.         }
  11.     ]
  12.     try:
  13.         headers = {"XXL-JOB-ACCESS-TOKEN": settings.XXL_JOB_ACCESS_TOKEN}
  14.         response = post_simple(settings.XXL_ADMIN_BASEURL + "callback", payload, headers)
  15.         if response.get("code") != 200:
  16.             logger.error("callback error.log_id:{}.response{}".format(log_id, str(response)))
  17.             return False
  18.         return True
  19.     except BaseException as e:
  20.         logger.error("callback error. {}".format(str(e)))

5 参考文献

(1)分布式任务调度平台XXL-JOB 官方文档
 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小舞很执着/article/detail/890133
推荐阅读
相关标签
  

闽ICP备14008679号