当前位置:   article > 正文

数据之路 - Python爬虫 - PySpider框架

pyspider集群多pysppider

1.PySpider基本功能

  • 提供方便易用的WebUI系统,可视化地编写和调试爬虫。

  • 提供爬取进度监控、 爬取结果查看、爬虫项目管理等功能。

  • 支持多种后端数据库,如MySQL、 MongoDB、 Redis、 SQLite、 Elasticsearch、 PostgreSQL。

  • 支持多种消息队列,如RabbitMQ、 Beanstalk、 Redis、 Kombu。

  • 提供优先级控制、失败重试、定时抓取等功能。

  • 对接了PhantomJS,可以抓取JavaScript渲染的页面。

  • 支持单机和分布式部署,支持Docker部署。

2.PySpider架构

pyspider的架构主要分为Scheduler(调度器)、Fetcher( 抓取器)、Processer(处理器)三个部分,整个爬取过程受到Monitor (监控器)的监控,抓取的结果被Result Worker (结果处理器)处理。

Scheduler发起任务调度,Fetcher负责抓取网页内容,Processer负责解析网页内容,然后将新生成的Request发给Scheduler进行调度,将生成的提取结果输出保存。

  • 每个pyspider 的项目对应一个Pythonj阅本,该脚本中定义了一个Handler 类,它有一个on_start()方法。 爬取首先调用on_start()方法生成最初的抓取任务,然后发送给Scheduler进行调度。

  • Scheduler将抓取任务分发给Fetcher进行抓取,Fetcher执行并得到响应,随后将响应发送给Processer。

  • Processer 处理响应并提取出新的 URL生成新的抓取任务,然后通过消息队列的方式通知Schduler 当前抓取任务执行情况,并将新生成的抓取任务发送给Scheduler。 如果生成了新的提取结果,则将其发送到结果队列等待ResultWorker处理。

  • Scheduler接收到新的抓取任务,然后查询数据库,判断其如果是新的抓取任务或者是需要重试的任务就继续进行调度,然后将其发送回Fetcher进行抓取。

  • 不断重复以上工作,直到所有的任务都执行完毕,抓取结束。

  • 抓取结束后,程序会回调on_finished()方法,这里可以定义后处理过程。

3.PySpider命令行

 

====================================================================================
pyspider [OPTIONS) COMMAND [ARGS]

-c, --config FILENAME   # 指定配置文件名称
--logging-config TEXT   # 日志配置文件名称,默认 pyspider/pyspider/logging.conf
--debug                 # 开启调试模式
--queue-maxsize INTEGER # 队列的最大长度
--taskdb TEXT           # taskdb的数据库连接字符串,默认:sqlite
--projectdb TEXT        # projectdb的数据库连接字符串,默认:sqlite
--resultdb TEXT         # resultdb的数据库连接字符串,默认:sqlite
--message-queue TEXT    # 消息队列连接字符串,默认:multiprocessing.Queue 
--phantomjs-proxy TEXT  # PhantomJS使用的代理,ip:port的形式
--data-path TEXT        # 数据库存放的路径
--version               # pyspider的版本
--help                  # 显示帮助信息
====================================================================================
pyspider scheduler [OPTIONS]

--xmlrpc / --no-xmlrpc
--xmlrpc -host TEXT 
--xmlrpc-port INTEGER 
--inqueue-limit INTEGER     # 任务队列的最大长度,如果满了则新的任务会被忽略
--delete-time INTEGER       # 设置为delete标记之前的删除时间
--active-tasks INTEGER      # 当前活跃任务数量配置
… loop-limit INTEGER        # 单轮最多调度的任务数量
-scheduler els TEXT         # Scheduler使用的类
--help                      # 显示帮助信息
====================================================================================
pysp1der fetcher [OPTIONS]

--xmlrpc / --no-xmlrpc 
--xmlrpc-host TEXT 
--xmlrpc-port INTEGER 
--poolsize INTEGER          # 同时请求的个数
--proxy TEXT                # 使用的代理
--user-agent TEXT           # 使用的User-Agent
--timeout TEXT              # 超时时间
--fetcher-els TEXT          # Fetcher使用的类
--help                      # 显示帮助信息
====================================================================================
pyspider processor [OPTIONS]

--processor-cls TEXT Processor  # 使用的类
--help                          # 显示帮助信息
====================================================================================
pyspider webui [OPTIONS]

