| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115 |
- import base64
- import hashlib
- import hmac
- from functools import wraps
- from hashlib import sha256
- import time
- import logging
- import json
- from urllib.parse import urlparse, parse_qs, urlencode
- def doContentMd5(data):
- obj = hashlib.md5()
- obj.update(data.encode("utf-8"))
- secret = obj.digest()
- return base64.b64encode(secret).decode("utf-8")
- def appendSignDataString(http_method, content_md5, url, accept="*/*", content_type="application/json; charset=UTF-8", **kwargs):
- sign_data_str = "{}\n{}\n{}\n{}\n".format(http_method, accept, content_md5, content_type)
- date = kwargs.get("date")
- headers = kwargs.get("headers")
- if date == "" or date is None:
- sign_data_str = "{}\n".format(sign_data_str)
- else:
- sign_data_str = "{}{}\n".format(sign_data_str, date)
- if headers == "" or headers is None:
- sign_data_str = "{}{}".format(sign_data_str, url)
- else:
- sign_data_str = "{}{}\n{}".format(sign_data_str, headers, url)
- return sign_data_str
- def doSignatureBase64(message, secret):
- key = secret.encode("utf-8")
- message = message.encode("utf-8")
- sign = base64.b64encode(hmac.new(key, message, digestmod=sha256).digest()).decode()
- return sign
- def getMillisecondStamp():
- return str(time.time_ns() // 1_000_000)
- def buildHeader(appid, content_md5, req_signature, accept="*/*", content_type="application/json; charset=UTF-8", auth_mode="Signature", **kwargs):
- header = {
- "X-Tsign-Open-App-Id": appid,
- "Content-Type": content_type,
- "X-Tsign-Open-Ca-Timestamp": getMillisecondStamp(),
- "Accept": accept,
- "X-Tsign-Open-Ca-Signature": req_signature,
- "Content-MD5": content_md5,
- "X-Tsign-Open-Auth-Mode": auth_mode,
- "X-Tsign-Dns-App-Id": appid,
- }
- for key, value in kwargs.items():
- header[key] = value
- return header
- def buildFileUploadHeader(contentType, contentMd5):
- return {"Content-Type": contentType, "Content-MD5": contentMd5}
- def buildSignJsonHeader(appid, secret, http_method, url, body=None, **kwargs):
- content_md5 = ""
- if "PUT" == http_method or "POST" == http_method:
- body = json.dumps(body, separators=(",", ":"), ensure_ascii=False)
- content_md5 = doContentMd5(body)
- message = appendSignDataString(http_method, content_md5, apiPathSort(url), **kwargs)
- req_signature = doSignatureBase64(message, secret)
- header = buildHeader(appid, content_md5, req_signature, **kwargs)
- logging.debug("开始运行".center(50, "-"))
- logging.debug("计算md5的body:{}\n生成的md5:{}\n拼接的待签字符串:{}\n签名值:{}".format(body, content_md5, message, req_signature))
- logging.debug("结束运行".center(50, "-"))
- return header
- def apiPathSort(api_path, is_query=False, is_url_encode=False):
- if is_query:
- query = api_path
- urls = ""
- path = ""
- else:
- urls = "?"
- url = urlparse(api_path)
- query = url.query
- path = url.path
- body = parse_qs(query)
- body = {key: body[key][0] for key in sorted(body.keys())}
- if is_url_encode:
- body = urlencode(body)
- urls = path + urls + body
- else:
- for key in body:
- temp_str = key + "=" + body[key]
- urls = urls + temp_str + "&"
- urls = path + urls
- urls = urls[:len(urls) - 1]
- return urls
- def esign_run_print_outer(func):
- @wraps(func)
- def esign_run_print(*args, **kwargs):
- print("开始运行{}".format(func.__name__).center(50, "-"))
- start_time = time.time()
- response = func(*args, **kwargs)
- end_time = time.time()
- print("运行时间:{}".format(end_time - start_time))
- print("结束运行{}".format(func.__name__).center(50, "-"))
- print("\n\n")
- return response
- return esign_run_print
|