esign_algorithm.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. import base64
  2. import hashlib
  3. import hmac
  4. from functools import wraps
  5. from hashlib import sha256
  6. import time
  7. import logging
  8. import json
  9. from urllib.parse import urlparse, parse_qs, urlencode
  10. def doContentMd5(data):
  11. obj = hashlib.md5()
  12. obj.update(data.encode("utf-8"))
  13. secret = obj.digest()
  14. return base64.b64encode(secret).decode("utf-8")
  15. def appendSignDataString(http_method, content_md5, url, accept="*/*", content_type="application/json; charset=UTF-8", **kwargs):
  16. sign_data_str = "{}\n{}\n{}\n{}\n".format(http_method, accept, content_md5, content_type)
  17. date = kwargs.get("date")
  18. headers = kwargs.get("headers")
  19. if date == "" or date is None:
  20. sign_data_str = "{}\n".format(sign_data_str)
  21. else:
  22. sign_data_str = "{}{}\n".format(sign_data_str, date)
  23. if headers == "" or headers is None:
  24. sign_data_str = "{}{}".format(sign_data_str, url)
  25. else:
  26. sign_data_str = "{}{}\n{}".format(sign_data_str, headers, url)
  27. return sign_data_str
  28. def doSignatureBase64(message, secret):
  29. key = secret.encode("utf-8")
  30. message = message.encode("utf-8")
  31. sign = base64.b64encode(hmac.new(key, message, digestmod=sha256).digest()).decode()
  32. return sign
  33. def getMillisecondStamp():
  34. return str(time.time_ns() // 1_000_000)
  35. def buildHeader(appid, content_md5, req_signature, accept="*/*", content_type="application/json; charset=UTF-8", auth_mode="Signature", **kwargs):
  36. header = {
  37. "X-Tsign-Open-App-Id": appid,
  38. "Content-Type": content_type,
  39. "X-Tsign-Open-Ca-Timestamp": getMillisecondStamp(),
  40. "Accept": accept,
  41. "X-Tsign-Open-Ca-Signature": req_signature,
  42. "Content-MD5": content_md5,
  43. "X-Tsign-Open-Auth-Mode": auth_mode,
  44. "X-Tsign-Dns-App-Id": appid,
  45. }
  46. for key, value in kwargs.items():
  47. header[key] = value
  48. return header
  49. def buildFileUploadHeader(contentType, contentMd5):
  50. return {"Content-Type": contentType, "Content-MD5": contentMd5}
  51. def buildSignJsonHeader(appid, secret, http_method, url, body=None, **kwargs):
  52. content_md5 = ""
  53. if "PUT" == http_method or "POST" == http_method:
  54. body = json.dumps(body, separators=(",", ":"), ensure_ascii=False)
  55. content_md5 = doContentMd5(body)
  56. message = appendSignDataString(http_method, content_md5, apiPathSort(url), **kwargs)
  57. req_signature = doSignatureBase64(message, secret)
  58. header = buildHeader(appid, content_md5, req_signature, **kwargs)
  59. logging.debug("开始运行".center(50, "-"))
  60. logging.debug("计算md5的body:{}\n生成的md5:{}\n拼接的待签字符串:{}\n签名值:{}".format(body, content_md5, message, req_signature))
  61. logging.debug("结束运行".center(50, "-"))
  62. return header
  63. def apiPathSort(api_path, is_query=False, is_url_encode=False):
  64. if is_query:
  65. query = api_path
  66. urls = ""
  67. path = ""
  68. else:
  69. urls = "?"
  70. url = urlparse(api_path)
  71. query = url.query
  72. path = url.path
  73. body = parse_qs(query)
  74. body = {key: body[key][0] for key in sorted(body.keys())}
  75. if is_url_encode:
  76. body = urlencode(body)
  77. urls = path + urls + body
  78. else:
  79. for key in body:
  80. temp_str = key + "=" + body[key]
  81. urls = urls + temp_str + "&"
  82. urls = path + urls
  83. urls = urls[:len(urls) - 1]
  84. return urls
  85. def esign_run_print_outer(func):
  86. @wraps(func)
  87. def esign_run_print(*args, **kwargs):
  88. print("开始运行{}".format(func.__name__).center(50, "-"))
  89. start_time = time.time()
  90. response = func(*args, **kwargs)
  91. end_time = time.time()
  92. print("运行时间:{}".format(end_time - start_time))
  93. print("结束运行{}".format(func.__name__).center(50, "-"))
  94. print("\n\n")
  95. return response
  96. return esign_run_print