当前位置:   article > 正文

Python b站视频爬取_b站vip内容爬取

b站vip内容爬取

Python b站视频爬取

    直接上代码:先通过 pagelist 接口获取视频各分P的 cid 和标题,再通过 playurl 接口取得每个分P的视频地址,最后用线程池并发下载为 mp4 文件。

  import concurrent.futures
  import json
  import re

  import requests
  from requests.exceptions import RequestException
  class Spider():
      """Fetch the part list of one Bilibili video and download each part.

      The workflow is: ``get_page`` asks the ``pagelist`` API for the parts of
      the video, ``parse_page`` resolves every part to a media URL through the
      ``playurl`` API, and ``download_video`` streams one part to an ``.mp4``
      file in the current working directory.
      """

      # Seconds to wait on the server before a request is abandoned; without a
      # timeout a stalled connection would block its worker thread forever.
      REQUEST_TIMEOUT = 30

      def __init__(self, bvid):
          """Store the BV id and the base headers sent with every API request.

          Args:
              bvid: The video's BV identifier, e.g. ``'BV1og4y1q7M4'``.
          """
          self.headers = {
              'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',  # noqa
              'Accept-Charset': 'UTF-8,*;q=0.5',
              'Accept-Encoding': 'gzip,deflate,sdch',
              'Accept-Language': 'en-US,en;q=0.8',
              'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.74 Safari/537.36 Edg/79.0.309.43',
          }
          self.bvid = bvid

      @staticmethod
      def _safe_filename(name):
          """Return *name* with characters that are illegal in file names replaced."""
          cleaned = re.sub(r'[\\/:*?"<>|\x00-\x1f]', '_', str(name)).strip()
          # A title made only of whitespace would otherwise yield ".mp4".
          return cleaned or 'video'

      def _download_headers(self, i):
          """Return a fresh header dict for downloading part *i* (1-based).

          A copy is returned instead of updating ``self.headers`` because
          ``download_video`` runs concurrently in several threads and each
          part needs its own ``referer``.
          """
          headers = dict(self.headers)
          headers.update({
              'referer': f'https://www.bilibili.com/video/{self.bvid}?p={i}',
              'Connection': 'keep-alive',
              'Origin': 'https://www.bilibili.com',
              'Accept': '*/*',
              'Accept-Encoding': 'gzip, deflate, sdch, br',
              'Accept-Language': 'zh-CN,zh;q=0.8'
          })
          return headers

      def get_page(self):
          """Request the part list of the video.

          Returns:
              The response body as text when the server answers 200, otherwise
              ``None`` (the status code or a failure message is printed).
          """
          url = f'https://api.bilibili.com/x/player/pagelist?bvid={self.bvid}&jsonp=jsonp'
          try:
              # headers must be passed by keyword: the second positional
              # argument of requests.get() is ``params`` (the query string).
              r = requests.get(url, headers=self.headers,
                               timeout=self.REQUEST_TIMEOUT)
          except RequestException:
              print('请求失败')
              return None
          if r.status_code == 200:
              return r.text
          print(r.status_code)
          return None

      def parse_page(self, html):
          """Resolve every part listed in *html* to a media URL.

          Args:
              html: JSON text returned by ``get_page``; may be ``None`` or
                  empty when that request failed.

          Returns:
              A ``(url_list, name_list)`` tuple of equal-length lists, one
              entry per part, in the order the API listed them. Both lists are
              empty when there is nothing to parse.
          """
          if not html:
              return [], []
          # ``data`` is null when the API reports an error for this BV id.
          results = json.loads(html).get('data') or []
          url_list = []
          name_list = []
          for result in results:
              cid = result['cid']  # id of this part, needed by the playurl API
              video_name = result['part']  # title of this part
              url = f'https://api.bilibili.com/x/player/playurl?cid={cid}&otype=json&bvid={self.bvid}'
              res = requests.get(url, headers=self.headers,
                                 timeout=self.REQUEST_TIMEOUT)
              play_info = json.loads(res.text).get('data')['durl'][0]
              url_list.append(play_info['url'])
              name_list.append(video_name)
          return url_list, name_list

      def download_video(self, url_list, name_list, i):
          """Stream part *i* (1-based) to ``<part name>.mp4``.

          Args:
              url_list: Media URLs as returned by ``parse_page``.
              name_list: Part titles as returned by ``parse_page``.
              i: 1-based index of the part to download.

          Raises:
              requests.exceptions.RequestException: If the request fails or
                  the server answers with an error status, so that an error
                  page is never saved as a video file.
          """
          url = url_list[i - 1]
          name = name_list[i - 1]
          # The context manager releases the connection even if writing fails.
          with requests.get(url, headers=self._download_headers(i), stream=True,
                            timeout=self.REQUEST_TIMEOUT) as r:
              r.raise_for_status()
              print(f'正在下载第{i}个视频,name:{name}')
              with open(f'{self._safe_filename(name)}.mp4', "wb") as mp4:
                  for chunk in r.iter_content(chunk_size=1024 * 1024):
                      if chunk:
                          mp4.write(chunk)
          print(f'第{i}个视频下载完成,name:{name}')

      def run(self):
          """Fetch and parse the part list.

          Returns:
              The ``(url_list, name_list)`` tuple produced by ``parse_page``;
              nothing is downloaded here.
          """
          html = self.get_page()
          return self.parse_page(html)
  # Script entry: download every part of one video, at most five at a time.
  bvid = 'BV1og4y1q7M4'
  a = Spider(bvid)
  html = a.get_page()
  if html is None:
      # get_page() has already printed why the request failed; stop here
      # instead of handing None to the JSON parser.
      raise SystemExit(1)
  url_list, name_list = a.parse_page(html)
  # The with statement makes sure the worker threads are cleaned up promptly;
  # max_workers is the number of download threads.
  with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
      # Start the downloads and remember which 1-based part index each future
      # belongs to, so a failure can be reported against the right part.
      future_to_index = {
          executor.submit(a.download_video, url_list, name_list, i): i
          for i in range(1, len(url_list) + 1)
      }
      for future in concurrent.futures.as_completed(future_to_index):
          index = future_to_index[future]
          try:
              # download_video returns nothing; result() is called only to
              # re-raise any exception from the worker thread.
              future.result()
          except Exception as exc:
              print('%r generated an exception: %s' % (index, exc))
          else:
              print('success')

 

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号