赞
踩
DBUtils 是一套用于管理 数据库 "连接池" 的Python包,为 "高频度、高并发" 的数据库访问提供更好的性能,可以自动管理连接对象的创建和释放。并允许对非线程安全的数据库接口进行线程安全包装和连接。该连接可在各种多线程环境中使用。
使用场景:如果使用的是流行的对象关系映射器 SQLObject 或 SQLAlchemy 之一,则不需要 DBUtils,因为它们带有自己的连接池。SQLObject 2 (SQL-API) 实际上是从 DBUtils 中借用了一些代码,将池化分离到一个单独的层中。
DBUtils 提供两种外部接口:
另外,实际使用的数据库驱动也有所依赖,比如SQLite数据库只能使用PersistentDB作连接池。 下载地址:http://www.webwareforpython.org/downloads/DBUtils/
安装:pip install DBUtils
连接池对象只初始化一次,一般可以作为模块级代码来确保。 PersistentDB 的连接例子:
- import DBUtils.PersistentDB
-
- # maxusage 则为一个连接最大使用次数
- persist = DBUtils.PersistentDB.PersistentDB(dbpai=MySQLdb,maxusage=1000,**kwargs)
-
- # 获取连接池
- conn = persist.connection()
-
- # 关闭连接池
- conn.close()
参数 dbpai 指定使用的数据库模块,兼容 DB-API 。下面是支持 DB-API 2 规范的数据库模块
pip install pymysql(mysql)
pip install pymssql(sqlserver)
pip install cx_Oracle(oracle)
pip install phoenixdb(hbase)
pip install sqlite3(sqlite3 python自带)
DBUtils 仅提供给了连接池管理,实际的数据库操作依然是由符合 DB-API 2 标准的目标数据库模块完成的。
PooledDB 使用方法同 PersistentDB,只是参数有所不同。
conn = pooled.connection()
cur = conn.cursor()
cur.execute(sql)
res = cur.fetchone()
cur.close() # 或者 del cur
conn.close() # 或者 del conn
- import pymysql
- from dbutils.pooled_db import PooledDB
-
- # 定义连接参数
- pool = PooledDB(
- creator=pymysql,
- maxconnections=6,
- mincached=2,
- maxcached=5,
- blocking=True,
- host='localhost',
- user='root',
- passwd='123456',
- db='mydb',
- port=3306,
- charset='utf8mb4'
- )
-
-
- def main():
- # 从连接池获取连接
- conn = pool.connection()
- cursor = conn.cursor()
-
- # 执行 SQL 语句
- sql = "SELECT * FROM students"
- cursor.execute(sql)
- result = cursor.fetchall()
-
- # 处理查询结果
- for row in result:
- print(row)
-
- # 关闭游标和连接
- cursor.close()
- conn.close()
-
-
- if __name__ == '__main__':
- main()

