当前位置:   article > 正文

flask框架基础

flask langchain

一 web的一些框架介绍

 Flask:短小精悍,内部没有包含多少组件,但是第三方的组件是非常丰富的。

 Django:django是一个重武器,内部包含了非常多的组件:orm,form,modelForm,缓存,session等等

 Tornado:牛逼之处就是异步非阻塞框架和node.js

二 Flask的快速入门

 创建python虚拟环境:virtualenv 虚拟名

 安装:pip install flask

 什么是werkzeug:Werkzeug是一个WSGI工具包,他可以作为一个Web框架的底层库。官方的介绍说是一个 WSGI 工具包,它可以作为一个 Web 框架的底层库,因为它封装好了很多 Web 框架的东西,例如 Request,Response 等等

from werkzeug.wrappers import Request, Response

@Request.application
def hello(request):
    return Response('Hello World!')

if __name__ == '__main__':
    from werkzeug.serving import run_simple
    run_simple('localhost', 4000, hello)
View Code

 基本使用:

from flask import Flask
app = Flask(__name__)
 
@app.route('/')
def hello_world():
    return 'Hello World!'
 
if __name__ == '__main__':
    app.run()
View Code

 wsgi:使用werkzeug模块实现的,还可以使用wsgiref实现。本质是导入socket实现的。

   一旦出现这个,监听事件就开始了

 实例化Flask对象

  app.run():监听用户的请求,一旦有用户的请求过来,就会直接执行用户的__call__方法。

  flask的所有相关的组件全都存在flask文件下的,需要什么导入什么

from flask import Flask,render_template,request,redirect,session,url_for

 for_url:高级版的重定向

from flask import Flask,render_template,request,redirect,session,url_for

app = Flask(__name__)
app.debug=True
app.secret_key='fangshao'
USERS = {
    1:{'name':'张桂坤','age':18,'gender':'','text':"当眼泪掉下来的时候,是真的累了, 其实人生就是这样: 你有你的烦,我有我的难,人人都有无声的泪,人人都有难言的苦。 忘不了的昨天,忙不完的今天,想不到的明天,走不完的人生,过不完的坎坷,越不过的无奈,听不完的谎言,看不透的人心放不下的牵挂,经历不完的酸甜苦辣,这就是人生,这就是生活。"},
    2:{'name':'主城','age':28,'gender':'','text':"高中的时候有一个同学家里穷,每顿饭都是膜膜加点水,有时候吃点咸菜,我们六科老师每天下课都叫他去办公室回答问题背诵课文,然后说太晚啦一起吃个饭,后来他考上了人大,拿到通知书的时候给每个老师磕了一个头"},
    3:{'name':'服城','age':18,'gender':'','text':"高中的时候有一个同学家里穷,每顿饭都是膜膜加点水,有时候吃点咸菜,我们六科老师每天下课都叫他去办公室回答问题背诵课文,然后说太晚啦一起吃个饭,后来他考上了人大,拿到通知书的时候给每个老师磕了一个头"},
}
@app.route('/')
def hello_world():
    return 'Hello World!'

# @app.route('/index',methods=['GET'])
# def index():
#     user=session.get('user_info')
#     print(user)
#     if user:
#         return render_template('index.html',data=user,user_dict=USERS)
#     return redirect('/login')

@app.route('/index',methods=['GET'])
def index():
    user=session.get('user_info')
    if user:
        return render_template('index.html',data=user,user_dict=USERS)
    url=url_for('111')
    return redirect(url)


@app.route('/detail/<int:nid>',methods=['GET'])
def detail(nid):
    user=session.get('user_info')
    if not user:
        return redirect('/login')
    return render_template('detail.html',data=USERS[nid])

@app.route('/login',methods=['GET','POST'],endpoint='111')
def login():
    if request.method=='POST':
        user=request.form.get('user')
        pwd=request.form.get('pwd')
        if user=='fang' and pwd=='123':
            session['user_info']=user
            return redirect('/index')
        else:
            return render_template('login.html',msg='用户名或密码错误')

    return render_template('login.html')


if __name__ == '__main__':
    app.run()
View Code

三 配置文件

 开放封闭原则:对代码的修改是封闭的,对配置文件的修改时开放的。

 app.debug=True:修改过后自动重启项目

 app.secret_key='随机设置字符串':全局设置session,Session, Cookies以及一些第三方扩展都会用到,

 app.config['debug']=True:修改过后自动重启项目

 app.config:获取当前app的所有配置

 app.config.from_object:导入文件的一个类,

  内部实现原理:导入时,将路径由点分割

 配置文件的格式有:

flask中的配置文件是一个flask.config.Config对象(继承字典),默认配置为:
    {
        'DEBUG':                                get_debug_flag(default=False),  是否开启Debug模式
        'TESTING':                              False,                          是否开启测试模式
        'PROPAGATE_EXCEPTIONS':                 None,                          
        'PRESERVE_CONTEXT_ON_EXCEPTION':        None,
        'SECRET_KEY':                           None,
        'PERMANENT_SESSION_LIFETIME':           timedelta(days=31),
        'USE_X_SENDFILE':                       False,
        'LOGGER_NAME':                          None,
        'LOGGER_HANDLER_POLICY':               'always',
        'SERVER_NAME':                          None,
        'APPLICATION_ROOT':                     None,
        'SESSION_COOKIE_NAME':                  'session',
        'SESSION_COOKIE_DOMAIN':                None,
        'SESSION_COOKIE_PATH':                  None,
        'SESSION_COOKIE_HTTPONLY':              True,
        'SESSION_COOKIE_SECURE':                False,
        'SESSION_REFRESH_EACH_REQUEST':         True,
        'MAX_CONTENT_LENGTH':                   None,
        'SEND_FILE_MAX_AGE_DEFAULT':            timedelta(hours=12),
        'TRAP_BAD_REQUEST_ERRORS':              False,
        'TRAP_HTTP_EXCEPTIONS':                 False,
        'EXPLAIN_TEMPLATE_LOADING':             False,
        'PREFERRED_URL_SCHEME':                 'http',
        'JSON_AS_ASCII':                        True,
        'JSON_SORT_KEYS':                       True,
        'JSONIFY_PRETTYPRINT_REGULAR':          True,
        'JSONIFY_MIMETYPE':                     'application/json',
        'TEMPLATES_AUTO_RELOAD':                None,
    }
View Code

  格式一:

app.config['DEBUG'] = True
 
PS: 由于Config对象本质上是字典,所以还可以使用app.config.update(...)
View Code

  格式二:

    app.config.from_pyfile("python文件名称")
        如:
            settings.py
                DEBUG = True
 
            app.config.from_pyfile("settings.py")
 
    app.config.from_envvar("环境变量名称")
        环境变量的值为python文件名称名称,内部调用from_pyfile方法
 
 
    app.config.from_json("json文件名称")
        JSON文件名称,必须是json格式,因为内部会执行json.loads
 
    app.config.from_mapping({'DEBUG':True})
View Code

  格式三:

    app.config.from_object("python类或类的路径")
 
        app.config.from_object('pro_flask.settings.TestingConfig')
 
        settings.py
 
            class Config(object):
                DEBUG = False
                TESTING = False
                DATABASE_URI = 'sqlite://:memory:'
 
            class ProductionConfig(Config):
                DATABASE_URI = 'mysql://user@localhost/foo'
 
            class DevelopmentConfig(Config):
                DEBUG = True
 
            class TestingConfig(Config):
                TESTING = True
 
        PS: 从sys.path中已经存在路径开始写
View Code

 settings.py默认路径要放在当前项目的第一级目录下面

  PS: settings.py文件默认路径要放在程序root_path目录,如果instance_relative_config为True,则就是instance_path目录

四 路由系统

  • @app.route('/user/<username>')
  • @app.route('/post/<int:post_id>')
  • @app.route('/post/<float:post_id>')
  • @app.route('/post/<path:path>')
  • @app.route('/login', methods=['GET', 'POST'])

 路由比较特殊,:是基于装饰器实现的,但是究其本质还是有add_url_rule实现的。

 装饰器可以有多个,放在上面和下面是不同的,

from flask import Flask,render_template,request,redirect,session,url_for

app = Flask(__name__)
app.debug=True
app.secret_key='fangshao'
USERS = {
    1:{'name':'张桂坤','age':18,'gender':'','text':"当眼泪掉下来的时候,是真的累了, 其实人生就是这样: 你有你的烦,我有我的难,人人都有无声的泪,人人都有难言的苦。 忘不了的昨天,忙不完的今天,想不到的明天,走不完的人生,过不完的坎坷,越不过的无奈,听不完的谎言,看不透的人心放不下的牵挂,经历不完的酸甜苦辣,这就是人生,这就是生活。"},
    2:{'name':'主城','age':28,'gender':'','text':"高中的时候有一个同学家里穷,每顿饭都是膜膜加点水,有时候吃点咸菜,我们六科老师每天下课都叫他去办公室回答问题背诵课文,然后说太晚啦一起吃个饭,后来他考上了人大,拿到通知书的时候给每个老师磕了一个头"},
    3:{'name':'服城','age':18,'gender':'','text':"高中的时候有一个同学家里穷,每顿饭都是膜膜加点水,有时候吃点咸菜,我们六科老师每天下课都叫他去办公室回答问题背诵课文,然后说太晚啦一起吃个饭,后来他考上了人大,拿到通知书的时候给每个老师磕了一个头"},
}


def get_session(func):
    def inner(*args,**kwargs):
        user = session.get('user_info')
        if not user:
            return redirect('/login')
        return func(*args,**kwargs)
    return inner


@app.route('/',endpoint='1111')
@get_session
def hello_world():
    return 'Hello World!'



@app.route('/index',methods=['GET'],endpoint='n1')
@get_session
def index():
    # user=session.get('user_info')
    # print(user)
    # if user:
    return render_template('index.html',user_dict=USERS)
    # return redirect('/login')


# @app.route('/index',methods=['GET'])
# def index():
#     user=session.get('user_info')
#     if user:
#         return render_template('index.html',user_dict=USERS)
#     url=url_for('111')
#     return redirect(url)


@app.route('/detail/<int:nid>',methods=['GET'],endpoint='n2')
def detail(nid):
    user=session.get('user_info')
    if not user:
        return redirect('/login')
    return render_template('detail.html',data=USERS[nid])

