赞
踩
出于工作保密性质,代码中所涉及的 URL 皆以 *** 代替。
import logging
import os
import random
import threading
import time
from datetime import datetime
from queue import Queue

import requests
from lxml import etree
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

# Connect to Elasticsearch (hosts/URLs were redacted in the original post).
es = Elasticsearch(hosts='192.168.126.90', port=9200)


class Freedom(object):
    """Login-based crawler.

    Logs into a site behind a login form, walks its paginated product
    listing through rotating HTTP proxies, fetches every detail page and
    bulk-indexes the raw HTML into Elasticsearch.

    Work is pipelined over two queues: ``urlQueue`` (detail URLs to
    fetch) feeds the fetcher threads, ``resQueue`` (fetched pages) feeds
    the parser/indexer threads.
    """

    def __init__(self):
        self.log = self.get_log()
        self.headers, self.proxies_list, self.data = self.get_headers()
        self.urlQueue = Queue()  # detail-page URLs waiting to be fetched
        self.resQueue = Queue()  # fetched responses waiting to be indexed
        # Shared requests.Session, created in get_url().
        # FIX: the original kept this in a module-level ``global session``.
        self.session = None

    def get_log(self):
        """Return a logger that writes to the console and to a dated file."""
        logger = logging.getLogger(__name__)
        logger.setLevel(level=logging.INFO)
        # time - logger name - level - message
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        sh = logging.StreamHandler()  # console handler
        sh.setFormatter(formatter)
        today = datetime.now()
        # FIX: logging.FileHandler raises FileNotFoundError if ./log is missing.
        os.makedirs('./log', exist_ok=True)
        log_file_path = "./log/form-{}-{}-{}.log".format(today.year, today.month, today.day)
        handler = logging.FileHandler(log_file_path, encoding='utf-8')  # file handler
        handler.setFormatter(formatter)
        logger.addHandler(handler)
        logger.addHandler(sh)
        return logger

    def get_headers(self):
        """Return (headers, proxy list, login form data) used by every request."""
        proxies_list = [
            {"http": "192.168.126.110:9008"},
            {"http": "192.168.126.107:9398"},
            {"http": "192.168.126.106:9398"},
            {"http": "192.168.126.105:9398"},
            {"http": "192.168.126.108:9398"},
        ]
        data = {
            'name': 'qwertyuiopl',
            'passwd': 'Qwertyuiopl123',
        }
        headers = {
            # FIX: the original read  'Host' '...onion' 'Content-Type': ...
            # (missing ':' and ','), so the three strings silently
            # concatenated into one bogus header key and neither Host nor
            # Content-Type was ever sent.
            'Host': '**********************.onion',
            'Content-Type': 'application/x-www-form-urlencoded',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Encoding': 'gzip, deflate',
            'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2',
            'Connection': 'keep-alive',
            'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; rv:60.0) Gecko/20100101 Firefox/60.0',
            'Upgrade-Insecure-Requests': '1',
        }
        return headers, proxies_list, data

    def main(self):
        """Seed the URL queue, then run 5 fetcher and 3 parser threads."""
        self.get_url()  # fill urlQueue with every detail-page URL
        thread_list = []
        for _ in range(5):
            thread_list.append(threading.Thread(target=self.getInfo))   # fetchers
        for _ in range(3):
            thread_list.append(threading.Thread(target=self.getParse))  # parsers
        for th in thread_list:
            # FIX: Thread.setDaemon() is deprecated since Python 3.10.
            th.daemon = True
            th.start()
        self.urlQueue.join()  # wait until every URL has been fetched
        self.resQueue.join()  # wait until every page has been indexed

    def get_url(self):
        """Log in, then walk the listing pages and enqueue every detail URL."""
        url_login = 'http://*************************************************'
        proxies = random.choice(self.proxies_list)  # pick a proxy
        self.session = requests.session()
        # Log in; the session keeps the auth cookies for all later requests.
        r = self.session.post(url_login, headers=self.headers,
                              proxies=proxies, data=self.data)
        first_page = etree.HTML(r.text)
        url_good = first_page.xpath('//div[@class="col-md-2"]/a/@href')[0]  # product catalogue
        res = self.session.get(url_good, headers=self.headers, proxies=proxies)
        second_page = etree.HTML(res.text)
        urls = second_page.xpath('//div[@class="post-item p-1"]/h4/a/@href')  # detail links
        for url in urls:
            print(url)
            self.urlQueue.put(url)
        while True:
            time.sleep(0.01)
            try:
                # IndexError here means there is no "next page" link: done.
                next_page = second_page.xpath(
                    '//div[@class="d-flex mt-5 justify-content-center"]'
                    '/ul/li[last()]/a/@href')[0]
                response = self.session.get(next_page, headers=self.headers,
                                            proxies=proxies)
                second_page = etree.HTML(response.text)
                urls = second_page.xpath('//div[@class="post-item p-1"]/h4/a/@href')
                for url in urls:
                    print('url:', url)
                    self.urlQueue.put(url)
            except (IndexError, requests.RequestException):
                # FIX: was a bare ``except:``; keep the break-on-failure
                # behaviour but only for the expected failure modes.
                break

    def getInfo(self):
        """Fetcher thread: pull URLs from urlQueue, push pages to resQueue."""
        while True:
            time.sleep(0.01)
            try:
                proxies = random.choice(self.proxies_list)
                url = self.urlQueue.get()  # blocks until a URL is available
                response = self.session.get(url, headers=self.headers,
                                            proxies=proxies)
                self.resQueue.put({'body': response.text, 'url': url})
                self.urlQueue.task_done()  # mark this URL as processed
            except Exception:
                # FIX: was a bare ``except:`` that hid every error silently.
                self.log.exception('fetch failed')
                break

    def getParse(self):
        """Parser thread: bulk-index fetched pages into Elasticsearch."""
        while True:
            try:
                item = self.resQueue.get()  # blocks until a page is available
                action = {
                    "_index": 'deeps',
                    "_type": 'test',
                    # "_id" is left out so Elasticsearch generates one.
                    "_source": {
                        "url": item['url'],
                        "html": item['body'],
                        "domain_name": '****************.onion/',
                        "language": 'en',
                        "crawl_time": datetime.utcnow(),
                    },
                }
                success, _ = bulk(es, [action], index='deeps', raise_on_error=True)
                self.resQueue.task_done()  # mark this page as processed
            except Exception:
                # FIX: was a bare ``except:`` that hid every error silently.
                self.log.exception('index failed')
                break


if __name__ == '__main__':
    creat = Freedom()
    creat.main()
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。