一、flask-session
安装flask-session组件
使用流程:
import redis from flask_session import Session app.config["SESSION_TYPE"] = "redis" app.config["SESSION_REDIS"] = redis.Redis(host="", port=6379,password="") Session(app)
二、DBUtils
DBUtils是python用来实现数据库连接的模块。有两种实现方式,一是为每一个线程创建一个连接,二是创建一个连接池。
模式一:
POOL = PersistentDB( creator=pymysql, # 使用链接数据库的模块 maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制 setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."] ping=0, # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always closeable=False, # 如果为False时, conn.close() 实际上被忽略,供下次使用,再线程关闭时,才会自动关闭链接。如果为True时, conn.close()则关闭链接,那么再次调用pool.connection时就会报错,因为已经真的关闭了连接(pool.steady_connection()可以获取一个新的链接) threadlocal=None, # 本线程独享值得对象,用于保存链接对象,如果链接对象被重置 host='127.0.0.1', port=3306, user='root', password='123', database='pooldb', charset='utf8' ) def func(): conn = POOL.connection(shareable=False) cursor = conn.cursor() cursor.execute('select * from tb1') result = cursor.fetchall() cursor.close() conn.close() func()
模式二:
import time import pymysql import threading from DBUtils.PooledDB import PooledDB, SharedDBConnection POOL = PooledDB( creator=pymysql, # 使用链接数据库的模块 maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数 mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建 maxcached=5, # 链接池中最多闲置的链接,0和None不限制 maxshared=3, # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。 blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错 maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制 setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."] ping=0, # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always host='127.0.0.1', port=3306, user='root', password='123', database='pooldb', charset='utf8' ) def func(): # 检测当前正在运行连接数的是否小于最大链接数,如果不小于则:等待或报raise TooManyConnections异常 # 否则 # 则优先去初始化时创建的链接中获取链接 SteadyDBConnection。 # 然后将SteadyDBConnection对象封装到PooledDedicatedDBConnection中并返回。 # 如果最开始创建的链接没有链接,则去创建一个SteadyDBConnection对象,再封装到PooledDedicatedDBConnection中并返回。 # 一旦关闭链接后,连接就返回到连接池让后续线程继续使用。 conn = POOL.connection() # print(th, '链接被拿走了', conn1._con) # print(th, '池子里目前有', pool._idle_cache, '\r\n') cursor = conn.cursor() cursor.execute('select * from tb1') result = cursor.fetchall() conn.close() func()
Flask中的使用:
class Conf(object): SALT = b"asd" SECRET_KEY = "asdf" POOL = PooledDB( creator=pymysql, # 使用链接数据库的模块 maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数 mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建 maxcached=5, # 链接池中最多闲置的链接,0和None不限制 maxshared=3, # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。 blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错 maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制 setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."] ping=0, # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always host='127.0.0.1', port=3306, user='root', password='ywj971020', database='code_count', charset='utf8' ) conn = Pool.connection() cursor = conn.cursor(cursor=pymysql.cursors.DictCursor) #如果不传参数,默认返回的数据格式是元祖 cursor.execute(sql) data = cursor.fetchone() data_list = cursor.fetchall() cursor.close() conn.close()
三、wtform
https://www.cnblogs.com/wupeiqi/articles/8202357.html
1、用户登录实例
#!/usr/bin/env python # -*- coding:utf-8 -*- from flask import Flask, render_template, request, redirect from wtforms import Form from wtforms.fields import core from wtforms.fields import html5 from wtforms.fields import simple from wtforms import validators from wtforms import widgets app = Flask(__name__, template_folder='templates') app.debug = True class LoginForm(Form): name = simple.StringField( label='用户名', validators=[ validators.DataRequired(message='用户名不能为空.'), validators.Length(min=6, max=18, message='用户名长度必须大于%(min)d且小于%(max)d') ], widget=widgets.TextInput(), render_kw={'class': 'form-control'} ) pwd = simple.PasswordField( label='密码', validators=[ validators.DataRequired(message='密码不能为空.'), validators.Length(min=8, message='用户名长度必须大于%(min)d'), validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[$@$!%*?&])[A-Za-z\d$@$!%*?&]{8,}", message='密码至少8个字符,至少1个大写字母,1个小写字母,1个数字和1个特殊字符') ], widget=widgets.PasswordInput(), render_kw={'class': 'form-control'} ) @app.route('/login', methods=['GET', 'POST']) def login(): if request.method == 'GET': form = LoginForm() return render_template('login.html', form=form) else: form = LoginForm(formdata=request.form) if form.validate(): print('用户提交数据通过格式验证,提交的值为:', form.data) else: print(form.errors) return render_template('login.html', form=form) if __name__ == '__main__': app.run() app.py
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> <h1>登录</h1> <form method="post"> <!--<input type="text" name="name">--> <p>{{form.name.label}} {{form.name}} {{form.name.errors[0] }}</p> <!--<input type="password" name="pwd">--> <p>{{form.pwd.label}} {{form.pwd}} {{form.pwd.errors[0] }}</p> <input type="submit" value="提交"> </form> </body> </html> login.html
2、用户注册实例
from flask import Flask, render_template, request, redirect from wtforms import Form from wtforms.fields import core from wtforms.fields import html5 from wtforms.fields import simple from wtforms import validators from wtforms import widgets app = Flask(__name__, template_folder='templates') app.debug = True class RegisterForm(Form): name = simple.StringField( label='用户名', validators=[ validators.DataRequired() ], widget=widgets.TextInput(), render_kw={'class': 'form-control'}, default='alex' ) pwd = simple.PasswordField( label='密码', validators=[ validators.DataRequired(message='密码不能为空.') ], widget=widgets.PasswordInput(), render_kw={'class': 'form-control'} ) pwd_confirm = simple.PasswordField( label='重复密码', validators=[ validators.DataRequired(message='重复密码不能为空.'), validators.EqualTo('pwd', message="两次密码输入不一致") ], widget=widgets.PasswordInput(), render_kw={'class': 'form-control'} ) email = html5.EmailField( label='邮箱', validators=[ validators.DataRequired(message='邮箱不能为空.'), validators.Email(message='邮箱格式错误') ], widget=widgets.TextInput(input_type='email'), render_kw={'class': 'form-control'} ) gender = core.RadioField( label='性别', choices=( (1, '男'), (2, '女'), ), coerce=int ) city = core.SelectField( label='城市', choices=( ('bj', '北京'), ('sh', '上海'), ) ) hobby = core.SelectMultipleField( label='爱好', choices=( (1, '篮球'), (2, '足球'), ), coerce=int ) favor = core.SelectMultipleField( label='喜好', choices=( (1, '篮球'), (2, '足球'), ), widget=widgets.ListWidget(prefix_label=False), option_widget=widgets.CheckboxInput(), coerce=int, default=[1, 2] ) def __init__(self, *args, **kwargs): super(RegisterForm, self).__init__(*args, **kwargs) self.favor.choices = ((1, '篮球'), (2, '足球'), (3, '羽毛球')) def validate_pwd_confirm(self, field): """ 自定义pwd_confirm字段规则,例:与pwd字段是否一致 :param field: :return: """ # 最开始初始化时,self.data中已经有所有的值 if field.data != self.data['pwd']: # raise validators.ValidationError("密码不一致") # 继续后续验证 raise validators.StopValidation("密码不一致") # 不再继续后续验证 @app.route('/register', methods=['GET', 'POST']) def register(): if request.method == 'GET': form = RegisterForm(data={'gender': 1}) return render_template('register.html', form=form) else: form = RegisterForm(formdata=request.form) if form.validate(): print('用户提交数据通过格式验证,提交的值为:', form.data) else: print(form.errors) return render_template('register.html', form=form) if __name__ == '__main__': app.run() app.py
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> <h1>用户注册</h1> <form method="post" novalidate style="padding:0 50px"> {% for item in form %} <p>{{item.label}}: {{item}} {{item.errors[0] }}</p> {% endfor %} <input type="submit" value="提交"> </form> </body> </html> register.html
3、其他
数据库数据实时更新:自定义一个__init__方法,因为静态字段只在程序开始时加载一次,构造方法会实时更新。
设置input框的默认值:在实例化Form对象时传参:data={'字段名' : '默认值'}
四、SQLAlchemy
https://www.cnblogs.com/wupeiqi/articles/8259356.html
SQLAlchemy是一个ORM框架,ORM是关系对象映射(类对应数据库的表,类里的字段对应表中的列,对象对应数据库的行)
1、创建表
import datetime from sqlalchemy import create_engine from sqlalchemy.ext..declarative import declarative_base from sqlalchemy import Column,Datetime Base = declarative_base() class User(Base): __tablename__ = 'users' id = Column(Integer,primary_key=True) name = Column(String(32), index=True, nullable=False) email = Column(String(32), unique=True) ctime = Column(Datetime,default=datetime.datetime.now) def create_tables(): engine = create_engine( "mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) Base.metadata.create_all(engine) def drop_tables(): engine = create_engine( "mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) Base.metadata.drop_all(engine) if __name__ == "__main__": create_tables() #drop_tables()
2、基本增删改查
#!/usr/bin/env python # -*- coding:utf-8 -*- from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from models import Users engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5) Session = sessionmaker(bind=engine) # 每次执行数据库操作时,都需要创建一个session session = Session() #1.添加 obj = Users(name='xx') session.add(obj) #session.add_all(Users(name='xx'),User(name='xx')) session.commit() #2.查询 result = session.query(Users).all() result = session.query(Users).filter(Users.id=1).first() #3.删除 session.query(Users).filter(User.id=1).delete() session.commit() #4.修改 session.query(Users).filter(User.id=1).update({'name':'xx'}) session.query(Users).filter(User.id=1).update({Users.name:'xx'})
3.常用查询操作
#查询时只取某些字段 result = session.query(Users.id,Users.name).all() #为查询的字段起别名 result = session.query(Users.name.label('cname')).all() for item in result: print(item.cname) # 条件 ret = session.query(Users).filter(Users.id > 1, Users.name == 'eric').all() #默认的逗号就是and ret = session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric').all() #between ret = session.query(Users).filter(Users.id.in_([1,3,4])).all() #in ret = session.query(Users).filter(~Users.id.in_([1,3,4])).all() #not in ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='eric'))).all() #子查询 from sqlalchemy import and_, or_ ret = session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric')).all() #默认的and可以不加and_ ret = session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric')).all() ret = session.query(Users).filter( or_( Users.id < 2, and_(Users.name == 'eric', Users.id > 3), Users.extra != "" )).all() #模糊匹配like ret = session.query(Users).filter(Users.name.like('e%')).all() ret = session.query(Users).filter(~Users.name.like('e%')).all() # 切片/分页 ret = session.query(Users)[1:2] # 排序 ret = session.query(Users).order_by(Users.name.desc()).all() ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all() # 分组 from sqlalchemy.sql import func ret = session.query(Users).group_by(Users.extra).all() ret = session.query( func.max(Users.id), func.sum(Users.id), func.min(Users.id)).group_by(Users.name).all() ret = session.query( func.max(Users.id), func.sum(Users.id), func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) >2).all() # 组合 q1 = session.query(Users.name).filter(Users.id > 2) q2 = session.query(Favor.caption).filter(Favor.nid < 2) ret = q1.union(q2).all() q1 = session.query(Users.name).filter(Users.id > 2) q2 = session.query(Favor.caption).filter(Favor.nid < 2) ret = q1.union_all(q2).all()
给查询字段起别名:session.query(Users,name.label("n")).all()
4.一对多和多对多
#创建FK字段 class Depart(Base): __tablename__ = 'depart' id = Column(Int, primary_key=True) name = Column(String(32), nullable=False) class Users(Base): __tablename__ = 'users' id = Column(Int, primary_key=True) name = Column(String(32), nullable=False) depart_id = Column(Int, ForeignKey("depart.id")) dp = relationship('Depart', backref='us') #1.查询用户名+所在部门 ret = session.query(Users.name, Depart.name).join(Depart).all() #ret = session.query(Users.name, Depart.name).join(Depart, isouter=True).all() #表示使用left join #2.relation字段 查询用户名+所在部门 ret = session.query(Users).all() for row in ret: print(ret.name, ret.dp.name) #3.relation字段 查找某个部门所有员工 ret = session.query(Depart).filter(Depart.name='销售').first() for row in ret.us: print(row.name) #4.添加一个IT部门,同时增加一名员工 user = Users(name='xxx', dp.name='IT') #自动创建IT部门 session.add(user) session.commit() #5.添加一个IT部门,同时增加多名员工 dep = Depart(name='IT') dep.us = [Users(name=xxx), Users(name=xxx)] session.add(dep) session.commit() 一对多示例
#创建表 class Server2Group(Base): __tablename__ = 'server2group' id = Column(Integer, primary_key=True, autoincrement=True) server_id = Column(Integer, ForeignKey('server.id')) group_id = Column(Integer, ForeignKey('group.id')) __table_args__ = ( UniqueConstraint('id', 'name', name='uix_id_name'), #创建联合唯一索引 # Index('ix_id_name', 'name', 'extra'), ) class Group(Base): __tablename__ = 'group' id = Column(Integer, primary_key=True) name = Column(String(64), unique=True, nullable=False) # 与生成表结构无关,仅用于查询方便 servers = relationship('Server', secondary='server2group', backref='groups') class Server(Base): __tablename__ = 'server' id = Column(Integer, primary_key=True, autoincrement=True) hostname = Column(String(64), unique=True, nullable=False)
5.两种连接方式
from sqlalchemy import create_engine engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5) Session = sessionmaker(bind=engine) def task(): session = Session() #从连接池中取一个连接 xxxxx session.close() #把连接返回给连接池 from threading import Thread for i in range(20): t = Thread(target=task) t.start()
from sqlalchemy import create_engine from sqlalchemy.orm import scoped_session engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5) Session = sessionmaker(bind=engine) session = scoped_session(Session) #这里相当于是基于threading.Local为每个线程创建独立的内存空间,去连接池中取连接,这个连接只有当前的线程可以使用。 def task(): xxxxx session.remove() from threading import Thread for i in range(20): t = Thread(target=task) t.start()
6.原生sql
#查询 cursor = session.execute('select * from xx') result = cursor.fetchall() #添加 cursor = session.execute('insert into xxxxx') session.commit()
conn = engine.raw_connection() cursor = conn.cursor() cursor.execute(xxxx) result = cursor.fetchall() cursor.close() conn.close()