一 web的一些框架介绍
Flask:短小精悍,内部没有包含多少组件,但是第三方的组件是非常丰富的。
Django:django是一个重武器,内部包含了非常多的组件:orm,form,modelForm,缓存,session等等
Tornado:牛逼之处就是异步非阻塞框架和node.js
二 Flask的快速入门
创建python虚拟环境:virtualenv 虚拟名
安装:pip install flask
什么是werkzeug:Werkzeug是一个WSGI工具包,他可以作为一个Web框架的底层库。官方的介绍说是一个 WSGI 工具包,它可以作为一个 Web 框架的底层库,因为它封装好了很多 Web 框架的东西,例如 Request,Response 等等
from werkzeug.wrappers import Request, Response @Request.application def hello(request): return Response('Hello World!') if __name__ == '__main__': from werkzeug.serving import run_simple run_simple('localhost', 4000, hello)
基本使用:
from flask import Flask app = Flask(__name__) @app.route('/') def hello_world(): return 'Hello World!' if __name__ == '__main__': app.run()
wsgi:使用werkzeug模块实现的,还可以使用wsgiref实现。本质是导入socket实现的。
一旦出现这个,监听事件就开始了
实例化Flask对象
app.run():监听用户的请求,一旦有用户的请求过来,就会直接执行用户的__call__方法。
flask的所有相关的组件全都存在flask文件下的,需要什么导入什么
from flask import Flask,render_template,request,redirect,session,url_for
for_url:高级版的重定向
from flask import Flask,render_template,request,redirect,session,url_for app = Flask(__name__) app.debug=True app.secret_key='fangshao' USERS = { 1:{'name':'张桂坤','age':18,'gender':'男','text':"当眼泪掉下来的时候,是真的累了, 其实人生就是这样: 你有你的烦,我有我的难,人人都有无声的泪,人人都有难言的苦。 忘不了的昨天,忙不完的今天,想不到的明天,走不完的人生,过不完的坎坷,越不过的无奈,听不完的谎言,看不透的人心放不下的牵挂,经历不完的酸甜苦辣,这就是人生,这就是生活。"}, 2:{'name':'主城','age':28,'gender':'男','text':"高中的时候有一个同学家里穷,每顿饭都是膜膜加点水,有时候吃点咸菜,我们六科老师每天下课都叫他去办公室回答问题背诵课文,然后说太晚啦一起吃个饭,后来他考上了人大,拿到通知书的时候给每个老师磕了一个头"}, 3:{'name':'服城','age':18,'gender':'女','text':"高中的时候有一个同学家里穷,每顿饭都是膜膜加点水,有时候吃点咸菜,我们六科老师每天下课都叫他去办公室回答问题背诵课文,然后说太晚啦一起吃个饭,后来他考上了人大,拿到通知书的时候给每个老师磕了一个头"}, } @app.route('/') def hello_world(): return 'Hello World!' # @app.route('/index',methods=['GET']) # def index(): # user=session.get('user_info') # print(user) # if user: # return render_template('index.html',data=user,user_dict=USERS) # return redirect('/login') @app.route('/index',methods=['GET']) def index(): user=session.get('user_info') if user: return render_template('index.html',data=user,user_dict=USERS) url=url_for('111') return redirect(url) @app.route('/detail/<int:nid>',methods=['GET']) def detail(nid): user=session.get('user_info') if not user: return redirect('/login') return render_template('detail.html',data=USERS[nid]) @app.route('/login',methods=['GET','POST'],endpoint='111') def login(): if request.method=='POST': user=request.form.get('user') pwd=request.form.get('pwd') if user=='fang' and pwd=='123': session['user_info']=user return redirect('/index') else: return render_template('login.html',msg='用户名或密码错误') return render_template('login.html') if __name__ == '__main__': app.run()
三 配置文件
开放封闭原则:对代码的修改是封闭的,对配置文件的修改时开放的。
app.debug=True:修改过后自动重启项目
app.secret_key='随机设置字符串':全局设置session,Session, Cookies以及一些第三方扩展都会用到,
app.config['debug']=True:修改过后自动重启项目
app.config:获取当前app的所有配置
app.config.from_object:导入文件的一个类,
内部实现原理:导入时,将路径由点分割
配置文件的格式有:
flask中的配置文件是一个flask.config.Config对象(继承字典),默认配置为: { 'DEBUG': get_debug_flag(default=False), 是否开启Debug模式 'TESTING': False, 是否开启测试模式 'PROPAGATE_EXCEPTIONS': None, 'PRESERVE_CONTEXT_ON_EXCEPTION': None, 'SECRET_KEY': None, 'PERMANENT_SESSION_LIFETIME': timedelta(days=31), 'USE_X_SENDFILE': False, 'LOGGER_NAME': None, 'LOGGER_HANDLER_POLICY': 'always', 'SERVER_NAME': None, 'APPLICATION_ROOT': None, 'SESSION_COOKIE_NAME': 'session', 'SESSION_COOKIE_DOMAIN': None, 'SESSION_COOKIE_PATH': None, 'SESSION_COOKIE_HTTPONLY': True, 'SESSION_COOKIE_SECURE': False, 'SESSION_REFRESH_EACH_REQUEST': True, 'MAX_CONTENT_LENGTH': None, 'SEND_FILE_MAX_AGE_DEFAULT': timedelta(hours=12), 'TRAP_BAD_REQUEST_ERRORS': False, 'TRAP_HTTP_EXCEPTIONS': False, 'EXPLAIN_TEMPLATE_LOADING': False, 'PREFERRED_URL_SCHEME': 'http', 'JSON_AS_ASCII': True, 'JSON_SORT_KEYS': True, 'JSONIFY_PRETTYPRINT_REGULAR': True, 'JSONIFY_MIMETYPE': 'application/json', 'TEMPLATES_AUTO_RELOAD': None, }
格式一:
app.config['DEBUG'] = True PS: 由于Config对象本质上是字典,所以还可以使用app.config.update(...)
格式二:
app.config.from_pyfile("python文件名称") 如: settings.py DEBUG = True app.config.from_pyfile("settings.py") app.config.from_envvar("环境变量名称") 环境变量的值为python文件名称名称,内部调用from_pyfile方法 app.config.from_json("json文件名称") JSON文件名称,必须是json格式,因为内部会执行json.loads app.config.from_mapping({'DEBUG':True})
格式三:
app.config.from_object("python类或类的路径") app.config.from_object('pro_flask.settings.TestingConfig') settings.py class Config(object): DEBUG = False TESTING = False DATABASE_URI = 'sqlite://:memory:' class ProductionConfig(Config): DATABASE_URI = 'mysql://user@localhost/foo' class DevelopmentConfig(Config): DEBUG = True class TestingConfig(Config): TESTING = True PS: 从sys.path中已经存在路径开始写
settings.py默认路径要放在当前项目的第一级目录下面
PS: settings.py文件默认路径要放在程序root_path目录,如果instance_relative_config为
True
,则就是instance_path目录
四 路由系统
- @app.route('/user/<username>')
- @app.route('/post/<int:post_id>')
- @app.route('/post/<float:post_id>')
- @app.route('/post/<path:path>')
- @app.route('/login', methods=['GET', 'POST'])
路由比较特殊,:是基于装饰器实现的,但是究其本质还是有add_url_rule实现的。
装饰器可以有多个,放在上面和下面是不同的,
from flask import Flask,render_template,request,redirect,session,url_for app = Flask(__name__) app.debug=True app.secret_key='fangshao' USERS = { 1:{'name':'张桂坤','age':18,'gender':'男','text':"当眼泪掉下来的时候,是真的累了, 其实人生就是这样: 你有你的烦,我有我的难,人人都有无声的泪,人人都有难言的苦。 忘不了的昨天,忙不完的今天,想不到的明天,走不完的人生,过不完的坎坷,越不过的无奈,听不完的谎言,看不透的人心放不下的牵挂,经历不完的酸甜苦辣,这就是人生,这就是生活。"}, 2:{'name':'主城','age':28,'gender':'男','text':"高中的时候有一个同学家里穷,每顿饭都是膜膜加点水,有时候吃点咸菜,我们六科老师每天下课都叫他去办公室回答问题背诵课文,然后说太晚啦一起吃个饭,后来他考上了人大,拿到通知书的时候给每个老师磕了一个头"}, 3:{'name':'服城','age':18,'gender':'女','text':"高中的时候有一个同学家里穷,每顿饭都是膜膜加点水,有时候吃点咸菜,我们六科老师每天下课都叫他去办公室回答问题背诵课文,然后说太晚啦一起吃个饭,后来他考上了人大,拿到通知书的时候给每个老师磕了一个头"}, } def get_session(func): def inner(*args,**kwargs): user = session.get('user_info') if not user: return redirect('/login') return func(*args,**kwargs) return inner @app.route('/',endpoint='1111') @get_session def hello_world(): return 'Hello World!' @app.route('/index',methods=['GET'],endpoint='n1') @get_session def index(): # user=session.get('user_info') # print(user) # if user: return render_template('index.html',user_dict=USERS) # return redirect('/login') # @app.route('/index',methods=['GET']) # def index(): # user=session.get('user_info') # if user: # return render_template('index.html',user_dict=USERS) # url=url_for('111') # return redirect(url) @app.route('/detail/<int:nid>',methods=['GET'],endpoint='n2') def detail(nid): user=session.get('user_info') if not user: return redirect('/login') return render_template('detail.html',data=USERS[nid]) @app.route('/login',methods=['GET','POST'],endpoint='111') def login(): if request.method=='POST': user=request.form.get('user') pwd=request.form.get('pwd') if user=='fang' and pwd=='123': session['user_info']=user return redirect('/index') else: return render_template('login.html',msg='用户名或密码错误') return render_template('login.html') if __name__ == '__main__': app.run()
注意:这里加上装饰器,会重名,给他设置一个别名endpoint=‘别名’
functools.wraps(函数):# 帮助我们保存一些设置函数的元信息
from flask import Flask,render_template,request,redirect,session,url_for app = Flask(__name__) app.debug=True app.secret_key='fangshao' USERS = { 1:{'name':'张桂坤','age':18,'gender':'男','text':"当眼泪掉下来的时候,是真的累了, 其实人生就是这样: 你有你的烦,我有我的难,人人都有无声的泪,人人都有难言的苦。 忘不了的昨天,忙不完的今天,想不到的明天,走不完的人生,过不完的坎坷,越不过的无奈,听不完的谎言,看不透的人心放不下的牵挂,经历不完的酸甜苦辣,这就是人生,这就是生活。"}, 2:{'name':'主城','age':28,'gender':'男','text':"高中的时候有一个同学家里穷,每顿饭都是膜膜加点水,有时候吃点咸菜,我们六科老师每天下课都叫他去办公室回答问题背诵课文,然后说太晚啦一起吃个饭,后来他考上了人大,拿到通知书的时候给每个老师磕了一个头"}, 3:{'name':'服城','age':18,'gender':'女','text':"高中的时候有一个同学家里穷,每顿饭都是膜膜加点水,有时候吃点咸菜,我们六科老师每天下课都叫他去办公室回答问题背诵课文,然后说太晚啦一起吃个饭,后来他考上了人大,拿到通知书的时候给每个老师磕了一个头"}, } import functools def get_session(func): @functools.wraps(func) def inner(*args,**kwargs): user = session.get('user_info') if not user: return redirect('/login') return func(*args,**kwargs) return inner @app.route('/',endpoint='1111') @get_session def hello_world(): return 'Hello World!' @app.route('/index',methods=['GET']) @get_session def index(): # user=session.get('user_info') # print(user) # if user: return render_template('index.html',user_dict=USERS) # return redirect('/login') # @app.route('/index',methods=['GET']) # def index(): # user=session.get('user_info') # if user: # return render_template('index.html',user_dict=USERS) # url=url_for('111') # return redirect(url) @app.route('/detail/<int:nid>',methods=['GET']) def detail(nid): user=session.get('user_info') if not user: return redirect('/login') return render_template('detail.html',data=USERS[nid]) @app.route('/login',methods=['GET','POST']) def login(): if request.method=='POST': user=request.form.get('user') pwd=request.form.get('pwd') if user=='fang' and pwd=='123': session['user_info']=user return redirect('/index') else: return render_template('login.html',msg='用户名或密码错误') return render_template('login.html') if __name__ == '__main__': app.run()
FBV的写法:
使用装饰器,将括号里面的内容添加到路由中@app.route()
DEFAULT_CONVERTERS = { 'default': UnicodeConverter, 'string': UnicodeConverter, 'any': AnyConverter, 'path': PathConverter, 'int': IntegerConverter, 'float': FloatConverter, 'uuid': UUIDConverter, }
methods=[]:里面写的是请求的方法,支持什么方法都要写上什么方法。
endpoint=别名:为当前的url反向生成一个url,也就是起一个别名
app.add_url_rule('/...'):添加路由的另一种方法
def auth(func): def inner(*args, **kwargs): print('before') result = func(*args, **kwargs) print('after') return result return inner @app.route('/index.html',methods=['GET','POST'],endpoint='index') @auth def index(): return 'Index' 或 def index(): return "Index" self.add_url_rule(rule='/index.html', endpoint="index", view_func=index, methods=["GET","POST"]) or app.add_url_rule(rule='/index.html', endpoint="index", view_func=index, methods=["GET","POST"]) app.view_functions['index'] = index
CBV的写法:
MethodView:API的简单的一种实现方式,class创建的视图类就需要继承它。
class IndexView(views.MethodView): methods=['GET'] decorators=[auth,] def get(self): return 'Index.GET' def post(self): return 'Index.POST'
app.add_url_rule('/路径',view_func='类名'.as_view(name=返回的那个函数)
app.add_url_rule('/index',view_func=IndexView.as_view(name='index'))
对于CBV来说:虽然传进去的是类名,但是最后返回的还是一个函数。
def auth(func): def inner(*args, **kwargs): print('before') result = func(*args, **kwargs) print('after') return result return inner class IndexView(views.View): methods = ['GET'] decorators = [auth, ] def dispatch_request(self): print('Index') return 'Index!' app.add_url_rule('/index', view_func=IndexView.as_view(name='index')) # name=endpoint 或 class IndexView(views.MethodView): methods = ['GET'] decorators = [auth, ] def get(self): return 'Index.GET' def post(self): return 'Index.POST' app.add_url_rule('/index', view_func=IndexView.as_view(name='index')) # name=endpoint @app.route和app.add_url_rule参数: rule, URL规则 view_func, 视图函数名称 defaults=None, 默认值,当URL中无参数,函数需要参数时,使用defaults={'k':'v'}为函数提供参数 endpoint=None, 名称,用于反向生成URL,即: url_for('名称') methods=None, 允许的请求方式,如:["GET","POST"] strict_slashes=None, 对URL最后的 / 符号是否严格要求, 如: @app.route('/index',strict_slashes=False), 访问 http://www.xx.com/index/ 或 http://www.xx.com/index均可 @app.route('/index',strict_slashes=True) 仅访问 http://www.xx.com/index redirect_to=None, 重定向到指定地址 如: @app.route('/index/<int:nid>', redirect_to='/home/<nid>') 或 def func(adapter, nid): return "/home/888" @app.route('/index/<int:nid>', redirect_to=func) subdomain=None, 子域名访问 from flask import Flask, views, url_for app = Flask(import_name=__name__) app.config['SERVER_NAME'] = 'wupeiqi.com:5000' @app.route("/", subdomain="admin") def static_index(): """Flask supports static subdomains This is available at static.your-domain.tld""" return "static.your-domain.tld" @app.route("/dynamic", subdomain="<username>") def username_index(username): """Dynamic subdomains are also supported Try going to user1.your-domain.tld/dynamic""" return username + ".your-domain.tld" if __name__ == '__main__': app.run()
default:传入函数的参数。就是url后面的那个参数
subdomain={}:创建子域名
window设置域名:hosts而文件下面直接就可以设置了C:\Windows\System32\driver\etc\hosts
mcs系统设置域名:/ect/hosts文件下面就可以设置域名了
from flask import Flask, views, url_for from werkzeug.routing import BaseConverter app = Flask(import_name=__name__) class RegexConverter(BaseConverter): """ 自定义URL匹配正则表达式 """ def __init__(self, map, regex): super(RegexConverter, self).__init__(map) self.regex = regex def to_python(self, value): """ 路由匹配时,匹配成功后传递给视图函数中参数的值 :param value: :return: """ return int(value) def to_url(self, value): """ 使用url_for反向生成URL时,传递的参数经过该方法处理,返回的值用于生成URL中的参数 :param value: :return: """ val = super(RegexConverter, self).to_url(value) return val # 添加到flask中 app.url_map.converters['regex'] = RegexConverter @app.route('/index/<regex("\d+"):nid>') def index(nid): print(url_for('index', nid='888')) return 'Index' if __name__ == '__main__': app.run()
五 模板
1、模板的使用:Flask使用的是Jinja2模板,所以其语法和Django无差别
2、自定义模板方法:Flask中自定义模板方法的方式和Bottle相似,创建一个函数并通过参数的形式传入
for循环,并取到索引
{% for k,v in 对象.items()%}
字典的取值方法:v.字段名 v['字段名'] v.get('字段名')
{% endfor %}
函数渲染:不仅要加上括号,还可以加上参数
{{函数名(参数)}} 加上|safe:防止xss攻击
<!DOCTYPE html> <html> <head lang="en"> <meta charset="UTF-8"> <title></title> </head> <body> <h1>自定义函数</h1> {{ww()|safe}} </body> </html>
在后台如果使用
from flask import Flask,render_template app = Flask(__name__) def wupeiqi(): return '<h1>Wupeiqi</h1>' @app.route('/login', methods=['GET', 'POST']) def login(): return render_template('login.html', ww=wupeiqi) app.run()
Markup:后台设置xss攻击
宏定义:就是定义一块html,定义的这一块就是就是一个函数
{% macre 函数名(参数)%} {% endmacre %}
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> {% macro input(name, type='text', value='') %} <input type="{{ type }}" name="{{ name }}" value="{{ value }}"> {% endmacro %} {{ input('n1') }} {% include 'tp.html' %} <h1>asdf{{ v.k1}}</h1> </body> </html>
六 请求和响应
请求和响应都是从flask为念中导入的
request:请求
response:响应
jsonify:响应的数据类型不是字符串类型,就是用这个将响应的内容转成字符串。
from flask import Flask from flask import request from flask import render_template from flask import redirect from flask import make_response app = Flask(__name__) @app.route('/login.html', methods=['GET', "POST"]) def login(): # 请求相关信息 # request.method # request.args # request.form # request.values # request.cookies # request.headers # request.path # request.full_path # request.script_root # request.url # request.base_url # request.url_root # request.host_url # request.host # request.files # obj = request.files['the_file_name'] # obj.save('/var/www/uploads/' + secure_filename(f.filename)) # 响应相关信息 # return "字符串" # return render_template('html模板路径',**{}) # return redirect('/index.html') # response = make_response(render_template('index.html')) # response是flask.wrappers.Response类型 # response.delete_cookie('key') # response.set_cookie('key', 'value') # response.headers['X-Something'] = 'A value' # return response return "内容" if __name__ == '__main__': app.run()
七 Session和Cookie
session在使用前必须要有app.sceret_key加密,就相当于加盐
session['名']=字段:设置session
应用demo:实例,使用装饰器写一个用户认证
思路:装饰器,一个函数可以加上多个装饰器,反向查找的名称不允许重名:endpoint
session.pop:删除一个session
我们这里使用的session是flask内置的使用加密的cookie来保存数据的。
基本使用:
from flask import Flask, session, redirect, url_for, escape, request app = Flask(__name__) @app.route('/') def index(): if 'username' in session: return 'Logged in as %s' % escape(session['username']) return 'You are not logged in' @app.route('/login', methods=['GET', 'POST']) def login(): if request.method == 'POST': session['username'] = request.form['username'] return redirect(url_for('index')) return ''' <form action="" method="post"> <p><input type=text name=username> <p><input type=submit value=Login> </form> ''' @app.route('/logout') def logout(): # remove the username from the session if it's there session.pop('username', None) return redirect(url_for('index')) # set the secret key. keep this really secret: app.secret_key = 'A0Zr98j/3yX R~XHH!jmN]LWX/,?RT' 基本使用
自定义session
pip3 install Flask-Session run.py from flask import Flask from flask import session from pro_flask.utils.session import MySessionInterface app = Flask(__name__) app.secret_key = 'A0Zr98j/3yX R~XHH!jmN]LWX/,?RT' app.session_interface = MySessionInterface() @app.route('/login.html', methods=['GET', "POST"]) def login(): print(session) session['user1'] = 'alex' session['user2'] = 'alex' del session['user2'] return "内容" if __name__ == '__main__': app.run() session.py #!/usr/bin/env python # -*- coding:utf-8 -*- import uuid import json from flask.sessions import SessionInterface from flask.sessions import SessionMixin from itsdangerous import Signer, BadSignature, want_bytes class MySession(dict, SessionMixin): def __init__(self, initial=None, sid=None): self.sid = sid self.initial = initial super(MySession, self).__init__(initial or ()) def __setitem__(self, key, value): super(MySession, self).__setitem__(key, value) def __getitem__(self, item): return super(MySession, self).__getitem__(item) def __delitem__(self, key): super(MySession, self).__delitem__(key) class MySessionInterface(SessionInterface): session_class = MySession container = {} def __init__(self): import redis self.redis = redis.Redis() def _generate_sid(self): return str(uuid.uuid4()) def _get_signer(self, app): if not app.secret_key: return None return Signer(app.secret_key, salt='flask-session', key_derivation='hmac') def open_session(self, app, request): """ 程序刚启动时执行,需要返回一个session对象 """ sid = request.cookies.get(app.session_cookie_name) if not sid: sid = self._generate_sid() return self.session_class(sid=sid) signer = self._get_signer(app) try: sid_as_bytes = signer.unsign(sid) sid = sid_as_bytes.decode() except BadSignature: sid = self._generate_sid() return self.session_class(sid=sid) # session保存在redis中 # val = self.redis.get(sid) # session保存在内存中 val = self.container.get(sid) if val is not None: try: data = json.loads(val) return self.session_class(data, sid=sid) except: return self.session_class(sid=sid) return self.session_class(sid=sid) def save_session(self, app, session, response): """ 程序结束前执行,可以保存session中所有的值 如: 保存到resit 写入到用户cookie """ domain = self.get_cookie_domain(app) path = self.get_cookie_path(app) httponly = self.get_cookie_httponly(app) secure = self.get_cookie_secure(app) expires = self.get_expiration_time(app, session) val = json.dumps(dict(session)) # session保存在redis中 # self.redis.setex(name=session.sid, value=val, time=app.permanent_session_lifetime) # session保存在内存中 self.container.setdefault(session.sid, val) session_id = self._get_signer(app).sign(want_bytes(session.sid)) response.set_cookie(app.session_cookie_name, session_id, expires=expires, httponly=httponly, domain=domain, path=path, secure=secure) 自定义Session
第三方session
from flask import Flask, session, redirect from flask.ext.session import Session app = Flask(__name__) app.debug = True app.secret_key = 'asdfasdfasd' app.config['SESSION_TYPE'] = 'redis' from redis import Redis app.config['SESSION_REDIS'] = Redis(host='192.168.0.94',port='6379') Session(app) @app.route('/login') def login(): session['username'] = 'alex' return redirect('/index') @app.route('/index') def index(): name = session['username'] return name if __name__ == '__main__': app.run() 第三方session
八 闪现
flash:向某一个地方设置一个值
category:设置值的分类
get_flashed_messages:从某一个地方获取多个值,并且清除
他们也是基于app.secret_key实现的
from flask import Flask,flash,get_flashed_messages app = Flask(__name__) app.secret_key = 'asdfasdf' @app.route('/get') def get(): # 从某个地方获取设置过的所有值,并清除。 data = get_flashed_messages() print(data) return 'Hello World!' @app.route('/set') def set(): # 向某个地方设置一个值 flash('阿斯蒂芬') return 'Hello World!' if __name__ == '__main__': app.run()
category_filter:只取一类的值
from flask import Flask,flash,get_flashed_messages,request,redirect app = Flask(__name__) app.secret_key = 'asdfasdf' @app.route('/index') def index(): # 从某个地方获取设置过的所有值,并清除。 val = request.args.get('v') if val == 'oldboy': return 'Hello World!' flash('超时错误',category="x1") return "ssdsdsdfsd" # return redirect('/error') @app.route('/error') def error(): """ 展示错误信息 :return: """ data = get_flashed_messages(category_filter=['x1']) if data: msg = data[0] else: msg = "..." return "错误信息:%s" %(msg,) if __name__ == '__main__': app.run()
request.query_string.get('字段'):获取到所有的url
request.args.get('字段'):获取get请求后面的值
什么是闪现:设置不管多少次的值,是基于session实现的,只要一次取到全部的值,并且清除
闪现的用法:应用于临时数据的操作,比如:显示错误信息等等
from flask import Flask,session,Session,flash,get_flashed_messages,redirect,render_template,request app = Flask(__name__) app.secret_key ='sdfsdfsdf' @app.route('/users') def users(): # 方式一 # msg = request.args.get('msg','') # 方式二 # msg = session.get('msg') # if msg: # del session['msg'] # 方式三 v = get_flashed_messages() print(v) msg = '' return render_template('users.html',msg=msg) @app.route('/useradd') def user_add(): # 在数据库中添加一条数据 # 假设添加成功,在跳转到列表页面时,显示添加成功 # 方式一 # return redirect('/users?msg=添加成功') # 方式二 # session['msg'] = '添加成功' # 方式三 flash('添加成功') return redirect('/users') if __name__ == '__main__': app.run(debug=True)
九 蓝图(blueprint)
什么是蓝图:功能分类,还可以定义自己的模板和路由分发。就是为了构造目录的。
蓝图的特性:不仅能将不同功能的app进行分来,并且还可以定义自己的模板路径可以实现路由的分发。可以实现每一个程序构造自己的app。
Blueprint:创建蓝图 ('蓝图名','__name__')
url_prefix:为下面所有函数的使用机上一个前缀,可以为统一的某一类加上前缀
template_folder:定义自己的模板路径(html文件勒颈)
app.register_blueprint:将蓝图注册到app
小型应用程序:示例
大型应用程序:示例
小中型:
manage.py
import fcrm
if __name__ == '__main__': fcrm.app.run()
__init__.py(只要一导入fcrm就会执行__init__.py文件)
from flask import Flask
#导入accout 和order
from fcrm.views import accout from fcrm.views import order app = Flask(__name__) print(app.root_path) #根目录 app.register_blueprint(accout.accout) #吧蓝图注册到app里面,accout.accout是创建的蓝图对象 app.register_blueprint(order.order)
accout.py
from flask import Blueprint,render_template
accout = Blueprint("accout",__name__) @accout.route('/accout') def xx(): return "accout" @accout.route("/login") def login(): return render_template("login.html")
order.py
from flask import Blueprint
order = Blueprint("order",__name__) @order.route('/order') def register(): #注意视图函数的名字不能和蓝图对象的名字一样 return "order
使用蓝图时需要注意的
大型:
十 请求的扩展
类似于django的中间件
@app.defore_request:定制请求函数,每次请求都会执行有这个装饰器的函数,每次都是从上到下执行的
request.url:拿到正要运行的url
@app.after_request:定制响应的函数,每次响应执行这个装饰器的函数,每次都是从下到上执行的
请求哈响应可以有多个,如果请求给拦截了,但是所有的响应都会执行
@app.errorhandler:定制错误的信息。
@ app.template_global:为模板定制函数,也就是自定义模板
@app.before_first_request:只有第一次请求来了才执行这个函数
from flask import Flask,render_template,request,redirect,session,url_for app = Flask(__name__) app.debug = True app.secret_key = 'siuljskdjfs' @app.before_request def process_request1(*args,**kwargs): print('process_request1 进来了') @app.before_request def process_request2(*args,**kwargs): print('process_request2 进来了') @app.after_request def process_response1(response): print('process_response1 走了') return response @app.after_request def process_response2(response): print('process_response2 走了') return response @app.errorhandler(404) def error_404(arg): return "404错误了" @app.before_first_request def first(*args,**kwargs): pass @app.route('/index',methods=['GET']) def index(): print('index函数') return "Index" if __name__ == '__main__': app.run()
基于中间件实现用户的认证登陆
@app.before_request def process_request(*args,**kwargs): if request.path == '/login': return None user = session.get('user_info') if user: return None return redirect('/login')
模板中定制方法:
@app.template_global() def sb(a1, a2): return a1 + a2 {{sb(1,2)}} @app.template_filter() def db(a1, a2, a3): return a1 + a2 + a3 {{ 1|db(2,3)}}
十一 中间件
中间件:在这里就是一个请求的入口
每次请求进来都会执行app的call方法。
from flask import Flask app = Flask(__name__) @app.route('/') def index(): return 'Hello World!' class Md(object): def __init__(self,old_wsgi_app): self.old_wsgi_app = old_wsgi_app def __call__(self, environ, start_response): print('开始之前') ret = self.old_wsgi_app(environ, start_response) print('结束之后') return ret if __name__ == '__main__': app.wsgi_app = Md(app.wsgi_app) app.run()
十二 上下文处理
threading.local:为每一个线程开辟一个单独的内存空间来保存他自己的值
import threading # class Foo(): # def __init__(self): # self.name=0 # # local_value=Foo() local_value=threading.local() def func(num): local_value.name=num import time time.sleep(1) print(local_value.name,threading.current_thread().name) for i in range(20): th=threading.Thread(target=func,args=(i,),name='线程%s'%i) th.start()
request:
情况一:单线程和单进程的情况下不会有问题,应为自己使用为完毕过后,就会自动的清空request。
情况二:单进程和多线程,threading.local对象
情况三:但进程和单线程多个协程,theading.local对象就会出问题。
解决方法:
以后不支持协程:就可以使用内置的threading.local对象
支持协程:自定义类似threading.local对象的功能支持协程
_thread.get_ident:获取线程的一个唯一标识
greenlet.getcurrent:获取协程额唯一标识
自定义支持协程:
""" { 1368:{} } """ import threading try: from greenlet import getcurrent as get_ident # 协程 except ImportError: try: from thread import get_ident except ImportError: from _thread import get_ident # 线程 class Local(): def __init__(self): self.storage={} self.get_ident=get_ident def set(self,k,v): ident=self.get_ident() origin=self.storage.get(ident) if not origin: origin={k:v} else: origin[k]=v self.storage[ident]=origin def get(self,k): ident=self.get_ident() origin=self.storage.get(ident) if not origin: return None return origin.get(k,None) local_values=Local() def task(num): local_values.set('name',num) import time time.sleep(1) print(local_values.get('name'),threading.current_thread().name) for i in range(20): th=threading.Thread(target=task,args=(i,),name='线程%s'%i) th.start()
反射实例:
class Foo(object): def __init__(self): object.__setattr__(self,'storage',{}) def __setattr__(self, key, value): self.storage={'k1:v1'} print(key,value) def __getattr__(self, item): print(item) return 'df' obj=Foo() # obj.x = 123 print(obj)
自定义支持协程的flask:使用反射实现
""" { 1368:{} } """ import flask.globals import threading try: from greenlet import getcurrent as get_ident # 协程 except ImportError: try: from thread import get_ident except ImportError: from _thread import get_ident # 线程 class Local(): def __init__(self): object.__setattr__(self,'__storage__',{}) object.__setattr__(self,'__ident_func__',get_ident) def __setattr__(self, name, value): ident=self.__ident_func__() storage=self.__storage__ try: storage[ident][name]=value except KeyError: storage[ident]={name:value} def __getattr__(self, name): try: return self.__storage__[self.__ident_func__()][name] except KeyError: raise AttributeError(name) def __delattr__(self, name): try: del self.__storage__[self.__ident_finc__()][name] except KeyError: raise AttributeError(name) local_values=Local() def task(num): local_values.name=num import time time.sleep(1) print(local_values.name,threading.current_thread().name) for i in range(20): th=threading.Thread(target=task,args=(i,),name='线程%s'%i) th.start()
补充:
偏函数:functools模块
functools.partial:创建一个新的函数,主要是为了给原函数传入参数
# import functools # # def func(a1,a2): # print(a1+a2) # # # new_func=functools.partial(func,12) # # new_func(21)
面向对象的__add__方法补充:当把面向对象中的所有__函数__实现时,对象做任何操作时,都会执行其中对应的方法。
__add__:谁在前面就会调用谁的__add__方法
# class Foo(): # def __init__(self,num): # self.num=num # def __add__(self,other): # return Foo(self.num+other.num) # obj1=Foo(11) # obj2=Foo(22) # v=obj1+obj2 # print(v)
拼接列表中的值:
itertools.chain:传入函数,产生一个新的函数对象的列表,主要帮助我们做列表元素的拼接
实例1:
from itertools import chain # def a1(x): # return x+1 # # func_list=[a1,lambda x:x+1] # # def a2(y): # return y+10 # # func_list_1=chain([a2],func_list) # for func in func_list_1: # print(func)
实例2:
# v1=[1,2,3] # v2=[4,5,6] # # for l in chain(v1,v2): # chain 主要是做列表的拼接 # print(l)
上下文源码流程:https://www.processon.com/diagraming/5ab8c9f0e4b0d165d5b83fbb
谈谈flask的上下文管理:
- 与django相比是两种不同的实现方式。 - django/tornado是通过传参数形式 - flask是通过上下文管理 两种都可以实现,只不过试下方式不一样。 - 上下文管理: - threading.local/Local类,其中创建了一个字典{greelet做唯一标识:存数据} 保证数据隔离 - 请求进来: - 请求相关所有数据封装到了RequestContext中。 - 再讲RequestContext对象添加到Local中(通过LocalStack将对象添加到Local对象中) - 使用,调用request - 调用此类方法 request.method、print(request)、request+xxx 会执行LocalProxy中对应的方法 - 函数 - 通过LocalStack去Local中获取值。 - 请求终止 - 通过LocalStack的pop方法 Local中将值异常。
补充:
再将对象封装到Local中
Flask可以传入任何的字符串参数
请求上下文:请求上下文封装的就是RequestContext对象
request:是LocalProxy对象
这个对象实例化事传入了一个函数,还有一个request参数
以后执行偏函数partail(_Lookup_req_object,'request')时,自动传递request参数
目标:去Local中获取ctx,然后再在ctx中获取request
ctx.push:里面做的事,将ctx通过LocalStack添加到Local中
session:经过push之后,session里面已经有值了,seif.session帮助我获取session信息
应用上下文:应用上下文封装的是AppContext
AppContext里面封装了app对象
app.context:创建了app对象
app.app_ctx_global_class:相当于一个全局变量
app和g
g:每个请求周期都会创建一个用于在请求周期中传递值的一个容器。只有一次生命周期可用,因为一次请求周期后第二次就会重新创建
print(g):执行g的实例对象
请求到来 ,有人来访问:
# 将请求相关的数据environ封装到了RequestContext对象中 # 再讲对象封装到local中(每个线程/每个协程独立空间存储) # ctx.app # 当前APP的名称 # ctx.request # Request对象(封装请求相关东西) # ctx.session # 空 _request_ctx_stack.local = { 唯一标识:{ "stack":[ctx, ] }, 唯一标识:{ "stack":[ctx, ] }, } # app_ctx = AppContext对象 # app_ctx.app # app_ctx.g _app_ctx_stack.local = { 唯一标识:{ "stack":[app_ctx, ] }, 唯一标识:{ "stack":[app_ctx, ] }, }
使用:print打印request,session.g,current_app的时候,都会执行他们相对应的对象的__str__
他们之间不同的是偏函数不一样
from flask import request,session,g,current_app print(request,session,g,current_app) 都会执行相应LocalProxy对象的 __str__ current_app = LocalProxy(_find_app) request = LocalProxy(partial(_lookup_req_object, 'request')) session = LocalProxy(partial(_lookup_req_object, 'session')) current_app = LocalProxy(_find_app) g = LocalProxy(partial(_lookup_app_object, 'g'))
终止,全部pop
问题:
如果他是多线程的时候是如何实现的。
请求到来的时候就是两个Local,但是没有值,一共只有两个Local,进来一个线程创建自己的唯一标识。不过进来多少的线程,都只用这两个Local
flask的local中保存数据时,使用列表创建出来的栈。为什么用栈?
- 如果写web程序,web运行环境;栈中永远保存1条数据(可以不用栈)。 - 写脚本获取app信息时,可能存在app上下文嵌套关系。 from flask import Flask,current_app,globals,_app_ctx_stack app1 = Flask('app01') app1.debug = False # 用户/密码/邮箱 # app_ctx = AppContext(self): # app_ctx.app # app_ctx.g app2 = Flask('app02') app2.debug = True # 用户/密码/邮箱 # app_ctx = AppContext(self): # app_ctx.app # app_ctx.g with app1.app_context():# __enter__方法 -> push -> app_ctx添加到_app_ctx_stack.local # {<greenlet.greenlet object at 0x00000000036E2340>: {'stack': [<flask.ctx.AppContext object at 0x00000000037CA438>]}} print(_app_ctx_stack._local.__storage__) print(current_app.config['DEBUG']) with app2.app_context(): # {<greenlet.greenlet object at 0x00000000036E2340>: {'stack': [<flask.ctx.AppContext object at 0x00000000037CA438> ]}} print(_app_ctx_stack._local.__storage__) print(current_app.config['DEBUG']) print(current_app.config['DEBUG'])
多app应用:不仅可以使用蓝图进行分发,还可以使用app进行分发
DispatchMiddleware:可以进行路由分发。
在这里面没有app.run,直接可以run_simple启动
from werkzeug.wsgi import DispatcherMiddleware from werkzeug.serving import run_simple from flask import Flask, current_app app1 = Flask('app01') app2 = Flask('app02') @app1.route('/index') def index(): return "app01" @app2.route('/index2') def index2(): return "app2" # http://www.oldboyedu.com/index # http://www.oldboyedu.com/sec/index2 dm = DispatcherMiddleware(app1, { '/sec': app2, }) if __name__ == "__main__": run_simple('localhost', 5000, dm)
问题:web访问多app,上线问管理是如何实现的
请求进来,为栈添加的还是一个值
问题:为什么使用栈,离线脚本
如果写web程序,或者web运行环境:栈中永远保存一条数据(这个可以不使用栈)
写脚本获取app信息的时候,可能会存在上下文嵌套关系。这时有可能要用到栈
问题:问题:Web访问多app应用时,上下文管理是如何实现?
补充:
遇到with就会执行__enter__方法,这个方法返回值,as后面那个值就是返回值
当指执行完毕之后 ,自动调用类的__exit__方法
from flask import Flask,current_app,globals,_app_ctx_stack app1 = Flask('app01') app1.debug = False # 用户/密码/邮箱 # app_ctx = AppContext(self): # app_ctx.app # app_ctx.g app2 = Flask('app02') app2.debug = True # 用户/密码/邮箱 # app_ctx = AppContext(self): # app_ctx.app # app_ctx.g with app1.app_context(): # # __enter__方法 -> push -> app_ctx添加到_app_ctx_stack.local print(_app_ctx_stack._local.__storage__) print(current_app.config['DEBUG']) with app1.app_context(): print(_app_ctx_stack._local.__storage__) print(current_app.config['DEBUG']) print(current_app.config['DEBUG']) with app1.app_context(): print(_app_ctx_stack._local.__storage__) print(current_app.config['DEBUG']) with app1.app_context(): print(_app_ctx_stack._local.__storage__) print(current_app.config['DEBUG'])
补充:永远两个Local对象
实现细节:
ResquestContext对象通过LocalStark添加到Local中
导入的request是一个LocalProxy对象,然后在通过偏函数调用了LocakStack,在调用Local
RequestContext的auto_pop,在执行LocalStack,再到Local中移除
十三 数据库的连接池
前夕:
django的常用数据库:
ORM:django默认的数据库
pymysql模块:导入mysql数据库,python2和python3版本都有这个模块
MySQLdb:一样,也是导入mysql数据库,这个模块只有python2版本才有
flask/其他:
pymysql:导mysql数据库
MySQLdb:导入mysql数据库
SQLAchemy:
是ORM的一款框架,可以导入mysql数据库(pymysql / MySQLdb)
原生SQL:
from flask import Flask app = Flask(__name__) @app.route("/") def hello(): import pymysql CONN = pymysql.connect(host='127.0.0.1', port=3306, user='root', password='123', database='pooldb', charset='utf8') cursor = CONN.cursor() cursor.execute('select * from tb1') result = cursor.fetchall() cursor.close() print(result) return "Hello World" if __name__ == '__main__': app.run()
from flask import Flask app = Flask(__name__) import pymysql CONN = pymysql.connect(host='127.0.0.1', port=3306, user='root', password='123', database='pooldb', charset='utf8') @app.route("/") def hello(): cursor = CONN.cursor() cursor.execute('select * from tb1') result = cursor.fetchall() cursor.close() print(result) return "Hello World" if __name__ == '__main__': app.run() ######加锁######################### from flask import Flask import threading app = Flask(__name__) import pymysql CONN = pymysql.connect(host='127.0.0.1', port=3306, user='root', password='123', database='pooldb', charset='utf8') @app.route("/") def hello(): with threading.Lock(): cursor = CONN.cursor() cursor.execute('select * from tb1') result = cursor.fetchall() cursor.close() print(result) return "Hello World" if __name__ == '__main__': app.run()
问题:
解决:
不能为每一个用户创建一个链接
创建一定数量的连接池,只要连接池有空的,才会进来,不然就会等着。
使用UBDtils模块:
下载网站:https://pypi.python.org/pypi/DBUtils
pip install UBDtils
如果安装到虚拟环境下面,需要先切换到虚拟环境下面
使用:
模式一:
为每一个线程创建一个链接,即使县城调用close方法,也不会关闭掉
占用连接池不放,浪费了连接池
import pymysql from DBUtils.PersistentDB import PersistentDB POOL = PersistentDB( creator=pymysql, # 使用链接数据库的模块 maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制 setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."] ping=0, # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always closeable=False, # 如果为False时, conn.close() 实际上被忽略,供下次使用,再线程关闭时,才会自动关闭链接。如果为True时, conn.close()则关闭链接,那么再次调用pool.connection时就会报错,因为已经真的关闭了连接(pool.steady_connection()可以获取一个新的链接) threadlocal=None, # 本线程独享值得对象,用于保存链接对象,如果链接对象被重置 host='127.0.0.1', port=3306, user='root', password='123', database='pooldb', charset='utf8' ) def func(): conn = POOL.connection(shareable=False) cursor = conn.cursor() cursor.execute('select * from tb1') result = cursor.fetchall() cursor.close() conn.close() func()
模式二:
设置最大限制创建连接池的数量,进来一个线程,就创建一个连接池
如果超过连接池的限制数量,就会在那里等着有空的连接池释放出来才能够下一个线程链接进去
import time import pymysql import threading from DBUtils.PooledDB import PooledDB, SharedDBConnection POOL = PooledDB( creator=pymysql, # 使用链接数据库的模块 maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数 mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建 maxcached=5, # 链接池中最多闲置的链接,0和None不限制 maxshared=3, # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。 blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错 maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制 setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."] ping=0, # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always host='127.0.0.1', port=3306, user='root', password='123', database='pooldb', charset='utf8' ) def func(): # 检测当前正在运行连接数的是否小于最大链接数,如果不小于则:等待或报raise TooManyConnections异常 # 否则 # 则优先去初始化时创建的链接中获取链接 SteadyDBConnection。 # 然后将SteadyDBConnection对象封装到PooledDedicatedDBConnection中并返回。 # 如果最开始创建的链接没有链接,则去创建一个SteadyDBConnection对象,再封装到PooledDedicatedDBConnection中并返回。 # 一旦关闭链接后,连接就返回到连接池让后续线程继续使用。 conn = POOL.connection() # print(th, '链接被拿走了', conn1._con) # print(th, '池子里目前有', pool._idle_cache, '\r\n') cursor = conn.cursor() cursor.execute('select * from tb1') result = cursor.fetchall() conn.close() func()
工作实用:
import pymysql from DBUtils.PooledDB import PooledDB POOL = PooledDB( creator=pymysql, # 使用链接数据库的模块 maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数 mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建 maxcached=5, # 链接池中最多闲置的链接,0和None不限制 maxshared=3, # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。 blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错 maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制 setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."] ping=0, # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always host='127.0.0.1', port=3306, user='root', password='123', database='pooldb', charset='utf8' ) """ class SQLHelper(object): @staticmethod def fetch_one(sql,args): conn = POOL.connection() cursor = conn.cursor() cursor.execute(sql, args) result = cursor.fetchone() conn.close() return result @staticmethod def fetch_all(self,sql,args): conn = POOL.connection() cursor = conn.cursor() cursor.execute(sql, args) result = cursor.fetchone() conn.close() return result # 以后使用: result = SQLHelper.fetch_one('select * from xxx',[]) print(result) """
十四 信号
什么是信号:signal 一种处理异步时间的方法。信号是POSIX系统的信号,由硬件或软件触发,再有操作系统内核发给应用程序的中断形式。POSIX由一系列的信号集。
什么是信号量:semaphore 一种进程同步的机制。信号量是POSIX进程间通信的工具,在它上面定义了一系列操作原语,简单地讲它可以在进程间进行通信。
flask自己没有信号,要依赖blinker这个模块 安装 pip install blinker
内置信号:
request_started = _signals.signal('request-started') # 请求到来前执行 request_finished = _signals.signal('request-finished') # 请求结束后执行 before_render_template = _signals.signal('before-render-template') # 模板渲染前执行 template_rendered = _signals.signal('template-rendered') # 模板渲染后执行 got_request_exception = _signals.signal('got-request-exception') # 请求执行出现异常时执行 request_tearing_down = _signals.signal('request-tearing-down') # 请求执行完毕后自动执行(无论成功与否) appcontext_tearing_down = _signals.signal('appcontext-tearing-down')# 请求上下文执行完毕后自动执行(无论成功与否) appcontext_pushed = _signals.signal('appcontext-pushed') # 请求上下文push时执行 appcontext_popped = _signals.signal('appcontext-popped') # 请求上下文pop时执行 message_flashed = _signals.signal('message-flashed') # 调用flask在其中添加数据时,自动触发
信号放哪里了:放在了signals文件里面
signals.request_started.conncet(函数名) :注册函数
等待请求的到来
在视图函数执行之前触法信号的
是怎么触发信号的?
signals.request_started.send:方法触发信号
send是在full_dispatch_request方法中执行的
信号的源码流程:
a. before_first_request b. 触发 request_started 信号 c. before_request d. 模板渲染 渲染前的信号 before_render_template.send(app, template=template, context=context) rv = template.render(context) # 模板渲染 渲染后的信号 template_rendered.send(app, template=template, context=context) e. after_request f. session.save_session() g. 触发 request_finished信号 如果上述过程出错: 触发错误处理信号 got_request_exception.send(self, exception=e) h. 触发信号 request_tearing_down 由信号引发的源码流程:找扩展点
十五 flask-session插件
Flask中的session处理机制(内置:将session保存在加密cookie中实现)
内置的session:
请求刚到来:获取随机字符串,存在则去“数据库”中获取原来的个人数据,否则创建一个空容器。 --> 内存:对象(随机字符串,{放置数据的容器})
# 1. obj = 创建SecureCookieSessionInterface() # 2. obj = open_session(self.request) = SecureCookieSession() # self.session = SecureCookieSession()对象。 self.session = self.app.open_session(self.request)
视图:操作内存中 对象(随机字符串,{放置数据的容器})
响应:内存对象(随机字符串,{放置数据的容器})
将数据保存到“数据库”
把随机字符串写在用户cookie中。
自定义:
请求刚到来:
# 创建特殊字典,并添加到Local中。 # 调用关系: # self.session_interface.open_session(self, request) # 由于默认app中的session_interface=SecureCookieSessionInterface() # SecureCookieSessionInterface().open_session(self, request) # 由于默认app中的session_interface=MySessionInterFace() # MySessionInterFace().open_session(self, request) self.session = self.app.open_session(self.request)
调用:
from flask import Flask,session app = Flask(__name__) app.secret_key = 'suijksdfsd' import json class MySessionInterFace(object): def open_session(self,app,request): return {} def save_session(self, app, session, response): response.set_cookie('session_idfsdfsdfsdf',json.dumps(session)) def is_null_session(self, obj): """Checks if a given object is a null session. Null sessions are not asked to be saved. This checks if the object is an instance of :attr:`null_session_class` by default. """ return False app.session_interface = MySessionInterFace() @app.route('/') def index(): # 特殊空字典 # 在local的ctx中找到session # 在空字典中写值 # 在空字典中获取值 session['xxx'] = 123 return 'Index' # # 一旦请求到来 # app.__call__ # app.wsgi_app # app.session_interface # app.open_session if __name__ == '__main__': app.run()
session -> LocalProxy -> 偏函数 -> LocalStack -> Local
请求终止:
# 由于默认app中的session_interface=SecureCookieSessionInterface() # SecureCookieSessionInterface().save_session(self, app, session, response) # 由于默认app中的session_interface=MySessionInterFace() # MySessionInterFace().save_session(self, app, session, response)
flask-session组件
使用:
from flask import Flask,session from flask_session import RedisSessionInterface app = Flask(__name__) app.secret_key = 'suijksdfsd'
方式一
from redis import Redis conn = Redis() app.session_interface = RedisSessionInterface(conn,key_prefix='__',use_signer=False)
方式二
from redis import Redis from flask.ext.session import Session app.config['SESSION_TYPE'] = 'redis' app.config['SESSION_REDIS'] = Redis(host='192.168.0.94',port='6379') Session(app)
@app.route('/') def index(): session['xxx'] = 123 return 'Index' if __name__ == '__main__': app.run()
源码:
流程
问题:设置cookie时,如何设定关闭浏览器则cookie失效。
response.set_cookie('k','v',exipre=None)
十六 wtforms组建
安装:pip3 install wtforms
使用:
from flask import Flask, render_template, request, redirect from wtforms import Form from wtforms.fields import core from wtforms.fields import html5 from wtforms.fields import simple from wtforms import validators from wtforms import widgets app = Flask(__name__, template_folder='templates') app.debug = True class RegisterForm(Form): name = simple.StringField( label='用户名', validators=[ validators.DataRequired() ], widget=widgets.TextInput(), render_kw={'class': 'form-control'}, default='alex' ) pwd = simple.PasswordField( label='密码', validators=[ validators.DataRequired(message='密码不能为空.') ], widget=widgets.PasswordInput(), render_kw={'class': 'form-control'} ) pwd_confirm = simple.PasswordField( label='重复密码', validators=[ validators.DataRequired(message='重复密码不能为空.'), validators.EqualTo('pwd', message="两次密码输入不一致") ], widget=widgets.PasswordInput(), render_kw={'class': 'form-control'} ) email = html5.EmailField( label='邮箱', validators=[ validators.DataRequired(message='邮箱不能为空.'), validators.Email(message='邮箱格式错误') ], widget=widgets.TextInput(input_type='email'), render_kw={'class': 'form-control'} ) gender = core.RadioField( label='性别', choices=( (1, '男'), (2, '女'), ), coerce=int # “1” “2” ) city = core.SelectField( label='城市', choices=( ('bj', '北京'), ('sh', '上海'), ) ) hobby = core.SelectMultipleField( label='爱好', choices=( (1, '篮球'), (2, '足球'), ), coerce=int ) favor = core.SelectMultipleField( label='喜好', choices=( (1, '篮球'), (2, '足球'), ), widget=widgets.ListWidget(prefix_label=False), option_widget=widgets.CheckboxInput(), coerce=int, default=[1, 2] ) def __init__(self, *args, **kwargs): super(RegisterForm, self).__init__(*args, **kwargs) self.favor.choices = ((1, '篮球'), (2, '足球'), (3, '羽毛球')) def validate_pwd_confirm(self, field): """ 自定义pwd_confirm字段规则,例:与pwd字段是否一致 :param field: :return: """ # 最开始初始化时,self.data中已经有所有的值 if field.data != self.data['pwd']: # raise validators.ValidationError("密码不一致") # 继续后续验证 raise validators.StopValidation("密码不一致") # 不再继续后续验证 @app.route('/register', methods=['GET', 'POST']) def register(): if request.method == 'GET': form = RegisterForm(data={'gender': 2,'hobby':[1,]}) # initial return render_template('register.html', form=form) else: form = RegisterForm(formdata=request.form) if form.validate(): print('用户提交数据通过格式验证,提交的值为:', form.data) else: print(form.errors) return render_template('register.html', form=form) if __name__ == '__main__': app.run()
源码流程:
实现方式:
1. 自动生成HTML class LoginForm(Form): # 字段(内部包含正则表达式) name = simple.StringField( label='用户名', validators=[ validators.DataRequired(message='用户名不能为空.'), validators.Length(min=6, max=18, message='用户名长度必须大于%(min)d且小于%(max)d') ], widget=widgets.TextInput(), # 页面上显示的插件 render_kw={'class': 'form-control'} ) # 字段(内部包含正则表达式) pwd = simple.PasswordField( label='密码', validators=[ validators.DataRequired(message='密码不能为空.'), validators.Length(min=8, message='用户名长度必须大于%(min)d'), validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[$@$!%*?&])[A-Za-z\d$@$!%*?&]{8,}", message='密码至少8个字符,至少1个大写字母,1个小写字母,1个数字和1个特殊字符') ], widget=widgets.PasswordInput(), render_kw={'class': 'form-control'} ) def __iter__(self): return iter([self.name,self.pwd]) # 方式一: obj = LoginForm() print(obj.name) # 调用字段的__str__ # 方式二: obj = LoginForm() for item in obj: print(item) # 调用字段的__str__
校验:
a. 后台定义好正则
b. 用户发来数据
c. 对数据进行校验
源码实现:自动生成HTML文件
解释:metaclass
Metaclass作用:用来指定当前类是谁来创建的,如果不指定默认是type创建的
类继承:只要这个指定了Metaclass,那么这个类以及他的子类都指定同一个metaclass
- MetaClass作用:用来指定当前类由谁来创建(默认type创建)。 - 使用metaclass class Foo(metaclass=type): pass class Foo(object): __metaclass__ = type - 类继承 class MyType(type): def __init__(self,*args,**kwargs): print('init') super(MyType,self).__init__(*args,**kwargs) def __call__(self, *args, **kwargs): print('call本质:调用类的__new__,再调用类的__init__') return super(MyType,self).__call__( *args, **kwargs) class Foo(metaclass=MyType): pass class Bar(Foo): pass obj = Bar() - 问题: 1. 什么意思? # 类由type来创建 class Foo(metaclass=type) # 继承Type class Foo(type) 2. Flask多线程:服务端开多线程
其他方式使用:
class MyType(type): def __init__(self,*args,**kwargs): print('init') super(MyType,self).__init__(*args,**kwargs) def __call__(self, *args, **kwargs): print('call') return super(MyType,self).__call__(*args,**kwargs) # Base=MyType('Base',(object,),{}) 是有MyType创建; metaclass=MyType # # class Foo(Base): # pass # # Foo() # 1. type可以创建类metaclass=type;MyType也可以创建类metaclass=MyType # 2. Base = MyType('Base', (object,), {}) --> # class Base(metaclass=MyType): # pass # class Foo(Base): # pass class Foo(MyType("Base",(object,),{})): # 第一个是类名,第二个就是他的父类,第三个就是属性 pass Foo()
class MyType(type): def __init__(self,*args,**kwargs): print('init') super(MyType,self).__init__(*args,**kwargs) def __call__(self, *args, **kwargs): print('call') return super(MyType,self).__call__(*args,**kwargs) def with_metaclass(obj): return MyType('XX',(obj,),{}) class Foo(with_metaclass(object)): pass Foo()
""" 1. 什么意思? # 类由type来创建 class Foo(metaclass=type) # 继承Type class Foo(type) """ class Foo(object): pass obj = Foo() # 对象是由类创建 # 一切皆对象,类由type创建 class Foo(object): pass Foo = type('Foo',(object,),{}) # 一切皆对象,类由MyType创建 class MyType(type): pass Foo = MyType('Foo',(object,),{}) class Foo(object,metaclass=MyType): pass # 一切皆对象,类由MyType创建 class MyType(type): def __init__(self, *args, **kwargs): print('init') super(MyType, self).__init__(*args, **kwargs) def __call__(cls, *args, **kwargs): print('call') return super(MyType, cls).__call__(*args, **kwargs) Foo = MyType('Foo',(object,),{}) class Foo(object,metaclass=MyType): pass Foo()
实例:form = LoginForm()
#!/usr/bin/env python # -*- coding:utf-8 -*- from flask import Flask, render_template, request, redirect from wtforms import Form from wtforms.fields import simple from wtforms import validators from wtforms import widgets app = Flask(__name__, template_folder='templates') app.debug = True class MyForm(Form): ''' 创建字段,内部包含正则表达式 ''' name=simple.StringField(label='用户名', validators=[ validators.DataRequired(message='用户名不能为空'), validators.Length(min=6,max=18,message='用户名的长度不能小于%(min)d不能大于%(max)d') ], widget=widgets.TextInput(), render_kw={'class': 'form-control'} ) password=simple.PasswordField(label='密码', validators=[ validators.DataRequired(message='密码不能为空'), validators.Length(max=18,message='用户名不能大于%(max)'), validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[$@$!%*?&])[A-Za-z\d$@$!%*?&]{8,}", message='密码至少8个字符,至少1个大写字母,1个小写字母,1个数字和1个特殊字符') ], widget=widgets.PasswordInput(), render_kw={'class': 'form-control'} ) @app.route('/login', methods=['GET', 'POST']) def login(): if request.method=='GET': form=MyForm() return render_template('login.html',form=form) else: form = MyForm(formdata=request.form) if form.validate(): print('用户提交数据通过格式验证,提交的值为:', form.data) else: return render_template('login.html', form=form) if __name__ == '__main__': app.run()
验证:form.validate()
import time import threading import sqlalchemy from sqlalchemy import create_engine from sqlalchemy.engine.base import Engine
补充:
为什么要在类里面定义方法:主要是为对象的数据进行二次加工
类的对象不能直接被for循环,若想被循环,在类里面加上__iter__方法
十七 SQLAlchmey ORM框架
目标:类/对象操作 -> SQL -> pymysql、MySQLdb -> 再在数据库中执行。
基本使用:这个不常见,
class MyForm(Form): ''' 创建字段,内部包含正则表达式 ''' name=simple.StringField(label='用户名', validators=[ validators.DataRequired(message='用户名不能为空'), validators.Length(min=6,max=18,message='用户名的长度不能小于%(min)d不能大于%(max)d') ], widget=widgets.TextInput(), render_kw={'class': 'form-control'} ) password=simple.PasswordField(label='密码', validators=[ validators.DataRequired(message='密码不能为空'), validators.Length(max=18,message='用户名不能大于%(max)'), validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[$@$!%*?&])[A-Za-z\d$@$!%*?&]{8,}", message='密码至少8个字符,至少1个大写字母,1个小写字母,1个数字和1个特殊字符') ], widget=widgets.PasswordInput(), render_kw={'class': 'form-control'} )
方式一:
engine = create_engine( "mysql+pymysql://root:123@127.0.0.1:3306/t1?charset=utf8", max_overflow=2, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) conn = engine.raw_connection() cursor = conn.cursor() cursor.execute( "select * from t1" ) result = cursor.fetchall() cursor.close() conn.close()
方式二:
engine = create_engine( "mysql+pymysql://root:123@127.0.0.1:3306/t1?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) def task(arg): conn = engine.raw_connection() cursor = conn.cursor() cursor.execute( #"select * from t1" "select sleep(2)" ) result = cursor.fetchall() cursor.close() conn.close() for i in range(20): t = threading.Thread(target=task, args=(i,)) t.start()
ORM:
models.py #!/usr/bin/env python # -*- coding:utf-8 -*- import datetime from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index Base = declarative_base() class Users(Base): __tablename__ = 'users' # 数据库表名称 id = Column(Integer, primary_key=True) # id 主键 name = Column(String(32), index=True, nullable=False) # name列, def init_db(): """ 根据类创建数据库表 :return: """ engine = create_engine( "mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) Base.metadata.create_all(engine) def drop_db(): """ 根据类删除数据库表 :return: """ engine = create_engine( "mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) Base.metadata.drop_all(engine) if __name__ == '__main__': #drop_db() #init_db() app.py #!/usr/bin/env python # -*- coding:utf-8 -*- from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from models import Users engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5) Connection = sessionmaker(bind=engine) # 每次执行数据库操作时,都需要创建一个Connection con = Connection() # ############# 执行ORM操作 ############# obj1 = Users(name="alex1") con.add(obj1) # 提交事务 con.commit() # 关闭session con.close()
详细信息:
创建表:
import datetime from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index Base = declarative_base() class Users(Base): __tablename__ = 'users' id = Column(Integer, primary_key=True) name = Column(String(32), index=True, nullable=False) email = Column(String(32), unique=True) ctime = Column(DateTime, default=datetime.datetime.now) # 3期师兄 extra = Column(Text, nullable=True) __table_args__ = ( # UniqueConstraint('id', 'name', name='uix_id_name'), # Index('ix_id_name', 'name', 'email'), ) class Hobby(Base): __tablename__ = 'hobby' id = Column(Integer, primary_key=True) caption = Column(String(50), default='篮球') class Person(Base): __tablename__ = 'person' nid = Column(Integer, primary_key=True) name = Column(String(32), index=True, nullable=True) hobby_id = Column(Integer, ForeignKey("hobby.id")) class b2g(Base): __tablename__ = 'b2g' id = Column(Integer, primary_key=True, autoincrement=True) girl_id = Column(Integer, ForeignKey('girl.id')) boy_id = Column(Integer, ForeignKey('boy.id')) class Girl(Base): __tablename__ = 'girl' id = Column(Integer, primary_key=True) name = Column(String(64), unique=True, nullable=False) class Boy(Base): __tablename__ = 'boy' id = Column(Integer, primary_key=True, autoincrement=True) hostname = Column(String(64), unique=True, nullable=False) engine = create_engine( "mysql+pymysql://root:0410@127.0.0.1:3306/sqlachemy?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) Base.metadata.create_all(engine) # Base.metadata.drop_all(engine)
对于表的增删改查:
#!/usr/bin/env python # -*- coding:utf-8 -*- import time import threading from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index from sqlalchemy.orm import sessionmaker, relationship from sqlalchemy import create_engine from sqlalchemy.sql import text from db import Users, Hosts engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5) Session = sessionmaker(bind=engine) session = Session() # ################ 添加 ################ """ obj1 = Users(name="wupeiqi") session.add(obj1) session.add_all([ Users(name="wupeiqi"), Users(name="alex"), Hosts(name="c1.com"), ]) session.commit() """ # ################ 删除 ################ """ session.query(Users).filter(Users.id > 2).delete() session.commit() """ # ################ 修改 ################ """ session.query(Users).filter(Users.id > 0).update({"name" : "099"}) session.query(Users).filter(Users.id > 0).update({Users.name: Users.name + "099"}, synchronize_session=False) session.query(Users).filter(Users.id > 0).update({"age": Users.age + 1}, synchronize_session="evaluate") session.commit() """ # ################ 查询 ################ """ r1 = session.query(Users).all() r2 = session.query(Users.name.label('xx'), Users.age).all() r3 = session.query(Users).filter(Users.name == "alex").all() r4 = session.query(Users).filter_by(name='alex').all() r5 = session.query(Users).filter_by(name='alex').first() r6 = session.query(Users).filter(text("id<:value and name=:name")).params(value=224, name='fred').order_by(Users.id).all() r7 = session.query(Users).from_statement(text("SELECT * FROM users where name=:name")).params(name='ed').all() """ session.close()
查看的其他操作:
# 条件 ret = session.query(Users).filter_by(name='alex').all() ret = session.query(Users).filter(Users.id > 1, Users.name == 'eric').all() ret = session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric').all() ret = session.query(Users).filter(Users.id.in_([1,3,4])).all() ret = session.query(Users).filter(~Users.id.in_([1,3,4])).all() ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='eric'))).all() from sqlalchemy import and_, or_ ret = session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric')).all() ret = session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric')).all() ret = session.query(Users).filter( or_( Users.id < 2, and_(Users.name == 'eric', Users.id > 3), Users.extra != "" )).all() # 通配符 ret = session.query(Users).filter(Users.name.like('e%')).all() ret = session.query(Users).filter(~Users.name.like('e%')).all() # 限制 ret = session.query(Users)[1:2] # 排序 ret = session.query(Users).order_by(Users.name.desc()).all() ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all() # 分组 from sqlalchemy.sql import func ret = session.query(Users).group_by(Users.extra).all() ret = session.query( func.max(Users.id), func.sum(Users.id), func.min(Users.id)).group_by(Users.name).all() ret = session.query( func.max(Users.id), func.sum(Users.id), func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) >2).all() # 连表 ret = session.query(Users, Favor).filter(Users.id == Favor.nid).all() ret = session.query(Person).join(Favor).all() ret = session.query(Person).join(Favor, isouter=True).all() # 组合 q1 = session.query(Users.name).filter(Users.id > 2) q2 = session.query(Favor.caption).filter(Favor.nid < 2) ret = q1.union(q2).all() q1 = session.query(Users.name).filter(Users.id > 2) q2 = session.query(Favor.caption).filter(Favor.nid < 2) ret = q1.union_all(q2).all()
原生的sql语句:
#!/usr/bin/env python # -*- coding:utf-8 -*- import time import threading from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index from sqlalchemy.orm import sessionmaker, relationship from sqlalchemy import create_engine from sqlalchemy.sql import text from sqlalchemy.engine.result import ResultProxy from db import Users, Hosts engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5) Session = sessionmaker(bind=engine) session = Session() # 查询 # cursor = session.execute('select * from users') # result = cursor.fetchall() # 添加 cursor = session.execute('insert into users(name) values(:value)',params={"value":'wupeiqi'}) session.commit() print(cursor.lastrowid) session.close()
多表操作:
一对多: #!/usr/bin/env python # -*- coding:utf-8 -*- import time import threading from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index from sqlalchemy.orm import sessionmaker, relationship from sqlalchemy import create_engine from sqlalchemy.sql import text from sqlalchemy.engine.result import ResultProxy from db import Users, Hosts, Hobby, Person engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8", max_overflow=0, pool_size=5) Session = sessionmaker(bind=engine) session = Session() # 添加 """ session.add_all([ Hobby(caption='乒乓球'), Hobby(caption='羽毛球'), Person(name='张三', hobby_id=3), Person(name='李四', hobby_id=4), ]) person = Person(name='张九', hobby=Hobby(caption='姑娘')) session.add(person) hb = Hobby(caption='人妖') hb.pers = [Person(name='文飞'), Person(name='博雅')] session.add(hb) session.commit() """ # 使用relationship正向查询 """ v = session.query(Person).first() print(v.name) print(v.hobby.caption) """ # 使用relationship反向查询 """ v = session.query(Hobby).first() print(v.caption) print(v.pers) """ session.close() 多对多: #!/usr/bin/env python # -*- coding:utf-8 -*- import time import threading from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index from sqlalchemy.orm import sessionmaker, relationship from sqlalchemy import create_engine from sqlalchemy.sql import text from sqlalchemy.engine.result import ResultProxy from db import Users, Hosts, Hobby, Person, Group, Server, Server2Group engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8", max_overflow=0, pool_size=5) Session = sessionmaker(bind=engine) session = Session() # 添加 """ session.add_all([ Server(hostname='c1.com'), Server(hostname='c2.com'), Group(name='A组'), Group(name='B组'), ]) session.commit() s2g = Server2Group(server_id=1, group_id=1) session.add(s2g) session.commit() gp = Group(name='C组') gp.servers = [Server(hostname='c3.com'),Server(hostname='c4.com')] session.add(gp) session.commit() ser = Server(hostname='c6.com') ser.groups = [Group(name='F组'),Group(name='G组')] session.add(ser) session.commit() """ # 使用relationship正向查询 """ v = session.query(Group).first() print(v.name) print(v.servers) """ # 使用relationship反向查询 """ v = session.query(Server).first() print(v.hostname) print(v.groups) """ session.close()
其他操作:
#!/usr/bin/env python # -*- coding:utf-8 -*- import time import threading from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index from sqlalchemy.orm import sessionmaker, relationship from sqlalchemy import create_engine from sqlalchemy.sql import text, func from sqlalchemy.engine.result import ResultProxy from db import Users, Hosts, Hobby, Person, Group, Server, Server2Group engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8", max_overflow=0, pool_size=5) Session = sessionmaker(bind=engine) session = Session() # 关联子查询 subqry = session.query(func.count(Server.id).label("sid")).filter(Server.id == Group.id).correlate(Group).as_scalar() result = session.query(Group.name, subqry) """ SELECT `group`.name AS group_name, (SELECT count(server.id) AS sid FROM server WHERE server.id = `group`.id) AS anon_1 FROM `group` """ # 原生SQL """ # 查询 cursor = session.execute('select * from users') result = cursor.fetchall() # 添加 cursor = session.execute('insert into users(name) values(:value)',params={"value":'wupeiqi'}) session.commit() print(cursor.lastrowid) """ session.close() 其他
基于scoped_session实现线程安全:
#!/usr/bin/env python # -*- coding:utf-8 -*- from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from sqlalchemy.orm import scoped_session from models import Users engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5) Session = sessionmaker(bind=engine) """ # 线程安全,基于本地线程实现每个线程用同一个session # 特殊的:scoped_session中有原来方法的Session中的一下方法: public_methods = ( '__contains__', '__iter__', 'add', 'add_all', 'begin', 'begin_nested', 'close', 'commit', 'connection', 'delete', 'execute', 'expire', 'expire_all', 'expunge', 'expunge_all', 'flush', 'get_bind', 'is_modified', 'bulk_save_objects', 'bulk_insert_mappings', 'bulk_update_mappings', 'merge', 'query', 'refresh', 'rollback', 'scalar' ) """ session = scoped_session(Session) # ############# 执行ORM操作 ############# obj1 = Users(name="alex1") session.add(obj1) # 提交事务 session.commit() # 关闭session session.close()
多线程执行实例:
#!/usr/bin/env python # -*- coding:utf-8 -*- import time import threading from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index from sqlalchemy.orm import sessionmaker, relationship from sqlalchemy import create_engine from db import Users engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5) Session = sessionmaker(bind=engine) def task(arg): session = Session() obj1 = Users(name="alex1") session.add(obj1) session.commit() for i in range(10): t = threading.Thread(target=task, args=(i,)) t.start() 多线程执行示例
flask_sqlalchemy:Flask-SQLAlchemy(文件和目录的管理)
Flask和SQLAlchemy的管理
- db = SQLAlchemy() - 包含配置 - 包含ORM基类 - 包含create_all - engine - 创建连接 # 目录结构保存好
flask_sqlalchemy实例:https://pan.baidu.com/s/1IL68-68tBDluDtqsB1NS1g
补充:
3. pipreqs
pip3 install pipreqs
pipreqs ./
十八 flask_script和flask_migrate
flask_script:功能相当于django中的manage文件,用于启动flask项目使用的,python manage.py runserver
flask_migrate:相当于django中的数据库迁移,makemigrations/migrate -> migrate/upgrade
批量导入:xlrd/xlwt
#!/usr/bin/env python # -*- coding:utf-8 -*- """ 生成依赖文件: pipreqs ./ """ from sansa import create_app,db from flask_script import Manager from flask_migrate import Migrate,MigrateCommand app = create_app() manager = Manager(app) migrate = Migrate(app, db) """ # 数据库迁移命名 python manage.py db init python manage.py db migrate python manage.py db upgrade """ manager.add_command('db', MigrateCommand) @manager.command def custom(arg): """ 自定义命令 python manage.py custom 123 :param arg: :return: """ print(arg) @manager.option('-n', '--name', dest='name') @manager.option('-u', '--url', dest='url') def cmd(name, url): """ 自定义命令 执行: python manage.py cmd -n wupeiqi -u http://www.oldboyedu.com :param name: :param url: :return: """ print(name, url) if __name__ == '__main__': manager.run() """ 1. 运行程序时: python manage.py runserver """
补充:
flask和django的其他导入静态文件