当前位置:   article > 正文

Day:005 | Python爬虫:高效数据抓取的编程技术(爬虫效率)

Day:005 | Python爬虫:高效数据抓取的编程技术(爬虫效率)

爬虫之多线程-了解

单线程爬虫的问题

  • 因为爬虫多为IO密集型的程序,而IO处理速度并不是很快,因此速度不会太快
  • 如果IO卡顿,直接影响速度

解决方案
考虑使用多线程、多进程

原理:

爬虫使用多线程来处理网络请求,使用线程来处理URL队列中的url,然后将url返回的结果保存在另一个队列中,其它线程在读取这个队列中的数据,然后写到文件中 。

主要组成部分

URL队列和结果队列

将将要爬去的url放在一个队列中,这里使用标准库Queue。访问url后的结果保存在结果队列中

初始化一个URL队列 

  1. from queue import Queue
  2. urls_queue = Queue()
  3. out_queue = Queue()

 类包装

使用多个线程,不停的取URL队列中的url,并进行处理:

  1. import threading
  2. class ThreadCrawl(threading.Thread):
  3.    def __init__(self, queue, out_queue):
  4.        threading.Thread.__init__(self)
  5.        self.queue = queue
  6.        self.out_queue = out_queue
  7.    def run(self):
  8.        while True:
  9.            item = self.queue.get()

        如果队列为空,线程就会被阻塞,直到队列不为空。处理队列中的一条数据后,就需要通知队列已经处理完该条数据

函数包装

  1. from threading import Thread
  2. def func(args)
  3.    pass
  4. if __name__ == '__main__':
  5.    info_html = Queue()
  6.    t1 = Thread(target=func,args=
  7. (info_html,))

线程池 

  1. # 简单往队列中传输线程数
  2. import threading
  3. import time
  4. import queue
  5. class Threadingpool():
  6.    def __init__(self,max_num = 10):
  7.        self.queue = queue.Queue(max_num)
  8.        for i in range(max_num):
  9.            self.queue.put(threading.Thread)
  10.    def getthreading(self):
  11.        return self.queue.get()
  12.    def addthreading(self):
  13.        self.queue.put(threading.Thread)
  14. def func(p,i):
  15.    time.sleep(1)
  16.    print(i)
  17.    p.addthreading()
  18. if __name__ == "__main__":
  19.    p = Threadingpool()
  20.    for i in range(20):
  21.        thread = p.getthreading()
  22.        t = thread(target = func, args =
  23. (p,i))
  24.        t.start()
Queue模块中的常用方法 

Python的Queue模块中提供了同步的、线程安全的队列类,包括FIFO(先入先出)队列Queue,LIFO(后入先出)队列LifoQueue,和优先级队列PriorityQueue。这些队列都实现了锁原语,能够在多线程中直接使用。可以使用队列来实现线程间的同步

  • Queue.qsize() 返回队列的大小
  • Queue.empty() 如果队列为空,返回True,反之False
  • Queue.full() 如果队列满了,返回True,反之False
  • Queue.full 与 maxsize 大小对应
  • Queue.get([block[, timeout]])获取队列,timeout等待时间
  • Queue.get_nowait() 相当Queue.get(False)
  • Queue.put(item) 写入队列,timeout等待时间
  • Queue.put_nowait(item) 相当Queue.put(item, False)
  • Queue.task_done() 在完成一项工作之后,Queue.task_done()函数向任务已经完成的队列发送一
  • 个信号
  • Queue.join() 实际上意味着等到队列为空,再执行别的操作

爬虫之多进程-了解 

multiprocessing是python的多进程管理包,和threading.Thread类似

multiprocessing模块

multiprocessing模块可以让程序员在给定的机器上充分的利用CPU

在multiprocessing中,通过创建Process对象生成进程,然后调用它的start()方法

  1. from multiprocessing import Process
  2. def func(name):
  3.    print('hello', name)
  4. if __name__ == "__main__":
  5.    p = Process(target=func,args=('sxt',))
  6.    p.start()
  7.    p.join()  # 等待进程执行完毕