@app.route('/login',methods=['GET','POST'],endpoint='111')
def login():
    if request.method=='POST':
        user=request.form.get('user')
        pwd=request.form.get('pwd')
        if user=='fang' and pwd=='123':
            session['user_info']=user
            return redirect('/index')
        else:
            return render_template('login.html',msg='用户名或密码错误')

    return render_template('login.html')


if __name__ == '__main__':
    app.run()
View Code

 注意:这里加上装饰器,会重名,给他设置一个别名endpoint=‘别名’

 functools.wraps(函数):# 帮助我们保存一些设置函数的元信息

from flask import Flask,render_template,request,redirect,session,url_for

app = Flask(__name__)
app.debug=True
app.secret_key='fangshao'
USERS = {
    1:{'name':'张桂坤','age':18,'gender':'','text':"当眼泪掉下来的时候,是真的累了, 其实人生就是这样: 你有你的烦,我有我的难,人人都有无声的泪,人人都有难言的苦。 忘不了的昨天,忙不完的今天,想不到的明天,走不完的人生,过不完的坎坷,越不过的无奈,听不完的谎言,看不透的人心放不下的牵挂,经历不完的酸甜苦辣,这就是人生,这就是生活。"},
    2:{'name':'主城','age':28,'gender':'','text':"高中的时候有一个同学家里穷,每顿饭都是膜膜加点水,有时候吃点咸菜,我们六科老师每天下课都叫他去办公室回答问题背诵课文,然后说太晚啦一起吃个饭,后来他考上了人大,拿到通知书的时候给每个老师磕了一个头"},
    3:{'name':'服城','age':18,'gender':'','text':"高中的时候有一个同学家里穷,每顿饭都是膜膜加点水,有时候吃点咸菜,我们六科老师每天下课都叫他去办公室回答问题背诵课文,然后说太晚啦一起吃个饭,后来他考上了人大,拿到通知书的时候给每个老师磕了一个头"},
}

import functools
def get_session(func):
    @functools.wraps(func)
    def inner(*args,**kwargs):
        user = session.get('user_info')
        if not user:
            return redirect('/login')
        return func(*args,**kwargs)
    return inner


@app.route('/',endpoint='1111')
@get_session
def hello_world():
    return 'Hello World!'



@app.route('/index',methods=['GET'])
@get_session
def index():
    # user=session.get('user_info')
    # print(user)
    # if user:
    return render_template('index.html',user_dict=USERS)
    # return redirect('/login')


# @app.route('/index',methods=['GET'])
# def index():
#     user=session.get('user_info')
#     if user:
#         return render_template('index.html',user_dict=USERS)
#     url=url_for('111')
#     return redirect(url)


@app.route('/detail/<int:nid>',methods=['GET'])
def detail(nid):
    user=session.get('user_info')
    if not user:
        return redirect('/login')
    return render_template('detail.html',data=USERS[nid])

@app.route('/login',methods=['GET','POST'])
def login():
    if request.method=='POST':
        user=request.form.get('user')
        pwd=request.form.get('pwd')
        if user=='fang' and pwd=='123':
            session['user_info']=user
            return redirect('/index')
        else:
            return render_template('login.html',msg='用户名或密码错误')

    return render_template('login.html')


if __name__ == '__main__':
    app.run()
View Code

 FBV的写法:

  使用装饰器,将括号里面的内容添加到路由中@app.route()

DEFAULT_CONVERTERS = {
    'default':          UnicodeConverter,
    'string':           UnicodeConverter,
    'any':              AnyConverter,
    'path':             PathConverter,
    'int':              IntegerConverter,
    'float':            FloatConverter,
    'uuid':             UUIDConverter,
}
View Code

   methods=[]:里面写的是请求的方法,支持什么方法都要写上什么方法。

      endpoint=别名:为当前的url反向生成一个url,也就是起一个别名

      app.add_url_rule('/...'):添加路由的另一种方法

        def auth(func):
            def inner(*args, **kwargs):
                print('before')
                result = func(*args, **kwargs)
                print('after')
                return result

        return inner

        @app.route('/index.html',methods=['GET','POST'],endpoint='index')
        @auth
        def index():
            return 'Index'def index():
            return "Index"

        self.add_url_rule(rule='/index.html', endpoint="index", view_func=index, methods=["GET","POST"])
        or
        app.add_url_rule(rule='/index.html', endpoint="index", view_func=index, methods=["GET","POST"])
        app.view_functions['index'] = index
View Code

 CBV的写法:

  MethodView:API的简单的一种实现方式,class创建的视图类就需要继承它。

class IndexView(views.MethodView):
    methods=['GET']
    decorators=[auth,]
    def get(self):
        return 'Index.GET'
    
    def post(self):
        return 'Index.POST'
