当前位置:   article > 正文

SQLAlchemy实战使用总结(通俗易懂)!!!

SQLAlchemy实战使用总结(通俗易懂)!!!

目录

简介

基本用法

安装

连接数据库

配置信息

 创建引擎并连接数据库

创建ORM模型并映射到数据建库中

增删改查(CRUD)操作

新增

单条新增:

批量新增:

删除

修改

 单条更新:

 批量更新:

查询

 查询所有数据

 指定查询列

 获取返回数据的第一行

 查询数据 指定查询数据列 加入别名

原生SQL查询

表达式筛选条件

原生SQL筛选条件

筛选条件格式

字符串匹配方式筛选条件 并使用 order_by进行排序

and or使用

复杂查询

查询语句

排序

通配符

限制(切片)

分组

between in使用

类型转换

dict_to_object

db_tuple_to_dict

model_to_dict

res_copy_model_to_dest(修改的时候会用到)


简介

        SQLAlchemy是用Python编程语言开发的一个开源项目。它提供了SQL工具包和ORM(对象关系映射)工具,使用MIT许可证发行。

        SQLAlchemy最初在2006年2月发行,发行后便很快的成为Python社区中最广泛使用的ORM工具之一,丝毫不亚于Django自带的ORM框架。

        SQLAlchemy采用简单的Python语言,提供高效和高性能的数据库访问,实现了完整的企业级持久模型。它的理念是,SQL数据库的量级和性能比对象集合重要,而对象集合的抽象又重要于表和行。


以上都是比较官方的说法,帮助大家了解一下SQLAlchemy。了解了解就可。

基本用法

安装

安装sqlalchemy

  1. pip3 install sqlalchemy
  2. pip3 install pymysql

本文使用MySQL作为数据库,使用pymysql作为驱动,因此需要安装pymysql

连接数据库

配置信息

在连接数据库前,需要使用到一些配置信息,然后把它们组合成满足以下条件的字符串:

dialect+driver://username:password@host:port/database

例:mysql+pymysql://root:root@192.168.105.110:30306/testDataBase

建议将配置信息放到你的配置文件中,如config.py

  • dialect:数据库,如:sqlite、mysql、oracle等
  • driver:数据库驱动,用于连接数据库的,本文使用pymysql
  • username:用户名
  • password:密码
  • host:IP地址
  • port:端口
  • database:数据库

 创建引擎并连接数据库

  1. from sqlalchemy import create_engine
  2. from config import DB_URI
  3. engine = create_engine(DB_URI) # 创建引擎
  4. conn = engine.connect() # 连接
  5. result = conn.execute('SELECT 1') # 执行SQL
  6. print(result.fetchone())
  7. conn.close() # 关闭连接

创建ORM模型并映射到数据建库中

  1. from sqlalchemy.ext.declarative import declarative_base
  2. from sqlalchemy import create_engine, Column, Integer, String
  3. from sqlalchemy.orm import sessionmaker
  4. from config import DB_URI
  5. engine = create_engine(DB_URI)
  6. Base = declarative_base(engine) # SQLORM基类
  7. session = sessionmaker(engine)() # 构建session对象
  8. class Student(Base):
  9. __tablename__ = 'student' # 表名
  10. id = Column(Integer, primary_key=True, autoincrement=True)
  11. name = Column(String(50))
  12. age = Column(Integer)
  13. sex = Column(String(10))
  14. Base.metadata.create_all() # 将模型映射到数据库中

执行上面代码,将会在数据库中生成对应的映射表student。

增删改查(CRUD)操作

        新增和修改的时候可能会遇到类型错误问题,可以参考下面的类型转换。

新增

注意:主要看使用的方法以及传入的参数,像开发session以及命名是什么,用自己框架的获取session对象就可以。

单条新增:

  1. # 1.首先导入之间做好的ORM 对象 User
  2. from my_create_table import User
  3. # 2.使用Users ORM模型创建一条数据
  4. user1 = User(name="wang")
  5. # 在db_session会话中添加一条 UserORM模型创建的数据
  6. db_session.add(user1)
  7. # 使用 db_session 会话提交 , 这里的提交是指将db_session中的所有指令一次性提交
  8. db_session.commit()

批量新增:

 方法一:(最常用,但是执行时间不算快)

  1. # 方法一:
  2. user2 = User(name="张三")
  3. user3 = User(name="王五")
  4. db_session.add(user2)
  5. db_session.add(user3)
  6. db_session.commit()
  7. # 之前说过commit是将db_session中的所有指令一次性提交,现在的db_session中至少有两条指令user2和user3
  8. db_session.close()
  9. #关闭会话

 方法二:

  1. # 方法二:
  2. user_list = [
  3. User(name="wang1"),
  4. User(name="wang2"),
  5. User(name="wang3")
  6. ]
  7. db_session.add_all(user_list)
  8. db_session.commit()
  9. db_session.close()

 方法三:(执行时间最短排名:4)

  1. db_session.bulk_save_objects(
  2. [
  3. Customer(name="NAME " + str(i))
  4. for i in xrange(min(10000, n1))
  5. ]
  6. )
  7. db_session.commit()

 方法四:(执行时间最短排名:3)

  1. db_session.bulk_insert_mappings(
  2. Customer,
  3. [
  4. dict(name="NAME " + str(i))
  5. for i in xrange(min(10000, n1))
  6. ]
  7. )
  8. db_session.commit()