Manager类,实现数据共享

在使用并发设计的时候最好尽可能的避免共享数据,尤其是在使用多进程的时候。 如果你真有需要 要共享数据,可以使用由Manager()返回的manager提供list, dict, Namespace, Lock, RLock,
Semaphore, BoundedSemaphore, Condition, Event, Barrier,Queue, Value and Array类型的支持

  1. from multiprocessing import
  2. Process,Manager,Lock
  3. def print_num(info_queue,l,lo):
  4.    with lo:
  5.        for n in l:
  6.            info_queue.put(n)
  7. def updata_num(info_queue,lo):
  8.    with lo:
  9.        while not info_queue.empty():
  10.            print(info_queue.get())
  11. if __name__ == '__main__':
  12.        manager = Manager()
  13.        into_html = manager.Queue()
  14.        lock = Lock()
  15.        a = [1, 2, 3, 4, 5]
  16.        b = [11, 12, 13, 14, 15]
  17.        p1 = Process(target=print_num,args=
  18. (into_html,a,lock))
  19.        p1.start()
  20.        p2 = Process(target=print_num,args=
  21. (into_html,b,lock))
  22.        p2.start()
  23.        p3 = Process(target=updata_num,args=
  24. (into_html,lock))
  25.        p3.start()
  26.        p1.join()
  27.        p2.join()
  28.        p3.join()
  1. from multiprocessing import Process
  2. from multiprocessing import Manager
  3. import time
  4. from fake_useragent import UserAgent
  5. import requests
  6. from time import sleep
  7. def spider(url_queue):
  8.    while not url_queue.empty():
  9.        try:
  10.            url = url_queue.get(timeout = 1)
  11.            # headers = {'UserAgent':UserAgent().chrome}
  12.            print(url)
  13.            # resp =
  14. requests.get(url,headers = headers)
  15.            # 处理响应结果
  16.            # for d in
  17. resp.json().get('data'):
  18.            #     print(f'tid:{d.get("tid")}
  19. topic:{d.get("topicName")} content:
  20. {d.get("content")}')
  21.            sleep(1)
  22.            # if resp.status_code == 200:
  23.            #     print(f'成功获取第{i}页数据')
  24.        except Exception as e:
  25.            print(e)
  26. if __name__ == '__main__':
  27.    url_queue = Manager().Queue()
  28.    for i in range(1,11):
  29.        url =
  30. f'https://www.hupu.com/home/v1/news?pageNo=
  31. {i}&pageSize=50'
  32.        url_queue.put(url)
  33.    all_process = []
  34.    for i in range(3):
  35.        p1 = Process(target=spider,args=
  36. (url_queue,))
  37.        p1.start()
  38.        all_process.append(p1)
  39.   [p.join() for p in all_process]  
 进程池的使用
  • 进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。
  • 进程池中有两个方法:
    • apply同步执行-串行
    • apply_async异步执行-并行
  1. from multiprocessing import Pool,Manager
  2. def print_num(info_queue,l):
  3.    for n in l:
  4.        info_queue.put(n)
  5. def updata_num(info_queue):
  6.    while not info_queue.empty():
  7.        print(info_queue.get())
  8. if __name__ == '__main__':
  9.    html_queue =Manager().Queue()
  10.    a=[11,12,13,14,15]
  11.    b=[1,2,3,4,5]
  12.    pool = Pool(3)
  13. pool.apply_async(func=print_num,args=
  14. (html_queue,a))
  15.    pool.apply_async(func=print_num,args=
  16. (html_queue,b))
  17.    pool.apply_async(func=updata_num,args=
  18. (html_queue,))
  19.    pool.close() #这里join一定是在close之后,且必须要加join,否则主进程不等待创建的子进程执行完毕
  20.    pool.join() # 进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭

 

  1. from multiprocessing import Pool,Manager
  2. from time import sleep
  3. def spider(url_queue):
  4.    while not url_queue.empty():
  5.        try:
  6.            url = url_queue.get(timeout = 1)
  7.            print(url)
  8.            sleep(1)
  9.        except Exception as e:
  10.            print(e)
  11. if __name__ == '__main__':
  12.    url_queue = Manager().Queue()
  13.    for i in range(1,11):
  14.        url =
  15. f'https://www.hupu.com/home/v1/news?pageNo=
  16. {i}&pageSize=50'
  17.        url_queue.put(url)
  18.    pool = Pool(3)
  19. pool.apply_async(func=spider,args=
  20. (url_queue,))
  21.    pool.apply_async(func=spider,args=
  22. (url_queue,))
  23.    pool.apply_async(func=spider,args=
  24. (url_queue,))
  25.    pool.close()
  26.    pool.join()

 

