(6)在 settings.py 配置文件中开启管道(ITEM_PIPELINES),同时配置 ROBOTSTXT_OBEY(是否遵守 robots 协议)、USER_AGENT(伪装请求头)以及 LOG_LEVEL='ERROR'(只输出错误级别的日志)。
(7)启动爬虫:scrapy crawl <爬虫名>(这里填写爬虫类中 name 属性的值,而不是爬虫文件名)
(2)可以定义多个管道类分别处理数据,但每个管道的 process_item 方法都必须 return item,这样 item 才能继续传递给后续的管道使用。
# (1) Custom pipeline that persists items to a plain-text file.
class TofilePipeline(object):
    """Write one ``title:url`` line per item into the text file ``a.text``.

    The file is opened when the spider starts (``open_spider``) and closed
    when it finishes (``close_spider``).
    NOTE(review): the file name 'a.text' looks like a typo for 'a.txt' — confirm.
    """

    def __init__(self):
        # File handle; remains None until open_spider() runs.
        self.fp = None

    def open_spider(self, spider):
        print('开始写入数据...')
        # Mode 'w' truncates any a.text left over from a previous run.
        self.fp = open('a.text', 'w', encoding='utf-8')

    def process_item(self, item, spider):
        title, url = item["title"], item["url"]
        self.fp.write("{}:{}\n".format(title, url))
        # Hand the item on so later pipelines can still process it.
        return item

    def close_spider(self, spider):
        # Release the file handle once the crawl is over.
        self.fp.close()
        print('数据下载成功,爬虫结束!')
- (1)自定义的文件持久化管道 —— 完整的 pipelines.py 示例(包含 MySQL 同步/异步、JSON、MongoDB、Redis 与图片下载管道)
- # -*- coding: utf-8 -*-
- # Define your item pipelines here
- #
- # Don't forget to add your pipeline to the ITEM_PIPELINES setting
- # See: http://doc.scrapy.org/en/latest/topics/item-pipeline.html
- import codecs
- # 本模块定义标准Python 编码解码器(编码器和解码器)的基类
- import json
- from scrapy.pipelines.images import ImagesPipeline
- # scrapy 自带的图片处理的类
- from scrapy.exporters import JsonItemExporter
- # scrapy 导出json类 JsonItemExporter
- import pymysql
- import pymysql.cursors
- # 导入pymysql 数据包
- import pymongo
- # 导入 Pymongo
- from twisted.enterprise import adbapi
- """
- Twisted 是一个异步网络框架,不幸的是大部分数据库api实现只有阻塞式接口,
- twisted.enterprise.adbapi为此产生,它是DB-API 2.0 API的非阻塞接口,可以访问各种关系数据库
- """
class MysqlPipeline(object):
    """Insert each item into the MySQL table ``article`` synchronously.

    Uses a single blocking PyMySQL connection. Every ``process_item`` call
    executes one INSERT and commits it immediately, so the crawl waits on the
    database for each item.
    """

    def __init__(self):
        # PyMySQL 1.x accepts only keyword arguments for connect(). The former
        # positional call ('', 'root', 'root', 'article') mapped to
        # host, user, password, database in that order.
        # NOTE(review): MySQL's "utf8" is the 3-byte utf8mb3; use "utf8mb4" if
        # titles may contain emoji.
        self.conn = pymysql.connect(
            host='',
            user='root',
            password='root',
            database='article',
            charset="utf8",
            use_unicode=True,
        )
        # Cursor reused for every INSERT.
        self.cursor = self.conn.cursor()

    def process_item(self, item, spider):
        """Insert *item* (title, url, create_date, fav_nums) and commit.

        Returns the item so later pipelines keep receiving it.
        Raises KeyError if one of the four fields is missing, and re-raises
        database errors after rolling back the failed transaction.
        """
        insert_sql = """
            insert into article(title, url, create_date, fav_nums)
            VALUES (%s, %s, %s, %s)
        """
        params = (item["title"], item["url"], item["create_date"], item["fav_nums"])
        try:
            self.cursor.execute(insert_sql, params)
            self.conn.commit()
        except pymysql.MySQLError:
            # Do not leave a half-finished transaction on the shared connection.
            self.conn.rollback()
            raise
        return item

    def close_spider(self, spider):
        """Close the cursor and the connection when the crawl ends."""
        self.cursor.close()
        self.conn.close()
