赞
踩
单线程爬虫的问题
- 因为爬虫多为IO密集型的程序,而IO处理速度并不是很快,因此速度不会太快
- 如果IO卡顿,直接影响速度
解决方案
考虑使用多线程、多进程
原理
爬虫使用多线程来处理网络请求,使用线程来处理URL队列中的 url,然后将url返回的结果保存在另一个队列中,其它线程在读取这 个队列中的数据,然后写到文件中
主要组成部分
URL队列和结果队列
将将要爬去的url放在一个队列中,这里使用标准库Queue。访问 url后的结果保存在结果队列中
初始化一个URL队列
- from queue import Queue
- urls_queue = Queue()
- out_queue = Queue()
类包装
使用多个线程,不停的取URL队列中的url,并进行处理:
- import threading
- class ThreadCrawl(threading.Thread):
- def __init__(self, queue, out_queue):
- threading.Thread.__init__(self)
- self.queue = queue
- self.out_queue = out_queue
- def run(self):
- while True:
- item = self.queue.get()
如果队列为空,线程就会被阻塞,直到队列不为空。处理队列中的 一条数据后,就需要通知队列已经处理完该条数据。
函数包装
- from threading import Thread
- def func(args)
- pass
- if __name__ == '__main__':
- info_html = Queue()
- t1 = Thread(target=func,args=
- (info_html,))
线程池
- # 简单往队列中传输线程数
- import threading
- import time
- import queue
- class Threadingpool():
- def __init__(self,max_num = 10):
- self.queue = queue.Queue(max_num)
- for i in range(max_num):
- self.queue.put(threading.Thread)
- def getthreading(self):
- return self.queue.get()
- def addthreading(self):
- self.queue.put(threading.Thread)
- def func(p,i):
- time.sleep(1)
- print(i)
- p.addthreading()
- if __name__ == "__main__":
- p = Threadingpool()
- for i in range(20):
- thread = p.getthreading()
- t = thread(target = func, args = (p,i))
- t.start()
Queue模块中的常用方法
Python的Queue模块中提供了同步的、线程安全的队列类,包括 FIFO(先入先出)队列Queue,LIFO(后入先出)队列LifoQueue, 和优先级队列PriorityQueue。这些队列都实现了锁原语,能够在多 线程中直接使用。可以使用队列来实现线程间的同步。
multiprocessing是python的多进程管理包,和threading.Thread 类似
multiprocessing模块
multiprocessing模块可以让程序员在给定的机器上充分的利用CPU 在multiprocessing中,通过创建Process对象生成进程,然后调用 它的start()方法。
- from multiprocessing import Process
- def func(name):
- print('hello', name)
- if __name__ == "__main__":
- p = Process(target=func,args=('xxx',))
- p.start()
- p.join() # 等待进程执行完毕
Manager类,实现数据共享
在使用并发设计的时候最好尽可能的避免共享数据,尤其是在使用 多进程的时候。 如果你真有需要 要共享数据,可以使用由 Manager()返回的manager提供list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array类型的支持
- from multiprocessing import
- Process,Manager,Lock
- def print_num(info_queue,l,lo):
- with lo:
- for n in l:
- info_queue.put(n)
- def updata_num(info_queue,lo):
- with lo:
- while not info_queue.empty():
- print(info_queue.get())
- if __name__ == '__main__':
- manager = Manager()
- into_html = manager.Queue()
- lock = Lock()
- a = [1, 2, 3, 4, 5]
- b = [11, 12, 13, 14, 15]
- p1 = Process(target=print_num,args= (into_html,a,lock))
- p1.start()
- p2 = Process(target=print_num,args= (into_html,b,lock))
- p2.start()
- p3 = Process(target=updata_num,args= (into_html,lock))
- p3.start()
- p1.join()
- p2.join()
- p3.join()
- from multiprocessing import Process
- from multiprocessing import Manager
- import time
- from fake_useragent import UserAgent
- import requests
- from time import sleep
- def spider(url_queue):
- while not url_queue.empty():
- try:
- url = url_queue.get(timeout = 1)
- # headers = {'UserAgent':UserAgent().chrome}
- print(url)
- # resp = requests.get(url,headers = headers)
- # 处理响应结果
- # for d in resp.json().get('data'):
- # print(f'tid:{d.get("tid")} topic:{d.get("topicName")} content:{d.get("content")}')
- sleep(1)
- # if resp.status_code == 200:
- # print(f'成功获取第{i}页数据')
- except Exception as e:
- print(e)
- if __name__ == '__main__':
- url_queue = Manager().Queue()
- for i in range(1,11):
- url =
- f'https://www.hupu.com/home/v1/news?pageNo={i}&pageSize=50'
- url_queue.put(url)
- all_process = []
- for i in range(3):
- p1 = Process(target=spider,args= (url_queue,))
- p1.start()
- all_process.append(p1)
- [p.join() for p in all_process]
进程池的使用
- from multiprocessing import Pool,Manager
- def print_num(info_queue,l):
- for n in l:
- info_queue.put(n)
- def updata_num(info_queue):
- while not info_queue.empty():
- print(info_queue.get())
- if __name__ == '__main__':
- html_queue =Manager().Queue()
- a=[11,12,13,14,15]
- b=[1,2,3,4,5]
- pool = Pool(3)
- pool.apply_async(func=print_num,args=(html_queue,a))
- pool.apply_async(func=print_num,args=(html_queue,b))
- pool.apply_async(func=updata_num,args=(html_queue,))
- pool.close() #这里join一定是在close之后,且必须要加join,否则主进程不等待创建的子进程执行完毕
- pool.join() # 进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭
- from multiprocessing import Pool,Manager
- from time import sleep
- def spider(url_queue):
- while not url_queue.empty():
- try:
- url = url_queue.get(timeout = 1)
- print(url)
- sleep(1)
- except Exception as e:
- print(e)
- if __name__ == '__main__':
- url_queue = Manager().Queue()
- for i in range(1,11):
- url =f'https://www.hupu.com/home/v1/news?pageNo={i}&pageSize=50'
- url_queue.put(url)
- pool = Pool(3)
- pool.apply_async(func=spider,args=(url_queue,))
- pool.apply_async(func=spider,args=(url_queue,))
- pool.apply_async(func=spider,args=(url_queue,))
- pool.close()
- pool.join()
网络爬虫速度效率慢,多部分在于阻塞IO这块(网络/磁盘)。在阻塞 时,CPU的中内核是可以处理别的非IO操作。因此可以考虑使用协 程来提升爬虫效率,这种操作的技术就是协程。
协程一种轻量级线程,拥有自己的寄存器上下文和栈,本质是 一个进程 相对于多进程,无需线程上下文切换的开销,无需原子操作锁 定及同步的开销
简单的说就是让阻塞的子程序让出CPU给可以执行的子程序 一个进程包含多个线程
一个线程可以包含多个协程 多个线程相对独立,线程的切换受系统控制。 多个协程也相对 独立,但是其切换由程序自己控制
安装
pip install aiohttp
常用方法
代码
- import aiohttp
- import asyncio
- async def first():
- async with aiohttp.ClientSession() assession:
- async withsession.get('http://httpbin.org/get') as resp:
- rs = await resp.text()
- print(rs)
- headers = {'User-Agent':'aaaaaa123'}
- async def test_header():
- async withaiohttp.ClientSession(headers= headers) assession:
- async withsession.get('http://httpbin.org/get') as resp:
- rs = await resp.text()
- print(rs)
- async def test_params():
- async with aiohttp.ClientSession(headers= headers) as session:
- async with session.get('http://httpbin.org/get',params={'name':'xxx'}) as resp:
- rs = await resp.text()
- print(rs)
- async def test_cookie():
- async with aiohttp.ClientSession(headers= headers,cookies={'token':'xxx'}) as session:
- async with session.get('http://httpbin.org/get',params={'name':'xxx'}) as resp:
- rs = await resp.text()
- print(rs)
- async def test_proxy():
- async with aiohttp.ClientSession(headers=headers,cookies={'token':'xxx'}) as session:
- async with session.get('http://httpbin.org/get',params={'name':'xxx'},proxy = 'http://name:pwd@ip:port' ) as resp:
- rs = await resp.text()
- print(rs)
- if __name__ == '__main__':
- loop = asyncio.get_event_loop()
- loop.run_until_complete(test_cookie())
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。