import base64 import hashlib import hmac from functools import wraps from hashlib import sha256 import time import logging import json from urllib.parse import urlparse, parse_qs, urlencode def doContentMd5(data): """ 根据字符串计算Content-MD5 :param data: :return: """ obj = hashlib.md5() # 实例化md5的时候可以给传个参数,这叫加盐 obj.update(data.encode("utf-8")) # 是再加密的时候传入自己的一块字节, secret = obj.digest() return base64.b64encode(secret).decode('utf-8') def appendSignDataString(http_method, content_md5, url, accept="*/*", content_type="application/json; charset=UTF-8", **kwargs): """ 拼接待签名字符串 :param http_method: 统一用大写的POST GET DELETE PUT :param content_md5:请求body体计算的md5值 :param url: 接口url地址,不带网关,例:/v1/accounts/createByThirdPartyUserId :param accept: 不传默认"*/*" :param content_type: 不传默认application/json; charset=UTF-8 :param kwargs: 可以传headers和date参数 :return: 拼接的待签名字符串 """ # 待签字符串 sign_data_str = "{}\n{}\n{}\n{}\n".format(http_method, accept, content_md5, content_type) date = kwargs.get('date') headers = kwargs.get('headers') # 如果date是空的,直接拼接\n if date == "" or date is None: sign_data_str = "{}\n".format(sign_data_str) else: sign_data_str = "{}{}\n".format(sign_data_str, date) # 如果header是空的,直接拼接url if headers == "" or headers is None: sign_data_str = "{}{}".format(sign_data_str, url) else: sign_data_str = "{}{}\n{}".format(sign_data_str, headers, url) return sign_data_str def doSignatureBase64(message, secret): """ 根据待签字符串计算签名值 :param message: 待签名字符串 :param secret:密钥 :return: """ key = secret.encode('utf-8') # sha256加密的key message = message.encode('utf-8') # 待sha256加密的内容 sign = base64.b64encode(hmac.new(key, message, digestmod=sha256).digest()).decode() return sign def getMillisecondStamp(): """ 获取当前毫秒时间戳 :return: """ # 使用 time_ns 避免浮点精度问题 # print(str(time.time_ns() // 1_000_000)) return str(time.time_ns() // 1_000_000) def buildHeader(appid, content_md5, req_signature, accept="*/*", content_type="application/json; charset=UTF-8", auth_mode="Signature", **kwargs): """ 构造签名鉴权请求头 :param appid: :param content_md5: :param req_signature: :param accept: :param content_type: :param auth_mode: :param kwargs: :return: """ header = {"X-Tsign-Open-App-Id": appid, "Content-Type": content_type, "X-Tsign-Open-Ca-Timestamp": getMillisecondStamp(), "Accept": accept, "X-Tsign-Open-Ca-Signature": req_signature, "Content-MD5": content_md5, "X-Tsign-Open-Auth-Mode": auth_mode, "X-Tsign-Dns-App-Id": appid} for key, value in kwargs.items(): header[key] = value return header def buildFileUploadHeader(contentType, contentMd5): """ 构建文件流上传请求头 :param contentType: :param contentMd5: :return: """ header = { 'Content-Type': contentType, 'Content-MD5': contentMd5 } return header def buildSignJsonHeader(appid, secret, http_method, url, body=None, **kwargs): """ 签名并构造签名鉴权和json请求体的请求头 :param appid: :param secret: :param body: 传入字典格式数据 :param http_method: :param url:接口url地址,不带网关,例:/v1/accounts/createByThirdPartyUserId :param kwargs:可以传headers和date参数以及其它自定义请求头参数 :return: """ content_md5 = "" # 判断是PUT或者POST请求,不需要计算计算md5,否则md5为空 if "PUT" == http_method or "POST" == http_method: # 字典转json字符串(去空格,保证与发送一致) body = json.dumps(body, separators=(",", ":"), ensure_ascii=False) # 生成md5 content_md5 = doContentMd5(body) # 拼接待签名字符串 message = appendSignDataString(http_method, content_md5, apiPathSort(url), **kwargs) # 传入待签名字符串生成签名值 req_signature = doSignatureBase64(message, secret) # 生成请求头 header = buildHeader(appid, content_md5, req_signature, **kwargs) logging.debug("开始运行".center(50, "-")) logging.debug("计算md5的body:{}\n生成的md5:{}\n" "拼接的待签字符串:{}\n签名值:{}".format(body, content_md5, message, req_signature)) logging.debug("结束运行".center(50, "-")) return header def apiPathSort(api_path, is_query=False, is_url_encode=False): """ 传入url对query做排序返回新的url :param api_path: :param is_url_encode: :param is_query: :return: """ # 提取url参数 # 传入的api_path是不是整体都是query参数 if is_query: query = api_path urls = "" path = "" # 将字符串转换为字典 # 所得的字典的value都是以列表的形式存在,若列表中都只有一个值 else: urls = "?" url = urlparse(api_path) query = url.query path = url.path # 将字符串转换为字典 # 所得的字典的value都是以列表的形式存在,若列表中都只有一个值 body = parse_qs(query) body = {key: body[key][0] for key in sorted(body.keys())} # 需要做urlencode编码 if is_url_encode: body = urlencode(body) urls = path + urls + body else: for key in body: temp_str = key + "=" + body[key] urls = urls + temp_str + "&" urls = path + urls urls = urls[:len(urls) - 1] return urls def esign_run_print_outer(func): """ 装饰器用于打印运行函数的基本信息 :param func: :return: """ @wraps(func) def esign_run_print(*args, **kwargs): print("开始运行{}".format(func.__name__).center(50, "-")) start_time = time.time() response = func(*args, **kwargs) end_time = time.time() print("运行时间:{}".format(end_time - start_time)) print("结束运行{}".format(func.__name__).center(50, "-")) print("\n\n") return response return esign_run_print if __name__ == '__main__': pass