赞
踩
Cookie和Session
提示:
Cookie和Session是Django程序中用来缓存数据的
状态保持和Cookie、Session的关系
提示:
* 容易出现的误解:状态保持就是Cookie和Session,Cookie和Session就是状态保持。
* 状态保持和Cookie、Session是两种不同的概念,需要区分开,不能混淆。
状态保持:
* 用于记录当前用户的状态,比如,用户登录后记录登录的状态。
Cookie、Session:
* Cookie、Session仅仅是Django提供的缓存机制而已,用于缓存数据的。
* 比如:
* Cookie和Session缓存购物车数据
* Cookie和Session缓存用户登录状态 (状态保持)
状态保持和Cookie、Session的关系
* Cookie和Session仅仅是状态保持的一种实现方式而已
* 而能够实现状态保持的方式不仅仅只有Cookie和Session,比如JWT也可以实现状态保持
1.1 登录接口实现的业务逻辑
在整个登录接口实现中,除了检查用户名和密码以外,我们需要通过某种方式来记住用户身份(状态保持),还有用户认证。
1.2 用户下次访问不用登录
例如,在我们登录boss直聘网站后,我们可以不用再次登录就可以直接使用网站,并且网站上有我们的信息
需求:
HTTP协议本身是无状态的
,即服务器无法判断用户身份。问题:
服务器默认是无法识别当前登录用户是谁的,即无法记住登录状态
。
解决:
Cookie
Key-Value
形式进行信息的存储Cookie
是不能互相访问的
www.baidu.com
域名下面的Cookie
是不能被其他域名访问的set_cookie()
方法来设置Cookie。 # 创建响应对象
response = HttpResponse()
# 使用响应对象设置Cookie
response.set_cookie(key, value, max_age=cookie有效期)
HttpResponse()
:响应多种数据类型JsonResponse()
:响应JSONredirect()
:重定向render()
:渲染并响应HTML模板max_age
设置为None
。max_age
设置为具体的秒数
。class BooksView(View): """测试模板 http://127.0.0.1:8000/books/ """ def get(self, request): # 查询所有图书信息 books = BookInfo.objects.all() # 构造上下文 context = { 'books': books } # 使用上下文渲染'book.html',并返回给客户端 response = render(request, 'books.html', context) # 设置Cookie response.set_cookie('name', 'itcast', max_age=None) # 响应结果,并写Cookie到浏览器 return response
COOKIES
属性来读取本次请求携带的cookie值。class TestCookieView(View):
"""测试Cookie
http://127.0.0.1:8000/cookies/
"""
def get(self, request):
# 读取Cookie
name = request.COOKIES.get('name')
print(name)
return http.HttpResponse('测试Cookie')
需求:
Cookie中存储敏感信息是否安全?
结论:
如果存储前做加密处理,就会是安全的,但是加密后的敏感信息依然会暴露在浏览器中
思考:
有没有一种方式可以将某些敏感数据存储的更加安全些,即加密又不会暴露出去?
解决:
session_key
:一个随机的唯一的不重复的字符串session_data
:用户的唯一标识信息(密文)session_key
'sessionid': 'session_key'
session_key
,再使用它提取session_data
。session_data
来辨认用户状态session_key
需要存储在Cookie中可以通过 HttpRequest() 对象中的session
属性来设置Session。
request.session['key'] = value
class BooksView(View): """测试模板 http://127.0.0.1:8000/books/ """ def get(self, request): # 查询所有图书信息 books = BookInfo.objects.all() # 构造上下文 context = { 'books': books } # 使用上下文渲染'book.html',并返回给客户端 response = render(request, 'books.html', context) # 设置Cookie response.set_cookie('name', 'itcast', max_age=3600) # 设置Session request.session['name'] = 'itcast' # 响应结果,并写Cookie到浏览器 return response
根据键读取值
request.session.get('key', 默认值)
class TestSessionView(View):
"""测试Session
http://127.0.0.1:8000/session/
"""
def get(self, request):
# 读取Session
name = request.session.get('name')
print(name)
return http.HttpResponse('测试Session')
清除所有Session,在存储中删除值部分。
request.session.clear()
清除session数据,在存储中删除session的整条数据。
request.session.flush()
删除session中的指定键及值,在存储中只删除某个键及对应的值。
del request.session['key']
设置session的有效期
request.session.set_expiry(value)
Session数据默认存储的位置是在
settings.py
的DATABASES
配置项指定的SQL数据库中
SESSION_ENGINE = 'django.contrib.sessions.backends.db'
数据库中的表如图所示
表结构如下
由表结构可知,操作Session包括三个数据:键,值,过期时间。
存储在本机内存中,如果丢失则不能找回,比数据库的方式读写更快。
SESSION_ENGINE='django.contrib.sessions.backends.cache'
优先从本机内存中存取,如果没有则从数据库中存取。
SESSION_ENGINE='django.contrib.sessions.backends.cached_db'
在Redis中保存Session,需要引入第三方扩展,我们可以使用django-redis
来解决。
1)安装扩展
pip install django-redis
2)配置
在settings.py文件中做如下设置
CACHES = {
"default": {
"BACKEND": "django_redis.cache.RedisCache",
"LOCATION": "redis://127.0.0.1:6379/1",
"OPTIONS": {
"CLIENT_CLASS": "django_redis.client.DefaultClient",
}
}
}
SESSION_ENGINE = "django.contrib.sessions.backends.cache"
SESSION_CACHE_ALIAS = "default"
官方定义:JSON Web Token (JWT) is a compact URL-safe means of representing claims to be transferred between two parties
JWT是一个有着简单的统一表达形式的字符串:
jwt技术特性
单点登录
客户端在某一台服务器(授权中心)完成登录授权,拿到token之后,可以凭借token(令牌)在任意其他服务器中完成身份验证;
头部用于描述关于该JWT的最基本的信息,例如其类型以及签名所用的算法等。
作用:转化:
header = json.dumps(header)
herder = base64.b64(header.encode()).decode()
print("header:", header)
payload的五个字段都是由JWT的标准所定义的。
1. iss: 该JWT的签发者
2. sub: 该JWT所面向的用户
3. aud: 接收该JWT的一方
4. exp(expires): 什么时候过期,这里是一个Unix时间戳
5. iat(issued at): 在什么时候签发的
作用:存储用户信息
payload = json.dumps(payload)
payload = base64.b64encode.encode().decode()
print("payload:", payload)
作用:校验token真伪
使用哈希算法对信息进行摘要生成签名,目的是保证信息的可靠性;
原理:只要信息没有改动,那么原信息和签名是匹配的
签发
import hmac, hashlib # hmac:Python的一个用于hash(签名,散列)运算的模块 hashlib:专门实现具体的算法 # 1、构建哈希对象 # key: 加密使用的秘钥--生成签名 # msg: header和payload信息 # digestmod: 算法 key = "eerewrwerwrwewrwegegsf" SECRET_KEY = '' message = header + '.' + payload h_obj = hmac.new(key = SECRET_KEY.encode(), msg=message.encode(), djgestmod=hashlib.sha256) # 2、找对象里的方法,生成签名 signature = b_obj.hexdigest() print("signature:", signature) ======================= JWT_TOKEN = header + '.' + signature print("token",JWT_TOKEN) =======================
核心原理:重新对header和payload按照签发时相同的秘钥和算法加密
# 模拟前端传参 token_from_browser = JWT_TOKEN # 有效 token_from_browser = 'fdds' + JWT_TOKEN # 篡改 # 模拟校验原理 # 原理:只要信息没有改动,那么原信息和签名是匹配的 # 只要信息没有被篡改,那么根据相同而秘钥和算法,生成的第三部分签名和原来的签名一定一致 # 1、获取信息(header和payload) old_header = token_from_browser.split('.',[0]) old_payload = token_from_brower.split('.',[1]) old_signature = token_from_brower.split('.'[2]) # 2、把信息按照相同的秘钥和算法,重新生成新的签名 message = old_header + '.' +old_payload new_h_obj = hmac.new(key=SECRET.KEY.encode(), msg=message.encode(), digestmod=hashlib.sha256) new_signature = new_h_obj.hexdigest() print("新的签名:", new_signature) # 3、比对新旧签名是否一致:一致则信息没有被篡改,否则信息就被篡改了 if old_signature = new_signature: print("验证成功") # 提取用户数据 user_json = base64.b64decode(old_payload.encode().decode()) user_dict = json.loads(user_json) print("解析出来的用户身份信息:", user_dict) else: print("验证失败")
$ pip install pyjwt
>>> import jwt
>>> encoded_jwt = jwt.encode({'some': 'payload'}, 'secret', algorithm='HS256')
>>> encoded_jwt
'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzb21lIjoicGF5bG9hZCJ9.4twFt5NiznN84AWoo1d7KO1T_yoc0Z6XOpOVswacPZg'
>>> jwt.decode(encoded_jwt, 'secret', algorithms=['HS256'])
{'some': 'payload'}
import jwt from flask import current_app def generate_jwt(payload, expiry, secret=None): """ 生成jwt :param payload: dict 载荷 :param expiry: datetime 有效期 :param secret: 密钥 :return: jwt """ _payload = {'exp': expiry} _payload.update(payload) if not secret: secret = current_app.config['JWT_SECRET'] token = jwt.encode(_payload, secret, algorithm='HS256') return token.decode() def verify_jwt(token, secret=None): """ 检验jwt :param token: jwt :param secret: 密钥 :return: dict: payload """ if not secret: secret = current_app.config['JWT_SECRET'] try: payload = jwt.decode(token, secret, algorithm=['HS256']) except jwt.PyJWTError: payload = None return payload
Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzb21lIjoicGF5bG9hZCJ9.4twFt5NiznN84AWoo1d7KO1T_yoc0Z6XOpOVswacPZg
# toutiao/resources/user/passport.pty class AuthorizationResource(Resource): """ 认证 """ def _generate_tokens(self, user_id, with_refresh_token=True): """ 生成token 和refresh_token :param user_id: 用户id :return: token, refresh_token """ # 颁发JWT now = datetime.utcnow() expiry = now + timedelta(hours=current_app.config['JWT_EXPIRY_HOURS']) token = generate_jwt({'user_id': user_id, 'refresh': False}, expiry) refresh_token = None if with_refresh_token: refresh_expiry = now + timedelta(days=current_app.config['JWT_REFRESH_DAYS']) refresh_token = generate_jwt({'user_id': user_id, 'refresh': True}, refresh_expiry) return token, refresh_token def post(self): """ 登录创建token """ json_parser = RequestParser() json_parser.add_argument('mobile', type=parser.mobile, required=True, location='json') json_parser.add_argument('code', type=parser.regex(r'^\d{6}$'), required=True, location='json') args = json_parser.parse_args() mobile = args.mobile code = args.code # 从redis中获取验证码 key = 'app:code:{}'.format(mobile) try: real_code = current_app.redis_master.get(key) except ConnectionError as e: current_app.logger.error(e) real_code = current_app.redis_slave.get(key) try: current_app.redis_master.delete(key) except ConnectionError as e: current_app.logger.error(e) if not real_code or real_code.decode() != code: return {'message': 'Invalid code.'}, 400 # 查询或保存用户 user = User.query.filter_by(mobile=mobile).first() if user is None: # 用户不存在,注册用户 user_id = current_app.id_worker.get_id() user = User(id=user_id, mobile=mobile, name=mobile, last_login=datetime.now()) db.session.add(user) profile = UserProfile(id=user.id) db.session.add(profile) db.session.commit() else: if user.status == User.STATUS.DISABLE: return {'message': 'Invalid user.'}, 403 token, refresh_token = self._generate_tokens(user.id) return {'token': token, 'refresh_token': refresh_token}, 201
# common/utils/middlewares.py from flask import request, g from .jwt_util import verify_jwt def jwt_authentication(): """ 根据jwt验证用户身份 """ g.user_id = None g.is_refresh_token = False authorization = request.headers.get('Authorization') if authorization and authorization.startswith('Bearer '): token = authorization.strip()[7:] payload = verify_jwt(token) if payload: g.user_id = payload.get('user_id') g.is_refresh_token = payload.get('refresh')
# common/utils/decorators.py def login_required(func): """ 用户必须登录装饰器 使用方法:放在method_decorators中 """ @wraps(func) def wrapper(*args, **kwargs): if not g.user_id: return {'message': 'User must be authorized.'}, 401 elif g.is_refresh_token: return {'message': 'Do not use refresh token.'}, 403 else: return func(*args, **kwargs) return wrapper
# toutiao/resources/user/passport.py class AuthorizationResource(Resource): """ 认证 """ ... # 补充put方式 更新token接口 def put(self): """ 刷新token """ user_id = g.user_id if user_id and g.is_refresh_token: token, refresh_token = self._generate_tokens(user_id, with_refresh_token=False) return {'token': token}, 201 else: return {'message': 'Wrong refresh token.'}, 403
key = 'user:{}:token'.format(user_id)
pl = redis_client.pipeline()
pl.sadd(key, new_token)
pl.expire(key, token有效期)
pl.execute()
key = 'user:{}:token'.format(user_id)
valid_tokens = redis_client.smembers(key)
# 若上传的token不在白名单列表valid_tokens内,返回403要求重新登录
if valid_tokens and token not in valid_tokens:
return {'message': 'Invalid token'.}, 403
说明:
1. redis记录设置有效期的时长是一个token的有效期,保证旧token过期后,redis的记录也能自动清除,不占用空间。
2. 使用set保存新token的原因是,考虑到用户可能在旧token的有效期内,在其他多个设备进行了登录,需要生成多个新token,这些新token都要保存下来,既保证新token都能正常登录,又能保证旧token被禁用
1. 修改密码接口
class ModifyPasswordResource(Resource): # 修改密码要求用户必须登录 method_decorators = [login_required] def post(self): """修改密码逻辑""" # 1.获取用户id user_id = g.user_id # 2.保存到redis中的key key = 'user:{}:token'.format(user_id) # 3.获取管道对象 pl = current_app.redis_master.pipeline() # 4.先删除已有的白名单 if pl.exists(key): pl.delete(key) # 5.生成新的token值 new_token, refresh_token = self._generate_tokens(user_id) # 6.sadd 往集合中添加成员--添加修改密码后的新的token值 pl.sadd(key, new_token) # 7.设置2小时有效期 pl.expire(key, 7200) # 8.设置修改密码的标志位,同时设置2小时有效期 modify_key = "modify:{}".format(user_id) pl.setex(modify_key, 7200, "modify") # 9.执行管道 pl.execute() # 10.修改密码成功,并返回新的token值 return {"message": "修改密码成功", "valid_token": new_token}
2. 在middleware 的def jwt_authorization():方法中补充验证白名单的逻辑
def jwt_authorization(): """ 每次请求之前进行jwt验证 :return: """ # 请求头Header携带 # {Authorization: "Bearer jwt_token"} # 默认值 g.user_id = None g.is_refresh = False # 1.提取请求头中的token数据 header_token = request.headers.get("Authorization") # 2.截取真正的jwt-token值 if header_token is not None and header_token.startswith("Bearer "): jwt_token = header_token[7:] # 3.jwttoken的校验 --- 如果token没有值 或者过期了 自动会抛出401异常 payload = verify_jwt(token=jwt_token) # 4.获取载荷中的用户信息,使用g对象存储用户信息 if payload is not None: g.user_id = payload.get("user_id") g.is_refresh = payload.get("is_refresh", False) # ========================【白名单验证逻辑】================================ if g.user_id: # 根据key获取是否进行过密码修改的标志位 modify_key = "modify:{}".format(g.user_id) modify = current_app.redis_master.get(modify_key) # 如果是修改过密码,同时又不是刷新token,需要进行白名单认证 if modify == 'modify' and g.is_refresh is False: # 白名单认证 ret = white_list() # 如果认证结果为False 代表token没有在白名单内部 if ret is False: return "token is not in white list", 403
3. 判断用户上传的token是否在白名单列表中
def white_list(): """ 判断用户上传的token是否在白名单列表中 如果在:正常访问 如果不在:拦截不让访问视图函数,需要使用修改密码后生成的最新的token才能访问 :return: BooL值 False代表token没有在白名单内部 """ # 用户id user_id = g.user_id key = 'user:{}:token'.format(user_id) # 获取白名单内部的token # smembers 获取集合中的成员 valid_tokens = current_app.redis_master.smembers(key) # set类型数据 装的bytes类型数据 print(type(valid_tokens)) # 把set、转换成python的列表 valid_tokens = list(valid_tokens) # 提取前端发送的token header_token = request.headers.get("Authorization") token = header_token[7:] # 判断前端发送的token值是否在白名单列表中,如果不在返回403 重新登录 if valid_tokens and token not in valid_tokens: return False else: return True
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。