赞
踩
1、SQLAlchemy是一个基于python实现的orm框架,该框架建立在DB API之上,使用关系对象映射进行数据库操作,简而言之是:将类和对象转换成SQL,然后使用数据API执行SQL并获取执行结果 2、是第三方的orm框架,可以独立于web项目使用 3、pip3 install sqlalchemy 4、组成部分 Engine,框架的引擎 Connection Pooling,数据连接池 Dialect,选择连接数据库的DB API种类 Schema/Types,架构和类型 SQL Expression Language,SQL表达式 5、SQLAlchemy不能创建数据库,可以创建表,创建字段 SQLAlchemy本身无法操作数据库,其必须以来pymsql等第三方插件,Dialect用于和数据API进行交流,根据配置文件的不同调用不同的数据库API,从而实现对数据库的操作,如: MySQL-Python mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname> pymysql mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>] MySQL-Connector mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname> cx_Oracle oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...] 更多:http://docs.sqlalchemy.org/en/latest/dialects/index.html
补充:Django中如何反向生成models
python manage.py inspectdb > app/models.py
能创建表、删除表,但是不能修改表
from sqlalchemy import create_engine import threading engine = create_engine( "mysql+pymysql://root:111@127.0.0.1:3306/bbs_db?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=2, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置),-1表示不重置 ) def task(arg): conn = engine.raw_connection() cursor = conn.cursor() cursor.execute("select * from blog_tag") result = cursor.fetchall() print(result) cursor.close() conn.close() for i in range(20): t = threading.Thread(target=task, args=(i,)) t.start()
models.py
# 通过类,创建表 import datetime from sqlalchemy.orm import sessionmaker, relationship from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index Base = declarative_base() class Users(Base): __tablename__ = 'users' # 数据库表名称 id = Column(Integer, primary_key=True) # id 主键 name = Column(String(32), index=True, nullable=False) # name列,索引,不可为空、 age = Column(Integer) # email = Column(String(32), unique=True) # #datetime.datetime.now不能加括号,加了括号,以后永远是当前时间 # ctime = Column(DateTime, default=datetime.datetime.now) # extra = Column(Text, nullable=True) __table_args__ = ( # UniqueConstraint('id', 'name', name='uix_id_name'), #联合唯一 # Index('ix_id_name', 'name', 'email'), #索引 ) # 一对多,一个爱好,可以有多个人喜欢,关联字段写在多的一方,写在Person中 class Hobby(Base): __tablename__ = 'hobby' id = Column(Integer, primary_key=True) caption = Column(String(50), default='篮球') class Person(Base): __tablename__ = 'person' nid = Column(Integer, primary_key=True) name = Column(String(32), index=True, nullable=True) # "hobby.id"中的hobby指的是tablename而不是类名,uselist=False # 外键关联 # hobby_id跟hobby表的id字段关联 hobby_id = Column(Integer, ForeignKey("hobby.id")) # 跟数据库无关,不会新增字段,只用于快速链表操作 # 类名,backref用于反向查询 hobby = relationship('Hobby', backref='pers') # 把表同步到数据库 def init_db(): """ 根据类创建数据库表 :return: """ engine = create_engine( "mysql+pymysql://root:123@127.0.0.1:3306/aaa?charset=utf8", max_overflow=0, pool_size=5, pool_timeout=30, pool_recycle=-1 ) # 把所有被Base管理的表都创建出来 Base.metadata.create_all(engine) def drop_db(): """ 根据类创建数据库表 :return: """ engine = create_engine( "mysql+pymysql://root:123@127.0.0.1:3306/aaa?charset=utf8", max_overflow=0, pool_size=5, pool_timeout=30, pool_recycle=-1 ) # 把所有被Base管理的表都删除 Base.metadata.drop_all(engine) if __name__ == '__main__': init_db() # drop_db() """ 直接右击执行文件即可创建/删除 表和字段 """
from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker from models import User engine = create_engine( "mysql+pymysql://root:111@127.0.0.1:3306/aaa", max_overflow=0, pool_size=5 ) Connection = sessionmaker(bind=engine) # 每次执行数据库操作时,都需要创建一个Connection,是从连接池中直接拿链接 conn = Connection() ######## 执行ORM操作######## # 创建出对象,通过conn.add(对象),把对象增加进去 obj1 = User(name='cc') conn.add(obj1) # 提交事务 conn.commit() # 关闭session,其实是将连接放回连接池 conn.close()
方式一:
conn = Connection()
# 先在hobby表中准备数据
hobby = Hobby(caption='橄榄球')
person = Person(name='cc', hobby_id=1) # hobby_id不支持直接传hobby对象
conn.add_all([hobby, person])
conn.commit()
conn.close()
方式二:
conn = Connection()
# 先在hobby中准备两条数据
hobby = Hobby(caption='橄榄球')
person = Person(name='cc', hobby=hobby) # 通过person表的hobby外键关系传hobby对象
conn.add_all([hobby, person])
conn.commit()
conn.close()
flask中已经集成了scoped_session,如果单独使用sqlalchemy,需要处理一下线程安全问题
示例:
from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from sqlalchemy.orm import scoped_session from models import User engine = create_engine("mysql+pymysql://root:111@127.0.0.1:3306/aaa", max_overflow=0, pool_size=5) conn = sessionmaker(bind=engine) session = scoped_session(conn) """ # 线程安全,基于本地线程实现每个线程用同一个session # 特殊的:因为scoped_session类实例化的时候会执行顶级代码: for meth in Session.public_methods: setattr(scoped_session, meth, instrument(meth)) # 从而将原来Session类中的以下方法映射到scoped_session中: public_methods = ( '__contains__', '__iter__', 'add', 'add_all', 'begin', 'begin_nested', 'close', 'commit', 'connection', 'delete', 'execute', 'expire', 'expire_all', 'expunge', 'expunge_all', 'flush', 'get_bind', 'is_modified', 'bulk_save_objects', 'bulk_insert_mappings', 'bulk_update_mappings', 'merge', 'query', 'refresh', 'rollback', 'scalar' ) """ obj1 = User(name='jason') session.add(obj1) session.commit() session.close()
from sqlalchemy.orm import sessionmaker, scoped_session
from sqlalchemy import create_engine
from sqlalchemy.sql import text
from models import User, Person, Hobby
engine = create_engine(
"mysql+pymysql://root:111@127.0.0.1:3306/aaa",
max_overflow=0,
pool_size=5
)
conn = sessionmaker(engine)
session = scoped_session(conn)
res=session.query(Users).all() # 查所有 res=session.query(Users).all()[0] # 取第一个 res=session.query(Users).first() # 取第一个 # 查询时,等式用filter_by, 不等式(比较)用filter # filter传的是表达式,filter_by传的是参数 r1 = session.query(User).filter(User.id < 3) # 没有加.all() 或者.first(),结果就是原生sql print(r1) # SELECT users.id AS users_id, users.name AS users_name, users.email AS users_email, users.ctime AS users_ctime, users.extra AS users_extra FROM users WHERE users.id < %(id_1)s # 查询Users表中id<3的记录 r2 = session.query(User).filter(User.id < 3).all() print(r2) # 结果是一个列表 for user in r2: print(user.name) # 查询Users表中id为3的第一条记录 r3 = session.query(User).filter_by(id=3).first() print(r3.name)
r4 = session.query(User).filter(User.id > 2).delete()
print(r4) # 1 (结果为影响的记录条数)
r5 = session.query(User).filter_by(id=1).delete()
session.commit()
session.close()
r6 = session.query(User).filter_by(id=1).update({"name": 'cc'})
print(res) # 结果为影响的记录条数
# 类似于django的F查询
# synchronize_session=False 表示加字符串
r7 = session.query(User).filter(User.id > 0).update({"name": User.name + '_xxx'}, synchronize_session=False)
# synchronize_session="evaluate" 表示数学运算
r8 = session.query(User).filter(User.id > 0).update({"age": User.age + 1}, synchronize_session='evaluate')
# 与上面的结果一样
r9 = session.query(User).filter(User.id > 0).update({User.age: User.age + 1}, synchronize_session='evaluate')
session.commit()
session.close()
r10 = ssion.query(User.name.label('xxx'), User.age).all()
for user in r10
print(user.xxx) # users表的 name字段 在代码中使用就必须使用 xxx,用name就会报错
# select * from user where id<6 and name=cc order by id;
r11 = session.query(User).filter(text("id<:value and name=:name")).params(value=6, name='cc').order_by(User.id).all()
print(r11)
#自定义查询sql
r12 = session.query(User).from_statement(text("SELECT * FROM users where name=:name")).params(name='cc').all()
for u in r12:
print(u.id)
# in_ (注意下划线) r13 = session.query(User).filter(User.id.in_([1,3,4])).all() # ~非 (取反) r14 = session.query(User).filter(~User.id.in_([1,3,4])).all() print(ret) # 二次筛选 r15 = session.query(User).filter(User.id.in_(session.query(User.id).filter_by(name='cc'))).all() from sqlalchemy import and_, or_ # or_包裹的都是or条件,and_包裹的都是and条件 r16 = session.query(User).filter(and_(User.id > 3, User.name == 'cc')).all() r17 = session.query(User).filter(User.id > 3, User.name == 'cc').all() # and条件的and_()可以省略 r18 = session.query(User).filter(or_(User.id < 5, User.name == 'cc')).all() r19 = session.query(User).filter( or_( User.id < 2, and_(User.name == 'eric', User.id > 3), User.age != 8 )) print(r19)
# 以c开头
r20 = session.query(User).filter(User.name.like('c%')).all()
# 不包含c
r21 = session.query(User).filter(~User.name.like('%c%')).all()
# select * from users limit 1,4; 从第一条数据往后取4条
r22 = session.query(User)[0:4]
for user in r22:
print(user.age)
# 按照name字段降序排
r23 = session.query(User).order_by(User.name.desc()).all()
for user in r23:
print(user.id)
#第一个条件重复后,再按第二个条件升序排
r24 = session.query(User).order_by(User.name.desc(), User.id.asc()).all()
from sqlalchemy.sql import func
# select max(id),sum(id),min(id) from users group by age;
r25 = session.query(
func.max(User.id),
func.sum(User.id),
func.min(User.id)).group_by(User.age).all()
print(r25)
# select max(id),sum(id),min(id) from user group by name having min(id)>2;
ret = session.query(
func.max(User.id),
func.sum(User.id),
func.min(User.id)).group_by(User.name).having(func.min(User.id) > 2).all()
# 相当于django中的orm:
# User.objects.value(User.name).filter().annotate(a=max(User.id),b=min(User.id)).filter(b__gt=2).all()
#join表,默认是inner join,没有指定on的字段,默认用外键关联 # select * from Person inner join Hobby on person.hobby_id =hobby.id; ret = session.query(Person).join(Hobby) print(ret) # isouter=True 外连,表示Person left join Hobby,没有右连接,反过来即可 ret = session.query(Person).join(Hobby, isouter=True) ret = session.query(Hobby).join(Person, isouter=True) print(ret) # 自己指定on条件(连表条件),第二个参数,支持on多个条件,用and_,同上 # select * from Person left join Hobby on person.id=hobby.id ret = session.query(Person).join(Hobby,Person.hobby_id==Hobby.id, isouter=True).all() print(ret) # select * from Person inner join Hobby on Person.hobby_id=Hobby.id where person.id>1 and hobby.caption='篮球' ret = session.query(Person).join(Hobby).filter(Person.id>1,Hobby.caption=='篮球').all()
# UNION 操作符用于合并两个或多个 SELECT 语句的结果集
# union和union all的区别?————union去重,union all 不去重
q1 = session.query(Person.name).filter(Person.nid > 2)
q2 = session.query(Hobby.caption).filter(Hobby.id < 2)
ret = q1.union(q2).all()
print(ret)
q1 = session.query(Person.name).filter(Person.nid > 2)
q2 = session.query(Hobby.caption).filter(Hobby.id < 2)
ret = q1.union_all(q2).all()
print(ret)
import datetime from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import relationship from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index Base = declarative_base() class Boy2Girl(Base): __tablename__ = 'boy2girl' id = Column(Integer, primary_key=True, autoincrement=True) girl_id = Column(Integer, ForeignKey('girl.id')) boy_id = Column(Integer, ForeignKey('boy.id')) class Girl(Base): __tablename__ = 'girl' id = Column(Integer, primary_key=True) name = Column(String(64), unique=True, nullable=False) class Boy(Base): __tablename__ = 'boy' id = Column(Integer, primary_key=True, autoincrement=True) name = Column(String(64), unique=True, nullable=False) # 与生成表结构无关,仅用于查询方便,放在哪个单表中都可以 girls = relationship('Girl', secondary='boy2girl', backref='boys') engine = create_engine( "mysql+pymysql://root:123@127.0.0.1:3306/aaa?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) Base.metadata.create_all(engine)
from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from sqlalchemy.orm import scoped_session from models import Boy, Girl, Boy2Girl from sqlalchemy.sql import text engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/aaa", max_overflow=0, pool_size=5) # 从连接池中拿一个链接 conn = sessionmaker(bind=engine) session = scoped_session(conn) boy1 = Boy(name='铁蛋') boy2 = Boy(name='大壮') girl1 = Girl(name='刘亦菲') girl2 = Girl(name='迪丽热巴') session.add_all([boy1, boy2, girl1, girl2]) # 建立关系 res = Boy2Girl(girl_id=1, boy_id=1) # res = Boy2Girl(girl=Girl对象,boy=Boy对象) session.add(res) td = session.query(Boy).filter(Boy.id == 1).first() dlrb = session.query(Girl).filter(Girl.id == 2).first() lyf = session.query(Girl).filter(Girl.id == 1).first() dz.girls=[dlrb,] td.girls.append(lyf) dz = session.query(Boy).filter(Boy.name == "大壮").first() lyf = session.query(Girl).filter(Girl.name == "刘亦菲").first() lyf.boys.append(dz) session.add(lyf) # print(lyf.boys) session.commit() session.close()
pip3 install flask-sqlalchemy
from flask import Flask from flask_sqlalchemy import SQLAlchemy app = Flask(__name__) app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///example.sqlite" db = SQLAlchemy(app) # db对象中有Column、Integer、String、session等属性,不用专门导入 # db.Model就是Base基表 class User(db.Model): id = db.Column(db.Integer, primary_key=True) username = db.Column(db.String, unique=True, nullable=False) email = db.Column(db.String, unique=True, nullable=False) db.session.add(User(username="cc", email="123@qq.com")) db.session.commit() users = User.query.all()
# flask和SQLAlchemy的管理者,通过db把他们做连接
db = SQLAlchemy(app)
- 包含配置
- 包含ORM基类
- 包含create_all()
- engine
- 创建连接
# pip3 install flask-migrate
# 安装了flask-migrate模块以后,就可以在flask项目中执行命令来操作数据库的初始化和迁移记录:
python3 manage.py db init # 初始化:只执行一次
python3 manage.py db migrate # 等同于 django中的makemigartions
python3 manage.py db upgrade # 等同于 django中的migrate
补充:
django中的迁移记录可以指定app名,就会只迁移该app的记录
- python3 manage.py makemigrations app01
- python3 manage.py migrate app01
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。