赞
踩
1、封装
# Import the SQLAlchemy engine factory, the PyMySQL driver and traceback
from sqlalchemy import create_engine, text
import pymysql
import traceback
class Db(object):
    """Namespace of static helpers for running SQL over a PyMySQL connection.

    ``Db.base(...)`` opens a connection; every other helper takes that
    connection as its first argument.  The query/write helpers do not raise:
    on any exception they print the traceback and return the string
    ``"出错了"``, so callers must check for that sentinel.
    """

    @staticmethod
    def base(database):
        """Open a raw DBAPI connection to ``database`` on the local MySQL server.

        A new engine (and therefore a new pool) is created on every call, so
        the caller owns the returned connection and should close it when done.

        NOTE(review): host, port and the root credentials are hard-coded in
        the URL below; consider moving them to configuration.

        Args:
            database: Name of the schema to connect to.

        Returns:
            The object returned by ``engine.raw_connection()``.
        """
        engine = create_engine(
            f"mysql+pymysql://root:root@127.0.0.1:3306/{database}",
            max_overflow=0,  # extra connections allowed beyond pool_size
            pool_size=5,  # number of connections kept in the pool
            pool_timeout=30,  # seconds to wait for a free connection before raising
            pool_recycle=-1,  # -1 disables age-based connection recycling
        )
        return engine.raw_connection()

    @staticmethod
    def _set_clause(data):
        """Build a parameterized ``SET`` clause from a column -> value mapping.

        Column names are wrapped in backticks (embedded backticks are doubled,
        and ``%`` is doubled so the driver's %-interpolation leaves it alone);
        values are never placed in the SQL text, only in the parameter list.

        Returns:
            ``(clause, params)``, e.g. ``("`a`=%s,`b`=%s", [1, 2])``.
        """
        assignments = []
        params = []
        for column in data:
            name = str(column).replace("`", "``").replace("%", "%%")
            assignments.append(f"`{name}`=%s")
            params.append(data[column])
        return ",".join(assignments), params

    @staticmethod
    def query(conn, sql, params=None):
        """Run a SELECT and return all rows as dictionaries.

        Args:
            conn: Connection obtained from ``Db.base``.
            sql: Statement to execute.  When ``params`` is given it must use
                ``%s`` placeholders (and ``%%`` for a literal percent sign).
            params: Optional sequence/mapping of values bound by the driver.

        Returns:
            The result of ``cursor.fetchall()``, or ``"出错了"`` on failure.
        """
        try:
            # The context manager closes the cursor even if execute() fails.
            with conn.cursor(pymysql.cursors.DictCursor) as cursor:
                cursor.execute(sql, params)
                return cursor.fetchall()
        except Exception:
            traceback.print_exc()
            return "出错了"

    @staticmethod
    def inserts(conn, table, data):
        """Insert one row into ``table`` and commit.

        Args:
            conn: Connection obtained from ``Db.base``.
            table: Target table name (inserted into the SQL text as-is).
            data: Mapping of column name -> value for the new row.

        Returns:
            The value returned by ``cursor.execute`` (affected row count),
            or ``"出错了"`` on failure.
        """
        assignments, params = Db._set_clause(data)
        insert = f"INSERT INTO {table} set {assignments}"
        print(insert)
        try:
            with conn.cursor(pymysql.cursors.DictCursor) as cursor:
                affected = cursor.execute(insert, params)
            conn.commit()
        except Exception:
            traceback.print_exc()
            return "出错了"
        return affected

    @staticmethod
    def updates(conn, table, data, where, where_params=None):
        """Update rows of ``table`` matching ``where`` and commit.

        Args:
            conn: Connection obtained from ``Db.base``.
            table: Target table name (inserted into the SQL text as-is).
            data: Mapping of column name -> new value.
            where: SQL condition placed after ``WHERE``.
            where_params: Optional values for ``%s`` placeholders in
                ``where``.  When omitted, ``where`` is treated as finished
                SQL text, exactly as before.

        Returns:
            ``1`` on success, or ``"出错了"`` on failure.
        """
        assignments, params = Db._set_clause(data)
        if where_params is None:
            # Legacy callers pass a complete condition; double any '%' so the
            # driver's %-interpolation of the bound values leaves it intact.
            condition = str(where).replace("%", "%%")
        else:
            condition = where
            params.extend(where_params)
        update = f"UPDATE {table} set {assignments} where {condition}"
        print(update)
        try:
            with conn.cursor(pymysql.cursors.DictCursor) as cursor:
                cursor.execute(update, params)
            conn.commit()
        except Exception:
            traceback.print_exc()
            return "出错了"
        return 1

    @staticmethod
    def deletes(conn, table, where, where_params=None):
        """Delete rows of ``table`` matching ``where`` and commit.

        Args:
            conn: Connection obtained from ``Db.base``.
            table: Target table name (inserted into the SQL text as-is).
            where: SQL condition placed after ``WHERE``.
            where_params: Optional values for ``%s`` placeholders in
                ``where``.  When omitted, the statement is executed without
                any parameter interpolation, exactly as before.

        Returns:
            ``1`` on success, or ``"出错了"`` on failure.
        """
        delete = f"delete from {table} where {where}"
        print(delete)
        try:
            with conn.cursor(pymysql.cursors.DictCursor) as cursor:
                cursor.execute(delete, where_params)
            conn.commit()
        except Exception:
            traceback.print_exc()
            return "出错了"
        return 1

    @staticmethod
    def close(conn):
        """Close ``conn``."""
        conn.close()
2、使用
from configs.Db import Db
def query():
    """Print the MySQL data sources stored in ``wbrj_accset.databasesource``.

    Opens its own connection and always closes it, even if printing fails.

    Returns:
        Always ``1``.
    """
    conns = Db.base("wbrj_accset")
    try:
        req = Db.query(conns, "select datasource_id,`sql` from databasesource where databasetype='mysql'")
        print(req)
    finally:
        # The connection was opened here, so it is released here.
        conns.close()
    return 1
def add():
    """Insert a sample row into ``wbrj_accset.databasesource`` and print the result.

    Opens its own connection and always closes it afterwards.

    Returns:
        Always ``1``.
    """
    conns = Db.base("wbrj_accset")
    try:
        data = {'user_name': 1}
        dd = Db.inserts(conns, "databasesource", data)
        print(dd)
    finally:
        # The connection was opened here, so it is released here.
        conns.close()
    return 1
def update():
    """Set ``user_name`` to 888 on the ``databasesource`` row with id 11339.

    Opens its own connection and always closes it afterwards.

    Returns:
        Always ``1``.
    """
    conns = Db.base("wbrj_accset")
    try:
        data = {'user_name': 888}
        Db.updates(conns, "databasesource", data, "id=11339")
    finally:
        # The connection was opened here, so it is released here.
        conns.close()
    return 1
def delete():
    """Delete the ``databasesource`` row with id 11338.

    Opens its own connection and always closes it afterwards.

    Returns:
        Always ``1``.
    """
    conns = Db.base("wbrj_accset")
    try:
        Db.deletes(conns, "databasesource", "id=11338")
    finally:
        # The connection was opened here, so it is released here.
        conns.close()
    return 1
# Close a connection
def close(conns=None):
    """Close a database connection.

    Args:
        conns: Connection to close.  When omitted, a fresh connection to
            ``wbrj_accset`` is opened and immediately closed, which is what
            this function did before it accepted an argument.
    """
    if conns is None:
        conns = Db.base("wbrj_accset")
    conns.close()
# Manual test driver: this call runs whenever the module is executed or imported.
# Call query() here instead to exercise the read path.
delete()
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。