方法五:(执行时间最短排名:2)

  1. engine = create_engine(dbname, echo=False)
  2. engine.execute(
  3. Customer.__table__.insert(),
  4. [{"name": 'NAME ' + str(i)} for i in xrange(n)]
  5. ) ##==> engine.execute('insert into ttable (name) values ("NAME"), ("NAME2")')

方法六:(执行时间最短排名:1)

  1. def init_sqlite3(dbname):
  2. conn = sqlite3.connect(dbname)
  3. c = conn.cursor()
  4. c.execute("DROP TABLE IF EXISTS customer")
  5. c.execute(
  6. "CREATE TABLE customer (id INTEGER NOT NULL, "
  7. "name VARCHAR(255), PRIMARY KEY(id))")
  8. conn.commit()
  9. return conn
  10. def test_sqlite3(n=100000, dbname='sqlite3.db'):
  11. conn = init_sqlite3(dbname)
  12. c = conn.cursor()
  13. t0 = time.time()
  14. for i in xrange(n):
  15. row = ('NAME ' + str(i),)
  16. c.execute("INSERT INTO customer (name) VALUES (?)", row)
  17. conn.commit()

删除

  1. # 删除
  2. class_info = db_session.query(ClassTable).filter(ClassTable.name=="OldBoyS1").first()
  3. db_session.query(Student).filter(Student.class_id == class_info.id).delete()
  4. db_session.commit()
  5. db_session.close()
  1. user = UserSample()
  2. db.session.query(UserSample).filter_by(id=user.id).delete()
  3. db.session.commit()
  4. db.session.close()

修改

 单条更新:

  1. # UPDATE user SET name="NBDragon" WHERE id=20 更新一条数据
  2. # 语法是这样的 :
  3. # 使用 db_session 执行User表 query(User) 筛选 User.id = 20 的数据 filter(User.id == 20)
  4. # 将name字段的值改为NBDragon update({"name":"NBDragon"})
  5. res = db_session.query(User).filter(User.id == 20).update({"name":"NBDragon"})
  6. print(res) # 1 res就是我们当前这句更新语句所更新的行数
  7. # 注意注意注意
  8. # 这里一定要将db_session中的执行语句进行提交,因为你这是要对数据中的数据进行操作
  9. # 数据库中 增 改 删 都是操作,也就是说执行以上三种操作的时候一定要commit
  10. db_session.commit()
  11. db_session.close()
  12. #关闭会话

 批量更新:

  1. # 更新多条
  2. res = db_session.query(User).filter(User.id <= 20).update({"name":"NBDragon"})
  3. print(res) # 6 res就是我们当前这句更新语句所更新的行数
  4. db_session.commit()
  5. db_session.close()
  6. #关闭会话

查询

 查询所有数据

  1. # 查询所有数据
  2. res = db_session.query(User).all()

 指定查询列

  1. # query的时候我们不在使用User ORM对象,而是使用User.name来对内容进行选取
  2. user_list = db_session.query(User.name).all()

 获取返回数据的第一行

  1. # 获取返回数据的第一行
  2. res = db_session.query(User).first()

 查询数据 指定查询数据列 加入别名

  1. # 查询数据 指定查询数据列 加入别名
  2. res = db_session.query(User.name.label('username'), User.id).first()

原生SQL查询

  1. #原生SQL查询
  2. res = db_session.query(User).from_statement(text("SELECT * FROM User where name=:name")).params(name='wang').all()

表达式筛选条件

  1. # 表达式筛选条件
  2. res = db_session.query(User).filter(User.name == "wang").all()

原生SQL筛选条件

  1. # 原生SQL筛选条件
  2. r4 = db_session.query(User).filter_by(name='wang').all()
  3. r5 = db_session.query(User).filter_by(name='wang').first()

筛选条件格式

  1. # 筛选条件格式
  2. # filter:
  3. user_list = db_session.query(User).filter(User.name == "wang").all()
  4. user_list = db_session.query(User).filter(User.name == "wang").first()
  5. # filter_by:
  6. user_list = db_session.query(User).filter_by(name="wang").all()
  7. user_list = db_session.query(User).filter_by(name="wang").first()

字符串匹配方式筛选条件 并使用 order_by进行排序

  1. # 字符串匹配方式筛选条件 并使用 order_by进行排序
  2. res = db_session.query(User).filter(text("id<:value and name=:name")).params(value=224, name='wang').order_by(User.id).all()

and or使用

  1. # and or
  2. from sqlalchemy.sql import and_ , or_
  3. res = db_session.query(User).filter(and_(User.id > 3, User.name == 'wang')).all()
  4. ret = db_session.query(User).filter(or_(User.id < 2, User.name == 'wang')).all()

