当前位置:   article > 正文

flask实现mysql连接池_flask 使用数据库连接池

flask postgetsql 链接池

使用原生sql语句

目录

初级阶段

制定一个函数

import pymysql

from flask import Flask

app = Flask(__name__)

def fetchall(sql):

conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='123', db='t1')

cursor = conn.cursor()

cursor.execute(sql)

result = cursor.fetchall()

cursor.close()

conn.close()

return result

@app.route('/login')

def login():

result = fetchall('select * from user')

return 'login'

@app.route('/index')

def index():

result = fetchall('select * from user')

return 'xxx'

@app.route('/order')

def order():

result = fetchall('select * from user')

return 'xxx'

if __name__ == '__main__':

app.run()

在修改

使用的是同一个连接

import pymysql

from flask import Flask

app = Flask(__name__)

CONN = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='123', db='t1')

def fetchall(sql):

cursor = CONN.cursor()

cursor.execute(sql)

result = cursor.fetchall()

cursor.close()

return result

@app.route('/login')

def login():

result = fetchall('select * from user')

return 'login'

@app.route('/index')

def index():

result = fetchall('select * from user')

return 'xxx'

@app.route('/order')

def order():

result = fetchall('select * from user')

return 'xxx'

if __name__ == '__main__':

app.run()

数据库连接池

安装

pip3 install dbutils

pip3 install pymysql

使用

import pymysql

from DBUtils.PooledDB import PooledDB

POOL = PooledDB(

creator=pymysql, # 使用链接数据库的模块

maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数

mincached=2, # 初始化时,链接池中至少创建的链接,0表示不创建

blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错

ping=0, # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always

host='127.0.0.1',

port=3306,

user='root',

password='222',

database='cmdb',

charset='utf8'

)

# 去连接池中获取一个连接

conn = POOL.connection()

cursor = conn.cursor()#游标

cursor.execute('select * from web_models_disk')#放入语法

result = cursor.fetchall()#获取结果

cursor.close()

# 将连接放会到连接池

conn.close()

print(result)

多线程测试

import pymysql

from DBUtils.PooledDB import PooledDB

POOL = PooledDB(

creator=pymysql, # 使用链接数据库的模块

maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数 用才会有

mincached=2, # 初始化时,链接池中至少创建的链接,0表示不创建

blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错

ping=0, # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always

host='127.0.0.1',

port=3306,

user='root',

password='222',

database='cmdb',

charset='utf8'

)

def task(num):

# 去连接池中获取一个连接

conn = POOL.connection()

cursor = conn.cursor()

# cursor.execute('select * from web_models_disk')

cursor.execute('select sleep(3)')

result = cursor.fetchall()

cursor.close()

# 将连接放会到连接池

conn.close()

print(num,'------------>',result)

from threading import Thread

for i in range(57):

t = Thread(target=task,args=(i,))

t.start()

flask 使用基于函数实现sqlhelper

封装的数据池

注意

我们在用python操作数据库的时候,经常会碰见两个函数:fetchone()和fetchall()

刚开始学习的时候可能会搞不清楚他们两个的区别

其实非常简单

首先fetchone()函数它的返回值是单个的元组,也就是一行记录,如果没有结果,那就会返回null

其次是fetchall()函数,它的返回值是多个元组,即返回多个行记录,如果没有结果,返回的是()

import pymysql

from DBUtils.PooledDB import PooledDB

POOL = PooledDB(

creator=pymysql, # 使用链接数据库的模块

maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数

mincached=2, # 初始化时,链接池中至少创建的链接,0表示不创建

blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错

ping=0, # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always

host='127.0.0.1',

port=3306,

user='root',

password='222',

database='cmdb',

charset='utf8'

)

def fetchall(sql,*args):

""" 获取所有数据 """

conn = POOL.connection()

cursor = conn.cursor()

cursor.execute(sql,args)#评级

result = cursor.fetchall()

cursor.close()

conn.close()

return result

def fetchone(sql, *args):

""" 获取单条数据 """

conn = POOL.connection()

cursor = conn.cursor()

cursor.execute(sql, args)

result = cursor.fetchone()

cursor.close()

conn.close()

return result

flask文件中使用

from flask import Flask

import sqlhelper#导入文件

app = Flask(__name__)

@app.route('/login')

def login():

result = sqlhelper.fetchone('select * from web_models_admininfo where username=%s ','wupeiqi')

print(result)

return 'login'

@app.route('/index')

def index():

result = sqlhelper.fetchall('select * from web_models_disk')

print(result)

return 'xxx'

if __name__ == '__main__':

app.run()

基于类实现sqlhelper

单列模式

import pymysql

from DBUtils.PooledDB import PooledDB

class SqlHelper(object):

def __init__(self):

self.pool = PooledDB(

creator=pymysql, # 使用链接数据库的模块

maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数

mincached=2, # 初始化时,链接池中至少创建的链接,0表示不创建

blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错

ping=0,

# ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always

host='127.0.0.1',

port=3306,

user='root',

password='222',

database='cmdb',

charset='utf8'

)

def open(self):

conn = self.pool.connection()

cursor = conn.cursor()

return conn,cursor

def close(self,cursor,conn):

cursor.close()

conn.close()

def fetchall(self,sql, *args):

""" 获取所有数据 """

conn,cursor = self.open()

cursor.execute(sql, args)

result = cursor.fetchall()

self.close(conn,cursor)

return result

def fetchone(self,sql, *args):

""" 获取所有数据 """

conn, cursor = self.open()

cursor.execute(sql, args)

result = cursor.fetchone()

self.close(conn, cursor)

return result

db = SqlHelper()

使用

from flask import Flask

from sqlhelper2 import db

app = Flask(__name__)

@app.route('/login')

def login():

# db.fetchone()

return 'login'

@app.route('/index')

def index():

# db.fetchall()

return 'xxx'

@app.route('/order')

def order():

# db.fetchall()

conn,cursor = db.open()

# 自己做操作

db.close(conn,cursor)

return 'xxx'

if __name__ == '__main__':

app.run()

上下文管理

拿到obj对象先执行__enter__方法 返回值给到f 在执行 with'后面语句

在执行__exit__

class Foo(object):

def __enter__(self):#第一步

return 123

def __exit__(self, exc_type, exc_val, exc_tb):#第3步

pass

obj = Foo()

with obj as f:

print(f)#第2步

class Foo(object):

def do_somthing(self):

pass

def close(self):

pass

class Context:

def __enter__(self):

self.data = Foo()

return self.data

def __exit__(self, exc_type, exc_val, exc_tb):

self.data.close()

with Context() as ctx:

ctx.do_somthing()

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/496603
推荐阅读
相关标签
  

闽ICP备14008679号