赞
踩
在Flask中,如果想要操作数据库,我们可以使用ORM来操作数据库,使用ORM操作数据库将变得非常简单。 以 mysql + SQLAlchemy 组合进行学习 SQLAlchemy:SQLAlchemy是一个数据库的ORM框架 安装命令为:pip3 install SQLAlchemy。 通过SQLAlchemy连接数据库 首先来看一段代码: from sqlalchemy import create_engine # 数据库的配置变量 HOSTNAME = '127.0.0.1' PORT = '3306' DATABASE = 'xt_flask' USERNAME = 'root' PASSWORD = 'root' DB_URI = 'mysql+mysqldb://{}:{}@{}:{}/{}'.format(USERNAME,PASSWORD,HOSTNAME,PORT,DATABASE) # 创建数据库引擎 engine = create_engine(DB_URI) #创建连接 with engine.connect() as con: rs = con.execute('SELECT 1') print rs.fetchone() 首先从sqlalchemy中导入create_engine,用这个函数来创建引擎,然后用engine.connect()来连接数据库。 其中一个比较重要的一点是,通过create_engine函数的时候,需要传递一个满足某种格式的字符串, 对这个字符串的格式来进行解释: dialect+driver://username:password@host:port/database?charset=utf8 dialect是数据库的实现,比如MySQL、PostgreSQL、SQLite,并且转换成小写。 driver是Python对应的驱动,如果不指定,会选择默认的驱动,比如MySQL的默认驱动是MySQLdb。 username是连接数据库的用户名,password是连接数据库的密码,host是连接数据库的域名, port是数据库监听的端口号,database是连接哪个数据库的名字。如果以上输出了1, 说明SQLAlchemy能成功连接到数据库。 用SQLAlchemy执行原生SQL(扩展): 我们将上一个例子中的数据库配置选项单独放在一个constants.py的文件中,看以下例子: from sqlalchemy import create_engine from constants import DB_URI #连接数据库 engine = create_engine(DB_URI,echo=True) # 使用with语句连接数据库,如果发生异常会被捕获 with engine.connect() as con: # 先删除users表 con.execute('drop table if exists authors') # 创建一个users表,有自增长的id和name con.execute('create table authors(id int primary key auto_increment,'name varchar(25))') # 插入两条数据到表中 con.execute('insert into persons(name) values("abc")') con.execute('insert into persons(name) values("xiaotuo")') # 执行查询操作 results = con.execute('select * from persons') # 从查找的结果中遍历 for result in results: print(result) 必须掌握: from sqlalchemy import create_engine #准备连接数据库基本信息 HOSTNAME = '127.0.0.1' PORT = '3306' DATABASE = 'first_sqlalchemy' USERNAME = 'root' PASSWORD = 'root' #dialect+driver://username:password@host:port/database?charset=utf8 #按照上述的格式来 组织数据库信息 DB_URI ="mysql+pymysql://{username}:{password}@{host}:{port}/{db}?charset=utf8".format(username=USERNAME,password=PASSWORD,host=HOSTNAME,port=PORT,db=DATABASE) #创建数据库引擎 engine = create_engine(DB_URI) #创建连接 conn = engine.connect() # 判断是否连接成功 result =conn.execute('select 1') print(result) print(result.fetchone())
1. ORM:Object Relationship Mapping 2. 大白话:对象模型与数据库表的映射 python代码 和SQL代码角度理解: class Person(object): name = 'xx' age = 18 country ='xx' # Person类 -> 数据库中的一张表 # Person类中的属性 -> 数据库中一张表字段 # Person类的一个对象 -> 数据库中表的一条数据 # p = Person('xx',xx) # p.save() #create table person(name varchar(200),age int,country varchar(100)) ORM,全称Object Relational Mapping,中文名叫做对象关系映射,通过ORM我们可以通过类的方式去操作数据库而不用再写原生的SQL语句,通过把表映射成类,把行作为实例,把字段作为属性,ORM在执行对象操作的时候最终还是会把对象的操作转换为数据库的原生语句,但使用ORM有许多优点: 1.易用性:使用ORM做数据库开发可以有效减少重复SQL语句的概率,写出来的模型也更加直观、清晰 2.性能损耗小:ORM转换成底层数据库操作指令确实会有一些开销。但是从实际情况来看,这种性能损耗很少(不足5%),只要不是针对性能有严苛的要求,综合考虑开发效率、代码阅读性,带来的好处远大于性能损耗,而且项目越大作用越明显。 3.设计灵活:可以轻松的写出复杂的查询。 4.可移植性:SQLAlchemy封装了底层的数据库实现,支持多个关系数据库引擎,包括流行的Mysql、PostgreSQL和SQLite,可以非常轻松的切换数据库。
1. 用`declarative_base`根据`engine`创建一个ORM基类。 from sqlalchemy.ext.declarative import declarative_base engine = create_engine(DB_URI) Base = declarative_base(engine) 2. 用这个`Base`类作为基类来写自己的ORM类。要定义`__tablename__`类属性,来指定这个模型映射到数据库中的表名。 class Person(Base): __tablename__ ='person' 3. 创建属性来映射到表中的字段,所有需要映射到表中的属性都应该为Column类型: class Person(Base): __tablename__ ='person' #2.在这个ORM模型中创建一些属性,来跟表中的字段进行 一一 映射。这些属性必须是sqlalchemy给我们提供好的数据类型 id = Column(Integer,primary_key=True,autoincrement=True) name = Column(String(50)) age = Column(Integer) country = Column(String(50)) 4. 使用`Base.metadata.create_all()`来将模型映射到数据库中。 Base.metadata.create_all() 5. 一旦使用`Base.metadata.create_all()`将模型映射到数据库中后,即使改变了模型的字段,也不会重新映射了。
用session做数据的增删改查操作: 1. 构建session对象:所有和数据库的ORM操作都必须通过一个叫做`session`的会话对象来实现,通过以下代码来获取会话对象: from sqlalchemy.orm import sessionmaker engine = create_engine(DB_URI) Base = declarative_base(engine) session = sessionmaker(engine)() 2. 添加对象: * 创建对象,也即创建一条数据: p1 = Person(name='momo1',age=19,country='china') * 将这个对象添加到`session`会话对象中: session.add(p1) * 将session中的对象做commit操作(提交): session.commit() * 一次性添加多条数据: p1 = Person(name='momo1',age=19,country='china') p2 = Person(name='momo2',age=20,country='china') session.add_all([p1,p2]) session.commit() 3. 查找对象: # 查找某个模型对应的那个表中所有的数据: all_person = session.query(Person).all() # 使用filter_by来做条件查询 all_person = session.query(Person).filter_by(name='momo1').all() # 使用filter来做条件查询 all_person = session.query(Person).filter(Person.name=='momo1').all() # 使用get方法查找数据,get方法是根据id来查找的,只会返回一条数据或者None person = session.query(Person).get(primary_key) person = session.query(Person).get(3) # 注意括号里面直接写数字 直接写数字 # 使用first方法获取结果集中的第一条数据 person = session.query(Person).first() 4. 修改对象:首先从数据库中查找对象,然后将这条数据修改为你想要的数据,最后做commit操作就可以修改数据了。 person = session.query(Person).first() person.name = 'lulu' session.commit() 5. 删除对象:将需要删除的数据从数据库中查找出来,然后使用`session.delete`方法将这条数据从session中删除,最后做commit操作就可以了。 person = session.query(Person).first() session.delete(person) session.commit()
1. Integer:整形,映射到数据库中是int类型。 2. Float:浮点类型,映射到数据库中是float类型。他占据的32位。 3. Double:双精度浮点类型,映射到数据库中是double类型,占据64位 (SQLALCHEMY中没有)。 4. String:可变字符类型,映射到数据库中是varchar类型. 5. Boolean:布尔类型,映射到数据库中的是tinyint类型。 6. DECIMAL:定点类型。是专门为了解决浮点类型精度丢失的问题的。在存储钱相关的字段的时候建议大家都使用这个数据类型。并且这个类型使用的时候需要传递两个参数,第一个参数是用来标记这个字段总能能存储多少个数字,第二个参数表示小数点后有多少位。 7. Enum:枚举类型。指定某个字段只能是枚举中指定的几个值,不能为其他值。在ORM模型中,使用Enum来作为枚举,示例代码如下: class News(Base): __tablename__ = 'news' tag = Column(Enum("python",'flask','django')) 在Python3中,已经内置了enum这个枚举的模块,我们也可以使用这个模块去定义相关的字段。示例代码如下: class TagEnum(enum.Enum): python = "python" flask = "flask" django = "django" class News(Base): __tablename__ = 'news' id = Column(Integer,primary_key=True,autoincrement=True) tag = Column(Enum(TagEnum)) news = News(tag=TagEnum.flask) 8. Date:存储时间,只能存储年月日。映射到数据库中是date类型。在Python代码中,可以使用`datetime.date`来指定。 9. DateTime:存储时间,可以存储年月日时分秒毫秒等。映射到数据库中也是datetime类型。在Python代码中,可以使用`datetime.datetime`来指定。 10. Time:存储时间,可以存储时分秒。映射到数据库中也是time类型。在Python代码中,可以使用`datetime.time`来至此那个。示例代码如下: class News(Base): __tablename__ = 'news' create_time = Column(Time) news = News(create_time=time(hour=11,minute=11,second=11)) 11. Text:存储长字符串。一般可以存储6W多个字符。如果超出了这个范围,可以使用LONGTEXT类型。映射到数据库中就是text类型。 12. LONGTEXT:长文本类型,映射到数据库中是longtext类型。 演示整体代码如下: from sqlalchemy import create_engine,Column,Integer,String,Float,Enum,Boolean,DECIMAL,Text,Date,DateTime,Time from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.dialects.mysql import LONGTEXT from sqlalchemy.orm import sessionmaker import enum from datetime import date from datetime import datetime from datetime import time #准备数据库的一堆信息 ip port user pwd 数据库的名称 按要求组织格式 HOSTNAME = '127.0.0.1' PORT = '3306' DATABASE = 'first_sqlalchemy' USERNAME = 'root' PASSWORD = 'root' #dialect+driver://username:password@host:port/database?charset=utf8 #按照上述的格式来 组织数据库信息 DB_URI ="mysql+pymysql://{username}:{password}@{host}:{port}/{db}?charset=utf8".\ format(username=USERNAME,password=PASSWORD,host=HOSTNAME,port=PORT,db=DATABASE) #创建数据库引擎 engine = create_engine(DB_URI) #创建会话对象 session = sessionmaker(engine)() #定义一个枚举类 class TagEnum(enum.Enum): python="PYHTON2" flask="FLASK2" django ="DJANGO" #创建一个ORM模型 说明基于sqlalchemy 映射到mysql数据库的常用字段类型有哪些? Base = declarative_base(engine) class News(Base): __tablename__='news' id = Column(Integer,primary_key=True,autoincrement=True) price1 = Column(Float) #存储数据时存在精度丢失问题 price2 = Column(DECIMAL(10,4)) title = Column(String(50)) is_delete =Column(Boolean) tag1 =Column(Enum('PYTHON','FLASK','DJANGO')) #枚举常规写法 tag2 =Column(Enum(TagEnum)) #枚举另一种写法 create_time1=Column(Date) create_time2=Column(DateTime) create_time3=Column(Time) content1 =Column(Text) content2 =Column(LONGTEXT) # Base.metadata.drop_all() # Base.metadata.create_all() #新增数据到表news中 # a1 = News(price1=1000.0078,price2=1000.0078,title='测试数据',is_delete=True,tag1="PYTHON",tag2=TagEnum.flask, # create_time1=date(2018,12,12),create_time2=datetime(2019,2,20,12,12,30),create_time3=time(hour=11,minute=12,second=13), # content1="hello",content2 ="hello hi nihao") a1 = News(price1=1000.0078,price2=1000.0078,title='测试数据',is_delete=False,tag1="PYTHON",tag2=TagEnum.python, create_time1=date(2018,12,12),create_time2=datetime(2019,2,20,12,12,30),create_time3=time(hour=11,minute=12,second=13), content1="hello",content2 ="hello hi nihao") session.add(a1) session.commit()
1. primary_key:True设置某个字段为主键。 2. autoincrement:True设置这个字段为自动增长的。 3. default:设置某个字段的默认值。在发表时间这些字段上面经常用。 4. nullable:指定某个字段是否为空。默认值是True,就是可以为空。 5. unique:指定某个字段的值是否唯一。默认是False。 6. onupdate:在数据更新的时候会调用这个参数指定的值或者函数。在第一次插入这条数据的时候,不会用onupdate的值,只会使用default的值。常用于是`update_time`字段(每次更新数据的时候都要更新该字段值)。 7. name:指定ORM模型中某个属性映射到表中的字段名。如果不指定,那么会使用这个属性的名字来作为字段名。如果指定了,就会使用指定的这个值作为表字段名。这个参数也可以当作位置参数,在第1个参数来指定。 title = Column(String(50),name='title',nullable=False) title = Column('my_title',String(50),nullable=False) 代码演示如下: from sqlalchemy import create_engine,Column,Integer,Float,Boolean,DECIMAL,Enum,Date,DateTime,Time,String,Text from sqlalchemy.dialects.mysql import LONGTEXT from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker from datetime import date from datetime import datetime from datetime import time #在Python3中才有enum这个模块,在python2中没有 import enum HOSTNAME = '127.0.0.1' PORT = '3306' DATABASE = 'first_sqlalchemy' USERNAME = 'root' PASSWORD = 'root' DB_URI ="mysql+pymysql://{username}:{password}@{host}:{port}/{db}?charset=utf8".format(username=USERNAME,password=PASSWORD,host=HOSTNAME,port=PORT,db=DATABASE) engine = create_engine(DB_URI) Base = declarative_base(engine) session = sessionmaker(engine)() class News(Base): __tablename__ = 'news' id = Column(Integer,primary_key=True,autoincrement=True) create_time = Column(DateTime,default=datetime.now) read_count = Column(Integer,default=11) title = Column(String(50),name='my_title',nullable=False) telephone = Column(String(11),unique=True) update_time = Column(DateTime,onupdate=datetime.now,default=datetime.now) Base.metadata.drop_all() Base.metadata.create_all() news= News() session.add(news) session.commit() #下面是测试onupdate参数的 # news= session.query(News).first() # news.title = '123' # session.commit()
1. 模型名。指定查找这个模型中所有的属性(对应查询表为全表查询)。 2. 模型中的属性。可以指定只查找某个模型的其中几个属性。 3. 聚合函数。 * func.count:统计行的数量。 * func.avg:求平均值。 * func.max:求最大值。 * func.min:求最小值。 * func.sum:求和。 `func`上,其实没有任何聚合函数。但是因为他底层做了一些魔术,只要mysql中有的聚合函数,都可以通过func调用。 代码演示如下: from sqlalchemy import create_engine,Column,Integer,String,Float,Enum,Boolean,DECIMAL,Text,\ Date,DateTime,Time,func from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.dialects.mysql import LONGTEXT from sqlalchemy.orm import sessionmaker import random import enum from datetime import date from datetime import datetime from datetime import time #准备数据库的一堆信息 ip port user pwd 数据库的名称 按要求组织格式 HOSTNAME = '127.0.0.1' PORT = '3306' DATABASE = 'first_sqlalchemy' USERNAME = 'root' PASSWORD = 'root' #dialect+driver://username:password@host:port/database?charset=utf8 #按照上述的格式来 组织数据库信息 DB_URI ="mysql+pymysql://{username}:{password}@{host}:{port}/{db}?charset=utf8".\ format(username=USERNAME,password=PASSWORD,host=HOSTNAME,port=PORT,db=DATABASE) #创建数据库引擎 engine = create_engine(DB_URI) #创建会话对象 session = sessionmaker(engine)() #创建一个ORM模型 Base = declarative_base(engine) class News(Base): __tablename__='news' id = Column(Integer,primary_key=True,autoincrement=True) title =Column(String(50),nullable=False) price = Column(Integer) def __repr__(self): return "<title:%s,price:%s>"%(self.title,self.price) # Base.metadata.drop_all() # Base.metadata.create_all() #新增测试数据 # for x in range(1,6): # a = News(title="标题%s"%x,price =random.randint(1,100)) # session.add(a) # session.commit() #query()函数中能写什么参数 3种 #1.模型名 # results = session.query(News).all() # print(results) #2.模型名中的属性。 返回的列表中的元素是 元组类型数据 # results = session.query(News.title,News.price).all() # print(results) #3.mysql聚合函数 # r = session.query(func.count(News.id)).first() # print(r) # r = session.query(func.max(News.price)).first() # print(r) # r = session.query(func.min(News.price)).first() # print(r) # r = session.query(func.avg(News.price)).first() # print(r) r = session.query(func.sum(News.price)).first() print(r)
过滤是数据提取的一个很重要的功能,以下对一些常用的过滤条件进行解释,并且这些过滤条件都是只能通过filter方法实现的: 1. equals : == news= session.query(News).filter(News.title == "title1").first() 2. not equals : != query(User).filter(User.name != 'ed') 3. like & ilike [不区分大小写]: query(User).filter(User.name.like('%ed%')) 4. in: query(User).filter(User.name.in_(['ed','wendy','jack'])) 5. not in: query(User).filter(~User.name.in_(['ed','wendy','jack'])) 6. is null: query(User).filter(User.name==None) # 或者是 query(User).filter(User.name.is_(None)) 7. is not null: query(User).filter(User.name != None) # 或者是 query(User).filter(User.name.isnot(None)) 8. and: query(User).filter(and_(User.name=='ed',User.fullname=='Ed Jones')) # 或者是传递多个参数 query(User).filter(User.name=='ed',User.fullname=='Ed Jones') # 或者是通过多次filter操作 query(User).filter(User.name=='ed').filter(User.fullname=='Ed Jones') 9. or: query(User).filter(or_(User.name=='ed',User.name=='wendy')) 如果想要查看orm底层转换的sql语句,可以在filter方法后面不要再执行任何方法直接打印就可以看到了。比如: news = session.query(News).filter(or_(News.title=='abc',News.content=='abc')) print(news) 代码演示: from sqlalchemy import create_engine,Column,Integer,Float,Boolean,DECIMAL,Enum,\ Date,DateTime,Time,String,Text,func,or_,and_ from sqlalchemy.dialects.mysql import LONGTEXT from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker import random HOSTNAME = '127.0.0.1' PORT = '3306' DATABASE = 'first_sqlalchemy' USERNAME = 'root' PASSWORD = 'root' DB_URI ="mysql+pymysql://{username}:{password}@{host}:{port}/{db}?charset=utf8".format(username=USERNAME,password=PASSWORD,host=HOSTNAME,port=PORT,db=DATABASE) engine = create_engine(DB_URI) Base = declarative_base(engine) session = sessionmaker(engine)() class Article(Base): __tablename__ = 'article' id = Column(Integer,primary_key=True,autoincrement=True) title = Column(String(50),nullable=False) price = Column(Float,nullable=False) content = Column(Text) def __repr__(self): return "<Article(title:%s)>" % self.title #共两种查询 过滤的方法 # r1 = session.query(News).filter(News.id == 1).first() # print(r1) # r2 =session.query(News).filter_by(id = 2).first() # print(r2) # 1. equal # news = session.query(News).filter(News.title == "title0").first() # print(news) # 2. not equal # news= session.query(News).filter(News.title != 'title0').all() # print(news) # 3. like & ilike(不区分大小写) # news= session.query(News).filter(News.title.ilike('title%')).all() # print(news) # 4. in: # for xxx in xxx # def _in() # news= session.query(News).filter(News.title.in_(['title1','title2'])).all() # print(news) #5. not in # news= session.query(News).filter(~News.title.in_(['title1','title2'])).all() # print(news) # news= session.query(News).filter(News.title.notin_(['title1','title2'])).all() # print(news) #6. is null #修改表,添加一个列content,操作数据 # news= session.query(News).filter(News.content==None).all() # print(news) #7. is not null # news= session.query(News).filter(News.content!=None).all() # print(news) #8. and # news= session.query(News).filter(News.title=='title5',News.content=='abc').all() # print(news) # 或者 # news= session.query(News).filter(and_(News.title=='title5',News.content=='abc')).all() # print(news) #9.or # news= session.query(News).filter(or_(News.title=='title3',News.content=='abc')).all() # print(news)
表关系: 表之间的关系存在三种:一对一、一对多、多对多。 而SQLAlchemy中的ORM也可以模拟这三种关系。 因为一对一其实在SQLAlchemy中底层是通过一对多的方式模拟的,所以先来看下一对多的关系: 外键: 使用SQLAlchemy创建外键非常简单。在从表中增加一个字段,指定这个字段外键的是哪个表的哪个字段就可以了。从表中外键的字段,必须和主表的主键字段类型保持一致。 示例代码如下: # 主表 / 从表 # user/news class User(Base): __tablename__ = 'user' id = Column(Integer,primary_key=True,autoincrement=True) uname = Column(String(50),nullable=False) def __repr__(self): return "<User(uname:%s)>" % self.uname class News(Base): __tablename__ = 'news' id = Column(Integer,primary_key=True,autoincrement=True) title = Column(String(50),nullable=False) content = Column(Text,nullable=False) uid = Column(Integer,ForeignKey("user.id")) def __repr__(self): return "<News(title:%s,content=%s)>" % (self.title,self.content) 外键约束有以下几项: 1. RESTRICT:若子表中有父表对应的关联数据,删除父表对应数据,会阻止删除。默认项 2. NO ACTION:在MySQL中,作用同RESTRICT。 3. CASCADE:级联删除 删除父表对应数据,子表中与父表存在关联关系的数据,会被一同删除。 此操作敏感,要特别注意 4. SET NULL:父表对应数据被删除,子表对应数据项会设置为NULL。自空删除 此种外键约束,是在开发中用的最多的。 注意点是:ondelete要写在ForeignKey括号里面 此知识点要必须掌握 演示代码如下: from sqlalchemy import create_engine,Column,Integer,Float,Boolean,DECIMAL,Enum,\ Date,DateTime,Time,String,Text,func,or_,and_,ForeignKey from sqlalchemy.dialects.mysql import LONGTEXT from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker import random HOSTNAME = '127.0.0.1' PORT = '3306' DATABASE = 'first_sqlalchemy' USERNAME = 'root' PASSWORD = 'root' DB_URI ="mysql+pymysql://{username}:{password}@{host}:{port}/{db}?charset=utf8".format(username=USERNAME,password=PASSWORD,host=HOSTNAME,port=PORT,db=DATABASE) engine = create_engine(DB_URI) Base = declarative_base(engine) session = sessionmaker(engine)() # 父表/从表 # user/news class User(Base): __tablename__ = 'user' id = Column(Integer,primary_key=True,autoincrement=True) uname = Column(String(50),nullable=False) def __repr__(self): return "<User(uname:%s)>" % self.uname class News(Base): __tablename__ = 'news' id = Column(Integer,primary_key=True,autoincrement=True) title = Column(String(50),nullable=False) content = Column(Text,nullable=False) # uid = Column(Integer,ForeignKey("user.id",ondelete='RESTRICT')) # uid = Column(Integer,ForeignKey("user.id",ondelete='NO ACTION')) # uid = Column(Integer,ForeignKey("user.id",ondelete='CASCADE')) uid = Column(Integer,ForeignKey("user.id",ondelete='SET NULL')) def __repr__(self): return "<News(title:%s,content=%s)>" % (self.title,self.content) Base.metadata.drop_all() Base.metadata.create_all() user = User(uname='momo') session.add(user) session.commit() news1= News(title='AAA',content='123',uid=1) news2= News(title='BBB',content='456',uid=1) session.add_all([news1,news2]) session.commit()
SQLAlchemy提供了一个`relationship`,这个类可以定义属性,以后在访问相关联的表的时候就直接 可以通过属性访问的方式就可以访问得到了。另外,可以通过`backref`来指定反向访问的属性名称。 newss是指有多篇新闻。他们之间的关系是一个“一对多”的关系。 from sqlalchemy import create_engine,Column,Integer,String,Float,Enum,Boolean,DECIMAL,Text,\ Date,DateTime,Time,func,and_,or_,ForeignKey from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.dialects.mysql import LONGTEXT from sqlalchemy.orm import sessionmaker,relationship import random import enum from datetime import date from datetime import datetime from datetime import time #准备数据库的一堆信息 ip port user pwd 数据库的名称 按要求组织格式 HOSTNAME = '127.0.0.1' PORT = '3306' DATABASE = 'first_sqlalchemy' USERNAME = 'root' PASSWORD = 'root' #dialect+driver://username:password@host:port/database?charset=utf8 #按照上述的格式来 组织数据库信息 DB_URI ="mysql+pymysql://{username}:{password}@{host}:{port}/{db}?charset=utf8".\ format(username=USERNAME,password=PASSWORD,host=HOSTNAME,port=PORT,db=DATABASE) #创建数据库引擎 engine = create_engine(DB_URI) #创建会话对象 session = sessionmaker(engine)() #创建ORM模型 Base = declarative_base(engine) # 主表 / 从表 # user/news class User(Base): __tablename__ = 'user' id = Column(Integer,primary_key=True,autoincrement=True) uname = Column(String(50),nullable=False) # newss=relationship("News") #这种写法不是最优的,通常会把它通过反向声明的方式写在“多”的那一方 def __repr__(self): return "<User(uname:%s)>" % self.uname class News(Base): __tablename__ = 'news' id = Column(Integer,primary_key=True,autoincrement=True) title = Column(String(50),nullable=False) content = Column(Text,nullable=False) #外键 uid = Column(Integer,ForeignKey("user.id")) #正向author = relationship("User") #正向 和 反向在一起 表明两个模型之间的关系 # 下面这句话跟数据库没关系,不会在数据库生成这个字段,唯一作用是:帮助做快速连表操作 author = relationship("User",backref="newss") def __repr__(self): return "<News(title:%s,content=%s)>" % (self.title,self.content) # Base.metadata.drop_all() # Base.metadata.create_all() #需求1:查询 第一篇新闻的 作者是谁 # news= session.query(News).first() # print(news) # print(news.uid) #1 # user = session.query(User).get(news.uid) # print(user.uname) #上述的需求 能够被实现 但是太麻烦,引入relationship进行查询优化 # news = session.query(News).first() # print(news.author) # print(news.author.uname) #需求2:查询xx作者的所有文章 user = session.query(User).first() print(user.newss)
在sqlalchemy中,如果想要将两个模型映射成一对一的关系,那么应该在父模型中,指定引用的时候, 要传递一个`uselist=False`这个参数进去。就是告诉父模型,以后引用这个从模型的时候, 不再是一个列表了,而是一个对象 class User(Base): __tablename__ = 'user' id = Column(Integer,primary_key=True,autoincrement=True) uname = Column(String(50),nullable=False) extend = relationship("UserExtend",uselist=False) class UserExtend(Base): __tablename__ = 'user_extend' id = Column(Integer, primary_key=True, autoincrement=True) school = Column(String(50)) uid = Column(Integer,ForeignKey("user.id")) user = relationship("User") 方式2:(用得较多) 当然,也可以借助`sqlalchemy.orm.backref`来简化代码: class User(Base): __tablename__ = 'user' id = Column(Integer,primary_key=True,autoincrement=True) uname = Column(String(50),nullable=False) class UserExtend(Base): __tablename__ = 'user_extend' id = Column(Integer, primary_key=True, autoincrement=True) school = Column(String(50)) uid = Column(Integer,ForeignKey("user.id")) # 引入relationship进行查询优化 user = relationship("User",backref=backref("extend",uselist=False)) 演示代码如下: from sqlalchemy import create_engine,Column,Integer,String,Float,DECIMAL,Boolean,Enum,Date,DateTime,Time,Text from sqlalchemy import func,and_,or_,ForeignKey from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.dialects.mysql import LONGTEXT from sqlalchemy.orm import sessionmaker,relationship,backref from datetime import date,datetime,time #在python 3.x中 有enum模块 import enum import random #准备连接数据库基本信息 HOSTNAME = '127.0.0.1' PORT = '3306' DATABASE = 'first_sqlalchemy' USERNAME = 'root' PASSWORD = 'root' #dialect+driver://username:password@host:port/database?charset=utf8 #按照上述的格式来 组织数据库信息 DB_URI ="mysql+pymysql://{username}:{password}@{host}:{port}/{db}?charset=utf8".format(username=USERNAME,password=PASSWORD,host=HOSTNAME,port=PORT,db=DATABASE) #创建数据库引擎 engine = create_engine(DB_URI) Base = declarative_base(engine) session = sessionmaker(engine)() # 主表 / 从表 # user/news 1:n # user/user_extend 1:1 #表1 class User(Base): __tablename__ = 'user' id = Column(Integer,primary_key=True,autoincrement=True) uname = Column(String(50),nullable=False) #添加属性 优化2表查询操作 # newss =relationship("News") #这种写法不是最优的,通常会把它通过反向声明的方式写在“多”的那一方 #1:1关系的表示方式1 # extend =relationship("UserExtend",uselist=False) def __repr__(self): return "<User(uname:%s)>" % self.uname #表3 class UserExtend(Base): __tablename__ = 'user_extend' id = Column(Integer, primary_key=True, autoincrement=True) school = Column(String(50)) #外键 uid = Column(Integer,ForeignKey("user.id")) #1:1关系的表示方式1 # user = relationship("User") # 1:1关系的表示方式2 user = relationship("User",backref = backref("extend",uselist=False)) #表2 class News(Base): __tablename__ = 'news' id = Column(Integer,primary_key=True,autoincrement=True) title = Column(String(50),nullable=False) content = Column(Text,nullable=False) #SQLALchemy实现外键的方法 uid = Column(Integer,ForeignKey("user.id")) #默认删除策略为 :RESTRICT #添加属性 优化2表查询操作 #正向 # author = relationship("User") #最终:会把正向 和反向 关系 写在一起 author = relationship("User",backref="newss") def __repr__(self): return "<News(title:%s,content=%s)>" % (self.title,self.content) #创建表 # Base.metadata.drop_all() # Base.metadata.create_all() #需求:ORM层面外键 和一对一关系实现 #好处1:添加数据 User 添加 UserExtend # user = User(uname="wangwu") # ux = UserExtend(school="京南大学") # user.extend = ux # # print(type(user.extend)) # session.add(user) # session.commit() #好处1:添加数据 UserExtend 添加 User # ux = UserExtend(school="武汉大学") # user2 = User(uname="李四") # ux.user = user2 # print(type(ux.user)) # session.add(ux) # session.commit() #好处2:查询数据 user3 = session.query(User).first() print(user3.uname) print(user3.extend.school)
1. 多对多的关系需要通过一张中间表来绑定他们之间的关系。 2. 先把两个需要做多对多的模型定义出来 3. 使用Table定义一个中间表,中间表一般就是包含两个模型的外键字段就可以了,并且让他们两个来作为一个“复合主键”。 4. 在两个需要做多对多的模型中随便选择一个模型,定义一个relationship属性,来绑定三者之间的关系,在使用relationship的时候,需要传入一个secondary=中间表对象名 from sqlalchemy import create_engine,Column,Integer,Float,Boolean,DECIMAL,Enum,\ Date,DateTime,Time,String,Text,func,or_,and_,ForeignKey,Table from sqlalchemy.dialects.mysql import LONGTEXT from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker,relationship,backref import random HOSTNAME = '127.0.0.1' PORT = '3306' DATABASE = 'first_sqlalchemy' USERNAME = 'root' PASSWORD = 'root' DB_URI ="mysql+pymysql://{username}:{password}@{host}:{port}/{db}?charset=utf8".format(username=USERNAME,password=PASSWORD,host=HOSTNAME,port=PORT,db=DATABASE) engine = create_engine(DB_URI) Base = declarative_base(engine) session = sessionmaker(engine)() #表3 中间表 news_tag = Table( "news_tag", Base.metadata, Column("news_id",Integer,ForeignKey("news.id"),primary_key=True), Column("tag_id",Integer,ForeignKey("tag.id"),primary_key=True) ) #表1 class News(Base): __tablename__ = 'news' id = Column(Integer,primary_key=True,autoincrement=True) title = Column(String(50),nullable=False) #产生关系 写法1 # tags = relationship("Tag",backref="newss",secondary=news_tag) def __repr__(self): return "<News(title:%s)>" % self.title #表2 class Tag(Base): __tablename__ = 'tag' id = Column(Integer, primary_key=True, autoincrement=True) name = Column(String(50), nullable=False) # 产生关系 写法2 newss = relationship("News",backref="tags",secondary=news_tag) def __repr__(self): return "<Tag(name:%s)>" % self.name # 1. 先把两个需要做多对多的模型定义出来 # 2. 使用Table定义一个中间表,中间表一般就是包含两个模型的外键字段就可以了,并且让他们两个来作为一个“复合主键”。 # 3. 在两个需要做多对多的模型中随便选择一个模型,定义一个relationship属性,来绑定三者之间的关系, # 4. 在使用relationship的时候,需要传入一个secondary=中间表对象名。 Base.metadata.drop_all() Base.metadata.create_all() #添加数据的好处 news1 = News(title="世界第一") news2 = News(title="世界第二") tag1 = Tag(name='要闻') tag2 = Tag(name='娱乐') news1.tags.append(tag1) news1.tags.append(tag2) news2.tags.append(tag1) news2.tags.append(tag2) session.add(news1) session.add(news2) session.commit() #查询数据的好处 news3 = session.query(News).first() print(news3.tags) tag = session.query(Tag).first() print(tag.newss)
只要有relationship,就必然涉及到外键,外键里面的约束是数据层面的
relationship里面的东西是ORM层面的。
ORM层面删除数据,会无视mysql级别的外键约束。 直接会将对应的数据删除,然后将从表中的那个外键设置为NULL。 如果想要避免这种行为,应该将从表中的外键的`nullable=False`。 from sqlalchemy import create_engine,Column,Integer,Float,Boolean,DECIMAL,Enum,\ Date,DateTime,Time,String,Text,func,or_,and_,ForeignKey,Table from sqlalchemy.dialects.mysql import LONGTEXT from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker,relationship,backref import random HOSTNAME = '127.0.0.1' PORT = '3306' DATABASE = 'first_sqlalchemy' USERNAME = 'root' PASSWORD = 'root' DB_URI ="mysql+pymysql://{username}:{password}@{host}:{port}/{db}?charset=utf8".format(username=USERNAME,password=PASSWORD,host=HOSTNAME,port=PORT,db=DATABASE) engine = create_engine(DB_URI) Base = declarative_base(engine) session = sessionmaker(engine)() class User(Base): __tablename__ = 'user' id = Column(Integer,primary_key=True,autoincrement=True) uname = Column(String(50),nullable=False) class News(Base): __tablename__ = 'news' id = Column(Integer, primary_key=True, autoincrement=True) title = Column(String(50),nullable=False) uid = Column(Integer,ForeignKey("user.id"),nullable=False) author = relationship("User",backref='newss') Base.metadata.drop_all() Base.metadata.create_all() user = User(uname='momo') news= News(title='hello world') news.author = user session.add(news) session.commit() #若 uid = Column(Integer,ForeignKey("user.id")) 没有声明非空,删除父表能成功,会将子表数据 对应项置为空 #解决 若uid = Column(Integer,ForeignKey("user.id"),nullable=False) 会被阻止删除父表 user = session.query(User).first() session.delete(user) session.commit()
在SQLAlchemy,只要将一个数据添加到session中,和他相关联的数据都可以一起存入到数据库中了。 这些是怎么设置的呢?其实是通过relationship的时候,有一个关键字参数cascade可以设置这些属性。 cascade属性值为: save-update:默认选项。在添加一条数据的时候,会把其他和他相关联的数据都添加到数据库中。这种行为就是save-update属性影响的。 delete:表示当删除某一个模型中的数据的时候,是否也删掉使用relationship和他关联的数据。默认不删除关联数据 delete-orphan:表示当对一个ORM对象解除了父表中的关联对象的时候,自己便会被删除掉。当然如果父表中的数据被删除,自己也会被删除。这个选项只能用在一对多上,并且还需要在子模型中的relationship中,增加一个single_parent=True的参数。 merge:默认选项。当在使用session.merge,合并一个对象的时候,会将使用了relationship相关联的对象也进行merge操作。 expunge:移除操作的时候,会将相关联的对象也进行移除。这个操作只是从session中移除,并不会真正的从数据库中删除。 all:是对save-update, merge, refresh-expire, expunge, delete几种的缩写。 from sqlalchemy import create_engine,Column,Integer,Float,Boolean,DECIMAL,Enum,\ Date,DateTime,Time,String,Text,func,or_,and_,ForeignKey,Table from sqlalchemy.dialects.mysql import LONGTEXT from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker,relationship,backref import random HOSTNAME = '127.0.0.1' PORT = '3306' DATABASE = 'first_sqlalchemy' USERNAME = 'root' PASSWORD = 'root' DB_URI ="mysql+pymysql://{username}:{password}@{host}:{port}/{db}?charset=utf8".format(username=USERNAME,password=PASSWORD,host=HOSTNAME,port=PORT,db=DATABASE) engine = create_engine(DB_URI) Base = declarative_base(engine) session = sessionmaker(engine)() class User(Base): __tablename__ = 'user' id = Column(Integer,primary_key=True,autoincrement=True) uname = Column(String(50),nullable=False) # articles =relationship("Article",cascade="save-update,delete") #放入Article中去优化 # comments = relationship("Comment") #放入Comment中去优化 class Article(Base): __tablename__ = 'article' id = Column(Integer, primary_key=True, autoincrement=True) title = Column(String(50), nullable=False) uid = Column(Integer,ForeignKey("user.id"),nullable=False) # author = relationship("User",backref="articles") #cascade默认为save-update # author = relationship("User",backref="articles",cascade="save-update") #明文指定为save-update #delete:表示当删除某一个模型中的数据的时候,是否也删掉使用relationship和他关联的数据。默认不删除关联数据 # author = relationship("User",cascade="save-update,delete") #明文指定为save-update 和delete #优化写法 author = relationship("User",backref=backref("articles",cascade="save-update,delete"),cascade="save-update,delete") class Comment(Base): __tablename__ = 'comment' id = Column(Integer, primary_key=True, autoincrement=True) content = Column(Text,nullable=False) uid = Column(Integer,ForeignKey("user.id")) #author = relationship("User") # 优化写法 author = relationship("User",backref=backref("comments")) def add_data(): Base.metadata.drop_all() Base.metadata.create_all() user = User(uname="momo") article = Article(title="华为5G") article.author = user session.add(article) #引入comment表 comment = Comment(content='你少说风凉话') comment.author = user session.add(comment) session.commit() def oper_data(): #relationship里边的cascade 可通过Article影响User # article = session.query(Article).first() # session.delete(article) # session.commit() #反过来通过User也能影响Article user = session.query(User).first() session.delete(user) session.commit() #总结1:relationship里边的cascade 可通过Article影响User,反过来通过User也能影响Article #总结2:relationship里边的cascade 只会影响当前类User中relationship的模型Article,不会影响模型Comment if __name__ == '__main__': # add_data() oper_data() from sqlalchemy import create_engine,Column,Integer,Float,Boolean,DECIMAL,Enum,\ Date,DateTime,Time,String,Text,func,or_,and_,ForeignKey,Table from sqlalchemy.dialects.mysql import LONGTEXT from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker,relationship,backref import random HOSTNAME = '127.0.0.1' PORT = '3306' DATABASE = 'first_sqlalchemy' USERNAME = 'root' PASSWORD = 'root' DB_URI ="mysql+pymysql://{username}:{password}@{host}:{port}/{db}?charset=utf8".format(username=USERNAME,password=PASSWORD,host=HOSTNAME,port=PORT,db=DATABASE) engine = create_engine(DB_URI) Base = declarative_base(engine) session = sessionmaker(engine)() class User(Base): __tablename__ = 'user' id = Column(Integer,primary_key=True,autoincrement=True) uname = Column(String(50),nullable=False) class Article(Base): __tablename__ = 'article' id = Column(Integer, primary_key=True, autoincrement=True) title = Column(String(50), nullable=False) uid = Column(Integer,ForeignKey("user.id"),nullable=False) #优化写法 # author = relationship("User",backref=backref("articles",cascade="save-update,delete"),cascade="save-update,delete") #delete-orphan的使用 ,注意:delete-orphan依赖于delete # author = relationship("User",backref=backref("articles",cascade="save-update,delete,delete-orphan"), # cascade="save-update",single_parent=True) #merge的使用 # author = relationship("User",backref=backref("articles",cascade="save-update,delete,delete-orphan,merge"), # cascade="save-update",single_parent=True) #all的使用 :包含了save-update, merge, expunge, delete几种功能 author = relationship("User",backref=backref("articles",cascade="all"), cascade="save-update",single_parent=True) class Comment(Base): __tablename__ = 'comment' id = Column(Integer, primary_key=True, autoincrement=True) content = Column(Text,nullable=False) uid = Column(Integer,ForeignKey("user.id")) author = relationship("User") # 优化写法 author = relationship("User",backref=backref("comments")) def add_data(): Base.metadata.drop_all() Base.metadata.create_all() user = User(uname="momo") article = Article(title="华为5G") article.author = user session.add(article) #引入comment表 comment = Comment(content='你少说风凉话') comment.author = user session.add(comment) session.commit() def oper_data(): #relationship里边的cascade 可通过Article影响User # article = session.query(Article).first() # session.delete(article) # session.commit() #反过来通过User也能影响Article # user = session.query(User).first() # session.delete(user) # session.commit() #测试delete-orphan的使用 user = session.query(User).first() user.articles = [] session.commit() #测试 merge 先查看user中的id数据 执行完该功能函数后 再查看user中的id数据 def oper_data_merge(): user = User(id=1,uname = "莫莫") #merge:默认选项,合并成功 # session.merge(user) #merge:当在使用session.merge,合并一个对象的时候,会将使用了relationship相关联的对象也进行merge操作。 article = Article(id=1,title="5G还没普及起来,4G怎么变慢了呢?") user.articles.append(article) session.merge(user) session.commit() #测试expunge 移除操作的时候,会将相关联的对象也进行移除。这个操作只是从session中移除,并不会真正的从数据库中删除。 def oper_data_expunge(): user = User(uname='花花') article = Article(title='哈哈哈') article.author = user session.add(user) session.add(article) session.expunge(user) #总结:add方法 与expunge的作用相反 session.commit() if __name__ == '__main__': # add_data() # oper_data() # oper_data_merge() oper_data_expunge() #总结:ondelete的删除策略必须掌握,同时了解orm层面的cascade的操作即可
1. order_by方法排序:可以指定根据模型中某个属性进行排序,"模型名.属性名.desc()"代表的是降序排序。 2. 在定义模型的时候指定排序:有些时候,不想每次在查询的时候都用order_by方法,可以在定义模型的时候就指定排序的方式。 有以下两种方式: 在模型定义中,添加以下代码: __mapper_args__ = { # "order_by": create_time #正序 "order_by": create_time.desc() #倒序 } 3.relationship的方法中order_by属性:在指定relationship方法的时候,添加order_by属性来指定排序的字段。 # author = relationship("User", backref=backref("articles",order_by=create_time)) #正序 author = relationship("User", backref=backref("articles",order_by=create_time.desc())) #倒序 from sqlalchemy import create_engine,Column,Integer,Float,Boolean,DECIMAL,Enum,\ Date,DateTime,Time,String,Text,func,or_,and_,ForeignKey,Table from sqlalchemy.dialects.mysql import LONGTEXT from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker,relationship,backref import random,time from datetime import datetime HOSTNAME = '127.0.0.1' PORT = '3306' DATABASE = 'first_sqlalchemy' USERNAME = 'root' PASSWORD = 'root' DB_URI ="mysql+pymysql://{username}:{password}@{host}:{port}/{db}?charset=utf8".format(username=USERNAME,password=PASSWORD,host=HOSTNAME,port=PORT,db=DATABASE) engine = create_engine(DB_URI) Base = declarative_base(engine) session = sessionmaker(engine)() #排序方式1:order_by方法指定 class Article(Base): __tablename__ = 'article' id = Column(Integer, primary_key=True, autoincrement=True) title = Column(String(50), nullable=False) create_time = Column(DateTime, nullable=False, default=datetime.now) def __repr__(self): return "<Article(title:%s,create_time:%s)>" % (self.title,self.create_time) def add_data(): Base.metadata.drop_all() Base.metadata.create_all() article1 = Article(title='title1') session.add(article1) session.commit() time.sleep(3) article2 = Article(title='title2') session.add(article2) session.commit() def oper(): # 正序排序 articles1 = session.query(Article).order_by(Article.create_time).all() print(articles1) # 倒序排序 articles2 = session.query(Article).order_by(Article.create_time.desc()).all() print(articles2) if __name__ == '__main__': # add_data() oper() 方式二代码演示:定义模型时,指定排序方式 from sqlalchemy import create_engine,Column,Integer,Float,Boolean,DECIMAL,Enum,\ Date,DateTime,Time,String,Text,func,or_,and_,ForeignKey,Table from sqlalchemy.dialects.mysql import LONGTEXT from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker,relationship,backref import random,time from datetime import datetime HOSTNAME = '127.0.0.1' PORT = '3306' DATABASE = 'first_sqlalchemy' USERNAME = 'root' PASSWORD = 'root' DB_URI ="mysql+pymysql://{username}:{password}@{host}:{port}/{db}?charset=utf8".format(username=USERNAME,password=PASSWORD,host=HOSTNAME,port=PORT,db=DATABASE) engine = create_engine(DB_URI) Base = declarative_base(engine) session = sessionmaker(engine)() #排序方式2:定义模型时,指定排序方式 class Article(Base): __tablename__ = 'article' id = Column(Integer, primary_key=True, autoincrement=True) title = Column(String(50), nullable=False) create_time = Column(DateTime, nullable=False, default=datetime.now) __mapper_args__ = { # "order_by": create_time #正序 "order_by": create_time.desc() #倒序 } def __repr__(self): return "<Article(title:%s,create_time:%s)>" % (self.title,self.create_time) def add_data(): Base.metadata.drop_all() Base.metadata.create_all() article1 = Article(title='title1') session.add(article1) session.commit() time.sleep(3) article2 = Article(title='title2') session.add(article2) session.commit() def oper(): # 不用再指定排序方式 因为在定义模型的时候 就已指定好排序方式 articles2 = session.query(Article).all() print(articles2) if __name__ == '__main__': # add_data() oper() 方式三代码演示:涉及两表时,定义模型时,用relationship方法中的order_by属性指定排序方式 from sqlalchemy import create_engine,Column,Integer,Float,Boolean,DECIMAL,Enum,\ Date,DateTime,Time,String,Text,func,or_,and_,ForeignKey,Table from sqlalchemy.dialects.mysql import LONGTEXT from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker,relationship,backref import random,time from datetime import datetime HOSTNAME = '127.0.0.1' PORT = '3306' DATABASE = 'first_sqlalchemy' USERNAME = 'root' PASSWORD = 'root' DB_URI ="mysql+pymysql://{username}:{password}@{host}:{port}/{db}?charset=utf8".format(username=USERNAME,password=PASSWORD,host=HOSTNAME,port=PORT,db=DATABASE) engine = create_engine(DB_URI) Base = declarative_base(engine) session = sessionmaker(engine)() #排序方式3:涉及两表时,定义模型时,用relationship方法中的order_by属性指定排序方式 class User(Base): __tablename__ = 'user' id = Column(Integer, primary_key=True, autoincrement=True) uname = Column(String(50),nullable=False) class Article(Base): __tablename__ = 'article' id = Column(Integer, primary_key=True, autoincrement=True) title = Column(String(50), nullable=False) create_time = Column(DateTime, nullable=False, default=datetime.now) uid = Column(Integer,ForeignKey("user.id")) # author = relationship("User", backref=backref("articles",order_by=create_time)) #正序 author = relationship("User", backref=backref("articles",order_by=create_time.desc())) #倒序 def __repr__(self): return "<Article(title:%s,create_time:%s)>" % (self.title,self.create_time) def add_data(): Base.metadata.drop_all() Base.metadata.create_all() article1 = Article(title='title1') user = User(uname='默默') user.articles = [article1] session.add(user) session.commit() time.sleep(3) article2 = Article(title='title2') user.articles.append(article2) session.commit() def oper(): # 不用再指定排序方式 因为在定义模型的时候 就已指定好排序方式 user = session.query(User).first() print(user.articles) if __name__ == '__main__': # add_data() oper()
1. limit:可以限制查询的时候只查询前几条数据。 属top-N查询 2. offset:可以限制查找数据的时候过滤掉前面多少条。可指定开始查询时的偏移量。 3. 切片:可以对Query对象使用切片操作,来获取想要的数据。 可以使用`slice(start,stop)`方法来做切片操作。 也可以使用`[start:stop]`的方式来进行切片操作。 一般在实际开发中,中括号的形式是用得比较多的。 from sqlalchemy import create_engine,Column,Integer,Float,Boolean,DECIMAL,Enum,\ Date,DateTime,Time,String,Text,func,or_,and_,ForeignKey,Table from sqlalchemy.dialects.mysql import LONGTEXT from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker,relationship,backref import random,time from datetime import datetime HOSTNAME = '127.0.0.1' PORT = '3306' DATABASE = 'first_sqlalchemy' USERNAME = 'root' PASSWORD = 'root' DB_URI ="mysql+pymysql://{username}:{password}@{host}:{port}/{db}?charset=utf8".format(username=USERNAME,password=PASSWORD,host=HOSTNAME,port=PORT,db=DATABASE) engine = create_engine(DB_URI) Base = declarative_base(engine) session = sessionmaker(engine)() class Article(Base): __tablename__ = 'article' id = Column(Integer,primary_key=True,autoincrement=True) title = Column(String(50),nullable=False) create_time = Column(DateTime,default=datetime.now) def __repr__(self): return "<Article(title: %s)>" % self.title def add_data(): Base.metadata.drop_all() Base.metadata.create_all() for x in range(100): title = "title %s" % x article = Article(title=title) session.add(article) session.commit() def oper1(): # articles = session.query(Article).all() #limit:可以限制每次查询的时候只查询前几条数据。 属top-N查询 articles = session.query(Article).limit(10).all() print(articles) def oper2(): #offset:可以限制查找数据的时候过滤掉前面多少条。可指定开始查询时的偏移量。 # articles = session.query(Article).offset(10).limit(10).all() #场景选型:查看最新的10条新闻 articles = session.query(Article).order_by(Article.id.desc()).limit(10).all() print(articles) #实现分页 from sqlalchemy.orm.query import Query def oper3(): articles = session.query(Article).order_by(Article.id.desc()).slice(0,10).all() print(articles) def oper4(): articles = session.query(Article).order_by(Article.id.desc())[0:10] print(articles) if __name__ == '__main__': # add_data() # oper1() # oper2() # oper3() oper4()
在一对多,或者多对多关系的时候,如果想要获取多的一方这一部分的数据的时候,往往能通过一个属性就可以全部获取了。 如有一个作者,想要这个作者的所有文章,通过user.articles就可以获取所有的。 但有时候我们不想获取所有的数据,如只想获取这个作者今天发表的文章, 那么这时候我们可以给relationship方法添加属性lazy='dynamic', 以后通过user.articles获取到的就不是一个列表,而是一个AppenderQuery对象了。 这样就可以对这个对象再进行一层过滤和排序等操作。 通过`lazy='dynamic'`,获取出来的多的那一部分的数据,就是一个`AppenderQuery`对象了。 这种对象既可以添加新数据,也可以跟`Query`一样,可以再进行一层过滤。 lazy可用的选项: 1. `select`:这个是默认选项。还是拿`user.articles`的例子来讲。 如果你没有访问`user.articles`这个属性,那么sqlalchemy就不会从数据库中查找文章。 一旦你访问了这个属性,那么sqlalchemy就会立马从数据库中查找所有的文章, 并把查找出来的数据组装成一个列表返回。这也是懒加载。 2.`dynamic`:这个也是懒加载。就是在访问`user.articles`的时候返回来的不是一个列表, 而是`AppenderQuery`对象。 总结:如果你在获取数据的时候,想要对多的那一边的数据再进行一层过滤, 那么这时候就可以使用`lazy='dynamic'`。 from sqlalchemy import create_engine,Column,Integer,Float,Boolean,DECIMAL,Enum,\ Date,DateTime,Time,String,Text,func,or_,and_,ForeignKey,Table from sqlalchemy.dialects.mysql import LONGTEXT from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker,relationship,backref import random,time from datetime import datetime HOSTNAME = '127.0.0.1' PORT = '3306' DATABASE = 'first_sqlalchemy' USERNAME = 'root' PASSWORD = 'root' DB_URI ="mysql+pymysql://{username}:{password}@{host}:{port}/{db}?charset=utf8".format(username=USERNAME,password=PASSWORD,host=HOSTNAME,port=PORT,db=DATABASE) engine = create_engine(DB_URI) Base = declarative_base(engine) session = sessionmaker(engine)() class User(Base): __tablename__ = 'user' id = Column(Integer, primary_key=True, autoincrement=True) uname = Column(String(50),nullable=False) class Article(Base): __tablename__ = 'article' id = Column(Integer, primary_key=True, autoincrement=True) title = Column(String(50), nullable=False) create_time = Column(DateTime, nullable=False, default=datetime.now) uid = Column(Integer,ForeignKey("user.id")) # author = relationship("User", backref=backref("articles")) #懒加载 author = relationship("User", backref=backref("articles",lazy="dynamic")) def __repr__(self): return "<Article(title:%s,create_time:%s)>" % (self.title,self.create_time) def add_data(): Base.metadata.drop_all() Base.metadata.create_all() user = User(uname='莫莫') for x in range(100): article = Article(title="title %s" % x) article.author = user session.add(article) session.commit() from sqlalchemy.orm.collections import InstrumentedList def oper1(): user = session.query(User).first() print(type(user.articles)) #<class 'sqlalchemy.orm.collections.InstrumentedList'> print(user.articles) #懒加载 from sqlalchemy.orm.dynamic import AppenderQuery def oper2(): user = session.query(User).first() print(type(user.articles)) #<class 'sqlalchemy.orm.dynamic.AppenderQuery'> print(user.articles) #辨析 AppenderQuery 和 Query from sqlalchemy.orm.query import Query def oper3(): user = session.query(User) print(type(user)) #<class 'sqlalchemy.orm.query.Query'> print(user) #sql语句 #有2层意思 #1.是一个Query对象。可以调用Query对象的方法 #2.是一个AppenderQuery对象。可以继续追加数据进去 def oper4(): user = session.query(User).first()#可以调用Query对象的方法 print(type(user)) print(user.articles.filter(Article.id>=50).all()) # article = Article(title='title 100') # user.articles.append(article)#2.是一个AppenderQuery对象。可以继续追加数据进去 # session.commit() if __name__ == '__main__': # add_data() # oper1() # oper2() # oper3() oper4()
group_by: 根据某个字段进行分组。如想要根据年龄进行分组,来统计每个分组分别有多少人 r = session.query(User.age,func.count(User.id)).group_by(User.age).all() having: having是对分组查找结果作进一步过滤。如只想要看未成年人的人数, 那么可以首先对年龄进行分组统计人数,然后再对分组进行having过滤。 r = session.query(User.age,func.count(User.id)).group_by(User.age).having(User.age < 18).all() from sqlalchemy import create_engine,Column,Integer,Float,Boolean,DECIMAL,Enum,\ Date,DateTime,Time,String,Text,func,or_,and_,ForeignKey,Table from sqlalchemy.dialects.mysql import LONGTEXT from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker,relationship,backref import random,time from datetime import datetime HOSTNAME = '127.0.0.1' PORT = '3306' DATABASE = 'first_sqlalchemy' USERNAME = 'root' PASSWORD = 'root' DB_URI ="mysql+pymysql://{username}:{password}@{host}:{port}/{db}?charset=utf8".format(username=USERNAME,password=PASSWORD,host=HOSTNAME,port=PORT,db=DATABASE) engine = create_engine(DB_URI) Base = declarative_base(engine) session = sessionmaker(engine)() class User(Base): __tablename__ = 'user' id = Column(Integer,primary_key=True,autoincrement=True) uname = Column(String(50),nullable=False) age = Column(Integer,default=0) gender = Column(Enum("男","女","秘密"),default="男") def add_data(): Base.metadata.drop_all() Base.metadata.create_all() user1 = User(uname='李一',age=17,gender='男') user2 = User(uname='李二',age=17,gender='男') user3 = User(uname="张三",age=18,gender='女') user4 = User(uname="张四",age=19,gender='女') user5 = User(uname="莫莫",age=20,gender='男') session.add_all([user1,user2,user3,user4,user5]) session.commit() #查询 每个年龄的人数 def oper1(): # user = session.query(User.age,func.count(User.id)).group_by(User.age) # print(user) #sql语句 user = session.query(User.age, func.count(User.id)).group_by(User.age).all() print(user) # 列表 #查询 每个年龄的人数,要求排除未成年人 def oper2(): user = session.query(User.age, func.count(User.id)).group_by(User.age).having(User.age>=18).all() print(user) # 列表 #查询 每个年龄的人数,要求未成年人 def oper3(): user = session.query(User.age, func.count(User.id)).group_by(User.age).having(User.age<18).all() print(user) # 列表 if __name__ == '__main__': # add_data() # oper1() # oper2() oper3()
1.mysql中的外连接、内连接 表A数据: aID aNum 1 a20050111 2 a20050112 3 a20050113 4 a20050114 5 a20050115 表B数据: bID bName 1 2006032401 2 2006032402 3 2006032403 4 2006032404 8 2006032408 1.1 left join(左连接) SELECT * FROM a LEFT JOIN b ON a.aID =b.bID 结果如下: aID aNum bID bName 1 a20050111 1 2006032401 2 a20050112 2 2006032402 3 a20050113 3 2006032403 4 a20050114 4 2006032404 5 a20050115 NULL NULL 结果说明: left join是以A表的记录为基础的,A可以看成左表,B可以看成右表,leftjoin是以左表为准的. 换句话说,左表(A)的记录将会全部表示出来,而右表(B)只会显示符合搜索条件的记录(例子中为: A.aID = B.bID). B表记录不足的地方均为NULL. 1.2 right join(右连接) SELECT * FROM a RIGHT JOING b ON a.aID = b.bID 结果如下: aID aNum bID bName 1 a20050111 1 2006032401 2 a20050112 2 2006032402 3 a20050113 3 2006032403 4 a20050114 4 2006032404 NULL NULL 8 2006032408 结果说明: 仔细观察一下,就会发现,和left join的结果刚好相反,这次是以右表(B)为基础的,A表不足的地方用NULL填充. 1.3 inner join(相等连接或内连接) SELECT * FROM a INNER JOIN b ON a.aID =b.bID 等同于以下SQL句: SELECT * FROM a,b WHERE a.aID = b.bID 结果如下: aID aNum bID bName 1 a20050111 1 2006032401 2 a20050112 2 2006032402 3 a20050113 3 2006032403 4 a20050114 4 2006032404 结果说明: 很明显,这里只显示出了 A.aID = B.bID的记录.这说明inner join并不以谁为基础,它只显示符合条件的记录. 2.join的使用_高级查询: 1. join分为left join(左外连接)和right join(右外连接)以及内连接(等值连接)。 2. 在sqlalchemy中,使用join来完成内连接。在写join的时候,如果不写join的条件,那么默认将使用外键来作为条件连接。 3. 查询出来的字段,跟join后面的东西无关,而是取决于query方法中传了什么参数。(模型名=全表;模型名.属性=表名.字段)。 4. 在sqlalchemy中,使用outerjoin来完成外连接(默认是左外连接)。 from sqlalchemy import create_engine,Column,Integer,Float,Boolean,DECIMAL,Enum,\ Date,DateTime,Time,String,Text,func,or_,and_,ForeignKey,Table from sqlalchemy.dialects.mysql import LONGTEXT from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker,relationship,backref import random,time from datetime import datetime HOSTNAME = '127.0.0.1' PORT = '3306' DATABASE = 'first_sqlalchemy' USERNAME = 'root' PASSWORD = 'root' DB_URI ="mysql+pymysql://{username}:{password}@{host}:{port}/{db}?charset=utf8".format(username=USERNAME,password=PASSWORD,host=HOSTNAME,port=PORT,db=DATABASE) engine = create_engine(DB_URI) Base = declarative_base(engine) session = sessionmaker(engine)() class User(Base): __tablename__ = 'user' id = Column(Integer,primary_key=True,autoincrement=True) uname = Column(String(50),nullable=False) def __repr__(self): return "<User(uname: %s)>" % self.uname class Article(Base): __tablename__ = 'article' id = Column(Integer, primary_key=True, autoincrement=True) title = Column(String(50), nullable=False) create_time = Column(DateTime, nullable=False, default=datetime.now) uid = Column(Integer,ForeignKey("user.id")) author = relationship("User",backref="articles") def __repr__(self): return "<Article(title: %s)>" % self.title def add_data(): Base.metadata.drop_all() Base.metadata.create_all() user1 = User(uname='莫莫') user2 = User(uname='露露') for x in range(1): article = Article(title='title %s' % x) article.author = user1 session.add(article) session.commit() for x in range(1,3): article = Article(title='title %s' % x) article.author = user2 session.add(article) session.commit() #找到所有的用户,按照发表的文章数量进行排序 #注意1:在sqlalchemy中,使用join来完成内连接 def oper1(): #select user.uname,count(article.id) from user join article on user.id=article.uid group by user.id order by count(article.id) desc; # result = session.query(User.uname, func.count(Article.id)).join(Article,User.id==Article.uid).group_by(User.id).order_by( # func.count(Article.id).desc()) # print(result) #sql语句 result = session.query(User.uname, func.count(Article.id)).join(Article, User.id == Article.uid).group_by( User.id).order_by( func.count(Article.id).desc()).all() print(result) # 结果:列表 #找到所有的用户,按照发表的文章数量进行排序 #注意2:在写join的时候,如果不写join的条件,那么默认将使用外键来作为条件连接。 def oper2(): result = session.query(User.uname, func.count(Article.id)).join(Article).group_by( User.id).order_by( func.count(Article.id).desc()).all() print(result) # 结果:列表 #找到所有的用户,按照发表的文章数量进行排序 #注意3: 查询出来的字段,跟join后面的东西无关,而是取决于query方法中传了什么参数。(模型名=全表;模型名.属性=表名.字段) def oper3(): result = session.query(User).join(Article).group_by( User.id).order_by( func.count(Article.id).desc()).all() print(result) # 结果:列表 if __name__ == '__main__': # add_data() # oper1() # oper2() oper3()
subquery方法: 子查询即select语句中还有select。 那么在sqlalchemy中,要实现一个子查询,需以下几个步骤: 1. 将子查询按照传统的方式写好查询代码,然后在`query`对象后面执行`subquery`方法,将这个查询变成一个子查询。 2. 在子查询中,将以后需要用到的字段通过`label`方法,取个别名。 3. 在父查询中,如果想要使用子查询的字段,那么可以通过子查询的返回值上的`c`属性拿到(c=Column)。 from sqlalchemy import create_engine,Column,Integer,Float,Boolean,DECIMAL,Enum,\ Date,DateTime,Time,String,Text,func,or_,and_,ForeignKey,Table from sqlalchemy.dialects.mysql import LONGTEXT from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker,relationship,backref import random,time from datetime import datetime HOSTNAME = '127.0.0.1' PORT = '3306' DATABASE = 'first_sqlalchemy' USERNAME = 'root' PASSWORD = 'root' DB_URI ="mysql+pymysql://{username}:{password}@{host}:{port}/{db}?charset=utf8".format(username=USERNAME,password=PASSWORD,host=HOSTNAME,port=PORT,db=DATABASE) engine = create_engine(DB_URI) Base = declarative_base(engine) session = sessionmaker(engine)() class User(Base): __tablename__ = 'user' id = Column(Integer,primary_key=True,autoincrement=True) uname = Column(String(50),nullable=False) city = Column(String(50),nullable=False) age = Column(Integer,default=0) def __repr__(self): return "<User(username: %s)>" % self.uname def add_data(): Base.metadata.drop_all() Base.metadata.create_all() user1 = User(uname='一哥',city="贵阳",age=18) user2 = User(uname='王二',city="贵阳",age=18) user3 = User(uname='张三',city="北京",age=18) user4 = User(uname='赵四',city="贵阳",age=20) session.add_all([user1,user2,user3,user4]) session.commit() # 相亲类网站:同城交友 之珍爱网 #需求: 寻找和 “一哥” 这个人在同一个城市,并且是同年龄的人 #实现思路1:传统方式 def oper1(): u = session.query(User).filter(User.uname == '一哥').first() users = session.query(User).filter(User.city==u.city,User.age==u.age).all() print(users) #实现思路2:子查询方式 #原生sql:select `user`.id,`user`.uname,`user`.city,`user`.age from user, # (select `user`.city,`user`.age from user where uname='一哥') as yige # where `user`.city=yige.city AND `user`.age=yige.age def oper2(): # stmt = session.query(User.city.label('city'), User.age.label('age')).filter(User.uname == '一哥').subquery() # result = session.query(User).filter(User.city == stmt.c.city, User.age == stmt.c.age) # print(result) #查看sql语句 stmt = session.query(User.city.label('city'), User.age.label('age')).filter(User.uname == '一哥').subquery() result = session.query(User).filter(User.city == stmt.c.city, User.age == stmt.c.age).all() print(result) # 查看结果 if __name__ == '__main__': # add_data() # oper1() oper2()
当多表关联查询的时候, 有时候同一个表要用到多次, 这时候用别名就可以方便的解决命名冲突的问题了 from sqlalchemy import create_engine,Column,Integer,Float,Boolean,DECIMAL,Enum,\ Date,DateTime,Time,String,Text,func,or_,and_,ForeignKey,Table from sqlalchemy.dialects.mysql import LONGTEXT from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker,relationship,backref import random,time from datetime import datetime HOSTNAME = '127.0.0.1' PORT = '3306' DATABASE = 'first_sqlalchemy' USERNAME = 'root' PASSWORD = 'root' DB_URI ="mysql+pymysql://{username}:{password}@{host}:{port}/{db}?charset=utf8".format(username=USERNAME,password=PASSWORD,host=HOSTNAME,port=PORT,db=DATABASE) engine = create_engine(DB_URI) Base = declarative_base(engine) session = sessionmaker(engine)() class User(Base): __tablename__ = 'user' id = Column(Integer,primary_key=True,autoincrement=True) uname = Column(String(50),nullable=False) city = Column(String(50),nullable=False) age = Column(Integer,default=0) def __repr__(self): return "<User(username: %s)>" % self.uname def add_data(): Base.metadata.drop_all() Base.metadata.create_all() user1 = User(uname='一哥',city="贵阳",age=18) user2 = User(uname='王二',city="贵阳",age=18) user3 = User(uname='张三',city="北京",age=18) user4 = User(uname='赵四',city="贵阳",age=20) session.add_all([user1,user2,user3,user4]) session.commit() from sqlalchemy.orm import aliased a1 = aliased(User) a2 = aliased(User) #别名 def oper1(): for uname, age1, age2 in session.query(User.uname,a1.age, a2.age).join( a1,User.id==a1.id).join(a2,a1.id==a2.id).all(): print(uname, age1, age2) if __name__ == '__main__': # add_data() oper1()
1.安装: pip install flask-sqlalchemy 2.Flask-SQLAlchemy的使用要点: 2.1 数据库连接 数据库初始化不再是通过create_engine。 1. 跟sqlalchemy一样,定义好数据库连接字符串DB_URI。 2. 将这个定义好的数据库连接字符串DB_URI,通过`SQLALCHEMY_DATABASE_URI`这个key名配置到`app.config`中。 代码:app.config["SQLALCHEMY_DATABASE_URI"] = DB_URI 3. 使用`flask_sqlalchemy.SQLAlchemy`这个类定义一个对象,并将`app`传入进去。 代码:db = SQLAlchemy(app) 2.2 创建ORM模型类 之前都是通过Base = declarative_base()来初始化一个基类,然后再继承,在Flask-SQLAlchemy中更加简单了。 1. 还是跟使用sqlalchemy一样,定义模型。现在不再是需要使用`delarative_base`来创建一个基类。而是使用`db.Model`来作为基类。 2. 在模型类中,`Column`、`String`、`Integer`以及`relationship`等,都不需要导入了,直接使用`db`下面相应的属性名就可以了。 3. 在定义模型的时候,可以不写`__tablename__`,那么`flask_sqlalchemy`会默认使用当前的模型的名字转换成小写来作为表的名字, 并且如果这个模型的名字用到了多个单词并且使用了驼峰命名法,那么会在多个单词之间使用下划线来进行连接, 虽然flask_sqlalchemy给我们提供了这个特性,但是不推荐使用。(增强代码可读性,提高团队合作效率) 2.3 将ORM模型映射到数据库表 写完模型类后,要将模型映射到数据库的表中,使用以下代码即可 1. 删除数据库表:db.drop_all() 2. 创建数据库表:db.create_all() 2.4 session的使用 以后session也不需要使用`sessionmaker`来创建了, 直接使用`db.session`就可以了, 操作这个session的时候就跟之前的`sqlalchemy`的`session`是一样一样的。 2.5 添加数据 这时候就可以在数据库中看到已经生成了对应表了 添加数据和之前的没有区别,只是session成为了一个db的属性 2.6 查询数据: 1.单表查询 查询数据不再是之前的session.query方法了,而是将query属性放在了db.Model上, 所以查询就是通过“模型名.query”的方式进行查询了,`query`就跟之前的sqlalchemy中的query方法是一样用的。 2.多表查询 如果查找数据涉及多个模型,只能使用db.session.query(模型名).all() 这种方式 2.7 修改数据: 修改数据和之前的没有区别,只是session成为了一个db的属性 2.8 删除数据: 删除数据跟添加数据和修改数据类似,只不过session是db的一个属性而已 3.代码演示: from flask import Flask from flask_sqlalchemy import SQLAlchemy app = Flask(__name__) HOSTNAME = '127.0.0.1' PORT = '3306' DATABASE = 'first_sqlalchemy' USERNAME = 'root' PASSWORD = 'root' DB_URI ="mysql+pymysql://{username}:{password}@{host}:{port}/{db}?charset=utf8".format(username=USERNAME,password=PASSWORD,host=HOSTNAME,port=PORT,db=DATABASE) app.config['SQLALCHEMY_DATABASE_URI'] = DB_URI app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False #1.连接数据库 db = SQLAlchemy(app) #2.创建ORM模型 class User(db.Model): __tablename__ = 'user' id = db.Column(db.Integer,primary_key=True,autoincrement=True) uname = db.Column(db.String(50),nullable=False) def __repr__(self): return "<User(uname: %s)>" % self.uname class Article(db.Model): __tablename__ = 'article' id = db.Column(db.Integer,primary_key=True,autoincrement=True) title = db.Column(db.String(50),nullable=False) uid = db.Column(db.Integer,db.ForeignKey("user.id")) author = db.relationship("User",backref="articles") #3.删除表 db.drop_all() #4.创建表 db.create_all() #5.添加数据 user = User(uname='莫莫') article = Article(title='华为5G 算法突破了,俄罗斯小伙突破的') article.author = user db.session.add(article) db.session.commit() #6.查询数据 # users = User.query.all() #等价于 db.session.query(User).all() # print(users) #在query属性之后 可以用 order_by 、 filter、filter_by、group_by、having等方法进行更复杂的单表查询 #若要进行更复杂的多表查询,只能使用db.session.query(User).all() 这种方式 #如 order_by users = User.query.order_by(User.id.desc()).all() print(users) #7.修改数据 user = User.query.filter(User.uname=='露露').first() user.uname = '探探' db.session.commit() #8.删除数据 user = User.query.filter(User.uname=='探探').first() db.session.delete(user) db.session.commit() @app.route('/') def hello_world(): return 'Hello World!' if __name__ == '__main__': app.run()
数据库迁移工具alembic介绍_alembic使用: 1.alembic介绍: alembic是sqlalchemy的作者开发的,用来做OMR模型与数据库的迁移与映射, alembic使用方式跟git有点了类似, alembic的所有命令都是以alembic开头, alembic的迁移文件也是通过版本进行控制的, 2.alembic安装: pip install alembic 3.alembic使用: 1.使用sqlalchemy创建好模型类: 如创建一个models.py模块,然后在里面定义需要的模型类,代码如下: from sqlalchemy import Column,String,Integer,create_engine from sqlalchemy.ext.declarative import declarative_base HOSTNAME = '127.0.0.1' PORT = '3306' DATABASE = 'alembic_demo' USERNAME = 'root' PASSWORD = 'root' DB_URI ="mysql+pymysql://{username}:{password}@{host}:{port}/{db}?charset=utf8".format(username=USERNAME,password=PASSWORD,host=HOSTNAME,port=PORT,db=DATABASE) engine = create_engine(DB_URI) Base = declarative_base(engine) class User(Base): __tablename__ = 'user' id = Column(Integer,primary_key=True,autoincrement=True) uname = Column(String(50),nullable=False) country = Column(String(50)) # ORM -> 迁移文件 -> 映射到数据库中 在开发时用的是这种方式 # import os # print(os.path.dirname(__file__)) 2. 使用alembic创建一个仓库(初始化仓库): 1.打开dos系统界面 2.cd到当前项目目录中,注意:如果想要使用alembic,则需要先进入到安装了alembic的虚拟环境中,不然就找不到这个命令。 3.然后执行命令 “alembic init [仓库的名字,推荐使用alembic]” 3. 修改配置文件: 在`alembic.ini`中,给`sqlalchemy.url`项设置数据库的连接方式。方式跟sqlalchemy的方式是一样的。 sqlalchemy.url = driver://user:pass@localhost/dbname, 给`sqlalchemy.url`项设置数据库的连接操作为(没要求写端口号): sqlalchemy.url = mysql+pymysql://root:root@localhost/alembic_demo?charset=utf8 为了使用模型类更新数据库,需要在`alembic/env.py`文件中设置target_metadata项,默认为target_metadata=None。 需要将`target_metadata`的值设置为模型`Base.metadata`,但是要导入`models` 使用sys模块和os模块把当前项目的路径导入到path中: 导入`models`的操作为: import sys,os sys.path.append(os.path.dirname(os.path.dirname(__file__))) import models 设置target_metadata项操作为: target_metadata = models.Base.metadata 4.自动生成迁移文件: 使用alembic revision --autogenerate -m "提示信息"将当前模型中的状态生成迁移文件。 5.将生成的迁移文件映射到数据库中: 使用alembic upgrade head将刚刚生成的迁移文件,真正映射到数据库中。 同理,如果要降级,那么使用alembic downgrade head。 6. 以后如果修改了模型,重复4、5步骤。 4.常用alembic命令和参数解释: 1. init:创建一个alembic仓库。 2. revision:创建一个新的版本文件。 3. --autogenerate:自动将当前模型的修改,生成迁移脚本。 4. -m:本次迁移做了哪些修改,用户可以指定这个参数,方便回顾。 5. upgrade:将指定版本的迁移文件映射到数据库中,会执行版本文件中的upgrade函数。 如果有多个迁移脚本没有被映射到数据库中,那么会执行多个迁移脚本。 6. [head]:代表最新的迁移脚本的版本号。 7. downgrade:会执行指定版本的迁移文件中的downgrade函数。 8. heads:展示head指向的脚本文件版本号。 9. history:列出所有的迁移版本及其信息。 10. current:展示当前数据库中的版本号。 另外,在你第一次执行upgrade的时候,就会在数据库中创建一个名叫alembic_version表,这个表只会有一条数据,记录当前数据库映射的是哪个版本的迁移文件。 5.常见错误及解决办法: 1. 创建新版本时报错 FAILED: Target database is not up to date. 原因:主要是heads和current不相同。current落后于heads的版本。 解决办法:将current移动到head上。alembic upgrade head 2. 创建新版本时报错 KeyError: 'bb747b02cda0' 或者 FAILED: Can't locate revision identified by 'a65ff5195bc0' 原因:数据库中存的版本号不在迁移脚本文件中 解决办法:删除versions中所有的迁移文件,删除数据库所有表。
Flask-SQLAlchemy和alembic结合使用: 操作步骤如下: 1.配置好数据库连接文件 如config.py HOSTNAME = '127.0.0.1' PORT = '3306' DATABASE = 'fs_alembic_demo' USERNAME = 'root' PASSWORD = 'root' DB_URI ="mysql+pymysql://{username}:{password}@{host}:{port}/{db}?charset=utf8".format(username=USERNAME,password=PASSWORD,host=HOSTNAME,port=PORT,db=DATABASE) SQLALCHEMY_DATABASE_URI = DB_URI 2.将config.py文件 结合flask项目主要运行文件 如momo.py from flask import Flask from flask_sqlalchemy import SQLAlchemy import config app = Flask(__name__) app.config.from_object(config) db = SQLAlchemy(app) class User(db.Model): __tablename__ = 'user' id = db.Column(db.Integer,primary_key=True,autoincrement=True) uname = db.Column(db.String(50),nullable=False) age = db.Column(db.Integer) gender=db.Column(db.String(2)) @app.route('/') def hello_world(): return 'Hello World!' if __name__ == '__main__': app.run() 3.使用alembic创建一个仓库(初始化仓库): 1.打开dos系统界面 2.cd到当前项目目录中,注意:如果想要使用alembic,则需要先进入到安装了alembic的虚拟环境中,不然就找不到这个命令。 3.然后执行命令 “alembic init [仓库的名字,推荐使用alembic]” 4.修改配置文件: 在`alembic.ini`中,给`sqlalchemy.url`项设置数据库的连接方式。方式跟sqlalchemy的方式是一样的。 sqlalchemy.url = driver://user:pass@localhost/dbname, 给`sqlalchemy.url`项设置数据库的连接操作为: sqlalchemy.url = mysql+pymysql://root:root@localhost/fs_alembic_demo?charset=utf8 为了使用模型类更新数据库,需要在`alembic/env.py`文件中设置target_metadata项,默认为target_metadata=None。 需要将`target_metadata`的值设置为模型`Base.metadata`,但是要导入`momo` 使用sys模块和os模块把当前项目的路径导入到path中: 导入`momo`的操作为: import sys,os sys.path.append(os.path.dirname(os.path.dirname(__file__))) import momo 设置target_metadata项操作为: target_metadata = momo.db.Model.metadata 5.自动生成迁移文件: 使用alembic revision --autogenerate -m "提示信息"将当前模型中的状态生成迁移文件。 6.将生成的迁移文件映射到数据库中: 使用alembic upgrade head将刚刚生成的迁移文件,真正映射到数据库中。 同理,如果要降级,那么使用alembic downgrade head。 7. 以后如果修改了模型,重复5、6步骤。
1.Flask-Script介绍: Flask-Script的作用是可以通过命令行的形式来操作Flask。 例如通过命令跑一个开发版本的服务器、设置数据库,定时任务等。 2.Flask-Script安装: pip install flask-script 3.Flask-Script基本使用: 1. 看一个小例子,在flask项目中,建一个manage.py文件,代码如下: from flask_script import Manager from momo import app manager = Manager(app) #1.通过命令执行 @manager.command def hello(): print('你好,hello') if __name__ == '__main__': manager.run() 其中的hello功能函数我们希望通过命令来运行, 然后在dos系统先进入装好Flask-Script的虚拟环境 运行python manage.py hello命令,就可以看hello函数已经执行了。 2. 再看一个例子,若执行命令脚本时,想传递某些参数过去,如下: from flask_script import Manager from momo import app manager = Manager(app) #2.通过命令执行 并要求传递参数 @manager.option("-p","--province",dest="province") @manager.option("-m","--month",dest="month") def get_pm_data(province,month): print("您请求的省份是:%s,月份是:%s" % (province,month)) if __name__ == '__main__': manager.run() 4.Flask-Script实战场景 老板要求给某员工快速建立一个后台账号。 分析:可通过命令执行 并传参的方式 将参数添加到数据库中 数据库连接config.py文件代码为: HOSTNAME = '127.0.0.1' PORT = '3306' DATABASE = 'flask_script_demo' USERNAME = 'root' PASSWORD = 'root' DB_URI ="mysql+pymysql://{username}:{password}@{host}:{port}/{db}?charset=utf8".format(username=USERNAME,password=PASSWORD,host=HOSTNAME,port=PORT,db=DATABASE) SQLALCHEMY_DATABASE_URI = DB_URI SQLALCHEMY_TRACK_MODIFICATIONS = False 主运行文件momo.py代码为: from flask import Flask #实战 from flask_sqlalchemy import SQLAlchemy import config app = Flask(__name__) app.config.from_object(config) db = SQLAlchemy(app) class BackendUser(db.Model): __tablename__ = 'backend_user' id = db.Column(db.Integer,primary_key=True,autoincrement=True) uname = db.Column(db.String(50),nullable=False) email = db.Column(db.String(50),nullable=False) db.create_all() @app.route('/') def hello_world(): return 'Hello World!' if __name__ == '__main__': app.run() 存放命令的脚本文件manage.py代码为: from flask_script import Manager from momo import app,BackendUser,db manager = Manager(app) #3.Flask-Script实战场景 :老板要求给某员工快速建立一个后台账号 @manager.option("-u","--uname",dest="uname") @manager.option("-e","--email",dest="email") def add_user(uname,email): user = BackendUser(uname=uname,email=email) db.session.add(user) db.session.commit() print("添加OK") if __name__ == '__main__': manager.run() 5.写命令脚本技巧 如果有一些命令是针对某个功能的。 如有一堆命令是针对ORM与表映射的,那么可以将这些命令单独放在一个文件中方便管理。 然后到主脚本文件manage.py中,通过`Manager`的对象名.add_command`方法来添加。 如创建一个db_script.py文件 管理跟操作数据库表相关的东西,代码如: from flask_script import Manager db_manager = Manager() @db_manager.command def init(): print('迁移仓库创建OK!') @db_manager.command def revision(): print('迁移脚本文件生成OK!') @db_manager.command def upgrade(): print('迁移脚本文件映射到数据库OK!') 存放命令的脚本文件manage.py代码为: from flask_script import Manager from momo import app,BackendUser,db manager = Manager(app) #4.如果有一些命令是针对某个功能的。 # 比如有一堆命令是针对ORM与表映射的,那么可以将这些命令单独放在一个文件中方便管理。 # 也是使用`Manager`的对象来添加。然后到主manage文件中,通过`manager.add_command`来添加。 from db_script import db_manager manager.add_command("db",db_manager) if __name__ == '__main__': manager.run()
app.py文件代码如下: from flask import Flask import config from exts import db app = Flask(__name__) app.config.from_object(config) db.init_app(app) @app.route('/') def hello_world(): return 'Hello World!' if __name__ == '__main__': app.run() exts.py文件代码如下: from flask_sqlalchemy import SQLAlchemy # 注意没有传app对象过来,此时的db无法获取到数据库配置信息 db = SQLAlchemy() config.py文件代码如下: HOSTNAME = '127.0.0.1' PORT = '3306' DATABASE = 'binbin' USERNAME = 'root' PASSWORD = 'root' DB_URI ="mysql+pymysql://{username}:{password}@{host}:{port}/{db}?charset=utf8".format(username=USERNAME,password=PASSWORD,host=HOSTNAME,port=PORT,db=DATABASE) SQLALCHEMY_DATABASE_URI = DB_URI SQLALCHEMY_TRACK_MODIFICATIONS = False models.py文件代码如下: from exts import db class User(db.Model): __tablename__ = "user" id = db.Column(db.Integer, primary_key=True, autoincrement=True) uname = db.Column(db.String(50), nullable=False) age = db.Column(db.Integer, nullable=False) addr = db.Column(db.String(50), nullable=False)
manage.py文件 代码如下: from flask_script import Manager from app import app from exts import db # 把跟数据库操作相关的东西也导入进来 from flask_migrate import MigrateCommand, Migrate # 真正做数据库映射的 放到manage.py文件中做 # 需要把映射到数据库中的模型导入到manage.py文件中,否则就映射不成功 from models import User manager = Manager(app) Migrate(app, db) # Migrate是用来绑定app和数据库的连接使用的 套路 # 添加Migrate所有的子命令到db下 manager.add_command("db", MigrateCommand) if __name__ == "__main__": manager.run() 1.Flask-Migrate介绍: 在实际的开发环境中,经常会发生数据库修改的行为。一般我们修改数据库不会直接手动的去修改,而是去修改ORM对应的模型,然后再把模型映射到数据库中。这时候如果有一个工具能专门做这种事情,就显得非常有用了。 flask-migrate插件就是做这个事情的。flask-migrate是基于Alembic进行的一个封装,并集成到Flask中,所有的迁移操作其实都是Alembic做的,他能跟踪模型的变化,并将变化映射到数据库中。 2.Flask-Migrate安装: pip install flask-migrate 4.Flask-Migrate使用之manage.py文件中的代码: from flask_script import Manager from app import app from exts import db from flask_migrate import Migrate,MigrateCommand #需要把映射到数据库中的模型导入到manage.py文件中 from models import User manager = Manager(app) #用来绑定app和db到flask-migrate的 Migrate(app,db) #添加Migrate的所有子命令到db下 manager.add_command("db",MigrateCommand) if __name__ == '__main__': manager.run() 5.flask_migrate使用之常用命令: 1. 初始化一个环境:python manage.py db init 2. 自动检测模型,生成迁移脚本:python manage.py db migrate 3. 将迁移脚本映射到数据库中:python manage.py db upgrade 4. 更多命令:python manage.py db --help 6.flask_migrate使用之注意事项 #需要把映射到数据库中的模型导入到manage.py文件中,否则映射就不成功 from models import User
联系1如下:
app.py文件下的代码如下: from flask import Flask from flask_sqlalchemy import SQLAlchemy import config app = Flask(__name__) app.config.from_object(config) db = SQLAlchemy(app) class User(db.Model): __tablename__ = "user" id = db.Column(db.Integer, primary_key=True, autoincrement=True) uname = db.Column(db.String(50), nullable=False) age = db.Column(db.Integer, nullable=False) class ExtendUser(db.Model): __tablename__ = "extenduser" id = db.Column(db.Integer, primary_key=True, autoincrement=True) addr = db.Column(db.String(50), nullable=False) telephone = db.Column(db.String(20), nullable=False) uid = db.Column(db.Integer, db.ForeignKey("user.id")) user = db.relationship("User", backref=db.backref("exuser", uselist=False)) def create_all(): db.drop_all() db.create_all() user = User(uname="小二", age=18) db.session.add(user) extenduser = ExtendUser(addr="贵州省", telephone="119", uid=1) db.session.add(extenduser) db.session.commit() def query1(): q1 = db.session.query(ExtendUser).first() q2 = db.session.query(ExtendUser).get(1) print(q1.user.uname) print(q2.user.uname) def query2(): p1 = db.session.query(User).get(1) print(p1.exuser.telephone) if __name__ == '__main__': # query1() query2() config.py文件下的代码如下: HOSTNAME = '127.0.0.1' PORT = '3306' DATABASE = 'flask_web' USERNAME = 'root' PASSWORD = 'root' DB_URI = "mysql+pymysql://{username}:{password}@{host}:{port}/{db}?charset=utf8".format(username=USERNAME,password=PASSWORD,host=HOSTNAME,port=PORT,db=DATABASE) SQLALCHEMY_DATABASE_URI = DB_URI SQLALCHEMY_TRACK_MODIFICATIONS = False
练习2如下:
app.py文件下的代码: from flask import Flask from exts import db import config app = Flask(__name__) app.config.from_object(config) db.init_app(app) # 这个步骤保证db能够获取到数据库的配置信息 @app.route("/") def hello(): return "hello world" if __name__ == '__main__': app.run() config.py文件的代码如下: HOSTNAME = '127.0.0.1' PORT = '3306' DATABASE = 'binbin' USERNAME = 'root' PASSWORD = 'root' DB_URI = "mysql+pymysql://{username}:{password}@{host}:{port}/{db}?charset=utf8".format(username=USERNAME,password=PASSWORD,host=HOSTNAME,port=PORT,db=DATABASE) SQLALCHEMY_DATABASE_URI = DB_URI SQLALCHEMY_TRACK_MODIFICATIONS = False models.py 文件中的代码如下: from exts import db from app import app class User(db.Model): __tablename__ = "user" id = db.Column(db.Integer, primary_key=True, autoincrement=True) uname = db.Column(db.String(50), nullable=False) age = db.Column(db.Integer, nullable=False) country = db.Column(db.String(50), nullable=False) addr = db.Column(db.String(50), nullable=False) if __name__ == "__main__": db.create_all(app=app) exts.py文件中的代码如下: from flask_sqlalchemy import SQLAlchemy # 没有传app对象过来,此时的db无法获取 数据库配置config信息 db = SQLAlchemy()
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。