contract_server.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417
  1. import datetime
  2. import json
  3. import logging
  4. import os
  5. from typing import Any
  6. from sqlalchemy.ext.asyncio import AsyncSession
  7. from alien_contract.repositories.contract_repo import ContractRepository
  8. from alien_contract.schemas.request.contract import BundleCreateRequest
  9. from alien_contract.infrastructure.esign import main as esign_main
  10. from alien_contract.infrastructure.esign.contract_builder import build_contract_items, ContractBuildError
  11. from alien_contract.infrastructure.esign.main import sign_url, file_download_url
  12. LOG_DIR = os.path.join("common", "logs", "alien_contract")
  13. os.makedirs(LOG_DIR, exist_ok=True)
  14. def _init_logger():
  15. logger = logging.getLogger("alien_contract_service")
  16. if logger.handlers:
  17. return logger
  18. logger.setLevel(logging.INFO)
  19. fmt = logging.Formatter("%(asctime)s [%(levelname)s] %(name)s %(message)s")
  20. info_handler = logging.FileHandler(os.path.join(LOG_DIR, "info.log"), encoding="utf-8")
  21. info_handler.setLevel(logging.INFO)
  22. info_handler.setFormatter(fmt)
  23. error_handler = logging.FileHandler(os.path.join(LOG_DIR, "error.log"), encoding="utf-8")
  24. error_handler.setLevel(logging.ERROR)
  25. error_handler.setFormatter(fmt)
  26. logger.addHandler(info_handler)
  27. logger.addHandler(error_handler)
  28. return logger
  29. logger = _init_logger()
  30. # e签宝 v3 回调 action 说明:
  31. # SIGN_MISSON_COMPLETE 单个签署任务完成(按签署人维度,平台方自动盖章也会触发,signResult=2)
  32. # SIGN_FLOW_COMPLETE / SIGN_FLOW_FINISH 整个签署流程结束
  33. # 注意:绝不能用"单个任务完成"判定整份合同已签署,必须以流程真实状态 signFlowStatus 为准。
  34. SIGN_FLOW_FINISH_ACTIONS = {
  35. "SIGN_FLOW_FINISH",
  36. "SIGN_FLOW_COMPLETE",
  37. }
  38. # e签宝 流程状态 signFlowStatus: 0草稿 1签署中 2完成 3撤销 4终止 5过期 6删除 7拒签
  39. ESIGN_FLOW_STATUS_COMPLETED = 2
  40. # 与数据库 NOW() 时区(东八区)保持一致,避免应用容器在 UTC 时写入早 8 小时的时间
  41. CN_TZ = datetime.timezone(datetime.timedelta(hours=8))
  42. def _ms_to_naive_cn(ms) -> "datetime.datetime | None":
  43. """e签宝时间戳(epoch毫秒)转换为东八区(无时区)datetime"""
  44. if not ms:
  45. return None
  46. try:
  47. return datetime.datetime.fromtimestamp(ms / 1000, tz=CN_TZ).replace(tzinfo=None)
  48. except Exception:
  49. return None
  50. BUNDLE_CONFIGS = {
  51. "STORE_STANDARD": [
  52. ("store_agreement", "店铺入驻协议", 1),
  53. # 暂停签署,仅签主合同;恢复时取消下面两行注释即可
  54. # ("alipay_auth", "支付宝授权函", 0),
  55. # ("wechat_pay_commitment", "微信支付承诺函", 0),
  56. ],
  57. "LAWYER_STANDARD": [
  58. ("lawyer_agreement", "律所入驻协议", 1),
  59. # 暂停签署,仅签主合同;恢复时取消下面两行注释即可
  60. # ("alipay_auth", "支付宝授权函", 0),
  61. # ("wechat_pay_commitment", "微信支付承诺函", 0),
  62. ],
  63. }
  64. DEFAULT_BUNDLE_BY_SUBJECT = {
  65. "store": "STORE_STANDARD",
  66. "lawyer": "LAWYER_STANDARD",
  67. }
  68. def _extract_download_url(download_resp: str) -> tuple[str | None, Any]:
  69. try:
  70. download_json = json.loads(download_resp)
  71. except json.JSONDecodeError as exc:
  72. return None, {"error": f"e签宝返回非 JSON: {exc}", "resp": download_resp}
  73. data = download_json.get("data") if isinstance(download_json, dict) else None
  74. files = data.get("files") if isinstance(data, dict) else None
  75. if isinstance(files, list) and files:
  76. first_file = files[0]
  77. if isinstance(first_file, dict):
  78. download_url = first_file.get("downloadUrl")
  79. if download_url:
  80. return download_url, download_json
  81. return None, download_json
  82. class ContractCenterService:
  83. def __init__(self, db: AsyncSession):
  84. self.repo = ContractRepository(db)
  85. async def create_bundle(self, req: BundleCreateRequest) -> dict:
  86. bundle_type = DEFAULT_BUNDLE_BY_SUBJECT[req.subject_type]
  87. configs = BUNDLE_CONFIGS.get(bundle_type)
  88. if not configs:
  89. return {"success": False, "message": "不支持的合同包类型", "raw": {"bundle_type": bundle_type}}
  90. try:
  91. items = build_contract_items(
  92. configs=configs,
  93. template_name=req.subject_name,
  94. signer_name=req.subject_name,
  95. signer_id_num=req.ord_id,
  96. psn_account=req.contact_phone,
  97. psn_name=req.contact_name,
  98. )
  99. except ContractBuildError as exc:
  100. return {"success": False, "message": exc.message, "raw": exc.raw}
  101. bundle = await self.repo.create_bundle(
  102. {
  103. "subject_type": req.subject_type,
  104. "subject_id": req.subject_id,
  105. "subject_name": req.subject_name,
  106. "business_segment": req.business_segment,
  107. "contact_name": req.contact_name,
  108. "contact_phone": req.contact_phone,
  109. "ord_id": req.ord_id,
  110. "bundle_type": bundle_type,
  111. "status": "未签署",
  112. }
  113. )
  114. documents = await self.repo.create_documents(bundle.id, items)
  115. for document in documents:
  116. try:
  117. sign_resp = sign_url(document.sign_flow_id, req.contact_phone)
  118. sign_json = json.loads(sign_resp)
  119. sign_data = sign_json.get("data") if isinstance(sign_json, dict) else None
  120. result_sign_url = sign_data.get("url") if isinstance(sign_data, dict) else None
  121. except Exception:
  122. await self.repo.rollback()
  123. return {
  124. "success": False,
  125. "message": f"{document.contract_name}创建成功但签署链接获取失败",
  126. "raw": {"contract_type": document.contract_type, "sign_flow_id": document.sign_flow_id},
  127. }
  128. if not result_sign_url:
  129. await self.repo.rollback()
  130. return {
  131. "success": False,
  132. "message": f"{document.contract_name}创建成功但签署链接缺失",
  133. "raw": {"contract_type": document.contract_type, "sign_flow_id": document.sign_flow_id, "resp": sign_json},
  134. }
  135. await self.repo.update_document_urls(document.id, sign_url=result_sign_url)
  136. document.sign_url = result_sign_url
  137. primary_doc = next((doc for doc in documents if doc.is_primary == 1), documents[0])
  138. await self.repo.set_primary_document(bundle.id, primary_doc.id)
  139. await self.repo.commit()
  140. return {
  141. "success": True,
  142. "message": "合同包创建成功",
  143. "bundle_id": bundle.id,
  144. "primary_sign_flow_id": primary_doc.sign_flow_id,
  145. "created_contracts": [
  146. {
  147. "contract_type": d.contract_type,
  148. "contract_name": d.contract_name,
  149. "sign_flow_id": d.sign_flow_id,
  150. "file_id": d.file_id,
  151. "contract_url": d.template_url,
  152. "sign_url": d.sign_url,
  153. }
  154. for d in documents
  155. ],
  156. }
  157. async def list_bundles(self, subject_type: str, subject_id: int, page: int, page_size: int, *, doc_status: int | None = None) -> dict:
  158. bundles, total = await self.repo.list_bundles(subject_type, subject_id, page, page_size)
  159. ids = [b.id for b in bundles]
  160. docs_map = await self.repo.list_documents_by_bundle_ids(ids, doc_status=doc_status)
  161. items = []
  162. for b in bundles:
  163. docs = docs_map.get(b.id, [])
  164. items.append(
  165. {
  166. "id": b.id,
  167. "subject_type": b.subject_type,
  168. "subject_id": b.subject_id,
  169. "subject_name": b.subject_name,
  170. "business_segment": b.business_segment,
  171. "contact_name": b.contact_name,
  172. "contact_phone": b.contact_phone,
  173. "ord_id": b.ord_id,
  174. "bundle_type": b.bundle_type,
  175. "status": b.status,
  176. "primary_document_id": b.primary_document_id,
  177. "documents": [
  178. {
  179. "id": d.id,
  180. "contract_type": d.contract_type,
  181. "contract_name": d.contract_name,
  182. "is_primary": d.is_primary,
  183. "status": d.status,
  184. "sign_flow_id": d.sign_flow_id,
  185. "file_id": d.file_id,
  186. "template_url": d.template_url,
  187. "sign_url": d.sign_url,
  188. "download_url": d.download_url,
  189. "signing_time": d.signing_time,
  190. "effective_time": d.effective_time,
  191. "expiry_time": d.expiry_time,
  192. }
  193. for d in docs
  194. ],
  195. }
  196. )
  197. total_pages = (total + page_size - 1) // page_size if total > 0 else 0
  198. return {"items": items, "total": total, "page": page, "page_size": page_size, "total_pages": total_pages}
  199. async def get_document_detail(self, sign_flow_id: str) -> dict:
  200. document, bundle = await self.repo.get_document_and_bundle(sign_flow_id)
  201. if not document:
  202. return {"success": False, "message": "未找到合同"}
  203. if document.status == 0:
  204. return await self._get_pending_detail(document, bundle)
  205. return await self._get_signed_detail(document, bundle)
  206. async def _get_pending_detail(self, document, bundle):
  207. try:
  208. detail_resp = esign_main.get_contract_detail(document.file_id)
  209. detail_json = json.loads(detail_resp)
  210. data = detail_json.get("data") if isinstance(detail_json, dict) else None
  211. contract_url_val = data.get("fileDownloadUrl") if isinstance(data, dict) else None
  212. if not contract_url_val and isinstance(detail_json, dict):
  213. contract_url_val = detail_json.get("fileDownloadUrl")
  214. except Exception as exc:
  215. return {"success": False, "message": "获取合同链接失败", "raw": str(exc)}
  216. if not contract_url_val:
  217. return {"success": False, "message": "e签宝返回缺少合同链接", "raw": detail_resp}
  218. await self.repo.update_document_urls(document.id, template_url=contract_url_val)
  219. try:
  220. sign_resp = sign_url(document.sign_flow_id, bundle.contact_phone)
  221. sign_json = json.loads(sign_resp)
  222. sign_data = sign_json.get("data") if isinstance(sign_json, dict) else None
  223. result_sign_url = sign_data.get("url") if isinstance(sign_data, dict) else None
  224. except Exception as exc:
  225. return {"success": False, "message": "获取签署链接失败", "raw": str(exc)}
  226. if not result_sign_url:
  227. return {"success": False, "message": "e签宝返回缺少签署链接", "raw": sign_json}
  228. await self.repo.update_document_urls(document.id, sign_url=result_sign_url)
  229. await self.repo.commit()
  230. return {
  231. "status": 0,
  232. "contract_url": contract_url_val,
  233. "sign_url": result_sign_url,
  234. "sign_flow_id": document.sign_flow_id,
  235. }
  236. async def _get_signed_detail(self, document, _bundle):
  237. try:
  238. download_resp = file_download_url(document.sign_flow_id)
  239. except Exception as exc:
  240. cached_url = document.download_url or None
  241. if cached_url:
  242. return {
  243. "status": 1,
  244. "contract_url": cached_url,
  245. "contract_download_url": cached_url,
  246. "sign_flow_id": document.sign_flow_id,
  247. }
  248. return {"success": False, "message": "获取合同下载链接失败", "raw": str(exc)}
  249. contract_download_url, raw = _extract_download_url(download_resp)
  250. if not contract_download_url:
  251. cached_url = document.download_url or None
  252. if cached_url:
  253. return {
  254. "status": 1,
  255. "contract_url": cached_url,
  256. "contract_download_url": cached_url,
  257. "sign_flow_id": document.sign_flow_id,
  258. }
  259. logger.error(
  260. "file_download_url missing downloadUrl sign_flow_id=%s resp=%s",
  261. document.sign_flow_id,
  262. download_resp,
  263. )
  264. return {"success": False, "message": "合同已签署,下载文件生成中,请稍后重试", "raw": raw}
  265. await self.repo.update_document_urls(document.id, template_url=contract_download_url, download_url=contract_download_url)
  266. await self.repo.commit()
  267. return {
  268. "status": 1,
  269. "contract_url": contract_download_url,
  270. "contract_download_url": contract_download_url,
  271. "sign_flow_id": document.sign_flow_id,
  272. }
  273. async def process_esign_callback(self, payload: dict) -> dict:
  274. sign_result = payload.get("signResult")
  275. sign_flow_id = payload.get("signFlowId")
  276. action = payload.get("action")
  277. operator_mobile = (
  278. payload.get("operator", {})
  279. .get("psnAccount", {})
  280. .get("accountMobile")
  281. )
  282. if not sign_flow_id:
  283. logger.info(
  284. "esign_callback_event %s",
  285. json.dumps(
  286. {
  287. "result": "ignored",
  288. "reason": "missing_signFlowId",
  289. "action": action,
  290. "sign_result": sign_result,
  291. "operator_mobile": operator_mobile,
  292. },
  293. ensure_ascii=False,
  294. ),
  295. )
  296. return {"success": True, "code": "200", "msg": "ignored_missing_signFlowId"}
  297. document, bundle = await self.repo.get_document_and_bundle(sign_flow_id)
  298. if not document:
  299. logger.info(
  300. "esign_callback_event %s",
  301. json.dumps(
  302. {
  303. "result": "ignored",
  304. "reason": "unknown_signFlowId",
  305. "sign_flow_id": sign_flow_id,
  306. "action": action,
  307. "sign_result": sign_result,
  308. "operator_mobile": operator_mobile,
  309. },
  310. ensure_ascii=False,
  311. ),
  312. )
  313. return {"success": True, "code": "200", "msg": "ignored_unknown_signFlowId"}
  314. event_type = f"esign_callback:{action or 'UNKNOWN'}"
  315. await self.repo.create_event(bundle.id, document.id, sign_flow_id, event_type[:50], payload)
  316. # 不能凭"单个签署任务完成(SIGN_MISSON_COMPLETE / signResult=2)"判定整份合同已签署,
  317. # 因为平台方自动盖章也会触发该回调。以 e签宝 流程真实状态 signFlowStatus 为准。
  318. flow_status, finish_ms = self._query_flow_status(sign_flow_id)
  319. # 兜底:查询失败时,仅当为"流程结束"类回调且报文明确为已完成(2)才认定完成
  320. if flow_status is None and action in SIGN_FLOW_FINISH_ACTIONS:
  321. payload_status = payload.get("signFlowStatus") or payload.get("flowStatus")
  322. if payload_status in (2, "2"):
  323. flow_status = ESIGN_FLOW_STATUS_COMPLETED
  324. mark_signed = flow_status == ESIGN_FLOW_STATUS_COMPLETED
  325. logger.info(
  326. "esign_callback_event %s",
  327. json.dumps(
  328. {
  329. "result": "mark_signed" if mark_signed else "ignored",
  330. "sign_flow_id": sign_flow_id,
  331. "bundle_id": bundle.id,
  332. "document_id": document.id,
  333. "contract_type": document.contract_type,
  334. "action": action,
  335. "sign_result": sign_result,
  336. "flow_status": flow_status,
  337. "operator_mobile": operator_mobile,
  338. },
  339. ensure_ascii=False,
  340. ),
  341. )
  342. if mark_signed:
  343. ts_ms = finish_ms or payload.get("operateTime") or payload.get("timestamp")
  344. signing_dt = _ms_to_naive_cn(ts_ms)
  345. effective_dt = expiry_dt = None
  346. if signing_dt:
  347. effective_dt = (signing_dt + datetime.timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0)
  348. expiry_dt = effective_dt + datetime.timedelta(days=365)
  349. contract_download_url = None
  350. try:
  351. download_resp = file_download_url(sign_flow_id)
  352. contract_download_url, raw_download_resp = _extract_download_url(download_resp)
  353. except Exception as exc:
  354. raw_download_resp = str(exc)
  355. if not contract_download_url:
  356. logger.error(
  357. "file_download_url missing downloadUrl on callback sign_flow_id=%s resp=%s",
  358. sign_flow_id,
  359. raw_download_resp,
  360. )
  361. await self.repo.mark_document_signed(document.id, signing_dt, effective_dt, expiry_dt, contract_download_url)
  362. await self.repo.recalc_bundle_status(bundle.id)
  363. await self.repo.commit()
  364. return {"success": True, "code": "200", "msg": "success"}
  365. await self.repo.commit()
  366. return {"success": True, "code": "200", "msg": f"ignored_flow_status_{flow_status}"}
  367. def _query_flow_status(self, sign_flow_id: str) -> tuple[int | None, int | None]:
  368. """查询 e签宝 流程真实状态,返回 (signFlowStatus, signFlowFinishTime毫秒);失败返回 (None, None)"""
  369. try:
  370. detail_resp = esign_main.query_sign_flow_detail(sign_flow_id)
  371. detail_json = json.loads(detail_resp)
  372. except Exception as exc:
  373. logger.error("query_sign_flow_detail error sign_flow_id=%s err=%s", sign_flow_id, exc)
  374. return None, None
  375. data = detail_json.get("data") if isinstance(detail_json, dict) else None
  376. if not isinstance(data, dict):
  377. logger.error("query_sign_flow_detail no data sign_flow_id=%s resp=%s", sign_flow_id, detail_json)
  378. return None, None
  379. return data.get("signFlowStatus"), data.get("signFlowFinishTime")