赞
踩
利用API可进行以下常见的攻击:
有以下常见的防御方式:
当然,有些还是需要API后端代码来进行防御,例如命令注入、SSRF等。
本文以API身份认证为主要内容,浅谈各种认证的使用场景与优缺点,同时穿插部分攻击与防御。
import uuid
import re
import traceback

from flask import Flask, request, jsonify

from mysql import MysqlCli
from log import log
from setting import *

app = Flask(__name__)
log.set_file()

# Longest username accepted by validate_username(); error messages are derived from it.
_MAX_USERNAME_LENGTH = 20
# 11 digits: a leading 1, a second digit in 3-9, then nine more digits.
_PHONE_NUMBER_PATTERN = re.compile(r'^1[3456789]\d{9}$')


# Validate username
def validate_username(username: str) -> bool:
    """Return True when *username* is a string of at most 20 characters.

    Non-string values (for example a JSON number) are rejected instead of
    raising ``TypeError`` from ``len()``.
    """
    return isinstance(username, str) and len(username) <= _MAX_USERNAME_LENGTH


# Validate phone number
def validate_phone_number(phone_number: str) -> bool:
    """Return True when *phone_number* is a string matching the phone regex.

    Non-string values are rejected instead of raising ``TypeError``.
    """
    if not isinstance(phone_number, str):
        return False
    return _PHONE_NUMBER_PATTERN.match(phone_number) is not None


# Admin endpoint: register a user
@app.route('/api/v1.0/admin/add_user', methods=['POST'])
def add_user():
    """Insert a ``users`` row built from the JSON body.

    Expects a JSON object with ``username`` and ``phone_number``.
    Returns 400 on invalid input, 500 when the insert fails, 200 otherwise.
    """
    resp = {
        "requestid": uuid.uuid4()
    }
    if not request.is_json:
        resp['error'] = "Invalid JSON format in request"
        return jsonify(resp), 400

    data = request.get_json()
    # A JSON array/scalar has no .get(); treat it like any other malformed body.
    if not isinstance(data, dict):
        resp['error'] = "Invalid JSON format in request"
        return jsonify(resp), 400

    username = data.get('username')
    if username is None:
        resp['error'] = 'username is required'
        log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
        return jsonify(resp), 400

    phone_number = data.get('phone_number')
    if phone_number is None:
        resp['error'] = 'phone_number is required'
        log.logger.error(f"url:{request.url},params:{phone_number},resp:{resp}")
        return jsonify(resp), 400

    if not validate_username(username):
        resp['error'] = f'username length should not exceed {_MAX_USERNAME_LENGTH}'
        log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
        return jsonify(resp), 400

    try:
        cli = MysqlCli(MYSQL_HOST, MYSQL_USER, MYSQL_PASSWORD, MYSQL_DATABASE)
        try:
            cli.insert_one("users", {
                "username": username,
                "phone_number": phone_number
            })
        finally:
            # Release the connection even when the insert raises.
            cli.close()
    except Exception:
        # Set the error first so the log line actually contains it.
        resp['error'] = f"failed to insert to mysql:{traceback.format_exc()}"
        log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
        return jsonify(resp), 500

    resp['message'] = f'success to add user:{username}!'
    log.logger.info(f"url:{request.url},params:{username},resp:{resp}")
    return jsonify(resp)


# User endpoint: fetch user info
@app.route('/api/v1.0/get_user_info', methods=['POST'])
def get_user_info():
    """Look up a ``users`` row by the ``username`` given in the JSON body.

    Returns 400 on invalid input, 500 when the query fails, 200 otherwise.
    """
    resp = {
        "requestid": uuid.uuid4()
    }
    if not request.is_json:
        resp['error'] = "Invalid JSON format in request"
        return jsonify(resp), 400

    data = request.get_json()
    if not isinstance(data, dict):
        resp['error'] = "Invalid JSON format in request"
        return jsonify(resp), 400

    username = data.get('username')
    if username is None:
        resp['error'] = 'username is required'
        log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
        return jsonify(resp), 400

    if not validate_username(username):
        resp['error'] = f'username length should not exceed {_MAX_USERNAME_LENGTH}'
        log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
        return jsonify(resp), 400

    try:
        cli = MysqlCli(MYSQL_HOST, MYSQL_USER, MYSQL_PASSWORD, MYSQL_DATABASE)
        try:
            # NOTE(review): user input is concatenated into the SQL text, so this
            # query is injectable. The article uses this endpoint to demonstrate
            # SQL injection; switch to a parameterized query once MysqlCli's
            # parameter API is confirmed.
            sql = f"select * from users where username = '{username}' limit 1"
            user = cli.select_all(sql)
        finally:
            # The original never closed this connection.
            cli.close()
    except Exception:
        resp['error'] = f'failed to get user, error:{traceback.format_exc()}'
        log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
        return jsonify(resp), 500

    resp['message'] = f'success to get user:{user}.'
    log.logger.info(f"url:{request.url},params:{username},resp:{resp}")
    return jsonify(resp)


