当前位置:   article > 正文

python爬虫实战——抖音_抖音爬虫

抖音爬虫

目录

1、分析主页作品列表标签结构

2、进入作品页前 判断作品是视频作品还是图文作品

3、进入视频作品页面,获取视频

4、进入图文作品页面,获取图片

5、完整参考代码

6、获取全部作品的一种方法


        本文主要使用 selenium.webdriver(Firefox)、BeautifulSoup等相关库,在 centos 系统中,以无登录状态 进行网页爬取练习。仅做学习和交流使用。

安装和配置 driver 参考:

[1]: Linux无图形界面环境使用Python+Selenium最佳实践 - 知乎

[2]: 错误'chromedriver' executable needs to be in PATH如何解 - 知乎

1、分析主页作品列表标签结构

  1. # webdriver 初始化
  2. driver = webdriver.Firefox(options=firefox_options)
  3. # 设置页面加载的超时时间为6秒
  4. driver.set_page_load_timeout(6)
  5. # 访问目标博主页面
  6. # 如 https://www.douyin.com/user/MS4wLjABAAAAnq8nmb35fUqerHx54jlTx76AEkfq-sMD3cj7QdgsOiM
  7. driver.get(target)
  8. # 分别等待 class='e6wsjNLL' 和 class='niBfRBgX' 的元素加载完毕再继续执行
  9. #(等ul.e6wsjNLL加载完就行了)
  10. # WebDriverWait(driver, 6) 设置最长的等待事间为 6 秒
  11. WebDriverWait(driver, 6).until(EC.presence_of_element_located((By.CLASS_NAME, 'e6wsjNLL')))
  12. WebDriverWait(driver, 6).until(EC.presence_of_element_located((By.CLASS_NAME, 'niBfRBgX')))
  13. # 在浏览器中执行脚本,滚动页面到最底部,有可能会显示更多的作品
  14. driver.execute_script('document.querySelector(".wcHSRAj6").scrollIntoView()')
  15. sleep(1)
  16. # 使用 beautifulsoup 解析页面源代码
  17. html = BeautifulSoup(driver.page_source, 'lxml')
  18. # 关闭driver
  19. driver.quit()
  20. # 获取作品列表
  21. ul = html.find(class_='e6wsjNLL')
  22. # 获取每一个作品
  23. lis = ul.findAll(class_='niBfRBgX')

2、进入作品页前 判断作品是视频作品还是图文作品

  1. element_a = li.find('a')
  2. # a 标签下如果能找到 class = 'TQTCdYql' 的元素,
  3. # 则表示该作品是图文,如果没有(则为None),则表示该作品是视频
  4. is_pictures = element_a.find(class_='TQTCdYql')
  5. if (not is_pictures) or (not is_pictures.svg):
  6. # 视频作品
  7. pass
  8. else:
  9. # 图文作品
  10. pass

3、进入视频作品页面,获取视频

  1. # 拼接作品地址
  2. href = f'https://www.douyin.com{element_a["href"]}'
  3. # 使用 webdriver 访问作品页面
  4. temp_driver = webdriver.Firefox(options=firefox_options)
  5. temp_driver.set_page_load_timeout(6)
  6. temp_driver.get(href)
  7. # 等待 class='D8UdT9V8' 的元素显示后再执行(该元素的内容是作品的发布日期)
  8. WebDriverWait(temp_driver, 6).until(EC.presence_of_element_located((By.CLASS_NAME, 'D8UdT9V8')))
  9. html_v = BeautifulSoup(temp_driver.page_source, 'lxml')
  10. temp_driver.quit()
  11. # 获取该作品的发布时间
  12. publish_time = html_v.find(class_='D8UdT9V8').string[5:]
  13. video = html_v.find(class_='xg-video-container').video
  14. source = video.find('source')
  15. # 为该作品创建文件夹(一个作品一个文件夹)
  16. # 以该作品的 发布时间 加 作品类型来命名文件夹
  17. path = create_dir(f'{publish_time}_video')
  18. # 下载作品
  19. download_works(path, f'{get_current_time()}.mp4', f'https:{source["src"]}')

4、进入图文作品页面,获取图片

  1. # 拼接作品页地址
  2. href = f'https:{element_a["href"]}'
  3. # 使用webdriver访问作品页面
  4. temp_driver = webdriver.Firefox(options=firefox_options)
  5. temp_driver.set_page_load_timeout(6)
  6. temp_driver.get(href)
  7. # 等待包含作品发表时间的标签加载完成
  8. WebDriverWait(temp_driver, 6).until(EC.presence_of_element_located((By.CLASS_NAME, 'YWeXsAGK')))
  9. # 获取当前页面的源代码,关闭webdriver,交给beautifulsoup来处理
  10. # (剩下的任务 继续使用webdriver也能完成)
  11. html_p = BeautifulSoup(temp_driver.page_source, 'lxml')
  12. temp_driver.quit()
  13. # 获取该作品的发布时间
  14. publish_time = f'{html_p.find(class_="YWeXsAGK")}'[-23:-7]
  15. # 图片列表
  16. img_ul = html_p.find(class_='KiGtXxLr')
  17. imgs = img_ul.findAll('img')
  18. # 为该作品创建文件夹,以作品的发布时间+作品类型+图片数量(如果是图片类型作品)
  19. path = create_dir(f'{publish_time}_pictures_{len(imgs)}')
  20. # 遍历图片,获取url 然后下载
  21. for img in imgs:
  22. download_works(path, f'{get_current_time()}.webp', f'{img["src"]}')

