当前位置:   article > 正文

Flask----第三方Session、WTForms

Flask----第三方Session、WTForms

一、使用第三方组件进行自定制Session

第三方组件将session放到Redis里面,所以我们先安装两个模块并导入

  1. from flask_session import RedisSessionInterface
  2. from redis import Redis

使用的原理:

  1. 在Flask请求初始化时会进行
  2. self.session = self.app.open_session(self.request)
  3. ---------------------------------------------------------------
  4. def open_session(self, request):
  5. return self.session_interface.open_session(self, request)
  6. ---------------------------------------------------------------
  7. session_interface = SecureCookieSessionInterface()
  8. ---------------------------------------------------------------
  9. 当一个请求结束时
  10. self.save_session(ctx.session, response)
  11. ---------------------------------------------------------------
  12. def save_session(self, session, response):
  13. return self.session_interface.save_session(self, session, response)

发现和Session相关的操作都是在SecureCookieSessionInterface()类中进行的,按照这个原理进行自定制Session

  1. # 写法一
  2. conn = Redis() # 初始化Redis
  3. app.session_interface = RedisSessionInterface(conn,key_prefix="__",use_signer=False) # user_signer 数字签名
  4. #写法二
  5. app.config['SESSION_TYPE'] ='redis'
  6. app.config['SESSION_REDIS'] = Redis(host="127.0.0.1",port=8080)
  7. Session(app)

为什么会有写法二?

  1. # 首先将app传到Session中
  2. Session(app)
  3. ---------------------------------
  4. def __init__(self, app=None):
  5. self.app = app
  6. if app is not None: # app存在
  7. self.init_app(app)
  8. def init_app(self, app):
  9. app.session_interface = self._get_interface(app) # 这里进行了对app.session_interface进行赋值
  10. def _get_interface(self, app): # 可以发现支持许多类型的数据库
  11. ......
  12. if config['SESSION_TYPE'] == 'redis': # 这样写的原理
  13. session_interface = RedisSessionInterface(
  14. config['SESSION_REDIS'], config['SESSION_KEY_PREFIX'],
  15. config['SESSION_USE_SIGNER'], config['SESSION_PERMANENT'])
  16. elif config['SESSION_TYPE'] == 'memcached':
  17. .....
  18. elif config['SESSION_TYPE'] == 'filesystem':
  19. .....
  20. elif config['SESSION_TYPE'] == 'mongodb':
  21. .....
  22. elif config['SESSION_TYPE'] == 'sqlalchemy':
  23. .....
  24. else:
  25. session_interface = NullSessionInterface()
  26. return session_interface

