当前位置:   article > 正文

Flask-peewee-pyjwt实现简单的用户接口

flask-peewee





前言

这是我第一次写blog,记录我开始学习写程序的点滴。

老师布置的任务是写一个视频圈的简单接口,由于老师要求前后端分离,我查找了很多大神的博客,最后决定使用flask作为框架, PyJWT 实现基于Json Web Token的用户认证授权,peewee为orm框架。

这篇文章大部分内容均为转载。


提示:以下是本篇文章正文内容,下面案例可供参考




一、需求

1、程序将实现一个用户注册、登录、获取用户信息、用户发送视频保存视频和用户删除视频的功能。

2、用户注册时前端以json格式输入用户名(username)、邮箱(email)和密码(password),用户名和邮箱是唯一的,如果数据库中已有则会注册失败;用户注册成功后返回用户的信息。发送视频时则需要视频相对url,文字(content,可选择不输入),视频类型(type)。

3、用户使用用户名(username)和密码(password)登录,登录成功时返回token,每次登录都会更新一次token。

4、用户要获取用户信息,需要在请求Header中传入验证参数和token,程序会验证这个token的有效性并给出响应。

5、程序构建方面,将用户和认证分列两个模块。

二、轻量级ORM-peewee

我一开始是用SQLAchemy作为这个项目的orm,但老师说不建议使用,让我试试peewee,于是我参考了peewee的官方文档和大神的博客。

一、导入模块

from peewee import *

二、初始化数据库

db=MySQLDatabase("data",user='root',password='123456',host='127.0.0.1',port=3306,charset='utf8mb4')

初始化数据库中包含的几个基本的参数。

三、创建model

当用 peewee 开始一个项目时,通常最好从你的数据模型开始,通过定义一个或多个Model类:

  1. class BaseModel(Model):
  2. class Meta:
  3. database = db
  4. class Videodata(BaseModel):
  5. id = AutoField(primary_key=True) #自增主键
  6. date = DateTimeField()
  7. user = IntegerField()
  8. url = CharField()
  9. content = CharField(null=True)
  10. type_choices = ((1,'1'),(2,'2'))
  11. type = IntegerField(choices=type_choices)
  12. class Meta:
  13. # 表名
  14. table_name = 'Videodata'
  15. class User(BaseModel):
  16. id = AutoField(primary_key=True)
  17. user = IntegerField()
  18. password = CharField()
  19. email = CharField()
  20. class Meta:
  21. # 表名
  22. table_name = 'User'

1.其中第一个BaseModel是将实体与数据库进行绑定。后面的类继承自BaseModel,直接关联db,并且也继承了Model。

2.其中,CharField、DateField、BooleanField等这些类型与数据库中的数据类型一一对应,我们直接使用它就行,至于CharField => varchar(255)这种转换Peewee已经为我们做好了 。

3.以下字段类型对象与之对应的是MySQL中的数据类型
需要对比其他数据库的可以参考官方文档Peewee文档

  1. AutoField:integer
  2. UUIDField:varchar(40)
  3. CharField:varchar
  4. IntegerField:integer
  5. DecimalField:numeric
  6. TextField:text
  7. DateTimeField:datetime
  8. DateField:date
  9. TimeField:time
  10. FixedCharField:char
  11. BigAutoField:bigint
  12. BigIntegerField:bigint
  13. SmallIntegerField:smallint
  14. IdentityField:not supported
  15. FloatField:real
  16. DoubleField:double precision
  17. BlobField:blob
  18. BitField:bigint
  19. BigBitField:blob
  20. BinaryUUIDField:varbinary(16)
  21. TimestampField:integer
  22. IPField:bigint
  23. BooleanField:bool
  24. BareField:not supported
  25. ForeignKeyField:integer
  26. ManyToManyField:无

