当前位置:   article > 正文

SQLAlchemy 增删改查和基础操作_sqlalchemy delete

sqlalchemy delete

1.数据表定义和创建表

        

  1. from sqlalchemy.orm import sessionmaker
  2. from config import setting
  3. from sqlalchemy import Column, String, Integer, Float, DECIMAL, Boolean, DateTime, create_engine,BigInteger
  4. from sqlalchemy.ext.declarative import declarative_base
  5. from sqlalchemy.sql import func
  6. engine = create_engine("mysql+pymysql://{}:{}@{}/{}".format(setting.User, setting.Passwd, setting.Host,setting.DB),
  7. encoding='utf8',
  8. echo=False,
  9. max_overflow=0, # 超过连接池大小外最多创建的连接
  10. pool_size=5, # 连接池大小
  11. pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
  12. pool_recycle=3600, # 多久之后对线程池中的线程进行一次连接的回收(重置)
  13. # pool_pre_ping=True #悲观方式, 每次执行sql钱会检查连接,解决数据库异常回复后连接依然没有恢复问题
  14. )
  15. Base = declarative_base()
  16. session = sessionmaker(bind=engine)()
  17. a = [{'name': '1', 'item_id': '284440448086', 'year': '2007', 'make': 'Honda', 'model': 'Accord', 'trim': 'Special Edition Sedan 4-Door', 'engine': '2.4L 2354CC 144Cu. In. l4 GAS DOHC Naturally Aspirated', 'notes': 'All Trim Levels'},
  18. {'name': '21', 'item_id': '284440448086', 'year': '2007', 'make': 'Honda', 'model': 'Accord', 'trim': 'Special Edition Sedan 4-Door', 'engine': '3.0L 2997CC V6 GAS SOHC Naturally Aspirated', 'notes': 'All Trim Levels'}]
  19. class YtTest(Base):
  20. __tablename__ = 'yt_test'
  21. id = Column(Integer, primary_key=True)
  22. name = Column(String(30), comment='名字')
  23. item_id = Column(String(50), comment='item_id')
  24. year = Column(String(50), comment='year')
  25. make = Column(String(100), comment='make')
  26. model = Column(String(100), comment='model')
  27. trim = Column(String(255), comment='trim')
  28. engine = Column(String(255), comment='engine')
  29. notes = Column(String(255), comment='notes')
  30. create_time = Column(DateTime(timezone=True), server_default=func.now())
  31. # update_time = Column(DateTime, comment='更新时间')
  32. def __init__(self, **items):
  33. for key in items:
  34. if hasattr(self, key):
  35. setattr(self, key, items[key])
  36. addre = [
  37. {'name':'21','address':'盘龙区','city':'四圣市','e_num':'10111','t_id':'12'},
  38. {'name':'21','address':'盘龙区','city':'四圣市','e_num':'10111','t_id':'12'},
  39. ]
  40. class Address(Base):
  41. __tablename__ = 'yt_address'
  42. id = Column(Integer, primary_key=True)
  43. name = Column(String(30), comment='name')
  44. address = Column(String(30), comment='Address')
  45. city = Column(String(50), comment='city')
  46. e_num = Column(String(50), comment='e_num')
  47. t_id = Column(String(50), comment='t_id')
  48. def __init__(self, **items):
  49. for key in items:
  50. if hasattr(self, key):
  51. setattr(self, key, items[key])
  52. # 创建表
  53. # Base.metadata.create_all(engine)

2.新增数据(add_all()/add())

  1. # # 一 新增数据
  2. # # 新增数据 多条数据
  3. # save_data = []
  4. # for i in a:
  5. # report_to_sql = eval('YtTest')(**i)
  6. # save_data.append(report_to_sql)
  7. # session.add_all(save_data)
  8. # session.commit()
  9. # save_data = []
  10. # for i in addre:
  11. # report_to_sql = eval('Address')(**i)
  12. # save_data.append(report_to_sql)
  13. # session.add_all(save_data)
  14. # session.commit()
  15. # # #新增数据 单条数据
  16. # role2 = YtTest(name="李明")
  17. # session.add(role2)
  18. # session.commit()
  19. # print(role2.id) #返回id

3.删除数据

  1. # # 二, 删除 session.delete(i)/session.query(YtTest).delete()
  2. # # 循环删除
  3. user = session.query(YtTest).filter(YtTest.name == "22").all()
  4. for i in user:
  5. session.delete(i)
  6. session.commit()
  7. session.close()
  8. # # 全量删除
  9. session.query(YtTest).filter(YtTest.name=='21r').delete()
  10. session.commit()

4.更新数据

  1. # 三. 更新操作 session.query(YtTest).update()/查询-commit()
  2. # update 更新
  3. session.query(YtTest).filter(YtTest.name == 'stuff').update({YtTest.name: '李明'})
  4. # 3:提交与关闭
  5. session.commit()
  6. # # 更新二
  7. test = session.query(YtTest).filter(YtTest.name == '李明').all()
  8. for i in test:
  9. i.name = '李明1'
  10. # 3:提交与关闭
  11. session.commit()
  12. test = session.query(YtTest).filter(YtTest.name == '李明').first()
  13. test.name = '李明1'
  14. # 3:提交与关闭
  15. session.commit()

