当前位置:   article > 正文

Python开发篇——如何在Flask下编写JWT登录_python flask jwt

python flask jwt

首先,HTTP 是无状态的协议(对于事务处理没有记忆能力,每次客户端和服务端会话完成时,服务端不会保存任何会话信息)——每个请求都是完全独立的,服务端无法确认当前访问者的身份信息,无法分辨上一次的请求发送者和这一次的发送者是不是同一个人。所以服务器与浏览器为了进行会话跟踪(知道是谁在访问自己),就必须主动的去维护一个状态,这个状态用于告知服务端前后两个请求是否来自同一浏览器。为此,前端开发者便加入了Cookie来实现有状态的HTTP连接。而后实现授权的方式就有cookie、session、token和JWT。

什么是 JWT?

JWT.IO 解释:JSON Web Token (JWT) 是一个开放标准 ( RFC 7519 ),它定义了一种紧凑且自包含的方式,用于在各方之间作为 JSON 对象安全地传输信息。该信息可以被验证和信任,因为它是经过数字签名的。JWT 可以使用秘密(使用HMAC算法)或使用RSA或ECDSA的公钥/私钥对进行签名。

案例

由于网上许多案例都为HS256(对称加密),所以这里我使用RSA256(非对称加密)作为补充。

  1. 首先需要生成私钥和公钥

    • 查阅《Generate OpenSSL RSA Key Pair using genpkey》得到了带密码的pem文件, 但是在使用中会出现TypeError: Password was not given but private key is encrypted的错误。

    • 从《How to generate JWT RS256 key》找到了解决办法

        ssh-keygen -t rsa -b 4096 -m PEM -f jwtRS256.key
        # Don't add passphrase
        openssl rsa -in jwtRS256.key -pubout -outform PEM -out jwtRS256.key.pub
        cat jwtRS256.key
        cat jwtRS256.key.pub
      
      • 1
      • 2
      • 3
      • 4
      • 5
  2. 选择Python的JWT库,我这里选择了两个库

    • PyJWT(需要cryptography库)

        >>> import jwt
        >>> with open('jwtRS256.key', 'rb') as f:
        ...    private_key = f.read()
        ...
        >>> with open('jwtRS256.key.pub', 'rb') as f:
        ...    public_key = f.read()
        ...
        >>> print(encoded)
        eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzb21lIjoicGF5bG9hZCJ9.4twFt5NiznN84AWoo1d7KO1T_yoc0Z6XOpOVswacPZg
        >>> decoded = jwt.decode(encoded, public_key, algorithms=["RS256"])
        {'some': 'payload'}
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
    • Authlib

        >>> from authlib.jose import jwt
        >>> header = {'alg': 'RS256'}
        >>> payload = {'iss': 'Authlib', 'sub': '123', ...}
        >>> with open('jwtRS256.key', 'rb') as f:
        ...    private_key = f.read()
        ...
        >>> s = jwt.encode(header, payload, private_key)
        >>> with open('jwtRS256.key.pub', 'rb') as f:
        ...    public_key = f.read()
        ...
        >>> claims = jwt.decode(s, public_key)
        >>> print(claims)
        {'iss': 'Authlib', 'sub': '123', ...}
        >>> print(claims.header)
        {'alg': 'RS256', 'typ': 'JWT'}
        >>> claims.validate()
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
  3. 工作原理
    HTTP中JWT工作原理

Using JWT for user authentication in Flask》中的代码参考:

# flask imports
from flask import Flask, request, jsonify, make_response
from flask_sqlalchemy import SQLAlchemy
import uuid # for public id
from  werkzeug.security import generate_password_hash, check_password_hash
# imports for PyJWT authentication
import jwt
from datetime import datetime, timedelta
from functools import wraps

# creates Flask object
app = Flask(__name__)
# configuration
# NEVER HARDCODE YOUR CONFIGURATION IN YOUR CODE
# INSTEAD CREATE A .env FILE AND STORE IN IT
app.config['SECRET_KEY'] = 'your secret key'
# database name
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///Database.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
# creates SQLALCHEMY object
db = SQLAlchemy(app)

# Database ORMs
class User(db.Model):
    id = db.Column(db.Integer, primary_key = True)
    public_id = db.Column(db.String(50), unique = True)
    name = db.Column(db.String(100))
    email = db.Column(db.String(70), unique = True)
    password = db.Column(db.String(80))