MySQL 非阻塞异步存储(基于 Twisted adbapi)
1. 导入 adbapi
2. 生成数据库连接池
3. 执行数据库插入操作
4. 打印错误信息,并据此排错
class MysqlTwistedPipline(object):
    """Store items in MySQL asynchronously through a Twisted adbapi pool.

    adbapi.ConnectionPool runs the blocking PyMySQL calls outside the reactor
    thread, so database writes do not block the crawl. Connection parameters
    come from the MYSQL_HOST / MYSQL_PORT / MYSQL_DBNAME / MYSQL_USER /
    MYSQL_PASSWORD settings.
    """

    def __init__(self, dbpool):
        # adbapi.ConnectionPool shared by all inserts.
        self.dbpool = dbpool

    @classmethod
    def from_crawler(cls, crawler):
        # Recent Scrapy versions prefer from_crawler over from_settings.
        return cls.from_settings(crawler.settings)

    @classmethod
    def from_settings(cls, settings):
        dbparms = dict(
            host=settings["MYSQL_HOST"],
            # Optional; 3306 is MySQL's standard port.
            port=int(settings.get("MYSQL_PORT", 3306)),
            database=settings["MYSQL_DBNAME"],
            user=settings["MYSQL_USER"],
            password=settings["MYSQL_PASSWORD"],
            charset='utf8',
            cursorclass=pymysql.cursors.DictCursor,
            use_unicode=True,
        )
        # The driver module must match the cursor class above. A pymysql
        # DictCursor cannot operate on a MySQLdb connection, so the pool is
        # built on "pymysql", which this file imports.
        dbpool = adbapi.ConnectionPool("pymysql", **dbparms)
        return cls(dbpool)

    def process_item(self, item, spider):
        """Schedule an asynchronous insert of *item* and pass the item on."""
        query = self.dbpool.runInteraction(self.do_insert, item)
        # Errors surface in the callback chain, not here.
        query.addErrback(self.handle_error, item, spider)
        # Return immediately so later pipelines still receive the item.
        return item

    def handle_error(self, failure, item, spider):
        """Log a failed asynchronous insert.

        The failure is consumed, so the crawl continues.
        """
        spider.logger.error("MySQL insert failed for item %r: %s", item, failure)

    def do_insert(self, cursor, item):
        """Run the INSERT inside an adbapi interaction (worker thread).

        The item must provide ``get_insert_sql()`` returning ``(sql, params)``,
        which lets each Item class build its own statement. adbapi commits
        when this function returns without raising.
        """
        insert_sql, params = item.get_insert_sql()
        cursor.execute(insert_sql, params)

    def close_spider(self, spider):
        """Shut down the connection pool when the crawl ends."""
        self.dbpool.close()
JSON 数据存储
class JsonExporterPipleline(object):
    """Export every item to ``articleexport.json`` with Scrapy's JsonItemExporter.

    The file is opened in binary mode because the exporter is given an
    explicit byte encoding (utf-8). ``ensure_ascii=False`` keeps non-ASCII
    text (e.g. Chinese) readable in the output instead of \\u escapes.
    """

    def __init__(self):
        self.file = open('articleexport.json', 'wb')
        try:
            self.exporter = JsonItemExporter(self.file, encoding="utf-8", ensure_ascii=False)
            # Begin the export; it must be paired with finish_exporting().
            self.exporter.start_exporting()
        except Exception:
            # Do not leak the file handle if exporter setup fails.
            self.file.close()
            raise

    def close_spider(self, spider):
        """Finish the export and always close the file."""
        try:
            self.exporter.finish_exporting()
        finally:
            # Close the file even if finishing the export raised.
            self.file.close()

    def process_item(self, item, spider):
        self.exporter.export_item(item)
        # Pass the item on to later pipelines.
        return item
