import base64
import hashlib
import hmac
import logging
class s3client:
    """Signer for S3-style REST requests (AWS Signature Version 2, HMAC-SHA1).

    Holds the endpoint and the credential pair; ``sign`` computes the
    signature and writes it back into the caller's headers or parameters.
    """

    # Query parameters that belong to the canonicalized resource and must be
    # covered by the signature (sub-resources, response-header overrides and
    # the multi-object ``delete`` marker).
    _SIGNED_SUBRESOURCES = frozenset({
        "acl",
        "delete",
        "lifecycle",
        "location",
        "logging",
        "notification",
        "partNumber",
        "policy",
        "requestPayment",
        "response-cache-control",
        "response-content-disposition",
        "response-content-encoding",
        "response-content-language",
        "response-content-type",
        "response-expires",
        "torrent",
        "uploadId",
        "uploads",
        "versionId",
        "versioning",
        "versions",
        "website",
    })

    _logger = logging.getLogger(__name__)

    def __init__(self, restful_server, access_key, secret_key):
        """Store the endpoint address and the access/secret key pair."""
        self.restful_server = restful_server
        self.access_key = access_key
        self.secret_key = secret_key

    def sign(self, method, canonical_path, parameters, http_headers):
        """Sign a request in place.

        Args:
            method: HTTP verb, e.g. ``"GET"``.
            canonical_path: Resource path such as ``"/bucket/key"``. It is
                used as given; any URL-encoding it needs must be done by
                the caller.
            parameters: Dict of query parameters. If it contains
                ``"Expires"``, query-string authentication is used:
                ``"AWSAccessKeyId"`` and ``"Signature"`` are written into
                this dict and ``http_headers`` is left untouched.
            http_headers: Dict mapping a header name to a list of string
                values, e.g. ``{"Content-Type": ["text/plain"]}``. Without
                ``"Expires"`` the result is stored as
                ``http_headers["Authorization"] = ["AWS <key>:<sig>"]``.

        Returns:
            None. Both dicts are modified in place. When the secret key is
            empty nothing is modified.

        Raises:
            IndexError: if a Content-MD5, Content-Type or Date header maps
                to an empty list.
        """
        if self.secret_key == "":
            # no auth secret; skip signing, e.g. for public read-only buckets
            return

        content_md5 = ""
        content_type = ""
        date = ""
        has_amz_date = False
        amz_lines = []
        for name, values in http_headers.items():
            name = name.lower()
            if name == "content-md5":
                content_md5 = values[0]
            elif name == "content-type":
                content_type = values[0]
            elif name == "date":
                # x-amz-date takes precedence over Date, whichever the loop
                # happens to visit first.
                if not has_amz_date:
                    date = values[0]
            elif name.startswith("x-amz-"):
                amz_lines.append(name + ":" + ",".join(values))
                if name == "x-amz-date":
                    has_amz_date = True
                    date = ""
        amz_block = "".join(line + "\n" for line in sorted(amz_lines))

        query_auth = "Expires" in parameters
        if query_auth:
            # Query string request authentication: the expiry timestamp
            # takes the place of the Date line in the string to sign.
            date = str(parameters["Expires"])
            parameters["AWSAccessKeyId"] = self.access_key

        # Values are appended unencoded; a parameter with an empty value is
        # signed as the bare name (e.g. "?uploads").
        signed_params = sorted(
            key if value == "" else key + "=" + str(value)
            for key, value in parameters.items()
            if key in self._SIGNED_SUBRESOURCES
        )
        if signed_params:
            canonical_path = canonical_path + "?" + "&".join(signed_params)

        payload = (
            "\n".join((method, content_md5, content_type, date))
            + "\n"
            + amz_block
            + canonical_path
        )
        self._logger.debug("string to sign: %r", payload)

        digest = hmac.new(
            self.secret_key.encode("utf8"),
            payload.encode("utf8"),
            hashlib.sha1,
        ).digest()
        # Decode once so both auth styles hand back text, not bytes.
        signature = base64.b64encode(digest).decode("ascii")

        if query_auth:
            parameters["Signature"] = signature
        else:
            http_headers["Authorization"] = [
                "AWS " + self.access_key + ":" + signature
            ]
- # -*- coding:utf-8 -*-
- import s3_client
- import hashlib
- import datetime
- import base64
- import urllib
def testSign():
    """Sign a sample GET request and print the inputs after signing.

    ``parameters`` carries ``Expires``, so ``sign`` uses query-string
    authentication: it writes ``AWSAccessKeyId`` and ``Signature`` into
    ``parameters`` and leaves ``http_headers`` alone. To exercise
    header-based signing instead, drop ``Expires`` and supply a ``Date`` or
    ``x-amz-date`` header; the result then appears under
    ``http_headers["Authorization"]``.
    """
    upload_server = "web.com:443"
    access_key = "xxxxx"
    secret_key = "xxxx"
    client = s3_client.s3client(upload_server, access_key, secret_key)

    method = "GET"
    canonical_path = "/bucket/md5/filename"
    parameters = {
        # Placeholder only: sign() replaces it with the client's access key
        # whenever "Expires" is present.
        "AWSAccessKeyId": "96X9UA862J8CHV28Y9UU",
        "Expires": "1551668789",
    }
    # Header values are lists, as sign() expects.
    http_headers = {"Content-Type": ["application/octet-stream"]}

    client.sign(method, canonical_path, parameters, http_headers)

    # sign() does not hand back its canonicalized path, so this prints the
    # path exactly as it was passed in.
    print("canonical_path: %s" % canonical_path)
    print("parameters: %s" % parameters)
    print("http_headers", http_headers)
# Run the signing demo when this script is loaded.
testSign()
# Copyright © 2003-2013 www.wpsshop.cn. All rights reserved.