if __name__ == '__main__':
    app.run()
可以看到共有两个API,管理员注册用户接口、用户获取信息接口,拥有以下功能或安全措施:
def basic_auth(f):
    """Decorator that requires a valid ``Authorization: Basic ...`` header.

    Responds 400 when the header is missing or not Basic, 401 when the
    credentials are malformed or rejected by ``check_basic_auth``, and 500
    when the credential check itself raises.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        resp = {
            "requestid": uuid.uuid4()
        }
        auth_header = request.headers.get('Authorization')
        if not auth_header or not auth_header.startswith('Basic '):
            resp['error'] = 'basic auth is required'
            log.logger.error(f"url:{request.url},resp:{resp}")
            return jsonify(resp), 400

        try:
            encoded_credentials = auth_header.split(' ', 1)[1]
            decoded_credentials = base64.b64decode(encoded_credentials).decode('utf-8')
            # Split on the first colon only: the password may itself contain ':'.
            username, password = decoded_credentials.split(':', 1)
        except ValueError:
            # binascii.Error and UnicodeDecodeError are both ValueError subclasses;
            # a malformed header is a client error, not a server failure.
            resp['error'] = 'basic auth failed,check your username or password is right'
            log.logger.error(f"url:{request.url},resp:{resp}")
            return jsonify(resp), 401

        try:
            authenticated = check_basic_auth(username, password)
        except Exception:
            resp['error'] = f'basic auth failed,err: {traceback.format_exc()}'
            log.logger.error(f"url:{request.url},resp:{resp}")
            return jsonify(resp), 500

        if not authenticated:
            resp['error'] = 'basic auth failed,check your username or password is right'
            log.logger.error(f"url:{request.url},resp:{resp}")
            return jsonify(resp), 401
        return f(*args, **kwargs)
    return decorated_function


# Admin endpoint: register a user, v2.0 (adds a password)
@app.route('/api/v2.0/admin/basic_auth/add_user', methods=['POST'])
@basic_auth
def add_user_basic_auth():
    """Insert a ``users`` row plus a generated password in ``passwords``.

    Expects a JSON object with ``username`` and ``phone_number``. The
    generated password is returned once in the response message.
    Returns 400 on invalid input, 500 when an insert fails, 200 otherwise.
    """
    resp = {
        "requestid": uuid.uuid4()
    }
    # The request must be JSON
    if not request.is_json:
        resp['error'] = "Invalid JSON format in request"
        return jsonify(resp), 400

    data = request.get_json()
    # A JSON array/scalar has no .get(); treat it as a malformed body.
    if not isinstance(data, dict):
        resp['error'] = "Invalid JSON format in request"
        return jsonify(resp), 400

    # username checks
    username = data.get('username')
    if username is None:
        resp['error'] = 'username is required'
        log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
        return jsonify(resp), 400
    if not validate_username(username):
        # validate_username() allows up to 20 characters.
        resp['error'] = 'username length should not exceed 20'
        log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
        return jsonify(resp), 400

    # phone_number check
    phone_number = data.get('phone_number')
    if phone_number is None:
        resp['error'] = 'phone_number is required'
        log.logger.error(f"url:{request.url},params:{phone_number},resp:{resp}")
        return jsonify(resp), 400

    # Insert into the database
    try:
        cli = MysqlCli(MYSQL_HOST, MYSQL_USER, MYSQL_PASSWORD, MYSQL_DATABASE)
        try:
            cli.insert_one("users", {
                "username": username,
                "phone_number": phone_number
            })
            pwd = generate_random_password(secrets.choice(range(8, 17)))
            cli.insert_one("passwords", {
                "username": username,
                "password": pwd
            })
        finally:
            # Release the connection even when an insert raises.
            cli.close()
    except Exception:
        # The original assigned the tuple (message, 500) to resp['error'] and
        # answered with HTTP 200; return the 500 status explicitly instead.
        resp['error'] = f"failed to insert to mysql:{traceback.format_exc()}"
        log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
        return jsonify(resp), 500

    # Log before the password is added to resp so it never reaches the log file.
    log.logger.info(f"url:{request.url},params:{username},resp:{resp}")
    resp['message'] = f'success to add user:{username},password {pwd},remember it!'
    return jsonify(resp)
引入问题:
def api_key_auth(f):
    """Decorator that requires a valid ``api_key`` request header.

    Responds 400 when the header is absent and 401 when ``check_api_key``
    rejects its value.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        resp = {
            "requestid": uuid.uuid4()
        }
        # api key check
        api_key = request.headers.get("api_key")
        if api_key is None:
            resp['error'] = 'api_key is required in headers'
            log.logger.error(f"url:{request.url},resp:{resp}")
            return jsonify(resp), 400
        if not check_api_key(api_key):
            resp['error'] = f'api_key {api_key} is invalid'
            log.logger.error(f"url:{request.url},resp:{resp}")
            return jsonify(resp), 401
        return f(*args, **kwargs)
    return decorated_function