5.查询数据

  1. # 四,查询
  2. # filter/filter_by :对应 WHERE ,fillter 可以进行比较运算(==, >, < ...)来对条件进行灵活的运用,不同的条件用逗号分割,
  3. # fillter_by 只能指定参数传参来获取查询结果。
  4. users = session.query(YtTest).filter(YtTest.name == "李明").all()
  5. users = session.query(YtTest).filter_by(name = "李明").all()
  6. print([i.name for i in users])

filter_by()和 filter()的最主要区别:

模块语法        ><(大于和小于) 查询and和or查询
filter_by()直接用属性名,比较=        不支持不支持
filter()用类名.属性名,比较用==        支持支持

filter_by()
       查询 user 表里面名字等于 Tom 的:
       db.session.query(User).filter_by(name='Tom').all()
       查询 user 表里面名字等于 Tom 并且年龄等于 18:
       db.session.query(User).filter_by(name='Tom', age=18).all()
       比如新的需求,查询 user 表里面名字等于 Tom 或者年龄等于 18 的用户,那么 filter_by() 就满足不了要求了

filter()
       查询 user 表里面名字等于 Tom 的:
       db.session.query(User).filter(User.name == 'Tom').all()
       查询 user 表里面名字等于 Tom 并且年龄等于 18:
       db.session.query(User).filter(User.name == 'Tom', User.age == 18).all()
       也可以这样:
       db.session.query(User).filter(User.name == 'Tom').filter(User.age == 18).all()
       如果想使用 and 拼接需要用以下方式:
       db.session.query(User).filter(and_(User.name == 'Tom', User.age == 18)).all()
       以下的方式 and 后面的 User.age == 18 不会生效:
       db.session.query(User).filter(User.name == 'Tom' and User.age == 18).all()
       查询 user 表里面名字等于 Tom 的或者年龄等于 18:
       db.session.query(User).filter(or_(User.name == 'Tom', User.age == 18)).all()
       查询 user 表里面名字等于 Tom 的并且年龄大于 18
       db.session.query(User).filter(User.name == 'Tom', User.age > 18).all()
       查询 name 中包含字母 a 的所有数据(模糊查询)
       db.session.query(User).filter(User.name.like('%{0}%'.format("a"))).all()

以上的例子都是查询中使用比较多的,使用方面看大家喜好,filter_by() 对组合查询等等支持的不是很好,但是语法相对 filter() 简洁一些; 另外 filter() 还有很多其他的查询,大家可以自己去多多探讨。。。

笔记:

现在有 mysql 的 user 数据库表,存储内容如下图,下面通过例子来说明两种查询方式的用法:

在一个用户的模型类中, 要查询id为5的一个用户;

User.query.filter(User.id==5).all()  #  使用filter 中必须指定那个 模型类.id==5; 

User.query.filter(User.id==5).first()  # all() , first() ,都是查询执行器 , 一个返回列表包含对象, 一个直接返回对象;

User.query.filter_by(id=5).first()  # filter_by默认就是使用id  进行查询 , 写法为id=5; 不可以使用==; 也不用模型类.id的方式; 它是一种更加精确的查询;

# users = User.query.filter_by(isdelete=False).all()        用等值符号


# users = User.query.filter(User.isdelete == False,

                   User.phone.startswith('150')).order_by(-User.rdatetime)  用关系符号 升序降序正负号
users = User.query.filter(or_(User.username==search, User.phone == search)).all()  # select * from user where username=search or phone=search
number = User.query.filter(or_(User.username==search,User.phone == search)).count()
users = User.query.filter(or_(User.username.like('%' + search + '%'), User.phone == search)).all()  #返回的是列表
number = User.query.filter(or_(User.username.like(search), User.phone == search)).count() 

查询一些方法

为了更好的理解 SQL 与 SQLalchemy 的写法区别,可以参照以下内容:

  • query :对应 SELECT xxx FROM xxx
  • filter/filter_by :对应 WHERE ,fillter 可以进行比较运算(==, >, < ...)来对条件进行灵活的运用,不同的条件用逗号分割,fillter_by 只能指定参数传参来获取查询结果。
  • limit :对应 limit()
  • order by :对应 order_by()
  • group by :对应 group_by()

like查询

  1. data_like = session.query(YtTest).filter(YtTest.name.like('%李%')).first()
  2. data_like = session.query(YtTest).filter(YtTest.name.notlike('%李%')).first()

is查询

  1. # is_ 相当于 ==
  2. result = session.query(YtTest).filter(YtTest.item_id.is_(None)).all()
  3. result = session.query(YtTest).filter(YtTest.item_id == 'None').all()
  4. # isnot 相当于 !=
  5. result = session.query(YtTest).filter(YtTest.item_id.isnot(None)).all()
  6. result = session.query(YtTest).filter(YtTest.item_id != None).all()
  7. print([i.name for i in result])

正则查询

  1. data_regexp = session.query(YtTest).filter(YtTest.name.op("regexp")(r"^[\u4e00-\u9fa5]+")).all()
  2. print([i.name for i in data_regexp])

