import base64 import hashlib import hmac from functools import wraps from hashlib import sha256 import time import logging import json from urllib.parse import urlparse, parse_qs, urlencode def doContentMd5(data): obj = hashlib.md5() obj.update(data.encode("utf-8")) secret = obj.digest() return base64.b64encode(secret).decode("utf-8") def appendSignDataString(http_method, content_md5, url, accept="*/*", content_type="application/json; charset=UTF-8", **kwargs): sign_data_str = "{}\n{}\n{}\n{}\n".format(http_method, accept, content_md5, content_type) date = kwargs.get("date") headers = kwargs.get("headers") if date == "" or date is None: sign_data_str = "{}\n".format(sign_data_str) else: sign_data_str = "{}{}\n".format(sign_data_str, date) if headers == "" or headers is None: sign_data_str = "{}{}".format(sign_data_str, url) else: sign_data_str = "{}{}\n{}".format(sign_data_str, headers, url) return sign_data_str def doSignatureBase64(message, secret): key = secret.encode("utf-8") message = message.encode("utf-8") sign = base64.b64encode(hmac.new(key, message, digestmod=sha256).digest()).decode() return sign def getMillisecondStamp(): return str(time.time_ns() // 1_000_000) def buildHeader(appid, content_md5, req_signature, accept="*/*", content_type="application/json; charset=UTF-8", auth_mode="Signature", **kwargs): header = { "X-Tsign-Open-App-Id": appid, "Content-Type": content_type, "X-Tsign-Open-Ca-Timestamp": getMillisecondStamp(), "Accept": accept, "X-Tsign-Open-Ca-Signature": req_signature, "Content-MD5": content_md5, "X-Tsign-Open-Auth-Mode": auth_mode, "X-Tsign-Dns-App-Id": appid, } for key, value in kwargs.items(): header[key] = value return header def buildFileUploadHeader(contentType, contentMd5): return {"Content-Type": contentType, "Content-MD5": contentMd5} def buildSignJsonHeader(appid, secret, http_method, url, body=None, **kwargs): content_md5 = "" if "PUT" == http_method or "POST" == http_method: body = json.dumps(body, separators=(",", ":"), ensure_ascii=False) content_md5 = doContentMd5(body) message = appendSignDataString(http_method, content_md5, apiPathSort(url), **kwargs) req_signature = doSignatureBase64(message, secret) header = buildHeader(appid, content_md5, req_signature, **kwargs) logging.debug("开始运行".center(50, "-")) logging.debug("计算md5的body:{}\n生成的md5:{}\n拼接的待签字符串:{}\n签名值:{}".format(body, content_md5, message, req_signature)) logging.debug("结束运行".center(50, "-")) return header def apiPathSort(api_path, is_query=False, is_url_encode=False): if is_query: query = api_path urls = "" path = "" else: urls = "?" url = urlparse(api_path) query = url.query path = url.path body = parse_qs(query) body = {key: body[key][0] for key in sorted(body.keys())} if is_url_encode: body = urlencode(body) urls = path + urls + body else: for key in body: temp_str = key + "=" + body[key] urls = urls + temp_str + "&" urls = path + urls urls = urls[:len(urls) - 1] return urls def esign_run_print_outer(func): @wraps(func) def esign_run_print(*args, **kwargs): print("开始运行{}".format(func.__name__).center(50, "-")) start_time = time.time() response = func(*args, **kwargs) end_time = time.time() print("运行时间:{}".format(end_time - start_time)) print("结束运行{}".format(func.__name__).center(50, "-")) print("\n\n") return response return esign_run_print