赞
踩
一、创建连接:
from contextlib import contextmanager
from functools import partial

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

# 1. Create the engine that connects to the database.
#    Alternative configuration with explicit pool settings, kept for reference:
# engine = create_engine("mysql+pymysql://root:root@172.16.81.129:3306/nms_db",
#                        max_overflow=0,
#                        pool_size=5,
#                        pool_timeout=30,
#                        pool_recycle=120
#                        )
# NOTE(review): the connection URL (including credentials) is hard-coded;
# consider loading it from configuration.
engine = create_engine("mysql+pymysql://root:root@172.16.81.129:3306/nms_db", poolclass=None)

# 2. Create a configured session factory.
DBSession = sessionmaker(bind=engine)

# 3. Instantiate a module-level session. Kept so that existing code importing
#    ``session`` keeps working; ``session_scope`` no longer depends on it.
session = DBSession()

# 4. Using a session:
#    flush:    send pending changes to the database inside the current
#              transaction; nothing is permanent until commit
#    commit:   commit the current transaction
#    rollback: roll the current transaction back
#    close:    release the session's resources


@contextmanager
def session_scope(maker):
    """Provide a transactional scope around a series of operations.

    Args:
        maker: A session factory (e.g. a ``sessionmaker``) that is called to
            create a fresh session for this scope. For backward compatibility
            a non-callable, already-created session is used as-is.

    Yields:
        The session to use inside the ``with`` block.

    Raises:
        Whatever the ``with`` body or ``commit()`` raises; the transaction is
        rolled back before the exception propagates.
    """
    # Create a new session per scope instead of sharing one global session
    # between every caller.
    scoped = maker() if callable(maker) else maker
    try:
        # Execution resumes after the yield once the ``with`` body finishes.
        yield scoped
        scoped.commit()
    except BaseException:
        # Same coverage as a bare ``except:``; always re-raised below.
        scoped.rollback()
        raise
    finally:
        scoped.close()


# Bind the factory (not a session instance) so each scope gets its own session.
nms_session_scope = partial(session_scope, maker=DBSession)
二、创建数据库表类(模型):
"""Database mappings."""
import enum

from sqlalchemy import Table, Column, Integer, String, Enum, SmallInteger, Date, ForeignKey, DateTime, Text
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()


class LevelEnum(enum.Enum):
    """Warning levels used by the ``level`` column."""

    critical = '1'
    important = '2'
    normal = '3'


# Database table class (model).
class ServerWarningUnrepaired(Base):
    """ORM mapping for the ``server_warning_unrepaired`` table."""

    __tablename__ = 'server_warning_unrepaired'

    id = Column(Integer, primary_key=True)
    device_moid = Column(String(40))
    device_name = Column(String(128), nullable=True)
    device_type = Column(String(36), nullable=True)
    device_ip = Column(String(128), nullable=True)
    machine_room_moid = Column(String(40))
    machine_room_name = Column(String(128), nullable=True)
    code = Column(Integer)
    # SQLAlchemy's Enum type persists the member *names* ('critical',
    # 'important', 'normal') by default, not the values '1'/'2'/'3'.
    level = Column(Enum(LevelEnum))
    description = Column(String(128), nullable=True)
    start_time = Column(DateTime)
    resolve_time = Column(DateTime, nullable=True)
三、使用session对数据库表操作:
from dao.mysql.nms import *
from dao.mysql import nms_session_scope
from sqlalchemy import and_, or_, func, text

# Attributes copied from an input dict onto a new ServerWarningUnrepaired row.
_WARNING_FIELDS = (
    'device_moid',
    'device_name',
    'device_type',
    'device_ip',
    'machine_room_moid',
    'machine_room_name',
    'code',
    'level',
    'description',
    'start_time',
    'resolve_time',
)


def _sample_warning(description):
    """Return the sample payload used by the insert demos."""
    return {
        'device_moid': '00c94e09-b7ba-4ce7-9132-c5f5b72415b1',
        'device_name': 'xmpu5',
        'device_type': 'nds',
        'device_ip': '172.16.81.129',
        'machine_room_moid': 'mooooooo-oooo-oooo-oooo-defaultmachi',
        'machine_room_name': '默认机房',
        # The ``code`` column is an Integer, so pass an int rather than '2020'.
        'code': 2020,
        'level': 'important',
        'description': description,
        # NOTE(review): passed as a string; relies on the driver/database to
        # parse it into a DATETIME value.
        'start_time': '2020-08-24 15:17:39.000000',
    }