这些字段常用的基本参数,下面的都是默认设置,实际中根据需求修改:

  1. null = False 是否允许空字符串。
  2. index = False 是否创建普通索引。
  3. unique = False 是否创建唯一索引。
  4. column_name = None 在数据库中指定列的名称,一般不设置会使用定义model时的变量名作为列名。
  5. default = None 设置默认值。
  6. primary_key = False 是否为主键。
  7. constraints = None 增加约束,如果增加,该值是一个列表。
  8. collation = None 用于排序字段或索引的排序规则。
  9. choices = None 设置可选的枚举选项,和Django的ORM射着一样,值是一个元组,元素也是一个元组,例如((1,'女'),(2,'男'))。
  10. help_text = None 设置备注或注释文本。
  11. verbose_name = None 设置一个可读性良好的名称,例如 name = CharField(verbose_name = '用户名')。
  12. index_type = None 指定索引类型,不常用,一般默认即可。

 4.迁移模型(也叫生成表):

  1. db.connect()
  2. User.create_table()
  3. db.close()

四、基于User类的增删查改

1.增

creat

user = User.create(name='张三',age=26,sex='男') # 创建一个记录,并返回一个user对象

save

  1. user = User(name='张三',age=26,sex='男')
  2. user.save() # 创建一个记录,并返回一个user对象

 insert

  1. user_id = User.insert(name='张三',age=26,sex='男').execute() # 创建一个记录,并返回新纪录的主键
  2. 这里使用了 execute() 方法,因为insert不会马上执行,execute()方法可以马上执行操作。

