赞
踩
创建配置文件config.py
- # -*- coding:utf-8 -*-
- # @Time: 2022/8/11 13:40
- # @File: config.py
- # @Software: PyCharm
-
import os

from easydict import EasyDict

# Root configuration object; `cfg` is the alias that other modules import.
C = EasyDict()
cfg = C

# MySQL connection settings.
# Every value can be overridden through an environment variable so that real
# credentials do not have to be committed; the literal defaults are the
# previously hard-coded values, which keeps existing deployments working.
C.MYSQL = EasyDict()
C.MYSQL.USER = os.environ.get('MYSQL_USER', 'root')
C.MYSQL.PASSWORD = os.environ.get('MYSQL_PASSWORD', 'qwert@2021%')
C.MYSQL.HOSTS = os.environ.get('MYSQL_HOSTS', '192.168.2.6')
C.MYSQL.PORT = int(os.environ.get('MYSQL_PORT', 3306))
C.MYSQL.DB = os.environ.get('MYSQL_DB', 'devops')

# Redis connection settings (same override scheme as above).
C.REDIS = EasyDict()
C.REDIS.HOST = os.environ.get('REDIS_HOST', "192.168.2.11")
C.REDIS.PORT = int(os.environ.get('REDIS_PORT', 6379))
C.REDIS.PASSWORD = os.environ.get('REDIS_PASSWORD', "qwert@2021")
C.REDIS.DB = int(os.environ.get('REDIS_DB', 10))

# Encryption key. The value must stay `bytes`, so the (str) environment value
# or default is encoded before being stored.
C.KEY = EasyDict()
C.KEY.VALUE = os.environ.get(
    'FERNET_KEY',
    'DhUB8Cqlh1wLOaBqY7Nbz3fT7QJowmOfZCr5zQvt4Yk=',
).encode()
创建models.py文件
- # -*- coding:utf-8 -*-
- # @Time: 2022/8/12 13:25
- # @File: models.py
- # @Software: PyCharm
-
- from sqlalchemy.orm import declarative_base
- from sqlalchemy import INTEGER, Integer, Column, String
-
- from config import cfg
- from cryptography.fernet import Fernet
-
# Declarative base class that the ORM models in this module derive from.
Base = declarative_base()

# Cipher built from the key configured as cfg.KEY.VALUE.
cipher_suite = Fernet(cfg.KEY.VALUE)
-
-
class BaseModel(Base):
    """Abstract parent for the ORM models; contributes the shared ``id`` column.

    It must derive from the declarative ``Base``. ``__abstract__`` is set so
    that this class itself is not mapped to a table.
    """

    __abstract__ = True

    # Auto-incrementing integer primary key.
    id = Column(
        INTEGER(),
        comment="主键",
        autoincrement=True,
        primary_key=True,
    )
-
-
class User(BaseModel):
    """ORM model for the ``dv_user`` table.

    The password is kept Fernet-encrypted in ``password_hash``.
    NOTE(review): this is reversible encryption, not a one-way hash; the
    scheme is kept as-is because changing it would invalidate stored rows.
    """

    # Name of the table in the database.
    __tablename__ = "dv_user"

    username = Column(String(64), comment="用户名")
    nickname = Column(String(64), comment="昵称")
    password_hash = Column(String(255), nullable=True, comment="密码")
    login_time = Column(String(128), comment="登陆时间")
    status = Column(Integer, comment="用户状态")
    email = Column(String(120), comment="邮箱")

    def __init__(self, username, nickname, password_hash, login_time, status, email):
        """Build a user, encrypting the plain-text password on the way in.

        :param username: login name.
        :param nickname: display name.
        :param password_hash: the PLAIN-TEXT password (str); it is encrypted here.
        :param login_time: login timestamp string.
        :param status: integer user status.
        :param email: e-mail address.
        """
        self.username = username
        self.nickname = nickname
        # Fernet.encrypt() returns bytes; decode so the attribute is a str,
        # matching the String column and what verify_decrypt() expects.
        self.password_hash = cipher_suite.encrypt(password_hash.encode()).decode()
        self.login_time = login_time
        self.status = status
        self.email = email

    def verify_decrypt(self, password):
        """Check a plain-text password against the stored encrypted value.

        :param password: candidate plain-text password.
        :return: ``True`` when the decrypted stored value equals ``password``;
            ``False`` when it differs, when nothing is stored, or when the
            stored value cannot be decrypted with the configured key.
        """
        from cryptography.fernet import InvalidToken

        stored = self.password_hash
        if not stored:
            # The column is nullable: no stored password can never match.
            return False
        if isinstance(stored, str):
            stored = stored.encode()
        try:
            return cipher_suite.decrypt(stored).decode() == password
        except InvalidToken:
            # Corrupt value or one encrypted with a different key.
            return False

    def to_dict(self):
        """Return the public fields as a plain dict (the password is omitted)."""
        return {
            "id": self.id,
            "username": self.username,
            "nickname": self.nickname,
            "login_time": self.login_time,
            "email": self.email,
            "status": self.status,
        }