--host TEXT                 # 运行地址
--port INTEGER              # 运行揣口
--cdn TEXT                  # JS和css的CDN服务器
--scheduler-rpc TEXT        # Scheduler的xmlrpc路径
--fetcher-rpc TEXT          # Fetcher的xmlrpc路径
--max-rate FLOAT            # 每个项目最大的rate
--max-burst FLOAT           # 每个项目最大的burst
--username TEXT             # Auth验证的用户名
--password TEXT             # Auth验证的密码
--need-auth                 # 是否需要验证
-- webui-instance TEXT      # 运行时使用的Flask应用
--help                      # 显示帮助信息
====================================================================================
View Code

 

4.PySpider基本使用

pyspider all,启动pyspider的所有组件,包括PhantomJS、ResultWorker、Processer、Fetcher、 Scheduler、WebUI,这些都是pyspider运行必备的组件。

 

from libs.base_handler import *

class Handler(BaseHandler):

    @every(minutes=24*60, seconds=0)
    def on_start(self):
        self.crawl('http://scrapy.org/', callback=self.index_page)
    
    # @every:告诉调度器 on_start方法每天执行一次。
    # on_start:作为爬虫入口代码,调用此函数,启动抓取。
    
    @config(age=10*24*60*60)
    def index_page(self, response):
        for each in response.doc('a[href^="http://"]').items():
            self.crawl(each.attr.href, callback=self.detail_page)
    
    # @config:告诉调度器 request请求的过期时间是10天,10天内再遇到这个请求直接忽略。此参数亦可在self.crawl(url, age=10*24*60*60)中设置。
    # index_page:获取一个Response对象,response.doc是pyquery对象的一个扩展方法。
    
    def detail_page(self, response):
        return {
                "url": response.url,
                "title": response.doc('title').text(),
                }
    
    # detail_page:返回一个结果集对象。这个结果默认会被添加到resultdb数据库(如果启动时没有指定数据库默认调用sqlite数据库)。

 

5.crawl方法

- url:爬取的url,可以是单个URL字符串,也可以是URL列表。
=================================================================================================
- callback:callback是回调函数,指定了该URL对应的响应内容用哪个方法来解析。

    def on_start(self):
        self.crawl('http://scrapy.org/', callback=self.index_page)
=================================================================================================    
- age:任务的有效时间。 如果某个任务在有效时间内且已经被执行,则它不会重复执行。

    # 设置方法1
    def on_start(self): 
        self.crawl('http://www.example.org/', callback=self.callback, age=10*24*60*6o) 
    
    # 设置方法2
    @config(age=10 * 24 * 60 * 60) 
    def callback(self): 
        pass 
