当前位置:   article > 正文

python 操作MYSQL数据库 ORM框架:SQLAchemy_python mysql orm框架

python mysql orm框架

2.1下载安装模块

pip3 install SQLAlchemy
IDE下pycharm python环境路径下添加模块

2.2原理

SQLAlchemy是python编程语言下的一款ORM框架,该框架建立在数据库API之上,使用关系对象映射进行数据库操作,简言之便是:将对象转换成SQL,然后使用数据API执行SQL并获取执行结果
在这里插入图片描述
  利用模块,按照对应规则,自动生成sql语句!
    作用:提供简单的规则,自动转换成sql语句,最终还是执行sql语句,获取结果!

ORM操作流程:
  创建一个类,类对应数据库的表,类能实例一个对象,这个对象对应表里的数据行
关系对象映射关系:

代码 数据库

类 —> 表
对象 —> 行

DB first :手动创建数据库和表,通过ORM框架 根据数据库,通过类生成一个一个表

code first :手动创建类和数据库,通过ORM框架 利用类创建表

SQLAlchemy本身无法操作数据库,其必须以来pymsql等第三方插件,Dialect用于和数据API进行交流,根据配置文件的不同调用不同的数据库API,从而实现对数据库的操作。

远程连接数据库引擎类型:
    MySQL-Python
        mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>
        
    pymysql
        mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]
        
    MySQL-Connector
        mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>
        
    cx_Oracle
        oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]
        
    更多详见:http://docs.sqlalchemy.org/en/latest/dialects/index.html
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

SQLAchemy 只负责把类转换成sql语句,连接数据库还是需要插件配合着数据库模块连接的。
    连接的数据库不同,转换成的sql语句也不同。
    提前必须有连接不同数据库的模块或是软件,SQLAchemy再去配置

规则:导入模块,生成一个基类,然后再用创建类的方法去创建表,sql语句中的语法,全部转换成了方法

虽然没有使用__init__方法,但是在执行定义的时候,会copy到__init__中
    找到当前所有继承base的类,然后创建对应的表

注意:利用SQLAchemy 创建表之前,需要先手动创建一个数据库!

2.3操作

导入模块:
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index,CHAR,VARCHAR
    from sqlalchemy.orm import sessionmaker, relationship
    from sqlalchemy import create_engine
 
创建基类:
    Base = declarative_base()
 
通过pymysql与mysql数据库建立远程连接 和设置最大连接数:
    engine = create_engine("mysql+pymysql://root:@127.0.0.1:3306/day63?charset=utf8", max_overflow=5)
     
创建单表:
    class 类名(Base):
        __tablename__="表名" #创建表名
        列名=Column(数据类型,是否为空,主键,自增,索引,唯一索引)
        __table_args__(
            UniqueConstraint("列名1","列名2","联合唯一索引名"),
            index("索引名","列名1","列名2"),
        ) #创建联合唯一索引
    数据类型:Integer 整型;String 字符串类型(CHAR,VARCHAR也可以);
    是否为空:nullable=True,
     
    是否为主键:primary_key=True,
    是否自增:autoincrement=True
      
    索引:index=True
    唯一索引:unique=True
    例:
    class Users(Base):
        __tablename__ = 'users'
        id = Column(Integer, primary_key=True, autoincrement=True)
        name = Column(VARCHAR(32), nullable=True, index=True)
        email = Column(VARCHAR(16), unique=True)
        __table_args__ = (
            UniqueConstraint('id', 'name', name='uix_id_name'),
            Index('ix_n_ex','name', 'email',),
         )
 
创建有外键关系的多表:
    1、先创建一个继承Base基类 存放数据的普通表
    2、创建一个继承Base基类 与其有外键关系的表
    3、语法:外键名("表名.列名")ForeignKey("usertype.id")
     
    例:
        class UserType(Base):
    __tablename__ = "usertype"
    id = Column(Integer,primary_key=True,autoincrement=True)
    title = Column(String(32),nullable=True,index=True)
 
    class Users(Base):
        __tablename__ = "users"
        id = Column(Integer,primary_key=True,autoincrement=True)
        name = Column(String(32),nullable=True,index=True)
        email = Column(String(16),unique=True)
        u_type_id = Column(Integer,ForeignKey("usertype.id"))
        #外键名 = Column(数据类型,ForeignKey("表名.列名"))
          