def _build_warning(data):
    """Build a ServerWarningUnrepaired from a dict; missing keys become None."""
    return ServerWarningUnrepaired(**{name: data.get(name) for name in _WARNING_FIELDS})


def _print_attr(rows, attr):
    """Print attribute ``attr`` of every row in ``rows``."""
    for row in rows:
        print(getattr(row, attr))


def _insert_examples(session):
    """Add one row with ``add`` and two more with ``add_all``."""
    session.add(_build_warning(_sample_warning('测试')))

    payloads = [_sample_warning('测试'), _sample_warning('添加')]
    session.add_all([_build_warning(payload) for payload in payloads])


def _delete_example(session):
    """Delete the row with id 30, if any."""
    session.query(ServerWarningUnrepaired).filter(
        ServerWarningUnrepaired.id == 30
    ).delete(synchronize_session='fetch')


def _update_example(session):
    """Change the description of the row with id 21 when it exists."""
    target = session.query(ServerWarningUnrepaired).filter(
        ServerWarningUnrepaired.id == 21
    ).first()
    # first() returns None when nothing matches; skip instead of crashing.
    if target is not None:
        target.description = '修改'


def _query_examples(session):
    """Run the read-only query demos and print their results."""
    model = ServerWarningUnrepaired

    # All rows.
    _print_attr(session.query(model).all(), 'device_name')

    # First three rows; LIMIT in SQL instead of loading the whole table.
    _print_attr(session.query(model).limit(3).all(), 'id')

    # Selected columns only, grouped into {device_name: {ids}}.
    ids_by_name = {}
    for name, row_id in session.query(model.device_name, model.id).all():
        # setdefault keeps ids already collected for this name.
        ids_by_name.setdefault(name, set()).add(row_id)
    print(ids_by_name)

    # First row only (None when the table is empty).
    first_row = session.query(model).first()
    if first_row is not None:
        print(first_row.device_name)

    # Ordering: ascending, then descending.
    _print_attr(session.query(model).order_by(model.id), 'id')
    _print_attr(session.query(model).order_by(model.id.desc()), 'id')

    # Equality / inequality filters.
    _print_attr(session.query(model).filter(model.id == 21), 'device_name')
    _print_attr(
        session.query(model).filter(model.device_name != 'modb_10.23.46.39'),
        'device_name',
    )

    # IN / NOT IN.
    _print_attr(session.query(model).filter(model.id.in_([1, 2, 3])), 'id')
    _print_attr(session.query(model).filter(~model.id.in_([1, 2, 3])), 'id')

    # AND: explicit and_(), then multiple filter() arguments.
    _print_attr(
        session.query(model).filter(
            and_(model.level == 'critical', model.device_type == 'umm')
        ),
        'device_type',
    )
    _print_attr(
        session.query(model).filter(
            model.level == 'critical', model.device_type == 'umm'
        ),
        'device_type',
    )

    # OR.
    _print_attr(
        session.query(model).filter(
            or_(model.device_type == 'sus', model.device_type == 'umm')
        ),
        'device_type',
    )

    # LIKE: contains, starts with, ends with.
    _print_attr(session.query(model).filter(model.device_name.like('%s%')), 'device_name')
    _print_attr(session.query(model).filter(model.device_name.like('S%')), 'device_name')
    _print_attr(session.query(model).filter(model.device_name.like('%2')), 'device_name')

    # Counting: Query.count(), then an explicit COUNT(*).
    print(session.query(model).count())
    print(session.query(func.count('*')).select_from(model).scalar())

    # GROUP BY with a per-group count.
    grouped = session.query(
        func.count(model.device_name), model.device_name
    ).group_by(model.device_name).all()
    print(grouped)

    # Raw SQL with a bound parameter.
    raw_rows = session.query(model).from_statement(
        text('select * from server_warning_unrepaired where device_name=:device_name')
    ).params(device_name='10.23.46.39').all()
    print(raw_rows)
    _print_attr(raw_rows, 'device_name')


def init():
    """Run the insert / delete / update / query walkthrough in one session scope."""
    with nms_session_scope() as session:
        # 1. Create
        _insert_examples(session)
        # 2. Delete
        _delete_example(session)
        # 3. Update
        _update_example(session)
        # 4. Read
        _query_examples(session)


if __name__ == "__main__":
    init()
文档:SQLAlchemy 1.3文档
git链接: git
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。