当前位置:   article > 正文

Python sanic 异步web框架--教你从0到1开发后端服务_sanic devops

sanic devops

创建配置文件config.py

  1. # -*- coding:utf-8 -*-
  2. # @Time: 2022/8/11 13:40
  3. # @File: config.py
  4. # @Software: PyCharm
  5. from easydict import EasyDict
  # Application settings, exposed as attribute-style nested dictionaries.
  # NOTE(review): the credentials and the encryption key below are committed in
  # plain text; consider loading them from the environment or a secrets store.
  C = EasyDict(
      {
          # MySQL database settings
          "MYSQL": {
              "USER": 'root',
              "PASSWORD": 'qwert@2021%',
              "HOSTS": '192.168.2.6',
              "PORT": 3306,
              "DB": 'devops',
          },
          # Redis settings
          "REDIS": {
              "HOST": "192.168.2.11",
              "PORT": 6379,
              "PASSWORD": "qwert@2021",
              "DB": 10,
          },
          # Key used for encryption
          "KEY": {
              "VALUE": b'DhUB8Cqlh1wLOaBqY7Nbz3fT7QJowmOfZCr5zQvt4Yk=',
          },
      }
  )
  # Alias under which the other modules import the settings.
  cfg = C

创建models.py文件

  1. # -*- coding:utf-8 -*-
  2. # @Time: 2022/8/12 13:25
  3. # @File: models.py
  4. # @Software: PyCharm
  5. from sqlalchemy.orm import declarative_base
  6. from sqlalchemy import INTEGER, Integer, Column, String
  7. from config import cfg
  8. from cryptography.fernet import Fernet
  # Fernet cipher built from the key defined in the configuration.
  cipher_suite = Fernet(key=cfg.KEY.VALUE)
  # Declarative base class that the models below derive from.
  Base = declarative_base()
  class BaseModel(Base):
      """Common parent for the models in this module; it must inherit ``Base``.

      It is flagged ``__abstract__`` and only contributes the shared
      auto-incrementing integer primary key ``id``.
      """

      __abstract__ = True

      id = Column(
          INTEGER(),
          comment="主键",
          autoincrement=True,
          primary_key=True,
      )
  class User(BaseModel):
      """User account stored in the ``dv_user`` table.

      The password is kept Fernet-encrypted (reversibly) in ``password_hash``
      using the module-level ``cipher_suite``.
      """

      # Table name used in the database
      __tablename__ = "dv_user"

      username = Column(String(64), comment="用户名")
      nickname = Column(String(64), comment="昵称")
      password_hash = Column(String(255), nullable=True, comment="密码")
      login_time = Column(String(128), comment="登陆时间")
      status = Column(Integer, comment="用户状态")
      email = Column(String(120), comment="邮箱")

      def __init__(self, username, nickname, password_hash, login_time, status, email):
          """Create a user and encrypt the supplied password.

          :param username: login name
          :param nickname: display name
          :param password_hash: the plain-text password (despite the name); it is
              encrypted before being stored on the instance
          :param login_time: login timestamp, stored as a string
          :param status: integer user status
          :param email: e-mail address
          """
          self.username = username
          self.nickname = nickname
          # Keep the token as text: the column is a String and verify_decrypt()
          # must work on a freshly built instance as well as on a loaded row.
          self.password_hash = cipher_suite.encrypt(password_hash.encode()).decode()
          self.login_time = login_time
          self.status = status
          self.email = email

      def verify_decrypt(self, password):
          """Check a plain-text password against the stored encrypted one.

          :param password: plain-text password to verify
          :return: ``True`` when the decrypted stored value equals ``password``;
              ``False`` when it differs, when nothing is stored, or when the
              stored token cannot be decrypted with the configured key
          """
          # Local import keeps this block self-contained; the module already
          # depends on cryptography.fernet.
          from cryptography.fernet import InvalidToken

          stored = self.password_hash
          if not stored or password is None:
              # The column is nullable, so there may be nothing to compare.
              return False
          if isinstance(stored, str):
              stored = stored.encode()
          try:
              return cipher_suite.decrypt(stored).decode() == password
          except InvalidToken:
              # Corrupt value or a value encrypted with a different key.
              return False

      def to_dict(self):
          """Return the user's public fields as a dict (``password_hash`` is not included)."""
          return {
              "id": self.id,
              "username": self.username,
              "nickname": self.nickname,
              "login_time": self.login_time,
              "email": self.email,
              "status": self.status,
          }