# Admin endpoint: register a user, v2.0 (adds an API key)
@app.route('/api/v2.0/admin/api_key/add_user', methods=['POST'])
@api_key_auth
def add_user_api_key():
    """Insert a ``users`` row plus a freshly generated key in ``keys``.

    Expects a JSON object with ``username`` and ``phone_number``. The
    generated key is returned once in the response message.
    Returns 400 on invalid input, 500 when an insert fails, 200 otherwise.
    """
    resp = {
        "requestid": uuid.uuid4()
    }
    # The request must be JSON
    if not request.is_json:
        resp['error'] = "Invalid JSON format in request"
        return jsonify(resp), 400

    data = request.get_json()
    # A JSON array/scalar has no .get(); treat it as a malformed body.
    if not isinstance(data, dict):
        resp['error'] = "Invalid JSON format in request"
        return jsonify(resp), 400

    username = data.get('username')
    if username is None:
        resp['error'] = 'username is required'
        log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
        return jsonify(resp), 400

    phone_number = data.get('phone_number')
    if phone_number is None:
        resp['error'] = 'phone_number is required'
        log.logger.error(f"url:{request.url},params:{phone_number},resp:{resp}")
        return jsonify(resp), 400

    if not validate_username(username):
        # validate_username() allows up to 20 characters.
        resp['error'] = 'username length should not exceed 20'
        log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
        return jsonify(resp), 400

    try:
        cli = MysqlCli(MYSQL_HOST, MYSQL_USER, MYSQL_PASSWORD, MYSQL_DATABASE)
        try:
            cli.insert_one("users", {
                "username": username,
                "phone_number": phone_number
            })
            key = secrets.token_hex(16)
            cli.insert_one("keys", {
                "username": username,
                "key": key
            })
        finally:
            # Release the connection even when an insert raises.
            cli.close()
    except Exception:
        # Set the error first so the log line actually contains it.
        resp['error'] = f"failed to insert to mysql:{traceback.format_exc()}"
        log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
        return jsonify(resp), 500

    # Log before the key is added to resp so the credential never reaches the log file.
    log.logger.info(f"url:{request.url},params:{username},resp:{resp}")
    resp['message'] = f'success to add user:{username},key {key},remember it!'
    return jsonify(resp)
博主这里实现时,添加了自定义请求头api_key
引入问题:
使用 Bearer authentication 的优势在于令牌本身可以包含更多的信息、具有较长的有效期,并且不需要在服务器端保存会话状态,这样可以减轻服务器负担并提高安全性。
这里就不实现了,后面通过jwt token,算是实现其中的一种。
涉及的几个常见参数如下:
# Store a value in Redis with an expiry time
def save_nonce_with_expiry(key, value, expiry_seconds):
    """Write *key* -> *value* to Redis with a time-to-live.

    :param key: Redis key
    :param value: value to store
    :param expiry_seconds: time-to-live in seconds
    """
    # Connect to Redis
    redis_client = redis.StrictRedis(host=REDIS_HOST, password=REDIS_PASSWORD,
                                     port=REDIS_PORT, db=REDIS_DB)
    redis_client.setex(key, expiry_seconds, value)


def check_nonce(key):
    """Report whether *key* currently exists in Redis.

    :param key: Redis key
    :return: True when the key exists, False otherwise
    """
    # Connect to Redis
    redis_client = redis.StrictRedis(host=REDIS_HOST, password=REDIS_PASSWORD,
                                     port=REDIS_PORT, db=REDIS_DB)
    # EXISTS yields a count; normalise it to the documented boolean.
    return bool(redis_client.exists(key))


