赞
踩
import asyncio
import random
import socket
import tkinter
import traceback
from io import BytesIO

# ``import cv2`` binds the same ``cv2`` name the rest of the file uses and also
# works on opencv-python releases where ``from cv2 import cv2`` is unavailable.
import cv2
import requests
from fake_useragent import UserAgent
from PIL import Image
from pyppeteer import launch

# Page titles shown when the site redirects to its login / slider-captcha page.
CAPTCHA_TITLES = ('小红书登录', '滑块验证')
# Text fragments present in the HTML of a note that is hidden or deleted.
UNAVAILABLE_MARKERS = ('该内容无法展示', '该笔记已被删除')
# Upper bound on slider attempts made by one ``crack`` call.
MAX_CRACK_ATTEMPTS = 5
# Local files the captcha images are written to and matched from.
TARGET_IMAGE = 'target.jpg'
TEMPLATE_IMAGE = 'template.png'

# Overrides ``navigator.webdriver`` so the page reads it as ``false``.
HIDE_WEBDRIVER_JS = '''() =>{
    Object.defineProperties(navigator,{
        webdriver:{ get: () => false }
    })
}'''
# Returns the ``src`` of the first element carrying the given class name.
IMG_SRC_JS = '''() => {{
    var src = document.getElementsByClassName('{}')[0].getAttribute('src')
    return src
}}'''