- """
- 使用DBUtils数据库连接池中的连接,操作数据库
- """
- import json
- import pymysql
- import datetime
- from DBUtils.PooledDB import PooledDB
- import pymysql
-
-
- class MysqlClient(object):
- __pool = None;
-
- def __init__(self, mincached=10, maxcached=20, maxshared=10, maxconnections=200, blocking=True,
- maxusage=100, setsession=None, reset=True,
- host='127.0.0.1', port=3306, db='test',
- user='root', passwd='123456', charset='utf8mb4'):
- """
- :param mincached:连接池中空闲连接的初始数量
- :param maxcached:连接池中空闲连接的最大数量
- :param maxshared:共享连接的最大数量
- :param maxconnections:创建连接池的最大数量
- :param blocking:超过最大连接数量时候的表现,为True等待连接数量下降,为false直接报错处理
- :param maxusage:单个连接的最大重复使用次数
- :param setsession:optional list of SQL commands that may serve to prepare
- the session, e.g. ["set datestyle to ...", "set time zone ..."]
- :param reset:how connections should be reset when returned to the pool
- (False or None to rollback transcations started with begin(),
- True to always issue a rollback for safety's sake)
- :param host:数据库ip地址
- :param port:数据库端口
- :param db:库名
- :param user:用户名
- :param passwd:密码
- :param charset:字符编码
- """
-
- if not self.__pool:
- self.__class__.__pool = PooledDB(pymysql,
- mincached, maxcached,
- maxshared, maxconnections, blocking,
- maxusage, setsession, reset,
- host=host, port=port, db=db,
- user=user, passwd=passwd,
- charset=charset,
- cursorclass=pymysql.cursors.DictCursor
- )
- self._conn = None
- self._cursor = None
- self.__get_conn()
-
- def __get_conn(self):
- self._conn = self.__pool.connection();
- self._cursor = self._conn.cursor();
-
- def close(self):
- try:
- self._cursor.close()
- self._conn.close()
- except Exception as e:
- print(e)
-
- def __execute(self, sql, param=()):
- count = self._cursor.execute(sql, param)
- print(count)
- return count
-
- @staticmethod
- def __dict_datetime_obj_to_str(result_dict):
- """把字典里面的datatime对象转成字符串,使json转换不出错"""
- if result_dict:
- result_replace = {k: v.__str__() for k, v in result_dict.items() if isinstance(v, datetime.datetime)}
- result_dict.update(result_replace)
- return result_dict
-
- def select_one(self, sql, param=()):
- """查询单个结果"""
- count = self.__execute(sql, param)
- result = self._cursor.fetchone()
- """:type result:dict"""
- result = self.__dict_datetime_obj_to_str(result)
- return count, result
-
- def select_many(self, sql, param=()):
- """
- 查询多个结果
- :param sql: qsl语句
- :param param: sql参数
- :return: 结果数量和查询结果集
- """
- count = self.__execute(sql, param)
- result = self._cursor.fetchall()
- """:type result:list"""
- [self.__dict_datetime_obj_to_str(row_dict) for row_dict in result]
- return count, result
-
- def execute(self, sql, param=()):
- count = self.__execute(sql, param)
- return count
-
- def begin(self):
- """开启事务"""
- self._conn.autocommit(0)
-
- def end(self, option='commit'):
- """结束事务"""
- if option == 'commit':
- self._conn.autocommit()
- else:
- self._conn.rollback()
-
-
- if __name__ == "__main__":
- mc = MysqlClient()
- sql1 = 'SELECT * FROM shiji WHERE id = 1'
- result1 = mc.select_one(sql1)
- print(json.dumps(result1[1], ensure_ascii=False))
-
- sql2 = 'SELECT * FROM shiji WHERE id IN (%s,%s,%s)'
- param = (2, 3, 4)
- print(json.dumps(mc.select_many(sql2, param)[1], ensure_ascii=False))

- import MySQLdb
- conn= MySQLdb.connect(host='localhost',user='root',passwd='pwd',db='myDB',port=3306)
- #import pymysql
- #conn = pymysql.connect(host='localhost', port='3306', db='game', user='root', password='123456', charset='utf8')
- cur=conn.cursor()
- SQL="select * from table1"
- r=cur.execute(SQL)
- r=cur.fetchall()
- cur.close()
- conn.close()
- import MySQLdb
- from DBUtils.PooledDB import PooledDB
- #5为连接池里的最少连接数
- pool = PooledDB(MySQLdb,5,host='localhost',user='root',passwd='pwd',db='myDB',port=3306)
-
- # 以后每次需要数据库连接就是用connection()函数获取连接就好了
- conn = pool.connection()
- cur=conn.cursor()
- SQL="select * from table1"
- r=cur.execute(SQL)
- r=cur.fetchall()
- cur.close()
- conn.close()
- import sys
- import threading
- import MySQLdb
- import DBUtils.PooledDB
-
- connargs = { "host":"localhost", "user":"user1", "passwd":"123456", "db":"test" }
- def test(conn):
- try:
- cursor = conn.cursor()
- count = cursor.execute("select * from users")
- rows = cursor.fetchall()
- for r in rows: pass
- finally:
- conn.close()
-
- def testloop():
- print ("testloop")
- for i in range(1000):
- conn = MySQLdb.connect(**connargs)
- test(conn)
-
- def testpool():
- print ("testpool")
- pooled = DBUtils.PooledDB.PooledDB(MySQLdb, **connargs)
- for i in range(1000):
- conn = pooled.connection()
- test(conn)
-
- def main():
- t = testloop if len(sys.argv) == 1 else testpool
- for i in range(10):
- threading.Thread(target = t).start()
-
- if __name__ == "__main__":
- main()