复杂查询

  1. # 复杂查询
  2. from sqlalchemy.sql import text
  3. user_list = db_session.query(User).filter(text("id<:value and name=:name")).params(value=3,name="wang")

查询语句

  1. # 查询语句
  2. data = db_session.execute('select * from t_user where name=:value', params={"value": 'abc'})
  3. row = data.fetchone() # 取第一条
  4. print(row.id) # 取主键
  5. print(row.name) # 取字段
  6. rows = data.fetchall() # 取所有数据
  7. for row in rows:
  8. print(row.id, row.name)
  9. print(data.rowcount) # 取条数

排序

  1. # 排序 :
  2. user_list = db_session.query(User).order_by(User.id).all()
  3. user_list = db_session.query(User).order_by(User.id.desc()).all()

通配符

  1. # 通配符
  2. ret = db_session.query(User).filter(User.name.like('e%')).all()
  3. ret = db_session.query(User).filter(~User.name.like('e%')).all()

限制(切片)

  1. # 限制
  2. ret = db_session.query(User)[1:2]

分组

  1. from sqlalchemy.sql import func
  2. ret = db_session.query(User).group_by(User.extra).all()
  3. ret = db_session.query(
  4. func.max(User.id),
  5. func.sum(User.id),
  6. func.min(User.id)).group_by(User.name).all()
  7. ret = db_session.query(
  8. func.max(User.id),
  9. func.sum(User.id),
  10. func.min(User.id)).group_by(User.name).having(func.min(User.id) >2).all()

between in使用

  1. # between使用
  2. ret = session.query(User).filter(User.id.between(1, 3), User.name == 'wang').all() # between 大于1小于3的
  3. # in使用
  4. ret = session.query(User).filter(User.id.in_([1,3,4])).all() # in_([1,3,4]) 只查询id等于1,3,4的
  5. ret = session.query(User).filter(~User.id.in_([1,3,4])).all() # ~xxxx.in_([1,3,4]) 查询不等于1,3,4的
  6. ret = session.query(User).filter(User.id.in_(session.query(User.id).filter_by(name='wang'))).all() 子查询

类型转换

dict_to_object

  1. def dict_to_object(dict_data, obj):
  2. dic2class(dict_data, obj)
  3. def dic2class(py_data, obj):
  4. for name in [name for name in dir(obj) if not name.startswith('_')]:
  5. if name not in py_data:
  6. setattr(obj, name, None)
  7. else:
  8. value = getattr(obj, name)
  9. setattr(obj, name, set_value(value, py_data[name]))
  10. def set_value(value, py_data):
  11. if str(type(value)).__contains__('.'):
  12. # value 为自定义类
  13. dic2class(py_data, value)
  14. elif str(type(value)) == "<class 'list'>":
  15. # value为列表
  16. if value.__len__() == 0:
  17. # value列表中没有元素,无法确认类型
  18. value = py_data
  19. else:
  20. # value列表中有元素,以第一个元素类型为准
  21. child_value_type = type(value[0])
  22. value.clear()
  23. for child_py_data in py_data:
  24. child_value = child_value_type()
  25. child_value = set_value(child_value, child_py_data)
  26. value.append(child_value)
  27. else:
  28. value = py_data
  29. return value

db_tuple_to_dict

  1. def db_tuple_to_dict(resultproxy):
  2. d, a = {}, []
  3. for rowproxy in resultproxy:
  4. for column, value in rowproxy.items():
  5. d = {**d, **{column: value}}
  6. a.append(d)
  7. return a

model_to_dict

  1. def model_to_dict(obj):
  2. dic = {}
  3. dic_columns = obj.__table__.columns
  4. # 保证都是字符串和数字
  5. types = [str, int, float, bool]
  6. # 注意,obj.__dict__会在commit后被作为过期对象清空dict,所以保险的办法还是用columns
  7. for k, tmp in dic_columns.items():
  8. # k=nick,tmp=usergroup.nick
  9. v = getattr(obj, k, None)
  10. if v != None:
  11. dic[k] = str(v) if v and type(v) not in types else v
  12. return dic

res_copy_model_to_dest(修改的时候会用到)

  1. def res_copy_model_to_dest(res, dest):
  2. dic_columns = res.__table__.columns
  3. # 保证都是字符串和数字
  4. types = [str, int, float, bool, bytes]
  5. for k, tmp in dic_columns.items():
  6. v = getattr(res, k, None)
  7. value = str(v) if v and type(v) not in types else v
  8. print("key:", k, " v:", value)
  9. if v is not None:
  10. setattr(dest, k, value)

例:

test = TestModel()
db_test = db.session.query(TestModel).get(test.id)
res_copy_model_to_dest(test, db_test)
db.session.commit()
db.session.close()

在项目中用到的东西,分享一波,有问题可以私信或者评论

作者:筱白爱学习!!

欢迎关注转发评论点赞沟通,您的支持是筱白的动力!

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/180982
推荐阅读
相关标签
  

闽ICP备14008679号