# decorator for verifying the JWT
def token_required(f):
    @wraps(f)
    def decorated(*args, **kwargs):
        token = None
        # jwt is passed in the request header
        if 'x-access-token' in request.headers:
            token = request.headers['x-access-token']
        # return 401 if token is not passed
        if not token:
            return jsonify({'message' : 'Token is missing !!'}), 401

        try:
            # decoding the payload to fetch the stored details
            data = jwt.decode(token, app.config['SECRET_KEY'])
            current_user = User.query\
                .filter_by(public_id = data['public_id'])\
                .first()
        except:
            return jsonify({
                'message' : 'Token is invalid !!'
            }), 401
        # returns the current logged in users contex to the routes
        return  f(current_user, *args, **kwargs)

    return decorated

# User Database Route
# this route sends back list of users users
@app.route('/user', methods =['GET'])
@token_required
def get_all_users(current_user):
    # querying the database
    # for all the entries in it
    users = User.query.all()
    # converting the query objects
    # to list of jsons
    output = []
    for user in users:
        # appending the user data json
        # to the response list
        output.append({
            'public_id': user.public_id,
            'name' : user.name,
            'email' : user.email
        })

    return jsonify({'users': output})

# route for loging user in
@app.route('/login', methods =['POST'])
def login():
    # creates dictionary of form data
    auth = request.form

    if not auth or not auth.get('email') or not auth.get('password'):
        # returns 401 if any email or / and password is missing
        return make_response(
            'Could not verify',
            401,
            {'WWW-Authenticate' : 'Basic realm ="Login required !!"'}
        )

    user = User.query\
        .filter_by(email = auth.get('email'))\
        .first()

    if not user:
        # returns 401 if user does not exist
        return make_response(
            'Could not verify',
            401,
            {'WWW-Authenticate' : 'Basic realm ="User does not exist !!"'}
        )

    if check_password_hash(user.password, auth.get('password')):
        # generates the JWT Token
        token = jwt.encode({
            'public_id': user.public_id,
            'exp' : datetime.utcnow() + timedelta(minutes = 30)
        }, app.config['SECRET_KEY'])

        return make_response(jsonify({'token' : token.decode('UTF-8')}), 201)
    # returns 403 if password is wrong
    return make_response(
        'Could not verify',
        403,
        {'WWW-Authenticate' : 'Basic realm ="Wrong Password !!"'}
    )

# signup route
@app.route('/signup', methods =['POST'])
def signup():
    # creates a dictionary of the form data
    data = request.form

    # gets name, email and password
    name, email = data.get('name'), data.get('email')
    password = data.get('password')

    # checking for existing user
    user = User.query\
        .filter_by(email = email)\
        .first()
    if not user:
        # database ORM object
        user = User(
            public_id = str(uuid.uuid4()),
            name = name,
            email = email,
            password = generate_password_hash(password)
        )
        # insert user
        db.session.add(user)
        db.session.commit()

        return make_response('Successfully registered.', 201)
    else:
        # returns 202 if user already exists
        return make_response('User already exists. Please Log in.', 202)

if __name__ == "__main__":
    # setting debug to True enables hot reload
    # and also provides a debuger shell
    # if you hit an error while running the server
    app.run(debug = True)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156

注意:上面的wraps写法存在错误,会产生'_io.BufferedReader' object is not callable

import functools
...
class token_required(object):

    def __init__(self, func):
        self.func = func
        functools.update_wrapper(self, func)

    def __call__(self, *args, **kwargs):
        token = None
        # jwt is passed in the request header
        if 'x-access-token' in request.headers:
            token = request.headers['x-access-token']
        # return 401 if token is not passed
        if not token:
            return jsonify({'message' : 'Token is missing !!'}), 401

        try: 
            with open('jwtRS256.key.pub', 'rb') as f:
                public_key = f.read()
            # decoding the payload to fetch the stored details
            data = jwt.decode(token, public_key)
            current_user = get_user_by_id(data['public_id'])
        except: 
            return jsonify({ 
                'message' : 'Token is invalid !!'
            }), 401
        # returns the current logged in users contex to the routes   
        result = self.func(current_user, *args, **kwargs)
        return result
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

总结

大部分语言都已经支持了JWT,这里可以从jwt.io的类库中可以看出。目前JWT主要运用于OAuth1、OAuth2和OpenID等单点登录功能,而且将来会有更多的企业和系统开发需要使用JWT技术。而且我也非常感谢本文中引用的原作者提供了相关的材料,便于我们学习。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/不正经/article/detail/570692
推荐阅读
相关标签
  

闽ICP备14008679号