当前位置:   article > 正文

SQLAlchemy高级SqlHelper封装_sqlalchemy 封装

sqlalchemy 封装

环境与版本

python: 3.10

SQLAlchemy: 2.0.9

网上好多sql语句查询相关的代码执行异常,不确定是否与版本有有关

说明

封装了比较实用的 复合唯一索引表的插入或更新,多条件查询,叠加条件查询,以及返回pandas DataFrame。

容易出错的问题

1. 执行sql语句错误提示:sqlalchemy.exc.ObjectNotExecutableError: Not an executable object‘......’

参考

2. pd.read_sql()  pd.read_sql_query() 报错 AttributeError: 'OptionEngine' object has no attribute 'execute'

read_sql() pd.read_sql_query()的con参数,必须是session.bind.connect() 而不是session.bind

数据库初始化以及Model定义

  1. from sqlalchemy import create_engine
  2. from sqlalchemy.orm import sessionmaker, relationship
  3. from sqlalchemy.ext.declarative import declarative_base
  4. from sqlalchemy import Column, Integer, String, ForeignKey, Index
  5. # 数据库配置
  6. db_config = {
  7. 'sqlite': {
  8. 'engine': 'sqlite:///mydatabase.db'
  9. },
  10. 'mysql': {
  11. 'engine': 'mysql+pymysql://username:password@host/database'
  12. }
  13. }
  14. # 根据配置选择 engine
  15. db_choice = 'sqlite'
  16. if db_choice == 'sqlite':
  17. engine = create_engine(db_config['sqlite']['engine'])
  18. elif db_choice == 'mysql':
  19. engine = create_engine(db_config['mysql']['engine'])
  20. else:
  21. engine = create_engine('sqlite:///mydatabase.db')
  22. Session = sessionmaker(bind=engine)
  23. Base = declarative_base()
  24. class Address(Base):
  25. __tablename__ = 'address'
  26. id = Column(Integer, primary_key=True, autoincrement=True)
  27. address = Column(String)
  28. class User(Base):
  29. __tablename__ = 'users'
  30. id = Column(Integer, primary_key=True, autoincrement=True)
  31. addr_id = Column(Integer, ForeignKey('address.id'))
  32. name = Column(String)
  33. age = Column(Integer)
  34. def __repr__(self):
  35. return f"<User(id='{self.id}', name='{self.name}', age='{self.age}')>"
  36. class Author(Base):
  37. __tablename__ = 'author'
  38. id = Column(Integer, primary_key=True)
  39. name = Column(String)
  40. books = relationship('Book', back_populates='author')
  41. class Book(Base):
  42. __tablename__ = 'book'
  43. id = Column(Integer, primary_key=True)
  44. title = Column(String)
  45. author_id = Column(Integer, ForeignKey('author.id'))
  46. author = relationship('Author', back_populates='books')
  47. class Department(Base):
  48. __tablename__ = 'department'
  49. id = Column(Integer, primary_key=True)
  50. name = Column(String)
  51. employees = relationship('Employee', back_populates='department')
  52. class Employee(Base):
  53. __tablename__ = 'employee'
  54. id = Column(Integer, primary_key=True)
  55. name = Column(String)
  56. remark = Column(String)
  57. score = Column(Integer)
  58. department_id = Column(Integer, ForeignKey('department.id'))
  59. department = relationship('Department', back_populates='employees')
  60. class Code(Base):
  61. __tablename__ = 'code'
  62. id = Column(Integer, primary_key=True)
  63. symbol = Column(String)
  64. exchange = Column(String)
  65. remark = Column(String)
  66. # 复合唯一索引
  67. __table_args__ = (
  68. Index('ix_vt_symbol', 'symbol', 'exchange', unique=True),
  69. )
  70. Base.metadata.create_all(engine)