RedisSessionInterface源码流程分析

  1. class RedisSessionInterface(SessionInterface):
  2. serializer = pickle
  3. session_class = RedisSession
  4. def __init__(self, redis, key_prefix, use_signer=False, permanent=True):
  5. if redis is None: # 如果没有传redis会自动导入
  6. from redis import Redis
  7. redis = Redis()
  8. self.redis = redis
  9. self.key_prefix = key_prefix
  10. self.use_signer = use_signer
  11. self.permanent = permanent # 持久性和Cookie有关
  12. def open_session(self, app, request): # 当创建Session会调用此法
  13. sid = request.cookies.get(app.session_cookie_name) # 去Cookie中获取Session
  14. if not sid: # 第一次肯定没有
  15. sid = self._generate_sid() # 产生一个随机字符串
  16. return self.session_class(sid=sid, permanent=self.permanent) # 将特殊的字典返回{sid,{}}
  17. if self.use_signer: # 有没有数据签名
  18. signer = self._get_signer(app)
  19. if signer is None:
  20. return None
  21. try:
  22. sid_as_bytes = signer.unsign(sid)
  23. sid = sid_as_bytes.decode()
  24. except BadSignature:
  25. sid = self._generate_sid()
  26. return self.session_class(sid=sid, permanent=self.permanent)
  27. if not PY2 and not isinstance(sid, text_type):
  28. sid = sid.decode('utf-8', 'strict') # 将sid进行解码
  29. val = self.redis.get(self.key_prefix + sid) # 从redis获取值
  30. if val is not None:
  31. try:
  32. data = self.serializer.loads(val) # 将这个值序列化
  33. return self.session_class(data, sid=sid) # 放到字典中并返回
  34. except:
  35. return self.session_class(sid=sid, permanent=self.permanent)
  36. return self.session_class(sid=sid, permanent=self.permanent)
  37. def save_session(self, app, session, response): # 当请求结束时调用此方法
  38. domain = self.get_cookie_domain(app)
  39. path = self.get_cookie_path(app)
  40. if not session:
  41. if session.modified:
  42. self.redis.delete(self.key_prefix + session.sid)
  43. response.delete_cookie(app.session_cookie_name,
  44. domain=domain, path=path)
  45. return
  46. httponly = self.get_cookie_httponly(app)
  47. secure = self.get_cookie_secure(app)
  48. expires = self.get_expiration_time(app, session)
  49. val = self.serializer.dumps(dict(session)) # 将session序列化
  50. self.redis.setex(name=self.key_prefix + session.sid, value=val,
  51. time=total_seconds(app.permanent_session_lifetime)) # 存到redis
  52. if self.use_signer:
  53. session_id = self._get_signer(app).sign(want_bytes(session.sid))
  54. else:
  55. session_id = session.sid
  56. response.set_cookie(app.session_cookie_name, session_id,
  57. expires=expires, httponly=httponly,
  58. domain=domain, path=path, secure=secure) # 将随机字符串放到Cookie中

2、如何使浏览器关闭时cookie自动失效

答案是:将expires=none时浏览器关闭时cookie自动失效。

  1. response.set_cookie(app.session_cookie_name, session_id,expires=none) 当expires=none时浏览器关闭时cookie自动失效
  2. 因为这句话expires = self.get_expiration_time(app, session)
  3. def get_expiration_time(self, app, session):
  4. # 发现和session的permanent持久性有关
  5. if session.permanent:
  6. return datetime.utcnow() + app.permanent_session_lifetime

二、WTForms的使用以及源码解读

WTForm的用法和Djangp的Form组件的用法相差不大。

基本用法:

  1. from wtforms import Form
  2. from wtforms import widgets,validators
  3. from wtforms.fields import simple,core
  4. from flask import Flask,render_template,request
  5. app = Flask(__name__)
  6. class CheckForm(Form): # 必须继承Form
  7. name = simple.StringField(label="用户名", # 标签名
  8. render_kw={"class":"contorl"}, # render_kw代表声明的属性
  9. widget=widgets.TextInput(), # widget表示页面上用什么插件
  10. default="alex", # 默认值
  11. validators=[validators.length(min=2,max=8,message="用户名必须大于2小于8"),validators.DataRequired(message="请填写用户名")]) # 在这里定义各种验证
  12. passwd = simple.PasswordField(label="密码",validators=[validators.DataRequired()]) # 必须要求填值
  13. repasswd = simple.PasswordField(label="重复密码",validators=[validators.equal_to(passwd,message="两次密码不一致")]) # 直接验证两次密码是否相同,用validators.equal_to(字段名,错误信息)
  14. gender = core.RadioField(label="性别", choices=[(1, "男"), (2, "女")], coerce=int) # coerce 是把返回值变成int类型 单选框,choices就是数据源
  15. hobby = core.SelectMultipleField(label="爱好", choices=[(1, "电影"), (2, "ks"),(3, "电影"),(4, "电影")], coerce=int) # 下拉框多选
  16. city = core.SelectField(label="下拉框", choices=[(1, "北京"), (2, "济南")]) # 下拉框
  17. favor = core.SelectMultipleField(label="真爱", choices=[(1, "电影"), (2, "ks")], coerce=int,widget=widgets.ListWidget(prefix_label=False),option_widget=widgets.CheckboxInput())# 复选框
  18. @app.route("/index",methods=["GET","POST"])
  19. def index():
  20. if request.method == "GET":
  21. form = CheckForm(data={"favor":"1"}) # 在这里传入默认值
  22. return render_template("test.html",form=form)
  23. else:
  24. form = CheckForm(formdata=request.form) # 把要验证的数据传入Form中
  25. if form.validate():
  26. print("ok",form.data) # form.data 是验证成功的信息
  27. else:
  28. print(form.errors) # form.errors 是错误信息
  29. return render_template("test.html",form=form)
  30. if __name__ == '__main__':
  31. app.run()

