# contract_server.py (14 KB)
  1. import datetime
  2. import json
  3. import logging
  4. import os
  5. from sqlalchemy.ext.asyncio import AsyncSession
  6. from alien_contract.repositories.contract_repo import ContractRepository
  7. from alien_contract.schemas.request.contract import BundleCreateRequest
  8. from alien_contract.infrastructure.esign import main as esign_main
  9. from alien_contract.infrastructure.esign.contract_builder import build_contract_items, ContractBuildError
  10. from alien_contract.infrastructure.esign.main import sign_url, file_download_url
  11. LOG_DIR = os.path.join("common", "logs", "alien_contract")
  12. os.makedirs(LOG_DIR, exist_ok=True)
  13. def _init_logger():
  14. logger = logging.getLogger("alien_contract_service")
  15. if logger.handlers:
  16. return logger
  17. logger.setLevel(logging.INFO)
  18. fmt = logging.Formatter("%(asctime)s [%(levelname)s] %(name)s %(message)s")
  19. info_handler = logging.FileHandler(os.path.join(LOG_DIR, "info.log"), encoding="utf-8")
  20. info_handler.setLevel(logging.INFO)
  21. info_handler.setFormatter(fmt)
  22. error_handler = logging.FileHandler(os.path.join(LOG_DIR, "error.log"), encoding="utf-8")
  23. error_handler.setLevel(logging.ERROR)
  24. error_handler.setFormatter(fmt)
  25. logger.addHandler(info_handler)
  26. logger.addHandler(error_handler)
  27. return logger
  28. logger = _init_logger()
  29. SIGN_SUCCESS_ACTIONS = {
  30. "OPERATOR_COMPLETE_SIGN",
  31. "SIGN_FLOW_FINISH",
  32. "SIGN_FLOW_COMPLETE",
  33. }
  34. NON_SIGNING_ACTIONS = {
  35. "OPERATOR_READ",
  36. "OPERATOR_VIEW",
  37. }
  38. BUNDLE_CONFIGS = {
  39. "STORE_STANDARD": [
  40. ("store_agreement", "店铺入驻协议", 1),
  41. ("alipay_auth", "支付宝授权函", 0),
  42. ("wechat_pay_commitment", "微信支付承诺函", 0),
  43. ],
  44. "LAWYER_STANDARD": [
  45. ("lawyer_agreement", "律所入驻协议", 1),
  46. ("alipay_auth", "支付宝授权函", 0),
  47. ("wechat_pay_commitment", "微信支付承诺函", 0),
  48. ],
  49. }
  50. DEFAULT_BUNDLE_BY_SUBJECT = {
  51. "store": "STORE_STANDARD",
  52. "lawyer": "LAWYER_STANDARD",
  53. }
  54. class ContractCenterService:
  55. def __init__(self, db: AsyncSession):
  56. self.repo = ContractRepository(db)
  57. async def create_bundle(self, req: BundleCreateRequest) -> dict:
  58. bundle_type = DEFAULT_BUNDLE_BY_SUBJECT[req.subject_type]
  59. configs = BUNDLE_CONFIGS.get(bundle_type)
  60. if not configs:
  61. return {"success": False, "message": "不支持的合同包类型", "raw": {"bundle_type": bundle_type}}
  62. try:
  63. items = build_contract_items(
  64. configs=configs,
  65. template_name=req.subject_name,
  66. signer_name=req.subject_name,
  67. signer_id_num=req.ord_id,
  68. psn_account=req.contact_phone,
  69. psn_name=req.contact_name,
  70. )
  71. except ContractBuildError as exc:
  72. return {"success": False, "message": exc.message, "raw": exc.raw}
  73. bundle = await self.repo.create_bundle(
  74. {
  75. "subject_type": req.subject_type,
  76. "subject_id": req.subject_id,
  77. "subject_name": req.subject_name,
  78. "business_segment": req.business_segment,
  79. "contact_name": req.contact_name,
  80. "contact_phone": req.contact_phone,
  81. "ord_id": req.ord_id,
  82. "bundle_type": bundle_type,
  83. "status": "未签署",
  84. }
  85. )
  86. documents = await self.repo.create_documents(bundle.id, items)
  87. for document in documents:
  88. try:
  89. sign_resp = sign_url(document.sign_flow_id, req.contact_phone)
  90. sign_json = json.loads(sign_resp)
  91. sign_data = sign_json.get("data") if isinstance(sign_json, dict) else None
  92. result_sign_url = sign_data.get("url") if isinstance(sign_data, dict) else None
  93. except Exception:
  94. await self.repo.rollback()
  95. return {
  96. "success": False,
  97. "message": f"{document.contract_name}创建成功但签署链接获取失败",
  98. "raw": {"contract_type": document.contract_type, "sign_flow_id": document.sign_flow_id},
  99. }
  100. if not result_sign_url:
  101. await self.repo.rollback()
  102. return {
  103. "success": False,
  104. "message": f"{document.contract_name}创建成功但签署链接缺失",
  105. "raw": {"contract_type": document.contract_type, "sign_flow_id": document.sign_flow_id, "resp": sign_json},
  106. }
  107. await self.repo.update_document_urls(document.id, sign_url=result_sign_url)
  108. document.sign_url = result_sign_url
  109. primary_doc = next((doc for doc in documents if doc.is_primary == 1), documents[0])
  110. await self.repo.set_primary_document(bundle.id, primary_doc.id)
  111. await self.repo.commit()
  112. return {
  113. "success": True,
  114. "message": "合同包创建成功",
  115. "bundle_id": bundle.id,
  116. "primary_sign_flow_id": primary_doc.sign_flow_id,
  117. "created_contracts": [
  118. {
  119. "contract_type": d.contract_type,
  120. "contract_name": d.contract_name,
  121. "sign_flow_id": d.sign_flow_id,
  122. "file_id": d.file_id,
  123. "contract_url": d.template_url,
  124. "sign_url": d.sign_url,
  125. }
  126. for d in documents
  127. ],
  128. }
  129. async def list_bundles(self, subject_type: str, subject_id: int, page: int, page_size: int) -> dict:
  130. bundles, total = await self.repo.list_bundles(subject_type, subject_id, page, page_size)
  131. ids = [b.id for b in bundles]
  132. docs_map = await self.repo.list_documents_by_bundle_ids(ids)
  133. items = []
  134. for b in bundles:
  135. docs = docs_map.get(b.id, [])
  136. items.append(
  137. {
  138. "id": b.id,
  139. "subject_type": b.subject_type,
  140. "subject_id": b.subject_id,
  141. "subject_name": b.subject_name,
  142. "business_segment": b.business_segment,
  143. "contact_name": b.contact_name,
  144. "contact_phone": b.contact_phone,
  145. "ord_id": b.ord_id,
  146. "bundle_type": b.bundle_type,
  147. "status": b.status,
  148. "primary_document_id": b.primary_document_id,
  149. "documents": [
  150. {
  151. "id": d.id,
  152. "contract_type": d.contract_type,
  153. "contract_name": d.contract_name,
  154. "is_primary": d.is_primary,
  155. "status": d.status,
  156. "sign_flow_id": d.sign_flow_id,
  157. "file_id": d.file_id,
  158. "template_url": d.template_url,
  159. "sign_url": d.sign_url,
  160. "download_url": d.download_url,
  161. "signing_time": d.signing_time,
  162. "effective_time": d.effective_time,
  163. "expiry_time": d.expiry_time,
  164. }
  165. for d in docs
  166. ],
  167. }
  168. )
  169. total_pages = (total + page_size - 1) // page_size if total > 0 else 0
  170. return {"items": items, "total": total, "page": page, "page_size": page_size, "total_pages": total_pages}
  171. async def get_document_detail(self, sign_flow_id: str) -> dict:
  172. document, bundle = await self.repo.get_document_and_bundle(sign_flow_id)
  173. if not document:
  174. return {"success": False, "message": "未找到合同"}
  175. if document.status == 0:
  176. return await self._get_pending_detail(document, bundle)
  177. return await self._get_signed_detail(document, bundle)
  178. async def _get_pending_detail(self, document, bundle):
  179. try:
  180. detail_resp = esign_main.get_contract_detail(document.file_id)
  181. detail_json = json.loads(detail_resp)
  182. data = detail_json.get("data") if isinstance(detail_json, dict) else None
  183. contract_url_val = data.get("fileDownloadUrl") if isinstance(data, dict) else None
  184. if not contract_url_val and isinstance(detail_json, dict):
  185. contract_url_val = detail_json.get("fileDownloadUrl")
  186. except Exception as exc:
  187. return {"success": False, "message": "获取合同链接失败", "raw": str(exc)}
  188. if not contract_url_val:
  189. return {"success": False, "message": "e签宝返回缺少合同链接", "raw": detail_resp}
  190. await self.repo.update_document_urls(document.id, template_url=contract_url_val)
  191. try:
  192. sign_resp = sign_url(document.sign_flow_id, bundle.contact_phone)
  193. sign_json = json.loads(sign_resp)
  194. sign_data = sign_json.get("data") if isinstance(sign_json, dict) else None
  195. result_sign_url = sign_data.get("url") if isinstance(sign_data, dict) else None
  196. except Exception as exc:
  197. return {"success": False, "message": "获取签署链接失败", "raw": str(exc)}
  198. if not result_sign_url:
  199. return {"success": False, "message": "e签宝返回缺少签署链接", "raw": sign_json}
  200. await self.repo.update_document_urls(document.id, sign_url=result_sign_url)
  201. await self.repo.commit()
  202. return {
  203. "status": 0,
  204. "contract_url": contract_url_val,
  205. "sign_url": result_sign_url,
  206. "sign_flow_id": document.sign_flow_id,
  207. }
  208. async def _get_signed_detail(self, document, _bundle):
  209. try:
  210. download_resp = file_download_url(document.sign_flow_id)
  211. download_json = json.loads(download_resp)
  212. contract_download_url = download_json["data"]["files"][0]["downloadUrl"]
  213. except Exception as exc:
  214. return {"success": False, "message": "获取合同下载链接失败", "raw": str(exc)}
  215. await self.repo.update_document_urls(document.id, template_url=contract_download_url, download_url=contract_download_url)
  216. await self.repo.commit()
  217. return {
  218. "status": 1,
  219. "contract_url": contract_download_url,
  220. "contract_download_url": contract_download_url,
  221. "sign_flow_id": document.sign_flow_id,
  222. }
  223. async def process_esign_callback(self, payload: dict) -> dict:
  224. sign_result = payload.get("signResult")
  225. sign_flow_id = payload.get("signFlowId")
  226. action = payload.get("action")
  227. operator_mobile = (
  228. payload.get("operator", {})
  229. .get("psnAccount", {})
  230. .get("accountMobile")
  231. )
  232. if not sign_flow_id:
  233. logger.info(
  234. "esign_callback_event %s",
  235. json.dumps(
  236. {
  237. "result": "ignored",
  238. "reason": "missing_signFlowId",
  239. "action": action,
  240. "sign_result": sign_result,
  241. "operator_mobile": operator_mobile,
  242. },
  243. ensure_ascii=False,
  244. ),
  245. )
  246. return {"success": True, "code": "200", "msg": "ignored_missing_signFlowId"}
  247. document, bundle = await self.repo.get_document_and_bundle(sign_flow_id)
  248. if not document:
  249. logger.info(
  250. "esign_callback_event %s",
  251. json.dumps(
  252. {
  253. "result": "ignored",
  254. "reason": "unknown_signFlowId",
  255. "sign_flow_id": sign_flow_id,
  256. "action": action,
  257. "sign_result": sign_result,
  258. "operator_mobile": operator_mobile,
  259. },
  260. ensure_ascii=False,
  261. ),
  262. )
  263. return {"success": True, "code": "200", "msg": "ignored_unknown_signFlowId"}
  264. event_type = f"esign_callback:{action or 'UNKNOWN'}"
  265. await self.repo.create_event(bundle.id, document.id, sign_flow_id, event_type[:50], payload)
  266. mark_signed = bool(sign_result == 2 or (action in SIGN_SUCCESS_ACTIONS and action not in NON_SIGNING_ACTIONS))
  267. logger.info(
  268. "esign_callback_event %s",
  269. json.dumps(
  270. {
  271. "result": "mark_signed" if mark_signed else "ignored",
  272. "sign_flow_id": sign_flow_id,
  273. "bundle_id": bundle.id,
  274. "document_id": document.id,
  275. "contract_type": document.contract_type,
  276. "action": action,
  277. "sign_result": sign_result,
  278. "operator_mobile": operator_mobile,
  279. },
  280. ensure_ascii=False,
  281. ),
  282. )
  283. if mark_signed:
  284. ts_ms = payload.get("operateTime") or payload.get("timestamp")
  285. signing_dt = None
  286. if ts_ms:
  287. try:
  288. signing_dt = datetime.datetime.fromtimestamp(ts_ms / 1000)
  289. except Exception:
  290. signing_dt = None
  291. effective_dt = expiry_dt = None
  292. if signing_dt:
  293. effective_dt = (signing_dt + datetime.timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0)
  294. expiry_dt = effective_dt + datetime.timedelta(days=365)
  295. contract_download_url = None
  296. try:
  297. download_resp = file_download_url(sign_flow_id)
  298. download_json = json.loads(download_resp)
  299. contract_download_url = download_json["data"]["files"][0]["downloadUrl"]
  300. except Exception:
  301. contract_download_url = None
  302. await self.repo.mark_document_signed(document.id, signing_dt, effective_dt, expiry_dt, contract_download_url)
  303. await self.repo.recalc_bundle_status(bundle.id)
  304. await self.repo.commit()
  305. return {"success": True, "code": "200", "msg": "success"}
  306. await self.repo.commit()
  307. if action:
  308. return {"success": True, "code": "200", "msg": f"ignored_action_{action}"}
  309. return {"success": True, "code": "200", "msg": f"ignored_signResult_{sign_result}"}