-
创建token_verify.py
- # -*- coding:utf-8 -*-
- # @Time: 2022/8/16 10:10
- # @File: token_verify.py
- # @Software: PyCharm
-
- import jwt
- from functools import wraps
- from sanic import text
-
-
def check_token(request):
    """Report whether the request carries a valid JWT in its ``token`` cookie.

    :param request: incoming request; ``request.cookies`` and
        ``request.app.config.SECRET`` are read.
    :return: ``True`` when the cookie decodes (HS256) to a non-empty payload,
        ``False`` in every other case (missing cookie, invalid token, empty
        payload).
    """
    token = request.cookies.get('token')
    if not token:
        # No cookie at all: there is nothing to decode.
        return False
    try:
        token_info = jwt.decode(token, request.app.config.SECRET, algorithms=["HS256"])
    except jwt.exceptions.InvalidTokenError:
        return False
    # Always hand back a real bool rather than falling through to None.
    return bool(token_info)
-
-
def _token_verify(token):
    """Decorator: run the wrapped handler only when ``check_token`` passes.

    :param token: despite its name, this is the async request handler being
        decorated (the function is applied directly as ``@_token_verify``).
    :return: an async wrapper that awaits the handler for an authenticated
        request and otherwise answers with a 401 text response.
    """
    handler = token

    @wraps(handler)
    async def decorated_function(request, *args, **kwargs):
        if not check_token(request):
            return text("token 无效", 401)
        return await handler(request, *args, **kwargs)

    return decorated_function
创建auth.py
- # -*- coding:utf-8 -*-
- # @Time: 2022/8/11 13:48
- # @File: auth.py
- # @Software: PyCharm
-
- import jwt
- from sanic import Blueprint, response, text
- from sqlalchemy import select
- from models import User
- import token_verify
- import time
-
# Decorator that guards a handler with the check from token_verify.
# (The spelling "requied" is kept because the handlers in this module use it.)
token_requied = token_verify._token_verify

# Blueprint that groups the user endpoints under /api/v1/user.
user = Blueprint(name="user_auth_blueprint", url_prefix="/api/v1/user")
-
-
@user.post("/enroll")
async def user_enroll(request):
    """Register a new user.

    Expects a JSON object whose ``user_data`` member holds the ``User``
    constructor fields; ``login_time`` is stamped here with the local time.

    :param request: incoming request; ``request.ctx.session`` supplies the
        database session.
    :return: JSON of the created user, or a 400 JSON error when ``user_data``
        is missing or does not match the ``User`` constructor.
    """
    payload = request.json
    user_json = payload.get('user_data') if isinstance(payload, dict) else None
    if not isinstance(user_json, dict):
        return response.json({"code": 400, "error": "user_data is required."}, status=400)

    # Work on a copy so the parsed request body is not mutated.
    user_json = dict(user_json)
    user_json['login_time'] = time.strftime("%Y-%m-%d %X", time.localtime())
    try:
        new_user = User(**user_json)
    except (TypeError, AttributeError):
        # Missing/unexpected field, or a password that is not a string.
        return response.json({"code": 400, "error": "invalid user_data."}, status=400)

    session = request.ctx.session
    async with session.begin():
        session.add(new_user)
    # Serialise after the transaction block so the generated id is available.
    return response.json(new_user.to_dict())