View Code

  app.add_url_rule('/路径',view_func='类名'.as_view(name=返回的那个函数)

app.add_url_rule('/index',view_func=IndexView.as_view(name='index'))

  对于CBV来说:虽然传进去的是类名,但是最后返回的还是一个函数。

        def auth(func):
            def inner(*args, **kwargs):
                print('before')
                result = func(*args, **kwargs)
                print('after')
                return result

        return inner

        class IndexView(views.View):
            methods = ['GET']
            decorators = [auth, ]

            def dispatch_request(self):
                print('Index')
                return 'Index!'

        app.add_url_rule('/index', view_func=IndexView.as_view(name='index'))  # name=endpoint
class IndexView(views.MethodView):
            methods = ['GET']
            decorators = [auth, ]

            def get(self):
                return 'Index.GET'

            def post(self):
                return 'Index.POST'


        app.add_url_rule('/index', view_func=IndexView.as_view(name='index'))  # name=endpoint




        @app.route和app.add_url_rule参数:
            rule,                       URL规则
            view_func,                  视图函数名称
            defaults=None,              默认值,当URL中无参数,函数需要参数时,使用defaults={'k':'v'}为函数提供参数
            endpoint=None,              名称,用于反向生成URL,即: url_for('名称')
            methods=None,               允许的请求方式,如:["GET","POST"]
            

            strict_slashes=None,        对URL最后的 / 符号是否严格要求,
                                        如:
                                            @app.route('/index',strict_slashes=False),
                                                访问 http://www.xx.com/index/ 或 http://www.xx.com/index均可
                                            @app.route('/index',strict_slashes=True)
                                                仅访问 http://www.xx.com/index 
            redirect_to=None,           重定向到指定地址
                                        如:
                                            @app.route('/index/<int:nid>', redirect_to='/home/<nid>')
                                            或
                                            def func(adapter, nid):
                                                return "/home/888"
                                            @app.route('/index/<int:nid>', redirect_to=func)
            subdomain=None,             子域名访问
                                                from flask import Flask, views, url_for

                                                app = Flask(import_name=__name__)
                                                app.config['SERVER_NAME'] = 'wupeiqi.com:5000'


                                                @app.route("/", subdomain="admin")
                                                def static_index():
                                                    """Flask supports static subdomains
                                                    This is available at static.your-domain.tld"""
                                                    return "static.your-domain.tld"


                                                @app.route("/dynamic", subdomain="<username>")
                                                def username_index(username):
                                                    """Dynamic subdomains are also supported
                                                    Try going to user1.your-domain.tld/dynamic"""
                                                    return username + ".your-domain.tld"


                                                if __name__ == '__main__':
                                                    app.run()
        
View Code

 default:传入函数的参数。就是url后面的那个参数

 subdomain={}:创建子域名

 window设置域名:hosts而文件下面直接就可以设置了C:\Windows\System32\driver\etc\hosts

 mcs系统设置域名:/ect/hosts文件下面就可以设置域名了  

          from flask import Flask, views, url_for
            from werkzeug.routing import BaseConverter

            app = Flask(import_name=__name__)


            class RegexConverter(BaseConverter):
                """
                自定义URL匹配正则表达式
                """
                def __init__(self, map, regex):
                    super(RegexConverter, self).__init__(map)
                    self.regex = regex

                def to_python(self, value):
                    """
                    路由匹配时,匹配成功后传递给视图函数中参数的值
                    :param value: 
                    :return: 
                    """
                    return int(value)

                def to_url(self, value):
                    """
                    使用url_for反向生成URL时,传递的参数经过该方法处理,返回的值用于生成URL中的参数
                    :param value: 
                    :return: 
                    """
                    val = super(RegexConverter, self).to_url(value)
                    return val

            # 添加到flask中
            app.url_map.converters['regex'] = RegexConverter


            @app.route('/index/<regex("\d+"):nid>')
            def index(nid):
                print(url_for('index', nid='888'))
                return 'Index'


            if __name__ == '__main__':
                app.run()
View Code

五 模板

 1、模板的使用:Flask使用的是Jinja2模板,所以其语法和Django无差别

 2、自定义模板方法:Flask中自定义模板方法的方式和Bottle相似,创建一个函数并通过参数的形式传入

 for循环,并取到索引

  {% for k,v in 对象.items()%}

    字典的取值方法:v.字段名  v['字段名']  v.get('字段名')

  {% endfor %}

 函数渲染:不仅要加上括号,还可以加上参数

  {{函数名(参数)}}  加上|safe:防止xss攻击

<!DOCTYPE html>
<html>
<head lang="en">
    <meta charset="UTF-8">
    <title></title>
</head>
<body>
    <h1>自定义函数</h1>
    {{ww()|safe}}

</body>
</html>
View Code

  在后台如果使用

from flask import Flask,render_template
app = Flask(__name__)
 
 
def wupeiqi():
    return '<h1>Wupeiqi</h1>'
 
@app.route('/login', methods=['GET', 'POST'])
def login():
    return render_template('login.html', ww=wupeiqi)
 
app.run()
View Code

 Markup:后台设置xss攻击

 宏定义:就是定义一块html,定义的这一块就是就是一个函数

  {% macre 函数名(参数)%}  {% endmacre %}

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>


    {% macro input(name, type='text', value='') %}
        <input type="{{ type }}" name="{{ name }}" value="{{ value }}">
    {% endmacro %}

    {{ input('n1') }}

    {% include 'tp.html' %}

    <h1>asdf{{ v.k1}}</h1>
</body>
</html>
View Code

六 请求和响应

 请求和响应都是从flask为念中导入的

 request:请求

 response:响应

  jsonify:响应的数据类型不是字符串类型,就是用这个将响应的内容转成字符串。

from flask import Flask
    from flask import request
    from flask import render_template
    from flask import redirect
    from flask import make_response

    app = Flask(__name__)


    @app.route('/login.html', methods=['GET', "POST"])
    def login():

        # 请求相关信息
        # request.method
        # request.args
        # request.form
        # request.values
        # request.cookies
        # request.headers
        # request.path
        # request.full_path
        # request.script_root
        # request.url
        # request.base_url
        # request.url_root
        # request.host_url
        # request.host
        # request.files
        # obj = request.files['the_file_name']
        # obj.save('/var/www/uploads/' + secure_filename(f.filename))

        # 响应相关信息
        # return "字符串"
        # return render_template('html模板路径',**{})
        # return redirect('/index.html')

        # response = make_response(render_template('index.html'))
        # response是flask.wrappers.Response类型
        # response.delete_cookie('key')
        # response.set_cookie('key', 'value')
        # response.headers['X-Something'] = 'A value'
        # return response


        return "内容"

    if __name__ == '__main__':
        app.run()
View Code

七 Session和Cookie

 session在使用前必须要有app.sceret_key加密,就相当于加盐

 session['名']=字段:设置session

 应用demo:实例,使用装饰器写一个用户认证

  思路:装饰器,一个函数可以加上多个装饰器,反向查找的名称不允许重名:endpoint

  session.pop:删除一个session

 我们这里使用的session是flask内置的使用加密的cookie来保存数据的。

 基本使用:

from flask import Flask, session, redirect, url_for, escape, request
 
app = Flask(__name__)
 
@app.route('/')
def index():
    if 'username' in session:
        return 'Logged in as %s' % escape(session['username'])
    return 'You are not logged in'
 
@app.route('/login', methods=['GET', 'POST'])
def login():
    if request.method == 'POST':
        session['username'] = request.form['username']
        return redirect(url_for('index'))
    return '''
        <form action="" method="post">
            <p><input type=text name=username>
            <p><input type=submit value=Login>
        </form>
    '''
 
@app.route('/logout')
def logout():
    # remove the username from the session if it's there
    session.pop('username', None)
    return redirect(url_for('index'))
 
# set the secret key.  keep this really secret:
app.secret_key = 'A0Zr98j/3yX R~XHH!jmN]LWX/,?RT'

基本使用
View Code

 自定义session 

pip3 install Flask-Session
        
        run.py
            from flask import Flask
            from flask import session
            from pro_flask.utils.session import MySessionInterface
            app = Flask(__name__)

            app.secret_key = 'A0Zr98j/3yX R~XHH!jmN]LWX/,?RT'
            app.session_interface = MySessionInterface()

            @app.route('/login.html', methods=['GET', "POST"])
            def login():
                print(session)
                session['user1'] = 'alex'
                session['user2'] = 'alex'
                del session['user2']

                return "内容"

            if __name__ == '__main__':
                app.run()

        session.py
            #!/usr/bin/env python
            # -*- coding:utf-8 -*-
            import uuid
            import json
            from flask.sessions import SessionInterface
            from flask.sessions import SessionMixin
            from itsdangerous import Signer, BadSignature, want_bytes


            class MySession(dict, SessionMixin):
                def __init__(self, initial=None, sid=None):
                    self.sid = sid
                    self.initial = initial
                    super(MySession, self).__init__(initial or ())


                def __setitem__(self, key, value):
                    super(MySession, self).__setitem__(key, value)

                def __getitem__(self, item):
                    return super(MySession, self).__getitem__(item)

                def __delitem__(self, key):
                    super(MySession, self).__delitem__(key)



            class MySessionInterface(SessionInterface):
                session_class = MySession
                container = {}

                def __init__(self):
                    import redis
                    self.redis = redis.Redis()

                def _generate_sid(self):
                    return str(uuid.uuid4())

                def _get_signer(self, app):
                    if not app.secret_key:
                        return None
                    return Signer(app.secret_key, salt='flask-session',
                                  key_derivation='hmac')

                def open_session(self, app, request):
                    """
                    程序刚启动时执行,需要返回一个session对象
                    """
                    sid = request.cookies.get(app.session_cookie_name)
                    if not sid:
                        sid = self._generate_sid()
                        return self.session_class(sid=sid)

                    signer = self._get_signer(app)
                    try:
                        sid_as_bytes = signer.unsign(sid)
                        sid = sid_as_bytes.decode()
                    except BadSignature:
                        sid = self._generate_sid()
                        return self.session_class(sid=sid)

                    # session保存在redis中
                    # val = self.redis.get(sid)
                    # session保存在内存中
                    val = self.container.get(sid)

                    if val is not None:
                        try:
                            data = json.loads(val)
                            return self.session_class(data, sid=sid)
                        except:
                            return self.session_class(sid=sid)
                    return self.session_class(sid=sid)

                def save_session(self, app, session, response):
                    """
                    程序结束前执行,可以保存session中所有的值
                    如:
                        保存到resit
                        写入到用户cookie
                    """
                    domain = self.get_cookie_domain(app)
                    path = self.get_cookie_path(app)
                    httponly = self.get_cookie_httponly(app)
                    secure = self.get_cookie_secure(app)
                    expires = self.get_expiration_time(app, session)

                    val = json.dumps(dict(session))

                    # session保存在redis中
                    # self.redis.setex(name=session.sid, value=val, time=app.permanent_session_lifetime)
                    # session保存在内存中
                    self.container.setdefault(session.sid, val)

                    session_id = self._get_signer(app).sign(want_bytes(session.sid))

                    response.set_cookie(app.session_cookie_name, session_id,
                                        expires=expires, httponly=httponly,
                                        domain=domain, path=path, secure=secure)

自定义Session
View Code

 第三方session

from flask import Flask, session, redirect
from flask.ext.session import Session


app = Flask(__name__)
app.debug = True
app.secret_key = 'asdfasdfasd'


app.config['SESSION_TYPE'] = 'redis'
from redis import Redis
app.config['SESSION_REDIS'] = Redis(host='192.168.0.94',port='6379')
Session(app)


@app.route('/login')
def login():
    session['username'] = 'alex'
    return redirect('/index')


@app.route('/index')
def index():
    name = session['username']
    return name


if __name__ == '__main__':
    app.run()

第三方session
View Code

八 闪现

 flash:向某一个地方设置一个值

  category:设置值的分类

 get_flashed_messages:从某一个地方获取多个值,并且清除

  他们也是基于app.secret_key实现的

from flask import Flask,flash,get_flashed_messages

app = Flask(__name__)
app.secret_key = 'asdfasdf'
@app.route('/get')
def get():
    # 从某个地方获取设置过的所有值,并清除。
    data = get_flashed_messages()
    print(data)
    return 'Hello World!'


@app.route('/set')
def set():
    # 向某个地方设置一个值
    flash('阿斯蒂芬')

    return 'Hello World!'


if __name__ == '__main__':
    app.run()
View Code

  category_filter:只取一类的值

from flask import Flask,flash,get_flashed_messages,request,redirect

app = Flask(__name__)
app.secret_key = 'asdfasdf'


@app.route('/index')
def index():
    # 从某个地方获取设置过的所有值,并清除。
    val = request.args.get('v')
    if val == 'oldboy':
        return 'Hello World!'
    flash('超时错误',category="x1")
    return "ssdsdsdfsd"
    # return redirect('/error')


@app.route('/error')
def error():
    """
    展示错误信息
    :return:
    """
    data = get_flashed_messages(category_filter=['x1'])
    if data:
        msg = data[0]
    else:
        msg = "..."
    return "错误信息:%s" %(msg,)


if __name__ == '__main__':
    app.run()
View Code

    request.query_string.get('字段'):获取到所有的url

 request.args.get('字段'):获取get请求后面的值

 什么是闪现:设置不管多少次的值,是基于session实现的,只要一次取到全部的值,并且清除

 闪现的用法:应用于临时数据的操作,比如:显示错误信息等等

from flask import Flask,session,Session,flash,get_flashed_messages,redirect,render_template,request
app = Flask(__name__)
app.secret_key ='sdfsdfsdf'

@app.route('/users')
def users():
    # 方式一
    # msg = request.args.get('msg','')
    # 方式二
    # msg = session.get('msg')
    # if msg:
    #     del session['msg']
    # 方式三
    v = get_flashed_messages()
    print(v)
    msg = ''
    return render_template('users.html',msg=msg)

@app.route('/useradd')
def user_add():
    # 在数据库中添加一条数据
    # 假设添加成功,在跳转到列表页面时,显示添加成功
    # 方式一
    # return redirect('/users?msg=添加成功')
    # 方式二
    # session['msg'] = '添加成功'
    # 方式三
    flash('添加成功')
    return redirect('/users')


if __name__ == '__main__':
    app.run(debug=True)
View Code

九 蓝图(blueprint)

 什么是蓝图:功能分类,还可以定义自己的模板和路由分发。就是为了构造目录的。

 蓝图的特性:不仅能将不同功能的app进行分来,并且还可以定义自己的模板路径可以实现路由的分发。可以实现每一个程序构造自己的app。

 Blueprint:创建蓝图  ('蓝图名','__name__')

  url_prefix:为下面所有函数的使用机上一个前缀,可以为统一的某一类加上前缀

  template_folder:定义自己的模板路径(html文件勒颈)

 app.register_blueprint:将蓝图注册到app

 小型应用程序:示例

 大型应用程序:示例

小中型:

manage.py

import fcrm
if __name__ == '__main__': fcrm.app.run()

__init__.py(只要一导入fcrm就会执行__init__.py文件)

复制代码
from flask import Flask
#导入accout 和order
from fcrm.views import accout from fcrm.views import order app = Flask(__name__) print(app.root_path) #根目录  app.register_blueprint(accout.accout) #吧蓝图注册到app里面,accout.accout是创建的蓝图对象 app.register_blueprint(order.order)
复制代码

accout.py

复制代码
from flask import  Blueprint,render_template
accout = Blueprint("accout",__name__) @accout.route('/accout') def xx(): return "accout" @accout.route("/login") def login(): return render_template("login.html")
复制代码

order.py

from flask import Blueprint
order = Blueprint("order",__name__) @order.route('/order') def register(): #注意视图函数的名字不能和蓝图对象的名字一样 return "order

使用蓝图时需要注意的

大型:

 

十 请求的扩展

 类似于django的中间件

 @app.defore_request:定制请求函数,每次请求都会执行有这个装饰器的函数,每次都是从上到下执行的

 request.url:拿到正要运行的url

 @app.after_request:定制响应的函数,每次响应执行这个装饰器的函数,每次都是从下到上执行的

 请求哈响应可以有多个,如果请求给拦截了,但是所有的响应都会执行

 @app.errorhandler:定制错误的信息。

 @ app.template_global:为模板定制函数,也就是自定义模板

 @app.before_first_request:只有第一次请求来了才执行这个函数

from flask import Flask,render_template,request,redirect,session,url_for
app = Flask(__name__)
app.debug = True
app.secret_key = 'siuljskdjfs'


@app.before_request
def process_request1(*args,**kwargs):
    print('process_request1 进来了')

@app.before_request
def process_request2(*args,**kwargs):
    print('process_request2 进来了')


@app.after_request
def process_response1(response):
    print('process_response1 走了')
    return response

@app.after_request
def process_response2(response):
    print('process_response2 走了')
    return response



@app.errorhandler(404)
def error_404(arg):
    return "404错误了"


@app.before_first_request
def first(*args,**kwargs):
    pass

@app.route('/index',methods=['GET'])
def index():
    print('index函数')
    return "Index"





if __name__ == '__main__':
    app.run()
View Code

 基于中间件实现用户的认证登陆

                        @app.before_request
            def process_request(*args,**kwargs):
                if request.path == '/login':
                    return None
                user = session.get('user_info')
                if user:
                    return None
                return redirect('/login')
View Code

 模板中定制方法:

            @app.template_global()
            def sb(a1, a2):
                return a1 + a2
            {{sb(1,2)}}  

            
            @app.template_filter()
            def db(a1, a2, a3):
                return a1 + a2 + a3
            {{ 1|db(2,3)}}
View Code

十一 中间件

 中间件:在这里就是一个请求的入口

 每次请求进来都会执行app的call方法。

from flask import Flask

app = Flask(__name__)

@app.route('/')
def index():
    return 'Hello World!'

class Md(object):
    def __init__(self,old_wsgi_app):
        self.old_wsgi_app = old_wsgi_app

    def __call__(self,  environ, start_response):
        print('开始之前')
        ret = self.old_wsgi_app(environ, start_response)
        print('结束之后')
        return ret

if __name__ == '__main__':
    app.wsgi_app = Md(app.wsgi_app)
    app.run()
View Code

十二 上下文处理

 threading.local:为每一个线程开辟一个单独的内存空间来保存他自己的值

import threading

# class Foo():
#     def __init__(self):
#         self.name=0
#
# local_value=Foo()

local_value=threading.local()
def func(num):
    local_value.name=num
    import time
    time.sleep(1)
    print(local_value.name,threading.current_thread().name)

for i in range(20):
    th=threading.Thread(target=func,args=(i,),name='线程%s'%i)
    th.start()
View Code

 request:

  情况一:单线程和单进程的情况下不会有问题,应为自己使用为完毕过后,就会自动的清空request。

  情况二:单进程和多线程,threading.local对象

  情况三:但进程和单线程多个协程,theading.local对象就会出问题。

 解决方法:

  以后不支持协程:就可以使用内置的threading.local对象

  支持协程:自定义类似threading.local对象的功能支持协程

 _thread.get_ident:获取线程的一个唯一标识

 greenlet.getcurrent:获取协程额唯一标识

 自定义支持协程:

"""
{
   1368:{}
}
"""

import threading
try:
    from greenlet import getcurrent as get_ident  # 协程
except ImportError:
    try:
        from thread import get_ident
    except ImportError:
       from _thread import get_ident  # 线程

class Local():
    def __init__(self):
        self.storage={}
        self.get_ident=get_ident

    def set(self,k,v):
        ident=self.get_ident()
        origin=self.storage.get(ident)
        if not origin:
            origin={k:v}
        else:
            origin[k]=v
        self.storage[ident]=origin

    def get(self,k):
        ident=self.get_ident()
        origin=self.storage.get(ident)
        if not origin:
            return None
        return origin.get(k,None)

local_values=Local()

def task(num):
    local_values.set('name',num)
    import time
    time.sleep(1)
    print(local_values.get('name'),threading.current_thread().name)

for i in range(20):
    th=threading.Thread(target=task,args=(i,),name='线程%s'%i)
    th.start()
View Code

 反射实例:

class Foo(object):
    def __init__(self):
        object.__setattr__(self,'storage',{})

    def __setattr__(self, key, value):
        self.storage={'k1:v1'}
        print(key,value)

    def __getattr__(self, item):
        print(item)
        return 'df'

obj=Foo()
# obj.x = 123

print(obj)
View Code

 自定义支持协程的flask:使用反射实现

"""
{
   1368:{}
}
"""
import flask.globals
import threading
try:
    from greenlet import getcurrent as get_ident  # 协程
except ImportError:
    try:
        from thread import get_ident
    except ImportError:
       from _thread import get_ident  # 线程

class Local():
    def __init__(self):
        object.__setattr__(self,'__storage__',{})
        object.__setattr__(self,'__ident_func__',get_ident)

    def __setattr__(self, name, value):
        ident=self.__ident_func__()
        storage=self.__storage__
        try:
            storage[ident][name]=value
        except KeyError:
            storage[ident]={name:value}
    def __getattr__(self, name):
        try:
            return self.__storage__[self.__ident_func__()][name]
        except KeyError:
            raise AttributeError(name)


    def __delattr__(self, name):
        try:
            del self.__storage__[self.__ident_finc__()][name]
        except KeyError:
            raise AttributeError(name)

local_values=Local()

def task(num):
    local_values.name=num
    import time
    time.sleep(1)
    print(local_values.name,threading.current_thread().name)
for i in range(20):
    th=threading.Thread(target=task,args=(i,),name='线程%s'%i)
    th.start()
View Code

  补充:
  偏函数:functools模块

   functools.partial:创建一个新的函数,主要是为了给原函数传入参数

# import functools
#
# def func(a1,a2):
#     print(a1+a2)
#
#
# new_func=functools.partial(func,12)  #
# new_func(21)
View Code

  面向对象的__add__方法补充:当把面向对象中的所有__函数__实现时,对象做任何操作时,都会执行其中对应的方法。

   __add__:谁在前面就会调用谁的__add__方法

# class Foo():
#     def __init__(self,num):
#         self.num=num
#     def __add__(self,other):
#         return Foo(self.num+other.num)
# obj1=Foo(11)
# obj2=Foo(22)
# v=obj1+obj2
# print(v)
View Code

  拼接列表中的值:

   itertools.chain:传入函数,产生一个新的函数对象的列表,主要帮助我们做列表元素的拼接

    实例1:

from itertools import chain

# def a1(x):
#     return x+1
#
# func_list=[a1,lambda x:x+1]
#
# def a2(y):
#     return y+10
#
# func_list_1=chain([a2],func_list)
# for func in func_list_1:
#     print(func)
View Code

    实例2:

# v1=[1,2,3]
# v2=[4,5,6]
#
# for l in chain(v1,v2):  # chain 主要是做列表的拼接
#     print(l)
View Code

  上下文源码流程:https://www.processon.com/diagraming/5ab8c9f0e4b0d165d5b83fbb

 谈谈flask的上下文管理:

        - 与django相比是两种不同的实现方式。
            - django/tornado是通过传参数形式
            - flask是通过上下文管理
            两种都可以实现,只不过试下方式不一样。
            - 上下文管理:
                - threading.local/Local类,其中创建了一个字典{greelet做唯一标识:存数据} 保证数据隔离
                - 请求进来:
                    - 请求相关所有数据封装到了RequestContext中。
                    - 再讲RequestContext对象添加到Local中(通过LocalStack将对象添加到Local对象中)
                - 使用,调用request
                    - 调用此类方法 request.method、print(request)、request+xxx 会执行LocalProxy中对应的方法
                    - 函数
                    - 通过LocalStack去Local中获取值。
                - 请求终止
                    - 通过LocalStack的pop方法 Local中将值异常。
View Code

 补充:

    再将对象封装到Local中
    Flask可以传入任何的字符串参数
View Code

 请求上下文:请求上下文封装的就是RequestContext对象

  request:是LocalProxy对象

   这个对象实例化事传入了一个函数,还有一个request参数
   以后执行偏函数partail(_Lookup_req_object,'request')时,自动传递request参数
   目标:去Local中获取ctx,然后再在ctx中获取request
   ctx.push:里面做的事,将ctx通过LocalStack添加到Local中

  session:经过push之后,session里面已经有值了,seif.session帮助我获取session信息

 应用上下文:应用上下文封装的是AppContext

  AppContext里面封装了app对象
  app.context:创建了app对象
  app.app_ctx_global_class:相当于一个全局变量

  app和g

  g:每个请求周期都会创建一个用于在请求周期中传递值的一个容器。只有一次生命周期可用,因为一次请求周期后第二次就会重新创建
print(g):执行g的实例对象

 请求到来 ,有人来访问:

                # 将请求相关的数据environ封装到了RequestContext对象中
                # 再讲对象封装到local中(每个线程/每个协程独立空间存储)
                # ctx.app # 当前APP的名称
                # ctx.request # Request对象(封装请求相关东西)
                # ctx.session # 空
                _request_ctx_stack.local = {
                    唯一标识:{
                        "stack":[ctx, ]
                    },
                    唯一标识:{
                        "stack":[ctx, ]
                    },
                }
                
                
                # app_ctx = AppContext对象
                # app_ctx.app
                # app_ctx.g
                    
                _app_ctx_stack.local = {
                    唯一标识:{
                        "stack":[app_ctx, ]
                    },
                    唯一标识:{
                        "stack":[app_ctx, ]
                    },
                }
View Code

 使用:print打印request,session.g,current_app的时候,都会执行他们相对应的对象的__str__
他们之间不同的是偏函数不一样

                    from flask import request,session,g,current_app
                    
                    print(request,session,g,current_app)
                    
                    都会执行相应LocalProxy对象的 __str__
                    
                    current_app = LocalProxy(_find_app)
                        request = LocalProxy(partial(_lookup_req_object, 'request'))
                        session = LocalProxy(partial(_lookup_req_object, 'session'))
                        
                        current_app = LocalProxy(_find_app)
                        g = LocalProxy(partial(_lookup_app_object, 'g'))
                    
View Code

 终止,全部pop

 问题:

  如果他是多线程的时候是如何实现的。

            请求到来的时候就是两个Local,但是没有值,一共只有两个Local,进来一个线程创建自己的唯一标识。不过进来多少的线程,都只用这两个Local
View Code

  flask的local中保存数据时,使用列表创建出来的栈。为什么用栈?

                   - 如果写web程序,web运行环境;栈中永远保存1条数据(可以不用栈)。
                   - 写脚本获取app信息时,可能存在app上下文嵌套关系。
                        from flask import Flask,current_app,globals,_app_ctx_stack

                        app1 = Flask('app01')
                        app1.debug = False # 用户/密码/邮箱
                        # app_ctx = AppContext(self):
                        # app_ctx.app
                        # app_ctx.g

                        app2 = Flask('app02')
                        app2.debug = True # 用户/密码/邮箱
                        # app_ctx = AppContext(self):
                        # app_ctx.app
                        # app_ctx.g



                        with app1.app_context():# __enter__方法 -> push -> app_ctx添加到_app_ctx_stack.local
                            # {<greenlet.greenlet object at 0x00000000036E2340>: {'stack': [<flask.ctx.AppContext object at 0x00000000037CA438>]}}
                            print(_app_ctx_stack._local.__storage__)
                            print(current_app.config['DEBUG'])

                            with app2.app_context():
                                # {<greenlet.greenlet object at 0x00000000036E2340>: {'stack': [<flask.ctx.AppContext object at 0x00000000037CA438> ]}}
                                print(_app_ctx_stack._local.__storage__)
                                print(current_app.config['DEBUG'])

                            print(current_app.config['DEBUG'])
View Code

 多app应用:不仅可以使用蓝图进行分发,还可以使用app进行分发
  DispatchMiddleware:可以进行路由分发。
  在这里面没有app.run,直接可以run_simple启动

            from werkzeug.wsgi import DispatcherMiddleware
            from werkzeug.serving import run_simple
            from flask import Flask, current_app

            app1 = Flask('app01')

            app2 = Flask('app02')



            @app1.route('/index')
            def index():
                return "app01"


            @app2.route('/index2')
            def index2():
                return "app2"

            # http://www.oldboyedu.com/index
            # http://www.oldboyedu.com/sec/index2
            dm = DispatcherMiddleware(app1, {
                '/sec': app2,
            })

            if __name__ == "__main__":
                run_simple('localhost', 5000, dm)
View Code

 问题:web访问多app,上线问管理是如何实现的

  请求进来,为栈添加的还是一个值

 问题:为什么使用栈,离线脚本

  如果写web程序,或者web运行环境:栈中永远保存一条数据(这个可以不使用栈)
  写脚本获取app信息的时候,可能会存在上下文嵌套关系。这时有可能要用到栈

 问题:问题:Web访问多app应用时,上下文管理是如何实现?

 补充:

 遇到with就会执行__enter__方法,这个方法返回值,as后面那个值就是返回值
 当指执行完毕之后 ,自动调用类的__exit__方法
from flask import Flask,current_app,globals,_app_ctx_stack

app1 = Flask('app01')
app1.debug = False # 用户/密码/邮箱
# app_ctx = AppContext(self):
# app_ctx.app
# app_ctx.g

app2 = Flask('app02')
app2.debug = True # 用户/密码/邮箱
# app_ctx = AppContext(self):
# app_ctx.app
# app_ctx.g

with app1.app_context():  # # __enter__方法 -> push -> app_ctx添加到_app_ctx_stack.local
    print(_app_ctx_stack._local.__storage__)
    print(current_app.config['DEBUG'])
    with app1.app_context():
        print(_app_ctx_stack._local.__storage__)
        print(current_app.config['DEBUG'])
    print(current_app.config['DEBUG'])


with app1.app_context():
    print(_app_ctx_stack._local.__storage__)
    print(current_app.config['DEBUG'])
with app1.app_context():
    print(_app_ctx_stack._local.__storage__)
    print(current_app.config['DEBUG'])
View Code

 补充:永远两个Local对象

 实现细节:
  ResquestContext对象通过LocalStark添加到Local中
  导入的request是一个LocalProxy对象,然后在通过偏函数调用了LocakStack,在调用Local
  RequestContext的auto_pop,在执行LocalStack,再到Local中移除

十三 数据库的连接池

 前夕:

  django的常用数据库:

   ORM:django默认的数据库

   pymysql模块:导入mysql数据库,python2和python3版本都有这个模块

   MySQLdb:一样,也是导入mysql数据库,这个模块只有python2版本才有

  flask/其他:

   pymysql:导mysql数据库

   MySQLdb:导入mysql数据库

  SQLAchemy:

   是ORM的一款框架,可以导入mysql数据库(pymysql / MySQLdb)

 原生SQL:

from flask import Flask

app = Flask(__name__)

@app.route("/")
def hello():
    import pymysql
    CONN = pymysql.connect(host='127.0.0.1',
                           port=3306,
                           user='root',
                           password='123',
                           database='pooldb',
                           charset='utf8')

    cursor = CONN.cursor()
    cursor.execute('select * from tb1')
    result = cursor.fetchall()
    cursor.close()

    print(result)

    return "Hello World"

if __name__ == '__main__':
    app.run()
View Code
from flask import Flask

app = Flask(__name__)
import pymysql
CONN = pymysql.connect(host='127.0.0.1',
                           port=3306,
                           user='root',
                           password='123',
                           database='pooldb',
                           charset='utf8')

@app.route("/")
def hello():
    cursor = CONN.cursor()
    cursor.execute('select * from tb1')
    result = cursor.fetchall()
    cursor.close()
    print(result)
    return "Hello World"

if __name__ == '__main__':
    app.run()




######加锁#########################
from flask import Flask
import threading
app = Flask(__name__)
import pymysql
CONN = pymysql.connect(host='127.0.0.1',
                           port=3306,
                           user='root',
                           password='123',
                           database='pooldb',
                           charset='utf8')

@app.route("/")
def hello():
    with threading.Lock():
        cursor = CONN.cursor()
        cursor.execute('select * from tb1')
        result = cursor.fetchall()
        cursor.close()

        print(result)

    return "Hello World"

if __name__ == '__main__':
    app.run()
View Code

 问题:
 解决:

  不能为每一个用户创建一个链接

  创建一定数量的连接池,只要连接池有空的,才会进来,不然就会等着。

 使用UBDtils模块:

  下载网站:https://pypi.python.org/pypi/DBUtils

   pip install UBDtils

  如果安装到虚拟环境下面,需要先切换到虚拟环境下面

  使用:

   模式一:

    为每一个线程创建一个链接,即使县城调用close方法,也不会关闭掉

    占用连接池不放,浪费了连接池

import pymysql
from DBUtils.PersistentDB import PersistentDB

POOL = PersistentDB(
    creator=pymysql,  # 使用链接数据库的模块
    maxusage=None,  # 一个链接最多被重复使用的次数,None表示无限制
    setsession=[],  # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
    ping=0, # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
    closeable=False,
    # 如果为False时, conn.close() 实际上被忽略,供下次使用,再线程关闭时,才会自动关闭链接。如果为True时, conn.close()则关闭链接,那么再次调用pool.connection时就会报错,因为已经真的关闭了连接(pool.steady_connection()可以获取一个新的链接)
    threadlocal=None,  # 本线程独享值得对象,用于保存链接对象,如果链接对象被重置
    host='127.0.0.1',
    port=3306,
    user='root',
    password='123',
    database='pooldb',
    charset='utf8'
)

def func():
    conn = POOL.connection(shareable=False)
    cursor = conn.cursor()
    cursor.execute('select * from tb1')
    result = cursor.fetchall()
    cursor.close()
    conn.close()

func()
View Code

   模式二:
    设置最大限制创建连接池的数量,进来一个线程,就创建一个连接池

    如果超过连接池的限制数量,就会在那里等着有空的连接池释放出来才能够下一个线程链接进去

import time
import pymysql
import threading
from DBUtils.PooledDB import PooledDB, SharedDBConnection
POOL = PooledDB(
    creator=pymysql,  # 使用链接数据库的模块
    maxconnections=6,  # 连接池允许的最大连接数,0和None表示不限制连接数
    mincached=2,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
    maxcached=5,  # 链接池中最多闲置的链接,0和None不限制
    maxshared=3,  # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
    blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
    maxusage=None,  # 一个链接最多被重复使用的次数,None表示无限制
    setsession=[],  # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
    ping=0,
    # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
    host='127.0.0.1',
    port=3306,
    user='root',
    password='123',
    database='pooldb',
    charset='utf8'
)


def func():
    # 检测当前正在运行连接数的是否小于最大链接数,如果不小于则:等待或报raise TooManyConnections异常
    # 否则
    # 则优先去初始化时创建的链接中获取链接 SteadyDBConnection。
    # 然后将SteadyDBConnection对象封装到PooledDedicatedDBConnection中并返回。
    # 如果最开始创建的链接没有链接,则去创建一个SteadyDBConnection对象,再封装到PooledDedicatedDBConnection中并返回。
    # 一旦关闭链接后,连接就返回到连接池让后续线程继续使用。
    conn = POOL.connection()

    # print(th, '链接被拿走了', conn1._con)
    # print(th, '池子里目前有', pool._idle_cache, '\r\n')

    cursor = conn.cursor()
    cursor.execute('select * from tb1')
    result = cursor.fetchall()
    conn.close()


func()
View Code

  工作实用:

                         import pymysql
                from DBUtils.PooledDB import PooledDB
                POOL = PooledDB(
                    creator=pymysql,  # 使用链接数据库的模块
                    maxconnections=6,  # 连接池允许的最大连接数,0和None表示不限制连接数
                    mincached=2,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
                    maxcached=5,  # 链接池中最多闲置的链接,0和None不限制
                    maxshared=3,  # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
                    blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
                    maxusage=None,  # 一个链接最多被重复使用的次数,None表示无限制
                    setsession=[],  # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
                    ping=0,
                    # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
                    host='127.0.0.1',
                    port=3306,
                    user='root',
                    password='123',
                    database='pooldb',
                    charset='utf8'
                )

                """
                class SQLHelper(object):
                    
                    @staticmethod
                    def fetch_one(sql,args):
                        conn = POOL.connection()
                        cursor = conn.cursor()
                        cursor.execute(sql, args)
                        result = cursor.fetchone()
                        conn.close()
                        return result

                    @staticmethod
                    def fetch_all(self,sql,args):
                        conn = POOL.connection()
                        cursor = conn.cursor()
                        cursor.execute(sql, args)
                        result = cursor.fetchone()
                        conn.close()
                        return result
                    
                # 以后使用:
                result = SQLHelper.fetch_one('select * from xxx',[])
                print(result)
                """
View Code

 十四 信号

 什么是信号:signal 一种处理异步时间的方法。信号是POSIX系统的信号,由硬件或软件触发,再有操作系统内核发给应用程序的中断形式。POSIX由一系列的信号集。

 什么是信号量:semaphore 一种进程同步的机制。信号量是POSIX进程间通信的工具,在它上面定义了一系列操作原语,简单地讲它可以在进程间进行通信。

 flask自己没有信号,要依赖blinker这个模块 安装 pip install blinker
 内置信号:

request_started = _signals.signal('request-started')                # 请求到来前执行
request_finished = _signals.signal('request-finished')              # 请求结束后执行
 
before_render_template = _signals.signal('before-render-template')  # 模板渲染前执行
template_rendered = _signals.signal('template-rendered')            # 模板渲染后执行
 
got_request_exception = _signals.signal('got-request-exception')    # 请求执行出现异常时执行
 
request_tearing_down = _signals.signal('request-tearing-down')      # 请求执行完毕后自动执行(无论成功与否)
appcontext_tearing_down = _signals.signal('appcontext-tearing-down')# 请求上下文执行完毕后自动执行(无论成功与否)
 
appcontext_pushed = _signals.signal('appcontext-pushed')            # 请求上下文push时执行
appcontext_popped = _signals.signal('appcontext-popped')            # 请求上下文pop时执行
message_flashed = _signals.signal('message-flashed')                # 调用flask在其中添加数据时,自动触发
View Code

 信号放哪里了:放在了signals文件里面
 signals.request_started.conncet(函数名) :注册函数
 等待请求的到来
 在视图函数执行之前触法信号的
 是怎么触发信号的?
  signals.request_started.send:方法触发信号
  send是在full_dispatch_request方法中执行的

 信号的源码流程:

 a. before_first_request
 b. 触发 request_started 信号
 c. before_request
 d. 模板渲染
   渲染前的信号 before_render_template.send(app, template=template, context=context)
    rv = template.render(context) # 模板渲染
   渲染后的信号 template_rendered.send(app, template=template, context=context)
 e. after_request
 f. session.save_session()
 g. 触发 request_finished信号

 如果上述过程出错:
   触发错误处理信号 got_request_exception.send(self, exception=e)
   
 h. 触发信号 request_tearing_down
      
 由信号引发的源码流程:找扩展点
View Code

 十五 flask-session插件

 Flask中的session处理机制(内置:将session保存在加密cookie中实现)

 内置的session:

  请求刚到来:获取随机字符串,存在则去“数据库”中获取原来的个人数据,否则创建一个空容器。 --> 内存:对象(随机字符串,{放置数据的容器})

# 1. obj = 创建SecureCookieSessionInterface()
# 2. obj = open_session(self.request) = SecureCookieSession()
# self.session = SecureCookieSession()对象。
self.session = self.app.open_session(self.request)
View Code

  视图:操作内存中 对象(随机字符串,{放置数据的容器})
   响应:内存对象(随机字符串,{放置数据的容器})
  将数据保存到“数据库”
  把随机字符串写在用户cookie中。

 自定义:

  请求刚到来:

# 创建特殊字典,并添加到Local中。
# 调用关系:
# self.session_interface.open_session(self, request)
# 由于默认app中的session_interface=SecureCookieSessionInterface()
# SecureCookieSessionInterface().open_session(self, request)
# 由于默认app中的session_interface=MySessionInterFace()
# MySessionInterFace().open_session(self, request)
self.session = self.app.open_session(self.request)
View Code

  调用:

from flask import Flask,session


app = Flask(__name__)
app.secret_key = 'suijksdfsd'


import json
class MySessionInterFace(object):
    def open_session(self,app,request):
        return {}

    def save_session(self, app, session, response):
        response.set_cookie('session_idfsdfsdfsdf',json.dumps(session))

    def is_null_session(self, obj):
        """Checks if a given object is a null session.  Null sessions are
        not asked to be saved.

        This checks if the object is an instance of :attr:`null_session_class`
        by default.
        """
        return False

app.session_interface = MySessionInterFace()

@app.route('/')
def index():
    # 特殊空字典
    # 在local的ctx中找到session
    # 在空字典中写值
    # 在空字典中获取值
    session['xxx'] = 123


    return 'Index'

# # 一旦请求到来
# app.__call__
# app.wsgi_app
# app.session_interface
# app.open_session


if __name__ == '__main__':

    app.run()
View Code

session -> LocalProxy -> 偏函数 -> LocalStack -> Local
  请求终止:

# 由于默认app中的session_interface=SecureCookieSessionInterface()
# SecureCookieSessionInterface().save_session(self, app, session, response)
# 由于默认app中的session_interface=MySessionInterFace()
# MySessionInterFace().save_session(self, app, session, response)
View Code

 flask-session组件
  使用:

from flask import Flask,session
from flask_session import RedisSessionInterface

app = Flask(__name__)
app.secret_key = 'suijksdfsd'
View Code

   方式一

from redis import Redis
conn = Redis()
app.session_interface = RedisSessionInterface(conn,key_prefix='__',use_signer=False)
View Code

   方式二

from redis import Redis
from flask.ext.session import Session
app.config['SESSION_TYPE'] = 'redis'
app.config['SESSION_REDIS'] = Redis(host='192.168.0.94',port='6379')
Session(app)
View Code
@app.route('/')
def index():
session['xxx'] = 123
return 'Index'


if __name__ == '__main__':

app.run()
View Code

  源码:
      流程
 问题:设置cookie时,如何设定关闭浏览器则cookie失效。
  response.set_cookie('k','v',exipre=None)

十六 wtforms组建

 安装:pip3 install wtforms

 使用:

from flask import Flask, render_template, request, redirect
from wtforms import Form
from wtforms.fields import core
from wtforms.fields import html5
from wtforms.fields import simple
from wtforms import validators
from wtforms import widgets

app = Flask(__name__, template_folder='templates')
app.debug = True



class RegisterForm(Form):
    name = simple.StringField(
        label='用户名',
        validators=[
            validators.DataRequired()
        ],
        widget=widgets.TextInput(),
        render_kw={'class': 'form-control'},
        default='alex'
    )

    pwd = simple.PasswordField(
        label='密码',
        validators=[
            validators.DataRequired(message='密码不能为空.')
        ],
        widget=widgets.PasswordInput(),
        render_kw={'class': 'form-control'}
    )

    pwd_confirm = simple.PasswordField(
        label='重复密码',
        validators=[
            validators.DataRequired(message='重复密码不能为空.'),
            validators.EqualTo('pwd', message="两次密码输入不一致")
        ],
        widget=widgets.PasswordInput(),
        render_kw={'class': 'form-control'}
    )

    email = html5.EmailField(
        label='邮箱',
        validators=[
            validators.DataRequired(message='邮箱不能为空.'),
            validators.Email(message='邮箱格式错误')
        ],
        widget=widgets.TextInput(input_type='email'),
        render_kw={'class': 'form-control'}
    )

    gender = core.RadioField(
        label='性别',
        choices=(
            (1, ''),
            (2, ''),
        ),
        coerce=int # “1” “2”
     )
    city = core.SelectField(
        label='城市',
        choices=(
            ('bj', '北京'),
            ('sh', '上海'),
        )
    )

    hobby = core.SelectMultipleField(
        label='爱好',
        choices=(
            (1, '篮球'),
            (2, '足球'),
        ),
        coerce=int
    )

    favor = core.SelectMultipleField(
        label='喜好',
        choices=(
            (1, '篮球'),
            (2, '足球'),
        ),
        widget=widgets.ListWidget(prefix_label=False),
        option_widget=widgets.CheckboxInput(),
        coerce=int,
        default=[1, 2]
    )

    def __init__(self, *args, **kwargs):
        super(RegisterForm, self).__init__(*args, **kwargs)
        self.favor.choices = ((1, '篮球'), (2, '足球'), (3, '羽毛球'))

    def validate_pwd_confirm(self, field):
        """
        自定义pwd_confirm字段规则,例:与pwd字段是否一致
        :param field:
        :return:
        """
        # 最开始初始化时,self.data中已经有所有的值

        if field.data != self.data['pwd']:
            # raise validators.ValidationError("密码不一致") # 继续后续验证
            raise validators.StopValidation("密码不一致")  # 不再继续后续验证


@app.route('/register', methods=['GET', 'POST'])
def register():
    if request.method == 'GET':
        form = RegisterForm(data={'gender': 2,'hobby':[1,]}) # initial
        return render_template('register.html', form=form)
    else:
        form = RegisterForm(formdata=request.form)
        if form.validate():
            print('用户提交数据通过格式验证,提交的值为:', form.data)
        else:
            print(form.errors)
        return render_template('register.html', form=form)



if __name__ == '__main__':
    app.run()
View Code

 源码流程:

  实现方式:

                        1. 自动生成HTML
                class LoginForm(Form):
                    # 字段(内部包含正则表达式)
                    name = simple.StringField(
                        label='用户名',
                        validators=[
                            validators.DataRequired(message='用户名不能为空.'),
                            validators.Length(min=6, max=18, message='用户名长度必须大于%(min)d且小于%(max)d')
                        ],
                        widget=widgets.TextInput(), # 页面上显示的插件
                        render_kw={'class': 'form-control'}

                    )
                    # 字段(内部包含正则表达式)
                    pwd = simple.PasswordField(
                        label='密码',
                        validators=[
                            validators.DataRequired(message='密码不能为空.'),
                            validators.Length(min=8, message='用户名长度必须大于%(min)d'),
                            validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[$@$!%*?&])[A-Za-z\d$@$!%*?&]{8,}",
                                              message='密码至少8个字符,至少1个大写字母,1个小写字母,1个数字和1个特殊字符')

                        ],
                        widget=widgets.PasswordInput(),
                        render_kw={'class': 'form-control'}
                    )
                    
                    def __iter__(self):
                        return iter([self.name,self.pwd])
                # 方式一:
                    obj = LoginForm()
                    print(obj.name) # 调用字段的__str__
                # 方式二:
                    obj = LoginForm()
                    for item in obj:
                        print(item) # 调用字段的__str__
            
