当前位置:   article > 正文

【Python】Python使用Pyjwt库生成Token_python jwt token 生成

python jwt token 生成

下载依赖

pip install PyJWT cryptography
  • 1

代码实现

# -*- coding:utf-8 _*_
"""
@Project    : Qingyuing-Server 
@File       : jwtUtil.py
@Author     : 晴天 
@CreateTime : 2023-12-27 10:28 
"""
import os
import jwt
import base64
from uuid import uuid4
from datetime import datetime, timedelta, timezone
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import serialization


def generate_or_load_rsa_keys(private_key_path='private_key.pem',
                              public_key_path='public_key.pem') -> None:
    """
    生成私钥/公钥用于生成jwt
    :param private_key_path: 私钥
    :param public_key_path: 公钥
    """
    # 检查私钥和公钥文件是否已存在
    if not (os.path.exists(private_key_path) and os.path.exists(public_key_path)):
        # 生成新的密钥对
        private_key = rsa.generate_private_key(
            public_exponent=65537,
            key_size=2048,
            backend=default_backend()
        )
        # 获取私钥的PEM格式
        private_pem = private_key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.NoEncryption()
        )
        # 获取公钥的PEM格式
        public_key = private_key.public_key()
        public_pem = public_key.public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo
        )
        # 将私钥和公钥写入文件
        with open(private_key_path, 'wb') as f:
            f.write(private_pem)
        with open(public_key_path, 'wb') as f:
            f.write(public_pem)
    print('私钥公钥已存在跳过生成')


def read_private_key(private_key_path='private_key.pem', public_key_path='public_key.pem'):
    """
    读取现有的私钥和公钥
    :param private_key_path: 私钥
    :param public_key_path: 公钥
    :return: 私钥和公钥
    """
    try:
        with open(private_key_path, 'rb') as f:
            private_pem = f.read()
        with open(public_key_path, 'rb') as f:
            public_pem = f.read()
        return private_pem, public_pem
    except FileNotFoundError:
        print('私钥或公钥读取异常')


def create_tokens(user_id, client_id):
    """
    生成JWT令牌
    :param user_id: 用户ID
    :param client_id: 客户端
    :return: 返回生成的access_token跟refresh_token
    """
    private_key = read_private_key()[0]
    current_time = datetime.now(timezone.utc)  # 使用带时区的UTC时间
    jti_access = base64.urlsafe_b64encode(uuid4().bytes).rstrip(b'=').decode('utf-8')  # 生成access_token的唯一标识符
    jti_refresh = base64.urlsafe_b64encode(uuid4().bytes).rstrip(b'=').decode('utf-8')  # 生成refresh_token的唯一标识符

    # 创建access_token的payload
    access_payload = {
        'exp': current_time + timedelta(hours=24),  # 设置24小时后过期
        'iat': current_time,
        'user_id': user_id,
        'jti': jti_access,
        'client_id': client_id,
        'scope': ['openid']
    }

    # 创建refresh_token的payload
    refresh_payload = {
        'exp': current_time + timedelta(days=7),  # 设置7天后过期
        'iat': current_time,
        'user_id': user_id,
        'jti': jti_refresh,
        'client_id': client_id,
        'scope': ['openid']
    }

    # 使用RS256算法和私钥签署JWT
    access_token = jwt.encode(access_payload, private_key, algorithm='RS256')
    refresh_token = jwt.encode(refresh_payload, private_key, algorithm='RS256')

    return access_token, refresh_token


def parse_jwt(token):
    public_key = read_private_key()[1]
    try:
        # 使用公钥验证JWT
        decoded = jwt.decode(token, public_key, algorithms=['RS256'])
        return decoded  # 令牌有效,返回解码后的payload
    except jwt.ExpiredSignatureError:
        # 令牌过期
        return "Token expired"
    except jwt.InvalidTokenError:
        # 令牌无效
        return "Invalid token"
    except jwt.InvalidSignatureError:
        # 签名无效
        return "Invalid signature"
    except Exception as e:
        # 其他异常
        return str(f'其他异常: {e}')


if __name__ == '__main__':
    accessToken, refreshToken = create_tokens('52111890', 'web_app')
    print(f"AccessToken: {accessToken}")
    print(f"RefreshToken: {refreshToken}")
    decoded_access_token = parse_jwt(accessToken)
    print('解析结果: ', decoded_access_token)
    decoded_refresh_token = parse_jwt(refreshToken)
    print('解析结果: ', decoded_refresh_token)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137

在这里插入图片描述

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop】
推荐阅读
相关标签
  

闽ICP备14008679号