生成表或是删除表(可以把操作写成一个函数!)#找到当前所有继承base的类,然后创建所有的表
    def create_table():
        Base.metadata.create_all(engine)
    #找到当前所有继承base的类,然后删除所有的表
    def del_table():
        Base.metadata.drop_all(engine)
 
操作表:
    万年不变的 数据行 的增删改查
 
    #首先,先建立链接通道
        Session = sessionmaker(bind=engine)
        session = Session()
     
    #其次,操作表   注意:操作内填如的内容,一定并必须是表达式!
     
        #增  
            对哪张表更改,就用其对应的类进行实例化,生成的对象就代表着数据行
         
            #增加单个  session.add()
                obj = UserType(title = "黑金用户")
                session.add(obj)
             
            #增加多个  session.add_all()
                objs =[
                    UserType(title = "会员用户"),
                    UserType(title = "超级用户"),
                    UserType(title = "铂金用户"),
                    UserType(title = "黑金用户"),
                ]
                session.add_all(objs)
         
        #查  session.query(类名).all()  #直接获取整个类(表)下所有的对象(数据行)
             
            #直接操作,获取的是像数据库发送执行的sql语句
            res = session.query(UserType)  #SQL语句
            print(res)
             
            #获取所有对应类(表)的对象(数据行) 列表类型
            res_list = session.query(UserType).all() 
            print(res_list)
             
            #查询操作,获取表中某列的值!是对接收到的整个列表进行循环遍历查找
             
            #查询整个表内的信息 ------->等效于数据库中:  select xxx from usertype
            res_list = session.query(UserType).all()
            for sss in res_list:
                print(sss.id,sss.title)
             
        #注意点:.filter()方法是过滤的意思,相当于sql语句中的where
             
            #条件查找表内信息 -------->等效于数据库中: select xxx usertype where 条件
            res_list = session.query(UserType).filter(UserType.id >2)
            for sss in res_list:
                print(sss.id,sss.title)
 
    #注意点:执行删除和更改操作时,都是先把数据行找到(查操作),再进行删或改操作!
         
        #删  找到对应的数据行,删除即可  .delete()
         
            #先找后删,等效于------> delete from usertype where usertype.id > 4
            session.query(UserType).filter(UserType.id > 4).delete()
         
        #改 先查后改  注意传值的格式!
            #这里有个参数 synchronize_session 没别的招,看源码解释!!!
            #对表进行批量更改!
            session.query(UserType).filter(UserType.id>0).update({"title":"黑金"})
            #动态获取原表的数据(char类型),对表进行批量更改
            session.query(UserType).filter(UserType.id>0).update({UserType.title:UserType.title+"SX"},synchronize_session=False)
            #动态获取原表的数据(int类型),对表进行批量更改
            session.query(UserType).filter(UserType.id>0).update({"title":UserType.id+1},synchronize_session="evaluate")
 
        #查找其他操作:
            # 分组,排序,连表,通配符,子查询,limit,union,where,原生SQL、
             
            # 条件
 
                #过滤,又叫条件判断
                ret = session.query(Users).filter_by(name='alex').all()
                #两个表达式同时存在,逗号分开,不写关系默认是 and
                ret = session.query(Users).filter(Users.id > 1, Users.name == 'eric').all()
                #between and
                ret = session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric').all()
                #in判断 语法:in_
                ret = session.query(Users).filter(Users.id.in_([1,3,4])).all()
                # not in 判断 语法:表达式最前加 ~
                ret = session.query(Users).filter(~Users.id.in_([1,3,4])).all()
                #子查询
                ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='eric'))).all()
                #逻辑判断:  and_ or_ 操作 
                from sqlalchemy import and_, or_
                ret = session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric')).all()
                ret = session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric')).all()
                ret = session.query(Users).filter(
                or_(
                    Users.id < 2,
                    and_(Users.name == 'eric', Users.id > 3),
                    Users.extra != ""
                )).all()
 
 
            # 通配符 .like()的方法调用
                ret = session.query(Users).filter(Users.name.like('e%')).all()
                ret = session.query(Users).filter(~Users.name.like('e%')).all()
 
            # 限制
                ret = session.query(Users)[1:2]
                 
            #分页 .limit(n) 取n个数据
                res_list = session.query(UserType).limit(2).all()
                for sss in res_list:
                    print(sss.id,sss.title)
 
            # 排序  查表.order_by(列名.desc()/列名.asc())  [.desc() 由大到小;.asc() 由小到大]
                ret = session.query(Users).order_by(Users.name.desc()).all()
                ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all()
 
            # 分组 聚合函数func.方法(列名)  和 .group_by(列名) 方法
                from sqlalchemy.sql import func
                ret = session.query(Users).group_by(Users.extra).all()
                ret = session.query(
                    func.max(Users.id),
                    func.sum(Users.id),
                    func.min(Users.id)).group_by(Users.name).all()
 
                ret = session.query(
                    func.max(Users.id),
                    func.sum(Users.id),
                    func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) >2).all()
 
            # 连表
 
                ret = session.query(Users, Favor).filter(Users.id == Favor.nid).all() #直连
                ret = session.query(Person).join(Favor).all()
                ret = session.query(Person).join(Favor, isouter=True).all()
                 
            #子查询 三种样式
 
                #查询的信息作为另一张表的条件
                # 1.select * from b where id in (select id from tb2)
 
                #最为一张新表进行二次筛选  
                # 2. select * from (select * from tb) as B
                     
                    #查询语句.subquery()  查询结果作为一个子查询(新表)好在进行下一步的查询。不加.subquery()的话会报错,不再往下查询
                    # q1 = session.query(UserType).filter(UserType.id > 0).subquery()
                    # result = session.query(q1).all()
                    # print(result)
 
                #作为一个列内的数据,在另一张表中显示 ****** .as_scalar()方法
                # 3. select id ,(select * from users where users.user_type_id=usertype.id) from usertype;
 
                    # session.query(UserType,Users)
                     
                    # result = session.query(UserType.id,session.query(Users).as_scalar())
                    # print(result) #查看对应的sql语句
                     
                    # result = session.query(UserType.id,session.query(Users).filter(Users.user_type_id==UserType.id).as_scalar())
                    # print(result) #查看对应的sql语句
 
 
            # 组合(上下连表) .union()  和 .union_all()  注意是:先把信息找到再操作!
                q1 = session.query(Users.name).filter(Users.id > 2)
                q2 = session.query(Favor.caption).filter(Favor.nid < 2)
                ret = q1.union(q2).all()
 
                q1 = session.query(Users.name).filter(Users.id > 2)
                q2 = session.query(Favor.caption).filter(Favor.nid < 2)
                ret = q1.union_all(q2).all()
                 
        - 便利的功能 relationship() 与生成表结构无关,仅用于查询方便  释放了连表操作的繁琐查找,直接通过方法定位!
            使用规范:哪个类中有外键列,就在外键列下添加。
            语法:自定义名=relationship("外键有联系的类名",backref="任意命名")
             
             
            # 问题1. 获取用户信息以及与其关联的用户类型名称(FK,Relationship=>正向操作)
                 
                #初始方法:连表操作
                user_list = session.query(Users,UserType).join(UserType,isouter=True)
                print(user_list)
                for row in user_list:
                    print(row[0].id,row[0].name,row[0].email,row[0].user_type_id,row[1].title)
                 
                user_list = session.query(Users.name,UserType.title).join(UserType,isouter=True).all()
                for row in user_list:
                    print(row[0],row[1],row.name,row.title)
 
                #(FK,Relationship=>正向操作) 先查用户信息表,通过命名的 自定义名 正向获取用户类型
                user_list = session.query(Users)
                for row in user_list:
                    print(row.name,row.id,row.user_type.title)
 
            # 问题2. 获取用户类型  (FK,Relationship=>反向操作)
                 
                #连表操作:
                type_list = session.query(UserType)
                for row in type_list:
                    print(row.id,row.title,session.query(Users).filter(Users.user_type_id == row.id).all())
                 
                #反向操作:先查类型表,再通过backref 自定义的变量 反向查找用户信息
                type_list = session.query(UserType)
                for row in type_list:
                    print(row.id,row.title,row.xxoo)
                             
            PS:正向操作与反向操作,是相对于外键来相对判断的!
                例如:A表与B表,A表中建立了与B表联系的外键,A表通过外键获取B表中的信息,叫正向操作;反之,叫反向操作!
             
    最后,操作及语法写完后,都需要提交给数据库去执行,不再使用也需要断开连接!
        session.commit()  #提交
        session.close()   #关闭连接
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
  • 194
  • 195
  • 196
  • 197
  • 198
  • 199
  • 200
  • 201
  • 202
  • 203
  • 204
  • 205
  • 206
  • 207
  • 208
  • 209
  • 210
  • 211
  • 212
  • 213
  • 214
  • 215
  • 216
  • 217
  • 218
  • 219
  • 220
  • 221
  • 222
  • 223
  • 224
  • 225
  • 226
  • 227
  • 228
  • 229
  • 230
  • 231
  • 232
  • 233
  • 234
  • 235
  • 236
  • 237
  • 238
  • 239
  • 240
  • 241
  • 242
  • 243
  • 244
  • 245
  • 246
  • 247
  • 248
  • 249
  • 250
  • 251
  • 252
  • 253
  • 254
  • 255
  • 256
  • 257
  • 258
  • 259
  • 260
  • 261
  • 262
  • 263
  • 264
  • 265
  • 266
  • 267
  • 268
  • 269