虽然测试方式不是很严谨,但从测试结果还是能感受到 DBUtils 带来的性能提升。当然,我们我们也可以在 testloop() 中一直重复使用一个不关闭的 Connection,但这却不适合实际开发时的情形。
Flask之配置文件,蓝图,数据库连接池,上下文原理:https://www.cnblogs.com/yunweixiaoxuesheng/p/8418135.html
方式一,使用字典方式配置
app.config['SESSION_COOKE_NAME'] = 'session_liling'
方式二,引入文件,设置
from flask import Flask
app = Flask(__name__)
app.config.from_pyfile('settings.py') # 引用settings.py中的AAAA
print(app.config['AAAA']) # 123# settings.py
AAAA = 123
方法三,使用环境变量设置,推荐使用
from flask import Flask
app = Flask(__name__)
import os
os.environ['FLASK-SETTINGS'] = 'settings.py'
app.config.from_envvar('FLASK-SETTINGS')
方式四,通过对象方式导入使用,可根据不同环境选择不同的配置,推荐使用
from flask import Flask
app = Flask(__name__)
app.config.from_object('settings.BaseConfig')
print(app.config['NNNN']) # 123
# settings.pyclass BaseConfig(object): # 公用配置
NNNN = 123
class TestConfig(object):
DB = '127.0.0.1'
class DevConfig(object):
DB = '192.168.1.1'
class ProConfig(object):
DB = '47.18.1.1'
- from flask import Flask,current_app
-
- app = Flask(__name__)
-
- app.secret_key = 'adfadsfhjkhakljsdfh'
-
- app.config.from_object('settings.BaseConfig')
-
- @app.route('/index')
- def index():
- print(current_app.config['NNNN'])
- return 'xxx'
-
- if __name__ == '__main__':
- app.run()
- from flask import Flask,current_app
-
- app = Flask(__name__,instance_path=None,instance_relative_config=False)
- # 默认 instance_relative_config = False 那么 instance_relative_config和instance_path都不会生效
- # instance_relative_config=True,instance_path才会生效,app.config.from_pyfile('settings.py')将会失效
- # 配置文件找的路径,按instance_path的值作为配置文件路径
- # 默认instance_path=None,None会按照当前路径下的instance文件夹为配置文件的路径
- # 如果设置路径,按照设置的路径查找配置文件。
-
- app.config.from_pyfile('settings.py')
-
- @app.route('/index')
- def index():
- print(current_app.config['NNNN'])
- return 'xxx'
-
-
- if __name__ == '__main__':
- app.run()

对应用程序的目录结构进行分配,一般适用于小中型企业
代码:
- # crm/__init__.py
- # 创建flask项目,用蓝图注册不同的模块
-
- from flask import Flask
- from .views import account
- from .views import order
-
- app = Flask(__name__)
-
- app.register_blueprint(account.account)
- app.register_blueprint(order.order)
-
- ------------------------------------------------------------------------
- # manage.py
- # 启动文件
- import crm
-
- if __name__ == '__main__':
- crm.app.run()
-
- ------------------------------------------------------------------------
- # crm/views/account.py
- # 视图函数模块,Blueprint,将函数引入app
- from flask import Blueprint
-
- account = Blueprint('account',__name__,url_prefix='/xxx')
-
- @account.route('/login')
- def login():
- return 'Login'

:https://www.cnblogs.com/TheLand/p/9178305.html
ORM(Object-Relational Mapping,对象关系映射)是一种编程技术,用于在关系型数据库和面向对象编程语言之间建立映射关系。它允许开发人员使用面向对象的方式来操作数据库,而无需直接编写或执行 SQL 查询。
ORM 提供了一个抽象层,将数据库表格映射为对象,并提供了一组方法和工具,以便于进行数据库的增删改查操作。开发人员可以通过使用对象和方法来表示和操作数据,而不必关心底层的 SQL 语句和数据库细节。
常见的 ORM 框架包括:
使用 ORM 的好处包括:
注意:ORM 并不能解决所有数据库问题。在某些情况下,复杂的查询和性能要求可能需要直接使用原生 SQL。因此,根据具体的需求和场景,谨慎选择和使用合适的 ORM 框架。
解决方法:
数据库连接池避免每次操作都要连接数据库,一直使用一个连接,多线程也会出现问题,可加锁,但变为串行
- import pymysql
- import threading
- from threading import RLock
-
- LOCK = RLock()
- CONN = pymysql.connect(
- host='127.0.0.1',
- port=3306,
- user='root',
- password='123',
- database='ok1',
- charset='utf8'
- )
-
-
- def task(arg):
- with LOCK:
- cursor = CONN.cursor()
- cursor.execute('select * from book')
- result = cursor.fetchall()
- cursor.close()
- print(result)
-
-
- for i in range(10):
- t = threading.Thread(target=task, args=(i,))
- t.start()

