赞
踩
这是我在做用户认证开发过程中看到一位大神写的文章,不过源地址已经失效了,希望有可能未来还能看到传送门。在此转载是不忍心这么好的文章绝版
我在 github
上找到了作者的源码,有需要的可以去下载https://github.com/yaoyonstudio/flask-pyjwt-auth
在程序开发中,用户认证授权是一个绕不过的重难点。以前的开发模式下,cookie和session认证是主流,随着前后端分离的趋势,基于Token的认证方式成为主流,而JWT是基于Token认证方式的一种机制,是实现单点登录认证的一种有效方法。
PyJWT是一个用来编码和解码JWT(JSON Web Tokens)的Python库,也可以用在Flask上。本文就通过一个实例来演示Flask项目整合PyJWT来实现基于Token的用户认证授权。
1、程序将实现一个用户注册、登录和获取用户信息的功能
2、用户注册时输入用户名(username)、邮箱(email)和密码(password),用户名和邮箱是唯一的,如果数据库中已有则会注册失败;用户注册成功后返回用户的信息。
3、用户使用用户名(username)和密码(password)登录,登录成功时返回token,每次登录都会更新一次token。
4、用户要获取用户信息,需要在请求Header中传入验证参数和token,程序会验证这个token的有效性并给出响应。
5、程序构建方面,将用户和认证分列两个模块。
根据示例需求构建程序目录结构:
数据迁移配置文件:
flask-pyjwt-auth/db.py
from flask_script import Manager
from flask_migrate import Migrate, MigrateCommand
from run import app
from app import db
app.config.from_object('app.config')
db.init_app(app)
migrate = Migrate(app, db)
manager = Manager(app)
manager.add_command('db', MigrateCommand)
if __name__ == '__main__':
manager.run()
运行入口文件:
flask-pyjwt-auth/run.py
from app import create_app
app = create_app('app.config')
if __name__ == '__main__':
app.run(host=app.config['HOST'],
port=app.config['PORT'],
debug=app.config['DEBUG'])
程序初始化文件:
flask-pyjwt-auth/app/__init__.py
from flask import Flask, request from flask_sqlalchemy import SQLAlchemy db = SQLAlchemy() def create_app(config_filename): app = Flask(__name__) app.config.from_object(config_filename) @app.after_request def after_request(response): response.headers.add('Access-Control-Allow-Origin', '*') if request.method == 'OPTIONS': response.headers['Access-Control-Allow-Methods'] = 'DELETE, GET, POST, PUT' headers = request.headers.get('Access-Control-Request-Headers') if headers: response.headers['Access-Control-Allow-Headers'] = headers return response from app.users.model import db db.init_app(app) from app.users.api import init_api init_api(app) return app
上面代码加入了全局HTTP请求头配置,设置允许所有跨域请求。
配置文件:
flask-pyjwt-auth/app/config.py
DB_USER = 'root'
DB_PASSWORD = ''
DB_HOST = 'localhost'
DB_DB = 'flask-pyjwt-auth'
DEBUG = True
PORT = 3333
HOST = "192.168.1.141"
SECRET_KEY = "my blog"
SQLALCHEMY_TRACK_MODIFICATIONS = False
SQLALCHEMY_DATABASE_URI = 'mysql://' + DB_USER + ':' + DB_PASSWORD + '@' + DB_HOST + '/' + DB_DB
公共文件:
flask-pyjwt-auth/app/common.py
def trueReturn(data, msg):
return {
"status": True,
"data": data,
"msg": msg
}
def falseReturn(data, msg):
return {
"status": False,
"data": data,
"msg": msg
}
模块入口(空)
flask-pyjwt-auth/app/users/__init__.py
#
用户模型:
flask-pyjwt-auth/app/users/model.py
from flask_sqlalchemy import SQLAlchemy from sqlalchemy.exc import SQLAlchemyError from werkzeug.security import generate_password_hash, check_password_hash from app import db class Users(db.Model): id = db.Column(db.Integer, primary_key=True) email = db.Column(db.String(250), unique=True, nullable=False) username = db.Column(db.String(250), unique=True, nullable=False) password = db.Column(db.String(250)) login_time = db.Column(db.Integer) def __init__(self, username, password, email): self.username = username self.password = password self.email = email def __str__(self): return "Users(id='%s')" % self.id def set_password(self, password): return generate_password_hash(password) def check_password(self, hash, password): return check_password_hash(hash, password) def get(self, id): return self.query.filter_by(id=id).first() def add(self, user): db.session.add(user) return session_commit() def update(self): return session_commit() def delete(self, id): self.query.filter_by(id=id).delete() return session_commit() def session_commit(): try: db.session.commit() except SQLAlchemyError as e: db.session.rollback() reason = str(e) return reason
在上面用户模型定义中,定义了set_password和check_password方法,分别用来加密用户注册时填写的密码(将加密后的密码写入数据库)和在用户登录时检查用户密码是否正确。
用户相关接口实现:
flask-pyjwt-auth/app/users/api.py
from flask import jsonify, request from app.users.model import Users from app.auth.auths import Auth from .. import common def init_api(app): @app.route('/register', methods=['POST']) def register(): """ 用户注册 :return: json """ email = request.form.get('email') username = request.form.get('username') password = request.form.get('password') user = Users(email=email, username=username, password=Users.set_password(Users, password)) result = Users.add(Users, user) if user.id: returnUser = { 'id': user.id, 'username': user.username, 'email': user.email, 'login_time': user.login_time } return jsonify(common.trueReturn(returnUser, "用户注册成功")) else: return jsonify(common.falseReturn('', '用户注册失败')) @app.route('/login', methods=['POST']) def login(): """ 用户登录 :return: json """ username = request.form.get('username') password = request.form.get('password') if (not username or not password): return jsonify(common.falseReturn('', '用户名和密码不能为空')) else: return Auth.authenticate(Auth, username, password) @app.route('/user', methods=['GET']) def get(): """ 获取用户信息 :return: json """ result = Auth.identify(Auth, request) if (result['status'] and result['data']): user = Users.get(Users, result['data']) returnUser = { 'id': user.id, 'username': user.username, 'email': user.email, 'login_time': user.login_time } result = common.trueReturn(returnUser, "请求成功") return jsonify(result)
上面用户模块的API实现代码中,先从auth模块中导入Auth类,在用户登录接口中,调用Auth类的authenticate方法来执行用户认证,认证通过则返回token,认证不通过则返回错误信息。在获取用户信息的接口,首先要进行“用户鉴权”,只有拥有权限的用户才有权限拿到用户信息。
模块入口(空)
flask-pyjwt-auth/app/auth/__init__.py
#
授权认证处理:
flask-pyjwt-auth/app/auth/auths.py
import jwt, datetime, time from flask import jsonify from app.users.model import Users from .. import config from .. import common class Auth(): @staticmethod def encode_auth_token(user_id, login_time): """ 生成认证Token :param user_id: int :param login_time: int(timestamp) :return: string """ try: payload = { 'exp': datetime.datetime.utcnow() + datetime.timedelta(days=0, seconds=10), 'iat': datetime.datetime.utcnow(), 'iss': 'ken', 'data': { 'id': user_id, 'login_time': login_time } } return jwt.encode( payload, config.SECRET_KEY, algorithm='HS256' ) except Exception as e: return e @staticmethod def decode_auth_token(auth_token): """ 验证Token :param auth_token: :return: integer|string """ try: # payload = jwt.decode(auth_token, app.config.get('SECRET_KEY'), leeway=datetime.timedelta(seconds=10)) # 取消过期时间验证 payload = jwt.decode(auth_token, config.SECRET_KEY, options={'verify_exp': False}) if ('data' in payload and 'id' in payload['data']): return payload else: raise jwt.InvalidTokenError except jwt.ExpiredSignatureError: return 'Token过期' except jwt.InvalidTokenError: return '无效Token' def authenticate(self, username, password): """ 用户登录,登录成功返回token,写将登录时间写入数据库;登录失败返回失败原因 :param password: :return: json """ userInfo = Users.query.filter_by(username=username).first() if (userInfo is None): return jsonify(common.falseReturn('', '找不到用户')) else: if (Users.check_password(Users, userInfo.password, password)): login_time = int(time.time()) userInfo.login_time = login_time Users.update(Users) token = self.encode_auth_token(userInfo.id, login_time) return jsonify(common.trueReturn(token.decode(), '登录成功')) else: return jsonify(common.falseReturn('', '密码不正确')) def identify(self, request): """ 用户鉴权 :return: list """ auth_header = request.headers.get('Authorization') if (auth_header): auth_tokenArr = auth_header.split(" ") if (not auth_tokenArr or auth_tokenArr[0] != 'JWT' or len(auth_tokenArr) != 2): result = common.falseReturn('', '请传递正确的验证头信息') else: auth_token = auth_tokenArr[1] payload = self.decode_auth_token(auth_token) if not isinstance(payload, str): user = Users.get(Users, payload['data']['id']) if (user is None): result = common.falseReturn('', '找不到该用户信息') else: if (user.login_time == payload['data']['login_time']): result = common.trueReturn(user.id, '请求成功') else: result = common.falseReturn('', 'Token已更改,请重新登录获取') else: result = common.falseReturn('', payload) else: result = common.falseReturn('', '没有提供认证token') return result
认证模块实现token的生成、解析,以及用户的认证和鉴权。
首先要安装PyJWT
Pip install pyjwt
认证模块的实现主要包括下面4个部分(方法):
(1)encode_auth_token方法用来生成认证Token
要生成Token需要用到pyjwt的encode方法,这个方法可以传入三个参数,如示例:
jwt.encode(payload, config.SECRET_KEY, algorithm=’HS256′)
上面代码的jwt.encode方法中传入了三个参数:第一个是payload,这是认证依据的主要信息,第二个是密钥,这里是读取配置文件中的SECRET_KEY配置变量,第三个是生成Token的算法。
这里稍微讲一下payload,这是认证的依据,也是后续解析token后定位用户的依据,需要包含特定用户的特定信息,如本例注册了data声明,data声明中包括了用户ID和用户登录时间两个参数,在“用户鉴权”方法中,解析token完成后要利用这个用户ID来查找并返回用户信息给用户。这里的data声明是我们自己加的,pyjwt内置注册了以下几个声明:
要注意的是”exp”过期时间是按当地时间确定,所以设置时要使用utc时间。
(2)decode_auth_token方法用于Token验证
这里的Token验证主要包括过期时间验证和声明验证。使用pyjwt的decode方法解析Token,得到payload。如:
jwt.decode(auth_token, config.SECRET_KEY, options={‘verify_exp’: False})
上面的options设置不验证过期时间,如果不设置这个选项,token将在原payload中设置的过期时间后过期。
经过上面解析后,得到的payload可以跟原来生成payload进行比较来验证token的有效性。
(3)authenticate方法用于用户登录验证
这个方法进行用户登录验证,如果通过验证,先把登录时间写入用户记录,再调用上面第一个方法生成token,返回给用户(用户登录成功后,据此token来获取用户信息或其他操作)。
(4)identify方法用于用户鉴权
当用户有了token后,用户可以拿token去执行一些需要token才能执行的操作。这个用户鉴权方法就是进一步检查用户的token,如果完全符合条件则返回用户需要的信息或执行用户的操作。
用户鉴权的操作首先判断一个用户是否正确传递token,这里使用header的方式来传递,并要求header传值字段名为“Authorization”,字段值以“JWT”开头,并与token用“ ”(空格)隔开。
用户按正确的方式传递token后,再调用decode_auth_token方法来解析token,如果解析正确,获取解析出来的用户信息(user_id)并到数据库中查找详细信息返回给用户。
PyJWT的使用比较简单,也比较安全,本文基本涵盖了Flask和PyJWT的整合和使用过程,希望对大家有用。本文完。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。