#!/usr/bin/env python
# _*_ coding:utf-8 _*_

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine

Base = declarative_base()
engine = create_engine("mysql+pymysql://root:@127.0.0.1:3306/day63?charset=utf8",max_overflow=5)

class UserType(Base):
    __tablename__ = "usertype"
    id = Column(Integer,primary_key=True,autoincrement=True)
    title = Column(String(32),nullable=True,index=True)

class Users(Base):
    __tablename__ = "users"
    id = Column(Integer,primary_key=True,autoincrement=True)
    name = Column(String(32),nullable=True,index=True)
    email = Column(String(16),unique=True)
    u_type_id = Column(Integer,ForeignKey("usertype.id"))

    u_type = relationship("UserType",backref="sss")

def create_table():
    Base.metadata.create_all(engine)

def del_table():
    Base.metadata.drop_all(engine)

#类 --> 表
#对象 --> 行

#建立链接通道
Session = sessionmaker(bind=engine)
session = Session()

#操作内填入的内容,必须是表达式

########## 增加 ###################

#增加单个
# obj = UserType(title = "黑金用户")
# session.add(obj)

#增加多个
# objs =[
#     UserType(title = "会员用户"),
#     UserType(title = "超级用户"),
#     UserType(title = "铂金用户"),
#     UserType(title = "黑金用户"),
# ]
# session.add_all(objs)