"本地线程" 可以实现线程之间的数据隔离。保证每个线程都只有自己的一份数据,在操作时不会影响别人的,即使是多线程,自己的值也是互相隔离的
- import threading
- import time
-
- # 本地线程对象
- local_values = threading.local()
-
-
- def func(num):
- """
- # 第一个线程进来,本地线程对象会为他创建一个
- # 第二个线程进来,本地线程对象会为他创建一个
- {
- 线程1的唯一标识:{name:1},
- 线程2的唯一标识:{name:2},
- }
- :param num:
- :return:
- """
- local_values.name = num # 4
- # 线程停下来了
- time.sleep(2)
- # 第二个线程: local_values.name,去local_values中根据自己的唯一标识作为key,获取value中name对应的值
- print(local_values.name, threading.current_thread().name)
-
-
- for i in range(5):
- th = threading.Thread(target=func, args=(i,), name='线程%s' % i)
- th.start()

基于threading.local实现创建每个连接。
每个线程会创建一个连接,该线程没有真正关闭。
再次调用该线程时,还是使用原有的连接。
线程真正终止的时候,连接才会关闭。
- from DBUtils.PersistentDB import PersistentDB
- import pymysql
-
- POOL = PersistentDB(
- creator=pymysql, # 使用链接数据库的模块
- maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制
- setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
- ping=0,
- # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
- closeable=False,
- # 如果为False时, conn.close() 实际上被忽略,供下次使用,再线程关闭时,才会自动关闭链接。如果为True时, conn.close()则关闭链接,那么再次调用pool.connection时就会报错,因为已经真的关闭了连接(pool.steady_connection()可以获取一个新的链接)
- threadlocal=None, # 本线程独享值得对象,用于保存链接对象,如果链接对象被重置
- host='127.0.0.1',
- port=3306,
- user='root',
- password='123',
- database='pooldb',
- charset='utf8'
- )
-
-
- def func():
- # conn = SteadyDBConnection()
- conn = POOL.connection()
- cursor = conn.cursor()
- cursor.execute('select * from tb1')
- result = cursor.fetchall()
- cursor.close()
- conn.close() # 不是真的关闭,而是假的关闭。 conn = pymysql.connect() conn.close()
-
- conn = POOL.connection()
- cursor = conn.cursor()
- cursor.execute('select * from tb1')
- result = cursor.fetchall()
- cursor.close()
- conn.close()
-
- import threading
-
- for i in range(10):
- t = threading.Thread(target=func)
- t.start()

创建一个连接池,为所有线程提供连接,线程使用连接时获取连接,使用完毕放回连接池。
线程不断地重用连接池里的连接。
- import time
- import pymysql
- import threading
- from DBUtils.PooledDB import PooledDB, SharedDBConnection
-
- POOL = PooledDB(
- creator=pymysql, # 使用链接数据库的模块
- maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数
- mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
-
- maxcached=5, # 链接池中最多闲置的链接,0和None不限制
- maxshared=3,
- # 链接池中最多共享的链接数量,0和None表示全部共享。
- # PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,
- # _maxcached永远为0,所以永远是所有链接都共享。
- blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
- maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制
- setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
- # ping MySQL服务端,检查是否服务可用。
- # 如:0 = None = never, 1 = default = whenever it is requested,
- # 2 = when a cursor is created, 4 = when a query is executed, 7 = always
- ping=0,
- host='127.0.0.1',
- port=3306,
- user='root',
- password='123456',
- database='flask_test',
- charset='utf8'
- )
-
-
- def func():
- # 检测当前正在运行连接数的是否小于最大链接数,如果不小于则:等待或报raise TooManyConnections异常
- # 否则
- # 则优先去初始化时创建的链接中获取链接 SteadyDBConnection。
- # 然后将SteadyDBConnection对象封装到PooledDedicatedDBConnection中并返回。
- # 如果最开始创建的链接没有链接,则去创建一个SteadyDBConnection对象,
- # 再封装到PooledDedicatedDBConnection中并返回。
- # 一旦关闭链接后,连接就返回到连接池让后续线程继续使用。
-
- # PooledDedicatedDBConnection
- conn = POOL.connection()
-
- # print(th, '链接被拿走了', conn1._con)
- # print(th, '池子里目前有', pool._idle_cache, '\r\n')
-
- cursor = conn.cursor()
- cursor.execute('select * from userinfo')
- result = cursor.fetchall()
- print(result)
- conn.close()
-
- conn = POOL.connection()
-
- # print(th, '链接被拿走了', conn1._con)
- # print(th, '池子里目前有', pool._idle_cache, '\r\n')
-
- cursor = conn.cursor()
- cursor.execute('select * from userinfo')
- result = cursor.fetchall()
- conn.close()
-
-
- func()

