赞
踩
目录:
homework
homework
static
templates
login.html
utils
utils.py
views
account.py
index.py
__init__.py
manage.py
settings.py
manage.py
from homework import create_app
app = create_app()
if __name__ == '__main__':
app.run()
settings.py
class Config(object):
SALT = b'asdasff'
SECRET_KEY = 'dasfakdfjfwwuiew'
__init__.py
from flask import Flask
from .views.account import account
from .views.index import ind
def create_app():
app = Flask(__name__)
app.config.from_object('settings.Config')
app.register_blueprint(account)
app.register_blueprint(ind)
return app
utils.py
import hashlib
from settings import Config
def md5(arg):
hash = hashlib.md5(Config.SALT)
hash.update(bytes(arg, encoding='utf-8'))
return hash.hexdigest()
account.py
from flask import Blueprint, render_template,request,redirect,session from ..utils.utils import md5 account = Blueprint('account', __name__) # 登录 @account.route('/login/', methods=['GET', 'POST']) def login(): if request.method == 'GET': return render_template('login.html') user = request.form.get('user') pwd = request.form.get('pwd') pwd_md5 = md5(pwd) import pymysql conn = pymysql.Connect( host='127.0.0.1', user='root', password='123456', database='admin', charset='utf8' ) cursor = conn.cursor(cursor=pymysql.cursors.DictCursor) sql = 'select id,nickname from userinfo where user=%s and pwd=%s' cursor.execute(sql, (user, pwd_md5)) data = cursor.fetchone() cursor.close() conn.close() if not data: return render_template('login.html', error='用户名或密码错误') session['userinfo'] = data return redirect('/home/') # 注销 @account.route('/logout/') def logout(): if 'userinfo' in session: del session['userinfo'] return redirect('/login/')
index.py
from flask import Blueprint, redirect, session
ind = Blueprint('ind', __name__)
# 判断是否登录
@ind.before_request
def process_request():
if not session.get('userinfo'):
return redirect('/login/')
return None
# 登陆后才能看到
@ind.route('/home/')
def home():
return 'home主页'
如果我们在前端利用form表单提交文件,那么后台拿到的是一个文件对象
file_obj = request.files.get('起的名') file_obj.filename # 文件名 file_obj.stream # 文件内容,打印出来是内存地址 file_obj.save(file_obj.filename) # 把上传的文件保存(读stream) # 如果上传的是压缩文件,调用shutil模块进行解压,再对内部的文件进行操作。但是会多一步读取的操作,因此我们可以把上传的文件不保存直接解压到指定目录 import shutil import uuid 目标目录 = os.path.join('files', str(uuid.uuid4())) # 不重复命名 shutil._unpack_zipfile(file_obj.stream, '目标目录') # 遍历目录 # os.listdir() 只能列出目录下的一级,还要自己写递归 # 因此,我们使用 os.walk()来遍历,得到如下的一些数据: (当前的路径,路径下的文件夹, 路径下的文件) total_num = 0 # 初始行数 for base_path, folder_list, file_list in os.walk(目标目录): for file_name in file_list: # 拿到每个文件的路径 file_path = os.path.join(base_path, file_name) file_ext = file_path.rsplit('.',maxsplit=1) if len(file_ext) != 2: continue if file_ext[1] != 'py': continue file_num = 0 with open(file_path,'rb') as f: for line in f: line = line.strip() if not line: continue if line.startswith(b'#'): continue file_num += 1 total_num += file_num # 代码量统计
配置文件中app.config['MAX_CONTENT_LENGTH']=1024*1024*7
可以控制上传文件的大小,这里限制最大为7M
LocalProxy
本质DATA = { 'request': { 'method': 'GET', 'form': {} }, 'session': { 'user': 'shj' } } class LocalProxy(object): def __init__(self, key): self.key = key def get_dict(self): return DATA[self.key] def __str__(self): return '__str__' def __getattr__(self, item): data_dict = self.get_dict() return data_dict[item] def __getitem__(self, item): data_dict = self.get_dict() return data_dict[item] def __add__(self, other): return other + 1 request = LocalProxy('request') session = LocalProxy('session') print(request.method) # GET print(session.user) # shj
LocalProxy
from flask import Flask, request # from flask.globals import _request_ctx_stack app = Flask(__name__) @app.route('/index/') def index(): # print(_request_ctx_stack.top.request.method) print(request.method) return 'Index' if __name__ == '__main__': app.run() """ 第一阶段:请求到来 app.__call__ app.wsgi_app # 将request和session相关数据封装到ctx中 ctx = RequestContext(self,environ) # self是app对象,environ是请求相关的原始数据 ctx.request = Request(environ) ctx.session = None # 通过LocalStack将ctx添加到Local中 ctx.push() __storage__={ 唯一身份标识: {'stack':[ctx(request,session)]} } 第二阶段:视图函数获取request或session 方式一:直接找LocalStack获取 from flask.globals import _request_ctx_stack _request_ctx_stack=LocalStack() print(_request_ctx_stack.top.request.method) 方式二:通过代理,直接导入request request是LocalProxy对象 request.method触发__getattr__()方法 session['xxx']触发__getitem__() session['xxx']=xxx 触发__setitem__() """
app/g
# 程序启动时,先把flask的全局变量加载一遍,里面有如下两个变量 from flask import globals _request_ctx_stack = LocalStack() _app_ctx_stack = LocalStack() request = LocalProxy(偏函数) session = LocalProxy(偏函数) current_app = LocalProxy(_find_app) g=LocalProxy(partial(_lookup_app_object, 'g')) # 请求到来时 对数据进行封装 ctx = RequestContext(request, session) app_ctx = AppContext(app, g) 保存数据 1、将包含了(app, g)数据的app_ctx对象,利用_app_ctx_stack将app_ctx对象添加到Local中 即:storage={ 1231:{stack:[app_ctx(app, g), ]} } 2、将包含了request, session数据的ctx对象,利用_request_ctx_stack,将ctx添加到Local中 即:storage={ 1231:{stack:[ctx(request, session), ]} } # 视图函数处理 request和session都是去请求上下文(ctx=RequestContext())中获取值 current_app和g都是去app上下文(app_ctx=AppContext())获取值
一次请求即产生,请求结束即销毁
前面:@app.brfore_request:g.x=1
后面:print(g.x)
是安全的,因为线程数据存储是按照线程的唯一标识作为键值存储的。
WTForms是一个支持多个web框架的form组件,主要用于对用户请求数据进行验证
安装:pip3 install wtforms
用户登录:
当用户登录的时候,需要对用户提交的用户名和密码进行多种格式校验。如:
用户名不能为空;用户名长度限制;密码限制等。
app.py
#!/usr/bin/env python # -*- coding:utf-8 -*- from flask import Flask, render_template, request, redirect from wtforms import Form from wtforms.fields import core from wtforms.fields import html5 from wtforms.fields import simple from wtforms import validators from wtforms import widgets app = Flask(__name__, template_folder='templates') app.debug = True class LoginForm(Form): name = simple.StringField( label='用户名', validators=[ validators.DataRequired(message='用户名不能为空.'), validators.Length(min=6, max=18, message='用户名长度必须大于%(min)d且小于%(max)d') ], widget=widgets.TextInput(), render_kw={'class': 'form-control'} ) pwd = simple.PasswordField( label='密码', validators=[ validators.DataRequired(message='密码不能为空.'), validators.Length(min=8,max=12 message='用户名长度必须大于%(min)d,必须小于%(max)d'), validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[$@$!%*?&])[A-Za-z\d$@$!%*?&]{8,}", message='密码至少8个字符,至少1个大写字母,1个小写字母,1个数字和1个特殊字符') ], widget=widgets.PasswordInput(), render_kw={'class': 'form-control', 'placeholder': '请输入密码' } ) @app.route('/login', methods=['GET', 'POST']) def login(): if request.method == 'GET': form = LoginForm() return render_template('login.html', form=form) else: form = LoginForm(formdata=request.form) if form.validate(): print('用户提交数据通过格式验证,提交的值为:', form.data) else: print(form.errors) return render_template('login.html', form=form) if __name__ == '__main__': app.run()
login.html
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> <h1>登录</h1> <form method="post" novalidate> <!--<input type="text" name="name">--> <p>{{form.name.label}} {{form.name}} {{form.name.errors[0] }}</p> <!--<input type="password" name="pwd">--> <p>{{form.pwd.label}} {{form.pwd}} {{form.pwd.errors[0] }}</p> <input type="submit" value="提交"> </form> </body> </html>
用户注册
注册页面需要让用户输入:用户名、密码、确认密码、性别、爱好等。
app.py
from flask import Flask, render_template, request, redirect from wtforms import Form from wtforms.fields import core from wtforms.fields import html5 from wtforms.fields import simple from wtforms import validators from wtforms import widgets app = Flask(__name__, template_folder='templates') app.debug = True class RegisterForm(Form): name = simple.StringField( label='用户名', validators=[ validators.DataRequired(message='用户名不能为空.') ], widget=widgets.TextInput(), render_kw={'class': 'form-control'}, default='alex' ) pwd = simple.PasswordField( label='密码', validators=[ validators.DataRequired(message='密码不能为空.') ], widget=widgets.PasswordInput(), render_kw={'class': 'form-control'} ) pwd_confirm = simple.PasswordField( label='重复密码', validators=[ validators.DataRequired(message='重复密码不能为空.'), validators.EqualTo('pwd', message="两次密码输入不一致") ], widget=widgets.PasswordInput(), render_kw={'class': 'form-control'} ) email = html5.EmailField( label='邮箱', validators=[ validators.DataRequired(message='邮箱不能为空.'), validators.Email(message='邮箱格式错误') ], widget=widgets.TextInput(input_type='email'), render_kw={'class': 'form-control'} ) gender = core.RadioField( label='性别', choices=( (1, '男'), (2, '女'), ), coerce=int # int(转换类型) ) city = core.SelectField( label='城市', choices=( ('bj', '北京'), ('sh', '上海'), ) ) hobby = core.SelectMultipleField( label='爱好', choices=( (1, '篮球'), (2, '足球'), ), coerce=int # 类型转换 ) favor = core.SelectMultipleField( label='喜好', choices=( (1, '篮球'), (2, '足球'), ), widget=widgets.ListWidget(prefix_label=False), option_widget=widgets.CheckboxInput(), coerce=int, default=[1, 2] # 默认选中 ) def __init__(self, *args, **kwargs): super(RegisterForm, self).__init__(*args, **kwargs) self.favor.choices = ((1, '篮球'), (2, '足球'), (3, '羽毛球')) def validate_pwd_confirm(self, field): """ 自定义pwd_confirm字段规则,例:与pwd字段是否一致 :param field: :return: """ # 最开始初始化时,self.data中已经有所有的值 if field.data != self.data['pwd']: # raise validators.ValidationError("密码不一致") # 继续后续验证 raise validators.StopValidation("密码不一致") # 不再继续后续验证 @app.route('/register', methods=['GET', 'POST']) def register(): if request.method == 'GET': form = RegisterForm(data={'gender': 1}) return render_template('register.html', form=form) else: form = RegisterForm(formdata=request.form) if form.validate(): print('用户提交数据通过格式验证,提交的值为:', form.data) else: print(form.errors) return render_template('register.html', form=form) if __name__ == '__main__': app.run()
register.html
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> <h1>用户注册</h1> <form method="post" novalidate style="padding:0 50px"> {% for item in form %} <p>{{item.label}}: {{item}} {{item.errors[0] }}</p> {% endfor %} <input type="submit" value="提交"> </form> </body> </html>
helper.py
import pymysql from DBUtils.PooledDB import PooledDB, SharedDBConnection import pymysql POOL = PooledDB( creator=pymysql, # 使用链接数据库的模块 maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数 mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建 maxcached=5, # 链接池中最多闲置的链接,0和None不限制 maxshared=3, # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。 blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错 maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制 setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."] ping=0, # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always host='127.0.0.1', port=3306, user='root', password='123456', database='s9day119', charset='utf8' ) def connect(type): conn = POOL.connection() cursor = conn.cursor(cursor=type) return conn,cursor def connect_close(conn,cursor): cursor.close() conn.close() def fetch_all(sql,args,type=pymysql.cursors.DictCursor): conn,cursor = connect(type) cursor.execute(sql, args) record_list = cursor.fetchall() connect_close(conn,cursor) return record_list def fetch_one(sql, args): conn, cursor = connect() cursor.execute(sql, args) result = cursor.fetchone() connect_close(conn, cursor) return result def insert(sql, args): conn, cursor = connect() row = cursor.execute(sql, args) conn.commit() connect_close(conn, cursor) return row
基本使用.py
from flask import Flask,request,render_template,session,current_app,g,redirect from wtforms import Form from wtforms.fields import simple from wtforms.fields import html5 from wtforms.fields import core from wtforms import widgets from wtforms import validators app = Flask(__name__) import helper class UserForm(Form): city = core.SelectField( label='城市', choices=(), # 这里直接从数据库拿,只有第一次才刷新,后续数据库有修改时不会再去数据库拿,因此写一个__init__方法 coerce=int ) name = simple.StringField(label='姓名') def __init__(self,*args,**kwargs): super(UserForm,self).__init__(*args,**kwargs) self.city.choices=helper.fetch_all('select id,name from tb1',[],type=None) @app.route('/user') def user(): if request.method == "GET": #form = UserForm(data={'name':'alex','city':3}) form = UserForm() return render_template('user.html',form=form) if __name__ == '__main__': app.run()
user.html
<!DOCTYPE html> <html lang="zh-CN"> <head> <meta charset="UTF-8"> <title>Title</title> <meta name="viewport" content="width=device-width, initial-scale=1"> </head> <body> <form method="post"> {% for field in form %} <p>{{field.label}}: {{field}} {{field.errors[0]}}</p> {% endfor %} <input type="submit" value="提交"> </form> </body> </html>
如果我们forms认证的字段是从数据库拿的,后续更新不会同步,解决方法同wtforms一样
from django import forms
from django.forms import fields
class IndexForm(forms.Form):
title = fields.CharField()
group = fields.ChoiceField(
choices = ()
)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.fields['group'].choices=models.USerGroup.object.all().value_list('id, 'title)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。