-
-
@user.post("/login")
async def user_login(request):
    """Authenticate a user and issue a JWT.

    Reads ``userName`` and ``password`` from the JSON body, looks the user up
    and verifies the password.

    :param request: incoming request; ``request.ctx.session`` supplies the
        database session and ``request.app.config.SECRET`` signs the token.
    :return: JSON with ``code`` 200 and the token on success, otherwise JSON
        with ``code`` 401. (The HTTP status is left at its default in both
        cases, as before.)
    """
    notify = {}
    credentials = request.json
    if not isinstance(credentials, dict):
        # Missing or non-object body: treat as empty credentials.
        credentials = {}
    username = credentials.get('userName')
    password = credentials.get('password')

    # Skip the database round-trip when either credential is absent.
    if username and password is not None:
        session = request.ctx.session
        async with session.begin():
            query = select(User).where(User.username == username)
            result = await session.execute(query)
            account = result.scalar()
            if account and account.verify_decrypt(password=password):
                claims = {
                    "user_id": account.id,
                    "user_name": account.username
                }
                # Sign with the same algorithm the decoders pin (HS256).
                # NOTE(review): the token carries no expiry claim.
                token = jwt.encode(claims, request.app.config.SECRET, algorithm="HS256")
                notify.update({
                    "code": 200,
                    "userid": account.id,
                    "username": account.username,
                    "success": "验证成功!",
                    "token": "{}".format(token),
                    # NOTE(review): this formats the response object returned
                    # by text(), not the token string; kept so the response
                    # shape does not change.
                    "text": "{}".format(text(token))
                })
                return response.json(notify)

    notify.update({
        "code": 401,
        "error": "验证失败!"
    })
    return response.json(notify)
-
-
@user.get("/info")
@token_requied  # token-checking decorator
async def user_info(request):
    """Return the id and name of the user identified by the ``token`` query arg.

    :param request: incoming request; the token is read from the query string
        and ``request.ctx.session`` supplies the database session.
    :return: JSON with ``user_id`` and ``username`` when the user exists, an
        empty JSON object when it does not, or a 401 response when the token
        is missing or invalid.
    """
    notify = {}
    user_token = request.args.get('token')
    if not user_token:
        # Previously `raise("...")`, which itself fails with TypeError because
        # a str is not an exception; answer with an explicit 401 instead.
        return response.json(
            {"code": 401, "error": "user login token is null."},
            status=401,
        )
    try:
        claims = jwt.decode(user_token, request.app.config.SECRET, algorithms=["HS256"])
    except jwt.exceptions.InvalidTokenError:
        return text("token 无效", 401)

    session = request.ctx.session
    async with session.begin():
        query = select(User).where(
            User.id == claims.get('user_id'),
            User.username == claims.get('user_name'),
        )
        result = await session.execute(query)
        account = result.scalar()
        if account:
            notify.update({
                "user_id": account.id,
                "username": account.username,
            })
    return response.json(notify)