View Code

  校验:

a. 后台定义好正则
b. 用户发来数据
c. 对数据进行校验
View Code

  源码实现:自动生成HTML文件
     解释:metaclass

  Metaclass作用:用来指定当前类是谁来创建的,如果不指定默认是type创建的
  类继承:只要这个指定了Metaclass,那么这个类以及他的子类都指定同一个metaclass
- MetaClass作用:用来指定当前类由谁来创建(默认type创建)。
        - 使用metaclass
            class Foo(metaclass=type):
                pass 
            
            class Foo(object):
                __metaclass__ = type
        - 类继承
            
            class MyType(type):
                def __init__(self,*args,**kwargs):
                    print('init')
                    super(MyType,self).__init__(*args,**kwargs)

                def __call__(self, *args, **kwargs):
                    print('call本质:调用类的__new__,再调用类的__init__')
                    return super(MyType,self).__call__( *args, **kwargs)


            class Foo(metaclass=MyType):
                pass

            class Bar(Foo):
                pass

            obj = Bar()
        - 问题:
            1. 什么意思?
                # 类由type来创建
                class Foo(metaclass=type)
                # 继承Type
                class Foo(type)
            2. Flask多线程:服务端开多线程
View Code

   其他方式使用:

class MyType(type):
    def __init__(self,*args,**kwargs):
        print('init')
        super(MyType,self).__init__(*args,**kwargs)

    def __call__(self, *args, **kwargs):
        print('call')
        return super(MyType,self).__call__(*args,**kwargs)