其实还可以通过类来进行数据的更新

  1. class CheckForm(Form):
  2. gender = core.RadioField(label="性别", choices=[(1, "男"), (2, "女")], coerce=int) # coerce 是把返回值变成int类型 单选
  3. def __init__(self,*args,**kwargs):
  4. super().__init__(*args,**kwargs) # 通过父类完成初始化
  5. self.gender.choices = [(1, "男"), (2, "女"), (3, "人妖")] # 在这里进行数据的实时更新,去数据库中拿数据, 通过字段的choices方法进行数据更新

也可以直接用H5元素

  1. from wtforms import Form
  2. from wtforms import widgets,validators
  3. from wtforms.fields import html5,core # 可以引入H5
  4. from flask import Flask, render_template, request
  5. app = Flask(__name__)
  6. class CheckForm(Form):
  7. passwd = html5.IntegerField() # 自动包含正则表达式
  8. @app.route("/index",methods=["GET","POST"])
  9. def index():
  10. if request.method == "GET":
  11. form = CheckForm(data={"favor":"1"}) # 在这里传入默认值
  12. return render_template("test.html",form=form)
  13. if __name__ == '__main__':
  14. app.run()

解读WTForms源码:

  1. from wtforms import Form,widgets
  2. from wtforms.fields import simple
  3. class CheckForm(Form):
  4. name = simple.StringField(label="xxx",widget=widgets.TextInput())

首先CheckForm继承了Form,我们来看Form的源码

  1. class Form(with_metaclass(FormMeta, BaseForm)) # 执行with_metaclass函数
  2. --------------------------------------------------------
  3. with_metaclass函数
  4. def with_metaclass(meta, base=object): # meta=FormMeta, base=BaseForm
  5. return meta("NewBase", (base,), {}) # 相当于 class NewBase(metaclass=FormMeta, BaseForm)
  6. # 会先执行FormMeta的__init__方法
  7. ---------------------------------------------------------
  8. FormMeta的__init__方法
  9. class FormMeta(type):
  10. def __init__(cls, name, bases, attrs): # 这里的cls就是自己创建的类(checkForm)
  11. type.__init__(cls, name, bases, attrs)
  12. cls._unbound_fields = None # cls._unbound_fields = None -->> CheckForm._unbound_fields
  13. cls._wtforms_meta = None # cls._wtforms_meta = None -->> CheckForm._wtforms_meta

关于__new__方法的补充

  1. class Foo:
  2. def __new__(cls, *args, **kwargs): 用于生成对象
  3. # return super().__new__(cls, *args, **kwargs)
  4. return "666"
  5. f = Foo() # 在Python的面向对象的方法中,f的值取决于__new__的返回值
  6. print(f)

