当前位置:   article > 正文

Flask + PyJWT 实现基于Json Web Token的用户认证授权_flask-pyjwt-auth

flask-pyjwt-auth

这是我在做用户认证开发过程中看到一位大神写的文章,不过源地址已经失效了,希望有可能未来还能看到传送门。在此转载是不忍心这么好的文章绝版

我在 github 上找到了作者的源码,有需要的可以去下载https://github.com/yaoyonstudio/flask-pyjwt-auth

在这里插入图片描述
在程序开发中,用户认证授权是一个绕不过的重难点。以前的开发模式下,cookie和session认证是主流,随着前后端分离的趋势,基于Token的认证方式成为主流,而JWT是基于Token认证方式的一种机制,是实现单点登录认证的一种有效方法。

PyJWT是一个用来编码和解码JWT(JSON Web Tokens)的Python库,也可以用在Flask上。本文就通过一个实例来演示Flask项目整合PyJWT来实现基于Token的用户认证授权。

一、需求

1、程序将实现一个用户注册、登录和获取用户信息的功能

2、用户注册时输入用户名(username)、邮箱(email)和密码(password),用户名和邮箱是唯一的,如果数据库中已有则会注册失败;用户注册成功后返回用户的信息。

3、用户使用用户名(username)和密码(password)登录,登录成功时返回token,每次登录都会更新一次token。

4、用户要获取用户信息,需要在请求Header中传入验证参数和token,程序会验证这个token的有效性并给出响应。

5、程序构建方面,将用户和认证分列两个模块。

二、程序目录结构

根据示例需求构建程序目录结构:

在这里插入图片描述

三、程序实现
1、程序构建及相关文件

数据迁移配置文件:

flask-pyjwt-auth/db.py

from flask_script import Manager
from flask_migrate import Migrate, MigrateCommand
from run import app
from app import db

app.config.from_object('app.config')

db.init_app(app)

migrate = Migrate(app, db)
manager = Manager(app)
manager.add_command('db', MigrateCommand)

if __name__ == '__main__':
    manager.run()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

运行入口文件:

flask-pyjwt-auth/run.py

from app import create_app

app = create_app('app.config')

if __name__ == '__main__':
    app.run(host=app.config['HOST'],
            port=app.config['PORT'],
            debug=app.config['DEBUG'])
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

程序初始化文件:

flask-pyjwt-auth/app/__init__.py

from flask import Flask, request
from flask_sqlalchemy import SQLAlchemy

db = SQLAlchemy()

def create_app(config_filename):
    app = Flask(__name__)
    app.config.from_object(config_filename)

    @app.after_request
    def after_request(response):
        response.headers.add('Access-Control-Allow-Origin', '*')
        if request.method == 'OPTIONS':
            response.headers['Access-Control-Allow-Methods'] = 'DELETE, GET, POST, PUT'
            headers = request.headers.get('Access-Control-Request-Headers')
            if headers:
                response.headers['Access-Control-Allow-Headers'] = headers
        return response

    from app.users.model import db
    db.init_app(app)

    from app.users.api import init_api
    init_api(app)

    return app
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

上面代码加入了全局HTTP请求头配置,设置允许所有跨域请求。

配置文件:

flask-pyjwt-auth/app/config.py

DB_USER = 'root'
DB_PASSWORD = ''
DB_HOST = 'localhost'
DB_DB = 'flask-pyjwt-auth'

DEBUG = True
PORT = 3333
HOST = "192.168.1.141"
SECRET_KEY = "my blog"

SQLALCHEMY_TRACK_MODIFICATIONS = False
SQLALCHEMY_DATABASE_URI = 'mysql://' + DB_USER + ':' + DB_PASSWORD + '@' + DB_HOST + '/' + DB_DB
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

公共文件:

flask-pyjwt-auth/app/common.py

def trueReturn(data, msg):
    return {
        "status": True,
        "data": data,
        "msg": msg
    }