# Look up a username and return its password
def check_username(username):
    """Return the stored password for *username*.

    :return: the ``password`` column of the matching ``passwords`` row,
        ``''`` when no row matches, or ``False`` when the lookup raises.
        Both failure values are falsy.
    """
    try:
        cli = MysqlCli(MYSQL_HOST, MYSQL_USER, MYSQL_PASSWORD, MYSQL_DATABASE)
        try:
            # NOTE(review): user input is concatenated into the SQL text; switch to
            # a parameterized query once MysqlCli's parameter API is confirmed.
            sql = f"select * from passwords where username = '{username}' limit 1"
            user = cli.select_one(sql)
        finally:
            # The original never closed this connection.
            cli.close()
        return user['password'] if user else ''
    except Exception:
        log.logger.error(f"check_username failed,error:{traceback.format_exc()}")
        return False


# Verify the digest response
def check_response(response, realm, username, password, method, uri, req_nonce, nc, cnonce, qop):
    """Verify an MD5 digest ``response`` value against the expected one.

    The expected value is ``MD5(HA1:nonce:nc:cnonce:qop:HA2)`` with
    ``HA1 = MD5(username:realm:password)`` and ``HA2 = MD5(method:uri)``.

    :return: True only when *req_nonce* exists in Redis, *password* is
        non-empty and *response* equals the expected digest; False
        otherwise, including when any step raises.
    """
    import hmac  # local import: used only for the constant-time comparison

    try:
        # The password is deliberately left out of the log line.
        log.logger.info(f"check_response ,response:{response},realm:{realm},username:{username},"
                        f"method:{method},uri:{uri},req_nonce:{req_nonce},nc:{nc},"
                        f"cnonce:{cnonce},qop:{qop}")
        # check_username() yields '' or False for unknown users / lookup errors.
        if not password:
            log.logger.error(f"check_response failed,error:no password for {username}")
            return False
        # Verify the nonce
        if not check_nonce(req_nonce):
            log.logger.error(f"check_response failed,error:{req_nonce} not exist!")
            return False
        ha1 = hashlib.md5(f"{username}:{realm}:{password}".encode()).hexdigest()
        ha2 = hashlib.md5(f"{method}:{uri}".encode()).hexdigest()
        expected = hashlib.md5(f"{ha1}:{req_nonce}:{nc}:{cnonce}:{qop}:{ha2}".encode()).hexdigest()
        # Compare as bytes so a non-ASCII client value cannot raise TypeError.
        return hmac.compare_digest(str(response).encode(), expected.encode())
    except Exception:
        log.logger.error(f"check_response failed,error:{traceback.format_exc()}")
        return False


# Add a user; shared by endpoints that need no extra per-user secret
def add_user():
    """Insert a ``users`` row built from the JSON body.

    Expects a JSON object with ``username`` and ``phone_number``.
    Returns 400 on invalid input, 500 when the insert fails, 200 otherwise.
    """
    resp = {
        "requestid": uuid.uuid4()
    }
    # Log the headers without Authorization so credentials stay out of the log file.
    log.logger.info({name: value for name, value in request.headers.items()
                     if name.lower() != 'authorization'})
    # The request must be JSON
    if not request.is_json:
        resp['error'] = "Invalid JSON format in request"
        return jsonify(resp), 400

    data = request.get_json()
    # A JSON array/scalar has no .get(); treat it as a malformed body.
    if not isinstance(data, dict):
        resp['error'] = "Invalid JSON format in request"
        return jsonify(resp), 400

    username = data.get('username')
    if username is None:
        resp['error'] = 'username is required'
        log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
        return jsonify(resp), 400

    phone_number = data.get('phone_number')
    if phone_number is None:
        resp['error'] = 'phone_number is required'
        log.logger.error(f"url:{request.url},params:{phone_number},resp:{resp}")
        return jsonify(resp), 400

    if not validate_username(username):
        # validate_username() allows up to 20 characters.
        resp['error'] = 'username length should not exceed 20'
        log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
        return jsonify(resp), 400

    try:
        cli = MysqlCli(MYSQL_HOST, MYSQL_USER, MYSQL_PASSWORD, MYSQL_DATABASE)
        try:
            cli.insert_one("users", {
                "username": username,
                "phone_number": phone_number
            })
        finally:
            # Release the connection even when the insert raises.
            cli.close()
    except Exception:
        # Set the error first so the log line actually contains it.
        resp['error'] = f"failed to insert to mysql:{traceback.format_exc()}"
        log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
        return jsonify(resp), 500

    resp['message'] = f'success to add user:{username}!'
    log.logger.info(f"url:{request.url},params:{username},resp:{resp}")
    return jsonify(resp)