class XhsCookie(object):
    """Harvest xiaohongshu.com cookies with a pyppeteer-driven browser.

    Each run launches a browser, visits the seed URLs, solves the slider
    captcha when the site shows one, and hands the page cookies to
    ``store_cookie``.
    """

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        self._loop = loop
        # Factor applied to the matched x offset before dragging the slider;
        # recomputed by ``get_pic`` as 400 / captcha image width.
        self.zoom = 1
        self.ip_address = self.get_host_ip()
        self.width, self.height = self.screen_size()

    async def create_browser(self):
        """Launch the browser.

        :return: the pyppeteer browser, or ``False`` when launching raised
            ``RuntimeWarning``.
        """
        try:
            params = {
                'headless': False,  # run with a visible window
                'dumpio': True,
                'autoClose': False,
                # Keep the profile on disk so a login survives browser
                # restarts (until the session expires).
                'userDataDir': './userdata',
                'args': [
                    '--no-sandbox',
                    '--window-size={},{}'.format(self.width, self.height),
                    '--disable-infobars',
                ],
            }
            return await launch(params)
        except RuntimeWarning:
            print('RuntimeWarning')
            return False

    def geturl(self):
        """Return the seed ``(article_id, url)`` pairs to visit.

        TODO: load these from the database instead of hard-coding them.
        """
        return [
            (292805, 'https://www.xiaohongshu.com/discovery/item/5e75bc16000000000100538e'),
            (292507, 'https://www.xiaohongshu.com/discovery/item/5e746d4b0000000001002052'),
            (292468, 'https://www.xiaohongshu.com/discovery/item/5e7884d900000000010082f9'),
            (292186, 'https://www.xiaohongshu.com/discovery/item/5e76c7c70000000001001b48'),
            (292508, 'https://www.xiaohongshu.com/discovery/item/5e75a4840000000001007a7a'),
            (292785, 'https://www.xiaohongshu.com/discovery/item/5e7435940000000001000ec9'),
            (291986, 'https://www.xiaohongshu.com/discovery/item/5e7852b100000000010048fe'),
            (292526, 'https://www.xiaohongshu.com/discovery/item/5e78fa4d000000000100a087'),
            (291866, 'https://www.xiaohongshu.com/discovery/item/5e77605d00000000010039ef'),
            (292905, 'https://www.xiaohongshu.com/discovery/item/5e7823840000000001000d2f'),
        ]

    def store_cookie(self, cookie_str, ip_addr, user_agent):
        """Store one harvested cookie record (persistence is still a TODO).

        :param cookie_str: cookies joined as ``name=value;name=value``
        :param ip_addr: address the cookies were obtained from
        :param user_agent: user agent the page was loaded with
        """
        # Storage is best-effort: a failure is reported, not raised.
        # noinspection PyBroadException
        try:
            record = {"cookie": cookie_str, 'ip_addr': ip_addr, 'user_agent': user_agent}
            # TODO: persist ``record``
        except Exception:
            print('save cookie error')

    @staticmethod
    def get_host_ip():
        """Return the local address used for outbound traffic.

        Connects a UDP socket to 8.8.8.8:80 and reads back the socket's own
        address. The ``with`` block closes the socket on every path, including
        when creating or connecting it fails.

        :raises OSError: if the socket cannot be created or connected.
        """
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.connect(('8.8.8.8', 80))
            return s.getsockname()[0]

    @staticmethod
    def screen_size():
        """Return ``(width, height)`` of the screen as reported by tkinter.

        Falls back to 1366x768 when tkinter cannot be used.
        """
        # noinspection PyBroadException
        try:
            tk = tkinter.Tk()
            try:
                tk.withdraw()  # the root window is only needed for the query
                return tk.winfo_screenwidth(), tk.winfo_screenheight()
            finally:
                # ``destroy`` releases the root window; ``quit`` only stops a
                # running mainloop and would leave the window alive.
                tk.destroy()
        except Exception:
            return 1366, 768

    @staticmethod
    def get_user_agent():
        """Return a Chrome user-agent string from ``fake_useragent``."""
        return UserAgent().chrome

    @staticmethod
    async def _close_browser(browser):
        """Close ``browser`` if one was launched; report errors, never raise."""
        if not browser:
            return
        # noinspection PyBroadException
        try:
            await browser.close()
        except Exception:
            print(traceback.format_exc())

    def _mark_unavailable(self, article_id):
        """Set status=-3 on an article whose note is hidden or deleted."""
        # ``int()`` guarantees only a number is interpolated into the SQL.
        sql = 'update analysis_articles_app set status=-3 where id = {}'.format(int(article_id))
        # ``analysis_xhshu`` is not created anywhere in this class; skip the
        # update instead of aborting the run when it has not been attached.
        store = getattr(self, 'analysis_xhshu', None)
        if store is None:
            print('analysis_xhshu is not configured, skipped: {}'.format(sql))
            return
        store.do(sql)

    async def get_cookies(self, page, browser, urls, user_agent):
        """Visit every seed URL and store the cookies of each valid page.

        The browser is always closed before this coroutine returns.

        :param page: pyppeteer page used for navigation
        :param browser: browser owning ``page``
        :param urls: iterable of ``(article_id, url)`` pairs
        :param user_agent: user agent recorded with each stored cookie
        """
        # noinspection PyBroadException
        try:
            for article_id, url in urls:
                try:
                    await page.goto(url)
                    await page.waitFor(3000)
                    await page.evaluate(HIDE_WEBDRIVER_JS)
                    # Scroll down by one viewport height.
                    await page.evaluate('window.scrollBy(0, window.innerHeight)')
                    title = await page.title()
                    html_page = await page.content()

                    if title in CAPTCHA_TITLES:
                        if await self.crack(page, browser):
                            # Stop this run; the caller relaunches the browser.
                            break
                    elif any(marker in html_page for marker in UNAVAILABLE_MARKERS):
                        # Either marker alone means the note is unavailable.
                        self._mark_unavailable(article_id)
                    else:
                        print('[{}][{}]'.format(self.ip_address, url))
                        cookies = await page.cookies()
                        cook_str = ';'.join(
                            item["name"] + "=" + item["value"] for item in cookies
                        )
                        if not cook_str:
                            continue
                        self.store_cookie(cook_str, self.ip_address, user_agent)
                        # ``deleteCookie`` only removes the cookies it is
                        # given, so pass everything that was just read.
                        await page.deleteCookie(*cookies)
                    await asyncio.sleep(30)
                except RuntimeError:
                    continue
        except Exception:
            print(traceback.format_exc())
        finally:
            await self._close_browser(browser)

    async def get_cookie_run(self):
        """Run the harvest forever: launch, harvest, close, repeat."""
        while True:
            # Bound before ``try`` so ``finally`` can always reference it.
            browser = None
            # noinspection PyBroadException
            try:
                browser = await self.create_browser()
                if not browser:
                    raise ValueError('浏览器启动失败')
                user_agent = self.get_user_agent()
                page = await browser.newPage()
                await page.setViewport({'width': self.width, 'height': self.height})
                await page.setUserAgent(user_agent)
                await self.get_cookies(page, browser, self.geturl(), user_agent)
            except Exception:
                print('运行失败')
                print(traceback.format_exc())
            finally:
                print('运行完毕 10 秒关闭浏览器')
                await asyncio.sleep(10)
                # Covers failures that happened before ``get_cookies`` ran;
                # closing an already closed browser is tolerated.
                await self._close_browser(browser)

    async def get_pic(self, page):
        """Download the captcha images and update ``self.zoom``.

        Saves the background as ``target.jpg`` and the slider piece as
        ``template.png``, then sets ``self.zoom`` to 400 / background width.
        """
        try:
            await asyncio.sleep(2)
            target_link = await page.evaluate(
                IMG_SRC_JS.format('shumei_captcha_loaded_img_bg'))
            template_link = await page.evaluate(
                IMG_SRC_JS.format('shumei_captcha_loaded_img_fg'))
            # A timeout keeps a stalled download from hanging the run.
            target_img = Image.open(BytesIO(requests.get(target_link, timeout=10).content))
            template_img = Image.open(BytesIO(requests.get(template_link, timeout=10).content))
            target_img.save(TARGET_IMAGE)
            template_img.save(TEMPLATE_IMAGE)
            self.zoom = 400.0 / int(target_img.size[0])
        except ValueError:
            # Keep the previous zoom, but make the failure visible.
            print(traceback.format_exc())

    @staticmethod
    def match(target, template):
        """Return the x coordinate where ``template`` best matches ``target``.

        :param target: path of the captcha background image
        :param template: path of the slider piece image
        :return: x of the best ``TM_CCOEFF_NORMED`` match, in image pixels
        :raises ValueError: if either image cannot be read
        """
        img_rgb = cv2.imread(target)
        template_gray = cv2.imread(template, 0)
        if img_rgb is None or template_gray is None:
            raise ValueError('captcha image could not be read')
        img_gray = cv2.cvtColor(img_rgb, cv2.COLOR_BGR2GRAY)
        res = cv2.matchTemplate(img_gray, template_gray, cv2.TM_CCOEFF_NORMED)
        # ``max_loc`` is the top-left corner of the best match.
        _, _, _, max_loc = cv2.minMaxLoc(res)
        print(u'目标区域起点x坐标为:{}'.format(max_loc))
        return max_loc[0]

    @staticmethod
    async def crack_slider(page, distance, zoom):
        """Drag the slider button ``distance * zoom`` pixels to the right.

        :raises ValueError: if the slider button is not on the page
        """
        el = await page.querySelector('div.shumei_captcha_slide_btn')
        box = await el.boundingBox() if el else None
        if not box:
            raise ValueError('slider button not found')
        await page.hover('div.shumei_captcha_slide_btn')
        await page.waitFor(2 * 1000)
        await page.mouse.down()
        # +20: half the width of the slider image.
        await page.mouse.move((box['x'] + distance * zoom + 20), box['y'], {'steps': 50})
        await page.waitFor(500)
        await page.mouse.up()

    async def crack(self, page, browser, cnt=0):
        """Try to solve the slider captcha shown on ``page``.

        Attempts are made in a single bounded loop, so the total never
        exceeds ``MAX_CRACK_ATTEMPTS``; an attempt that raises ``ValueError``
        is counted as well.

        :param page: page currently showing the captcha
        :param browser: unused; kept for interface compatibility
        :param cnt: number of attempts already made
        :return: ``True`` once the captcha page is gone or the attempts are
            used up (both cases return ``True``, as callers expect)
        """
        while cnt < MAX_CRACK_ATTEMPTS:
            title = await page.title()
            if title not in CAPTCHA_TITLES:
                return True
            cnt += 1
            print('滑动验证码第 {} 次'.format(cnt))
            try:
                await self.get_pic(page)
                distance = self.match(TARGET_IMAGE, TEMPLATE_IMAGE)
                await self.crack_slider(page, distance, self.zoom)
            except ValueError:
                print(traceback.format_exc())
            await asyncio.sleep(3)
        return True


async def main():
    """Create the harvester and run it on the current event loop."""
    loop = asyncio.get_event_loop()
    xhs_cookie = XhsCookie(loop)
    await xhs_cookie.get_cookie_run()


if __name__ == "__main__":
    asyncio.get_event_loop().run_until_complete(main())
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。