5、完整参考代码

  1. # -*- coding: utf-8 -*-
  2. import threading,requests,os,zipfile
  3. from selenium.webdriver.common.by import By
  4. from selenium.common.exceptions import NoSuchElementException
  5. from selenium.webdriver.support import expected_conditions as EC
  6. from selenium.webdriver.support.wait import WebDriverWait
  7. from datetime import datetime
  8. from selenium import webdriver
  9. from selenium.webdriver.firefox.options import Options
  10. from pyvirtualdisplay import Display
  11. from time import sleep
  12. from bs4 import BeautifulSoup
  13. from selenium.common.exceptions import WebDriverException
  14. display = Display(visible=0, size=(1980, 1440))
  15. display.start()
  16. firefox_options = Options()
  17. firefox_options.headless = True
  18. firefox_options.binary_location = '/home/lighthouse/firefox/firefox'
  19. # 获取当前时间
  20. def get_current_time():
  21. now = datetime.now()
  22. format_time = now.strftime("_%Y-%m-%d__%H-%M-%S-%f__")
  23. return format_time
  24. # 设置一个根路径,作品文件以及日志文件都保留在此
  25. ABS_PATH = f'/home/resources/{get_current_time()}'
  26. # 创建目录,dir_name 是作品的发布时间,格式为:2024-02-26 16:59,需要进行处理
  27. def create_dir(dir_name):
  28. dir_name = dir_name.replace(' ', '-').replace(':', '-')
  29. path = f'{ABS_PATH}/{dir_name}'
  30. try:
  31. os.makedirs(path)
  32. except FileExistsError:
  33. print(f'试图创建已存在的文件, 失败({path})')
  34. else:
  35. print(f'创建目录成功 {path}')
  36. finally:
  37. return path
  38. # 下载 目录名称,当前文件的命名,下载的地址
  39. def download_works(dir_name, work_name, src):
  40. response = requests.get(src, stream=True)
  41. if response.status_code == 200:
  42. with open(f'{dir_name}/{work_name}', mode='wb') as f:
  43. for chunk in response.iter_content(1024):
  44. f.write(chunk)
  45. # 判断作品是否已经下载过
  46. def test_work_exist(dir_name):
  47. dir_name = dir_name.replace(' ', '-').replace(':', '-')
  48. path = f'{ABS_PATH}/{dir_name}'
  49. if os.path.exists(path) and os.path.isdir(path):
  50. if os.listdir(path):
  51. return True
  52. return False
  53. def get_all_works(target):
  54. try:
  55. driver = webdriver.Firefox(options=firefox_options)
  56. driver.set_page_load_timeout(6)
  57. # 目标博主页面
  58. driver.get(target)
  59. WebDriverWait(driver, 6).until(EC.presence_of_element_located((By.CLASS_NAME, 'e6wsjNLL')))
  60. WebDriverWait(driver, 6).until(EC.presence_of_element_located((By.CLASS_NAME, 'niBfRBgX')))
  61. driver.execute_script('document.querySelector(".wcHSRAj6").scrollIntoView()')
  62. sleep(1)
  63. html = BeautifulSoup(driver.page_source, 'lxml')
  64. driver.quit()
  65. # 作品列表
  66. ul = html.find(class_='e6wsjNLL')
  67. # 每一个作品
  68. lis = ul.findAll(class_='niBfRBgX')
  69. for li in lis:
  70. element_a = li.find('a')
  71. is_pictures = element_a.find(class_='TQTCdYql')
  72. if (not is_pictures) or (not is_pictures.svg):
  73. href = f'https://www.douyin.com{element_a["href"]}'
  74. temp_driver = webdriver.Firefox(options=firefox_options)
  75. temp_driver.set_page_load_timeout(6)
  76. temp_driver.get(href)
  77. WebDriverWait(temp_driver, 6).until(EC.presence_of_element_located((By.CLASS_NAME, 'D8UdT9V8')))
  78. # 不是必须,剩余内容webdriver也能胜任
  79. html_v = BeautifulSoup(temp_driver.page_source, 'lxml')
  80. temp_driver.quit()
  81. # 获取该作品的发布时间
  82. publish_time = html_v.find(class_='D8UdT9V8').string[5:]
  83. # if test_work_exist(f'{publish_time}_video'):
  84. # continue
  85. video = html_v.find(class_='xg-video-container').video
  86. source = video.find('source')
  87. # 为该作品创建文件夹
  88. path = create_dir(f'{publish_time}_video')
  89. # 下载作品
  90. download_works(path, f'{get_current_time()}.mp4', f'https:{source["src"]}')
  91. else:
  92. href = f'https:{element_a["href"]}'
  93. temp_driver = webdriver.Firefox(options=firefox_options)
  94. temp_driver.set_page_load_timeout(6)
  95. temp_driver.get(href)
  96. WebDriverWait(temp_driver, 6).until(EC.presence_of_element_located((By.CLASS_NAME, 'YWeXsAGK')))
  97. # 使用 beautifulsoup 不是必须
  98. html_p = BeautifulSoup(temp_driver.page_source, 'lxml')
  99. temp_driver.quit()
  100. publish_time = f'{html_p.find(class_="YWeXsAGK")}'[-23:-7]
  101. # 图片列表
  102. img_ul = html_p.find(class_='KiGtXxLr')
  103. imgs = img_ul.findAll('img')
  104. # if test_work_exist(f'{publish_time}_pictures_{len(imgs)}'):
  105. # continue
  106. path = create_dir(f'{publish_time}_pictures_{len(imgs)}')
  107. for img in imgs:
  108. download_works(path, f'{get_current_time()}.webp', f'{img["src"]}')
  109. display.stop()
  110. print('##### finish #####')
  111. except WebDriverException as e:
  112. print(f"捕获到 WebDriverException: {e}")
  113. except Exception as err:
  114. print("捕获到其他错误 get_all_works 末尾")
  115. print(err)
  116. finally:
  117. driver.quit()
  118. display.stop()
  119. # 将目录进行压缩
  120. def zipdir(path, ziph):
  121. # ziph 是 zipfile.ZipFile 对象
  122. for root, dirs, files in os.walk(path):
  123. for file in files:
  124. ziph.write(os.path.join(root, file),
  125. os.path.relpath(os.path.join(root, file), os.path.join(path, '..')))
  126. def dy_download_all(target_url):
  127. get_all_works(target_url)
  128. directory_to_zip = ABS_PATH # 目录路径
  129. output_filename = f'{ABS_PATH}.zip' # 输出ZIP文件的名称
  130. with zipfile.ZipFile(output_filename, 'w', zipfile.ZIP_DEFLATED) as zipf:
  131. zipdir(directory_to_zip, zipf)
  132. return f'{ABS_PATH}.zip' # 返回下载地址
  133. if __name__ == '__main__':
  134. # 简单测试
  135. url = input('请输入博主主页url:')
  136. path = dy_download_all(url)
  137. print('下载完成')
  138. print(f'地址:{path}')