# Admin endpoint: register a user, v2.0 (digest auth)
@app.route('/api/v2.0/admin/digest_auth/add_user', methods=['POST'])
@digest_auth
def add_user_digest():
    """Digest-authenticated wrapper around ``add_user``."""
    return add_user()
输入账号密码后登录,发起请求
优点:
引入问题:
通常由三个部分组成:header、payload 和 signature。
def jwt_auth(f):
    """Decorator that requires a valid HS256 JWT in ``Authorization: Bearer``.

    Responds 400 when the header is missing or not Bearer, 401 when the
    token is expired or invalid, and 500 on any other decoding failure.
    Every failure is reported under the ``error`` key.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        resp = {
            "requestid": uuid.uuid4()
        }
        auth_header = request.headers.get('Authorization')
        if not auth_header or not auth_header.startswith("Bearer "):
            resp["error"] = "no Authorization header or invalid format"
            return jsonify(resp), 400

        token = auth_header.split(' ', 1)[1]
        try:
            # Pinning the algorithm list means a token cannot choose its own algorithm.
            jwt.decode(token, JWT_SECRET, algorithms=['HS256'])
        except jwt.ExpiredSignatureError:
            resp["error"] = "token has expired"
            log.logger.error(f"url:{request.url},token:{token},resp:{resp}")
            return jsonify(resp), 401
        except jwt.InvalidTokenError:
            resp["error"] = f"invalid token {token}"
            log.logger.error(f"url:{request.url},token:{token},resp:{resp}")
            return jsonify(resp), 401
        except Exception as e:
            resp["error"] = f"internal error {e}"
            log.logger.error(f"url:{request.url},token:{token},resp:{resp}")
            return jsonify(resp), 500
        return f(*args, **kwargs)
    return decorated_function


# User login endpoint: issue a JWT
@app.route('/api/v2.0/admin/jwt/login', methods=['POST'])
def get_jwt_token():
    """Check ``username``/``password`` from the JSON body and issue a JWT.

    The token is signed with HS256 and expires five minutes after issue.
    Returns 400 on invalid input or unknown user, 401 on a wrong password,
    500 when the lookup fails, 200 with the token otherwise.
    """
    # Local import: only this function needs an aware UTC timestamp.
    from datetime import timezone

    resp = {
        "requestid": uuid.uuid4()
    }
    if not request.is_json:
        resp['error'] = "Invalid JSON format in request"
        return jsonify(resp), 400

    data = request.get_json()
    # A JSON array/scalar has no .get(); treat it as a malformed body.
    if not isinstance(data, dict):
        resp['error'] = "Invalid JSON format in request"
        return jsonify(resp), 400

    username = data.get('username')
    password = data.get('password')
    if username is None or password is None:
        resp['error'] = 'username or password is required'
        log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
        return jsonify(resp), 400
    if not validate_username(username):
        # validate_username() allows up to 20 characters.
        resp['error'] = 'username length should not exceed 20'
        log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
        return jsonify(resp), 400

    try:
        cli = MysqlCli(MYSQL_HOST, MYSQL_USER, MYSQL_PASSWORD, MYSQL_DATABASE)
        try:
            # NOTE(review): user input is concatenated into the SQL text; switch to
            # a parameterized query once MysqlCli's parameter API is confirmed.
            sql = f"select * from passwords where username = '{username}' limit 1"
            user = cli.select_one(sql)
        finally:
            # The original never closed this connection.
            cli.close()

        if not user:
            resp['error'] = 'username not found'
            log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
            return jsonify(resp), 400
        if user['password'] != password:
            resp['error'] = 'password is not right'
            log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
            return jsonify(resp), 401

        payload = {
            "iss": "lady_killer9",
            # PyJWT interprets a naive datetime as UTC, so a local-time
            # datetime.now() skews the expiry by the UTC offset; use an aware one.
            "exp": datetime.now(tz=timezone.utc) + timedelta(seconds=5 * 60),
            "jti": str(uuid.uuid4())
        }
        token = jwt.encode(payload, JWT_SECRET, algorithm="HS256")
    except Exception:
        resp['error'] = f'failed to get user, error:{traceback.format_exc()}'
        log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
        return jsonify(resp), 500

    # Log before the token is added; the stored password row is neither logged
    # nor echoed back (the original interpolated the whole row here).
    log.logger.info(f"url:{request.url},params:{username},resp:{resp}")
    resp['message'] = f'success to login :{username},token:{token}'
    return jsonify(resp), 200


# Admin endpoint: register a user, v2.0 (JWT, HS256)
@app.route('/api/v2.0/admin/jwt/add_user', methods=['POST'])
@jwt_auth
def add_user_jwt():
    """JWT-authenticated wrapper around ``add_user``."""
    return add_user()


# User endpoint: fetch user info, v2.0 (JWT, HS256)
@app.route('/api/v2.0/jwt/get_user_info', methods=['POST'])
@jwt_auth
def get_user_info_jwt():
    """JWT-authenticated wrapper around ``get_user_info``."""
    return get_user_info()
优点:
引入问题:
和Digest Auth差不多,可以由客户端生成随机数,这样请求一次即可,随机数不可重复。
# Verify the HMAC digest
def check_signature(signture: str, username: str, nonce: int, data: dict):
    """Check *signture* against HMAC-SHA256 of the JSON body.

    The key is the ``secret`` column of the ``secrets`` row for *username*;
    the message is ``json.dumps(data)``.

    :return: True when the signatures match; False when they differ, the
        user has no secret, or any step raises.
    """
    # NOTE(review): *nonce* is accepted but not part of the signed message, so a
    # captured body + signature can be replayed under a fresh nonce. Covering the
    # nonce requires a matching change in the client-side signing.
    try:
        cli = MysqlCli(MYSQL_HOST, MYSQL_USER, MYSQL_PASSWORD, MYSQL_DATABASE)
        try:
            # NOTE(review): user input is concatenated into the SQL text; switch to
            # a parameterized query once MysqlCli's parameter API is confirmed.
            sql = f"select * from `secrets` where `username` = '{username}' limit 1"
            secret = cli.select_one(sql)
        finally:
            cli.close()

        if not secret:
            log.logger.error(f"check_signature failed,error:{username} {secret}")
            return False

        server_signature = hmac.new(str(secret['secret']).encode('utf-8'),
                                    json.dumps(data).encode('utf-8'),
                                    hashlib.sha256).hexdigest()
        # Neither the secret nor the expected signature is written to the log.
        log.logger.info(f"check_signature for {username},data:{data}")
        # Constant-time comparison on bytes; a non-ASCII header cannot raise.
        return hmac.compare_digest(server_signature.encode(), str(signture).encode())
    except Exception:
        log.logger.error(f"check_signature failed,error:{traceback.format_exc()}")
        return False


def hmac_auth(f):
    """Decorator that requires an HMAC ``Signature`` header plus a fresh nonce.

    Reads ``nonce`` (int) and ``username`` from the query string. Responds
    400 for missing/invalid parameters, a reused nonce, a missing header or
    a non-JSON body, and 401 for a bad signature. The nonce is recorded
    only after the signature has been verified.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        resp = {
            "requestid": uuid.uuid4()
        }
        # Query-string check: nonce and username
        nonce = request.args.get('nonce', type=int)
        username = request.args.get('username')
        if nonce is None or username is None:
            resp['error'] = 'username or nonce not found in query string'
            log.logger.error(f"url:{request.url},resp:{resp}")
            return jsonify(resp), 400
        # The username ends up in a SQL string inside check_signature();
        # apply the same length check the JSON endpoints use.
        if not validate_username(username):
            resp['error'] = 'username length should not exceed 20'
            log.logger.error(f"url:{request.url},resp:{resp}")
            return jsonify(resp), 400
        # Nonce check
        if check_nonce(nonce):
            resp['error'] = f"check_signature failed,error:{nonce} is in database"
            log.logger.error(f"url:{request.url},resp:{resp}")
            return jsonify(resp), 400
        # Signature header check
        if "Signature" not in request.headers:
            resp['error'] = 'signature is required in headers'
            log.logger.error(f"url:{request.url},resp:{resp}")
            return jsonify(resp), 400
        # The request must be JSON
        if not request.is_json:
            resp['error'] = "Invalid JSON format in request"
            return jsonify(resp), 400

        data = request.get_json()
        if not check_signature(request.headers['Signature'], username, nonce, data):
            resp['error'] = f'signature {request.headers["Signature"]} is invalid'
            log.logger.error(f"url:{request.url},resp:{resp}")
            return jsonify(resp), 401

        # Record the nonce only for authenticated requests, so unauthenticated
        # callers can neither fill Redis nor burn nonces.
        save_nonce_with_expiry(nonce, 1, MIN * 60)
        return f(*args, **kwargs)
    return decorated_function