爬虫之协程

        网络爬虫速度效率慢,多部分在于阻塞IO这块(网络/磁盘)。在阻塞时,CPU的中内核是可以处理别的非IO操作。因此可以考虑使用协程来提升爬虫效率,这种操作的技术就是协程.

协程一种轻量级线程,拥有自己的寄存器上下文和栈,本质是一个进程
相对于多进程,无需线程上下文切换的开销,无需原子操作锁定及同步的开销


简单的说就是让阻塞的子程序让出CPU给可以执行的子程序


一个进程包含多个线程,一个线程可以包含多个协程

多个线程相对独立,线程的切换受系统控制。 多个协程也相对独立,但是其切换由程序自己控制

安装 

pip install aiohttp

官网:https://docs.aiohttp.org/en/stable/ 

常用方法

属性或方法功能
aiohttp.ClientSession()获取客户端函数
session.get(url)发送get请求
seesion.post(url)发送post请求
resp.status获取响应状态码
resp.url 获取响应url地址
resp.cookies获取响应cookie内容
resp.headers获取响应头信息
resp.read()获取响应bytes类型
resp.text()获取响应文本内容
  1. import aiohttp
  2. import asyncio
  3. async def first():
  4.    async with aiohttp.ClientSession() as
  5. session:  # aiohttp.ClientSession() ==
  6. import requests 模块
  7.        async with
  8. session.get('http://httpbin.org/get') as
  9. resp:
  10.            rs = await resp.text()
  11.            print(rs)
  12. headers = {'User-Agent':'aaaaaa123'}
  13. async def test_header():
  14.  async with
  15. aiohttp.ClientSession(headers= headers) as
  16. session:  # aiohttp.ClientSession() ==
  17. import requests 模块
  18.        async with
  19. session.get('http://httpbin.org/get') as
  20. resp:
  21.            rs = await resp.text()
  22.            print(rs)
  23. async def test_params():
  24.    async with
  25. aiohttp.ClientSession(headers= headers) as
  26. session:  # aiohttp.ClientSession() ==
  27. import requests 模块
  28.        async with
  29. session.get('http://httpbin.org/get',params=
  30. {'name':'bjsxt'}) as resp:
  31.            rs = await resp.text()
  32.            print(rs)
  33. async def test_cookie():
  34.    async with
  35. aiohttp.ClientSession(headers=
  36. headers,cookies={'token':'sxt123id'}) as
  37. session:  # aiohttp.ClientSession() ==
  38. import requests 模块
  39.        async with
  40. session.get('http://httpbin.org/get',params=
  41. {'name':'bjsxt'}) as resp:
  42.            rs = await resp.text()
  43.            print(rs)
  44. async def test_proxy():
  45.    async with
  46. aiohttp.ClientSession(headers=
  47. headers,cookies={'token':'sxt123id'}) as
  48. session:  # aiohttp.ClientSession() ==
  49. import requests 模块
  50.        async with
  51. session.get('http://httpbin.org/get',params=
  52. {'name':'bjsxt'},proxy =
  53. 'http://name:pwd@ip:port' ) as resp:
  54.            rs = await resp.text()
  55.            print(rs)
  56. if __name__ == '__main__':
  57.    loop = asyncio.get_event_loop()
  58.    loop.run_until_complete(test_cookie())

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Cpp五条/article/detail/437310
推荐阅读
相关标签
  

闽ICP备14008679号