| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211 |
- import base64
- import hashlib
- import hmac
- from functools import wraps
- from hashlib import sha256
- import time
- import logging
- import json
- from urllib.parse import urlparse, parse_qs, urlencode
- def doContentMd5(data):
- """
- 根据字符串计算Content-MD5
- :param data:
- :return:
- """
- obj = hashlib.md5() # 实例化md5的时候可以给传个参数,这叫加盐
- obj.update(data.encode("utf-8")) # 是再加密的时候传入自己的一块字节,
- secret = obj.digest()
- return base64.b64encode(secret).decode('utf-8')
- def appendSignDataString(http_method, content_md5, url, accept="*/*",
- content_type="application/json; charset=UTF-8",
- **kwargs):
- """
- 拼接待签名字符串
- :param http_method: 统一用大写的POST GET DELETE PUT
- :param content_md5:请求body体计算的md5值
- :param url: 接口url地址,不带网关,例:/v1/accounts/createByThirdPartyUserId
- :param accept: 不传默认"*/*"
- :param content_type: 不传默认application/json; charset=UTF-8
- :param kwargs: 可以传headers和date参数
- :return: 拼接的待签名字符串
- """
- # 待签字符串
- sign_data_str = "{}\n{}\n{}\n{}\n".format(http_method, accept, content_md5, content_type)
- date = kwargs.get('date')
- headers = kwargs.get('headers')
- # 如果date是空的,直接拼接\n
- if date == "" or date is None:
- sign_data_str = "{}\n".format(sign_data_str)
- else:
- sign_data_str = "{}{}\n".format(sign_data_str, date)
- # 如果header是空的,直接拼接url
- if headers == "" or headers is None:
- sign_data_str = "{}{}".format(sign_data_str, url)
- else:
- sign_data_str = "{}{}\n{}".format(sign_data_str, headers, url)
- return sign_data_str
- def doSignatureBase64(message, secret):
- """
- 根据待签字符串计算签名值
- :param message: 待签名字符串
- :param secret:密钥
- :return:
- """
- key = secret.encode('utf-8') # sha256加密的key
- message = message.encode('utf-8') # 待sha256加密的内容
- sign = base64.b64encode(hmac.new(key, message, digestmod=sha256).digest()).decode()
- return sign
- def getMillisecondStamp():
- """
- 获取当前毫秒时间戳
- :return:
- """
- # 使用 time_ns 避免浮点精度问题
- # print(str(time.time_ns() // 1_000_000))
- return str(time.time_ns() // 1_000_000)
- def buildHeader(appid, content_md5, req_signature, accept="*/*",
- content_type="application/json; charset=UTF-8",
- auth_mode="Signature", **kwargs):
- """
- 构造签名鉴权请求头
- :param appid:
- :param content_md5:
- :param req_signature:
- :param accept:
- :param content_type:
- :param auth_mode:
- :param kwargs:
- :return:
- """
- header = {"X-Tsign-Open-App-Id": appid,
- "Content-Type": content_type,
- "X-Tsign-Open-Ca-Timestamp": getMillisecondStamp(),
- "Accept": accept,
- "X-Tsign-Open-Ca-Signature": req_signature,
- "Content-MD5": content_md5,
- "X-Tsign-Open-Auth-Mode": auth_mode,
- "X-Tsign-Dns-App-Id": appid}
- for key, value in kwargs.items():
- header[key] = value
- return header
- def buildFileUploadHeader(contentType, contentMd5):
- """
- 构建文件流上传请求头
- :param contentType:
- :param contentMd5:
- :return:
- """
- header = {
- 'Content-Type': contentType,
- 'Content-MD5': contentMd5
- }
- return header
- def buildSignJsonHeader(appid, secret, http_method, url, body=None,
- **kwargs):
- """
- 签名并构造签名鉴权和json请求体的请求头
- :param appid:
- :param secret:
- :param body: 传入字典格式数据
- :param http_method:
- :param url:接口url地址,不带网关,例:/v1/accounts/createByThirdPartyUserId
- :param kwargs:可以传headers和date参数以及其它自定义请求头参数
- :return:
- """
- content_md5 = ""
- # 判断是PUT或者POST请求,不需要计算计算md5,否则md5为空
- if "PUT" == http_method or "POST" == http_method:
- # 字典转json字符串(去空格,保证与发送一致)
- body = json.dumps(body, separators=(",", ":"), ensure_ascii=False)
- # 生成md5
- content_md5 = doContentMd5(body)
- # 拼接待签名字符串
- message = appendSignDataString(http_method, content_md5, apiPathSort(url), **kwargs)
- # 传入待签名字符串生成签名值
- req_signature = doSignatureBase64(message, secret)
- # 生成请求头
- header = buildHeader(appid, content_md5, req_signature, **kwargs)
- logging.debug("开始运行".center(50, "-"))
- logging.debug("计算md5的body:{}\n生成的md5:{}\n"
- "拼接的待签字符串:{}\n签名值:{}".format(body, content_md5, message, req_signature))
- logging.debug("结束运行".center(50, "-"))
- return header
- def apiPathSort(api_path, is_query=False, is_url_encode=False):
- """
- 传入url对query做排序返回新的url
- :param api_path:
- :param is_url_encode:
- :param is_query:
- :return:
- """
- # 提取url参数
- # 传入的api_path是不是整体都是query参数
- if is_query:
- query = api_path
- urls = ""
- path = ""
- # 将字符串转换为字典
- # 所得的字典的value都是以列表的形式存在,若列表中都只有一个值
- else:
- urls = "?"
- url = urlparse(api_path)
- query = url.query
- path = url.path
- # 将字符串转换为字典
- # 所得的字典的value都是以列表的形式存在,若列表中都只有一个值
- body = parse_qs(query)
- body = {key: body[key][0] for key in sorted(body.keys())}
- # 需要做urlencode编码
- if is_url_encode:
- body = urlencode(body)
- urls = path + urls + body
- else:
- for key in body:
- temp_str = key + "=" + body[key]
- urls = urls + temp_str + "&"
- urls = path + urls
- urls = urls[:len(urls) - 1]
- return urls
- def esign_run_print_outer(func):
- """
- 装饰器用于打印运行函数的基本信息
- :param func:
- :return:
- """
- @wraps(func)
- def esign_run_print(*args, **kwargs):
- print("开始运行{}".format(func.__name__).center(50, "-"))
- start_time = time.time()
- response = func(*args, **kwargs)
- end_time = time.time()
- print("运行时间:{}".format(end_time - start_time))
- print("结束运行{}".format(func.__name__).center(50, "-"))
- print("\n\n")
- return response
- return esign_run_print
- if __name__ == '__main__':
- pass
|