# Admin endpoint: register a user, v2.0 (HMAC-SHA256)
@app.route('/api/v2.0/admin/hmac/add_user', methods=['POST'])
@hmac_auth
def add_user_hmac():
    """Insert a ``users`` row plus a freshly generated secret in ``secrets``.

    Expects a JSON object with ``username`` and ``phone_number``. The
    generated secret is returned once in the response message.
    Returns 400 on invalid input, 500 when an insert fails, 200 otherwise.
    """
    resp = {
        "requestid": uuid.uuid4()
    }
    # The request must be JSON
    if not request.is_json:
        resp['error'] = "Invalid JSON format in request"
        return jsonify(resp), 400

    data = request.get_json()
    # A JSON array/scalar has no .get(); treat it as a malformed body.
    if not isinstance(data, dict):
        resp['error'] = "Invalid JSON format in request"
        return jsonify(resp), 400

    username = data.get('username')
    if username is None:
        resp['error'] = 'username is required'
        log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
        return jsonify(resp), 400

    phone_number = data.get('phone_number')
    if phone_number is None:
        resp['error'] = 'phone_number is required'
        log.logger.error(f"url:{request.url},params:{phone_number},resp:{resp}")
        return jsonify(resp), 400

    if not validate_username(username):
        # validate_username() allows up to 20 characters.
        resp['error'] = 'username length should not exceed 20'
        log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
        return jsonify(resp), 400

    try:
        cli = MysqlCli(MYSQL_HOST, MYSQL_USER, MYSQL_PASSWORD, MYSQL_DATABASE)
        try:
            cli.insert_one("users", {
                "username": username,
                "phone_number": phone_number
            })
            secret = secrets.token_hex(16)
            cli.insert_one("secrets", {
                "username": username,
                "secret": secret
            })
        finally:
            # Release the connection even when an insert raises.
            cli.close()
    except Exception:
        # Set the error first so the log line actually contains it.
        resp['error'] = f"failed to insert to mysql:{traceback.format_exc()}"
        log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
        return jsonify(resp), 500

    # Log before the secret is added to resp so it never reaches the log file.
    log.logger.info(f"url:{request.url},params:{username},resp:{resp}")
    resp['message'] = f'success to add user:{username},secret {secret},remember it!'
    return jsonify(resp)