SqlHelper类

  1. import pandas as pd
  2. import sqlalchemy
  3. from sqlalchemy import or_, and_, text
  4. from sqlalchemy.exc import IntegrityError
  5. from sqltool.models import Session, User, Address, Book, Author, Department, Employee, Code
  6. def or_filters(*args):
  7. return or_(*args)
  8. def and_filters(*args):
  9. return and_(*args)
  10. class SqlHelper:
  11. def __init__(self):
  12. self.session = Session()
  13. def add(self, obj):
  14. self.session.add(obj)
  15. self.session.commit()
  16. def delete(self, obj):
  17. self.session.delete(obj)
  18. self.session.commit()
  19. def update(self, obj):
  20. self.session.merge(obj)
  21. self.session.commit()
  22. def upsert(self, model):
  23. self._upsert(model)
  24. self.session.commit()
  25. def _upsert(self, model):
  26. '''
  27. 根据唯一索引查找记录,再决定更新(补全primary_key的自增列)或新增记录
  28. '''
  29. unique = self.get_unique_keys(model)
  30. obj = self.session.query(model.__class__).filter(
  31. # getattr(model.__class__, primary_key) == getattr(model, primary_key),
  32. *[(getattr(model.__class__, c) == getattr(model, c)) for c in unique]
  33. ).first()
  34. if obj:
  35. primary_key = self.get_primary_key(model)
  36. val = getattr(obj, primary_key)
  37. setattr(model, primary_key, val)
  38. self.session.merge(model)
  39. else:
  40. self.session.add(model)
  41. def add_all(self, models):
  42. try:
  43. self.session.begin()
  44. self.session.add_all(models)
  45. self.session.commit()
  46. except IntegrityError:
  47. # 如果出现唯一键冲突错误,回滚事务
  48. self.session.rollback()
  49. def update_all(self, models):
  50. try:
  51. for m in models:
  52. self.session.merge(m)
  53. self.session.commit()
  54. except IntegrityError:
  55. self.session.rollback()
  56. def delete_all(self, models):
  57. try:
  58. for m in models:
  59. self.session.delete(m)
  60. self.session.commit()
  61. except IntegrityError:
  62. self.session.rollback()
  63. def upsert_all(self, models):
  64. try:
  65. self.session.begin()
  66. for m in models:
  67. self._upsert(m)
  68. self.session.commit()
  69. except IntegrityError:
  70. self.session.rollback()
  71. def query(self, model):
  72. return self.session.query(model)
  73. def update_by(self, model, filters, col_value_map):
  74. return self.session.query(model).filter(filters).update(col_value_map)
  75. def query_by(self, model, *args):
  76. return self.session.query(model).filter(and_(*args))
  77. def query_or(self, model, *args):
  78. return self.session.query(model).filter(or_(*args))
  79. def query_by_func(self, model, func):
  80. '''
  81. 使用 or_filters 和 and_filters 叠加查询条件
  82. demo
  83. and_filter(or_filters(User.age < 30, User.age > 60), User.name == 'lisi_6')
  84. 等同于
  85. (User.age < 30 or User.age > 60) and User.name == 'lisi_6'
  86. '''
  87. return self.session.query(model).filter(func)
  88. def get_by(self, model, **kwargs):
  89. return self.session.query(model).filter_by(**kwargs).first()
  90. def count_by(self, model, **kwargs):
  91. return self.session.query(model).filter_by(**kwargs).count()
  92. def exists(self, model, **kwargs):
  93. return self.session.query(model).filter_by(**kwargs).exists()
  94. def join(self, model1, model2):
  95. return self.session.query(model1).join(model2)
  96. def join_ex(self, model1, model2):
  97. return self.session.query(model1, model2)
  98. def first(self, query):
  99. return query.first()
  100. def commit(self):
  101. self.session.commit()
  102. def close(self):
  103. self.session.commit()
  104. self.session.close()
  105. def get_primary_key(self, model):
  106. for col in model.__table__.columns:
  107. if col.primary_key:
  108. return col.name
  109. def get_unique_keys(self, model) -> list[str]:
  110. ''' 获取唯一键列 '''
  111. # 1.从列中unique属性获取列名
  112. unique = [c.name for c in model.__table__.columns if c.unique]
  113. # 2.从__table_args__自定义索引(Index)中获取unique的列名
  114. idxs = list(model.__table__.indexes)
  115. for idx in idxs:
  116. if not idx.unique:
  117. continue
  118. for c in idx.columns:
  119. unique.append(c.name)
  120. return unique
  121. def execute_sql(self, sqlstr) -> list[tuple]:
  122. return self.session.execute(text(sqlstr)).fetchall()
  123. def read_from(self, sql) -> pd.DataFrame:
  124. '''
  125. 查询sql语句,必须使用text()
  126. pandas 需要connection,不是engine
  127. '''
  128. return pd.read_sql_query(text(sql), self.session.bind.connect())
  129. def read_model(self, model) -> pd.DataFrame:
  130. '''
  131. 查询sql语句,必须使用text()
  132. pandas 需要connection,不是engine
  133. '''
  134. return pd.read_sql(self.session.query(model).statement, self.session.bind.connect())