创建token_verify.py

  1. # -*- coding:utf-8 -*-
  2. # @Time: 2022/8/16 10:10
  3. # @File: token_verify.py
  4. # @Software: PyCharm
  5. import jwt
  6. from functools import wraps
  7. from sanic import text
  def check_token(request):
      '''
      Check whether the request carries a valid token in its ``token`` cookie.

      :param request: incoming request; its ``cookies`` mapping and
          ``request.app.config.SECRET`` are read
      :return: ``True`` when the cookie decodes (HS256) to a non-empty payload,
          ``False`` when the cookie is missing or empty, the token is invalid,
          or the payload is empty
      '''
      token = request.cookies.get('token')
      if not token:
          # No cookie at all: there is nothing to decode.
          return False
      try:
          token_info = jwt.decode(token, request.app.config.SECRET, algorithms=["HS256"])
      except jwt.exceptions.InvalidTokenError:
          return False
      # Always hand back a real bool; an empty payload counts as unauthenticated.
      return bool(token_info)
  def _token_verify(token):
      """Decorator that only runs the wrapped handler for authenticated requests.

      :param token: the async request handler to protect (the parameter name is
          historical; it receives the decorated function, not a token string)
      :return: an async wrapper that awaits the handler when ``check_token``
          accepts the request and otherwise answers with a 401 text response
      """
      handler = token

      @wraps(handler)
      async def decorated_function(request, *args, **kwargs):
          if not check_token(request):
              return text("token 无效", 401)
          return await handler(request, *args, **kwargs)

      return decorated_function

