赞
踩
官方文档地址:
Sanic JWT — sanic-jwt 1.5.0 documentation
https://sanic-jwt.readthedocs.io/en/latest/index.html
前后端分离项目,使用JWT进行身份校验。
pip install sanic-jwt
简易使用:
官方给出的代码:
from sanic import Sanic from sanic_jwt import exceptions from sanic_jwt import initialize class User: def __init__(self, id, username, password): self.user_id = id self.username = username self.password = password def __repr__(self): return "User(id='{}')".format(self.user_id) def to_dict(self): return {"user_id": self.user_id, "username": self.username} users = [User(1, "user1", "abcxyz"), User(2, "user2", "abcxyz")] username_table = {u.username: u for u in users} userid_table = {u.user_id: u for u in users} async def authenticate(request, *args, **kwargs): username = request.json.get("username", None) password = request.json.get("password", None) if not username or not password: raise exceptions.AuthenticationFailed("Missing username or password.") #验证账号密码 user = username_table.get(username, None) if user is None: raise exceptions.AuthenticationFailed("User not found.") if password != user.password: raise exceptions.AuthenticationFailed("Password is incorrect.") #返回错误应该只需返回"账号或密码错误" return user #成功返回user对象 app = Sanic() initialize(app, authenticate=authenticate) if __name__ == "__main__": app.run(host="127.0.0.1", port=8888)
启动后会绑定以下URL:
http://127.0.0.1:8888/auth
返回access_token 要保存到本地,下次请求时要在请求头附加 Authorization。
access_token JSON Web Tokens (“JWT”) 介绍:
What is a JWT? — sanic-jwt 1.5.0 documentation
https://sanic-jwt.readthedocs.io/en/latest/pages/whatisjwt.html
同时还会绑定
/auth/verify
用于验证access token是否有效
Configuration — sanic-jwt 1.5.0 documentation
https://sanic-jwt.readthedocs.io/en/latest/pages/configuration.html
Initialize(app, url_prefix='/api/authentication')
/auth将被改为/api/authentication
其他地址的设置:
Initialize(
app,
path_to_authenticate='/my_authenticate',
path_to_retrieve_user='/my_retrieve_user',
path_to_verify='/my_verify',
path_to_refresh='/my_refresh',
)
#!/usr/bin/env python # -*- encoding: utf-8 -*- from sanic import Sanic from sanic_jwt import exceptions from sanic_jwt import initialize class User: def __init__(self, id, username, password): self.user_id = id self.username = username self.password = password def __repr__(self): return "User(id='{}')".format(self.user_id) def to_dict(self): return {"user_id": self.user_id, "username": self.username} @staticmethod def get(user_id): """ 静态方法 通过userid查询 构建User对象 :param user_id: :return: """ print("get",user_id) user = userid_table.get(user_id)#这里在业务上应该是查询数据库 # _user = checkUser(user_id) # user = User(_user.user_id,_user.username,_user.password) #构造自身 return user users = [User(1, "user1", "abcxyz"), User(2, "user2", "abcxyz")] username_table = {u.username: u for u in users} userid_table = {u.user_id: u for u in users} async def authenticate(request, *args, **kwargs): username = request.json.get("username", None) password = request.json.get("password", None) if not username or not password: raise exceptions.AuthenticationFailed("Missing username or password.") user = username_table.get(username, None) if user is None: raise exceptions.AuthenticationFailed("User not found.") if password != user.password: raise exceptions.AuthenticationFailed("Password is incorrect.") print(user.to_dict()) return user async def retrieve_user(request, payload, *args, **kwargs): """ 这个是/auth/me接口的 :param request: :param payload: :param args: :param kwargs: :return: """ if payload: user_id = payload.get('user_id', None) print(payload) #{'user_id': 1, 'exp': 1615122254} user = User.get(user_id=user_id)#查询user并返回 get应该是个静态方法 返回构造好的user return user.to_dict() else: return None app = Sanic(__name__) initialize(app, authenticate=authenticate, retrieve_user=retrieve_user, ) if __name__ == "__main__": app.run(host="127.0.0.1", port=8888)
代码增加User的get方法,函数retrieve_user和initialize增加retrieve_user=retrieve_user
这个借口可以根据access token获取里面存储的user_id,再通过user_id获取User返回。
from sanic_jwt import Responses class MyResponses(Responses): """ 额外的返回 在playload外的 """ @staticmethod def extend_authenticate(request, user=None, access_token=None, refresh_token=None): """ /auth接口 """ print(user) if(type(user)!=User): return {'errno': '2', 'errmsg': '用户或密码错误'} return {"user":user.to_dict()} @staticmethod def extend_retrieve_user(request, user=None, payload=None): """ 这个是/auth/me接口的 在retrieve_user之后执行 """ return {"user":user,"payload":payload} @staticmethod def extend_verify(request, user=None, payload=None): """ /auth/verify """ print(payload) return {"user":user} @staticmethod def extend_refresh(request, user=None, access_token=None, refresh_token=None, purported_token=None, payload=None): return {} initialize(app, authenticate=authenticate, retrieve_user=retrieve_user, responses_class=MyResponses, )
/auth/refresh
刷新access token
POST 返回新的access token
@protected()
只有登录了才会响应
from sanic_jwt.decorators import protected
@app.route("/")
async def open_route(request):
return json({"protected": False})
@app.route("/protected")
@protected()
async def protected_route(request):
return json({"protected": True})
@scoped()
只有在这个用户域才会响应
@app.route("/protected/scoped/1")
@scoped('user')
async def protected_route1(request):
return json({"protected": True, "scoped": True})
可以用多个scope
@scoped([‘user’, ‘admin’])
增加scopes
async def my_scope_extender(user, *args, **kwargs):
return user.scopes
Initialize(app, add_scopes_to_payload=my_scope_extender)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。