# Base=MyType('Base',(object,),{})  是有MyType创建; metaclass=MyType
#
# class Foo(Base):
#     pass
#
# Foo()
# 1. type可以创建类metaclass=type;MyType也可以创建类metaclass=MyType
# 2. Base = MyType('Base', (object,), {}) -->
# class Base(metaclass=MyType):
#     pass
# class Foo(Base):
#     pass

class Foo(MyType("Base",(object,),{})):
    # 第一个是类名,第二个就是他的父类,第三个就是属性
    pass

Foo()
View Code
class MyType(type):
    def __init__(self,*args,**kwargs):
        print('init')
        super(MyType,self).__init__(*args,**kwargs)

    def __call__(self, *args, **kwargs):
        print('call')
        return super(MyType,self).__call__(*args,**kwargs)

def with_metaclass(obj):
    return MyType('XX',(obj,),{})

class Foo(with_metaclass(object)):
    pass


Foo()
View Code
"""
1. 什么意思?

# 类由type来创建
class Foo(metaclass=type)
# 继承Type
class Foo(type)

"""


class Foo(object):
    pass
obj = Foo()
# 对象是由类创建



# 一切皆对象,类由type创建
class Foo(object):
    pass

Foo = type('Foo',(object,),{})



# 一切皆对象,类由MyType创建
class MyType(type):
    pass