所谓上下文,像考试题目根据上下文,回答一下问题。
程序中,泛指的外部环境,像wsgi来的网络请求,而且通常只有上文。
flask中的上下文,被使用在 current_app,session,request 上。
- from flask import session
-
- try:
- from greenlet import getcurrent as get_ident # grenlet协程模块
- except ImportError:
- try:
- from thread import get_ident
- except ImportError:
- from _thread import get_ident # get_ident(),获取线程的唯一标识
-
-
- class Local(object): # 引用session中的LocalStack下的Local
- __slots__ = ('__storage__', '__ident_func__') # __slots__该类在外面调用时,只能调用定义的字段,其他的不能调用
-
- def __init__(self):
- # object.__setattr__为self设置值,等价于self.__storage__ = {}
- # 为父类object中包含的__steattr__方法中的self.__storage__ = {}
- # 由于类内包含__steattr__,self.xxx(对象.xxx)时会自动会触发__steattr__,
- # 当前__steattr__中storage = self.__storage__又会像self.xxx要值,故会造成递归
- # 所以在父类中__steattr__方法赋值,避免self.xxx调用__setattr__造成的递归
- object.__setattr__(self, '__storage__', {})
-
- object.__setattr__(self, '__ident_func__', get_ident) # 赋值为协程
-
- def __iter__(self):
- return iter(self.__storage__.items())
-
- def __release_local__(self):
- self.__storage__.pop(self.__ident_func__(), None)
-
- def __getattr__(self, name):
- try:
- return self.__storage__[self.__ident_func__()][name]
- except KeyError:
- raise AttributeError(name)
-
- def __setattr__(self, name, value):
- ident = self.__ident_func__() # 获取单钱线程(协程)的唯一标识
- storage = self.__storage__ # {}
- try:
- storage[ident][name] = value # { 111 : {'stack':[] },222 : {'stack':[] } }
- except KeyError:
- storage[ident] = {name: value}
-
- def __delattr__(self, name):
- try:
- del self.__storage__[self.__ident_func__()][name]
- except KeyError:
- raise AttributeError(name)
-
-
- _local = Local() # flask的本地线程功能,类似于本地线程,如果有人创建Local对象并,设置值,每个线程里一份
- _local.stack = [] # _local.stack会调用__setattr__的self.__ident_func__()取唯一标识等

- from flask import session
-
- try:
- from greenlet import getcurrent as get_ident
- except ImportError:
- try:
- from thread import get_ident
- except ImportError:
- from _thread import get_ident # 获取线程的唯一标识 get_ident()
-
-
- class Local(object):
- __slots__ = ('__storage__', '__ident_func__')
-
- def __init__(self):
- # self.__storage__ = {}
- # self.__ident_func__ = get_ident
- object.__setattr__(self, '__storage__', {})
- object.__setattr__(self, '__ident_func__', get_ident)
-
- def __iter__(self):
- return iter(self.__storage__.items())
-
- def __release_local__(self):
- self.__storage__.pop(self.__ident_func__(), None)
-
- def __getattr__(self, name):
- try:
- return self.__storage__[self.__ident_func__()][name]
- except KeyError:
- raise AttributeError(name)
-
- def __setattr__(self, name, value):
- ident = self.__ident_func__() # 获取当前线程(协程)的唯一标识
- storage = self.__storage__ # {}
- try:
- storage[ident][name] = value # { 111:{'stack':[] },222:{'stack':[] } }
- except KeyError:
- storage[ident] = {name: value}
-
- def __delattr__(self, name):
- try:
- del self.__storage__[self.__ident_func__()][name]
- except KeyError:
- raise AttributeError(name)
-
-
- _local = Local()
- _local.stack = []

