赞
踩
安装好 Chrome 浏览器并正确配置了 ChromeDrive
安装好 python 至少为 3.6以上
安装好 Selenium 相关包并能成功用 Selenium 打开 Chrome 浏览器
https://spa2.scrape.center/
点进一部电影观察 URL
https://spa2.scrape.center/detail/ZWYzNCN0ZXVxMGJ0dWEjKC01N3cxcTVvNS0takA5OHh5Z2ltbHlmeHMqLSFpLTAtbWIx
可以看到这里的 URL 的 detail 后面跟的是一个长字符串, 看着是由 Base64 编码而得,也就是说,详情页的 URL 中包含加密参数,所以我们无法根据规律构造详情页的 URL
然后依次点击列表页的1-10 页观察 Ajax请求
可以看到,这里接口的参数多了一个 token 字段, 而且每次请求的 token 都不用,这个字段看着同样是由 Base64 编码而得, 更棘手的一点是, API 具有时效性,意味着 把 Ajax 内的 URL 复制下来,短期内可以访问, 但过段时间就访问不了了, 会直接返回 401 状态。
之前我们可以直接用 requests 构造 Ajax 请求, 但现在 Ajax 请求接口带有 token , 而且还是可变的, 我们不知道 token 的生成逻辑, 就没法直接构造 Ajax 请求来爬取数据。 怎么办呢? 先分析出 token 的生成逻辑,再模拟 Ajax 请求,是一个办法, 可这个办法相对比较难。 此时我们可以用 Selenium 绕过这个阶段, 直接获取 JavaScript 最终渲染完成的页面源代码, 再从中提取数据即可
之后我们要完成如下工作
通过 Selenium 遍历列表页,获取每部电影的详情页 URL
通过 Selenium 根据上一步获取的详情页 URL 爬取每部电影的详情页
从详情页中提取每部电影的名称,类别, 分数,简介,封面等内容
先做一系列的初始化工作
from selenium import webdriver from selenium.common.exceptions import TimeoutException from selenium.webdriver.common.by import By from selenium.webdriver.support import expected_conditions as EC from selenium.webdriver.support.wait import WebDriverWait import logging logging.basicConfig(level=logging.INFO, format='%(asctime)s- %(levelname)s: %(message)s') INDEX_URL = 'https://spa2.scrape.center/page/{page}' TIME_OUT = 10 TOTAL_PAGE = 10 browser = webdriver.Chrome() wait = WebDriverWait(browser, TIME_OUT)
这里首先导入了一些必要的包,后面我们会用这些包爬取页面和设置延迟等。然后定义了日志配置和几个变量,接着使用 Chrome 类生成了一个 webdriver 对象, 并赋值为 browser 变量,我们可以通过 borwser 调用 Selenium 的一些API 对浏览器进行一些列的操作,最后我们声明了一个 WebDriverWait 对象,利用它可以设置页面加载的最长等待时间
下面我们来观察列表页
可以看到列表页的 URL 还是有一些规律的, 比如第一页的URL
https://spa2.scrape.center/page/1
最后的数字就是页码,所以可以构造每一页的 URL
那么怎么判断一个列表页是否加载成功? 当页面出现了我们想要的内容就代表加载成功了。这里可以用 Selenium 的隐式判断条件, 例如 每部电影的信息区块的 CSS 选择器 #index .item 如图
直接使用 visibility_of_all_elements_located 判断条件加上 CSS 选择器的内容,即可判断页面有没有加载成功,配合 WebDriverWait 的超时配置, 就可以实现 10 秒的页面加载监听。 如果 10秒之内我们的配置条件得到满足,就代表页面加载成功,否则抛出 TimeoutException 异常。
def scrape_page(url, condition, locator): logging.info(('scraping %s', url)) try: browser.get(url) wait.until(condition(locator)) except TimeoutException: logging.error('error occurred while scraping %s', url, exc_info=True) def scrape_index(page): url = INDEX_URL.format(page=page) scrape_page(url, condition=EC.visibility_of_all_elements_located, locator=(By.CSS_SELECTOR, '#index .item'))
这里我们定义了两个方法
第一个是 scrape_page 方法,是一个通用的爬取方法,可以对任意 URL 进行爬取,状态监听以及异常处理,接收 url , condition , locator 三个参数, url 就是要爬取的页面的 URL ,condition 是页面的加载成功的判断条件, 可以是 expected_conditins 中的某一项 如 visibility_of_all_elements_located ,visibility_of_element_localed 等, locator 是定位器, 是一个元组, 通过配置查询条件和参数来获取列表页所有的电影信息节点。另外。我们在爬取过程中添加了超时检测,如果规定时间(这里为 10 秒)还没有加载出对应的节点,就抛出 TimeoutException 异常并输出错误日志
第二个方法 scrape_index 则是爬取列表页的方法, 接收一个参数 page , 通过调用 scrape_page 方法并传入 condition 参数和 locator 参数, 完成队列表页的爬取。这里的 condition 我们传入的是 visibility_of_all_elements_located , 嗲表所有节点加载出来才算成功
注意: 这里爬取页面时不需要返回任何结果,因为执行完 scrape_index 方法后,页面正好处于加载完成状态,利用 browser 对象即可进行进一步的信息提取
现在我们已经可以加载出列表页了, 下一步解析列表页,从中提取详情页的 URL
from urllib.parse import urljoin def parse_index(): elements = browser.find_elements(by=By.CSS_SELECTOR, value='#index .item .name') for element in elements: href = element.get_attribute('href') yield urljoin(INDEX_URL, href) def main(): try: for page in range(1, TOTAL_PAGE + 1): scrape_index(page) detail_urls = parse_index() logging.info('details urls %s', list(detail_urls)) finally: browser.close() main()
2024-07-28 19:30:29,064- INFO: ('scraping %s', 'https://spa2.scrape.center/page/9')
2024-07-28 19:30:29,897- INFO: details urls ['https://spa2.scrape.center/detail/ZWYzNCN0ZXVxMGJ0dWEjKC01N3cxcTVvNS0takA5OHh5Z2ltbHlmeHMqLSFpLTAtbWI4MQ==', 'https://spa2.scrape.center/detail/ZWYzNCN0ZXVxMGJ0dWEjKC01N3cxcTVvNS0takA5OHh5Z2ltbHlmeHMqLSFpLTAtbWI4Mg==', 'https://spa2.scrape.center/detail/ZWYzNCN0ZXVxMGJ0dWEjKC01N3cxcTVvNS0takA5OHh5Z2ltbHlmeHMqLSFpLTAtbWI4Mw==', 'https://spa2.scrape.center/detail/ZWYzNCN0ZXVxMGJ0dWEjKC01N3cxcTVvNS0takA5OHh5Z2ltbHlmeHMqLSFpLTAtbWI4NA==', ['https://spa2.scrape.center/detail/ZWYzNCN0ZXVxMGJ0dWEjKC01N3cxcTVvNS0takA5OHh5Z2ltbHlmeHMqLSFpLTAtbWI5MQ==', 'https://spa2.scrape.center/detail/ZWYzNCN0ZXVxMGJ0dWEjKC01N3cxcTVvNS0takA5OHh5Z2ltbHlmeHMqLSFpLTAtbWI5Mg==', 'https://spa2.scrape.center/detail/ZWYzNCN0ZXVxMGJ0dWEjKC01N3cxcTVvNS0takA5OHh5Z2ltbHlmeHMqLSFpLTAtbWI5Mw==', 'https://spa2.scrape.center/detail/ZWYzNCN0ZXVxMGJ0dWEjKC01N3cxcTVvNS0takA5OHh5Z2ltbHlmeHMqLSFpLTAtbWI5NA==', 'https://spa2.scrape.center/detail/ZWYzNCN0ZXVxMGJ0dWEjKC01N3cxcTVvNS0takA5OHh5Z2ltbHlmeHMqLSFpLTAtbWI5NQ==', 'https://spa2.scrape.center/detail/ZWYzNCN0ZXVxMGJ0dWEjKC01N3cxcTVvNS0takA5OHh5Z2ltbHlmeHMqLSFpLTAtbWI5Ng==', 'https://spa2.scrape.center/detail/ZWYzNCN0ZXVxMGJ0dWEjKC01N3cxcTVvNS0takA5OHh5Z2ltbHlmeHMqLSFpLTAtbWI5Nw==', 'https://spa2.scrape.center/detail/ZWYzNCN0ZXVxMGJ0dWEjKC01N3cxcTVvNS0takA5OHh5Z2ltbHlmeHMqLSFpLTAtbWI5OA==', 'https://spa2.scrape.center/detail/ZWYzNCN0ZXVxMGJ0dWEjKC01N3cxcTVvNS0takA5OHh5Z2ltbHlmeHMqLSFpLTAtbWI5OQ==', 'https://spa2.scrape.center/detail/ZWYzNCN0ZXVxMGJ0dWEjKC01N3cxcTVvNS0takA5OHh5Z2ltbHlmeHMqLSFpLTAtbWIxMDA=']
省略部分输出
我们通过 find_elements 方法直接从列表页提取所有电影节点,接着遍历这些节点,通过 get_attribute 方法提取了详情页的 href 属性值, 再用 urljoin 方法合并成一个完整的 URL
最后我们用一个 main 方法 把上面所有的方法串联起来
基于同样的原理,这里可以加一个判断条件,如果电影名称加载出来,就代表详情页加载成功
def scrape_detail(url): scrape_page(url, condition=EC.visibility_of_all_elements_located, locator=(By.TAG_NAME, 'h2'))
这里的判定条件 condition 传入的是 visibility_of_element_located , 即单个元素出现即可, locator 传入的是 (By.TAG_NAME, ‘h2’),即 h2 这个节点,也就是电影名称对应的节点
如果执行了 scrape_detail 方法,没有抛出 TimeoutException 异常, 就表示页面加载成功了
下面定义一个解析详情页的方法来提取我们想要的信息
def parse_detail(): url = browser.current_url name = browser.find_element(by=By.TAG_NAME, value='h2').text categories = [element.text for element in browser.find_elements(by=By.CSS_SELECTOR, value='.categories button span')] cover = browser.find_element(by=By.CSS_SELECTOR, value='.cover').get_attribute('src') score = browser.find_element(by=By.CLASS_NAME, value='score').text drama = browser.find_element(by=By.CSS_SELECTOR, value='.drama p').text return { 'url': url, 'name': name, 'categories': categories, 'cover': cover, 'score': score, 'drama': drama }
这里定义了一个 parse_detail 方法, 提取了详情页的URL 和电影名称,类比,封面,分数和简介等内容,提取细节如下
URL : 直接调用 Browsr 对象的 current_url 属性即可获取当前页的 URL
名称:提取 h2 节点内部的文本即可获取电影名称,这里我们使用 find_element 方法并传入 h2 ,提取到了指定名称对应的节点, 然后调用 text 属性提取了节点内部的文本,即电影名称
类别: 为了方便,这里通过 CSS 选择器提取电影类别, 对应的 CSS 选择器为 .categories button span 。 可以使用 find_elements 方法提取 CSS 选择器对应的多个节点,然后遍历这些节点, 调用节点的 text 属性获取节点内部的文本
封面: 可以使用 CSS 选择器 .cover 直接获取封面对应的节点。 但是封面的 URL 对应的是 src 这个属性, 所以这里使用 get_attribute 方法传入 scr 来获取
分数: 对应的 CSS 选择器为 .score 依然可以用上面的方案来提取分数,但是这里使用了 class_name 这个属性来提取,这个属性可以使用 class 的名称提取节点,能达到同样的效果, 不过这里传入的是 class 名称 score 而不是 .score 了,提取节点后,再调用 text 属性提取节点文本即可
简介: 对应的 CSS 选择器为 .drama p , 直接获取间接对应的节点, 然后调用 text 属性提取文本即可
最后把所有结果构造成一个字典并返回
接下来在 main 方法中添加对这两个方法的调用
def main(): try: for page in range(1, TOTAL_PAGE + 1): scrape_index(page) detail_urls = parse_index() for detail_url in list(detail_urls): logging.info('get detail url %s', detail_url) scrape_detail(detail_url) detail_data = parse_detail() logging.info('detail data %s', detail_data) finally: browser.close() if __name__ == '__main__': main()
输出结果,省略部分
2024-07-29 07:28:11,823- WARNING: Exception managing chrome: error sending request for url (https://googlechromelabs.github.io/chrome-for-testing/known-good-versions-with-downloads.json)
2024-07-29 07:28:14,065- INFO: ('scraping %s', 'https://spa2.scrape.center/page/1')
2024-07-29 07:28:16,100- INFO: get detail url https://spa2.scrape.center/detail/ZWYzNCN0ZXVxMGJ0dWEjKC01N3cxcTVvNS0takA5OHh5Z2ltbHlmeHMqLSFpLTAtbWIx
2024-07-29 07:28:16,100- INFO: ('scraping %s', 'https://spa2.scrape.center/detail/ZWYzNCN0ZXVxMGJ0dWEjKC01N3cxcTVvNS0takA5OHh5Z2ltbHlmeHMqLSFpLTAtbWIx')
为了方便,这里将数据保存为 JSON 文件
from os import makedirs from os.path import exists import json RESULTS_DIR = 'results' exists(RESULTS_DIR) or makedirs(RESULTS_DIR) def save_data(data): name = data.get('name') data_path = f'{RESULTS_DIR}/{name}.json' json.dump(data, open(data_path, 'w', encoding='utf-8'), ensure_ascii=False, index=1)
如果觉得爬取过程中,弹出的浏览器会造成干扰, 可以开启 Chrome 无头模式,这样不仅解决了干扰,爬取速度也会提升
options = webdriver.ChromeOptions() options.add_argument('--headless') browser = webdriver.Chrome(options=options)
这些要设置在最前面,最后一行替换之前 browser = webdriver.Chrome()
最后在 main 方法中调用 save_data
def main(): try: for page in range(1, TOTAL_PAGE + 1): scrape_index(page) detail_urls = parse_index() for detail_url in list(detail_urls): logging.info('get detail url %s', detail_url) scrape_detail(detail_url) detail_data = parse_detail() logging.info('detail data %s', detail_data) save_data(detail_data) logging.info('save_data %s', detail_data.get('name')) finally: browser.close() if __name__ == '__main__': main()
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。