Foo = MyType('Foo',(object,),{})


class Foo(object,metaclass=MyType):
    pass



# 一切皆对象,类由MyType创建
class MyType(type):
    def __init__(self, *args, **kwargs):
        print('init')
        super(MyType, self).__init__(*args, **kwargs)

    def __call__(cls, *args, **kwargs):
        print('call')
        return super(MyType, cls).__call__(*args, **kwargs)

Foo = MyType('Foo',(object,),{})


class Foo(object,metaclass=MyType):
    pass

Foo()
View Code

  实例:form = LoginForm()

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from flask import Flask, render_template, request, redirect
from wtforms import Form
from wtforms.fields import simple
from wtforms import validators
from wtforms import widgets


app = Flask(__name__, template_folder='templates')

app.debug = True

class MyForm(Form):
    '''
    创建字段,内部包含正则表达式
    '''
    name=simple.StringField(label='用户名',
                            validators=[
                                validators.DataRequired(message='用户名不能为空'),
                                validators.Length(min=6,max=18,message='用户名的长度不能小于%(min)d不能大于%(max)d')
                            ],
                            widget=widgets.TextInput(),
                            render_kw={'class': 'form-control'}
                            )
    password=simple.PasswordField(label='密码',
                                  validators=[
                                      validators.DataRequired(message='密码不能为空'),
                                      validators.Length(max=18,message='用户名不能大于%(max)'),
                                    validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[$@$!%*?&])[A-Za-z\d$@$!%*?&]{8,}",
                              message='密码至少8个字符,至少1个大写字母,1个小写字母,1个数字和1个特殊字符')
                                  ],
                                  widget=widgets.PasswordInput(),
                                  render_kw={'class': 'form-control'}
                                  )