测试结果:

6、获取全部作品的一种方法

        上述的操作是在无登录状态下进行的,即使在webdriver中操作让页面滚动,也只能获取到有限的作品,大约是 20 项左右。对此提出一个解决方案。

         以登录状态(或者有cookies本地存储等状态)访问目标博主页面,滚动到作品最底部,然后在控制台中执行JavaScript脚本,获取全部作品的信息(在这里是作品链接以及作品类型),然后写出到文本文件中。

JavaScript 代码: 

  1. let ul = document.querySelector('.e6wsjNLL');
  2. // 存放结果
  3. works_list = [];
  4. // 遍历,每一次都加入一个对象,包括作品页的地址、作品是否为图片作品
  5. ul.childNodes.forEach((e)=>{
  6. let href = e.querySelector('a').href;
  7. let is_pictures = e.querySelector('a').querySelector('.TQTCdYql') ? true : false;
  8. works_list.push({href, is_pictures})
  9. })
  10. // 创建一个Blob对象,包含要写入文件的内容
  11. var content = JSON.stringify(works_list);
  12. var blob = new Blob([content], {type: "text/plain;charset=utf-8"});
  13. // 创建一个链接元素
  14. var link = document.createElement("a");
  15. // 设置链接的href属性为Blob对象的URL
  16. link.href = URL.createObjectURL(blob);
  17. // 设置链接的下载属性,指定下载文件的名称
  18. link.download = "example.txt";
  19. // 触发链接的点击事件,开始下载文件
  20. link.click();

写出结果: 

        列表中的每一个元素都是一个对象,href 是作品的地址,is_pictures 以 boolean 值表示该作品是否为图片作品

        

然后在 python 中读入该文件,使用 json 解析,转成字典列表的形式,遍历列表,对每一个字典(就是每一个作品)进行处理即可。