创建auth.py

  1. # -*- coding:utf-8 -*-
  2. # @Time: 2022/8/11 13:48
  3. # @File: auth.py
  4. # @Software: PyCharm
  5. import jwt
  6. from sanic import Blueprint, response, text
  7. from sqlalchemy import select
  8. from models import User
  9. import token_verify
  10. import time
  # Blueprint holding the user endpoints, mounted under /api/v1/user.
  user = Blueprint(name="user_auth_blueprint", url_prefix="/api/v1/user")
  # Short alias for the token-checking decorator applied to protected routes.
  token_requied = token_verify._token_verify
  @user.post("/enroll")
  async def user_enroll(request):
      '''
      User registration endpoint.

      Expects a JSON body ``{"user_data": {...}}`` whose fields are handed to
      ``User(...)``; ``login_time`` is always set by the server.

      :param request: incoming request; ``request.ctx.session`` holds the DB session
      :return: JSON of the created user (``User.to_dict()``), or a 400 JSON error
          when ``user_data`` is missing or malformed
      '''
      body = request.json
      user_json = body.get('user_data') if isinstance(body, dict) else None
      if not isinstance(user_json, dict):
          return response.json({"code": 400, "error": "user_data is required."}, status=400)

      # Fields the User constructor needs from the client.
      required = ("username", "nickname", "password_hash", "status", "email")
      missing = [field for field in required if field not in user_json]
      unknown = [
          field for field in user_json
          if field not in required and field != "login_time"
      ]
      if missing or unknown or not isinstance(user_json["password_hash"], str):
          return response.json(
              {
                  "code": 400,
                  "error": "user_data is invalid.",
                  "missing": missing,
                  "unknown": unknown,
              },
              status=400,
          )

      # Work on a copy so the parsed request body is not mutated; the server
      # timestamp overrides any client-supplied login_time.
      fields = dict(user_json, login_time=time.strftime("%Y-%m-%d %X", time.localtime()))

      session = request.ctx.session
      async with session.begin():
          new_user = User(**fields)
          session.add(new_user)
          # Flush so the auto-increment primary key is assigned before the row
          # is serialised; otherwise the returned "id" would still be None.
          await session.flush()
          created = new_user.to_dict()
      return response.json(created)
  @user.post("/login")
  async def user_login(request):
      '''
      Login endpoint.

      Reads ``userName`` and ``password`` from the JSON body, checks them against
      the stored user and, on success, issues an HS256-signed token.

      :param request: incoming request; ``request.ctx.session`` holds the DB session
      :return: JSON with ``code`` 200, the user id/name and the token on success,
          otherwise JSON with ``code`` 401 and an error message
      '''
      notify = {}
      credentials = request.json if isinstance(request.json, dict) else {}
      username = credentials.get('userName')
      password = credentials.get('password')

      session = request.ctx.session
      async with session.begin():
          account = None
          if username and isinstance(password, str):
              result = await session.execute(select(User).where(User.username == username))
              account = result.scalar()
          # password_hash is nullable, so make sure there is something to verify.
          if account is not None and account.password_hash and account.verify_decrypt(password=password):
              claims = {
                  "user_id": account.id,
                  "user_name": account.username
              }
              # Issue the token; name the algorithm explicitly so it matches
              # the HS256 used when tokens are decoded.
              token = jwt.encode(claims, request.app.config.SECRET, algorithm="HS256")
              if isinstance(token, bytes):
                  # Some PyJWT versions return bytes rather than str.
                  token = token.decode()
              notify.update({
                  "code": 200,
                  "userid": account.id,
                  "username": account.username,
                  "success": "验证成功!",
                  "token": token,
                  # Previously this held the repr of a response object; the key
                  # is kept for existing clients and now carries the token text.
                  "text": token
              })
              return response.json(notify)
          notify.update({
              "code": 401,
              "error": "验证失败!"
          })
          return response.json(notify)
  @user.get("/info")
  @token_requied  # token-checking decorator
  async def user_info(request):
      '''
      Return basic information about the user identified by a token.

      The token is read from the ``token`` query argument.

      :param request: incoming request; ``request.ctx.session`` holds the DB session
      :return: JSON with ``user_id`` and ``username`` when the user exists, an
          empty JSON object when no matching user is found, or a 401 JSON error
          when the token is missing or invalid
      '''
      notify = {}
      user_token = request.args.get('token')
      if not user_token:
          # The original raised a plain string here, which is itself a TypeError.
          return response.json({"code": 401, "error": "user login token is null."}, status=401)
      try:
          claims = jwt.decode(user_token, request.app.config.SECRET, algorithms=["HS256"])
      except jwt.exceptions.InvalidTokenError:
          return response.json({"code": 401, "error": "user login token is invalid."}, status=401)

      session = request.ctx.session
      async with session.begin():
          query = select(User).where(
              User.id == claims.get('user_id'),
              User.username == claims.get('user_name'),
          )
          result = await session.execute(query)
          account = result.scalar()
          if account:
              notify.update({
                  "user_id": account.id,
                  "username": account.username,
              })
          return response.json(notify)
  @user.post("/logout")
  @token_requied  # token-checking decorator
  async def user_logout(request):
      '''
      Logout endpoint.

      The token is read from the ``token`` field of the JSON body. The handler
      only reports which user the token belongs to; nothing is revoked here.

      :param request: incoming request; ``request.ctx.session`` holds the DB session
      :return: JSON with ``user_id``, ``username`` and ``info`` when the user
          exists, an empty JSON object when no matching user is found, or JSON
          with ``code`` 401 when the token is missing or invalid
      '''
      notify = {}
      body = request.json if isinstance(request.json, dict) else {}
      user_token = body.get('token')
      if not user_token:
          # HTTP status is left at its default here, as before; the failure is
          # signalled through the "code" field.
          notify.update({
              "code": 401,
              "error": "user logout token is null.",
              # Plain message instead of the repr of a response object.
              "info": "token verify fail."
          })
          return response.json(notify)
      try:
          claims = jwt.decode(user_token, request.app.config.SECRET, algorithms=["HS256"])
      except jwt.exceptions.InvalidTokenError:
          notify.update({
              "code": 401,
              "error": "user logout token is invalid.",
              "info": "token verify fail."
          })
          return response.json(notify, status=401)

      session = request.ctx.session
      async with session.begin():
          query = select(User).where(
              User.id == claims.get('user_id'),
              User.username == claims.get('user_name'),
          )
          result = await session.execute(query)
          account = result.scalar()
          if account:
              notify.update({
                  "user_id": account.id,
                  "username": account.username,
                  # Plain message instead of the repr of a response object.
                  "info": "退出登录: logout"
              })
          return response.json(notify)