@app.route('/login', methods=['GET', 'POST'])
def login():
    if request.method=='GET':
        form=MyForm()
        return render_template('login.html',form=form)
    else:
        form = MyForm(formdata=request.form)
        if form.validate():
            print('用户提交数据通过格式验证,提交的值为:', form.data)
        else:
            return render_template('login.html', form=form)
if __name__ == '__main__':
    app.run()
View Code

  验证:form.validate()

import time
import threading
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.engine.base import Engine
View Code

 补充:

  为什么要在类里面定义方法:主要是为对象的数据进行二次加工

  类的对象不能直接被for循环,若想被循环,在类里面加上__iter__方法

十七 SQLAlchmey ORM框架

 目标:类/对象操作 -> SQL -> pymysql、MySQLdb -> 再在数据库中执行。

 基本使用:这个不常见,

class MyForm(Form):
    '''
    创建字段,内部包含正则表达式
    '''
    name=simple.StringField(label='用户名',
                            validators=[
                                validators.DataRequired(message='用户名不能为空'),
                                validators.Length(min=6,max=18,message='用户名的长度不能小于%(min)d不能大于%(max)d')
                            ],
                            widget=widgets.TextInput(),
                            render_kw={'class': 'form-control'}
                            )
    password=simple.PasswordField(label='密码',
                                  validators=[
                                      validators.DataRequired(message='密码不能为空'),
                                      validators.Length(max=18,message='用户名不能大于%(max)'),
                                    validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[$@$!%*?&])[A-Za-z\d$@$!%*?&]{8,}",
                              message='密码至少8个字符,至少1个大写字母,1个小写字母,1个数字和1个特殊字符')
                                  ],
                                  widget=widgets.PasswordInput(),
                                  render_kw={'class': 'form-control'}
                                  )
View Code

  方式一:

                    engine = create_engine(
                        "mysql+pymysql://root:123@127.0.0.1:3306/t1?charset=utf8",
                        max_overflow=2,  # 超过连接池大小外最多创建的连接
                        pool_size=5,  # 连接池大小
                        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
                        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
                    )
                    
                    conn = engine.raw_connection()
                    cursor = conn.cursor()
                    cursor.execute(
                        "select * from t1"
                    )
                    result = cursor.fetchall()
                    cursor.close()
                    conn.close()
View Code

  方式二:

                    engine = create_engine(
                        "mysql+pymysql://root:123@127.0.0.1:3306/t1?charset=utf8",
                        max_overflow=0,  # 超过连接池大小外最多创建的连接
                        pool_size=5,  # 连接池大小
                        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
                        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
                    )
                    
                    def task(arg):
                        conn = engine.raw_connection()
                        cursor = conn.cursor()
                        cursor.execute(
                            #"select * from t1"
                            "select sleep(2)"
                        )
                        result = cursor.fetchall()
                        cursor.close()
                        conn.close()
                     
                     
                    for i in range(20):
                        t = threading.Thread(target=task, args=(i,))
                        t.start()
                
View Code

 ORM:

                models.py 
                    #!/usr/bin/env python
                    # -*- coding:utf-8 -*-
                    import datetime
                    from sqlalchemy import create_engine
                    from sqlalchemy.ext.declarative import declarative_base
                    from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index

                    Base = declarative_base()

                    class Users(Base):
                        __tablename__ = 'users' # 数据库表名称
                        id = Column(Integer, primary_key=True) # id 主键
                        name = Column(String(32), index=True, nullable=False) # name列,
                        
                    def init_db():
                        """
                        根据类创建数据库表
                        :return: 
                        """
                        engine = create_engine(
                            "mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8",
                            max_overflow=0,  # 超过连接池大小外最多创建的连接
                            pool_size=5,  # 连接池大小
                            pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
                            pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
                        )

                        Base.metadata.create_all(engine)


                    def drop_db():
                        """
                        根据类删除数据库表
                        :return: 
                        """
                        engine = create_engine(
                            "mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8",
                            max_overflow=0,  # 超过连接池大小外最多创建的连接
                            pool_size=5,  # 连接池大小
                            pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
                            pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
                        )

                        Base.metadata.drop_all(engine)


                    if __name__ == '__main__':
                        #drop_db()
                        #init_db()
            
            
                app.py 
                    #!/usr/bin/env python
                    # -*- coding:utf-8 -*-
                    from sqlalchemy.orm import sessionmaker
                    from sqlalchemy import create_engine
                    from models import Users
                      
                     
                    engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
                    Connection = sessionmaker(bind=engine)
                      
                    # 每次执行数据库操作时,都需要创建一个Connection
                    con = Connection()
                      
                      
                    # ############# 执行ORM操作 #############
                    obj1 = Users(name="alex1")
                    con.add(obj1)
                    # 提交事务
                    con.commit()
                    
                    
                    
                    # 关闭session
                    con.close()
                    
            
View Code

  详细信息:

  创建表:

import datetime
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index

Base = declarative_base()


class Users(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    name = Column(String(32), index=True, nullable=False)
    email = Column(String(32), unique=True)
    ctime = Column(DateTime, default=datetime.datetime.now)  # 3期师兄
    extra = Column(Text, nullable=True)

    __table_args__ = (
        # UniqueConstraint('id', 'name', name='uix_id_name'),
        # Index('ix_id_name', 'name', 'email'),
    )

class Hobby(Base):
    __tablename__ = 'hobby'
    id = Column(Integer, primary_key=True)
    caption = Column(String(50), default='篮球')


class Person(Base):
    __tablename__ = 'person'
    nid = Column(Integer, primary_key=True)
    name = Column(String(32), index=True, nullable=True)
    hobby_id = Column(Integer, ForeignKey("hobby.id"))


class b2g(Base):
    __tablename__ = 'b2g'
    id = Column(Integer, primary_key=True, autoincrement=True)
    girl_id = Column(Integer, ForeignKey('girl.id'))
    boy_id = Column(Integer, ForeignKey('boy.id'))


class Girl(Base):
    __tablename__ = 'girl'
    id = Column(Integer, primary_key=True)
    name = Column(String(64), unique=True, nullable=False)


class Boy(Base):
    __tablename__ = 'boy'

    id = Column(Integer, primary_key=True, autoincrement=True)
    hostname = Column(String(64), unique=True, nullable=False)



engine = create_engine(
    "mysql+pymysql://root:0410@127.0.0.1:3306/sqlachemy?charset=utf8",
    max_overflow=0,  # 超过连接池大小外最多创建的连接
    pool_size=5,  # 连接池大小
    pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
    pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
)

Base.metadata.create_all(engine)
# Base.metadata.drop_all(engine)
View Code

  对于表的增删改查:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
import threading

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
from sqlalchemy.sql import text

from db import Users, Hosts

engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)

