赞
踩
apps
-user
__init__.py
authen
__init__.py
token.py
ext
__init__.py
util.py
public.py
__init__.py
app.py
依赖包
- alembic==1.8.1
- click==7.1.2
- Flask==1.1.4
- Flask-Migrate==2.7.0
- Flask-Script==2.0.6
- Flask-SQLAlchemy==2.4.1
- greenlet==2.0.1
- importlib-metadata==5.0.0
- importlib-resources==5.10.0
- itsdangerous==1.1.0
- Jinja2==2.11.3
- Mako==1.2.4
- MarkupSafe==2.0.1
- PyMySQL==1.0.2
- SQLAlchemy==1.3.0
- typing_extensions==4.4.0
- Werkzeug==1.0.1
- zipp==3.10.0
- Flask-RESTful==0.3.9
- PyJWT==2.7.0
- websocket-client==1.5.1
- gevent==22.10.1
- gevent-websocket==0.10.1
- Flask-Caching==2.0.2
- redis==4.5.5
authen/token.py
- from flask import request,g
- import jwt
- import time
- from flask import jsonify
- from functools import wraps #类装饰器
- #导入用户表:这个根据用户自己来
- from apps.user.models import UserModel
-
-
- JWT_SECRET_KEY = '$#%^&&*(DFHJKUTYGHF112312' #加密的盐
- ALGORITHM = 'HS256' #加密算法
-
- #生成token
- def jwt_encode_token(user_id:int,time_out=7*24*60*60):
- payload = {
- 'user_id': user_id,
- 'iat': time.time(), #产生token时的时间戳
- 'exp': time.time() + time_out #token过期时间戳
- }
- token = jwt.encode(payload, JWT_SECRET_KEY, algorithm=ALGORITHM)
- return token
-
- #解析token
- def jwt_decode_token(token):
- secret = JWT_SECRET_KEY
- payload = jwt.decode(token,secret,algorithms=ALGORITHM)
- return payload
-
-
- #类装饰器:需要认证
- def login_required(func):
- @wraps(func)
- def wrapper(*args, **kwargs):
- token = request.cookies.get('token', "none")
- if not token:
- token = request.headers.get('token', "none")
- try:
- payload = jwt_decode_token(token)
- except Exception as e:
- return jsonify({'code': 410, 'error': 'token有问题'}),403
- user_id = payload.get('user_id')
- exp = payload.get('exp')
- now = time.time()
- obj = UserModel.query.filter_by(id=user_id).first()
- if not obj:
- return jsonify({'code': 410, 'error': 'token有问题'}),403
- if exp < now:
- return jsonify({'code': 410, 'error': 'token已经过期了'}),403
- g.user = obj
- #认证通过,执行视图函数
- return func(*args,**kwargs)
- return wrapper
-
-
-
- def authen_admin_required(func):
- @wraps(func)
- def wrapper(*args, **kwargs):
- if not hasattr(g,'user'):
- return jsonify({'code':410,'msg':'请先登录'})
- if g.user.role != '管理员':
- return jsonify({'code': 410, 'msg': '没有管理员权限,无法操作'}), 403
- # 认证通过,执行视图函数
- return func(*args, **kwargs)
-
- return wrapper
-
- if __name__ == '__main__':
- token = jwt_encode_token(78)
user/views.py
- from flask.views import MethodView
- from flask import jsonify
- from flask import request,g
- from ext import db
-
-
- #模型类
- from . import models
- #认证相关的
- from authen.token import jwt_encode_token,login_required,admin_required,authen_admin_required
-
-
- class RoleView(MethodView):
- #所有视图函数都需要登录认证
- decorators=[login_required]
- def get(self):
- objs = models.RoleModel.query.all()
- lis = []
- for obj in objs:
- lis.append({'id':obj.id,'name':obj.name})
- return jsonify({'code':200,'data':lis})
- #post请求需要有管理员权限
- @authen_admin_required
- def post(self):
- print(g.user)
- name = request.form.get('name')
- if not name:
- return jsonify({'code':400,'error':'请携带上权限名'})
-
- obj = models.RoleModel.query.filter_by(name=name).first()
- if obj:
- return jsonify({'code':400,'error':f'{name} 权限已经存在了'})
- else:
- obj = models.RoleModel()
- obj.name=name
- db.session.add(obj)
- db.session.commit()
- return jsonify({'code':200,'msg':f'新增角色:{name} 成功'})
认证大致的逻辑:
1、用户登录时,生成token,前端保存token信息
2、前端发起请求时,将token携带在cookies或请求头中
3、后端拿到token,先解析出token的用户信息,将用户信息存到g对象中
4、认证通过后,就可以在请求函数中通过g获取到当前登录的用户。
管理员权限认证的逻辑:
1、依赖于登录认证的流程,
2、在进入视图函数前,先检验是否通过登录认证了
3、登录认证通过了,就可以通过g获取当前登录的用户信息
4、再对登录的用户的信息进行权限的判断
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。