contract_server.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354
  1. import datetime
  2. import os
  3. import logging
  4. import json
  5. import re
  6. import urllib.parse
  7. from typing import Any, Union, Optional
  8. from sqlalchemy.ext.asyncio import AsyncSession
  9. from alien_store.repositories.contract_repo import ContractRepository
  10. from alien_store.schemas.request.contract_store import TemplatesCreate
  11. from common.esigntool import main as esign_main
  12. from common.esigntool.main import fill_in_template, create_by_file, sign_url, file_download_url
  13. # ------------------- 日志配置 -------------------
  14. LOG_DIR = os.path.join("common", "logs", "alien_store")
  15. os.makedirs(LOG_DIR, exist_ok=True)
  16. def _init_logger():
  17. logger = logging.getLogger("alien_store_service")
  18. if logger.handlers:
  19. return logger
  20. logger.setLevel(logging.INFO)
  21. fmt = logging.Formatter("%(asctime)s [%(levelname)s] %(name)s %(message)s")
  22. info_handler = logging.FileHandler(os.path.join(LOG_DIR, "info.log"), encoding="utf-8")
  23. info_handler.setLevel(logging.INFO)
  24. info_handler.setFormatter(fmt)
  25. error_handler = logging.FileHandler(os.path.join(LOG_DIR, "error.log"), encoding="utf-8")
  26. error_handler.setLevel(logging.ERROR)
  27. error_handler.setFormatter(fmt)
  28. logger.addHandler(info_handler)
  29. logger.addHandler(error_handler)
  30. return logger
  31. logger = _init_logger()
  32. class ContractServer:
  33. def __init__(self, db: AsyncSession):
  34. self.db = db
  35. self.esign_repo = ContractRepository(db)
  36. async def create_template(self, template_data: TemplatesCreate):
  37. await self.esign_repo.create(template_data)
  38. return {
  39. "message": "模板创建成功"
  40. }
  41. async def create_esign_templates(self, templates_data: TemplatesCreate) -> dict:
  42. """AI审核完调用 e签宝生成文件"""
  43. logger.info(f"create_esign_templates request: {templates_data}")
  44. # 1. 填充模板
  45. res_text = fill_in_template(templates_data.store_name)
  46. try:
  47. res_data = json.loads(res_text)
  48. except json.JSONDecodeError:
  49. logger.error(f"fill_in_template non-json resp: {res_text}")
  50. return {"success": False, "message": "e签宝返回非 JSON", "raw": res_text}
  51. # 2. 提取文件信息
  52. try:
  53. contract_url = res_data["data"]["fileDownloadUrl"]
  54. file_id = res_data["data"]["fileId"]
  55. m = re.search(r'/([^/]+)\.pdf', contract_url)
  56. if m:
  57. encoded_name = m.group(1)
  58. file_name = urllib.parse.unquote(encoded_name)
  59. else:
  60. file_name = "contract.pdf"
  61. except Exception:
  62. logger.error(f"fill_in_template missing fileDownloadUrl: {res_data}")
  63. return {"success": False, "message": "e签宝返回缺少 fileDownloadUrl", "raw": res_data}
  64. # 3. 创建签署流程
  65. sign_data = create_by_file(
  66. file_id,
  67. file_name,
  68. templates_data.contact_phone,
  69. templates_data.store_name,
  70. templates_data.merchant_name,
  71. templates_data.ord_id
  72. )
  73. try:
  74. sign_json = json.loads(sign_data)
  75. except json.JSONDecodeError:
  76. logger.error(f"create_by_file non-json resp: {sign_data}")
  77. return {"success": False, "message": "e签宝 create_by_file 返回非 JSON", "raw": sign_data}
  78. if not sign_json.get("data"):
  79. logger.error(f"create_by_file failed or missing data: {sign_json}")
  80. return {"success": False, "message": "e签宝创建签署流程失败", "raw": sign_json}
  81. sign_id = sign_json["data"].get("signFlowId")
  82. if not sign_id:
  83. logger.error(f"create_by_file missing signFlowId: {sign_json}")
  84. return {"success": False, "message": "e签宝返回缺少 signFlowId", "raw": sign_json}
  85. # 4. 构建合同记录
  86. result_contract = {
  87. "contract_url": contract_url,
  88. "file_name": file_name,
  89. "file_id": file_id,
  90. "status": 0,
  91. "sign_flow_id": sign_id,
  92. "sign_url": "",
  93. "signing_time": "",
  94. "effective_time": "",
  95. "expiry_time": "",
  96. "contract_download_url": "",
  97. "is_master": 1
  98. }
  99. await self.esign_repo.append_contract_url(templates_data, result_contract)
  100. logger.info(f"create_esign_templates success contact_phone={templates_data.contact_phone}, sign_flow_id={sign_id}")
  101. return {
  102. "success": True,
  103. "message": "合同模板已追加/创建",
  104. "sign_flow_id": sign_id,
  105. "file_id": file_id,
  106. "contract_url": contract_url
  107. }
  108. async def list_contracts(self, store_id: int, status: Optional[int], page: int, page_size: int) -> dict:
  109. """根据 store_id 查询所有合同,支持根据 status 筛选和分页"""
  110. logger.info(
  111. "list_contracts request store_id=%s status=%s page=%s page_size=%s",
  112. store_id, status, page, page_size
  113. )
  114. # 1. 检查 store_info 中的审核状态
  115. reason = await self.esign_repo.check_store_status(store_id)
  116. if reason != "审核通过":
  117. return {"code": 555, "msg": "先进行认证", "reason": reason}
  118. # 2. 获取原始数据
  119. rows = await self.esign_repo.get_by_store_id(store_id)
  120. all_filtered_items = []
  121. # 3. 解析并筛选
  122. for row in rows:
  123. contract_url_raw = row.get("contract_url")
  124. if not contract_url_raw:
  125. continue
  126. try:
  127. items = json.loads(contract_url_raw)
  128. if not isinstance(items, list):
  129. continue
  130. for item in items:
  131. if status is not None and item.get("status") != status:
  132. continue
  133. item_with_info = dict(item)
  134. item_with_info.update({
  135. "id": row.get("id"),
  136. "store_id": row.get("store_id"),
  137. "store_name": row.get("store_name"),
  138. "merchant_name": row.get("merchant_name"),
  139. "contact_phone": row.get("contact_phone")
  140. })
  141. all_filtered_items.append(item_with_info)
  142. except Exception as e:
  143. logger.error(f"Error processing contracts for store_id {store_id}: {e}", exc_info=True)
  144. continue
  145. # 4. 手动分页
  146. total = len(all_filtered_items)
  147. start = (page - 1) * page_size
  148. end = start + page_size
  149. paged_items = all_filtered_items[start:end]
  150. total_pages = (total + page_size - 1) // page_size if total > 0 else 0
  151. return {
  152. "items": paged_items,
  153. "total": total,
  154. "page": page,
  155. "page_size": page_size,
  156. "total_pages": total_pages
  157. }
  158. async def get_contract_detail(self, sign_flow_id: str) -> dict:
  159. """获取合同详情"""
  160. row, item, items = await self.esign_repo.get_contract_item_by_sign_flow_id(sign_flow_id)
  161. if not item:
  162. return {"success": False, "message": "未找到合同"}
  163. status = item.get("status")
  164. if status == 0:
  165. return await self._get_pending_contract_detail(sign_flow_id, row, item, items)
  166. elif status == 1:
  167. return await self._get_signed_contract_detail(sign_flow_id, row, item, items)
  168. return {"success": False, "message": "未知合同状态", "raw": {"status": status}}
  169. async def _get_pending_contract_detail(self, sign_flow_id: str, row, item, items) -> dict:
  170. file_id = item.get("file_id")
  171. if not file_id:
  172. return {"success": False, "message": "缺少 file_id,无法获取合同详情"}
  173. try:
  174. detail_resp = esign_main.get_contract_detail(file_id)
  175. detail_json = json.loads(detail_resp)
  176. data = detail_json.get("data") if isinstance(detail_json, dict) else None
  177. contract_url_val = None
  178. if isinstance(data, dict):
  179. contract_url_val = data.get("fileDownloadUrl")
  180. if not contract_url_val and isinstance(detail_json, dict):
  181. contract_url_val = detail_json.get("fileDownloadUrl")
  182. except Exception as e:
  183. logger.error(f"get_contract_detail failed file_id={file_id}: {e}")
  184. return {"success": False, "message": "获取合同链接失败", "raw": str(e)}
  185. if not contract_url_val:
  186. logger.error(f"get_contract_detail missing contract_url file_id={file_id}: {detail_resp}")
  187. return {"success": False, "message": "e签宝返回缺少合同链接", "raw": detail_resp}
  188. # 更新数据库中的合同链接
  189. if row and isinstance(items, list):
  190. for it in items:
  191. if it.get("sign_flow_id") == sign_flow_id:
  192. it["contract_url"] = contract_url_val
  193. break
  194. await self.esign_repo.update_contract_items(row["id"], items)
  195. # 获取签署链接
  196. contact_phone = item.get("contact_phone") or (row.get("contact_phone") if isinstance(row, dict) else None)
  197. if not contact_phone:
  198. return {"success": False, "message": "缺少 contact_phone,无法获取签署链接"}
  199. try:
  200. sign_resp = sign_url(sign_flow_id, contact_phone)
  201. sign_json = json.loads(sign_resp)
  202. sign_data = sign_json.get("data") if isinstance(sign_json, dict) else None
  203. result_sign_url = sign_data.get("url") if isinstance(sign_data, dict) else None
  204. except Exception as e:
  205. logger.error(f"sign_url failed sign_flow_id={sign_flow_id}, contact_phone={contact_phone}: {e}")
  206. return {"success": False, "message": "获取签署链接失败", "raw": str(e)}
  207. if not result_sign_url:
  208. logger.error(f"sign_url missing url: {sign_json}")
  209. return {"success": False, "message": "e签宝返回缺少签署链接", "raw": sign_json}
  210. await self.esign_repo.update_sign_url(contact_phone, sign_flow_id, result_sign_url)
  211. return {
  212. "status": 0,
  213. "contract_url": contract_url_val,
  214. "sign_url": result_sign_url,
  215. "sign_flow_id": sign_flow_id
  216. }
  217. async def _get_signed_contract_detail(self, sign_flow_id: str, row, item, items) -> dict:
  218. try:
  219. download_resp = file_download_url(sign_flow_id)
  220. download_json = json.loads(download_resp)
  221. contract_download_url = download_json["data"]["files"][0]["downloadUrl"]
  222. except Exception as e:
  223. logger.error(f"file_download_url failed sign_flow_id={sign_flow_id}: {e}")
  224. return {"success": False, "message": "获取合同下载链接失败", "raw": str(e)}
  225. if row and isinstance(items, list):
  226. for it in items:
  227. if it.get("sign_flow_id") == sign_flow_id:
  228. it["contract_download_url"] = contract_download_url
  229. it["contract_url"] = contract_download_url
  230. break
  231. await self.esign_repo.update_contract_items(row["id"], items)
  232. return {
  233. "status": 1,
  234. "contract_url": contract_download_url,
  235. "contract_download_url": contract_download_url,
  236. "sign_flow_id": sign_flow_id
  237. }
  238. async def process_esign_callback(self, payload: dict) -> dict:
  239. """处理 e签宝 回调"""
  240. sign_result = payload.get("signResult")
  241. sign_flow_id = payload.get("signFlowId")
  242. operator = payload.get("operator") or {}
  243. psn_account = operator.get("psnAccount") or {}
  244. contact_phone = psn_account.get("accountMobile")
  245. ts_ms = payload.get("operateTime") or payload.get("timestamp")
  246. signing_dt = None
  247. if ts_ms:
  248. try:
  249. signing_dt = datetime.datetime.fromtimestamp(ts_ms / 1000)
  250. except Exception:
  251. signing_dt = None
  252. if sign_result == 2:
  253. contract_download_url = None
  254. try:
  255. download_resp = file_download_url(sign_flow_id)
  256. download_json = json.loads(download_resp)
  257. contract_download_url = download_json["data"]["files"][0]["downloadUrl"]
  258. except Exception as e:
  259. logger.error(f"file_download_url failed for sign_flow_id={sign_flow_id}: {e}")
  260. updated = await self.esign_repo.mark_signed_by_phone(contact_phone, sign_flow_id, signing_dt, contract_download_url)
  261. logger.info(f"esign_callback success phone={contact_phone}, sign_flow_id={sign_flow_id}, updated={updated}")
  262. return {"success": True, "code": "200", "msg": "success"}
  263. logger.error(f"esign_callback ignored payload: {payload}")
  264. return {"success": False, "message": "未处理: signResult!=2 或手机号/签署流程缺失"}
  265. async def list_by_store(self, store_id: int):
  266. return await self.esign_repo.get_by_store_id(store_id)
  267. async def get_store_reason(self, store_id: int) -> str | None:
  268. return await self.esign_repo.check_store_status(store_id)
  269. async def get_contract_item_by_sign_flow_id(self, sign_flow_id: str):
  270. return await self.esign_repo.get_contract_item_by_sign_flow_id(sign_flow_id)
  271. async def update_contract_items(self, row_id: int, items: list) -> bool:
  272. return await self.esign_repo.update_contract_items(row_id, items)
  273. async def list_all_paged(
  274. self,
  275. page: int,
  276. page_size: int = 10,
  277. store_name: str | None = None,
  278. merchant_name: str | None = None,
  279. signing_status: str | None = None,
  280. business_segment: str | None = None,
  281. store_status: str | None = None,
  282. expiry_start=None,
  283. expiry_end=None,
  284. ):
  285. items, total = await self.esign_repo.get_all_paged(
  286. page,
  287. page_size,
  288. store_name=store_name,
  289. merchant_name=merchant_name,
  290. signing_status=signing_status,
  291. business_segment=business_segment,
  292. store_status=store_status,
  293. expiry_start=expiry_start,
  294. expiry_end=expiry_end,
  295. )
  296. return items, total
  297. async def mark_signed_by_phone(self, contact_phone: str, sign_flow_id: str, signing_time, contract_download_url):
  298. return await self.esign_repo.mark_signed_by_phone(contact_phone, sign_flow_id, signing_time, contract_download_url)
  299. async def update_sign_url(self, contact_phone: str, sign_flow_id: str, sign_url: str):
  300. return await self.esign_repo.update_sign_url(contact_phone, sign_flow_id, sign_url)
  301. async def append_contract_url(self, templates_data, contract_item: dict):
  302. return await self.esign_repo.append_contract_url(templates_data, contract_item)