MongoDB 数据存储
class MongoPipeline(object):
    """Store each item as a document in a MongoDB collection.

    The server URI comes from the MONGO_URI setting. The database name comes
    from MONGO_DATABASE, falling back to 'items' when it is not set.
    """

    # Name of the collection (not the database) that receives the documents.
    collection_name = 'scrapy_items'

    def __init__(self, mongo_uri, mongo_db):
        self.mongo_uri, self.mongo_db = mongo_uri, mongo_db

    @classmethod
    def from_crawler(cls, crawler):
        settings = crawler.settings
        uri = settings.get('MONGO_URI')
        database = settings.get('MONGO_DATABASE', 'items')
        return cls(mongo_uri=uri, mongo_db=database)

    def open_spider(self, spider):
        # Connect once per crawl and select the configured database.
        client = pymongo.MongoClient(self.mongo_uri)
        self.client = client
        self.db = client[self.mongo_db]

    def close_spider(self, spider):
        # Release the MongoDB connection when the crawl ends.
        self.client.close()

    def process_item(self, item, spider):
        # insert_one takes a mapping, so the Item is converted to a plain dict.
        document = dict(item)
        self.db[self.collection_name].insert_one(document)
        # Always return the item so subsequent pipelines can keep processing it.
        return item
- import redis
class RedisPipeline(object):
    """Push every item, serialized as JSON, onto the Redis list ``data``.

    The host and port default to the previously hard-coded values ('' and
    6379). When built by Scrapy via ``from_crawler``, they can be overridden
    with the REDIS_HOST / REDIS_PORT settings.
    """

    # Redis client; created in open_spider().
    conn = None

    def __init__(self, host='', port=6379):
        self.host = host
        self.port = port

    @classmethod
    def from_crawler(cls, crawler):
        settings = crawler.settings
        return cls(
            host=settings.get('REDIS_HOST', ''),
            port=int(settings.get('REDIS_PORT', 6379)),
        )

    # Establish the database connection.
    def open_spider(self, spider):
        print('建立数据库连接...')
        self.conn = redis.Redis(host=self.host, port=self.port)

    def close_spider(self, spider):
        """Release pooled connections when the crawl ends (no-op if never opened)."""
        if self.conn is not None:
            self.conn.connection_pool.disconnect()
            self.conn = None

    @staticmethod
    def _serialize(item):
        """Return *item* as a JSON string, keeping non-ASCII text unescaped."""
        import json
        return json.dumps(dict(item), ensure_ascii=False)

    def process_item(self, item, spider):
        # redis-py >= 3.0 only accepts bytes/str/int/float values; pushing a
        # dict raises DataError, so the item is stored as a JSON string.
        self.conn.lpush('data', self._serialize(item))
        return item
- #IMAGES_STORE='filepath'在settings设置文件中指定存储图片的下载路径
- import os
- import scrapy
- from scrapy.pipelines.images import ImagesPipeline
class ImageDownloadPipeline(ImagesPipeline):
    """Download the image referenced by ``item["url"]`` into the ``图片`` folder.

    The folder is created automatically below the IMAGES_STORE path
    configured in settings.py.
    """

    def get_media_requests(self, item, info):
        """Issue one download request for the item's image URL."""
        url = item["url"]
        # The item travels in meta for code that still reads it from the request.
        yield scrapy.Request(url, meta={'item': item})

    def file_path(self, request, response=None, info=None, *, item=None):
        """Return the storage path (relative to IMAGES_STORE) for *request*.

        The file name is the last "/"-separated segment of the URL. When that
        segment is empty (URL ending in "/"), a SHA1 hex digest of the URL is
        used instead, so the path never points at a bare directory.
        ``item`` is accepted because Scrapy >= 2.4 passes it as a keyword.
        No progress message is printed here: Scrapy calls file_path several
        times per request, including for files it will not re-download.
        """
        import hashlib

        img_name = request.url.split('/')[-1]
        if not img_name:
            img_name = hashlib.sha1(request.url.encode('utf-8')).hexdigest()
        # Media stores treat paths as "/"-separated keys, so avoid os.path.join
        # (which would insert "\\" on Windows).
        return f'图片/{img_name}'

    # Pass the item on to the next pipeline to be executed.
    def item_completed(self, result, item, info):
        return item
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。