当执行类里面的语句时

  1. name = simple.StringField(label="xxx",widget=widgets.TextInput())
  2. # 当创建一个类时先实例化执行meta的__call__方法
  3. # 再找这个类的__new__方法
  4. # 再找这个类的__init__方法
  5. -------------------------------------------------------------------
  6. __new__方法
  7. def __new__(cls, *args, **kwargs):
  8. if '_form' in kwargs and '_name' in kwargs: # 第一次来不满足条件
  9. return super(Field, cls).__new__(cls)
  10. else:
  11. return UnboundField(cls, *args, **kwargs) # 返回一个UnboundField的对象
  12. # cls是simple.StringField这个类,把其传递所有的参数全部传过来
  13. -------------------------------------------------------------------
  14. UnboundField的__init__方法
  15. class UnboundField(object): # 这个类的功能是排序,为避免字典的无序
  16. _formfield = True
  17. creation_counter = 0
  18. def __init__(self, field_class, *args, **kwargs):
  19. UnboundField.creation_counter += 1
  20. self.field_class = field_class
  21. self.args = args
  22. self.kwargs = kwargs
  23. self.creation_counter = UnboundField.creation_counter # 每初始化一次就+1 将值封装到每个对象中

执行完metaclass之后,ChekcForm.name != simple.StringField() 是因为在StringField()的__new__方法返回的并不是一个filed的对象,而是UnBoundField的对象。

当我们实例化这个类时会执行metaclass的__call__方法

  1. FormMeta的__call__方法
  2. def __call__(cls, *args, **kwargs): # cls 是 checkForm
  3. if cls._unbound_fields is None: # 初始化时cls._unbound_fields就为None
  4. fields = []
  5. for name in dir(cls): # dir(cls)会把__dict__的属性所有的key拿到
  6. if not name.startswith('_'): # 对于不是'_'开头的key
  7. unbound_field = getattr(cls, name) # 去CheckForm类中找到对象,此时就是UnBoundField对象,unboundField就封装了(1,simple.StringField,所有的参数)
  8. if hasattr(unbound_field, '_formfield'): # 会发现_formfield的值为True
  9. fields.append((name, unbound_field)) # 将其添加到一个队列
  10. fields.sort(key=lambda x: (x[1].creation_counter, x[0])) # 然后根据其UnBoundField对象的初始化的值进行排序,当出现相同值时会根据第二个值进行排序
  11. cls._unbound_fields = fields # 将其赋值给cls._unbound_fields
  12. if cls._wtforms_meta is None: # 初始化时cls._wtforms_meta就为None
  13. bases = []
  14. for mro_class in cls.__mro__: # 开始遍历Checkform这个类的继承关系
  15. if 'Meta' in mro_class.__dict__: # Meta = DefaultMeta
  16. bases.append(mro_class.Meta)
  17. cls._wtforms_meta = type('Meta', tuple(bases), {}) # 这里创建了一个类 class Meta(DefaultMeta)
  18. return type.__call__(cls, *args, **kwargs)

执行完__call__方法以后此时已经发生变换,此时的 cls._unbound_fields已经是一个列表里面包含了排完序的对象[ (name,UnBoundField对象(simple.StringField,所有的参数)),(pwd,UnBoundField对象(simple.PasswordField,所有的参数))],cls._wtforms_meta也是一个列表

