赞
踩
目录
定义:ORM(Object-Relational Mapping)模型,即对象关系映射,是一种程序设计技术,用于实现面向对象编程语言里不同类型系统的数据之间的转换。在面向对象的编程语言中,如Java、Python等,数据通常被组织成复杂的对象关系。简单来说就是ORM模型把面向对象编程与操作数据库之间建立了映射。设置开发者操作数据库无需维护和编写SQL语句,而是基于面对对象的方式操作数据库。
映射关系:数据库中的表>编程语言中的类,表中的字段>类中的属性,表之间的关系>类之间的关系。
使用ORM模型的优势在于:
当然ORM也存在一下劣势:
在Python语言中实现ORM系统的就是SQLAlchemy,它具备以下特点:
然而,ORM模型也存在一些缺点:
注意:在处理复杂的SQL查询时,由于ORM框架效率低下,所以这个时候可以编写SQL语句执行原生SQL语句。
1、核心架构(Core):
2、ORM架构:
4、数据库连接池(Connection Pooling):管理数据库连接的池化,确保高效的数据库连接复用。
5、Dialect:选择连接数据库的DB API种类,根据配置文件的不同调用不同的数据库API,从而实现对数据库的操作。
6、架构和类型(Schema/Types):定义数据库的架构和数据类型。
7、SQL表达式语言(SQL Expression Language):
SQLAlchemy官方文档:Dialects — SQLAlchemy 2.0 Documentation
pip install sqlalchemy
注意:sqlalchemy没有提供直接连接数据库的操作,所以需要借助第三方库来连接数据库,操作数据库。以 MySQL 为例,sqlalchemy就是借助pymsql库来实现对数据的连接和操作。
连接不同/相同的数据库借助不同的第三方库如下:
MySQL-Python
mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>
pymysql
mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]
MySQL-Connector
mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>
cx_Oracle
oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]
创建连接:
- from sqlalchemy import create_engine
- from urllib import parse
-
- user = "root" # 用户名
- password = "" # 密码
- pwd = parse.quote_plus(password) # 解决密码中含@符导致报错
- host = "172.22.70.174" # 数据库主机地址
- # 第一步: 创建engine
- engine = create_engine(
- url=f"mysql+pymysql://{user}:{pwd}@{host}:3306/test?charset=utf8",
- max_overflow=10, # 超过连接池大小外最多创建的连接
- pool_size=10, # 连接池大小
- pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
- pool_recycle=-1 # 对线程池中的线程进行一次连接的回收的时间,如果是3600,表示1个小时后对连接进行回收
- )
需求:查询表t_student的全部数据,执行的SQL语句是:select * from t_student
- from sqlalchemy import create_engine
- from urllib import parse
- import threading
-
- user = "root" # 用户名
- password = "" # 密码
- pwd = parse.quote_plus(password) # 解决密码中含@符导致报错
- host = "172.22.70.174" # 数据库主机地址
- # 第一步: 创建engine
- engine = create_engine(
- url=f"mysql+pymysql://{user}:{pwd}@{host}:3306/test?charset=utf8",
- max_overflow=2, # 超过连接池大小外最多创建的连接
- pool_size=3, # 连接池大小
- pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
- pool_recycle=-1 # 对线程池中的线程进行一次连接的回收的时间,如果是3600,表示1个小时后对连接进行回收
- )
-
- # 第二步:使用
- def test_execute():
- # conn = engine.connect() # 创建一个新的连接
- conn = engine.raw_connection() # 从连接池中取一个连接
- cursor = conn.cursor() # 创建游标
- sql = "select * from t_student" # 定义执行的SQL语句
- cursor.execute(sql) # 执行SQL语句
- print(cursor.fetchall()) # 获取执行的结果并打印置控制台
-
- # 测试配置是否生效
- if __name__ == '__main__':
- for i in range(20):
- t = threading.Thread(target=test_execute)
- t.start()
说明:使用ORM映射已存在的表时,只能映射其对应的字段,对于每个字段的约束最好和原表保持一致,映射已存在的表时不能新增字段,新增外键约束,新增索引操作。如果需要进行这些操作可以数据库中执行相关sql语句或者使用SQLAlchemy的迁移工具(如Alembic)
注意:
示例:
需求:创建Student表模型映射数据库中的t_student表
- import datetime
- from sqlalchemy.orm import declarative_base , sessionmaker
- from sqlalchemyBaseUse import engine # 这里的engine就是上面创建连接中创建的engine
- from sqlalchemy import Column, Integer, String, Text, DateTime, Index
-
- # 声明ORM基类
- Base = declarative_base()
-
- class Student(Base):
- __tablename__ = 't_student'
- id = Column(Integer, primary_key=True)
- stuno = Column(String(10), comment="学号")
- name = Column(String(10), comment="姓名")
- gender = Column(String(1), comment="性别")
- age = Column(Integer, comment="年龄")
- idcard = Column(String(18), comment="身份证")
- entrydate = Column(DateTime, default=datetime.datetime.now, comment="入学时间")
- addr = Column(String(50), comment="家庭地址")
-
- def init_db():
- # 创建继承base类的表的映射关系
- Base.metadata.create_all(engine)
-
- def drop_db():
- # 删除继承base类的表,注意:除了删除表的映射关系,数据库中的表和数据都会被删减,生产中谨慎操作
- Base.metadata.drop_all(engine)
-
- if __name__ == '__main__':
- init_db()
示例:
- from sqlalchemy import create_engine, Column, Integer, String, MetaData, Table, CheckConstraint
-
- # 假设已经有了一个引擎和元数据
- engine = create_engine('sqlite:///:memory:')
- metadata = MetaData()
-
- # 创建一个表,并添加一个检查约束
- my_table = Table('my_table', metadata,
- Column('id', Integer, primary_key=True),
- Column('name', String(50)),
- Column('age', Integer),
- CheckConstraint("age >= 0 AND age <= 150", name='age_check')
- )
-
- # 创建表(包括检查约束)
- metadata.create_all(engine)
示例:
- from sqlalchemy import Column, Integer, String, CheckConstraint
- from sqlalchemy.ext.declarative import declarative_base
- from sqlalchemy.orm import sessionmaker
-
- Base = declarative_base()
-
- class MyModel(Base):
- __tablename__ = 'my_table'
-
- id = Column(Integer, primary_key=True)
- name = Column(String(50))
- age = Column(Integer)
-
- __table_args__ = (
- CheckConstraint("age >= 0 AND age <= 150", name='age_check'),
- )
概念:约束是作用于表中字段上的规则,用于限制存储在表中的数据。
目的:保证数据库中数据的正确、有效性和完整性。
分类:
约束 | 描述 | 关键字 |
非空约束 | 限制该字段的数据不能为null | NOT NULL |
唯一约束 | 保证该字段的所有数据都是唯一、不重复的 | UNIQUE |
主键约束 | 主键是一行数据的唯一标识,要求非空且唯一 | PRIMARY KEY |
默认约束 | 保存数据时,如果未指定该字段的值,则采用默认值 | DEFAULT |
检查约束(8.0.16版本之后) | 保证字段值满足某一个条件 | CHECK |
外键约束 | 用来让两张表的数据之间建立连接,保证数据的一致性和完整性 | FOREIGN KEY |
注意:约束是作用于表中字段上的,可以在创建表/修改表的时候添加约束。
需求如下:
要求创建一张名为t_user的数据表,各个字段的规则如下:
字段名 | 字段含义 | 字段类型 | 约束条件 | 约束关键字 |
id | ID唯一标识 | int | 主键,并且自动增长 | PRIMARY KEY,AUTO_INCREMENT |
name | 姓名 | varchar(10) | 不为空,并且唯一 | NOT NULL , UNIQUE |
age | 年龄 | int | 大于0,并且小于等于150 | CHECK |
status | 状态 | char(1) | 如果没有指定该值,默认为1 | DEFAULT |
gender | 性别 | char(1) | 无 |
- from sqlalchemy.orm import declarative_base , sessionmaker
- from sqlalchemyBaseUse import engine
- from sqlalchemy import Column, Integer, String, CheckConstraint
-
- # 声明ORM基类
- Base = declarative_base()
-
- class UserModel(Base):
- __tablename__ = "t_user"
- id = Column(Integer, primary_key=True, autoincrement=True)
- name = Column(String(10), nullable=True, unique=True, comment="姓名")
- age = Column(Integer, comment="年龄")
- status = Column(String(1), default=1, comment="状态")
- gender = Column(String(1), comment="性别")
-
- # 添加age的检查约束
- # 注意__table_args__是元祖数据类型,如果只有一个数据的时候,注意后面的逗号不能少
- __table_args__ = (
- CheckConstraint("age >0 AND age <= 150", name="age_check"),
- )
-
- def init_db():
- # 创建继承base类的表的映射关系
- Base.metadata.create_all(engine)
-
- if __name__ == '__main__':
- init_db()
作用:外键用来让两张表的数据之间建立连接,从而保证数据的一致性和完整性。
mysql 语法:
1、添加外键
CREATE TABLE 表名(
字段名 数据类型,
...
[CONSTRAINT] [外键名称] FOREIGN KEY (外键字段名) REFERENCES 主表 (主表列名)
);
或
ALTER TABLE 表名 ADD CONSTRAINT 外键名称 FOREIGN KEY (外键字段名)REFERENCES 主表 (主表列名) ;2、删除外键
ALTER TABLE 表名 DROP FOREIGN KEY 外键名称;
在 SQLAlchemy 中,创建外键约束通常不需要直接使用 __table_args__,因为 SQLAlchemy 提供了 ForeignKey 和 relationship 这两个工具来定义关系和外键约束。
示例:
需求:创建一张部门表和员工表,建立外键约束关系一个员工对应一个部门。
部门表 t_departments
字段名 | 字段含义 | 字段类型 | 约束条件 | 约束关键字 |
id | ID唯一标识 | int | 主键,并且自动增长 | PRIMARY KEY,AUTO_INCREMENT |
name | 部门名称 | varchar(50) | 非空约束 | NOT NULL |
员工表 t_employees
字段名 | 字段含义 | 字段类型 | 约束条件 | 约束关键字 |
id | ID唯一标识 | int | 主键,并且自动增长 | PRIMARY KEY,AUTO_INCREMENT |
name | 姓名 | varchar(50) | 非空约束 | NOT NULL |
age | 年龄 | int | ||
dept_id | 部门id | int | 外键约束 | FOREIGN KEY |
1、创建ORM表模型如下:
- from sqlalchemy.orm import declarative_base, relationship
- from sqlalchemyBaseUse import engine
- from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index, CheckConstraint
-
- # 声明ORM基类
- Base = declarative_base()
- # 创建部门表
- class Department(Base):
- __tablename__ = 't_departments'
-
- id = Column(Integer, primary_key=True, autoincrement=True)
- name = Column(String(50), nullable=False, comment="部门名称")
-
- # 定义与 Employee 的一对多关系
- employees = relationship("Employee", back_populates="department")
-
- # 创建员工表
- class Employee(Base):
- __tablename__ = 't_employees'
-
- id = Column(Integer, primary_key=True, autoincrement=True)
- name = Column(String(50), nullable=False, comment="姓名")
- age = Column(Integer, comment="年龄")
- department_id = Column(Integer, ForeignKey('t_departments.id', ondelete="CASCADE"), nullable=False)
- # 定义与 Department 的多对一关系
- department = relationship("Department", back_populates="employees")
-
- def init_db():
- # 创建继承base类的表的映射关系
- Base.metadata.create_all(engine)
-
- if __name__ == '__main__':
- init_db()
2、往数据库中插入测试数据如下:
# 部门表测试数据
INSERT INTO t_departments (id, name) VALUES (1, '研发部'), (2, '市场部'),(3, '财务部');# 员工表测试数据
INSERT INTO t_employees (id, name, age, department_id)
VALUES (1, '张三', 22, 1), (2, '李四', 33, 1), (3, 'TOM', 30, 2), (4, '小艺', 25, 2), (5, '小李', 36, 3);
3、展示通过关联关系查询,如:查询员工姓名为张三对应的部门信息
- def query():
- Session = sessionmaker(bind=engine)
- session = Session()
- # 查询姓名为张三的员工
- employee = session.query(Employee).filter_by(name="张三").first()
- # 如果找到了员工,则获取其部门信息
- if employee:
- department = employee.department
- print(f"员工姓名: {employee.name}, 部门名称: {department.name}")
- else:
- print("没有找到姓名为'张三'的员工")
-
-
- if __name__ == '__main__':
- # init_db()
- query()
4、展示通过关联关系查询,如:查询研发部所有员工信息
- def query2():
- Session = sessionmaker(bind=engine)
- session = Session()
- # 直接通过部门对象获取其员工
- department = session.query(Department).filter_by(name='研发部').first()
- if department:
- for employee in department.employees: # 使用了Department 模型中有一个名为 'employees' 的反向关联
- print(f"员工姓名: {employee.name}, 员工ID: {employee.id}")
-
- if __name__ == '__main__':
- # init_db()
- query2()
还是上面4的示例,如果没有使用反向关联,在正常情况下我们应该如何查询呢?
思考:如果正常在数据库中查询时,利用的外键关系进行查询,可以使用内连接查询,对应的SQL语句如下:
select e.*, d.name from t_departments d inner join t_employees e on e.department_id=d.id where d.name='研发部';
那么把SQL语句对应到ORM中就是(使用子查询):
- def query3():
- Session = sessionmaker(bind=engine)
- session = Session()
- # 1、先查询研发部信息
- department = session.query(Department).filter_by(name='研发部').first()
-
- # 如果没有找到研发部,则退出查询
- if not department:
- print("没有找到研发部")
- session.close()
- exit()
-
- # 2、根据研发部的部门id查询所有员工
- employees = session.query(Employee).filter_by(department_id=department.id).all()
- # 打印研发部所有员工的信息
- for employee in employees:
- print(f"员工姓名: {employee.name}, 员工ID: {employee.id}")
-
- if __name__ == '__main__':
- # init_db()
- query3()
综上:使用反向关联可以直接查询所需要的数据,效率更高,代码更加简洁。
5、relationship的使用说明
可以看到上面两张表中分别创建了一个反向关联如下:
t_departments: employees = relationship("Employee", back_populates="department")
t_employees :department = relationship("Department", back_populates="employees")
参数说明:
添加了外键之后,再删除父表数据时产生的约束行为,就称为删除/更新行为。具体的删除/更新行为有以下几种:
行为 | 说明 |
NO ACTION | 当在父表中删除/更新对应记录时,首先检查该记录是否有对应外键,如果有则不允许删除/更新。 (与 RESTRICT 一致) 默认行为 |
RESTRICT | 当在父表中删除/更新对应记录时,首先检查该记录是否有对应外键,如果有则不允许删除/更新。 (与 NO ACTION 一致) 默认行为 |
CASCADE | 当在父表中删除/更新对应记录时,首先检查该记录是否有对应外键,如果有,则同时删除/更新外键在子表中的记录。 |
SET NULL | 当在父表中删除对应记录时,首先检查该记录是否有对应外键,如果有则设置子表中该外键值为null(这就要求该外键允许取null)。 |
SET DEFAULT | 父表有变更时,子表将外键列设置成一个默认的值 (Innodb不支持) |
对应SQL语法:
ALTER TABLE 表名 ADD CONSTRAINT 外键名称 FOREIGN KEY (外键字段) REFERENCES 主表名 (主表字段名) ON UPDATE CASCADE ON DELETE CASCADE;
示例:
在上面的t_employees表中我们创建了一个外键,并指定了外键的删除行为为CASCADE,也就是如果父表t_departments发生了删除,那么对应子表t_employees中外键关联的数据也会被删除。
department_id = Column(Integer, ForeignKey('t_departments.id', ondelete="CASCADE"), nullable=False)
注意事项:
1、relationship 的作用与使用
relationship 是 SQLAlchemy 库中的一个重要功能,它用于在模型(或称为“表”)之间建立关联关系。以下是对 relationship 的详细说明,包括其作用和使用方式:
作用:
使用方式:
-
- from sqlalchemy.orm import relationship
实现:在任意一方添加外键,关联另一方的主键,并设置外键为唯一的(UNIQUE),一般用作单表的拆分【注:这里的实现是指在MySQL中的实现方式,但是在ORM中实现思路是一样的】
示列:用户 与 用户详情的关系
关系:一对一关系,用于单表拆分,将一张表的基础字段放在一张表中,其他详情字段放在另一张表中,用来提升操作效率。
1、方式一:使用back_populates参数实现反向关联
- # 声明ORM基类
- Base = declarative_base()
-
- class User(Base):
- __tablename__ = 't_user'
-
- id = Column(Integer, primary_key=True, autoincrement=True)
- name = Column(String(50), nullable=False, comment="姓名")
-
- # 定义一个关系来访问UserDetail对象
- detail = relationship("UserDetail", uselist=False, back_populates="user")
-
- class UserDetail(Base):
- __tablename__ = 't_user_details'
-
- id = Column(Integer, primary_key=True, autoincrement=True)
- address = Column(String(100), comment="地址")
- phone = Column(String(20), comment="电话")
- # 外键列,引用t_user表的id
- user_id = Column(Integer, ForeignKey('t_user.id'), nullable=False, unique=True)
- # 定义一个关系来回引用User对象
- user = relationship("User", back_populates="detail")
2、方式二:使用backref参数实现反向关联
说明:backref 参数是 relationship() 函数的一个非常有用的功能,它允许我们自动创建反向关系,而无需在另一个模型类中显式定义它。使用 backref 可以简化代码并减少冗余。
- class User(Base):
- __tablename__ = 't_user'
- id = Column(Integer, primary_key=True, autoincrement=True)
- name = Column(String(50), nullable=False, comment="姓名")
-
- # 使用 backref 自动创建反向关系
- detail = relationship("UserDetail", uselist=False, backref="user")
-
- class UserDetail(Base):
- __tablename__ = 't_user_details'
-
- id = Column(Integer, primary_key=True, autoincrement=True)
- address = Column(String(100), comment="地址")
- phone = Column(String(20), comment="电话")
- # 外键列,引用t_user表的id
- user_id = Column(Integer, ForeignKey('t_user.id'), nullable=False, unique=True)
- # 注意:这里我们不再需要显式定义 user 关系,因为 backref 已经为我们做了
- # 定义一个关系来回引用User对象
- # user = relationship("User", back_populates="detail")
3、backref与back_populates的区别
backref
back_populates
实现:在多的一方建立外键,指向一的一方的主键
示例:部门表和员工表关系
关系:一个员工对应一个部门,一个部门对应多个员工
- class Department(Base):
- __tablename__ = 't_departments'
-
- id = Column(Integer, primary_key=True, autoincrement=True)
- name = Column(String(50), nullable=False, comment="部门名称")
-
- # 定义与 Employee 的一对多关系
- employees = relationship("Employee", back_populates="department")
-
- class Employee(Base):
- __tablename__ = 't_employees'
-
- id = Column(Integer, primary_key=True, autoincrement=True)
- name = Column(String(50), nullable=False, comment="姓名")
- age = Column(Integer, comment="年龄")
- department_id = Column(Integer, ForeignKey('t_departments.id', ondelete="CASCADE"), nullable=False)
-
- # 定义与 Department 的多对一关系
- department = relationship("Department", back_populates="employees")
实现:建立第三张中间表,中间表至少包含两个外键,分别关联两方的主键
示列:学生 与 课程的关系
关系:一个学生可以选修多门课程,一门课程也可让多个学生选择
学生表 t_student
字段名 | 字段含义 | 字段类型 | 约束条件 | 约束关键字 |
id | ID唯一标识 | int | 主键,并且自动增长 | PRIMARY KEY,AUTO_INCREMENT |
name | 姓名 | varchar(10) | 非空约束 | NOT NULL |
stuno | 学号 | varchar(10) | 非空约束 | NOT NULL |
课程表 t_course
字段名 | 字段含义 | 字段类型 | 约束条件 | 约束关键字 |
id | ID唯一标识 | int | 主键,并且自动增长 | PRIMARY KEY,AUTO_INCREMENT |
name | 课程名称 | varchar(10) | 非空约束 | NOT NULL |
学生课程关系表 t_student_course
字段名 | 字段含义 | 字段类型 | 约束条件 | 约束关键字 |
id | ID唯一标识 | int | 主键,并且自动增长 | PRIMARY KEY,AUTO_INCREMENT |
studentid | 学生ID | int | 非空约束,外键约束 | NOT NULL, FOREIGN KEY |
courseid | 课程ID | int | 非空约束,外键约束 | NOT NULL, FOREIGN KEY |
方式一:关联表不直接映射到ORM类,使用back_populates参数实现反向关联
- from sqlalchemy.orm import declarative_base, sessionmaker, relationship
- from sqlalchemyBaseUse import engine
- from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, Table
-
- # 声明ORM基类
- Base = declarative_base()
-
- # 关联表(不直接映射到ORM类)
- student_course = Table('t_student_course', Base.metadata,
- Column("id", Integer, primary_key=True, autoincrement=True),
- Column('studentid', Integer, ForeignKey('t_student.id'), nullable=False),
- Column('courseid', Integer, ForeignKey('t_course.id'), nullable=False),
- )
-
- class Student(Base):
- __tablename__ = 't_student'
-
- id = Column(Integer, primary_key=True, autoincrement=True)
- name = Column(String(10), nullable=False, comment="姓名")
- stuno = Column(String(10), nullable=False, comment="学号")
-
- # 使用relationship()定义多对多关系
- courses = relationship("Course",
- # 指定中间关联表
- secondary="t_student_course",
- back_populates="students",
- lazy='dynamic') # 使用lazy='dynamic'可以返回查询对象
-
- class Course(Base):
- __tablename__ = 't_course'
-
- id = Column(Integer, primary_key=True, autoincrement=True)
- name = Column(String(10), nullable=False, comment="课程名称")
-
- # 使用relationship()定义反向多对多关系
- students = relationship("Student",
- # 指定中间关联表
- secondary="t_student_course",
- back_populates="courses")
-
- def init_db():
- # 创建继承base类的表的映射关系
- Base.metadata.create_all(engine)
-
- if __name__ == '__main__':
- init_db()
方式二:关联表映射到ORM类 使用backref参数实现反向关联
- class Student(Base):
- __tablename__ = 't_student'
-
- id = Column(Integer, primary_key=True, autoincrement=True)
- name = Column(String(10), nullable=False, comment="姓名")
- stuno = Column(String(10), nullable=False, comment="学号")
-
- # 与StudentCourse定义多对多关系
- courses = relationship("Course",
- secondary="t_student_course", # 关联表的名称
- backref="students") # 为Course模型创建反向引用
-
- class Course(Base):
- __tablename__ = 't_course'
-
- id = Column(Integer, primary_key=True, autoincrement=True)
- name = Column(String(10), nullable=False, comment="课程名称")
-
-
- class StudentCourse(Base):
- __tablename__ = 't_student_course'
-
- id = Column(Integer, primary_key=True, autoincrement=True)
- studentid = Column(Integer, ForeignKey('t_student.id'), nullable=False)
- courseid = Column(Integer, ForeignKey('t_course.id'), nullable=False)
-
- __table_args__ = (
- # 确保student_id和course_id的组合是唯一的
- UniqueConstraint('studentid', 'courseid', name='_student_course_uc'),
- )
示例:查询姓名为张三选择的所有课程名称
1、插入测试数据
insert into t_student values (null, '张三', '2000100101'),(null, '李四',
'2000100102'),(null, '小五', '2000100103'),(null, '小七', '2000100104');insert into t_course values (null, 'Java'), (null, 'PHP'), (null , 'MySQL') , (null, 'Hadoop');
insert into t_student_course values (null,1,1),(null,1,2),(null,1,3),(null,2,2), (null,2,3),(null,3,4);
2、代码实现
- def query():
- Session = sessionmaker(bind=engine)
- session = Session()
- stu = session.query(Student).filter_by(name="张三").first()
- if stu:
- for course in stu.courses:
- print(course.name)
-
- if __name__ == '__main__':
- query()
在SQLAlchemy中,scoped_session是一个工厂,它产生线程局部(thread-local)的Session对象。也就是在一个线程中,多次调用scoped_session工厂将返回同一个Session实例,而在另一个线程中,你会得到一个不同的实例。这有助于实现线程安全的数据库会话管理,因为每个线程都有自己的会话,从而避免了并发问题。
使用scoped_session来实现线程安全的步骤:
- from sqlalchemy import create_engine
- from sqlalchemy.orm import sessionmaker, scoped_session,declarative_base
-
- # 假设你有一个Base类和你的模型定义...
- Base = declarative_base()
-
- # 创建一个引擎
- engine = create_engine('sqlite:///example.db')
-
- # 创建一个Session类
- Session = sessionmaker(bind=engine)
-
- # 使用scoped_session来包装Session类
- # 这将确保每次在同一个线程中调用scoped_session()时,都会返回相同的Session实例
- scoped_session = scoped_session(Session)
-
- # 在你的代码中...
- def some_function():
- # 获取一个会话
- session = scoped_session()
-
- # 使用会话进行查询、添加、更新或删除操作...
- # 例如: result = session.query(MyModel).filter_by(some_column='value').first()
-
- # 提交事务(如果需要)
- session.commit()
-
- # 关闭会话(通常不需要,因为scoped_session会在线程结束时自动关闭会话)
- # 但如果你想在函数结束时立即关闭它,可以调用remove()方法
- # scoped_session.remove()
需求:在表t_student中新增数据
数据模型:
- class Student(Base):
- __tablename__ = 't_student'
-
- id = Column(Integer, primary_key=True, autoincrement=True)
- name = Column(String(10), nullable=False, comment="姓名")
- stuno = Column(String(10), nullable=False, comment="学号")
-
- # 使用relationship()定义多对多关系
- courses = relationship("Course",
- secondary="t_student_course",
- back_populates="students",
- lazy='dynamic') # 使用lazy='dynamic'可以返回查询对象
新增数据示例:
- from sqlalchemy.orm import declarative_base, sessionmaker, scoped_session
- from sqlalchemyBaseUse import engine
- from many_to_many_relationship import Student
-
- # 创建ORM基类
- Base = declarative_base()
-
- # 创建会话类
- Session = sessionmaker(bind=engine)
- # 使用scoped_session创建线程安全会话
- session = scoped_session(Session)
-
- # # 新增一条数据
- # # 创建一个新的Student实例
- # stu1 = Student(name="Tom", stuno="2000100105")
- # # 将新实例添加到会话中
- # session.add(stu1)
- # # 提交会话,将数据保存到数据库
- # session.commit()
-
- # 批量添加数据
- session.add_all([
- Student(name="Jack", stuno="2000100106"),
- Student(name="小爱", stuno="2000100107"),
- Student(name="大胖", stuno="2000100108"),
- ])
- session.commit()
修改数据的流程:先查询出需要修改的数据,然后修改数据,最后提交修改。
需求:修改表t_student的数据
注意:如果stuno的长度是10,需要增加一下长度,不然新增超过长度,会出现添加失败的错误。修改字段数据类型的SQL语句如下(需要执行SQL语句):
ALTER TABLE t_student MODIFY stuno varchar(30);
- from sqlalchemy.orm import declarative_base, sessionmaker, scoped_session
- from sqlalchemyBaseUse import engine
- from many_to_many_relationship import Student
-
- # 创建ORM基类
- Base = declarative_base()
-
- # 创建会话类
- Session = sessionmaker(bind=engine)
- # 使用scoped_session创建线程安全会话
- session = scoped_session(Session)
-
- # 1、修改id=5的数据,修改为name="Nicky", stuno="2000100109"
- # # 查询出id=5的数据
- # stu = session.get(Student, 5)
- # # 修改数据
- # if stu:
- # stu.name = "Nicky"
- # stu.stuno = "2000100109"
- # # 提交修改的数据
- # session.commit()
-
- # # 2、修改name="大胖"的数据,修改为name="小胖"
- # session.query(Student).filter_by(name="大胖").update({"name":"小胖"})
- # # 提交修改的数据
- # session.commit()
-
- # 3、修改id>6的stuno,每个stuno前面都加123
- session.query(Student).filter(Student.id > 6).update({"stuno": "123" + Student.stuno})
- session.commit()
删除数据流程:先查询出数据,然后再删除数据
需求:删除表t_student的数据
- from sqlalchemy.orm import declarative_base, sessionmaker, scoped_session
- from sqlalchemyBaseUse import engine
- from many_to_many_relationship import Student
-
- # 创建ORM基类
- Base = declarative_base()
-
- # 创建会话类
- Session = sessionmaker(bind=engine)
- # 使用scoped_session创建线程安全会话
- session = scoped_session(Session)
-
- # 1、删除id=5的数据
- # stu_info = session.get(Student, 5)
- # if stu_info:
- # session.delete(stu_info)
- # session.commit()
-
- # 2、删除name="小胖"的数据
- session.query(Student).filter_by(name="小胖").delete()
- session.commit()
ALTER TABLE t_student2 CONVERT TO CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
- from sqlalchemy.orm import declarative_base , sessionmaker, scoped_session
- from sqlalchemyBaseUse import engine
- from sqlalchemy import Column, Integer, String, DateTime
-
- # 声明ORM基类
- Base = declarative_base()
- Session = sessionmaker(bind=engine)
- session = scoped_session(Session)
-
- class Student(Base):
- __tablename__ = 't_student2'
- id = Column(Integer, primary_key=True)
- stuno = Column(String(10), comment="学号")
- name = Column(String(10), comment="姓名")
- gender = Column(String(1), comment="性别")
- age = Column(Integer, comment="年龄")
- idcard = Column(String(18), comment="身份证")
- entrydate = Column(DateTime, comment="入学时间")
- addr = Column(String(50), comment="家庭地址")
-
- def init_db():
- # 创建继承base类的表的映射关系
- Base.metadata.create_all(engine)
-
- def basic_query():
- pass
-
-
- if __name__ == '__main__':
- init_db()
# 向表中插入数据
insert into t_student2 values('1','1','小洋洋','女',18,'12345678901234957','2023-02-03',"北京"),
('2','2','小芳','女',18,'123456789012345789','2023-02-03',"北京"),
('3','3','小枫','男',22,'123456789012345123','2023-01-03',"上海"),
('4','4','小敏','女',20,'123456789012345345','2022-01-03',"北京"),
('5','5','小李','男',20,'12345678901234534X','2022-01-03',"上海"),
('6','6','王小敏','女',16,'123456789012345345','2022-01-03',"成都"),
('7','7','大刘','男',25,'123456789012345102','2022-01-03',"深圳"),
('8','8','林逸','男',17,'12345678901234534X','2022-01-03',"北京"),
('9','9','莫小迪','女',21,'123456789012345302','2022-01-03',"成都"),
('10','10','林仙仙','女',16,'123456789012345330','2022-01-03',"深圳"),
('11','11','叶小辰','男',18,'123456789012345352','2022-01-03',"成都"),
('12','12','韩跑跑','男',24,'12345678901234554X','2022-01-03',"北京");
- # 指定字段查询
- stus = session.query(Student.name, Student.age).all()
- for name, age in stus:
- print(name, age)
-
- # 全表查询
- stus = session.query(Student).all()
- for stu in stus:
- print(stu.name)
- # 查询单个字段的不重复值
- # 查询User表中所有不重复的name字段值
- unique_names = session.query(distinct(User.name)).all()
-
- # 遍历结果
- for name in unique_names:
- print(name)
-
- # 查询多个字段的不重复组合
- # 查询User表中所有不重复的name和age组合
- unique_name_age_combinations = session.query(distinct(User.name, User.age)).all()
-
- # 遍历结果
- for name, age in unique_name_age_combinations:
- print(name, age)
在编写完查询代码后,可以先打印出代码对应转化的SQL语句,检查SQL语句是否正确,然后再执行。
示例:
- # 指定字段查询
- # 此时语句结尾不加.all(),输出的就是SQL语句
- stus = session.query(Student.name, Student.age)
- print(stus)
比较运算符 | 作用 |
> | 大于 |
>= | 大于等于 |
小于 | |
小于等于 | |
= | 等于 |
<> 或 != | 不等于 |
IN(...) | 在in之后的列表中的值,多选一 |
LIKE 占位符 | 模糊匹配(_匹配单个字符, %匹配任意个字符) |
IS NULL | 是NULL |
逻辑运算符 | 作用 |
AND | 并且 (多个条件同时成立) |
OR | 或者 (多个条件任意一个成立) |
NOT | 非 , 不是 |
- def conditional_query():
- # 1、查询年龄大于20的学生
- stus = session.query(Student).filter(Student.age > 20).all()
-
- # 2、查询年龄不等于20的学生
- stus2 = session.query(Student).filter(Student.age != 20).all()
- # 或者使用<>, 注意python3.X版本不支持使用<>,官方推荐使用!=作为不等于运算符。
- # stus2_1 = session.query(Student).filter(Student.age <> 20).all()
-
- # 3、查询年龄为18或20或25的学生信息
- stus3 = session.query(Student).filter(Student.age.in_([18, 20, 25])).all()
-
- # 4、查询年龄不为18或20或25的学生信息
- stus4 = session.query(Student).filter(~Student.age.in_([18, 20, 25])).all()
-
- # 5、查询家庭住址为空的学生信息
- stus5 = session.query(Student).filter(Student.addr == None).all()
- # 或者使用is_()方法
- stus5_1 = session.query(Student).filter(Student.addr.is_(None)).all()
-
- # 6、查询家庭住址不为空的学生信息
- stus6 = session.query(Student).filter(Student.addr != None).all()
- # 或者使用isnot_()方法
- stus6_1 = session.query(Student).filter(Student.addr.isnot(None)).all()
-
- # 7、查询姓林,名字是两个字的学生信息
- stus7 = session.query(Student).filter(Student.name.like("林_")).all()
-
- # 8、查询身份证号最后一位是X的学生信息
- stus8 = session.query(Student).filter(Student.idcard.like("%X")).all()
-
- # 9、查询年龄在18岁(包含)到25岁(包含)之间的学生信息
- stus9 = session.query(Student).filter(Student.age >=18, Student.age <= 25).all()
- # 或者使用and_()方法
- stus9_1 = session.query(Student).filter(and_(Student.age >= 18, Student.age <= 25)).all()
-
- # 10、查询年龄为18或20或25的学生信息
- stus10 = session.query(Student).filter(or_(Student.age==18, Student.age==20, Student.age==25)).all()
说明:将一列数据作为一个整体,进行纵向计算 。
语法:
注意 : NULL值是不参与所有聚合函数运算的。
函数 | 作用 |
count | 统计数量 |
max | 最大值 |
min | 最小值 |
avg | 平均值 |
sum | 求和 |
说明:SQLAlchemy 提供了一组内置的函数,这些函数可以在 func 命名空间中直接使用,类似于 SQL 中的聚合函数,如 COUNT(), SUM(), AVG(), MAX(), MIN() 等。
- def aggregate_query():
- # 1、统计学生总人数
- count = session.query(func.count(Student.id)).scalar()
-
- # 2、统计学生的平均年龄
- avg = session.query(func.avg(Student.age)).scalar()
-
- # 3、统计学生的最大年龄
- max = session.query(func.max(Student.age)).scalar()
-
- # 4、统计学生的最小年龄
- min = session.query(func.min(Student.age)).scalar()
-
- # 统计男生的总年龄
- count_man = session.query(func.sum(Student.age)).filter_by(gender="男").scalar()
1、where与having区别
注意事项:
在 SQLAlchemy 中,使用 group_by() 方法来执行分组查询。分组查询通常与聚合函数(如 COUNT(), SUM(), AVG(), MAX(), MIN() 等)一起使用,以便对每个分组进行计算。
- def group_by_query():
- # 1、根据性别分组,统计男学生和女学生的数量
- # select sex, count(*) from t_student2 group by sex;
- results = session.query(Student.gender, func.count(Student.id).label("stu_nums")).group_by(Student.gender).all()
- # for gender, stu_nums in results:
- # print(gender, stu_nums)
-
- # 2、根据性别分组 , 统计男学生和女学生的平均年龄
- # select sex,avg(age) from t_student2 group by sex;
- results2 = session.query(Student.gender, func.avg(Student.age).label("avg_age")).group_by(Student.gender).all()
- # for gender, avg_age in results2:
- # print(gender, avg_age)
-
- # 3、查询年龄小于25的学生 , 并根据家庭地址分组 , 获取学生数量大于等于3的家庭地址
- # select addr,count(*) addr_num from t_student2 where age<25 group by addr having addr_num>=3;
- # 方式一: 通过having实现
- results3 = session.query(Student.addr, func.count(Student.id)).filter(Student.age < 25).group_by(Student.addr)\
- .having(func.count(Student.id) >= 3).all()
-
- # 方式二: 通过子查询实现,在子句中使用 filter()和比较运算符来实现它。
- # 3.1、 先查询年龄小于 25 的学生,并根据家庭地址分组,获取学生数量大于等于 3 的家庭地址
- subquery = session.query(Student.addr, func.count(Student.id).label("stu_nums")).filter(Student.age < 25).\
- group_by(Student.addr).subquery()
- # 3.2、使用子查询和 filter 子句来过滤出学生数量大于等于 3 的家庭地址
- results3_1 = session.query(subquery.c.addr, subquery.c.stu_nums).filter(subquery.c.stu_nums >= 3).all()
-
- # for addr, stu_nums in results3:
- # print(addr, stu_nums)
-
- # 统计不同家庭地址男女生的数量
- # select addr,gender,count(*) from t_student2 group by addr,gender;
- results4 = session.query(Student.addr, Student.gender, func.count(Student.id).label("stu_nums")).group_by(Student.addr, Student.gender).all()
- print(results4)
注意:
- def order_by_query():
- # 1、根据年龄对学生进行升序排序
- # select *from t_student order by age asc;
- res = session.query(Student).order_by(Student.age.asc()).all()
- # 或者
- # select *from t_student order by age;
- res_1 = session.query(Student).order_by(Student.age).all()
- # for res in res:
- # print(res.age)
-
- # 2、根据年龄对学生进行升序排序 , 年龄相同 , 再按照入学时间进行降序排序
- # select *from t_student order by age,entrydate desc;
- res2 = session.query(Student).order_by(Student.age, Student.entrydate.desc()).all()
- # for res in res2:
- # print(res.age,res.entrydate)
注意:
说明:在 SQLAlchemy 中,分页查询通常通过使用 offset() 和 limit() 方法来实现。这两个方法分别用于设置索引偏移量和限制查询结果的条数。
- def limit_query():
- # 1、查询第1页学生数据, 每页展示10条记录
- # select *from t_student limit 0,10;
- res = session.query(Student).offset(0).limit(10).all()
- # 或者
- # select *from t_student limit 10;
- res_1 = session.query(Student).limit(10).all()
- # for stu in res:
- # print(stu.id)
-
- # 2、查询第2页学生数据, 每页展示10条记录
- # 说明:起始索引=(页码-1)*页展示记录数
- # select *from t_student limit 10,10;
- res2 = session.query(Student).offset(10).limit(10).all()
- # for stu in res2:
- # print(stu.id)
说明:先建立t_emp员工表和t_dept部门表两张表,并插入对应数据
1、创建对应的ORM模型如下:
- from sqlalchemy.orm import declarative_base , sessionmaker, scoped_session
- from sqlalchemyBaseUse import engine
- from sqlalchemy import Column, Integer, String, DateTime, distinct, or_, and_, func, ForeignKey
-
- # 声明ORM基类
- Base = declarative_base()
- Session = sessionmaker(bind=engine)
- session = scoped_session(Session)
-
- # 部门表
- class Departments(Base):
- __tablename__ = 't_dept'
- id = Column(Integer, primary_key=True, autoincrement=True)
- name = Column(String(50), nullable=False, comment="部门名称")
-
- # 员工表
- class Employes(Base):
- __tablename__ = "t_emp"
- id = Column(Integer, primary_key=True, autoincrement=True)
- name = Column(String(50), nullable=False, comment="姓名")
- age = Column(Integer, comment="年龄")
- job = Column(String(20), comment="职位")
- salary = Column(Integer, comment="薪资")
- entrydate = Column(DateTime, comment="入职时间")
- managerid = Column(Integer, comment="直属领导ID")
- dept_id = Column(Integer, ForeignKey("t_dept.id"))
-
-
- def init_db():
- # 创建继承base类的表的映射关系
- Base.metadata.create_all(engine)
-
- if __name__ == '__main__':
- init_db()
注意:如果创建的表的编码不是utf8mb4,需要修改为utf8mb4,不然插入中文的数据会出现编码错误,执行以下SQL语句:
ALTER TABLE t_dept CONVERT TO CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
ALTER TABLE t_emp CONVERT TO CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
2、插入测试数据
INSERT INTO t_dept (name) VALUES ('研发部'), ('市场部'),('财务部'), ('销售部'), ('总经办'), ('人事部');
INSERT INTO t_emp (id, name, age, job,salary, entrydate, managerid, dept_id) VALUES
(1, '大刘', 28, '总裁',40000, '2000-01-01', null,5),
(2, '夏析', 20, '项目经理',20000, '2005-12-05', 1,1),
(3, '李兴', 33, '开发', 8000,'2000-11-03', 2,1),
(4, '张敏', 30, '开发',11000, '2002-02-05', 2,1),
(5, '林夕', 43, '开发',10500, '2004-09-07', 3,1),
(6, '小美', 19, '程序员鼓励师',6600, '2004-10-12', 2,1),
(7, '林逸', 60, '财务总监',8500, '2002-09-12', 1,3),
(8, '李媛', 19, '会计',48000, '2006-06-02', 7,3),
(9, '林妙妙', 23, '出纳',5250, '2009-05-13', 7,3),
(10, '赵芳', 20, '市场部总监',12500, '2004-10-12', 1,2),
(11, '张三', 56, '职员',3750, '2006-10-03', 10,2),
(12, '李四', 19, '职员',3750, '2007-05-09', 10,2),
(13, '王二', 19, '职员',5500, '2009-02-12', 10,2),
(14, '周鑫', 88, '销售总监',14000, '2004-10-12', 1,4),
(15, '刘达', 38, '销售',4600, '2004-10-12', 14,4),
(16, '老钱', 40, '销售',4600, '2004-10-12', 14,4),
(17, '小六', 42, null,2000, '2011-10-12', 1,null);
说明:多表查询就是指从多张表中查询数据。
操作:要执行多表查询,就只需要使用逗号分隔多张表,如: select * from t_emp , t_dept;
具体的执行结果如下:
解释:可见查询结果中包含了大量的结果集,总共102条记录,而这其实就是员工表emp所有的记录 (17) 与 部门表dept所有记录(6) 的所有组合情况,这种现象称之为笛卡尔积。
笛卡尔积: 笛卡尔乘积是指在数学中,两个集合A集合 和 B集合的所有组合情况。
而在多表查询中,需要消除无效的笛卡尔积,只保留两张表关联部分的数据;使用SQL语句,消除多表查询的笛卡尔积:
select *from t_emp,t_dept where t_emp.dept_id = t_dept.id;
说明:内连接查询的是两张表交集部分的数据
内连接的语法分为两种:
语法:
1、隐式内连接
SELECT 字段列表 FROM 表1 , 表2 WHERE 条件 ... ;
2、显示内连接(inner join)
SELECT 字段列表 FROM 表1 [ INNER ] JOIN 表2 ON 连接条件 ... ;
注意:SQL语句中的内连接的inner关键可以省略
说明:在 SQLAlchemy 中,内连接(INNER JOIN)是默认的连接类型,当你使用 join() 方法而不指定 isouter=True 时,你就是在执行内连接。内连接只返回满足连接条件的行,即两个表中都存在匹配项的行。
示例:
1、查询每一个员工的姓名 , 及关联的部门的名称 (使用隐式内连接实现)
res = session.query(Employes, Departments).filter(Employes.dept_id == Departments.id).all()
2、查询每一个员工的姓名 , 及关联的部门的名称 (使用显式内连接实现)
res2 = session.query(Employes, Departments).join(Departments, Employes.dept_id == Departments.id).all()
3、查询每一个员工的姓名 , 及关联的部门的名称(使用relationship实现内连接查询)
employes = relationship("Employes", back_populates="departments")
departments = relationship("Departments", back_populates="employes")
- def init_db():
- # 创建继承base类的表的映射关系
- Base.metadata.create_all(engine)
-
- if __name__ == '__main__':
- init_db()
res3 = session.query(Employes).join(Employes.departments)
外连接分为两种,分别是:
1、左外连接
说明:左外连接相当于查询表1(左表)的所有数据,当然也包含表1和表2交集部分的数据。
语法:
SELECT 字段列表 FROM 表1 LEFT [ OUTER ] JOIN 表2 ON 条件 ... ;
2、右外连接
说明:右外连接相当于查询表2(右表)的所有数据,当然也包含表1和表2交集部分的数据。
语法:
SELECT 字段列表 FROM 表1 RIGHT [ OUTER ] JOIN 表2 ON 条件 ... ;
说明:SQLAlchemy 支持左外连接(LEFT OUTER JOIN)和右外连接(RIGHT OUTER JOIN)。不过,在 SQLAlchemy 的 ORM 层,通常更常见的是使用左外连接,因为 SQLAlchemy 更多地是按照关系型数据库的标准来设计的,而标准 SQL 中右外连接并不如左外连接那样常见。
但是,你可以使用 SQLAlchemy 的 Core 表达式语言来执行右外连接。以下是如何在 SQLAlchemy 中使用左外连接和右外连接的示例:
注意事项:左外连接和右外连接是可以相互替换,只需要调整在连接查询时SQL中,表结构的先后顺序就可以。在实际开发使用时,左外连接常用。
示例:
1、查询t_emp表的所有数据, 和对应的部门信息 (左外连接)
- # 1、查询t_emp表的所有数据, 和对应的部门信息(左外连接)
- # select e.*,d.name from t_emp e left outer join t_dept d on e.dept_id = d.id;
- # 注意:outerjoin()函数中的连接条件可以省略,Employes.dept_id == Departments.id
- res = session.query(Employes, Departments.name).outerjoin(Departments).all()
- # print(res)
2、查询t_dept表的所有数据, 和对应的员工信息(右外连接)
- # 查询t_dept表的所有数据, 和对应的员工信息(右外连接)
- # select d.*,e.* from t_emp e right outer join t_dept d on d.id = e.dept_id;
- # 使用join()函数实现,isouter=True使用左外连接,左外连接和右外连接可以相互转换,也就是表的位置不同
- # join()函数中的第一个参数Employes表示左外连接关联的表是t_emp,主表是t_dept
- res2 = session.query(Employes, Departments).join(Employes, isouter=True).all()
- # print(res2)
说明:自连接查询,顾名思义,就是自己连接自己,也就是把一张表连接查询多次。
连接方式:对于自连接查询,可以是内连接查询,也可以是外连接查询。
注意事项:在自连接查询中,必须要为表起别名,不然不清楚所指定的条件、返回的字段,到底是哪一张表的字段。
语法:
SELECT 字段列表 FROM 表A 别名A JOIN 表A 别名B ON 条件 ... ;
说明:同样的在SQlAlchemy中实现自连接查询,也需要为表起别名的方式,起别名使用aliased()函数实现。
示列:
1、查询员工 及其 所属领导的名字
- # select a.name '员工姓名', b.name '领导姓名' from t_emp a join t_emp b on a.managerid = b.id;
- # 或
- # select a.name '员工姓名', b.name '领导姓名' from t_emp a, t_emp b where a.managerid = b.id;
- # 为表Employes创建别名
- user = aliased(Employes)
- manager = aliased(Employes)
-
- # 使用Inner join 的连接方式
- res = session.query(user.name.label("employee_name"), manager.name.label("manager_name")).join(manager, user.managerid == manager.id).all()
- print(res)
2、查询所有员工 t_emp a 及其领导的名字 t_emp b, 如果员工没有领导, 也需要查询出来 。
- # 为表Employes创建别名
- user = aliased(Employes)
- manager = aliased(Employes)
-
- # select a.name '员工姓名', b.name '领导姓名' from t_emp a left join t_emp b on a.managerid = b.id;
- res2 = session.query(user.name.label("employee_name"), manager.name.label("manager_name")).join(manager, user.managerid == manager.id, isouter=True).all()
- print(res2)
关键字:union
说明:
语法:
SELECT 字段列表 FROM 表A ...
UNION [ ALL ]
SELECT 字段列表 FROM 表B ....;
说明:在 SQLAlchemy 中,可以使用 union() 或者 union_all()方法来执行联合查询。
示例:
1、将薪资低于 5000 的员工 , 和 年龄大于 45 岁的员工全部查询出来.
SQL实现如下:
select * from t_emp where salary < 5000
union all
select * from t_emp where age > 45;
ORM实现如下:
- query1 = session.query(Employes).filter(Employes.salary < 5000)
- query2 = session.query(Employes).filter(Employes.age > 45)
- union_query = query1.union(query2).all()
- for un in union_query:
- print(un.name)
1、概念
如:
SELECT * FROM t1 WHERE column1 = ( SELECT column1 FROM t2 );
2、分类
分类依据:根据子查询结果
分为:
分类依据2:根据子查询位置
分为:
说明:子查询返回的结果是单个值(数字、字符串、日期等)
常用操作符:= <> > >= <
示列:
1、查询“销售部”所有员工的信息
SQL实现如下:
分析:可先将查询分为两步
1、先查询出销售部的部门id
select id from t_dept where name = '销售部';2、然后根据查出的部门id再查询销售部的所有员工信息
select e.* from t_emp e where dept_id = (select id from t_dept where name = '销售部');或者使用关联查询
select e.* from t_emp e join t_dept d on e.dept_id = d.id where d.name='销售部';
ORM实现如下:
- # 1.1、先查询出销售部的部门id
- subquery = session.query(Departments.id).filter(Departments.name=="销售部").scalar_subquery()
- # 可能会出现警告,可以将子查询转换成 select 语句
- # subquery = select([Departments.id]).where(Departments.name == "销售部")
- # 1.2、然后根据查出的部门id再查询销售部的所有员工信息
- res = session.query(Employes).filter(Employes.dept_id.in_(subquery)).all()
- print(res)
2、查询在员工“林逸”之后入职的所有员工信息
SQL实现如下:
分析:可先将查询分为两步
1、先查询出林逸的入职时间
select entrydate from t_emp where name = '林逸';2、然后根据入职时间再查找出在林逸入职时间之后的员工信息
select e.* from t_emp e where entrydate > (select entrydate from t_emp where name = '林逸');
ORM实现如下:
- subquery2 = session.query(Employes.entrydate).filter_by(name="林逸").scalar_subquery()
- res2 = session.query(Employes).filter(Employes.entrydate > subquery2)
- print(res2)
说明:子查询返回的结果是一列(可以是多行)
常用操作符:IN 、NOT IN 、 ANY 、SOME 、 ALL
操作符 | 描述 |
IN | 在指定的集合范围之内,多选一 |
NOT IN | 不在指定的集合范围之内 |
ANY | 子查询返回列表中,有任意一个满足即可 |
SOME | 与ANY等同,使用SOME的地方都可以使用ANY |
ALL | 子查询返回列表的所有值都必须满足 |
示列:
1、查询 "销售部" 和 "市场部" 的所有员工信息
SQL实现如下:
分析:可先将查询分为两步
1、先在部门表中查询出销售部和市场部的部门id
select id from t_dept where name = '销售部' or name = '市场部';2、然后根据1查出的结果,使用关键字in查询出对应销售部和市场部的员工信息1
select * from t_emp where dept_id in (select id from t_dept where name = '销售部' or name = '市场部');
ORM实现如下:
- subquery = session.query(Departments.id).filter(or_(Departments.name=="销售部", Departments.name=="市场部")).subquery()
- res = session.query(Employes).filter(Employes.id.in_(subquery))
2、 查询比“财务部”所有人工资都高的员工信息
SQL实现如下:
分析:可先将查询分为两步
1、先查询出财务部所有人员的工资信息
select salary from t_emp e join t_dept d on e.dept_id = d.id where d.name = '财务部';2、然后根据1查出的结果,使用关键字all查询出比财务部所有人工资都高的员工信息
select * from t_emp where salary > all (select salary from t_emp e join t_dept d on e.dept_id = d.id where d.name = '财务部');
或
select * from t_emp where salary > all ( select salary from t_emp where dept_id =(select id from t_dept where name = '财务部') );
ORM实现如下:
- subquery2 = session.query(Employes.salary).join(Departments).filter(Departments.name=="财务部").scalar_subquery()
- res2 = session.query(Employes).filter(Employes.salary > all_(subquery2)).all()
3、 查询比“研发部”其中任意一人工资都高的员工信息
SQL实现如下:
分析:可先将查询分为两步
1、先查询出研发部所有人员的工资信息
select salary from t_emp where dept_id = (select id from t_dept where name = '研发部');2、然后根据1查出的结果,使用关键字any查询出比研发部任意一人工资都高的员工信息
select * from t_emp where salary > any (select salary from t_emp where dept_id = (select id from t_dept where name = '研发部'));
ORM实现如下:
- subquery3 = session.query(Employes.salary).join(Departments).filter(Departments.name == "研发部").scalar_subquery()
- res3 = session.query(Employes).filter(Employes.salary > any_(subquery3)).all()
说明:子查询返回的结果是一行(可以是多列)
常用的操作符:= 、<> 、IN 、NOT IN
示列:
1、 查询与 "张敏" 的薪资及直属领导相同的员工信息
SQL实现如下:
分析:可先将查询分为两步
1、先查询出张敏的薪资和直属领导的id
select salary,managerid from t_emp where name = '张敏';2、然后根据1查出的结果,使用 = 查询与"张敏"的薪资及直属领导相同的员工信息
select * from t_emp where (salary,managerid) = (select salary,managerid from t_emp where name = '张敏');
ORM实现如下:
- subquery = session.query(Employes.salary, Employes.managerid).filter_by(name="张敏").subquery()
- res = session.query(Employes).filter(tuple_(Employes.salary,Employes.managerid).in_(subquery)).all()
说明:子查询返回的结果是多行多列
常用的操作符:IN
示列:
1、查询与 "林夕" , "林妙妙" 的职位和薪资相同的员工信息
SQL实现如下:
分析:可先将查询分为两步
1、先查询林夕和林妙妙的职位与薪资信息
select job, salary from t_emp where name in ('林夕','林妙妙');2、然后根据1查出的结果,使用 in 查询与"林夕","林妙妙"的职位和薪资相同的员工信息
select * from t_emp where (job,salary) in (select job, salary from t_emp where name in ('林夕','林妙妙'));
ORM实现如下:
- subquery = session.query(Employes.job, Employes.salary).filter(Employes.name.in_(["林夕", "林妙妙"])).subquery()
- res = session.query(Employes).filter(tuple_(Employes.job, Employes.salary).in_(subquery)).all()
2、查询入职日期是 "2002-09-12" 之后的员工信息 , 及其部门信息
SQL实现如下:
分析:可先将查询分为两步
1、先查询出入职日期是 "2002-09-12" 之后的员工信息
select * from t_emp where entrydate > "2002-09-12";2、根据1查询出的表信息,在查询对应的部门信息
select e.*, d.* from (select * from t_emp where entrydate > "2002-09-12") e join t_dept d on e.dept_id = d.id;
ORM实现如下:
- subquery2 = session.query(Employes).filter(Employes.entrydate > "2002-09-12").subquery()
- res2 = session.query(subquery2, Departments).join(Departments, subquery2.c.dept_id == Departments.id).all()
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。