insert_many

  1. User.insert_many([
  2. ('张三丰', 26, '男'),
  3. ('王大胖', 12, '男'),
  4. ('王小小', 22, '女')
  5. ],
  6. ['name', 'age', 'sex']
  7. ).execute()
  8. 也可以插入字典列表
  9. User.insert_many([
  10. {'name': 王二小', 'age': 12, 'sex': ''},
  11. {'name': 王小小', 'age': 13, 'sex': '女'},
  12. {'name': 王大胖', 'age': 15, 'sex': ''}
  13. ]).execute()

bulk_create

  1. 和Django类似的批量创建
  2. users = [
  3. {'name': 王二小', 'age': 12, 'sex': ''},
  4. {'name': 王小小', 'age': 13, 'sex': '女'},
  5. {'name': 王大胖', 'age': 15, 'sex': ''}
  6. ]
  7. datas = [User(**item) for item in users]
  8. # 不使用事务
  9. User.bulk_create(datas)
  10. # 使用事务
  11. with db.atomic():
  12. User.bulk_create(datas)

2 查询

get 和 get_or_none

  1. user = User.get(User.id==1) # 当获取的结果不存在时,报 Model.DoesNotExist 异常。如果有多条记录满足条件,返回第一条。
  2. user = User.get_or_none(User.id==1) # 和get的区别是,当没有匹配数据时返回None,不会抛出异常。

get_by_id

user = User.get_by_id(1) # 对于主键查找可以用这个

get_or_create

user, created = User.get_or_create(name='张三', defaults={'age': 26, 'sex': '男'}) # 如果存在就返回user,不存在就创建后再返回user

select

  1. 使用 Model.select() 查询获取多条数据。select 后可以添加 where 条件,如果不加则查询所有数据。
  2. user = User.select() # 查询user内的所有数据,查询所有列
  3. user = User.select(User.id, User.name) # 查询列为id、name,返回所有数据
  4. 带where条件的:
  5. user = User.select().where(User.name == '张三') # 查询所有叫张三的数据
  6. user = User.select().where(User.name == '张三', User.age = 26) # 查询名字叫张三并且年龄为26的数据
  7. user = User.select().where((User.name == '张三') | (User.age = 26)) # 查询名字叫张三或者年龄为26的数据
  8. user = User.select().where(~User.age == 26)# 查询年龄不等于26的数据
  9. user = User.select().where(User.age != 26)# 查询年龄不等于26的数据
  10. user = User.select().where(User.age >= 26)# 查询年龄大等于26的数据
  11. user = User.select().where(User.age in [24,26,27]) # 查询年龄在[24,26,27]内的数据
  12. user = User.select().where(User.name.contains('张三')) # 模糊查询,相当于 LIKE '%张三%'
  13. 排序:
  14. user = User.select().order_by(User.id) # 默认升序 == order_by(User.id.asc())
  15. user = User.select().order_by(-User.id) # 降序排序 == order_by(User.id.desc())
  16. 取出前N个数据:
  17. user = User.select().order_by(-User.id).limit(10) # 取出id从大到小的前10个数据
  18. 计数count
  19. tatol = User.select().count() # 统计所有数据条数
  20. 分页查询:
  21. user = User.select().order_by(User.id).paginate(2, 10) # paginate(page, page_size) 查出第二页,每页10条数据
  22. 分组聚合查询:
  23. query = User.select(User.age, fn.Count(User.id).alias('count')).group_by(User.age)

3修改

save

  1. user = User.get(User.id==1)
  2. user.age = 18
  3. user.save()

update

  1. user = User.update(age=18).where(User.name=='张三').execute() # update用于批量更新,将所有匹配结果进行更新
  2. User.update(age=User.age + 1).execute() # 批量操作,会将所有人的年龄加一后再储存

4删除

delete

  1. User.delete().where(User.id == 1).execute() # 可以应用批量删除
  2. User.delete().where(User.age < 18).execute() # 可以删除所有年龄小于18的数据行

delete_instance

  1. user = User.get(User.id=1) # 针对已经知道明确的对象时,可以应用该删除方法更好些,可以避免误删。
  2. user.delete_instance()
  3. 使用事务:使用事务可以保证原子性等事务的四大特性
  4. with db.atomic():
  5. # 执行相关SQL操作
  6. user = User.get(User.id=1)
  7. user.age = 18
  8. user.save()

这是关于peewee的基本使用,更多详细操作 可以参考官方文档Peewee文档

关于本节peewee的学习主要参考一下两个博客及peewee官网文档

Python 轻量级ORM框架Peewee的使用指南_haeasringnar的博客-CSDN博客

python——peewee简单使用_beitacat的博客-CSDN博客

 

三、pyjwt的使用

一、什么是jwt

Json web token (JWT), 是为了在网络应用环境间传递声明而执行的一种基于JSON的开放标准((RFC 7519).该token被设计为紧凑且安全的,特别适用于分布式站点的单点登录(SSO)场景。JWT的声明一般被用来在身份提供者和服务提供者间传递被认证的用户身份信息,以便于从资源服务器获取资源,也可以增加一些额外的其它业务逻辑所必须的声明信息,该token也可直接被用于认证,也可被加密。

jwt与传统session认证最大不同:

基于token的鉴权机制

基于token的鉴权机制类似于http协议也是无状态的,它不需要在服务端去保留用户的认证信息或者会话信息。这就意味着基于token认证机制的应用不需要去考虑用户在哪一台服务器登录了,这就为应用的扩展提供了便利。

流程上是这样的:

用户使用用户名密码来请求服务器
服务器进行验证用户的信息
服务器通过验证发送给用户一个token
客户端存储token,并在每次请求时附送上这个token值
服务端验证token值,并返回数据
这个token必须要在每次请求时传递给服务端,它应该保存在请求头里, 另外,服务端要支持CORS(跨来源资源共享)策略,一般我们在服务端这么做就可以了Access-Control-Allow-Origin: *。
 

我是这样理解jwt的:jwt是可以不需要前端返回保留的认证,只需要通过解密jwt即可得到用户的身份及数据,后端要做的就是在用户操作时更新用户的token并保存就可以了。

二、jwt结构

JWT是由三段信息构成的,将这三段信息文本用.链接一起就构成了Jwt字符串。

第一部分我们称它为头部(header),第二部分我们称其为载荷(payload, 类似于飞机上承载的物品),第三部分是签证(signature).

jwt的头部承载两部分信息:

  • 声明类型,这里是jwt
  • 声明加密的算法 通常直接使用 HMAC SHA256

playload
载荷就是存放有效信息的地方。这个名字像是特指飞机上承载的货品,这些有效信息包含三个部分:

  • 标准中注册的声明
  • 公共的声明
  • 私有的声明

标准中注册的声明 (建议但不强制使用) :

  • iss: jwt签发者
  • sub: jwt所面向的用户
  • aud: 接收jwt的一方
  • exp: jwt的过期时间,这个过期时间必须要大于签发时间
  • nbf: 定义在什么时间之前,该jwt都是不可用的.
  • iat: jwt的签发时间
  • jti: jwt的唯一身份标识,主要用来作为一次性token,从而回避重放攻击。

公共的声明 :
公共的声明可以添加任何的信息,一般添加用户的相关信息或其他业务需要的必要信息.但不建议添加敏感信息,因为该部分在客户端可解密.

私有的声明 :
私有声明是提供者和消费者所共同定义的声明,一般不建议存放敏感信息,因为base64是对称解密的,意味着该部分信息可以归类为明文信息。
 

signature
jwt的第三部分是一个签证信息,这个签证信息由三部分组成:

  • header (base64后的)
  • payload (base64后的)
  • secret

这个部分需要base64加密后的header和base64加密后的payload使用.连接组成的字符串,然后通过header中声明的加密方式进行加盐secret组合加密,然后就构成了jwt的第三部分。

将这三部分用(都会通过加密生成一段字符串).连接成一个完整的字符串,构成了最终的jwt,

注意:secret是保存在服务器端的,jwt的签发生成也是在服务器端的,secret就是用来进行jwt的签发和jwt的验证,所以,它就是你服务端的私钥,在任何场景都不应该流露出去。一旦客户端得知这个secret, 那就意味着客户端是可以自我签发jwt了。

三、pyjwt的使用

1.安装pyjwt

pip install PyJWT

2.设置密匙secret

secret_key = "自己设置的密匙"

这里设置密匙是为了后面在编写encode(加密)和decode(解密)中的参数,实际开发中应该保存在服务器,不流露出去。

3.编写encode(加密)和decode(解密)函数

  1. def encode_auth_token(user_id, login_time):
  2. """
  3. 生成认证Token
  4. :param user_id: int
  5. :param login_time: datetime
  6. :return: string
  7. """
  8. try:
  9. payload = {
  10. 'iss': 'ken', # 签名
  11. 'exp': datetime.datetime.utcnow() + datetime.timedelta(days=0, hours=4), # 过期时间
  12. 'iat': datetime.datetime.utcnow(), # 开始时间
  13. 'data': {
  14. 'id': user_id,
  15. 'login_time': login_time
  16. } #自定义部分
  17. }
  18. return jwt.encode(
  19. payload,
  20. config.SECRET_KEY, #前面设置的密匙
  21. algorithm='HS256' #加密方法
  22. ) # 加密生成字符串
  23. except Exception as e:
  24. return e

要生成Token需要用到pyjwt的encode方法,这个方法可以传入三个参数,如示例:

jwt.encode(payload, config.SECRET_KEY, algorithm=’HS256′)

上面代码的jwt.encode方法中传入了三个参数:第一个是payload,这是认证依据的主要信息,第二个是密钥,这里是读取配置文件中的SECRET_KEY配置变量,第三个是生成Token的算法。

这里稍微讲一下payload,这是认证的依据,也是后续解析token后定位用户的依据,需要包含特定用户的特定信息,如本例注册了data声明,data声明中包括了用户ID和用户登录时间两个参数,在“用户鉴权”方法中,解析token完成后要利用这个用户ID来查找并返回用户信息给用户。这里的data声明是我们自己加的,pyjwt内置注册了以下几个声明:

  • “exp”: 过期时间
  • “nbf”: 表示当前时间在nbf里的时间之前,则Token不被接受
  • “iss”: token签发者
  • “aud”: 接收者
  • “iat”: 发行时间

要注意的是”exp”过期时间是按当地时间确定,所以设置时要使用utc时间。

  1. def decode_auth_token(auth_token):
  2. """
  3. 验证Token
  4. :param auth_token:
  5. :return: integer|string
  6. """
  7. try:
  8. # payload = jwt.decode(auth_token, app.config.get('SECRET_KEY'), leeway=datetime.timedelta(seconds=10))
  9. # 取消过期时间验证
  10. payload = jwt.decode(auth_token, config.SECRET_KEY,algorithms='HS256') #options={'verify_exp': False} 加上后不验证token过期时间
  11. if ('data' in payload and 'id' in payload['data']):
  12. return payload
  13. else:
  14. raise jwt.InvalidTokenError
  15. except jwt.ExpiredSignatureError:
  16. return 'Token过期'
  17. except jwt.InvalidTokenError:
  18. return '无效Token'

decode_auth_token方法用于Token验证

这里的Token验证主要包括过期时间验证和声明验证。使用pyjwt的decode方法解析Token,得到payload。如:

jwt.decode(auth_token, config.SECRET_KEY, options={‘verify_exp’: False})

上面的options设置不验证过期时间,如果不设置这个选项,token将在原payload中设置的过期时间后过期。

经过上面解析后,得到的payload可以跟原来生成payload进行比较来验证token的有效性。

以上就是pyjwt的简单使用,更多详情可参考pyjwt的官方文档pyjwt官方文档

本节主要参考了以下博客及pyjwt的官方文档:

jwt的详细说明_勿忘初心-CSDN博客

Flask + PyJWT 实现基于Json Web Token的用户认证授权_极客点儿-CSDN博客

四、我的程序

1.程序目录结构

图中的auth模块是认证模块,主要是pyjwt的认证鉴权处理。users模块定义了用户模型和用户相关接口。 common定义了接口返回格式,config定义了一些参数和密匙。

2.初始化文件

vedio_circle/config.py

  1. DB_USER = 'root'
  2. DB_PASSWORD = '123456'
  3. DB_HOST = 'localhost'
  4. DB_DB = 'flask-pyjwt-auth'
  5. DEBUG = True
  6. PORT = 3306
  7. HOST = "127.0.0.1"
  8. SECRET_KEY = "my blog"

vedio_circle/common.py

  1. def trueReturn(data, msg):
  2. return {
  3. "status": True,
  4. "data": data,
  5. "msg": msg
  6. }
  7. def falseReturn(data, msg):
  8. return {
  9. "status": False,
  10. "data": data,
  11. "msg": msg
  12. }

3.用户文件

vedio_circle/users/model.py

  1. from flask import Flask, request, jsonify
  2. from peewee import *
  3. from werkzeug.security import generate_password_hash, check_password_hash #加密密码及验证
  4. import time
  5. db = MySQLDatabase("data",user='root',password='123456',host='127.0.0.1',port=3306,charset='utf8mb4')
  6. class BaseModel(Model):
  7. class Meta:
  8. database = db
  9. class Videodata(BaseModel):
  10. id = AutoField(primary_key=True)
  11. date = DateTimeField()
  12. username = CharField()
  13. url = CharField()
  14. content = CharField(null=True)
  15. type_choices = ((1,'1'),(2,'2'))
  16. type = IntegerField(choices=type_choices)
  17. class Meta:
  18. # 表名
  19. table_name = 'Videodata'
  20. class Users(BaseModel):
  21. id = AutoField(primary_key=True)
  22. username = CharField()
  23. password = CharField()
  24. email = CharField()
  25. login_time = DateTimeField(null=True)
  26. class Meta:
  27. # 表名
  28. table_name = 'users'
  29. def set_password(password):
  30. return generate_password_hash(password) #密码生成函数
  31. def check_password(hash, password):
  32. return check_password_hash(hash, password) #密码延展函数
  33. #需要一个user对象
  34. def get(user):
  35. try:
  36. connect_db()
  37. result = user
  38. return result
  39. except Exception as e:
  40. db.rollback()
  41. reason = str(e)
  42. return reason
  43. finally:
  44. close_db()
  45. #需要一个user对象
  46. def add(user):
  47. try:
  48. connect_db()
  49. result = user.save()
  50. return result
  51. except Exception as e:
  52. db.rollback()
  53. reason = str(e)
  54. return reason
  55. finally:
  56. close_db()
  57. #需要一个user对象
  58. def update(user):
  59. add(user)
  60. #需要一个user对象
  61. def delete(user):
  62. try:
  63. connect_db()
  64. result = user.delete_instance()
  65. return result
  66. except Exception as e:
  67. db.rollback()
  68. reason = str(e)
  69. return reason
  70. finally:
  71. close_db()
  72. def connect_db():
  73. if db.is_closed():
  74. db.connect()
  75. def close_db():
  76. if not db.is_closed():
  77. db.close()
  78. def commit(work):
  79. try:
  80. connect_db()
  81. work.save()
  82. except Exception as e:
  83. db.rollback()
  84. reason = str(e)
  85. return reason
  86. finally:
  87. close_db()

这里定义了用户和视频的模型,还有增删查改四种基本的方法,而且利用了利用Flask中的werkzeug.security模块加密。

这个模块主要是用到了两个函数:

  • 密码生成函数:generate_password_hash;
  • 密码验证函数:check_password_hash;



密码生成函数主要是对密码进行加密,密码验证函数于验证经过generate_password_hash哈希的密码,若密码匹配,则返回真,否则返回假。

更详细的参数可参考文章利用Flask中的werkzeug.security模块加密

vedio_circle/users/api.py

  1. from app.users.model import *
  2. from app.auth.auths import Auth,authenticate,identify
  3. import time
  4. import common
  5. app = Flask(__name__)
  6. db = MySQLDatabase("data",user='root',password='123456',host='127.0.0.1',port=3306,charset='utf8mb4')
  7. db.connect()
  8. @app.route("/",)
  9. def hallo_vedio_circle():
  10. return 'Hello vediocircle'
  11. @app.route('/register', methods=['POST'])
  12. def register():
  13. """
  14. 用户注册
  15. 用户名 username sashuishui
  16. 密码 password 12123232
  17. 邮箱 123456789@qq.com
  18. :return: json
  19. """
  20. get_data = request.get_json()
  21. email = get_data.get("email")
  22. username = get_data.get("username")
  23. password = get_data.get("password")
  24. user = Users(email=email, username=username, password=set_password(password))
  25. add(user)
  26. if user.id:
  27. returnUser = {
  28. 'id': user.id,
  29. 'username': user.username,
  30. 'email': user.email,
  31. 'login_time': user.login_time
  32. }
  33. return jsonify(common.trueReturn(returnUser, "用户注册成功"))
  34. else:
  35. return jsonify(common.falseReturn('', '用户注册失败'))
  36. @app.route('/login', methods=['POST'])
  37. def login():
  38. """
  39. 用户登录
  40. 用户名 username sashuishui
  41. 密码 password 12123232
  42. :return: json
  43. """
  44. get_user = request.get_json()
  45. username = get_user.get('username')
  46. password = get_user.get('password')
  47. if (not username or not password):
  48. return jsonify(common.falseReturn('', '用户名和密码不能为空'))
  49. else:
  50. return authenticate(username, password)
  51. @app.route('/user', methods=['GET'])
  52. def getuser():
  53. """
  54. 获取用户信息
  55. Authorization : token
  56. :return: json
  57. """
  58. result = identify(request)
  59. if (result['status'] and result['data']):
  60. user = get(Users.get_or_none(Users.id==result['data']))
  61. returnUser = {
  62. 'id': user.id,
  63. 'username': user.username,
  64. 'email': user.email,
  65. 'login_time': user.login_time
  66. }
  67. result = common.trueReturn(returnUser, "请求成功")
  68. return jsonify(result)
  69. #用户发送视频,保存用户视频数据
  70. @app.route("/user/video", methods=["POST"])
  71. def user_data():
  72. """
  73. headers{Authorization:token}
  74. body{
  75. 视频url url video.mp4
  76. 文字 content 嘻嘻
  77. 视频类型 type 1/2
  78. }
  79. :return: json
  80. """
  81. get_data = request.get_json()
  82. result = identify(request)
  83. if (result['status'] and result['data']):
  84. user = get(Users.get_or_none(Users.id == result['data']))
  85. date = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) # 改变时间格式,不然插入数据库会报错,数据库是datetime类型
  86. username = user.username
  87. url = get_data.get("url")
  88. content = get_data.get("content")
  89. type = get_data.get("type")
  90. video = Videodata(date=date,username=username,url=url,content=content,type=type)
  91. add(video)
  92. if video.id:
  93. returnVideo = {
  94. 'id': video.id,
  95. 'date':video.date,
  96. 'username': video.username,
  97. 'url': video.url,
  98. 'content': video.content,
  99. 'type':video.type
  100. }
  101. return jsonify(common.trueReturn(returnVideo, "视频发表成功"))
  102. else:
  103. return jsonify(common.falseReturn('', '视频发表失败'))
  104. else:
  105. return jsonify(common.falseReturn('','用户未认证'))
  106. #用户删除视频
  107. @app.route("/user/deletevideo", methods=["DELETE"])
  108. def user_deletevedio():
  109. """
  110. 返回用户要删除的视频id
  111. id number
  112. :return: json
  113. """
  114. get_data = request.get_json()
  115. result = identify(request)
  116. if (result['status'] and result['data']):
  117. id = get_data.get("id")
  118. video = get(Videodata.get_or_none(Videodata.id == id))
  119. delete(video)
  120. if (Videodata.get_or_none(Videodata.id == id) == None):
  121. return jsonify(common.trueReturn('', "视频删除成功"))
  122. else:
  123. return jsonify(common.falseReturn('', '视频删除失败'))
  124. else:
  125. return jsonify(common.falseReturn('', '用户未认证'))
  126. if __name__ == '__main__':
  127. app.run()

api接口都是一些比较常规的操作,其中用到的用户认证和用户鉴权都在下面的模块auth中,用户的注册是接受前端返回json格式的参数,后端保存到数据库,是不能生成token的。而正确输入账号密码后才会生成token,用户身份认证需要用到getuser函数,前端要将生成的token保存在headers的Authorization中,这样用户身份验证函数getuser验证token成功后会将用户的信息返回在getuser函数的返回值中。下面用户发送视频和删除视频的接口就用到了身份鉴权。

4.认证模型

vedio_circle/auth/auths.py

  1. import jwt, datetime, time
  2. from flask import jsonify,request
  3. from app.users.model import Users,check_password,update
  4. import common
  5. import config
  6. class Auth():
  7. @staticmethod
  8. def encode_auth_token(user_id, login_time):
  9. """
  10. 生成认证Token
  11. :param user_id: int
  12. :param login_time: datetime
  13. :return: string
  14. """
  15. try:
  16. payload = {
  17. 'iss': 'ken', # 签名
  18. 'exp': datetime.datetime.utcnow() + datetime.timedelta(days=0, hours=4), # 过期时间
  19. 'iat': datetime.datetime.utcnow(), # 开始时间
  20. 'data': {
  21. 'id': user_id,
  22. 'login_time': login_time
  23. }
  24. }
  25. return jwt.encode(
  26. payload,
  27. config.SECRET_KEY,
  28. algorithm='HS256'
  29. ) # 加密生成字符串
  30. except Exception as e:
  31. return e
  32. @staticmethod
  33. def decode_auth_token(auth_token):
  34. """
  35. 验证Token
  36. :param auth_token:
  37. :return: integer|string
  38. """
  39. try:
  40. # payload = jwt.decode(auth_token, app.config.get('SECRET_KEY'), leeway=datetime.timedelta(seconds=10))
  41. # 取消过期时间验证
  42. payload = jwt.decode(auth_token, config.SECRET_KEY,algorithms='HS256') #options={'verify_exp': False} 加上后不验证token过期时间
  43. if ('data' in payload and 'id' in payload['data']):
  44. return payload
  45. else:
  46. raise jwt.InvalidTokenError
  47. except jwt.ExpiredSignatureError:
  48. return 'Token过期'
  49. except jwt.InvalidTokenError:
  50. return '无效Token'
  51. def authenticate(username, password):
  52. """
  53. 用户登录,登录成功返回token,写将登录时间写入数据库;登录失败返回失败原因
  54. :param password:
  55. :return: json
  56. """
  57. userInfo = Users.get_or_none(Users.username==username)
  58. if (userInfo is None):
  59. return jsonify(common.falseReturn('', '找不到用户'))
  60. else:
  61. if (check_password(userInfo.password, password)):
  62. login_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
  63. userInfo.login_time = login_time
  64. update(userInfo)
  65. token = Auth.encode_auth_token(userInfo.id, login_time)
  66. return jsonify(common.trueReturn(token,'登录成功'))
  67. else:
  68. return jsonify(common.falseReturn('', '密码不正确'))
  69. def identify(request):
  70. """
  71. 用户鉴权
  72. :return: list
  73. """
  74. try:
  75. auth_header = request.headers.get('Authorization')
  76. if (auth_header):
  77. auth_tokenArr = auth_header.split(' ')
  78. if (not auth_tokenArr or auth_tokenArr[0] != 'JWT' or len(auth_tokenArr) != 2):
  79. result = common.falseReturn('', '请传递正确的验证头信息')
  80. else:
  81. auth_token = auth_tokenArr[1]
  82. payload = Auth.decode_auth_token(auth_token)
  83. if not isinstance(payload, str):
  84. user = Users.get_or_none(Users.id == payload['data']['id'])
  85. if (user is None):
  86. result = common.falseReturn('', '找不到该用户信息')
  87. else:
  88. result = common.trueReturn(user.id, '请求成功')
  89. else:
  90. result = common.falseReturn('', payload)
  91. return result
  92. except jwt.ExpiredSignatureError:
  93. result = common.falseReturn('', 'Token已更改,请重新登录获取')
  94. return result
  95. except jwt.InvalidTokenError:
  96. result = common.falseReturn('', '没有提供认证token')
  97. return result

认证模块的实现主要包括下面4个部分(方法):

(1)encode_auth_token方法用来生成认证Token

要生成Token需要用到pyjwt的encode方法,这个方法可以传入三个参数,如示例:

jwt.encode(payload, config.SECRET_KEY, algorithm=’HS256′)

上面代码的jwt.encode方法中传入了三个参数:第一个是payload,这是认证依据的主要信息,第二个是密钥,这里是读取配置文件中的SECRET_KEY配置变量,第三个是生成Token的算法。

这里稍微讲一下payload,这是认证的依据,也是后续解析token后定位用户的依据,需要包含特定用户的特定信息,如本例注册了data声明,data声明中包括了用户ID和用户登录时间两个参数,在“用户鉴权”方法中,解析token完成后要利用这个用户ID来查找并返回用户信息给用户。这里的data声明是我们自己加的,pyjwt内置注册了以下几个声明:

  • “exp”: 过期时间
  • “nbf”: 表示当前时间在nbf里的时间之前,则Token不被接受
  • “iss”: token签发者
  • “aud”: 接收者
  • “iat”: 发行时间

要注意的是”exp”过期时间是按当地时间确定,所以设置时要使用utc时间。

(2)decode_auth_token方法用于Token验证

这里的Token验证主要包括过期时间验证和声明验证。使用pyjwt的decode方法解析Token,得到payload。如:

jwt.decode(auth_token, config.SECRET_KEY, options={‘verify_exp’: False})

上面的options设置不验证过期时间,如果不设置这个选项,token将在原payload中设置的过期时间后过期。

经过上面解析后,得到的payload可以跟原来生成payload进行比较来验证token的有效性。

(3)authenticate方法用于用户登录验证

这个方法进行用户登录验证,如果通过验证,先把登录时间写入用户记录,再调用上面第一个方法生成token,返回给用户(用户登录成功后,据此token来获取用户信息或其他操作)。

(4)identify方法用于用户鉴权

当用户有了token后,用户可以拿token去执行一些需要token才能执行的操作。这个用户鉴权方法就是进一步检查用户的token,如果完全符合条件则返回用户需要的信息或执行用户的操作。

用户鉴权的操作首先判断一个用户是否正确传递token,这里使用header的方式来传递,并要求header传值字段名为“Authorization”,字段值以“JWT”开头,并与token用“ ”(空格)隔开。

用户按正确的方式传递token后,再调用decode_auth_token方法来解析token,如果解析正确,获取解析出来的用户信息(user_id)并到数据库中查找详细信息返回给用户。

5.程序演示

1.注册

 

 可见数据返回成功,数据库保存成功。

2.登录

 可见登录成功,其中返回的"data"数据里的字符串就是token

 数据库也保存了登录时间,之前设置的token有效期就是4小时。

3用户认证

 按照之前的格式将token以header传值字段名为“Authorization”,字段值以“JWT”开头,并与token用“ ”(空格)隔开。用get方法请求,发现身份认证成功。

我们这里用到json格式的交互,故header中设置content-type选择json格式

关于post请求header中设置content-type与参数格式之间的关系

以上参数参考文章大痴小乙的博客_CSDN博客

4.用户发送视频

 

 如图,在发表视频时,用户的token必须包含在headers中,用作身份认证。

 视频保存成功

5删除视频

 如图,与上一步一样,用户身份认证后可成功删除视频

本节参考文章:

Flask + PyJWT 实现基于Json Web Token的用户认证授权

五、总结
以上就是今天要讲的内容,本文仅仅简单介绍了pyjwt和peewee的使用,而这两个模块提供了大量能使我们快速便捷地处理数据的函数和方法,作为一个小白,我会有很多疏漏和不足的地方,希望各位大神能够提出宝贵的意见或者批评指正。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/不正经/article/detail/172886
推荐阅读
相关标签
  

闽ICP备14008679号