赞
踩
第三方组件将session放到Redis里面,所以我们先安装两个模块并导入
- from flask_session import RedisSessionInterface
- from redis import Redis
使用的原理:
- 在Flask请求初始化时会进行
- self.session = self.app.open_session(self.request)
-
- ---------------------------------------------------------------
- def open_session(self, request):
- return self.session_interface.open_session(self, request)
-
- ---------------------------------------------------------------
-
- session_interface = SecureCookieSessionInterface()
-
- ---------------------------------------------------------------
-
- 当一个请求结束时
- self.save_session(ctx.session, response)
- ---------------------------------------------------------------
- def save_session(self, session, response):
- return self.session_interface.save_session(self, session, response)
发现和Session相关的操作都是在SecureCookieSessionInterface()类中进行的,按照这个原理进行自定制Session
- # 写法一
- conn = Redis() # 初始化Redis
- app.session_interface = RedisSessionInterface(conn,key_prefix="__",use_signer=False) # user_signer 数字签名
-
- #写法二
- app.config['SESSION_TYPE'] ='redis'
- app.config['SESSION_REDIS'] = Redis(host="127.0.0.1",port=8080)
- Session(app)
为什么会有写法二?
- # 首先将app传到Session中
- Session(app)
- ---------------------------------
- def __init__(self, app=None):
- self.app = app
- if app is not None: # app存在
- self.init_app(app)
-
- def init_app(self, app):
- app.session_interface = self._get_interface(app) # 这里进行了对app.session_interface进行赋值
-
- def _get_interface(self, app): # 可以发现支持许多类型的数据库
- ......
- if config['SESSION_TYPE'] == 'redis': # 这样写的原理
- session_interface = RedisSessionInterface(
- config['SESSION_REDIS'], config['SESSION_KEY_PREFIX'],
- config['SESSION_USE_SIGNER'], config['SESSION_PERMANENT'])
- elif config['SESSION_TYPE'] == 'memcached':
- .....
- elif config['SESSION_TYPE'] == 'filesystem':
- .....
- elif config['SESSION_TYPE'] == 'mongodb':
- .....
- elif config['SESSION_TYPE'] == 'sqlalchemy':
- .....
- else:
- session_interface = NullSessionInterface()
-
- return session_interface
RedisSessionInterface源码流程分析
- class RedisSessionInterface(SessionInterface):
-
- serializer = pickle
- session_class = RedisSession
-
- def __init__(self, redis, key_prefix, use_signer=False, permanent=True):
- if redis is None: # 如果没有传redis会自动导入
- from redis import Redis
- redis = Redis()
- self.redis = redis
- self.key_prefix = key_prefix
- self.use_signer = use_signer
- self.permanent = permanent # 持久性和Cookie有关
-
- def open_session(self, app, request): # 当创建Session会调用此法
- sid = request.cookies.get(app.session_cookie_name) # 去Cookie中获取Session
- if not sid: # 第一次肯定没有
- sid = self._generate_sid() # 产生一个随机字符串
- return self.session_class(sid=sid, permanent=self.permanent) # 将特殊的字典返回{sid,{}}
- if self.use_signer: # 有没有数据签名
- signer = self._get_signer(app)
- if signer is None:
- return None
- try:
- sid_as_bytes = signer.unsign(sid)
- sid = sid_as_bytes.decode()
- except BadSignature:
- sid = self._generate_sid()
- return self.session_class(sid=sid, permanent=self.permanent)
-
- if not PY2 and not isinstance(sid, text_type):
- sid = sid.decode('utf-8', 'strict') # 将sid进行解码
- val = self.redis.get(self.key_prefix + sid) # 从redis获取值
- if val is not None:
- try:
- data = self.serializer.loads(val) # 将这个值序列化
- return self.session_class(data, sid=sid) # 放到字典中并返回
- except:
- return self.session_class(sid=sid, permanent=self.permanent)
- return self.session_class(sid=sid, permanent=self.permanent)
-
- def save_session(self, app, session, response): # 当请求结束时调用此方法
- domain = self.get_cookie_domain(app)
- path = self.get_cookie_path(app)
- if not session:
- if session.modified:
- self.redis.delete(self.key_prefix + session.sid)
- response.delete_cookie(app.session_cookie_name,
- domain=domain, path=path)
- return
-
- httponly = self.get_cookie_httponly(app)
- secure = self.get_cookie_secure(app)
- expires = self.get_expiration_time(app, session)
- val = self.serializer.dumps(dict(session)) # 将session序列化
- self.redis.setex(name=self.key_prefix + session.sid, value=val,
- time=total_seconds(app.permanent_session_lifetime)) # 存到redis
- if self.use_signer:
- session_id = self._get_signer(app).sign(want_bytes(session.sid))
- else:
- session_id = session.sid
- response.set_cookie(app.session_cookie_name, session_id,
- expires=expires, httponly=httponly,
- domain=domain, path=path, secure=secure) # 将随机字符串放到Cookie中
2、如何使浏览器关闭时cookie自动失效
答案是:将expires=none时浏览器关闭时cookie自动失效。
- response.set_cookie(app.session_cookie_name, session_id,expires=none) 当expires=none时浏览器关闭时cookie自动失效
-
- 因为这句话expires = self.get_expiration_time(app, session)
-
- def get_expiration_time(self, app, session):
- # 发现和session的permanent持久性有关
- if session.permanent:
- return datetime.utcnow() + app.permanent_session_lifetime
WTForm的用法和Djangp的Form组件的用法相差不大。
基本用法:
- from wtforms import Form
- from wtforms import widgets,validators
- from wtforms.fields import simple,core
- from flask import Flask,render_template,request
-
- app = Flask(__name__)
-
- class CheckForm(Form): # 必须继承Form
- name = simple.StringField(label="用户名", # 标签名
- render_kw={"class":"contorl"}, # render_kw代表声明的属性
- widget=widgets.TextInput(), # widget表示页面上用什么插件
- default="alex", # 默认值
- validators=[validators.length(min=2,max=8,message="用户名必须大于2小于8"),validators.DataRequired(message="请填写用户名")]) # 在这里定义各种验证
-
- passwd = simple.PasswordField(label="密码",validators=[validators.DataRequired()]) # 必须要求填值
-
- repasswd = simple.PasswordField(label="重复密码",validators=[validators.equal_to(passwd,message="两次密码不一致")]) # 直接验证两次密码是否相同,用validators.equal_to(字段名,错误信息)
-
- gender = core.RadioField(label="性别", choices=[(1, "男"), (2, "女")], coerce=int) # coerce 是把返回值变成int类型 单选框,choices就是数据源
-
- hobby = core.SelectMultipleField(label="爱好", choices=[(1, "电影"), (2, "ks"),(3, "电影"),(4, "电影")], coerce=int) # 下拉框多选
-
- city = core.SelectField(label="下拉框", choices=[(1, "北京"), (2, "济南")]) # 下拉框
-
- favor = core.SelectMultipleField(label="真爱", choices=[(1, "电影"), (2, "ks")], coerce=int,widget=widgets.ListWidget(prefix_label=False),option_widget=widgets.CheckboxInput())# 复选框
-
- @app.route("/index",methods=["GET","POST"])
- def index():
- if request.method == "GET":
- form = CheckForm(data={"favor":"1"}) # 在这里传入默认值
- return render_template("test.html",form=form)
- else:
- form = CheckForm(formdata=request.form) # 把要验证的数据传入Form中
- if form.validate():
- print("ok",form.data) # form.data 是验证成功的信息
- else:
- print(form.errors) # form.errors 是错误信息
- return render_template("test.html",form=form)
- if __name__ == '__main__':
- app.run()
其实还可以通过类来进行数据的更新
- class CheckForm(Form):
- gender = core.RadioField(label="性别", choices=[(1, "男"), (2, "女")], coerce=int) # coerce 是把返回值变成int类型 单选
-
- def __init__(self,*args,**kwargs):
- super().__init__(*args,**kwargs) # 通过父类完成初始化
- self.gender.choices = [(1, "男"), (2, "女"), (3, "人妖")] # 在这里进行数据的实时更新,去数据库中拿数据, 通过字段的choices方法进行数据更新
也可以直接用H5元素
- from wtforms import Form
- from wtforms import widgets,validators
- from wtforms.fields import html5,core # 可以引入H5
- from flask import Flask, render_template, request
-
- app = Flask(__name__)
-
- class CheckForm(Form):
- passwd = html5.IntegerField() # 自动包含正则表达式
-
- @app.route("/index",methods=["GET","POST"])
- def index():
- if request.method == "GET":
- form = CheckForm(data={"favor":"1"}) # 在这里传入默认值
- return render_template("test.html",form=form)
-
- if __name__ == '__main__':
- app.run()
解读WTForms源码:
- from wtforms import Form,widgets
- from wtforms.fields import simple
-
- class CheckForm(Form):
- name = simple.StringField(label="xxx",widget=widgets.TextInput())
首先CheckForm继承了Form,我们来看Form的源码
- class Form(with_metaclass(FormMeta, BaseForm)) # 执行with_metaclass函数
-
- --------------------------------------------------------
- with_metaclass函数
-
- def with_metaclass(meta, base=object): # meta=FormMeta, base=BaseForm
- return meta("NewBase", (base,), {}) # 相当于 class NewBase(metaclass=FormMeta, BaseForm)
- # 会先执行FormMeta的__init__方法
- ---------------------------------------------------------
- FormMeta的__init__方法
-
- class FormMeta(type):
- def __init__(cls, name, bases, attrs): # 这里的cls就是自己创建的类(checkForm)
- type.__init__(cls, name, bases, attrs)
- cls._unbound_fields = None # cls._unbound_fields = None -->> CheckForm._unbound_fields
- cls._wtforms_meta = None # cls._wtforms_meta = None -->> CheckForm._wtforms_meta
关于__new__方法的补充
- class Foo:
- def __new__(cls, *args, **kwargs): 用于生成对象
- # return super().__new__(cls, *args, **kwargs)
- return "666"
-
- f = Foo() # 在Python的面向对象的方法中,f的值取决于__new__的返回值
- print(f)
当执行类里面的语句时
- name = simple.StringField(label="xxx",widget=widgets.TextInput())
-
- # 当创建一个类时先实例化执行meta的__call__方法
- # 再找这个类的__new__方法
- # 再找这个类的__init__方法
- -------------------------------------------------------------------
- __new__方法
-
- def __new__(cls, *args, **kwargs):
- if '_form' in kwargs and '_name' in kwargs: # 第一次来不满足条件
- return super(Field, cls).__new__(cls)
- else:
- return UnboundField(cls, *args, **kwargs) # 返回一个UnboundField的对象
- # cls是simple.StringField这个类,把其传递所有的参数全部传过来
- -------------------------------------------------------------------
- UnboundField的__init__方法
-
- class UnboundField(object): # 这个类的功能是排序,为避免字典的无序
- _formfield = True
- creation_counter = 0
-
- def __init__(self, field_class, *args, **kwargs):
- UnboundField.creation_counter += 1
- self.field_class = field_class
- self.args = args
- self.kwargs = kwargs
- self.creation_counter = UnboundField.creation_counter # 每初始化一次就+1 将值封装到每个对象中
执行完metaclass之后,ChekcForm.name != simple.StringField() 是因为在StringField()的__new__方法返回的并不是一个filed的对象,而是UnBoundField的对象。
当我们实例化这个类时会执行metaclass的__call__方法
- FormMeta的__call__方法
- def __call__(cls, *args, **kwargs): # cls 是 checkForm
- if cls._unbound_fields is None: # 初始化时cls._unbound_fields就为None
- fields = []
- for name in dir(cls): # dir(cls)会把__dict__的属性所有的key拿到
- if not name.startswith('_'): # 对于不是'_'开头的key
- unbound_field = getattr(cls, name) # 去CheckForm类中找到对象,此时就是UnBoundField对象,unboundField就封装了(1,simple.StringField,所有的参数)
- if hasattr(unbound_field, '_formfield'): # 会发现_formfield的值为True
- fields.append((name, unbound_field)) # 将其添加到一个队列
- fields.sort(key=lambda x: (x[1].creation_counter, x[0])) # 然后根据其UnBoundField对象的初始化的值进行排序,当出现相同值时会根据第二个值进行排序
- cls._unbound_fields = fields # 将其赋值给cls._unbound_fields
- if cls._wtforms_meta is None: # 初始化时cls._wtforms_meta就为None
- bases = []
- for mro_class in cls.__mro__: # 开始遍历Checkform这个类的继承关系
- if 'Meta' in mro_class.__dict__: # Meta = DefaultMeta
- bases.append(mro_class.Meta)
- cls._wtforms_meta = type('Meta', tuple(bases), {}) # 这里创建了一个类 class Meta(DefaultMeta)
- return type.__call__(cls, *args, **kwargs)
执行完__call__方法以后此时已经发生变换,此时的 cls._unbound_fields已经是一个列表里面包含了排完序的对象[ (name,UnBoundField对象(simple.StringField,所有的参数)),(pwd,UnBoundField对象(simple.PasswordField,所有的参数))],cls._wtforms_meta也是一个列表
然后继续执行__init__方法
- class Form(with_metaclass(FormMeta, BaseForm)):
- Meta = DefaultMeta
-
- def __init__(self, formdata=None, obj=None, prefix='', data=None, meta=None, **kwargs):
- meta_obj = self._wtforms_meta() # Meta这个类进行实例化
- if meta is not None and isinstance(meta, dict): # 如果有自定义的meta进行添加
- meta_obj.update_values(meta) # 进行数据的更新
- super(Form, self).__init__(self._unbound_fields, meta=meta_obj, prefix=prefix) # self._unbound_fields 是已经排好顺序的对象
- for name, field in iteritems(self._fields):
- setattr(self, name, field) # 这样做的目的是可以直接打.调用
- self.process(formdata, obj, data=data, **kwargs) # 为每个字段设置默认值,以及提供验证的值,由于都是空的就什么都不做
-
- ------------------------------------------------------------------
- 执行父类的__init__方法
- def __init__(self, fields, prefix='', meta=DefaultMeta()): # fields就是封装好每一个对象
- if prefix and prefix[-1] not in '-_;:/.':
- prefix += '-'
-
- self.meta = meta
- self._prefix = prefix
- self._errors = None
- self._fields = OrderedDict()
-
- if hasattr(fields, 'items'):
- fields = fields.items()
-
- translations = self._get_translations()
- extra_fields = []
- if meta.csrf: # 在这里可以通过自己在类中创建一个类定义csrf属性,如果meta有csrf,就再生成一条个文本框
- self._csrf = meta.build_csrf(self)
- extra_fields.extend(self._csrf.setup_form(self)) # 将其添加到后面的对列
-
- for name, unbound_field in itertools.chain(fields, extra_fields): # 将两个列表合并成一个列表进行遍历
- options = dict(name=name, prefix=prefix, translations=translations)
- field = meta.bind_field(self, unbound_field, options) # unbound_field是一个UnBoundField的对象,此时的field就是一个对象
- self._fields[name] = field # 将对象放到列表中
- ------------------------------------------------------------------
- 执行Meta类的bind_field方法
- def bind_field(self, form, unbound_field, options):
- return unbound_field.bind(form=form, **options)
- -------------------------------------------------------------------
- def bind(self, form, name, prefix='', translations=None, **kwargs):
- kw = dict(
- self.kwargs,
- _form=form,
- _prefix=prefix,
- _name=name,
- _translations=translations,
- **kwargs
- )
- return self.field_class(*self.args, **kw) # 这里就是对StringField+括号进行实例化,在初始化进行赋值的类
- ------------------------------------------------------------------
- self.process 函数
- def process(self, formdata=None, obj=None, data=None, **kwargs):
- formdata = self.meta.wrap_formdata(self, formdata)
- if data is not None:
- kwargs = dict(data, **kwargs)
- for name, field, in iteritems(self._fields):
- if obj is not None and hasattr(obj, name):
- field.process(formdata, getattr(obj, name))
- elif name in kwargs:
- field.process(formdata, kwargs[name])
- else:
- field.process(formdata)
通过上面可以知道,请求传过来的值可以用三种格式
- ChekcForm(data=request.form) 按照字典的形式取值
- ChekcForm(obj=request.form) 打点的方式进行取值
- ChekcForm(formdata=request.form) getlist()方式取值
当我们要打印一个字段时进行的操作
- print(form.name) # 就会调用StringField(要生成的类)的__str__方法
- -------------------------------------------------------------
- __str__方法
- def __str__(self):
- return self() # 对象+括号 调用__call__方法
- -------------------------------------------------------------
- __call__方法
- def __call__(self, **kwargs):
- return self.meta.render_field(self, kwargs) # 调用了meta的render_field的方法,self是当前的调用对象(StringField)
- -------------------------------------------------------------
- def render_field(self, field, render_kw):
- other_kw = getattr(field, 'render_kw', None)
- if other_kw is not None:
- render_kw = dict(other_kw, **render_kw)
- return field.widget(field, **render_kw) # field是调用的字段
- # 由于widget=widgets.TextInput(),widget已经是对象,再加上括号直接调用插件的__call__方法
- -------------------------------------------------------------
- 插件的__call__方法
- def __call__(self, field, **kwargs):
- kwargs.setdefault('id', field.id)
- kwargs.setdefault('type', self.input_type)
- if 'value' not in kwargs:
- kwargs['value'] = field._value()
- if 'required' not in kwargs and 'required' in getattr(field, 'flags', []):
- kwargs['required'] = True
- return HTMLString('<input %s>' % self.html_params(name=field.name, **kwargs)) # 输出
当我们调用validate方法进行验证数据合法性时
- def validate(self):
- extra = {}
- for name in self._fields:
- # name=name field=StringField
- # name=pwd field=PasswordField
- inline = getattr(self.__class__, 'validate_%s' % name, None) # 在自己的类中找以"validate_"开头的方法
- if inline is not None:
- extra[name] = [inline] # 将所有的钩子函数放到extra里面
- return super(Form, self).validate(extra)
- -------------------------------------------------------------------
- 调用父类的validate方法
- def validate(self, extra_validators=None):
- self._errors = None
- success = True
- for name, field in iteritems(self._fields): #
- if extra_validators is not None and name in extra_validators:
- extra = extra_validators[name]
- else:
- extra = tuple() # 没有钩子就是空元组
- if not field.validate(self, extra): # 在这里进行每一个字段的验证
- success = False
- return success
以StringField为例子
- def validate(self, form, extra_validators=tuple()):
- self.errors = list(self.process_errors)
- stop_validation = False
- try:
- self.pre_validate(form) # 这是一个空函数,可以进行自定义
- except StopValidation as e:
- if e.args and e.args[0]:
- self.errors.append(e.args[0])
- stop_validation = True
- except ValueError as e:
- self.errors.append(e.args[0])
-
- if not stop_validation: # 在这里执行验证
- chain = itertools.chain(self.validators, extra_validators) # 将所有的验证规则拼接到一起
- stop_validation = self._run_validation_chain(form, chain)
-
- try:
- self.post_validate(form, stop_validation) # 这里也是一个钩子
- except ValueError as e:
- self.errors.append(e.args[0])
- return len(self.errors) == 0
- --------------------------------------------------------------
- def _run_validation_chain(self, form, validators): form用户提交的数据
- for validator in validators:
- try:
- validator(form, self) # 执行所有的验证
- except StopValidation as e:
- if e.args and e.args[0]:
- self.errors.append(e.args[0])
- return True
- except ValueError as e:
- self.errors.append(e.args[0])
- --------------------------------------------------------------
- def pre_validate(self, form): # 这是一个钩子hook
- pass
到此,WTForms从实例化再到打印的源码流程结束!!!!,下面说一下钩子们怎么使用。
在使用validate_的钩子函数时需要多个参数接收
- def validate_name(self,*args):
- for i in args:
- print(type(i))
- print(i)
-
- >>> <class 'wtforms.fields.core.StringField'>
- >>> <input id="name" name="name" required type="text" value="我是输入值">
在使用pre_validate方法和post_validate方法时有一些特殊
- from wtforms import Form,validators,widgets,SelectField
-
- class AgeForm(SelectField): # 自定义一个类,需要继承要使用组件的类
- widget = widgets.ListWidget(prefix_label=False) # 自定义使用组件
- option_widget = widgets.RadioInput()
-
- def pre_validate(self, form): # 钩子函数,不需要返回值
- print("*******")
-
- def post_validate(self, form, validation_stopped): # 钩子函数,不需要返回值
- print("-------------")
-
- class Check(Form):
- name = simple.StringField(label="姓名", validators=[validators.data_required()], widget=widgets.TextInput(),)
- pwd = simple.PasswordField(label="密码", validators=[validators.data_required(),], widget=widgets.PasswordInput())
- age = AgeForm('Age', validators=[validators.DataRequired()],choices=((1,"2333"),(2,"6666"))) # 初始化自定义的类
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。