当前位置:   article > 正文

scrapyl使用pymysql操作数据库存储数据(存入多个表)_pymysql库scrapy

pymysql库scrapy

scrapy操作数据库

  • 需要改动的原scrapy框架代码

1、settings.py配置数据库连接信息

2、在项目内创建一个和爬虫代码同级的文件夹,例如:db

在文件内创建两个个py文件,例如:dbhelper.py(这个名字随意,是封装操作数据库代码的文件),____init____.py(这个不能改变名字,把项目下的这个文件复制过来即可)

3、pipelines.py内引用一下,封装好的dbhelper.py内的类

  • settings
#Mysql数据库的配置信息
MYSQL_HOST = '127.0.0.1'
MYSQL_DBNAME = 'test'         #数据库名字,请修改
MYSQL_USER = 'root'             #数据库账号,请修改
MYSQL_PASSWD = 'root'         #数据库密码,请修改

MYSQL_PORT = 3306               #数据库端口,在dbhelper中使用

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • dbhelper
# -*- coding: utf-8 -*-

import pymysql
from twisted.enterprise import adbapi
from scrapy.utils.project import get_project_settings  #导入seetings配置

class DBHelper():
    '''这个类也是读取settings中的配置,自行修改代码进行操作'''

    def __init__(self):
        settings = get_project_settings()  #获取settings配置,设置需要的信息

        dbparams = dict(
            host=settings['MYSQL_HOST'],  #读取settings中的配置
            db=settings['MYSQL_DBNAME'],
            user=settings['MYSQL_USER'],
            passwd=settings['MYSQL_PASSWD'],
            charset='utf8',  #编码要加上,否则可能出现中文乱码问题
            cursorclass=pymysql.cursors.DictCursor,
            use_unicode=False,
        )
        #**表示将字典扩展为关键字参数,相当于host=xxx,db=yyy....
        dbpool = adbapi.ConnectionPool('pymysql', **dbparams)
        self.dbpool = dbpool

    def connect(self):
        return self.dbpool

    #插入数据
    def insert(self, item):
        #这里定义要插入的字段
        sql = "insert into testtable(name,url) values(%s,%s)"
        #调用插入的方法
        query = self.dbpool.runInteraction(self._conditional_insert, sql, item)
        #调用异常处理方法
        query.addErrback(self._handle_error)
        return item

    #写入数据库中
    def _conditional_insert(self, canshu, sql, item):
        #取出要存入的数据,这里item就是爬虫代码爬下来存入items内的数据
        params = (item['name'], item['url'])
        canshu.execute(sql, params)

    #错误处理方法
    def _handle_error(self, failue):
        print('--------------database operation exception!!-----------------')
        print(failue)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • piplines
# -*- coding: utf-8 -*-

# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://doc.scrapy.org/en/latest/topics/item-pipeline.html
from twisted.enterprise import adbapi
import MySQLdb
import MySQLdb.cursors
from scrapy_mysql_demo.db.dbhelper import DBHelper
import codecs
import json
from logging import log
from scrapy.utils.project import get_project_settings
class WebcrawlerScrapyPipeline(object):
    # 连接数据库
    def __init__(self):
        self.db = DBHelper()

    def process_item(self, item, spider):
        # 插入数据库
        self.db.insert(item)
        return item

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

封装一个mysql数据库基本操作类

在项目内创建一个dbHelper.py文件封装增、删、改、更新操作

在这里可以方便的创建数据库,不需要打开数据库控制台,因为scrapy不像django那样有models模型可以写模型类直接通过migrate映射模型到mysql数据库创建数据库,这样封装后可以在piplines中引用调用

import MySQLdb
from scrapy.utils.project import get_project_settings#引入settings配置

class DBHelper():

    def __init__(self):
        self.settings=get_project_settings()#获取settings配置数据

        self.host=self.settings['MYSQL_HOST']
        self.port=self.settings['MYSQL_PORT']
        self.user=self.settings['MYSQL_USER']
        self.passwd=self.settings['MYSQL_PASSWD']
        self.db=self.settings['MYSQL_DBNAME']
    #连接mysql
    def connectMysql(self):
        conn=MySQLdb.connect(host=self.host,
                             port=self.port,
                             user=self.user,
                             passwd=self.passwd,
                             charset='utf8')
        return conn
    #连接数据库
    def connectDatabase(self):
        conn=MySQLdb.connect(host=self.host,
                             port=self.port,
                             user=self.user,
                             passwd=self.passwd,
                             db=self.db,
                             charset='utf8')
        return conn

    #创建数据库
    def createDatabase(self):
        conn=self.connectMysql()

        sql="create database if not exists "+self.db
        cur=conn.cursor()
        cur.execute(sql)
        cur.close()
        conn.close()

    #创建数据表
    def createTable(self,sql):
        conn=self.connectDatabase()

        cur=conn.cursor()
        cur.execute(sql)
        cur.close()
        conn.close()

    #插入数据
    def insert(self,sql,*params):
        conn=self.connectDatabase()

        cur=conn.cursor();
        cur.execute(sql,params)
        conn.commit()
        cur.close()
        conn.close()

    #更新数据
    def update(self,sql,*params):
        conn=self.connectDatabase()

        cur=conn.cursor()
        cur.execute(sql,params)
        conn.commit()
        cur.close()
        conn.close()

    #删除数据
    def delete(self,sql,*params):
        conn=self.connectDatabase()

        cur=conn.cursor()
        cur.execute(sql,params)
        conn.commit()
        cur.close()
        conn.close()


#测试数据库操作
class TestDBHelper():
    def __init__(self):
        self.dbHelper=DBHelper()

    def testCreateDatebase(self):
        self.dbHelper.createDatabase()

    def testCreateTable(self):
        sql="create table testtable(id int primary key auto_increment,name varchar(50),url varchar(200))"
        self.dbHelper.createTable(sql)

    def testInsert(self):
        sql="insert into testtable(name,url) values(%s,%s)"
        params=("test","test")
        self.dbHelper.insert(sql,*params)
    def testUpdate(self):
        sql="update testtable set name=%s,url=%s where id=%s"
        params=("update","update","1")
        self.dbHelper.update(sql,*params)

    def testDelete(self):
        sql="delete from testtable where id=%s"
        params=("1")
        self.dbHelper.delete(sql,*params)

if __name__=="__main__":
    testDBHelper=TestDBHelper()
    #testDBHelper.testCreateDatebase()  #
    #testDBHelper.testCreateTable()     #
    #testDBHelper.testInsert()          #
    #testDBHelper.testUpdate()          #
    #testDBHelper.testDelete()          #

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Gausst松鼠会/article/detail/515889
推荐阅读
相关标签
  

闽ICP备14008679号