# User endpoint: fetch user info, v2.0 (HMAC)
@app.route('/api/v2.0/hmac/get_user_info', methods=['POST'])
@hmac_auth
def get_user_info_hmac():
    """HMAC-authenticated wrapper around ``get_user_info``."""
    return get_user_info()
引入问题:
内容太多,留坑先不看了
比较项 | Basic Auth | API key | Bearer Auth/Token auth | Digest Auth | Hmac Auth | JWT |
---|---|---|---|---|---|---|
身份认证 | √ | √ | √ | √ | √ | √ |
密钥加密 | × | × | × | √ | √ | × |
服务端存储 | √ | √ | √ | √ | √ | ×(token可以不存) |
防重放 | × | × | × | √ | √ | ×(token失效前可重放) |
时效性 | × | × | × | √ | √ | √ |
自定义 | × | × | √ | √ | √ | √ |
通过以上比较,如果设计一个签名具有以下优点会比较好:
请求需要验证身份,就需要有账密,这里就用SecretId、SecretKey,其中SecretKey用于进行签名的计算。
为了防重放,客户端为每个请求生成一个唯一的随机数Nonce。服务端收到携带某个Nonce的请求后,如果再次收到携带相同Nonce的请求,就直接拒绝。因此Nonce需要由服务端保存;同时为了防止Nonce被篡改,需要把它纳入待签名字符串。
那么问题来了,Nonce需要服务器保存,不能一直保存吧,随着时间推移,存储成本会越来越高,因此需要时间限制。
请求应该具有时效性,这里使用unix时间戳Timestamp。规定在1分钟内请求有效,这样Nonce保存时间在1分钟即可。
密钥加密选择SHA-256算法,当然算法可以当做参数,由客户端指定,就用Algorithm吧
url类似:xxx?SecretId=xxx&Nonce=xxx&Timestamp=xxx&Algorithm=xxx
添加一个自定义请求头Signature,放上签名,待签名字符串规定格式如下
{Nonce}:{Timestamp}:{Algorithm}:{HTTPMethod}:{base64(HTTPBody)}
当然,还可以添加更多到待签名字符串
客户端
服务端
def verify_signature(signature, nonce, timestamp, algorithm, method, body_base64):
    """Check *signature* against the SHA-256 hex digest of the signing string.

    The signing string is
    ``{nonce}:{timestamp}:{algorithm}:{method}:{body_base64}``.

    :return: True when *signature* equals the computed digest.
    """
    import hmac  # local import: used only for the constant-time comparison

    # NOTE(review): the digest is an unkeyed SHA-256, so no SecretKey takes part
    # and anyone who knows the format can produce a valid signature. Keying it
    # needs a SecretId -> SecretKey lookup that is not visible here.
    format_str = f"{nonce}:{timestamp}:{algorithm}:{method}:{body_base64}"
    server_signature = hashlib.sha256(format_str.encode()).hexdigest()
    log.logger.info(format_str)
    log.logger.info(server_signature)
    # Constant-time comparison on bytes; a non-ASCII header cannot raise.
    return hmac.compare_digest(str(signature).encode(), server_signature.encode())