- from functools import partial
- from flask.globals import LocalStack, LocalProxy
-
- _request_ctx_stack = LocalStack()
-
-
- class RequestContext(object):
- def __init__(self, environ):
- self.request = environ
-
-
- def _lookup_req_object(name):
- top = _request_ctx_stack.top
- if top is None:
- raise RuntimeError(_request_ctx_stack)
- return getattr(top, name)
-
-
- # 实例化了LocalProxy对象,_lookup_req_object参数传递
- session = LocalProxy(partial(_lookup_req_object, 'session'))
-
- """
- local = {
- “标识”: {'stack': [RequestContext(),]}
- }
- """
- _request_ctx_stack.push(RequestContext('c1')) # 当请求进来时,放入
-
- print(session) # 获取 RequestContext('c1'), top方法
- print(session) # 获取 RequestContext('c1'), top方法
- _request_ctx_stack.pop() # 请求结束pop

示例:
- from functools import partial
- from flask.globals import LocalStack, LocalProxy
-
- ls = LocalStack()
-
-
- class RequestContext(object):
- def __init__(self, environ):
- self.request = environ
-
-
- def _lookup_req_object(name):
- top = ls.top
- if top is None:
- raise RuntimeError(ls)
- return getattr(top, name)
-
-
- session = LocalProxy(partial(_lookup_req_object, 'request'))
-
- ls.push(RequestContext('c1')) # 当请求进来时,放入
- print(session) # 视图函数使用
- print(session) # 视图函数使用
- ls.pop() # 请求结束pop
-
-
- ls.push(RequestContext('c2'))
- print(session)
-
- ls.push(RequestContext('c3'))
- print(session)

Flask SQLAlchemy 提供了内置的连接池功能,可以方便地配置和使用。
在Flask应用中,我们可以通过配置 SQLALCHEMY_POOL_SIZE
参数来设置连接池的大小。连接池的大小决定了同时打开的数据库连接的数量。例如,我们可以将连接池的大小设置为10:
app.config['SQLALCHEMY_POOL_SIZE'] = 10
- from flask_sqlalchemy import SQLAlchemy
-
- app = Flask(__name__)
- app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://user:password@localhost/db_name'
- db = SQLAlchemy(app)
-
- # 创建模型类
- class User(db.Model):
- id = db.Column(db.Integer, primary_key=True)
- name = db.Column(db.String(50))
-
- # 添加数据到数据库
- user = User(name='John')
- db.session.add(user)
- db.session.commit()
-
- # 查询数据
- all_users = User.query.all()
-
- # 更新数据
- user = User.query.filter_by(name='John').first()
- user.name = 'Jane'
- db.session.commit()
-
- # 删除数据
- user = User.query.filter_by(name='Jane').first()
- db.session.delete(user)
- db.session.commit()

Flask 本身并不提供内置的 HTTP 连接池功能,但可以使用第三方库来实现在 Flask 中使用 HTTP 连接池。其中一个常用的库是 urllib3,它提供了高级的连接池管理功能。
示例:在 Flask 中使用 urllib3 来创建和管理 HTTP 连接池:
安装:pip install urllib3
- from flask import Flask
- import urllib3
-
- app = Flask(__name__)
-
- """
- 使用 urllib3.PoolManager() 创建了一个连接池管理器对象 http,然后使用 http.request() 方法发送了一个 GET 请求。
- 您可以根据需要进行配置和自定义,例如设置最大连接数、超时时间、重试策略等。以下是一个示例,展示了如何进行自定义设置:
- """
-
-
- @app.route('/index_1')
- def index_1():
- http = urllib3.PoolManager()
- response = http.request('GET', 'http://api.example.com')
- return response.data
-
-
- """
- 对连接池进行了一些自定义配置,包括最大连接数、每个连接的最大数量、连接和读取的超时时间以及重试策略。
- 使用 urllib3 可以更好地控制和管理 HTTP 连接,提高 Flask 应用程序的性能和效率。
- """
-
-
- @app.route('/index_2')
- def index_2():
- http = urllib3.PoolManager(
- num_pools=10, # 最大连接数
- maxsize=100, # 每个连接的最大数量
- timeout=urllib3.Timeout(connect=2.0, read=5.0), # 连接和读取的超时时间
- retries=urllib3.Retry(total=3, backoff_factor=0.1, status_forcelist=[500, 502, 503, 504]) # 重试策略
- )
- response = http.request('GET', 'http://api.example.com')
- return response.data

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。