demo

  1. def demo():
  2. sql_helper = SqlHelper()
  3. count = sql_helper.count_by(Address)
  4. addr0 = Address()
  5. addr0.address = f"浙江杭州_{count + 1}"
  6. sql_helper.add(addr0)
  7. # addr = sql_helper.query(Address).first()
  8. user = User()
  9. user.addr_id = addr0.id
  10. user.age = 33
  11. user.name = f"lisi_{count + 1}"
  12. sql_helper.add(user)
  13. users = sql_helper.query_by(User, User.name == 'Tom').all()
  14. users = sql_helper.query(User).filter(User.age > 30).all()
  15. users = sql_helper.query(User).filter(User.name.like('a%')).all()
  16. tom = sql_helper.get_by(User, name='Tom')
  17. count = sql_helper.count_by(User)
  18. exists = sql_helper.exists(User, age=30)
  19. first_user = sql_helper.first(sql_helper.query(User))
  20. query_result = sql_helper.join_ex(Address, User).filter(User.addr_id == Address.id).all()
  21. for address, user in query_result:
  22. print(user.id, user.name, address.id, address.address)
  23. # query_result = sql_helper.query(User).join(Address).filter(User.addr_id == Address.id).all()
  24. query_result = sql_helper.query(User).join(Address).all()
  25. for result in query_result:
  26. print(result)
  27. # 连接
  28. author1 = Author(name='Jam')
  29. author2 = Author(name='Sam')
  30. sql_helper.add(author1)
  31. sql_helper.add(author2)
  32. book1 = Book(title="C Language", author_id=author1.id)
  33. book2 = Book(title="C++ Language", author_id=author1.id)
  34. book3 = Book(title="Python", author_id=author2.id)
  35. book4 = Book(title="JavaScript", author_id=author2.id)
  36. sql_helper.add(book1)
  37. sql_helper.add(book2)
  38. sql_helper.add(book3)
  39. sql_helper.add(book4)
  40. # n--1 连接
  41. books = sql_helper.query(Book).join(Author).all()
  42. for book in books:
  43. print(f'Book: {book.title} Author:{book.author.name}')
  44. authors = sql_helper.query(Author).join(Book).all()
  45. for author in authors:
  46. for book in author.books:
  47. print(f'Author: {author.name} Book: {book.title}')
  48. # 1--n 连接
  49. department = Department(name='Sales')
  50. sql_helper.add(department)
  51. employee1 = Employee(name='John', department=department, remark='test1111')
  52. employee2 = Employee(name='Jane', department=department, remark='test2222')
  53. sql_helper.add(employee1)
  54. sql_helper.add(employee2)
  55. dept = sql_helper.query(Department).filter_by(name='Sales').first()
  56. employees = dept.employees
  57. for employee in employees:
  58. print(employee.name)
  59. # 单条件批量更新
  60. rst = sql_helper.update_by(Employee, Employee.name == 'John', {Employee.remark: employee1.remark})
  61. sql_helper.commit()
  62. # 多条件批量更新
  63. rst = sql_helper.update_by(Employee, or_filters(Employee.name == 'John', Employee.name == 'Jane'),
  64. {Employee.remark: employee2.remark})
  65. sql_helper.commit()
  66. print('------------------------------------------------------------')
  67. # 复合唯一索引的存在更新否则插入
  68. code1 = Code(symbol='SA309', exchange='CZCE', remark='test11111')
  69. code2 = Code(symbol='SA309', exchange='CZCE', remark='test22222')
  70. # sql_helper.add(code1)
  71. sql_helper.upsert(code1)
  72. sql_helper.upsert(code2)
  73. print('------------------------------------------------------------')
  74. # or 查询
  75. users = sql_helper.query_or(User, User.age > 55, User.age < 30, User.age == 44, User.name == 'lisi_40').all()
  76. for user in users:
  77. print(user)
  78. print('------------------------------------------------------------')
  79. # and 查询
  80. users = sql_helper.query_by(User, User.name == 'lisi_6', User.age > 45)
  81. for user in users:
  82. print(user)
  83. print('------------------------------------------------------------')
  84. # 多条件嵌套查询 or + and
  85. users = sql_helper.query_by_func(User, and_filters(or_filters(User.age < 30, User.age > 60),
  86. User.name == 'lisi_6')).all()
  87. for user in users:
  88. print(user)
  89. print('------------------------------------------------------------')
  90. result = sql_helper.execute_sql('select * from users;')
  91. for item in result:
  92. print(item)
  93. print('------------------------------------------------------------')
  94. df = sql_helper.read_model(User)
  95. print(df.head())
  96. print('------------------------------------------------------------')
  97. df = sql_helper.read_from('select * from users;')
  98. print(df.tail())
  99. print('------------------------------------------------------------')
  100. result = sql_helper.execute_sql('select * from employee;')
  101. for item in result:
  102. print(item)
  103. print('------------------------------------------------------------')
  104. sql_helper.close()
  105. if __name__ == '__main__':
  106. print(sqlalchemy.__version__)
  107. demo()

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/584782
推荐阅读
相关标签
  

闽ICP备14008679号