示例代码:

         win 环境下

  1. import json
  2. import threading,requests,os
  3. from bs4 import BeautifulSoup
  4. from seleniumwire import webdriver
  5. from selenium.webdriver.common.by import By
  6. from selenium.common.exceptions import NoSuchElementException
  7. from datetime import datetime
  8. from selenium.webdriver.support import expected_conditions as EC
  9. from selenium.webdriver.support.wait import WebDriverWait
  10. # 获取当前时间
  11. def get_current_time():
  12. now = datetime.now()
  13. format_time = now.strftime("_%Y-%m-%d__%H-%M-%S-%f__")
  14. return format_time
  15. # 设置一个根路径,作品文件以及日志文件都保留在此
  16. ABS_PATH = f'F:\\{get_current_time()}'
  17. # 创建目录,dir_name 是作品的发布时间,格式为:2024-02-26 16:59,需要进行处理
  18. def create_dir(dir_name):
  19. dir_name = dir_name.replace(' ', '-').replace(':', '-')
  20. path = f'{ABS_PATH}/{dir_name}'
  21. try:
  22. os.makedirs(path)
  23. except FileExistsError:
  24. print(f'试图创建已存在的文件, 失败({path})')
  25. else:
  26. print(f'创建目录成功 {path}')
  27. finally:
  28. return path
  29. # 下载 目录名称,当前文件的命名,下载的地址
  30. def download_works(dir_name, work_name, src):
  31. response = requests.get(src, stream=True)
  32. if response.status_code == 200:
  33. with open(f'{dir_name}/{work_name}', mode='wb') as f:
  34. for chunk in response.iter_content(1024):
  35. f.write(chunk)
  36. # 判断作品是否已经下载过
  37. def test_work_exist(dir_name):
  38. dir_name = dir_name.replace(' ', '-').replace(':', '-')
  39. path = f'{ABS_PATH}/{dir_name}'
  40. if os.path.exists(path) and os.path.isdir(path):
  41. if os.listdir(path):
  42. return True
  43. return False
  44. # 下载一个作品
  45. def thread_task(ul):
  46. for item in ul:
  47. href = item['href']
  48. is_pictures = item['is_pictures']
  49. if is_pictures:
  50. temp_driver = webdriver.Chrome()
  51. temp_driver.set_page_load_timeout(6)
  52. temp_driver.get(href)
  53. WebDriverWait(temp_driver, 6).until(EC.presence_of_element_located((By.CLASS_NAME, 'YWeXsAGK')))
  54. # 使用 beautifulsoup 不是必须
  55. html_p = BeautifulSoup(temp_driver.page_source, 'lxml')
  56. temp_driver.quit()
  57. publish_time = f'{html_p.find(class_="YWeXsAGK")}'[-23:-7]
  58. # 图片列表
  59. img_ul = html_p.find(class_='KiGtXxLr')
  60. imgs = img_ul.findAll('img')
  61. # if test_work_exist(f'{publish_time}_pictures_{len(imgs)}'):
  62. # continue
  63. path = create_dir(f'{publish_time}_pictures_{len(imgs)}')
  64. for img in imgs:
  65. download_works(path, f'{get_current_time()}.webp', f'{img["src"]}')
  66. else:
  67. temp_driver = webdriver.Chrome()
  68. temp_driver.set_page_load_timeout(6)
  69. temp_driver.get(href)
  70. WebDriverWait(temp_driver, 6).until(EC.presence_of_element_located((By.CLASS_NAME, 'D8UdT9V8')))
  71. # 不是必须,剩余内容webdriver也能胜任
  72. html_v = BeautifulSoup(temp_driver.page_source, 'lxml')
  73. temp_driver.quit()
  74. # 获取该作品的发布时间
  75. publish_time = html_v.find(class_='D8UdT9V8').string[5:]
  76. # if test_work_exist(f'{publish_time}_video'):
  77. # continue
  78. video = html_v.find(class_='xg-video-container').video
  79. source = video.find('source')
  80. # 为该作品创建文件夹
  81. path = create_dir(f'{publish_time}_video')
  82. # 下载作品
  83. download_works(path, f'{get_current_time()}.mp4', f'https:{source["src"]}')
  84. if __name__ == '__main__':
  85. content = ''
  86. # 外部读入作品链接文件
  87. with open('../abc.txt', mode='r', encoding='utf-8') as f:
  88. content = json.load(f)
  89. length = len(content)
  90. if length <= 3 :
  91. thread_task(content)
  92. else:
  93. # 分三个线程
  94. ul = [content[0: int(length / 3) + 1], content[int(length / 3) + 1: int(length / 3) * 2 + 1],
  95. content[int(length / 3) * 2 + 1: length]]
  96. for child_ul in ul:
  97. thread = threading.Thread(target=thread_task, args=(child_ul,))
  98. thread.start()

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/448167
推荐阅读
相关标签
  

闽ICP备14008679号