然后继续执行__init__方法

  1. class Form(with_metaclass(FormMeta, BaseForm)):
  2. Meta = DefaultMeta
  3. def __init__(self, formdata=None, obj=None, prefix='', data=None, meta=None, **kwargs):
  4. meta_obj = self._wtforms_meta() # Meta这个类进行实例化
  5. if meta is not None and isinstance(meta, dict): # 如果有自定义的meta进行添加
  6. meta_obj.update_values(meta) # 进行数据的更新
  7. super(Form, self).__init__(self._unbound_fields, meta=meta_obj, prefix=prefix) # self._unbound_fields 是已经排好顺序的对象
  8. for name, field in iteritems(self._fields):
  9. setattr(self, name, field) # 这样做的目的是可以直接打.调用
  10. self.process(formdata, obj, data=data, **kwargs) # 为每个字段设置默认值,以及提供验证的值,由于都是空的就什么都不做
  11. ------------------------------------------------------------------
  12. 执行父类的__init__方法
  13. def __init__(self, fields, prefix='', meta=DefaultMeta()): # fields就是封装好每一个对象
  14. if prefix and prefix[-1] not in '-_;:/.':
  15. prefix += '-'
  16. self.meta = meta
  17. self._prefix = prefix
  18. self._errors = None
  19. self._fields = OrderedDict()
  20. if hasattr(fields, 'items'):
  21. fields = fields.items()
  22. translations = self._get_translations()
  23. extra_fields = []
  24. if meta.csrf: # 在这里可以通过自己在类中创建一个类定义csrf属性,如果meta有csrf,就再生成一条个文本框
  25. self._csrf = meta.build_csrf(self)
  26. extra_fields.extend(self._csrf.setup_form(self)) # 将其添加到后面的对列
  27. for name, unbound_field in itertools.chain(fields, extra_fields): # 将两个列表合并成一个列表进行遍历
  28. options = dict(name=name, prefix=prefix, translations=translations)
  29. field = meta.bind_field(self, unbound_field, options) # unbound_field是一个UnBoundField的对象,此时的field就是一个对象
  30. self._fields[name] = field # 将对象放到列表中
  31. ------------------------------------------------------------------
  32. 执行Meta类的bind_field方法
  33. def bind_field(self, form, unbound_field, options):
  34. return unbound_field.bind(form=form, **options)
  35. -------------------------------------------------------------------
  36. def bind(self, form, name, prefix='', translations=None, **kwargs):
  37. kw = dict(
  38. self.kwargs,
  39. _form=form,
  40. _prefix=prefix,
  41. _name=name,
  42. _translations=translations,
  43. **kwargs
  44. )
  45. return self.field_class(*self.args, **kw) # 这里就是对StringField+括号进行实例化,在初始化进行赋值的类
  46. ------------------------------------------------------------------
  47. self.process 函数
  48. def process(self, formdata=None, obj=None, data=None, **kwargs):
  49. formdata = self.meta.wrap_formdata(self, formdata)
  50. if data is not None:
  51. kwargs = dict(data, **kwargs)
  52. for name, field, in iteritems(self._fields):
  53. if obj is not None and hasattr(obj, name):
  54. field.process(formdata, getattr(obj, name))
  55. elif name in kwargs:
  56. field.process(formdata, kwargs[name])
  57. else:
  58. field.process(formdata)

 通过上面可以知道,请求传过来的值可以用三种格式

  1. ChekcForm(data=request.form) 按照字典的形式取值
  2. ChekcForm(obj=request.form) 打点的方式进行取值
  3. ChekcForm(formdata=request.form) getlist()方式取值

当我们要打印一个字段时进行的操作

  1. print(form.name) # 就会调用StringField(要生成的类)的__str__方法
  2. -------------------------------------------------------------
  3. __str__方法
  4. def __str__(self):
  5. return self() # 对象+括号 调用__call__方法
  6. -------------------------------------------------------------
  7. __call__方法
  8. def __call__(self, **kwargs):
  9. return self.meta.render_field(self, kwargs) # 调用了meta的render_field的方法,self是当前的调用对象(StringField)
  10. -------------------------------------------------------------
  11. def render_field(self, field, render_kw):
  12. other_kw = getattr(field, 'render_kw', None)
  13. if other_kw is not None:
  14. render_kw = dict(other_kw, **render_kw)
  15. return field.widget(field, **render_kw) # field是调用的字段
  16. # 由于widget=widgets.TextInput(),widget已经是对象,再加上括号直接调用插件的__call__方法
  17. -------------------------------------------------------------
  18. 插件的__call__方法
  19. def __call__(self, field, **kwargs):
  20. kwargs.setdefault('id', field.id)
  21. kwargs.setdefault('type', self.input_type)
  22. if 'value' not in kwargs:
  23. kwargs['value'] = field._value()
  24. if 'required' not in kwargs and 'required' in getattr(field, 'flags', []):
  25. kwargs['required'] = True
  26. return HTMLString('<input %s>' % self.html_params(name=field.name, **kwargs)) # 输出

