当前位置:   article > 正文

源码剖析 - 公众号采集阅读器 Liuli

python run(collect_config: dict)

14e30ff13811bcc87a9ca0573cf01ef0.png

简介

无意中发现 Liuli 这个项目,项目 Github:https://github.com/liuli-io/liuli

看了其文章,发现 Liuli 是 Python 实现的,便打算简单看看其实现细节,老规矩,看项目,先将好奇点写下来:

  • 1.Python 怎么实现宣传文章中那么漂亮的 PC 软件界面的?

  • 2. 如何采集公众号文章?

对,我就对这两点感兴趣,经过一番阅读后,关于好奇 1,其实人家没有实现漂亮的 PC 软件界面,Liuli 只是采集,然后将内容推送过去,所以本文的重点,就是看一下它是怎么采集公众号文章的,此外在阅读过程中,发现 LiuLi 还使用了简单的方法来识别文章是否为广告文章,这点也挺有意思的,也记录一下。

公众号文章采集

Liuli 基于搜狗微信(https://weixin.sogou.com/)对公众号文章进行采集,而且实现了 2 种方式:

  • 使用 playwright 自动化浏览器的方式实现公众号文章采集

  • 使用 ruia 爬虫框架实现公众号文章的采集

我们可以通过相应的配置文件控制 Liuli 使用其中哪种方式来进行文章采集,其默认使用 ruia 的方式进行采集。

Liuli 将功能分为多个模块,然后通过调度器去调度不同的模块,调度器启动方法代码如下:

  1. # src/liuli_schedule.py
  2. def start(ll_config_name: str = ""):
  3.     """调度启动函数
  4.     Args:
  5.         task_config (dict): 调度任务配置
  6.     """
  7.     if not ll_config_name:
  8.         freeze_support()
  9.         # 默认启动 liuli_config 目录下所有配置
  10.         ll_config_name_list = []
  11.         for each_file in os.listdir(Config.LL_CONFIG_DIR):
  12.             if each_file.endswith("json"):
  13.                 # 加入启动列表
  14.                 ll_config_name_list.append(each_file.replace(".json"""))
  15.         # 进程池
  16.         p = Pool(len(ll_config_name_list))
  17.         for each_ll_config_name in ll_config_name_list:
  18.             LOGGER.info(f"Task {each_ll_config_name} register successfully!")
  19.             p.apply_async(run_liuli_schedule, args=(each_ll_config_name,))
  20.         p.close()
  21.         p.join()
  22.     else:
  23.         run_liuli_schedule(ll_config_name)

从代码可知,调度器会启动 Python 的进程池,然后向其中添加 run_liuli_schedule 异步任务,该异步任务中,会执行 run_liuli_task 方法,该方法才是一次完整的任务流程,代码如下:

  1. def run_liuli_task(ll_config: dict):
  2.     """执行调度任务
  3.     Args:
  4.         ll_config (dict): Liuli 任务配置
  5.     """
  6.     # 文章源, 用于基础查询条件
  7.     doc_source: str = ll_config["doc_source"]
  8.     basic_filter = {"basic_filter": {"doc_source": doc_source}}
  9.     # 采集器配置
  10.     collector_conf: dict = ll_config["collector"]
  11.     # 处理器配置
  12.     processor_conf: dict = ll_config["processor"]
  13.     # 分发器配置
  14.     sender_conf: dict = ll_config["sender"]
  15.     sender_conf.update(basic_filter)
  16.     # 备份器配置
  17.     backup_conf: dict = ll_config["backup"]
  18.     backup_conf.update(basic_filter)
  19.     # 采集器执行
  20.     LOGGER.info("采集器开始执行!")
  21.     for collect_type, collect_config in collector_conf.items():
  22.         collect_factory(collect_type, collect_config)
  23.     LOGGER.info("采集器执行完毕!")
  24.     # 采集器执行
  25.     LOGGER.info("处理器(after_collect): 开始执行!")
  26.     for each in processor_conf["after_collect"]:
  27.         func_name = each.pop("func")
  28.         # 注入查询条件
  29.         each.update(basic_filter)
  30.         LOGGER.info(f"处理器(after_collect): {func_name} 正在执行...")
  31.         processor_dict[func_name](**each)
  32.     LOGGER.info("处理器(after_collect): 执行完毕!")
  33.     # 分发器执行
  34.     LOGGER.info("分发器开始执行!")
  35.     send_doc(sender_conf)
  36.     LOGGER.info("分发器执行完毕!")
  37.     # 备份器执行
  38.     LOGGER.info("备份器开始执行!")
  39.     backup_doc(backup_conf)
  40.     LOGGER.info("备份器执行完毕!")

从 run_liuli_task 方法可知,Liuli 一次任务需要执行:

  • 1. 采集公众号文章

  • 2. 处理公众号文章(广告文章标注、文本清理、rss 生成、html 渲染等)

  • 3. 分发到企业微信(wecom)等不同终端上(在企业微信中可以通过微信插件将信息推送到微信)

  • 4. 备份文章到 mongodb 和 github

关于 Liuli 的功能,可以阅读作者本人的文章: 基于 Liuli 构建纯净的 RSS 公众号信息流,这里先只关注公众号采集的逻辑。

因为有 ruia 与 playwright 两种不同方式实现的采集器,具体使用哪种,通过配置文件决定,然后通过 import_module 方法动态导入相应的模块,然后运行模块的 run 方法,从而实现公众号文章的采集,相关代码如下:

  1. def collect_factory(collect_type: str, collect_config: dict) -> bool:
  2.     """
  3.     采集器工厂函数
  4.     :param collect_type: 采集器类型
  5.     :param collect_config: 采集器配置
  6.     :return:
  7.     """
  8.     collect_status = False
  9.     try:
  10.         # import_module方法动态载入具体的采集模块
  11.         collect_module = import_module(f"src.collector.{collect_type}")
  12.         collect_status = collect_module.run(collect_config)
  13.     except ModuleNotFoundError:
  14.         LOGGER.error(f"采集器类型不存在 {collect_type} - {collect_config}")
  15.     except Exception as e:
  16.         LOGGER.error(f"采集器执行出错 {collect_type} - {collect_config} - {e}")
  17.     return collect_status

playwright 采集模块实现

playwright 是微软出品的自动化库,与 selenium 的作用类似,定位于网页测试,但也被人用于网页信息的获取,可见即可得,使用门槛低,因为要加载网页信息,所以性能比较差,当然一些前端反爬的措施,playwright 也无法突破。

playwright 相比于 selenium,支持 python 的 async,性能有所提升(但还是比不了直接请求),这里贴一下获取某公众号下最新文章的部分逻辑(完整代码太长):

  1. async def playwright_main(wechat_name: str):
  2.     """利用 playwright 获取公众号元信息,输出数据格式见上方
  3.     Args:
  4.         wechat_name ([str]): 公众号名称
  5.     """
  6.     wechat_data = {}
  7.     try:
  8.         async with async_playwright() as p:
  9.             # browser = await p.chromium.launch(headless=False)
  10.             browser = await p.chromium.launch()
  11.             context = await browser.new_context(user_agent=Config.SPIDER_UA)
  12.             page = await context.new_page()
  13.             # 进行公众号检索
  14.             await page.goto("https://weixin.sogou.com/")
  15.             await page.wait_for_load_state()
  16.             await page.click('input[name="query"]')
  17.             await page.fill('input[name="query"]', wechat_name)
  18.             await asyncio.sleep(1)
  19.             await page.click("text=搜公众号")
  20.             await page.wait_for_load_state()

从上述代码可知,playwright 用法与 selenium 很像,将用户操作网站的流程自动化便可以获取相应的数据了。

ruia 采集模块实现

ruia 是轻量级的 Python 异步爬虫框架,因为比较轻量,我也将其代码读了一遍,作为下篇文章的内容。

它的用法与 scrapy 有点像,需要定义继承于 ruia.Spider 的子类,然后调用 start 方法实现对目标网站的请求,然后 ruia 会自动调用 parse 方法实现对网页内容的解析,来看一下具体的代码,首先是入口逻辑:

  1. def run(collect_config: dict):
  2.     """微信公众号文章抓取爬虫
  3.     Args:
  4.         collect_config (dict, optional): 采集器配置
  5.     """
  6.     s_nums = 0
  7.     wechat_list = collect_config["wechat_list"]
  8.     delta_time = collect_config.get("delta_time"5)
  9.     for wechat_name in wechat_list:
  10.         SGWechatSpider.wechat_name = wechat_name
  11.         SGWechatSpider.request_config = {
  12.             "RETRIES"3,
  13.             "DELAY": delta_time,
  14.             "TIMEOUT"20,
  15.         }
  16.         sg_url = f"https://weixin.sogou.com/weixin?type=1&query={wechat_name}&ie=utf8&s_from=input&_sug_=n&_sug_type_="
  17.         SGWechatSpider.start_urls = [sg_url]
  18.         try:
  19.             # 启动爬虫
  20.             SGWechatSpider.start(middleware=ua_middleware)
  21.             s_nums += 1
  22.         except Exception as e:
  23.             err_msg = f"
    声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/292683
    推荐阅读
    相关标签