############ 查询 ################
# res = session.query(UserType)  #SQL语句
# print(res)

# res_list = session.query(UserType).all()  #获取所有对应类(表)的对象(数据行) 列表类型
# print(res_list)

#select xxx from usertype
# res_list = session.query(UserType).limit(2).all()
# for sss in res_list:
#     print(sss.id,sss.title)
#
# #select xxx usertype where ***
# res_list = session.query(UserType).filter(UserType.id >2)
# for sss in res_list:
#     print(sss.id,sss.title)

############### 删除 ###################

# delete from usertype where usertype.id > 4
# session.query(UserType).filter(UserType.id > 4).delete()

################ 更改 ########################

#这里有个参数 synchronize_session 没别的招,看源码解释!!!
# session.query(UserType).filter(UserType.id>0).update({"title":"黑金"}) #对表进行批量更改
# session.query(UserType).filter(UserType.id>0).update({UserType.title:UserType.title+"SX"},synchronize_session=False) #动态获取原先的数据,对表进行批量更改
# session.query(UserType).filter(UserType.id>0).update({"title":UserType.id+1},synchronize_session="evaluate") #对表进行批量更改

############# 查询其他操作 #################
# 分组,排序,连表,通配符,limit,union,where,原生SQL#

#条件 and or
# ret = session.query(Users).filter(Users.id > 1, Users.name == "sesc").all()
# for row in ret:
#     print(row.email)


# #正向操作
# res = session.query(Users)
# for row in res:
#     print(row.id,row.name,row.u_type.title)
#
# #反向操作
# res = session.query(UserType)
# for row in res:
#     for a in row.sss:
#         print(row.id,row.title,a.name)

session.commit()
session.close()

部分代码举例!从上边粘贴测试即可!
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/运维做开发/article/detail/927257
推荐阅读
相关标签
  

闽ICP备14008679号