赞
踩
- import requests
- import json
- from requests.exceptions import RequestException
- import concurrent.futures
-
-
class Spider():
    """Downloader for a multi-part bilibili.com video post identified by a BV id.

    Workflow: ``get_page()`` fetches the post's part list, ``parse_page()``
    resolves every part's ``cid`` to a direct play URL, and
    ``download_video()`` streams one part to ``<part name>.mp4``.
    """

    def __init__(self, bvid):
        """Store the BV id and prepare browser-like request headers.

        Args:
            bvid: bilibili BV identifier of the post, e.g. ``'BV1og4y1q7M4'``.
        """
        # Browser-like headers; the API tends to reject requests without a UA.
        self.headers = {
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',  # noqa
            'Accept-Charset': 'UTF-8,*;q=0.5',
            'Accept-Encoding': 'gzip,deflate,sdch',
            'Accept-Language': 'en-US,en;q=0.8',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.74 Safari/537.36 Edg/79.0.309.43',
        }
        self.bvid = bvid

    def get_page(self):
        """Fetch the JSON page list for ``self.bvid``.

        Returns:
            The raw response body (JSON text) on HTTP 200, otherwise ``None``.
        """
        url = f'https://api.bilibili.com/x/player/pagelist?bvid={self.bvid}&jsonp=jsonp'
        try:
            # BUG FIX: the original called requests.get(url, self.headers),
            # which binds the headers dict to the `params` positional
            # parameter, so the headers were never actually sent.
            r = requests.get(url, headers=self.headers, timeout=10)
            if r.status_code == 200:
                return r.text
            print(r.status_code)
        except RequestException:
            print('请求失败')  # "request failed"
        return None

    def parse_page(self, html):
        """Resolve each part of the post to a direct play URL.

        Args:
            html: JSON text returned by :meth:`get_page`.

        Returns:
            ``(url_list, name_list)`` — parallel lists of play URLs and
            part titles, in the post's part order.
        """
        data = json.loads(html)
        results = data.get('data')
        url_list = []
        name_list = []
        for result in results:
            cid = result['cid']           # per-part video id
            video_name = result['part']   # per-part title
            url = f'https://api.bilibili.com/x/player/playurl?cid={cid}&otype=json&bvid={self.bvid}'
            # BUG FIX: headers must be passed by keyword (see get_page).
            res = requests.get(url, headers=self.headers, timeout=10)
            # Take the first entry of 'durl' — presumably the primary
            # stream; additional entries are ignored (TODO confirm).
            durl = json.loads(res.text).get('data')['durl'][0]
            url_list.append(durl['url'])
            name_list.append(video_name)
        return url_list, name_list

    def download_video(self, url_list, name_list, i):
        """Stream part ``i`` (1-based) of the post to ``<part name>.mp4``.

        Args:
            url_list: play URLs from :meth:`parse_page`.
            name_list: part titles from :meth:`parse_page`.
            i: 1-based part index (also used for the referer's ``p=`` value).
        """
        extra = {
            'referer': f'https://www.bilibili.com/video/{self.bvid}?p={i}',
            'Connection': 'keep-alive',
            'Origin': 'https://www.bilibili.com',
            'Accept': '*/*',
            'Accept-Encoding': 'gzip, deflate, sdch, br',
            'Accept-Language': 'zh-CN,zh;q=0.8'
        }
        # BUG FIX: the original mutated self.headers, which is shared by all
        # worker threads, so concurrent downloads raced on the per-part
        # referer. Build a per-call merged copy instead.
        headers = {**self.headers, **extra}
        url = url_list[i - 1]

        r = requests.get(url, headers=headers, stream=True)
        print(f'正在下载第{i}个视频,name:{name_list[i-1]}')
        with open(f'{name_list[i-1]}.mp4', "wb") as mp4:
            # Stream in 1 MiB chunks to keep memory flat for large videos.
            for chunk in r.iter_content(chunk_size=1024 * 1024):
                if chunk:
                    mp4.write(chunk)
        print(f'第{i}个视频下载完成,name:{name_list[i-1]}')

    def run(self):
        """Fetch and parse the page list; downloads are driven by the caller.

        Returns:
            ``(url_list, name_list)`` as produced by :meth:`parse_page`.
        """
        html = self.get_page()
        # BUG FIX: parse_page returns a 2-tuple; the original assigned the
        # whole tuple to a single name.
        url_list, name_list = self.parse_page(html)
        return url_list, name_list
-
-
-
-
-
if __name__ == '__main__':
    # BUG FIX: guard the driver so importing this module does not
    # immediately hit the network and start downloading.
    bvid = 'BV1og4y1q7M4'  # BV id of the post to download

    spider = Spider(bvid)
    html = spider.get_page()
    url_list, name_list = spider.parse_page(html)

    # Downloading is I/O-bound, so a small thread pool overlaps the
    # network waits; max_workers bounds the number of parallel downloads.
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        # Map each future back to its 1-based part index for error reporting.
        future_to_part = {
            executor.submit(spider.download_video, url_list, name_list, i): i
            for i in range(1, len(url_list) + 1)
        }
        for future in concurrent.futures.as_completed(future_to_part):
            part = future_to_part[future]
            try:
                future.result()  # re-raises any exception from the worker
            except Exception as exc:
                print('%r generated an exception: %s' % (part, exc))
            else:
                print('success')
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。