赞
踩
首先,HTTP 是无状态的协议(对于事务处理没有记忆能力,每次客户端和服务端会话完成时,服务端不会保存任何会话信息)——每个请求都是完全独立的,服务端无法确认当前访问者的身份信息,无法分辨上一次的请求发送者和这一次的发送者是不是同一个人。所以服务器与浏览器为了进行会话跟踪(知道是谁在访问自己),就必须主动的去维护一个状态,这个状态用于告知服务端前后两个请求是否来自同一浏览器。为此,前端开发者便加入了Cookie来实现有状态的HTTP连接。而后实现授权的方式就有cookie、session、token和JWT。
JWT.IO 解释:JSON Web Token (JWT) 是一个开放标准 ( RFC 7519 ),它定义了一种紧凑且自包含的方式,用于在各方之间作为 JSON 对象安全地传输信息。该信息可以被验证和信任,因为它是经过数字签名的。JWT 可以使用秘密(使用HMAC算法)或使用RSA或ECDSA的公钥/私钥对进行签名。
由于网上许多案例都为HS256(对称加密),所以这里我使用RSA256(非对称加密)作为补充。
首先需要生成私钥和公钥
查阅《Generate OpenSSL RSA Key Pair using genpkey》得到了带密码的pem文件, 但是在使用中会出现TypeError: Password was not given but private key is encrypted
的错误。
从《How to generate JWT RS256 key》找到了解决办法
ssh-keygen -t rsa -b 4096 -m PEM -f jwtRS256.key
# Don't add passphrase
openssl rsa -in jwtRS256.key -pubout -outform PEM -out jwtRS256.key.pub
cat jwtRS256.key
cat jwtRS256.key.pub
选择Python的JWT库,我这里选择了两个库
PyJWT(需要cryptography库)
>>> import jwt
>>> with open('jwtRS256.key', 'rb') as f:
... private_key = f.read()
...
>>> with open('jwtRS256.key.pub', 'rb') as f:
... public_key = f.read()
...
>>> print(encoded)
eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzb21lIjoicGF5bG9hZCJ9.4twFt5NiznN84AWoo1d7KO1T_yoc0Z6XOpOVswacPZg
>>> decoded = jwt.decode(encoded, public_key, algorithms=["RS256"])
{'some': 'payload'}
>>> from authlib.jose import jwt >>> header = {'alg': 'RS256'} >>> payload = {'iss': 'Authlib', 'sub': '123', ...} >>> with open('jwtRS256.key', 'rb') as f: ... private_key = f.read() ... >>> s = jwt.encode(header, payload, private_key) >>> with open('jwtRS256.key.pub', 'rb') as f: ... public_key = f.read() ... >>> claims = jwt.decode(s, public_key) >>> print(claims) {'iss': 'Authlib', 'sub': '123', ...} >>> print(claims.header) {'alg': 'RS256', 'typ': 'JWT'} >>> claims.validate()
工作原理
《Using JWT for user authentication in Flask》中的代码参考:
# flask imports from flask import Flask, request, jsonify, make_response from flask_sqlalchemy import SQLAlchemy import uuid # for public id from werkzeug.security import generate_password_hash, check_password_hash # imports for PyJWT authentication import jwt from datetime import datetime, timedelta from functools import wraps # creates Flask object app = Flask(__name__) # configuration # NEVER HARDCODE YOUR CONFIGURATION IN YOUR CODE # INSTEAD CREATE A .env FILE AND STORE IN IT app.config['SECRET_KEY'] = 'your secret key' # database name app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///Database.db' app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True # creates SQLALCHEMY object db = SQLAlchemy(app) # Database ORMs class User(db.Model): id = db.Column(db.Integer, primary_key = True) public_id = db.Column(db.String(50), unique = True) name = db.Column(db.String(100)) email = db.Column(db.String(70), unique = True) password = db.Column(db.String(80)) # decorator for verifying the JWT def token_required(f): @wraps(f) def decorated(*args, **kwargs): token = None # jwt is passed in the request header if 'x-access-token' in request.headers: token = request.headers['x-access-token'] # return 401 if token is not passed if not token: return jsonify({'message' : 'Token is missing !!'}), 401 try: # decoding the payload to fetch the stored details data = jwt.decode(token, app.config['SECRET_KEY']) current_user = User.query\ .filter_by(public_id = data['public_id'])\ .first() except: return jsonify({ 'message' : 'Token is invalid !!' }), 401 # returns the current logged in users contex to the routes return f(current_user, *args, **kwargs) return decorated # User Database Route # this route sends back list of users users @app.route('/user', methods =['GET']) @token_required def get_all_users(current_user): # querying the database # for all the entries in it users = User.query.all() # converting the query objects # to list of jsons output = [] for user in users: # appending the user data json # to the response list output.append({ 'public_id': user.public_id, 'name' : user.name, 'email' : user.email }) return jsonify({'users': output}) # route for loging user in @app.route('/login', methods =['POST']) def login(): # creates dictionary of form data auth = request.form if not auth or not auth.get('email') or not auth.get('password'): # returns 401 if any email or / and password is missing return make_response( 'Could not verify', 401, {'WWW-Authenticate' : 'Basic realm ="Login required !!"'} ) user = User.query\ .filter_by(email = auth.get('email'))\ .first() if not user: # returns 401 if user does not exist return make_response( 'Could not verify', 401, {'WWW-Authenticate' : 'Basic realm ="User does not exist !!"'} ) if check_password_hash(user.password, auth.get('password')): # generates the JWT Token token = jwt.encode({ 'public_id': user.public_id, 'exp' : datetime.utcnow() + timedelta(minutes = 30) }, app.config['SECRET_KEY']) return make_response(jsonify({'token' : token.decode('UTF-8')}), 201) # returns 403 if password is wrong return make_response( 'Could not verify', 403, {'WWW-Authenticate' : 'Basic realm ="Wrong Password !!"'} ) # signup route @app.route('/signup', methods =['POST']) def signup(): # creates a dictionary of the form data data = request.form # gets name, email and password name, email = data.get('name'), data.get('email') password = data.get('password') # checking for existing user user = User.query\ .filter_by(email = email)\ .first() if not user: # database ORM object user = User( public_id = str(uuid.uuid4()), name = name, email = email, password = generate_password_hash(password) ) # insert user db.session.add(user) db.session.commit() return make_response('Successfully registered.', 201) else: # returns 202 if user already exists return make_response('User already exists. Please Log in.', 202) if __name__ == "__main__": # setting debug to True enables hot reload # and also provides a debuger shell # if you hit an error while running the server app.run(debug = True)
'_io.BufferedReader' object is not callable
import functools ... class token_required(object): def __init__(self, func): self.func = func functools.update_wrapper(self, func) def __call__(self, *args, **kwargs): token = None # jwt is passed in the request header if 'x-access-token' in request.headers: token = request.headers['x-access-token'] # return 401 if token is not passed if not token: return jsonify({'message' : 'Token is missing !!'}), 401 try: with open('jwtRS256.key.pub', 'rb') as f: public_key = f.read() # decoding the payload to fetch the stored details data = jwt.decode(token, public_key) current_user = get_user_by_id(data['public_id']) except: return jsonify({ 'message' : 'Token is invalid !!' }), 401 # returns the current logged in users contex to the routes result = self.func(current_user, *args, **kwargs) return result
大部分语言都已经支持了JWT,这里可以从jwt.io
的类库中可以看出。目前JWT主要运用于OAuth1、OAuth2和OpenID等单点登录功能,而且将来会有更多的企业和系统开发需要使用JWT技术。而且我也非常感谢本文中引用的原作者提供了相关的材料,便于我们学习。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。