统计数量 .count()

data_like_count = session.query(YtTest).filter(YtTest.name.like("李%")).count()

in查询

more_person = session.query(YtTest).filter(YtTest.name.in_(['李明', '21'])).all()

notin 查询

  1. # ~代表取反,转换成sql就是关键字not
  2. more_person = session.query(YtTest).filter(~YtTest.name.in_(['李明', '21'])).all()
  3. # 或 notin_
  4. more_person = session.query(YtTest).filter(~YtTest.name.notin_(['李明', '21'])).all()

and 查询

  1. from sqlalchemy import and_
  2. # and 作用
  3. more_person = session.query(YtTest).filter_by(name = '李明' , year = "2006").all()
  4. more_person = session.query(YtTest).filter(and_(YtTest.name == '李明', YtTest.item_id == "21")).all()
  5. more_person = session.query(YtTest).filter(YtTest.name == '李明' and YtTest.year == "2006").all() #and后面的不起作用
  6. more_person = session.query(YtTest).filter(YtTest.name == '李明' , YtTest.year == "2006").all()

OR 查询

  1. from sqlalchemy import or_
  2. more_person = session.query(YtTest).filter(or_(YtTest.name == '李明' , YtTest.year == "2006")).all()

分组查询

  1. std_group_by = session.query(YtTest).group_by(YtTest.year).all()
  2. # 或是
  3. from sqlalchemy.sql import func
  4. res = session.query(YtTest.year,
  5. func.count(YtTest.year),
  6. ).group_by(YtTest.year).all()
  7. # 遍历查看,已无ed用户记录
  8. for person in std_group_by:
  9. print(person.name)

排序函数  limit 查询  偏移量查询  聚合函数

  1. # 排序查询
  2. std_order_by = session.query(YtTest).order_by(YtTest.year.desc()).all()
  3. # limit 限制数量查询, limit里传入一个整型来约束查看的数量, 当limit里面的参数大于实例表中的数量时,会返回所有的查询结果
  4. data_limit = session.query(YtTest).filter(YtTest.name.notlike("李%")).limit(1).all()
  5. # offset 偏移量查询,offset中传入一个整型,从表中的该位置开始查询,offset可以和limit混用来进行限制
  6. data_like = session.query(YtTest).filter(YtTest.name.like("李%")).offset(1).all()
  7. result = session.query(YtTest).offset(1).limit(6).all()
  8. print([i.name for i in result])
  9. # 聚合函数
  10. from sqlalchemy import func, extract
  11. # count
  12. result = session.query(YtTest.name, func.count(YtTest.id)).group_by(YtTest.name).all()
  13. # sum
  14. result = session.query(YtTest.name, func.sum(YtTest.id)).group_by(YtTest.name).all()
  15. # max
  16. result = session.query(YtTest.name, func.max(YtTest.id)).group_by(YtTest.name).all()
  17. # min
  18. result = session.query(YtTest.name, func.min(YtTest.id)).group_by(YtTest.name).all()
  19. # having
  20. result = session.query(YtTest.name, func.count(YtTest.id)).group_by(YtTest.name).having(func.count(YtTest.id) > 1).all()
  21. print(result)

传参

  1. # 传参
  2. filter = (YtTest.name == '李明')
  3. our_user = session.query(YtTest).filter(filter).first()
  4. print(our_user.name)

  1. from django.db import models
  2. class BookInfo(models.Model):
  3. __tablename__ = 'book_info'
  4. id = models.AutoField(primary_key=True) # 主账号信 AutoField 自增; primary_key 主键
  5. name = models.CharField(default='',max_length=30)
  6. title = models.CharField(default='',max_length=50)
  7. content = models.CharField(default='',max_length=60)
  8. imgs = models.CharField(default='',max_length=30)
  9. author = models.CharField(default='',max_length=50)
  10. def __unicode__(self):
  11. return self.name
  1. @api_view(['GET']) # 获取用户信息功能
  2. def get_jur(request):
  3. user_id = request.GET.get('user_id') # 获取请求所携带的用户uuid
  4. auth = user.objects.filter(id=user_id, state='true').all() # 查询是否是主账户
  5. #返回账号可以创建的子账号数目
  6. if auth[0].user_id == '0':
  7. c_num = '10000'
  8. # elif auth[0].user_id == '4':
  9. # c_num = '10'
  10. elif auth[0].user_id == '':
  11. c_num = '0'
  12. else:
  13. c_num = '10'
  14. if len(auth) != 0:
  15. auth_list = user.objects.filter(t_sid=auth[0].user_id).all() # 查询主账户下的子账户
  16. dic = [model_to_dict(i) for i in auth_list] # 模型转字典
  17. return Response({'res': '0','c_num':c_num, 'data': {'Primary': model_to_dict(auth[0]) # 返回所查询信息
  18. , 'secondary': dic}})
  19. return Response({'res': '2', 'data': '权限不足'}) # 返回错误信息

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/2023面试高手/article/detail/613246
推荐阅读
相关标签
  

闽ICP备14008679号