esign_algorithm.py 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211
  1. import base64
  2. import hashlib
  3. import hmac
  4. from functools import wraps
  5. from hashlib import sha256
  6. import time
  7. import logging
  8. import json
  9. from urllib.parse import urlparse, parse_qs, urlencode
  10. def doContentMd5(data):
  11. """
  12. 根据字符串计算Content-MD5
  13. :param data:
  14. :return:
  15. """
  16. obj = hashlib.md5() # 实例化md5的时候可以给传个参数,这叫加盐
  17. obj.update(data.encode("utf-8")) # 是再加密的时候传入自己的一块字节,
  18. secret = obj.digest()
  19. return base64.b64encode(secret).decode('utf-8')
  20. def appendSignDataString(http_method, content_md5, url, accept="*/*",
  21. content_type="application/json; charset=UTF-8",
  22. **kwargs):
  23. """
  24. 拼接待签名字符串
  25. :param http_method: 统一用大写的POST GET DELETE PUT
  26. :param content_md5:请求body体计算的md5值
  27. :param url: 接口url地址,不带网关,例:/v1/accounts/createByThirdPartyUserId
  28. :param accept: 不传默认"*/*"
  29. :param content_type: 不传默认application/json; charset=UTF-8
  30. :param kwargs: 可以传headers和date参数
  31. :return: 拼接的待签名字符串
  32. """
  33. # 待签字符串
  34. sign_data_str = "{}\n{}\n{}\n{}\n".format(http_method, accept, content_md5, content_type)
  35. date = kwargs.get('date')
  36. headers = kwargs.get('headers')
  37. # 如果date是空的,直接拼接\n
  38. if date == "" or date is None:
  39. sign_data_str = "{}\n".format(sign_data_str)
  40. else:
  41. sign_data_str = "{}{}\n".format(sign_data_str, date)
  42. # 如果header是空的,直接拼接url
  43. if headers == "" or headers is None:
  44. sign_data_str = "{}{}".format(sign_data_str, url)
  45. else:
  46. sign_data_str = "{}{}\n{}".format(sign_data_str, headers, url)
  47. return sign_data_str
  48. def doSignatureBase64(message, secret):
  49. """
  50. 根据待签字符串计算签名值
  51. :param message: 待签名字符串
  52. :param secret:密钥
  53. :return:
  54. """
  55. key = secret.encode('utf-8') # sha256加密的key
  56. message = message.encode('utf-8') # 待sha256加密的内容
  57. sign = base64.b64encode(hmac.new(key, message, digestmod=sha256).digest()).decode()
  58. return sign
  59. def getMillisecondStamp():
  60. """
  61. 获取当前毫秒时间戳
  62. :return:
  63. """
  64. # 使用 time_ns 避免浮点精度问题
  65. # print(str(time.time_ns() // 1_000_000))
  66. return str(time.time_ns() // 1_000_000)
  67. def buildHeader(appid, content_md5, req_signature, accept="*/*",
  68. content_type="application/json; charset=UTF-8",
  69. auth_mode="Signature", **kwargs):
  70. """
  71. 构造签名鉴权请求头
  72. :param appid:
  73. :param content_md5:
  74. :param req_signature:
  75. :param accept:
  76. :param content_type:
  77. :param auth_mode:
  78. :param kwargs:
  79. :return:
  80. """
  81. header = {"X-Tsign-Open-App-Id": appid,
  82. "Content-Type": content_type,
  83. "X-Tsign-Open-Ca-Timestamp": getMillisecondStamp(),
  84. "Accept": accept,
  85. "X-Tsign-Open-Ca-Signature": req_signature,
  86. "Content-MD5": content_md5,
  87. "X-Tsign-Open-Auth-Mode": auth_mode,
  88. "X-Tsign-Dns-App-Id": appid}
  89. for key, value in kwargs.items():
  90. header[key] = value
  91. return header
  92. def buildFileUploadHeader(contentType, contentMd5):
  93. """
  94. 构建文件流上传请求头
  95. :param contentType:
  96. :param contentMd5:
  97. :return:
  98. """
  99. header = {
  100. 'Content-Type': contentType,
  101. 'Content-MD5': contentMd5
  102. }
  103. return header
  104. def buildSignJsonHeader(appid, secret, http_method, url, body=None,
  105. **kwargs):
  106. """
  107. 签名并构造签名鉴权和json请求体的请求头
  108. :param appid:
  109. :param secret:
  110. :param body: 传入字典格式数据
  111. :param http_method:
  112. :param url:接口url地址,不带网关,例:/v1/accounts/createByThirdPartyUserId
  113. :param kwargs:可以传headers和date参数以及其它自定义请求头参数
  114. :return:
  115. """
  116. content_md5 = ""
  117. # 判断是PUT或者POST请求,不需要计算计算md5,否则md5为空
  118. if "PUT" == http_method or "POST" == http_method:
  119. # 字典转json字符串(去空格,保证与发送一致)
  120. body = json.dumps(body, separators=(",", ":"), ensure_ascii=False)
  121. # 生成md5
  122. content_md5 = doContentMd5(body)
  123. # 拼接待签名字符串
  124. message = appendSignDataString(http_method, content_md5, apiPathSort(url), **kwargs)
  125. # 传入待签名字符串生成签名值
  126. req_signature = doSignatureBase64(message, secret)
  127. # 生成请求头
  128. header = buildHeader(appid, content_md5, req_signature, **kwargs)
  129. logging.debug("开始运行".center(50, "-"))
  130. logging.debug("计算md5的body:{}\n生成的md5:{}\n"
  131. "拼接的待签字符串:{}\n签名值:{}".format(body, content_md5, message, req_signature))
  132. logging.debug("结束运行".center(50, "-"))
  133. return header
  134. def apiPathSort(api_path, is_query=False, is_url_encode=False):
  135. """
  136. 传入url对query做排序返回新的url
  137. :param api_path:
  138. :param is_url_encode:
  139. :param is_query:
  140. :return:
  141. """
  142. # 提取url参数
  143. # 传入的api_path是不是整体都是query参数
  144. if is_query:
  145. query = api_path
  146. urls = ""
  147. path = ""
  148. # 将字符串转换为字典
  149. # 所得的字典的value都是以列表的形式存在,若列表中都只有一个值
  150. else:
  151. urls = "?"
  152. url = urlparse(api_path)
  153. query = url.query
  154. path = url.path
  155. # 将字符串转换为字典
  156. # 所得的字典的value都是以列表的形式存在,若列表中都只有一个值
  157. body = parse_qs(query)
  158. body = {key: body[key][0] for key in sorted(body.keys())}
  159. # 需要做urlencode编码
  160. if is_url_encode:
  161. body = urlencode(body)
  162. urls = path + urls + body
  163. else:
  164. for key in body:
  165. temp_str = key + "=" + body[key]
  166. urls = urls + temp_str + "&"
  167. urls = path + urls
  168. urls = urls[:len(urls) - 1]
  169. return urls
  170. def esign_run_print_outer(func):
  171. """
  172. 装饰器用于打印运行函数的基本信息
  173. :param func:
  174. :return:
  175. """
  176. @wraps(func)
  177. def esign_run_print(*args, **kwargs):
  178. print("开始运行{}".format(func.__name__).center(50, "-"))
  179. start_time = time.time()
  180. response = func(*args, **kwargs)
  181. end_time = time.time()
  182. print("运行时间:{}".format(end_time - start_time))
  183. print("结束运行{}".format(func.__name__).center(50, "-"))
  184. print("\n\n")
  185. return response
  186. return esign_run_print
  187. if __name__ == '__main__':
  188. pass