def require_signature(f):
    """Decorator that enforces the custom request-signature scheme.

    Reads ``SecretId``, ``Nonce``, ``Timestamp`` and ``Algorithm`` from the
    query string and ``Signature`` from the headers. Responds 400 for
    missing/invalid parameters, a timestamp outside the allowed window, a
    reused nonce or a non-JSON body, and 401 for a bad signature.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        resp = {
            "requestid": uuid.uuid4()
        }
        secret_id = request.args.get('SecretId', type=str)
        nonce = request.args.get('Nonce', type=str)
        timestamp = request.args.get('Timestamp', type=int)
        algorithm = request.args.get('Algorithm', type=str)
        if secret_id is None:
            resp['error'] = 'No SecretId in query string'
            return jsonify(resp), 400
        if nonce is None:
            resp['error'] = 'No Nonce in query string'
            return jsonify(resp), 400
        if timestamp is None:
            resp['error'] = 'No Timestamp in query string'
            return jsonify(resp), 400
        if algorithm is None:
            resp['error'] = 'No Algorithm in query string'
            return jsonify(resp), 400
        if algorithm not in ["sha256"]:
            resp['error'] = f'can not support {algorithm}'
            return jsonify(resp), 400
        # Reject timestamps too far in the past AND in the future: a future
        # timestamp would stay valid after its nonce has expired from Redis,
        # which re-opens the replay window.
        if abs(datetime.now().timestamp() - timestamp) > MIN * 60:
            resp['error'] = f'Timestamp must be within {MIN} minutes of server time, check Timestamp'
            return jsonify(resp), 400
        if ':' in nonce:
            # ':' is the field separator of the signing string.
            resp['error'] = 'can not contain : in Nonce, generate a new one'
            return jsonify(resp), 400

        signature = request.headers.get('Signature')
        if not signature:
            resp['error'] = 'no Signature in headers'
            return jsonify(resp), 400
        if not request.is_json:
            resp['error'] = "Invalid JSON format in request"
            return jsonify(resp), 400

        # Rebuild the signing input from the parsed body and verify the signature.
        # NOTE(review): the body is re-serialised with json.dumps, so the client
        # must produce byte-identical JSON; signing the raw body would be sturdier.
        body_json = json.dumps(request.get_json(), ensure_ascii=False)
        body_base64 = b64encode(body_json.encode()).decode()
        if not verify_signature(signature, nonce, timestamp, algorithm, request.method, body_base64):
            resp['error'] = 'invalid signature'
            return jsonify(resp), 401

        # Reserve the nonce only after the signature checks out, and do it with a
        # single SET NX EX so two concurrent replays cannot both pass.
        cli = redis.Redis.from_url(REDIS_URL)
        if not cli.set(nonce, 1, ex=MIN * 60, nx=True):
            resp['error'] = 'can not request with same Nonce'
            return jsonify(resp), 400
        return f(*args, **kwargs)
    return decorated_function
例如,在v1.0的get_user_info接口,存在将用户输入拼接到sql的漏洞,可以被SQL注入。
抓包如下:
防御方面可以通过预编译等方式来解决
v3.0已解决
例如,在v1.0的get_user_info接口,用户手机号被完整返回,没有打码。
防御上可以通过加*打码或MFA等来解决
v3.0已解决
例如,在v2.0的api key认证接口,任意用户都能查询admin用户的信息,只需要知道username即可
防御方面可以通过添加RBAC等鉴权来解决
例如,v2.0的api key认证接口,设置burpsuite代理,把请求放到重放器(Repeater)中,发送多少次都可以成功。
v3.0通过自定义请求签名就解决了此类问题。
API 鉴权都有哪些分类,这些重点不要错过
best-practices-for-authentication-and-authorization-for-rest-apis/
pyjwt
https://github.com/ticarpi/jwt_tool
rfc6750-The OAuth 2.0 Authorization Framework: Bearer Token Usage
rfc7616-HTTP Digest Access Authentication
rfc2617-HTTP Authentication: Basic and Digest Access Authentication
rfc7519-JSON web Token (JWT)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。