-
-
@user.post("/logout")
@token_requied  # token-checking decorator
async def user_logout(request):
    """Acknowledge a logout for the user identified by the body's ``token``.

    :param request: incoming request; the token is read from the JSON body and
        ``request.ctx.session`` supplies the database session.
    :return: JSON describing the user that logged out (empty object when the
        user is not found), JSON with ``code`` 401 when no token is supplied,
        or a 401 text response when the token is invalid.
    """
    notify = {}
    body = request.json
    # A missing or non-object body is handled like a missing token.
    user_token = body.get('token') if isinstance(body, dict) else None
    if not user_token:
        notify.update({
            "code": 401,
            "error": "user logout token is null.",
            # NOTE(review): formats the response object returned by text(),
            # not its message; kept so the response shape does not change.
            "info": "{}".format(text("token verify fail.", 401))
        })
        return response.json(notify)
    try:
        claims = jwt.decode(user_token, request.app.config.SECRET, algorithms=["HS256"])
    except jwt.exceptions.InvalidTokenError:
        return text("token 无效", 401)

    session = request.ctx.session
    async with session.begin():
        query = select(User).where(
            User.id == claims.get('user_id'),
            User.username == claims.get('user_name'),
        )
        result = await session.execute(query)
        account = result.scalar()
        if account:
            notify.update({
                "user_id": account.id,
                "username": account.username,
                # NOTE(review): same response-object formatting as above.
                "info": "退出登录: {}".format(text('logout'))
            })
    return response.json(notify)
创建api.py文件
- # -*- coding:utf-8 -*-
- # @Time: 2022/8/11 12:54
- # @File: api.py
- # @Software: PyCharm
- # @Author: wjpingok
-
-
import os
from contextvars import ContextVar
from urllib import parse

from sanic import Sanic
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.orm import sessionmaker

from config import cfg
from user import auth
-
# Application instance.
app = Sanic("devops_manager")

# Secret used to sign and verify the JWTs.
# It can be supplied through the JWT_SECRET environment variable so the real
# value need not be committed; the default is the previously hard-coded value.
app.config.SECRET = os.environ.get("JWT_SECRET", "KEEP_IT_SECRET_KEEP_IT_SAFE")

# Build the database URL. Both the user name and the password are
# percent-encoded so characters such as '@' or '%' cannot break the URL.
PASSWORD = parse.quote_plus(cfg.MYSQL.PASSWORD)
mysql_connect = 'mysql+aiomysql://{0}:{1}@{2}:{3}/{4}?charset=utf8mb4'.format(
    parse.quote_plus(cfg.MYSQL.USER),
    PASSWORD,
    cfg.MYSQL.HOSTS,
    cfg.MYSQL.PORT,
    cfg.MYSQL.DB
)

# NOTE(review): pool_recycle=1 recycles any connection older than one second,
# which largely defeats pooling, and echo=True logs every SQL statement.
# Both are kept unchanged; confirm they are intended outside development.
engine = create_async_engine(
    mysql_connect,
    max_overflow=0,
    pool_size=5,
    pool_timeout=10,
    pool_recycle=1,
    echo=True
)

# Context variable holding the session of the request being handled.
_base_model_session_ctx = ContextVar("session")
-
-
- # 定义中间件,请求与回调
@app.middleware("request")
async def inject_session(request):
    """Request middleware: open a database session for this request.

    The session is stored on ``request.ctx.session`` and also published in the
    module's context variable; the token returned by ``set`` is kept on
    ``request.ctx.session_ctx_token`` so it can be reset later.
    """
    session_factory = sessionmaker(engine, AsyncSession, expire_on_commit=False)
    db_session = session_factory()
    request.ctx.session = db_session
    request.ctx.session_ctx_token = _base_model_session_ctx.set(db_session)
-
-
@app.middleware("response")
async def close_session(request, response):
    """Response middleware: undo what the request middleware set up.

    Does nothing when the request never received a session token; otherwise
    resets the context variable and closes the request's session.
    """
    ctx = request.ctx
    if not hasattr(ctx, "session_ctx_token"):
        return
    _base_model_session_ctx.reset(ctx.session_ctx_token)
    await ctx.session.close()
-
# Register the user blueprint on the application.
app.blueprint(auth.user)


if __name__ == "__main__":
    # Serve on all interfaces, port 8021, when run as a script.
    app.run(port=8021, host="0.0.0.0")
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。