def falseReturn(data, msg):
    return {
        "status": False,
        "data": data,
        "msg": msg
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
2、用户模块

模块入口(空)

flask-pyjwt-auth/app/users/__init__.py

#
  • 1

用户模型:

flask-pyjwt-auth/app/users/model.py

from flask_sqlalchemy import SQLAlchemy
from sqlalchemy.exc import SQLAlchemyError
from werkzeug.security import generate_password_hash, check_password_hash

from app import db

class Users(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    email = db.Column(db.String(250),  unique=True, nullable=False)
    username = db.Column(db.String(250),  unique=True, nullable=False)
    password = db.Column(db.String(250))
    login_time = db.Column(db.Integer)

    def __init__(self, username, password, email):
        self.username = username
        self.password = password
        self.email = email

    def __str__(self):
        return "Users(id='%s')" % self.id

    def set_password(self, password):
        return generate_password_hash(password)

    def check_password(self, hash, password):
        return check_password_hash(hash, password)

    def get(self, id):
        return self.query.filter_by(id=id).first()

    def add(self, user):
        db.session.add(user)
        return session_commit()

    def update(self):
        return session_commit()

    def delete(self, id):
        self.query.filter_by(id=id).delete()
        return session_commit()


def session_commit():
    try:
        db.session.commit()
    except SQLAlchemyError as e:
        db.session.rollback()
        reason = str(e)
        return reason
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49

在上面用户模型定义中,定义了set_password和check_password方法,分别用来加密用户注册时填写的密码(将加密后的密码写入数据库)和在用户登录时检查用户密码是否正确。

用户相关接口实现:

flask-pyjwt-auth/app/users/api.py

from flask import jsonify, request
from app.users.model import Users
from app.auth.auths import Auth
from .. import common

def init_api(app):
    @app.route('/register', methods=['POST'])
    def register():
        """
        用户注册
        :return: json
        """
        email = request.form.get('email')
        username = request.form.get('username')
        password = request.form.get('password')
        user = Users(email=email, username=username, password=Users.set_password(Users, password))
        result = Users.add(Users, user)
        if user.id:
            returnUser = {
                'id': user.id,
                'username': user.username,
                'email': user.email,
                'login_time': user.login_time
            }
            return jsonify(common.trueReturn(returnUser, "用户注册成功"))
        else:
            return jsonify(common.falseReturn('', '用户注册失败'))


    @app.route('/login', methods=['POST'])
    def login():
        """
        用户登录
        :return: json
        """
        username = request.form.get('username')
        password = request.form.get('password')
        if (not username or not password):
            return jsonify(common.falseReturn('', '用户名和密码不能为空'))
        else:
            return Auth.authenticate(Auth, username, password)


    @app.route('/user', methods=['GET'])
    def get():
        """
        获取用户信息
        :return: json
        """
        result = Auth.identify(Auth, request)
        if (result['status'] and result['data']):
            user = Users.get(Users, result['data'])
            returnUser = {
                'id': user.id,
                'username': user.username,
                'email': user.email,
                'login_time': user.login_time
            }
            result = common.trueReturn(returnUser, "请求成功")
        return jsonify(result)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60

上面用户模块的API实现代码中,先从auth模块中导入Auth类,在用户登录接口中,调用Auth类的authenticate方法来执行用户认证,认证通过则返回token,认证不通过则返回错误信息。在获取用户信息的接口,首先要进行“用户鉴权”,只有拥有权限的用户才有权限拿到用户信息。

3、认证模块

模块入口(空)

flask-pyjwt-auth/app/auth/__init__.py

#
  • 1

授权认证处理:

flask-pyjwt-auth/app/auth/auths.py

import jwt, datetime, time
from flask import jsonify
from app.users.model import Users
from .. import config
from .. import common

class Auth():
    @staticmethod
    def encode_auth_token(user_id, login_time):
        """
        生成认证Token
        :param user_id: int
        :param login_time: int(timestamp)
        :return: string
        """
        try:
            payload = {
                'exp': datetime.datetime.utcnow() + datetime.timedelta(days=0, seconds=10),
                'iat': datetime.datetime.utcnow(),
                'iss': 'ken',
                'data': {
                    'id': user_id,
                    'login_time': login_time
                }
            }
            return jwt.encode(
                payload,
                config.SECRET_KEY,
                algorithm='HS256'
            )
        except Exception as e:
            return e

    @staticmethod
    def decode_auth_token(auth_token):
        """
        验证Token
        :param auth_token:
        :return: integer|string
        """
        try:
            # payload = jwt.decode(auth_token, app.config.get('SECRET_KEY'), leeway=datetime.timedelta(seconds=10))
            # 取消过期时间验证
            payload = jwt.decode(auth_token, config.SECRET_KEY, options={'verify_exp': False})
            if ('data' in payload and 'id' in payload['data']):
                return payload
            else:
                raise jwt.InvalidTokenError
        except jwt.ExpiredSignatureError:
            return 'Token过期'
        except jwt.InvalidTokenError:
            return '无效Token'


    def authenticate(self, username, password):
        """
        用户登录,登录成功返回token,写将登录时间写入数据库;登录失败返回失败原因
        :param password:
        :return: json
        """
        userInfo = Users.query.filter_by(username=username).first()
        if (userInfo is None):
            return jsonify(common.falseReturn('', '找不到用户'))
        else:
            if (Users.check_password(Users, userInfo.password, password)):
                login_time = int(time.time())
                userInfo.login_time = login_time
                Users.update(Users)
                token = self.encode_auth_token(userInfo.id, login_time)
                return jsonify(common.trueReturn(token.decode(), '登录成功'))
            else:
                return jsonify(common.falseReturn('', '密码不正确'))

    def identify(self, request):
        """
        用户鉴权
        :return: list
        """
        auth_header = request.headers.get('Authorization')
        if (auth_header):
            auth_tokenArr = auth_header.split(" ")
            if (not auth_tokenArr or auth_tokenArr[0] != 'JWT' or len(auth_tokenArr) != 2):
                result = common.falseReturn('', '请传递正确的验证头信息')
            else:
                auth_token = auth_tokenArr[1]
                payload = self.decode_auth_token(auth_token)
                if not isinstance(payload, str):
                    user = Users.get(Users, payload['data']['id'])
                    if (user is None):
                        result = common.falseReturn('', '找不到该用户信息')
                    else:
                        if (user.login_time == payload['data']['login_time']):
                            result = common.trueReturn(user.id, '请求成功')
                        else:
                            result = common.falseReturn('', 'Token已更改,请重新登录获取')
                else:
                    result = common.falseReturn('', payload)
        else:
            result = common.falseReturn('', '没有提供认证token')
        return result
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100

认证模块实现token的生成、解析,以及用户的认证和鉴权。

首先要安装PyJWT

Pip install pyjwt

认证模块的实现主要包括下面4个部分(方法):

(1)encode_auth_token方法用来生成认证Token

要生成Token需要用到pyjwt的encode方法,这个方法可以传入三个参数,如示例:

jwt.encode(payload, config.SECRET_KEY, algorithm=’HS256′)

上面代码的jwt.encode方法中传入了三个参数:第一个是payload,这是认证依据的主要信息,第二个是密钥,这里是读取配置文件中的SECRET_KEY配置变量,第三个是生成Token的算法。

这里稍微讲一下payload,这是认证的依据,也是后续解析token后定位用户的依据,需要包含特定用户的特定信息,如本例注册了data声明,data声明中包括了用户ID和用户登录时间两个参数,在“用户鉴权”方法中,解析token完成后要利用这个用户ID来查找并返回用户信息给用户。这里的data声明是我们自己加的,pyjwt内置注册了以下几个声明:

  • “exp”: 过期时间
  • “nbf”: 表示当前时间在nbf里的时间之前,则Token不被接受
  • “iss”: token签发者
  • “aud”: 接收者
  • “iat”: 发行时间

要注意的是”exp”过期时间是按当地时间确定,所以设置时要使用utc时间。

(2)decode_auth_token方法用于Token验证

这里的Token验证主要包括过期时间验证和声明验证。使用pyjwt的decode方法解析Token,得到payload。如:

jwt.decode(auth_token, config.SECRET_KEY, options={‘verify_exp’: False})

上面的options设置不验证过期时间,如果不设置这个选项,token将在原payload中设置的过期时间后过期。

经过上面解析后,得到的payload可以跟原来生成payload进行比较来验证token的有效性。

(3)authenticate方法用于用户登录验证

这个方法进行用户登录验证,如果通过验证,先把登录时间写入用户记录,再调用上面第一个方法生成token,返回给用户(用户登录成功后,据此token来获取用户信息或其他操作)。

(4)identify方法用于用户鉴权

当用户有了token后,用户可以拿token去执行一些需要token才能执行的操作。这个用户鉴权方法就是进一步检查用户的token,如果完全符合条件则返回用户需要的信息或执行用户的操作。

用户鉴权的操作首先判断一个用户是否正确传递token,这里使用header的方式来传递,并要求header传值字段名为“Authorization”,字段值以“JWT”开头,并与token用“ ”(空格)隔开。

用户按正确的方式传递token后,再调用decode_auth_token方法来解析token,如果解析正确,获取解析出来的用户信息(user_id)并到数据库中查找详细信息返回给用户。

四、运行结果
1、注册成功

在这里插入图片描述

2、注册失败

在这里插入图片描述

3、登录成功

在这里插入图片描述

4、登录失败

在这里插入图片描述

5、成功获取用户信息

在这里插入图片描述

6、用户重新登录,token变更,原token无法获取用户信息

在这里插入图片描述

7、不带token请求,无法获取用户信息

在这里插入图片描述

PyJWT的使用比较简单,也比较安全,本文基本涵盖了Flask和PyJWT的整合和使用过程,希望对大家有用。本文完。

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号