=================================================================================================
- priority:爬取任务的优先级,其值默认是0, priority的数值越大,对应的请求会越优先被调度。

    def index_page(self): 
      self.crawl('http://www.example.org/page.html', callback=self.index_page) 
      self.crawl('http://www.example.org/233.html’, callback=self .detail_page, priority=l) 
=================================================================================================
- exetime:设置定时任务,其值是时间戳,默认是0,即代表立即执行。

    import  time 
    def on_start(self): 
      self.crawl(’http://www.example.org/', callback=self.callback, exetime=time.time()+30*60) 
=================================================================================================
- retries:可以定义重试次数,其值默认是3。
=================================================================================================
- itag:设置判定网页是存发生变化的节点值,在爬取时会判定次当前节点是否和上次爬取到的节点相同。 
        如果节点相同,则证明页面没有更新,就不会重复爬取。

    def index_page(self,response):
        for item in response.doc('.item’).items(): 
            self.crawl(item.find('a').attr.url, callback=self.detail_page, itag=item.find('.update-time’).text()) 
=================================================================================================
- auto_recrawl:当开启时,爬取任务在过期后会重新执行,循环时间即定义的age时间长度。

    def on_start(self): 
        self.craw1('http://www.example.org/', callback=self.callback, age=5*60*60, auto_recrawl=True) 
=================================================================================================  
- method:HTTP请求方式,它默认是GET。 如果想发起POST请求,可以将method设置为POST。
=================================================================================================
- params:用于定义GET请求参数。

    def on_start(self):
        self.crawl(’http://httpbin.org/get’, callback=self.callback, params={'a':123, 'b':'c'}) 
        self.crawl('http://httpbin.org/get?a=123&b=c’, callback=self.callback) 
=================================================================================================
- data:用于传递POST表单数据。

    def on_start(self): 
        self.crawl('http://httpbin.org/post', callback=self.callback, method='POST’, data={'a':123, 'b':'c'}) 
=================================================================================================        
- files:上传的文件,需要指定文件名。

    def on_start(self): 
        self.crawl(’http://httpbin.org/post', callback=self.callback, method=’POST’, files={field:{filename:'content'}})
=================================================================================================   
- user_agent:爬取使用的User-Agent。

- headers:爬取时使用的Headers,即Request Headers。

- cookies:爬取时使用的Cookies,为字典格式。

- connect_timeout:初始化连接时的最长等待时间,它默认是20秒。

- timeout:抓取网页时的最长等待时间,它默认是120秒。

- allow_redirects:确定是否自动处理重定向,它默认是True。

- validate_cert:确定是否验证证书,此选项对HTTPS请求有效,默认是True。
=================================================================================================
- proxy:爬取时使用的代理,它支持用户名密码的配置。
    # 设置方法1
    def on_start(self):
        self.crawl(’http://httpbin.org/get', callback=self.callback, proxy=’127.0.0.1:9743') 
        
    # 设置方法2
    class Handler(BaseHandler): 
        crawl_config = { 'proxy':  '121.0.0.1:9743' }
=================================================================================================        
- fetch_type:开启PhantomJS渲染。如果遇到JavaScript渲染的页面,指定此字段即可实现PhantomJS的对接,
              pyspider将会使用PhantomJS进行网页的抓取。

    def on_start(self):
        self.crawl('https://www.taobao.com', callback=self.index_page, fetch_type=’js')
=================================================================================================        
- js_script:页面加载完毕后执行的JavaScript脚本。
    def on_start(self): 
        self.crawl('http://www.example.org/,callback=self.callback, fetch_type=’js’, js_script='''
        function() { 
            window. scrollTo(0, document.body.scrollHeight); 
            return 123; 
                }''')
=================================================================================================                
- js_run_at:JavaScript脚本运行的位置,是在页面节点开头还是结尾,默认是结尾,即document-end。

- js_viewport_width/js_viewport_height:渲染页面时的窗口大小。

- load_ images:加载JavaScript页面时确定是否加载图片,它默认是否。
=================================================================================================
- save
    # 设置方法1
    def on_start(self): 
        self.crawl(’http://www.example.org/', callback=self.callback, save={'page': 1}) 
        
    # 设置方法2
    def callback(self,  response): 
        return response.save['page'] 
=================================================================================================
- cancel:是取消任务,如果一个任务是ACTIVE状态的,则需要将force_update设置为True。

- force_ update

6.任务区分

在pyspider判断两个任务是否是重复的是使用的是该任务对应的URL的MD5值作为任务的唯一ID,如果D相同,那么两个任务就会判定为相同,其中一个就不会爬取了。很多情况下请求的链接可能是同一个,但是POST的参数不同。

这时可以重写task_id()方法,改变这个ID的计算方式来实现不同任务的区分。

import json 
from pyspider.libs.utils import md5string 
def get_taskid(self, task): 
    return md5string(task['url']+json.dumps(task['fetch'].get('data',''))) 

7.全局配置

pyspider可以使用crawl_config来指定全局的配置,配置中的参数会和crawl()方法创建任务时的参数合井。

class Handler(BaseHandler):
    crawl_config = {'headers':{'User-Agent':'GoogleBot'}}

8.定时爬取

通过every属性来设置爬取的时间间隔

@every(minutes=24 * 60) 
def on_start(self):
    for url in urllist: 
        self.crawl(url, callback=self.index_page) 

9.项目状态

每个项目都有6个状态,分别是TODO、 STOP、 CHECKING、 DEBUG、 RUNNING、 PAUSE。

  • TODO:它是项目刚刚被创建还未实现时的状态。

  • STOP:如果想停止某项目的抓取,可以将项目的状态设置为STOP。

  • CHECKING:正在运行的项目被修改后就会变成CHEC阻NG状态,项目在中途出错需要调整的时候会遇到这种情况。

  • DEBUG/RUNNING:这两个状态对项目的运行没有影响,状态设置为任意一个,项目都可以运行,但是可以用二者来区分项目是否已经测试通过。

  • PAUSE:当爬取过程中出现连续多次错误时,项目会自动设置为PAUSE状态,并等待一定时间后继续爬取。

 

转载于:https://www.cnblogs.com/Iceredtea/p/11107890.html

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/954783
推荐阅读
相关标签
  

闽ICP备14008679号