赞
踩
SQLAlchemy是Python编程语言下的一款ORM框架,该框架建立在数据库API之上,使用关系对象映射进行数据库操作,简言之便是:将对象转换成SQL,然后使用数据API执行SQL并获取执行结果。
PS:SQLAlchemy是Python编程语言下的一款ORM框架,该框架建立在数据库API之上,使用关系对象映射进行数据库操作,简言之便是:将对象转换成SQL,然后使用数据API执行SQL并获取执行结果。
如下:
MySQL-Python
mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>
pymysql
mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]
MySQL-Connector
mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>
cx_Oracle
oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]
使用 Engine/ConnectionPooling/Dialect 进行数据库操作,Engine使用ConnectionPooling连接数据库,然后再通过Dialect执行SQL语句。
from sqlalchemy import create_engine engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/数据库", max_overflow=5) # 执行SQL # cur = engine.execute( # "INSERT INTO hosts (host, color_id) VALUES ('1.1.1.22', 3)" # ) # 新插入行自增ID # cur.lastrowid # 执行SQL # cur = engine.execute( # "INSERT INTO hosts (host, color_id) VALUES(%s, %s)",[('1.1.1.22', 3),('1.1.1.221', 3),] # ) # 执行SQL # cur = engine.execute( # "INSERT INTO hosts (host, color_id) VALUES (%(host)s, %(color_id)s)", # host='IP', color_id=3 # ) # 执行SQL cur = engine.execute('select * from hosts') # 获取第一行数据 cur.fetchone() 获取第n行数据 cur.fetchmany(3) # 获取所有数据 cur.fetchall()
使用 ORM/Schema Type/SQL Expression Language/Engine/ConnectionPooling/Dialect 所有组件对数据进行操作。根据类创建对象,对象转换成SQL,执行SQL。
from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index from sqlalchemy.orm import sessionmaker, relationship from sqlalchemy import create_engine engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/数据库", max_overflow=5) Base = declarative_base() #固定语句,创建一个父类 # 创建单表 #往下所有的类都需要继承Base这个父类 class Users(Base): __tablename__ = 'users' id = Column(Integer, primary_key=True) name = Column(String(32)) extra = Column(String(16)) #Column(数据类型,属性)固定写法 __table_args__ = ( UniqueConstraint('id', 'name', name='uix_id_name'), #创建联合唯一索引 Index('ix_id_name', 'name', 'extra'), #创建索引 ) # 外键索引 class Favor(Base): __tablename__ = 'favor' nid = Column(Integer, primary_key=True) #设置主键 caption = Column(String(50), default='red', unique=True) #设置唯一键 class Person(Base): __tablename__ = 'person' nid = Column(Integer, primary_key=True) #设置主键 name = Column(String(32), index=True, nullable=True) #设置索引,不为null favor_id = Column(Integer, ForeignKey("favor.nid")) #设置外键 # 多外键索引 class Group(Base): __tablename__ = 'group' id = Column(Integer, primary_key=True) name = Column(String(64), unique=True, nullable=False) port = Column(Integer, default=22) class Server(Base): __tablename__ = 'server' id = Column(Integer, primary_key=True, autoincrement=True) hostname = Column(String(64), unique=True, nullable=False) class ServerToGroup(Base): __tablename__ = 'servertogroup' nid = Column(Integer, primary_key=True, autoincrement=True) #设置主键,自动增长 server_id = Column(Integer, ForeignKey('server.id')) #设置外键 group_id = Column(Integer, ForeignKey('group.id')) def init_db(): Base.metadata.create_all(engine) #指明使用的连接器 #这个方法会自动寻找Base父类下的所有子类进行执行 def drop_db(): Base.metadata.drop_all(engine)
注:设置外检的另一种方式 ForeignKeyConstraint([‘other_id’], [‘othertable.other_id’])
engine = create_engine("mysql+pymysql://Atlan:密码@atlan.top:3306/ORM?charset=utf8", max_overflow=5) Session = sessionmaker(bind=engine) session = Session() obj1 = Users(name='Atlan',extra='黑金') #Users为上方创建的类,此处作用是表名 session.add(obj1) #单条数据插入 objs = [ Users(name='Altan',extra='bolu'), Users(name='wangwu',extra='lisi'), Users(name='wang', extra='lisi') ] session.add_all(objs) #add_all会让整个容器类型的数据进行写入 session.commit() session.close()
engine = create_engine("mysql+pymysql://Atlan:密码@atlan.top:3306/ORM?charset=utf8", max_overflow=5)
Session = sessionmaker(bind=engine)
session = Session()
session.query(Users.id,Users.name,Users.extra).filter(Users.id > 8).delete()
#删除行,需使用查询的语句进行删除
session.commit()
#提交修改
session.close()
#关闭会话连接
engine = create_engine("mysql+pymysql://Atlan:密码@atlan.top:3306/ORM?charset=utf8", max_overflow=5) Session = sessionmaker(bind=engine) session = Session() session.query(Users.id,Users.name,Users.extra).filter(Users.id > 7).update({'name':'baba'}) update(这里写要修改的列和值,以字典的方式传入) session.query(Users).filter(Users.id > 2).update({Users.name: Users.name + "099"}, synchronize_session=False) 此处update中的内容是让user.name(数据类型为字符串)这个字段加上’099‘(字符串相加操作) session.query(Users).filter(Users.id > 2).update({"num": Users.num + 1}, synchronize_session="evaluate") 此处update中的内容是让user.name(数据类型为int)这个字段加上1(int相加操作) session.commit() #提交修改 session.close() #关闭会话连接
engine = create_engine("mysql+pymysql://Atlan:密码@atlan.top:3306/ORM?charset=utf8", max_overflow=5) Session = sessionmaker(bind=engine) session = Session() user_type_list = session.query(Users).all() all()是将结果取回 print(type(user_type_list)) 返回列表,列表的每个元素都是对象 user_type_list = session.query(Users.name,Users.extra).filter(Users.id > 3) query(这是写要查询的列),filter(这里写过滤条件(是python的表达式)) for i in user_type_list: print(i.name,i.extra) #循环打印值 session.commit() #提交修改 session.close() #关闭会话连接
# 条件 engine = create_engine("mysql+pymysql://Atlan:密码@atlan.top:3306/ORM?charset=utf8", max_overflow=5) Session = sessionmaker(bind=engine) session = Session() ret = session.query(Users).filter_by(name='alex').all() #filter_by和filter一样,括号中是表达式非SQL条件 ret = session.query(Users).filter(Users.id > 1, Users.name == 'eric').all() ret = session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric').all() ret = session.query(Users).filter(Users.id.in_([1,3,4])).all() #in需要_(下划线)固定写法 ret = session.query(Users).filter(~Users.id.in_([1,3,4])).all() #~(波浪号)表示not ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='eric'))).all() #子查询语句 from sqlalchemy import and_, or_ ret = session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric')).all() #and_()括中的内容是and关系 ret = session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric')).all() #or_()括号中的内容是or关系 ret = session.query(Users).filter( or_( Users.id < 2, and_(Users.name == 'eric', Users.id > 3), Users.extra != "" )).all() session.commit() #提交修改 session.close() #关闭会话连接 #组合关系 # 通配符 ret = session.query(Users).filter(Users.name.like('e%')).all() ret = session.query(Users).filter(~Users.name.like('e%')).all() #~(波浪号)表示not # 限制 ret = session.query(Users)[1:2] #limit操作 # 排序 ret = session.query(Users).order_by(Users.name.desc()).all() ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all() #排序 # 分组 from sqlalchemy.sql import func ret = session.query(Users).group_by(Users.extra).all() ret = session.query( func.max(Users.id), func.sum(Users.id), func.min(Users.id)).group_by(Users.name).all() ret = session.query( func.max(Users.id), func.sum(Users.id), func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) >2).all() # 连表 ret = session.query(Users, Favor).filter(Users.id == Favor.nid).all() #完全连接 ret = session.query(Person).join(Favor).all() #完全连接(笛卡尔乘积) ret = session.query(Person).join(Favor, isouter=True).all() #左连接 ret = session.query(Person,isouter=True).join(Favor).all() #右连接 # 组合 q1 = session.query(Users.name).filter(Users.id > 2) q2 = session.query(Favor.caption).filter(Favor.nid < 2) ret = q1.union(q2).all() #上下连表 q1 = session.query(Users.name).filter(Users.id > 2) q2 = session.query(Favor.caption).filter(Favor.nid < 2) ret = q1.union_all(q2).all() session.commit() #提交修改 session.close() #关闭会话连接
class Favor(Base): #单独创建一个表 __tablename__ = 'favor' nid = Column(Integer, primary_key=True) caption = Column(String(50), default='red', unique=True) class Person(Base): __tablename__ = 'person' nid = Column(Integer, primary_key=True) name = Column(String(32), index=True, nullable=True) favor_id = Column(Integer, ForeignKey("favor.nid")) #ForeignKey("favor.nid")指明外键为favor表的nid字段 user_type = relationship("Favor",backref='xxoo') # relationship 跟外键捆绑在一起使用 # relationship('这里写和外键同一个表'(这里写的是从Person获取Favor的数据),\ # backref='这里任意写'(这里写的是从Favor获取Person的数据行)) #这里相当创建一个快捷方式 #两张表根据外键连接的时候可以直接使用user_type来获取对应的行(以外键为条件对应) #例如where Person.facor_id = Favor.nid 这样使用user_type可以帮我们自动完成
例如
user_list = session.query(Person.name,Favor.caption).join(Favor,isouter=True).all()
for row in user_list:
print(row[0],row[1],row.name,row.caption)
#正常通过外联获取值
#使用了relationship的
user_list = session.query(Person)
for row in user_list:
print(row.name,row.id,row.user_type.caption)
""" 1 白金 2 黑金 obj.xx ==> [obj,obj...] """ class UserType(Base): #单独创建一个表 __tablename__ = 'favor' id = Column(Integer, primary_key=True) title = Column(String(50), default='red', unique=True) """ 1 Atlan 1 2 wangwu 1 3 lisi 2 # 正向 ut = relationship(backref='xx') obj.ut ==> 1 白金 """ class Users(Base): __tablename__ = 'person' id = Column(Integer, primary_key=True) name = Column(String(32), index=True, nullable=True) user_type_id = Column(Integer, ForeignKey("UserType.nid")) #ForeignKey("favor.nid")指明外键为favor表的nid字段 user_type = relationship("UserType",backref='xxoo') # relationship 跟外键捆绑在一起使用 # relationship('这里写和外键同一个表'(这里写的是从Person获取Favor的数据),\ # backref='这里任意写'(这里写的是从Favor获取Person的数据行)) #这里相当创建一个快捷方式 #两张表根据外键连接的时候可以直接使用user_type来获取对应的行(以外键为条件对应) #例如where Person.facor_id = Favor.nid 这样使用user_type可以帮我们自动完成 #获取用户类型 (表如上) #使用正常方式 type_list = session.query(UserType) for row in type_list: print(row.id,row.title,session.query(Users).filter(Users.user_type_id == row.id).all()) #使用 relationship type_list = session.query(UserType) for row in type_list: print(row.id,row.title,row.xxoo)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。