当我们调用validate方法进行验证数据合法性时

  1. def validate(self):
  2. extra = {}
  3. for name in self._fields:
  4. # name=name field=StringField
  5. # name=pwd field=PasswordField
  6. inline = getattr(self.__class__, 'validate_%s' % name, None) # 在自己的类中找以"validate_"开头的方法
  7. if inline is not None:
  8. extra[name] = [inline] # 将所有的钩子函数放到extra里面
  9. return super(Form, self).validate(extra)
  10. -------------------------------------------------------------------
  11. 调用父类的validate方法
  12. def validate(self, extra_validators=None):
  13. self._errors = None
  14. success = True
  15. for name, field in iteritems(self._fields): #
  16. if extra_validators is not None and name in extra_validators:
  17. extra = extra_validators[name]
  18. else:
  19. extra = tuple() # 没有钩子就是空元组
  20. if not field.validate(self, extra): # 在这里进行每一个字段的验证
  21. success = False
  22. return success

以StringField为例子

  1. def validate(self, form, extra_validators=tuple()):
  2. self.errors = list(self.process_errors)
  3. stop_validation = False
  4. try:
  5. self.pre_validate(form) # 这是一个空函数,可以进行自定义
  6. except StopValidation as e:
  7. if e.args and e.args[0]:
  8. self.errors.append(e.args[0])
  9. stop_validation = True
  10. except ValueError as e:
  11. self.errors.append(e.args[0])
  12. if not stop_validation: # 在这里执行验证
  13. chain = itertools.chain(self.validators, extra_validators) # 将所有的验证规则拼接到一起
  14. stop_validation = self._run_validation_chain(form, chain)
  15. try:
  16. self.post_validate(form, stop_validation) # 这里也是一个钩子
  17. except ValueError as e:
  18. self.errors.append(e.args[0])
  19. return len(self.errors) == 0
  20. --------------------------------------------------------------
  21. def _run_validation_chain(self, form, validators): form用户提交的数据
  22. for validator in validators:
  23. try:
  24. validator(form, self) # 执行所有的验证
  25. except StopValidation as e:
  26. if e.args and e.args[0]:
  27. self.errors.append(e.args[0])
  28. return True
  29. except ValueError as e:
  30. self.errors.append(e.args[0])
  31. --------------------------------------------------------------
  32. def pre_validate(self, form): # 这是一个钩子hook
  33. pass

到此,WTForms从实例化再到打印的源码流程结束!!!!,下面说一下钩子们怎么使用。

在使用validate_的钩子函数时需要多个参数接收

  1. def validate_name(self,*args):
  2. for i in args:
  3. print(type(i))
  4. print(i)
  5. >>> <class 'wtforms.fields.core.StringField'>
  6. >>> <input id="name" name="name" required type="text" value="我是输入值">

在使用pre_validate方法和post_validate方法时有一些特殊

  1. from wtforms import Form,validators,widgets,SelectField
  2. class AgeForm(SelectField): # 自定义一个类,需要继承要使用组件的类
  3. widget = widgets.ListWidget(prefix_label=False) # 自定义使用组件
  4. option_widget = widgets.RadioInput()
  5. def pre_validate(self, form): # 钩子函数,不需要返回值
  6. print("*******")
  7. def post_validate(self, form, validation_stopped): # 钩子函数,不需要返回值
  8. print("-------------")
  9. class Check(Form):
  10. name = simple.StringField(label="姓名", validators=[validators.data_required()], widget=widgets.TextInput(),)
  11. pwd = simple.PasswordField(label="密码", validators=[validators.data_required(),], widget=widgets.PasswordInput())
  12. age = AgeForm('Age', validators=[validators.DataRequired()],choices=((1,"2333"),(2,"6666"))) # 初始化自定义的类

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/AllinToyou/article/detail/180974
推荐阅读
相关标签
  

闽ICP备14008679号