esign_algorithm.py 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212
  1. import base64
  2. import hashlib
  3. import hmac
  4. from functools import wraps
  5. from hashlib import sha256
  6. import time
  7. from esign_emun import httpMethodEnum
  8. import logging
  9. import json
  10. from urllib.parse import urlparse, parse_qs, urlencode
  11. def doContentMd5(data):
  12. """
  13. 根据字符串计算Content-MD5
  14. :param data:
  15. :return:
  16. """
  17. obj = hashlib.md5() # 实例化md5的时候可以给传个参数,这叫加盐
  18. obj.update(data.encode("utf-8")) # 是再加密的时候传入自己的一块字节,
  19. secret = obj.digest()
  20. return base64.b64encode(secret).decode('utf-8')
  21. def appendSignDataString(http_method, content_md5, url, accept="*/*",
  22. content_type="application/json; charset=UTF-8",
  23. **kwargs):
  24. """
  25. 拼接待签名字符串
  26. :param http_method: 统一用大写的POST GET DELETE PUT
  27. :param content_md5:请求body体计算的md5值
  28. :param url: 接口url地址,不带网关,例:/v1/accounts/createByThirdPartyUserId
  29. :param accept: 不传默认"*/*"
  30. :param content_type: 不传默认application/json; charset=UTF-8
  31. :param kwargs: 可以传headers和date参数
  32. :return: 拼接的待签名字符串
  33. """
  34. # 待签字符串
  35. sign_data_str = "{}\n{}\n{}\n{}\n".format(http_method, accept, content_md5, content_type)
  36. date = kwargs.get('date')
  37. headers = kwargs.get('headers')
  38. # 如果date是空的,直接拼接\n
  39. if date == "" or date is None:
  40. sign_data_str = "{}\n".format(sign_data_str)
  41. else:
  42. sign_data_str = "{}{}\n".format(sign_data_str, date)
  43. # 如果header是空的,直接拼接url
  44. if headers == "" or headers is None:
  45. sign_data_str = "{}{}".format(sign_data_str, url)
  46. else:
  47. sign_data_str = "{}{}\n{}".format(sign_data_str, headers, url)
  48. return sign_data_str
  49. def doSignatureBase64(message, secret):
  50. """
  51. 根据待签字符串计算签名值
  52. :param message: 待签名字符串
  53. :param secret:密钥
  54. :return:
  55. """
  56. key = secret.encode('utf-8') # sha256加密的key
  57. message = message.encode('utf-8') # 待sha256加密的内容
  58. sign = base64.b64encode(hmac.new(key, message, digestmod=sha256).digest()).decode()
  59. return sign
  60. def getMillisecondStamp():
  61. """
  62. 获取当前毫秒时间戳
  63. :return:
  64. """
  65. # 使用 time_ns 避免浮点精度问题
  66. # print(str(time.time_ns() // 1_000_000))
  67. return str(time.time_ns() // 1_000_000)
  68. def buildHeader(appid, content_md5, req_signature, accept="*/*",
  69. content_type="application/json; charset=UTF-8",
  70. auth_mode="Signature", **kwargs):
  71. """
  72. 构造签名鉴权请求头
  73. :param appid:
  74. :param content_md5:
  75. :param req_signature:
  76. :param accept:
  77. :param content_type:
  78. :param auth_mode:
  79. :param kwargs:
  80. :return:
  81. """
  82. header = {"X-Tsign-Open-App-Id": appid,
  83. "Content-Type": content_type,
  84. "X-Tsign-Open-Ca-Timestamp": getMillisecondStamp(),
  85. "Accept": accept,
  86. "X-Tsign-Open-Ca-Signature": req_signature,
  87. "Content-MD5": content_md5,
  88. "X-Tsign-Open-Auth-Mode": auth_mode,
  89. "X-Tsign-Dns-App-Id": appid}
  90. for key, value in kwargs.items():
  91. header[key] = value
  92. return header
  93. def buildFileUploadHeader(contentType, contentMd5):
  94. """
  95. 构建文件流上传请求头
  96. :param contentType:
  97. :param contentMd5:
  98. :return:
  99. """
  100. header = {
  101. 'Content-Type': contentType,
  102. 'Content-MD5': contentMd5
  103. }
  104. return header
  105. def buildSignJsonHeader(appid, secret, http_method, url, body=None,
  106. **kwargs):
  107. """
  108. 签名并构造签名鉴权和json请求体的请求头
  109. :param appid:
  110. :param secret:
  111. :param body: 传入字典格式数据
  112. :param http_method:
  113. :param url:接口url地址,不带网关,例:/v1/accounts/createByThirdPartyUserId
  114. :param kwargs:可以传headers和date参数以及其它自定义请求头参数
  115. :return:
  116. """
  117. content_md5 = ""
  118. # 判断是PUT或者POST请求,不需要计算计算md5,否则md5为空
  119. if httpMethodEnum.PUT == http_method or httpMethodEnum.POST == http_method:
  120. # 字典转json字符串(去空格,保证与发送一致)
  121. body = json.dumps(body, separators=(",", ":"), ensure_ascii=False)
  122. # 生成md5
  123. content_md5 = doContentMd5(body)
  124. # 拼接待签名字符串
  125. message = appendSignDataString(http_method, content_md5, apiPathSort(url), **kwargs)
  126. # 传入待签名字符串生成签名值
  127. req_signature = doSignatureBase64(message, secret)
  128. # 生成请求头
  129. header = buildHeader(appid, content_md5, req_signature, **kwargs)
  130. logging.debug("开始运行".center(50, "-"))
  131. logging.debug("计算md5的body:{}\n生成的md5:{}\n"
  132. "拼接的待签字符串:{}\n签名值:{}".format(body, content_md5, message, req_signature))
  133. logging.debug("结束运行".center(50, "-"))
  134. return header
  135. def apiPathSort(api_path, is_query=False, is_url_encode=False):
  136. """
  137. 传入url对query做排序返回新的url
  138. :param api_path:
  139. :param is_url_encode:
  140. :param is_query:
  141. :return:
  142. """
  143. # 提取url参数
  144. # 传入的api_path是不是整体都是query参数
  145. if is_query:
  146. query = api_path
  147. urls = ""
  148. path = ""
  149. # 将字符串转换为字典
  150. # 所得的字典的value都是以列表的形式存在,若列表中都只有一个值
  151. else:
  152. urls = "?"
  153. url = urlparse(api_path)
  154. query = url.query
  155. path = url.path
  156. # 将字符串转换为字典
  157. # 所得的字典的value都是以列表的形式存在,若列表中都只有一个值
  158. body = parse_qs(query)
  159. body = {key: body[key][0] for key in sorted(body.keys())}
  160. # 需要做urlencode编码
  161. if is_url_encode:
  162. body = urlencode(body)
  163. urls = path + urls + body
  164. else:
  165. for key in body:
  166. temp_str = key + "=" + body[key]
  167. urls = urls + temp_str + "&"
  168. urls = path + urls
  169. urls = urls[:len(urls) - 1]
  170. return urls
  171. def esign_run_print_outer(func):
  172. """
  173. 装饰器用于打印运行函数的基本信息
  174. :param func:
  175. :return:
  176. """
  177. @wraps(func)
  178. def esign_run_print(*args, **kwargs):
  179. print("开始运行{}".format(func.__name__).center(50, "-"))
  180. start_time = time.time()
  181. response = func(*args, **kwargs)
  182. end_time = time.time()
  183. print("运行时间:{}".format(end_time - start_time))
  184. print("结束运行{}".format(func.__name__).center(50, "-"))
  185. print("\n\n")
  186. return response
  187. return esign_run_print
  188. if __name__ == '__main__':
  189. pass