创建api.py文件

  1. # -*- coding:utf-8 -*-
  2. # @Time: 2022/8/11 12:54
  3. # @File: api.py
  4. # @Software: PyCharm
  5. # @Author: wjpingok
  6. from sanic import Sanic
  7. from user import auth
  8. from contextvars import ContextVar
  9. from sqlalchemy.ext.asyncio import AsyncSession
  10. from sqlalchemy.orm import sessionmaker
  11. from config import cfg
  12. from sqlalchemy.ext.asyncio import create_async_engine
  13. from urllib import parse
  # Application instance
  app = Sanic("devops_manager")
  # Secret the handlers use to sign and verify tokens.
  # NOTE(review): hard-coded secret; consider loading it from the environment.
  app.config.SECRET = "KEEP_IT_SECRET_KEEP_IT_SAFE"
  # Database connection: the password is URL-quoted before it is placed in the URL.
  PASSWORD = parse.quote_plus(cfg.MYSQL.PASSWORD)
  mysql_connect = (
      f"mysql+aiomysql://{cfg.MYSQL.USER}:{PASSWORD}"
      f"@{cfg.MYSQL.HOSTS}:{cfg.MYSQL.PORT}/{cfg.MYSQL.DB}?charset=utf8mb4"
  )
  # NOTE(review): pool_recycle=1 looks unusually low for a connection pool —
  # confirm the intended value against the SQLAlchemy pooling documentation.
  engine = create_async_engine(
      mysql_connect,
      echo=True,
      pool_size=5,
      max_overflow=0,
      pool_timeout=10,
      pool_recycle=1,
  )
  # Context variable that tracks the session of the request being handled.
  _base_model_session_ctx = ContextVar("session")
  # Session factory built once instead of on every request. ``class_`` is passed
  # by keyword because it is keyword-only in SQLAlchemy 2.0.
  _session_factory = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)


  # Middlewares: request hook and response hook
  @app.middleware("request")
  async def inject_session(request):
      """Open a database session for this request.

      The session is stored on ``request.ctx.session`` and in the
      ``_base_model_session_ctx`` context variable; the reset token is kept on
      ``request.ctx.session_ctx_token`` so the response hook can undo it.

      :param request: incoming request
      """
      request.ctx.session = _session_factory()
      request.ctx.session_ctx_token = _base_model_session_ctx.set(request.ctx.session)
  @app.middleware("response")
  async def close_session(request, response):
      """Undo the request hook: reset the context variable and close the session.

      Does nothing when the request never had a session attached.

      :param request: the request being answered
      :param response: the outgoing response (unused)
      """
      ctx = request.ctx
      if not hasattr(ctx, "session_ctx_token"):
          return
      _base_model_session_ctx.reset(ctx.session_ctx_token)
      await ctx.session.close()
  # Register the user blueprint on the application.
  app.blueprint(auth.user)

  # Start the server only when this file is executed directly.
  if __name__ == "__main__":
      app.run(port=8021, host="0.0.0.0")

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/136886
推荐阅读
相关标签
  

闽ICP备14008679号