session = Session()

# ################ 添加 ################
"""
obj1 = Users(name="wupeiqi")
session.add(obj1)

session.add_all([
    Users(name="wupeiqi"),
    Users(name="alex"),
    Hosts(name="c1.com"),
])
session.commit()
"""

# ################ 删除 ################
"""
session.query(Users).filter(Users.id > 2).delete()
session.commit()
"""
# ################ 修改 ################
"""
session.query(Users).filter(Users.id > 0).update({"name" : "099"})
session.query(Users).filter(Users.id > 0).update({Users.name: Users.name + "099"}, synchronize_session=False)
session.query(Users).filter(Users.id > 0).update({"age": Users.age + 1}, synchronize_session="evaluate")
session.commit()
"""
# ################ 查询 ################
"""
r1 = session.query(Users).all()
r2 = session.query(Users.name.label('xx'), Users.age).all()
r3 = session.query(Users).filter(Users.name == "alex").all()
r4 = session.query(Users).filter_by(name='alex').all()
r5 = session.query(Users).filter_by(name='alex').first()
r6 = session.query(Users).filter(text("id<:value and name=:name")).params(value=224, name='fred').order_by(Users.id).all()
r7 = session.query(Users).from_statement(text("SELECT * FROM users where name=:name")).params(name='ed').all()
"""


session.close()
View Code

  查看的其他操作:

# 条件
ret = session.query(Users).filter_by(name='alex').all()
ret = session.query(Users).filter(Users.id > 1, Users.name == 'eric').all()
ret = session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric').all()
ret = session.query(Users).filter(Users.id.in_([1,3,4])).all()
ret = session.query(Users).filter(~Users.id.in_([1,3,4])).all()
ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='eric'))).all()
from sqlalchemy import and_, or_
ret = session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric')).all()
ret = session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric')).all()
ret = session.query(Users).filter(
    or_(
        Users.id < 2,
        and_(Users.name == 'eric', Users.id > 3),
        Users.extra != ""
    )).all()


# 通配符
ret = session.query(Users).filter(Users.name.like('e%')).all()
ret = session.query(Users).filter(~Users.name.like('e%')).all()

# 限制
ret = session.query(Users)[1:2]

# 排序
ret = session.query(Users).order_by(Users.name.desc()).all()
ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all()

# 分组
from sqlalchemy.sql import func

ret = session.query(Users).group_by(Users.extra).all()
ret = session.query(
    func.max(Users.id),
    func.sum(Users.id),
    func.min(Users.id)).group_by(Users.name).all()

ret = session.query(
    func.max(Users.id),
    func.sum(Users.id),
    func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) >2).all()

# 连表

ret = session.query(Users, Favor).filter(Users.id == Favor.nid).all()

ret = session.query(Person).join(Favor).all()

ret = session.query(Person).join(Favor, isouter=True).all()


# 组合
q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Favor.caption).filter(Favor.nid < 2)
ret = q1.union(q2).all()

q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Favor.caption).filter(Favor.nid < 2)
ret = q1.union_all(q2).all()
View Code

  原生的sql语句:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
import threading

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
from sqlalchemy.sql import text
from sqlalchemy.engine.result import ResultProxy
from db import Users, Hosts

engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)

session = Session()

# 查询
# cursor = session.execute('select * from users')
# result = cursor.fetchall()

# 添加
cursor = session.execute('insert into users(name) values(:value)',params={"value":'wupeiqi'})
session.commit()
print(cursor.lastrowid)

session.close()
View Code

  多表操作:

一对多:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
import threading

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
from sqlalchemy.sql import text
from sqlalchemy.engine.result import ResultProxy
from db import Users, Hosts, Hobby, Person

engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)
session = Session()
# 添加
"""
session.add_all([
    Hobby(caption='乒乓球'),
    Hobby(caption='羽毛球'),
    Person(name='张三', hobby_id=3),
    Person(name='李四', hobby_id=4),
])

person = Person(name='张九', hobby=Hobby(caption='姑娘'))
session.add(person)

hb = Hobby(caption='人妖')
hb.pers = [Person(name='文飞'), Person(name='博雅')]
session.add(hb)

session.commit()
"""

# 使用relationship正向查询
"""
v = session.query(Person).first()
print(v.name)
print(v.hobby.caption)
"""

# 使用relationship反向查询
"""
v = session.query(Hobby).first()
print(v.caption)
print(v.pers)
"""

session.close()


多对多:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
import threading

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
from sqlalchemy.sql import text
from sqlalchemy.engine.result import ResultProxy
from db import Users, Hosts, Hobby, Person, Group, Server, Server2Group

engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)
session = Session()
# 添加
"""
session.add_all([
    Server(hostname='c1.com'),
    Server(hostname='c2.com'),
    Group(name='A组'),
    Group(name='B组'),
])
session.commit()

s2g = Server2Group(server_id=1, group_id=1)
session.add(s2g)
session.commit()


gp = Group(name='C组')
gp.servers = [Server(hostname='c3.com'),Server(hostname='c4.com')]
session.add(gp)
session.commit()


ser = Server(hostname='c6.com')
ser.groups = [Group(name='F组'),Group(name='G组')]
session.add(ser)
session.commit()
"""


# 使用relationship正向查询
"""
v = session.query(Group).first()
print(v.name)
print(v.servers)
"""

# 使用relationship反向查询
"""
v = session.query(Server).first()
print(v.hostname)
print(v.groups)
"""


session.close()
View Code

  其他操作:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
import threading

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
from sqlalchemy.sql import text, func
from sqlalchemy.engine.result import ResultProxy
from db import Users, Hosts, Hobby, Person, Group, Server, Server2Group

engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)
session = Session()

# 关联子查询
subqry = session.query(func.count(Server.id).label("sid")).filter(Server.id == Group.id).correlate(Group).as_scalar()
result = session.query(Group.name, subqry)
"""
SELECT `group`.name AS group_name, (SELECT count(server.id) AS sid 
FROM server 
WHERE server.id = `group`.id) AS anon_1 
FROM `group`
"""


# 原生SQL
"""
# 查询
cursor = session.execute('select * from users')
result = cursor.fetchall()

# 添加
cursor = session.execute('insert into users(name) values(:value)',params={"value":'wupeiqi'})
session.commit()
print(cursor.lastrowid)
"""

session.close()

其他
View Code

  基于scoped_session实现线程安全:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session
from models import Users

engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)

"""
# 线程安全,基于本地线程实现每个线程用同一个session
# 特殊的:scoped_session中有原来方法的Session中的一下方法:

public_methods = (
    '__contains__', '__iter__', 'add', 'add_all', 'begin', 'begin_nested',
    'close', 'commit', 'connection', 'delete', 'execute', 'expire',
    'expire_all', 'expunge', 'expunge_all', 'flush', 'get_bind',
    'is_modified', 'bulk_save_objects', 'bulk_insert_mappings',
    'bulk_update_mappings',
    'merge', 'query', 'refresh', 'rollback',
    'scalar'
)
"""
session = scoped_session(Session)


# ############# 执行ORM操作 #############
obj1 = Users(name="alex1")
session.add(obj1)



# 提交事务
session.commit()
# 关闭session
session.close()
View Code

  多线程执行实例:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
import threading

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
from db import Users

engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)


def task(arg):
    session = Session()

    obj1 = Users(name="alex1")
    session.add(obj1)

    session.commit()


for i in range(10):
    t = threading.Thread(target=task, args=(i,))
    t.start()

多线程执行示例
View Code

 flask_sqlalchemy:Flask-SQLAlchemy(文件和目录的管理)

  Flask和SQLAlchemy的管理
 - db = SQLAlchemy()
            - 包含配置
            - 包含ORM基类
            - 包含create_all
            - engine
            - 创建连接
                
        # 目录结构保存好
        
View Code

 flask_sqlalchemy实例:https://pan.baidu.com/s/1IL68-68tBDluDtqsB1NS1g

 补充:

3. pipreqs
    
    pip3 install pipreqs
        
    pipreqs ./
        
View Code

 十八 flask_script和flask_migrate

 flask_script:功能相当于django中的manage文件,用于启动flask项目使用的,python manage.py runserver

 flask_migrate:相当于django中的数据库迁移,makemigrations/migrate -> migrate/upgrade

  批量导入:xlrd/xlwt

#!/usr/bin/env python
# -*- coding:utf-8 -*-
"""
生成依赖文件:
    pipreqs ./

"""
from sansa import create_app,db
from flask_script import Manager
from flask_migrate import Migrate,MigrateCommand

app = create_app()
manager = Manager(app)
migrate = Migrate(app, db)


"""
# 数据库迁移命名
    python manage.py db init
    python manage.py db migrate
    python manage.py db upgrade
"""
manager.add_command('db', MigrateCommand)


@manager.command
def custom(arg):
    """
    自定义命令
    python manage.py custom 123
    :param arg:
    :return:
    """
    print(arg)


@manager.option('-n', '--name', dest='name')
@manager.option('-u', '--url', dest='url')
def cmd(name, url):
    """
    自定义命令
    执行: python manage.py  cmd -n wupeiqi -u http://www.oldboyedu.com
    :param name:
    :param url:
    :return:
    """
    print(name, url)



if __name__ == '__main__':
    manager.run()

"""
1. 运行程序时:
    python manage.py runserver 
    

"""
View Code

 补充:  

 flask和django的其他导入静态文件

  

 

转载于:https://www.cnblogs.com/fangjie0410/p/8608536.html

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/180980
推荐阅读
相关标签
  

闽ICP备14008679号