赞
踩
无意中发现 Liuli 这个项目,项目 Github:https://github.com/liuli-io/liuli
看了其文章,发现 Liuli 是 Python 实现的,便打算简单看看其实现细节,老规矩,看项目,先将好奇点写下来:
1.Python 怎么实现宣传文章中那么漂亮的 PC 软件界面的?
2. 如何采集公众号文章?
对,我就对这两点感兴趣,经过一番阅读后,关于好奇 1,其实人家没有实现漂亮的 PC 软件界面,Liuli 只是采集,然后将内容推送过去,所以本文的重点,就是看一下它是怎么采集公众号文章的,此外在阅读过程中,发现 LiuLi 还使用了简单的方法来识别文章是否为广告文章,这点也挺有意思的,也记录一下。
Liuli 基于搜狗微信(https://weixin.sogou.com/)对公众号文章进行采集,而且实现了 2 种方式:
使用 playwright 自动化浏览器的方式实现公众号文章采集
使用 ruia 爬虫框架实现公众号文章的采集
我们可以通过相应的配置文件控制 Liuli 使用其中哪种方式来进行文章采集,其默认使用 ruia 的方式进行采集。
Liuli 将功能分为多个模块,然后通过调度器去调度不同的模块,调度器启动方法代码如下:
- # src/liuli_schedule.py
-
- def start(ll_config_name: str = ""):
- """调度启动函数
- Args:
- task_config (dict): 调度任务配置
- """
- if not ll_config_name:
- freeze_support()
-
- # 默认启动 liuli_config 目录下所有配置
- ll_config_name_list = []
- for each_file in os.listdir(Config.LL_CONFIG_DIR):
- if each_file.endswith("json"):
- # 加入启动列表
- ll_config_name_list.append(each_file.replace(".json", ""))
- # 进程池
- p = Pool(len(ll_config_name_list))
- for each_ll_config_name in ll_config_name_list:
- LOGGER.info(f"Task {each_ll_config_name} register successfully!")
- p.apply_async(run_liuli_schedule, args=(each_ll_config_name,))
- p.close()
- p.join()
-
- else:
- run_liuli_schedule(ll_config_name)
从代码可知,调度器会启动 Python 的进程池,然后向其中添加 run_liuli_schedule 异步任务,该异步任务中,会执行 run_liuli_task 方法,该方法才是一次完整的任务流程,代码如下:
- def run_liuli_task(ll_config: dict):
- """执行调度任务
- Args:
- ll_config (dict): Liuli 任务配置
- """
- # 文章源, 用于基础查询条件
- doc_source: str = ll_config["doc_source"]
- basic_filter = {"basic_filter": {"doc_source": doc_source}}
- # 采集器配置
- collector_conf: dict = ll_config["collector"]
- # 处理器配置
- processor_conf: dict = ll_config["processor"]
- # 分发器配置
- sender_conf: dict = ll_config["sender"]
- sender_conf.update(basic_filter)
- # 备份器配置
- backup_conf: dict = ll_config["backup"]
- backup_conf.update(basic_filter)
-
- # 采集器执行
- LOGGER.info("采集器开始执行!")
- for collect_type, collect_config in collector_conf.items():
- collect_factory(collect_type, collect_config)
- LOGGER.info("采集器执行完毕!")
- # 采集器执行
- LOGGER.info("处理器(after_collect): 开始执行!")
- for each in processor_conf["after_collect"]:
- func_name = each.pop("func")
- # 注入查询条件
- each.update(basic_filter)
- LOGGER.info(f"处理器(after_collect): {func_name} 正在执行...")
- processor_dict[func_name](**each)
- LOGGER.info("处理器(after_collect): 执行完毕!")
- # 分发器执行
- LOGGER.info("分发器开始执行!")
- send_doc(sender_conf)
- LOGGER.info("分发器执行完毕!")
- # 备份器执行
- LOGGER.info("备份器开始执行!")
- backup_doc(backup_conf)
- LOGGER.info("备份器执行完毕!")
从 run_liuli_task 方法可知,Liuli 一次任务需要执行:
1. 采集公众号文章
2. 处理公众号文章(广告文章标注、文本清理、rss 生成、html 渲染等)
3. 分发到企业微信(wecom)等不同终端上(在企业微信中可以通过微信插件将信息推送到微信)
4. 备份文章到 mongodb 和 github
关于 Liuli 的功能,可以阅读作者本人的文章: 基于 Liuli 构建纯净的 RSS 公众号信息流,这里先只关注公众号采集的逻辑。
因为有 ruia 与 playwright 两种不同方式实现的采集器,具体使用哪种,通过配置文件决定,然后通过 import_module 方法动态导入相应的模块,然后运行模块的 run 方法,从而实现公众号文章的采集,相关代码如下:
- def collect_factory(collect_type: str, collect_config: dict) -> bool:
- """
- 采集器工厂函数
- :param collect_type: 采集器类型
- :param collect_config: 采集器配置
- :return:
- """
- collect_status = False
- try:
- # import_module方法动态载入具体的采集模块
- collect_module = import_module(f"src.collector.{collect_type}")
- collect_status = collect_module.run(collect_config)
- except ModuleNotFoundError:
- LOGGER.error(f"采集器类型不存在 {collect_type} - {collect_config}")
- except Exception as e:
- LOGGER.error(f"采集器执行出错 {collect_type} - {collect_config} - {e}")
- return collect_status
playwright 是微软出品的自动化库,与 selenium 的作用类似,定位于网页测试,但也被人用于网页信息的获取,可见即可得,使用门槛低,因为要加载网页信息,所以性能比较差,当然一些前端反爬的措施,playwright 也无法突破。
playwright 相比于 selenium,支持 python 的 async,性能有所提升(但还是比不了直接请求),这里贴一下获取某公众号下最新文章的部分逻辑(完整代码太长):
- async def playwright_main(wechat_name: str):
- """利用 playwright 获取公众号元信息,输出数据格式见上方
- Args:
- wechat_name ([str]): 公众号名称
- """
- wechat_data = {}
- try:
- async with async_playwright() as p:
- # browser = await p.chromium.launch(headless=False)
- browser = await p.chromium.launch()
- context = await browser.new_context(user_agent=Config.SPIDER_UA)
- page = await context.new_page()
- # 进行公众号检索
- await page.goto("https://weixin.sogou.com/")
- await page.wait_for_load_state()
- await page.click('input[name="query"]')
- await page.fill('input[name="query"]', wechat_name)
- await asyncio.sleep(1)
- await page.click("text=搜公众号")
- await page.wait_for_load_state()
从上述代码可知,playwright 用法与 selenium 很像,将用户操作网站的流程自动化便可以获取相应的数据了。
ruia 是轻量级的 Python 异步爬虫框架,因为比较轻量,我也将其代码读了一遍,作为下篇文章的内容。
它的用法与 scrapy 有点像,需要定义继承于 ruia.Spider 的子类,然后调用 start 方法实现对目标网站的请求,然后 ruia 会自动调用 parse 方法实现对网页内容的解析,来看一下具体的代码,首先是入口逻辑:
- def run(collect_config: dict):
- """微信公众号文章抓取爬虫
- Args:
- collect_config (dict, optional): 采集器配置
- """
- s_nums = 0
- wechat_list = collect_config["wechat_list"]
- delta_time = collect_config.get("delta_time", 5)
- for wechat_name in wechat_list:
- SGWechatSpider.wechat_name = wechat_name
- SGWechatSpider.request_config = {
- "RETRIES": 3,
- "DELAY": delta_time,
- "TIMEOUT": 20,
- }
- sg_url = f"https://weixin.sogou.com/weixin?type=1&query={wechat_name}&ie=utf8&s_from=input&_sug_=n&_sug_type_="
- SGWechatSpider.start_urls = [sg_url]
- try:
- # 启动爬虫
- SGWechatSpider.start(middleware=ua_middleware)
- s_nums += 1
- except Exception as e:
- err_msg = f"声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/292683推荐阅读
相关标签
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。