contract_server.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327
  1. import datetime
  2. import os
  3. import logging
  4. import json
  5. from typing import Any, Union, Optional
  6. from sqlalchemy.ext.asyncio import AsyncSession
  7. from alien_store.repositories.contract_repo import ContractRepository
  8. from alien_store.schemas.request.contract_store import TemplatesCreate
  9. from alien_contract.infrastructure.esign import main as esign_main
  10. from alien_contract.infrastructure.esign.main import sign_url, file_download_url
  11. from alien_contract.infrastructure.esign.contract_builder import build_contract_items, ContractBuildError
  # ------------------- Logging configuration -------------------
  # Log files are written beneath this directory. The path is relative, so it
  # resolves against the process working directory; it is created on import.
  LOG_DIR = os.path.join("common", "logs", "alien_store")
  os.makedirs(LOG_DIR, exist_ok=True)


  def _init_logger():
      """Return the ``alien_store_service`` logger, attaching file handlers once.

      When the logger already has handlers it is returned untouched, so calling
      this function again never attaches duplicates. Otherwise the logger level
      is set to INFO and two UTF-8 file handlers are attached under ``LOG_DIR``:
      ``info.log`` (INFO and above) and ``error.log`` (ERROR and above).
      """
      service_logger = logging.getLogger("alien_store_service")
      if not service_logger.handlers:
          service_logger.setLevel(logging.INFO)
          formatter = logging.Formatter("%(asctime)s [%(levelname)s] %(name)s %(message)s")

          # Build every handler before attaching any of them, so a failure
          # while opening a log file leaves the logger without handlers.
          file_handlers = []
          for file_name, threshold in (("info.log", logging.INFO), ("error.log", logging.ERROR)):
              handler = logging.FileHandler(os.path.join(LOG_DIR, file_name), encoding="utf-8")
              handler.setLevel(threshold)
              handler.setFormatter(formatter)
              file_handlers.append(handler)

          for handler in file_handlers:
              service_logger.addHandler(handler)
      return service_logger


  logger = _init_logger()
  # Contract definitions passed to ``build_contract_items`` as ``configs``.
  # NOTE(review): each tuple looks like (contract_type, contract_name, is_master),
  # with 1 marking the master contract — confirm against build_contract_items.
  CONTRACT_CREATE_CONFIGS = [
      ("store_agreement", "店铺入驻协议", 1),
      ("alipay_auth", "支付宝授权函", 0),
      ("wechat_pay_commitment", "微信支付承诺函", 0),
  ]
  36. class ContractServer:
  def __init__(self, db: AsyncSession):
      """Bind the service to a database session.

      Args:
          db: Async SQLAlchemy session. It is kept on the instance and also
              used to construct the ``ContractRepository`` that every method
              of this service delegates persistence to.
      """
      self.db = db
      self.esign_repo = ContractRepository(db)
  async def create_template(self, template_data: TemplatesCreate):
      """Persist a template record through the repository.

      Args:
          template_data: Payload forwarded unchanged to the repository's
              ``create`` method.

      Returns:
          A dict holding a single confirmation ``message``.
      """
      await self.esign_repo.create(template_data)
      confirmation = {"message": "模板创建成功"}
      return confirmation
  async def create_esign_templates(self, templates_data: TemplatesCreate) -> dict:
      """Generate the configured e签宝 contracts for a store and record each one.

      Called once the AI review has finished. ``CONTRACT_CREATE_CONFIGS`` is
      handed to ``build_contract_items``; every generated contract is appended
      to the store's record through the repository, and the master contract's
      identifiers are echoed at the top level of the response.

      Args:
          templates_data: Store/merchant data. ``store_name``, ``ord_id``,
              ``contact_phone`` and ``merchant_name`` are read from it.

      Returns:
          On success: ``success=True``, the master contract's ``sign_flow_id``,
          ``file_id`` and ``contract_url``, plus a ``created_contracts`` list
          describing every generated contract.
          On failure (build error, or nothing generated): ``success=False``
          with ``message`` and ``raw``.
      """
      logger.info(f"create_esign_templates request: {templates_data}")
      try:
          generated_contracts = build_contract_items(
              configs=CONTRACT_CREATE_CONFIGS,
              template_name=templates_data.store_name,
              signer_name=templates_data.store_name,
              signer_id_num=templates_data.ord_id,
              psn_account=templates_data.contact_phone,
              psn_name=templates_data.merchant_name,
          )
      except ContractBuildError as exc:
          # Record the failure; previously it was returned without any log entry.
          logger.error(
              "create_esign_templates build failed contact_phone=%s message=%s",
              templates_data.contact_phone,
              exc.message,
          )
          return {"success": False, "message": exc.message, "raw": exc.raw}

      if not generated_contracts:
          # Without this guard the ``generated_contracts[0]`` fallback below is
          # evaluated eagerly and raises IndexError on an empty result.
          logger.error(
              "create_esign_templates produced no contracts contact_phone=%s",
              templates_data.contact_phone,
          )
          return {"success": False, "message": "未生成任何合同", "raw": generated_contracts}

      for contract_item in generated_contracts:
          await self.esign_repo.append_contract_url(templates_data, contract_item)

      # The master contract is the first item flagged is_master == 1; when no
      # item carries the flag, the first generated contract is used instead.
      master_contract = next(
          (item for item in generated_contracts if item.get("is_master") == 1),
          generated_contracts[0],
      )
      logger.info(
          "create_esign_templates success contact_phone=%s master_sign_flow_id=%s all_sign_flow_ids=%s",
          templates_data.contact_phone,
          master_contract.get("sign_flow_id"),
          [item.get("sign_flow_id") for item in generated_contracts],
      )
      return {
          "success": True,
          "message": "合同模板已追加/创建",
          "sign_flow_id": master_contract.get("sign_flow_id"),
          "file_id": master_contract.get("file_id"),
          "contract_url": master_contract.get("contract_url"),
          "created_contracts": [
              {
                  "contract_type": item["contract_type"],
                  "contract_name": item["contract_name"],
                  "sign_flow_id": item["sign_flow_id"],
                  "file_id": item["file_id"],
                  "contract_url": item["contract_url"],
              }
              for item in generated_contracts
          ],
      }
  async def list_contracts(self, store_id: int, status: Optional[int], page: int, page_size: int) -> dict:
      """List a store's contracts, optionally filtered by status, one page at a time.

      The store must have passed review first; otherwise a ``code=555``
      payload carrying the review ``reason`` is returned. Each repository row
      stores its contracts as a JSON list in ``contract_url``; every contract
      in that list is returned as its own item, enriched with the row's store
      and merchant fields.

      Args:
          store_id: Store whose contracts are listed.
          status: When not ``None``, only contracts whose ``status`` equals
              this value are kept.
          page: 1-based page number; values below 1 are treated as 1.
          page_size: Items per page; values below 1 are treated as 1.

      Returns:
          A dict with ``items``, ``total``, ``page``, ``page_size`` and
          ``total_pages`` (``page``/``page_size`` are the effective values),
          or the ``code=555`` payload when the store is not approved.
      """
      logger.info(
          "list_contracts request store_id=%s status=%s page=%s page_size=%s",
          store_id, status, page, page_size
      )

      # 1. Check the review status recorded for the store.
      reason = await self.esign_repo.check_store_status(store_id)
      if reason != "审核通过":
          return {"code": 555, "msg": "先进行认证", "reason": reason}

      # Clamp pagination input: page_size == 0 used to raise ZeroDivisionError
      # and non-positive values produced meaningless slices.
      page = max(page, 1)
      page_size = max(page_size, 1)

      # 2. Fetch the raw rows.
      rows = await self.esign_repo.get_by_store_id(store_id)

      # 3. Decode each row's contract list and apply the status filter.
      all_filtered_items = []
      for row in rows:
          contract_url_raw = row.get("contract_url")
          if not contract_url_raw:
              continue
          try:
              items = json.loads(contract_url_raw)
          except (TypeError, ValueError) as e:
              # Undecodable payload: log it and move on to the next row.
              logger.error(f"Error processing contracts for store_id {store_id}: {e}", exc_info=True)
              continue
          if not isinstance(items, list):
              continue

          row_info = {
              "id": row.get("id"),
              "store_id": row.get("store_id"),
              "store_name": row.get("store_name"),
              "merchant_name": row.get("merchant_name"),
              "contact_phone": row.get("contact_phone"),
          }
          for item in items:
              if not isinstance(item, dict):
                  # One malformed entry must not hide the row's other contracts.
                  logger.warning(
                      "Skipping non-dict contract item store_id=%s row_id=%s",
                      store_id, row_info["id"],
                  )
                  continue
              if status is not None and item.get("status") != status:
                  continue
              # Row-level fields override same-named keys of the item.
              all_filtered_items.append({**item, **row_info})

      # 4. Paginate in memory.
      total = len(all_filtered_items)
      start = (page - 1) * page_size
      paged_items = all_filtered_items[start:start + page_size]
      total_pages = (total + page_size - 1) // page_size  # ceil; 0 when total == 0
      return {
          "items": paged_items,
          "total": total,
          "page": page,
          "page_size": page_size,
          "total_pages": total_pages,
      }
  async def get_contract_detail(self, sign_flow_id: str) -> dict:
      """Return the detail payload for the contract with this sign flow id.

      The contract is looked up through the repository and then routed by its
      ``status``: 0 goes to the pending handler, 1 to the signed handler.

      Args:
          sign_flow_id: Sign flow identifier of the contract.

      Returns:
          The selected handler's result, or a ``success=False`` dict when the
          contract is not found or its status is neither 0 nor 1.
      """
      row, item, items = await self.esign_repo.get_contract_item_by_sign_flow_id(sign_flow_id)
      if not item:
          return {"success": False, "message": "未找到合同"}

      status = item.get("status")
      if status == 0:
          handler = self._get_pending_contract_detail
      elif status == 1:
          handler = self._get_signed_contract_detail
      else:
          return {"success": False, "message": "未知合同状态", "raw": {"status": status}}
      return await handler(sign_flow_id, row, item, items)
  async def _get_pending_contract_detail(self, sign_flow_id: str, row, item, items) -> dict:
      """Build the detail payload for a contract that is still unsigned (status 0).

      Steps: fetch the contract file URL from e签宝 by ``file_id``, store it on
      the matching entry of ``items`` and persist the list, then fetch the
      signing URL for the contact phone and persist that as well.

      Args:
          sign_flow_id: Sign flow identifier of the contract.
          row: Repository row owning the contract; ``row["id"]`` is used for
              the update and ``contact_phone`` is read from it when ``row`` is
              a dict and the item has none.
          item: The contract entry itself; ``file_id`` and ``contact_phone``
              are read from it.
          items: The full contract list stored on ``row``; mutated in place.

      Returns:
          ``{"status": 0, "contract_url", "sign_url", "sign_flow_id"}`` on
          success, otherwise a ``success=False`` dict with ``message`` and,
          for e签宝 failures, ``raw``.
      """
      file_id = item.get("file_id")
      if not file_id:
          return {"success": False, "message": "缺少 file_id,无法获取合同详情"}

      # NOTE(review): this is a synchronous call inside a coroutine; if it does
      # network I/O it blocks the event loop — confirm and offload if needed.
      try:
          detail_resp = esign_main.get_contract_detail(file_id)
          detail_json = json.loads(detail_resp)
      except Exception as e:
          # logger.exception keeps the traceback, like list_contracts' exc_info=True.
          logger.exception(f"get_contract_detail failed file_id={file_id}: {e}")
          return {"success": False, "message": "获取合同链接失败", "raw": str(e)}

      # The URL is read from "data" first, then from the top level as a fallback.
      contract_url_val = None
      if isinstance(detail_json, dict):
          data = detail_json.get("data")
          if isinstance(data, dict):
              contract_url_val = data.get("fileDownloadUrl")
          contract_url_val = contract_url_val or detail_json.get("fileDownloadUrl")
      if not contract_url_val:
          logger.error(f"get_contract_detail missing contract_url file_id={file_id}: {detail_resp}")
          return {"success": False, "message": "e签宝返回缺少合同链接", "raw": detail_resp}

      # Store the fresh contract URL on the matching entry and persist the list.
      if row and isinstance(items, list):
          for it in items:
              if it.get("sign_flow_id") == sign_flow_id:
                  it["contract_url"] = contract_url_val
                  break
          await self.esign_repo.update_contract_items(row["id"], items)

      # Resolve the phone number the signing URL is requested for.
      contact_phone = item.get("contact_phone") or (row.get("contact_phone") if isinstance(row, dict) else None)
      if not contact_phone:
          return {"success": False, "message": "缺少 contact_phone,无法获取签署链接"}

      try:
          sign_resp = sign_url(sign_flow_id, contact_phone)
          sign_json = json.loads(sign_resp)
      except Exception as e:
          logger.exception(f"sign_url failed sign_flow_id={sign_flow_id}, contact_phone={contact_phone}: {e}")
          return {"success": False, "message": "获取签署链接失败", "raw": str(e)}

      sign_data = sign_json.get("data") if isinstance(sign_json, dict) else None
      result_sign_url = sign_data.get("url") if isinstance(sign_data, dict) else None
      if not result_sign_url:
          logger.error(f"sign_url missing url: {sign_json}")
          return {"success": False, "message": "e签宝返回缺少签署链接", "raw": sign_json}

      await self.esign_repo.update_sign_url(contact_phone, sign_flow_id, result_sign_url)
      return {
          "status": 0,
          "contract_url": contract_url_val,
          "sign_url": result_sign_url,
          "sign_flow_id": sign_flow_id,
      }
  async def _get_signed_contract_detail(self, sign_flow_id: str, row, item, items) -> dict:
      """Build the detail payload for a contract that is already signed (status 1).

      Fetches the signed file's download URL from e签宝, writes it to both
      ``contract_download_url`` and ``contract_url`` on the matching entry of
      ``items``, and persists the list.

      Args:
          sign_flow_id: Sign flow identifier of the contract.
          row: Repository row owning the contract; ``row["id"]`` is used for
              the update.
          item: The contract entry itself (not read here; kept so both status
              handlers share one signature).
          items: The full contract list stored on ``row``; mutated in place.

      Returns:
          ``{"status": 1, "contract_url", "contract_download_url",
          "sign_flow_id"}`` on success, otherwise a ``success=False`` dict
          with ``message`` and ``raw``.
      """
      # NOTE(review): this is a synchronous call inside a coroutine; if it does
      # network I/O it blocks the event loop — confirm and offload if needed.
      try:
          download_resp = file_download_url(sign_flow_id)
          download_json = json.loads(download_resp)
          contract_download_url = download_json["data"]["files"][0]["downloadUrl"]
      except Exception as e:
          # logger.exception keeps the traceback, like list_contracts' exc_info=True.
          logger.exception(f"file_download_url failed sign_flow_id={sign_flow_id}: {e}")
          return {"success": False, "message": "获取合同下载链接失败", "raw": str(e)}

      if not contract_download_url:
          # A null/empty URL used to overwrite the stored contract_url with a
          # useless value and still be reported as success.
          logger.error(f"file_download_url missing downloadUrl sign_flow_id={sign_flow_id}: {download_resp}")
          return {"success": False, "message": "e签宝返回缺少合同下载链接", "raw": download_resp}

      if row and isinstance(items, list):
          for it in items:
              if it.get("sign_flow_id") == sign_flow_id:
                  it["contract_download_url"] = contract_download_url
                  it["contract_url"] = contract_download_url
                  break
          await self.esign_repo.update_contract_items(row["id"], items)

      return {
          "status": 1,
          "contract_url": contract_download_url,
          "contract_download_url": contract_download_url,
          "sign_flow_id": sign_flow_id,
      }
  async def process_esign_callback(self, payload: dict) -> dict:
      """Handle an e签宝 callback and mark the contract as signed when applicable.

      A callback is processed only when ``signResult`` is 2 and both the sign
      flow id and the operator's mobile number are present. Anything else is
      logged and reported as unhandled, which is what the failure message has
      always described.

      Args:
          payload: Callback body. Reads ``signResult``, ``signFlowId``,
              ``operator.psnAccount.accountMobile`` and ``operateTime`` (or
              ``timestamp``), the latter treated as epoch milliseconds.

      Returns:
          ``{"success": True, "code": "200", "msg": "success"}`` once the
          repository update has been attempted, otherwise a ``success=False``
          dict with ``message``.
      """
      sign_result = payload.get("signResult")
      sign_flow_id = payload.get("signFlowId")
      operator = payload.get("operator") or {}
      psn_account = operator.get("psnAccount") or {}
      contact_phone = psn_account.get("accountMobile")

      if sign_result != 2 or not sign_flow_id or not contact_phone:
          # Previously only signResult was checked, so a callback lacking the
          # phone or flow id still hit the download API and the repository
          # with None arguments and was acknowledged as success.
          logger.error(f"esign_callback ignored payload: {payload}")
          return {"success": False, "message": "未处理: signResult!=2 或手机号/签署流程缺失"}

      # Convert the millisecond timestamp; float() also accepts numeric strings,
      # which were silently discarded before.
      signing_dt = None
      ts_ms = payload.get("operateTime") or payload.get("timestamp")
      if ts_ms:
          try:
              signing_dt = datetime.datetime.fromtimestamp(float(ts_ms) / 1000)
          except (TypeError, ValueError, OverflowError, OSError):
              logger.warning("esign_callback unparsable timestamp=%r sign_flow_id=%s", ts_ms, sign_flow_id)

      # Best effort: a failed download-URL lookup must not block marking the
      # contract as signed, so the URL stays None in that case.
      contract_download_url = None
      try:
          download_resp = file_download_url(sign_flow_id)
          download_json = json.loads(download_resp)
          contract_download_url = download_json["data"]["files"][0]["downloadUrl"]
      except Exception as e:
          logger.exception(f"file_download_url failed for sign_flow_id={sign_flow_id}: {e}")

      updated = await self.esign_repo.mark_signed_by_phone(
          contact_phone, sign_flow_id, signing_dt, contract_download_url
      )
      logger.info(f"esign_callback success phone={contact_phone}, sign_flow_id={sign_flow_id}, updated={updated}")
      return {"success": True, "code": "200", "msg": "success"}
  async def list_by_store(self, store_id: int):
      """Return the repository's contract rows for ``store_id``, unmodified."""
      store_rows = await self.esign_repo.get_by_store_id(store_id)
      return store_rows
  async def get_store_reason(self, store_id: int) -> str | None:
      """Return the review status text the repository reports for ``store_id``.

      This is the same value ``list_contracts`` compares against "审核通过".
      """
      review_reason = await self.esign_repo.check_store_status(store_id)
      return review_reason
  async def get_contract_item_by_sign_flow_id(self, sign_flow_id: str):
      """Look up a contract by sign flow id through the repository.

      Returns the repository result unchanged; ``get_contract_detail`` unpacks
      it as ``(row, item, items)``.
      """
      lookup_result = await self.esign_repo.get_contract_item_by_sign_flow_id(sign_flow_id)
      return lookup_result
  async def update_contract_items(self, row_id: int, items: list) -> bool:
      """Persist ``items`` as the contract list of row ``row_id``.

      Returns whatever the repository's ``update_contract_items`` returns.
      """
      outcome = await self.esign_repo.update_contract_items(row_id, items)
      return outcome
  async def list_all_paged(
      self,
      page: int,
      page_size: int = 10,
      store_name: str | None = None,
      merchant_name: str | None = None,
      signing_status: str | None = None,
      business_segment: str | None = None,
      store_status: str | None = None,
      expiry_start=None,
      expiry_end=None,
  ):
      """Return one page of contract rows across all stores.

      Every filter argument is forwarded by keyword to the repository's
      ``get_all_paged``; filtering and paging happen there.

      Returns:
          An ``(items, total)`` tuple unpacked from the repository result.
      """
      filters = {
          "store_name": store_name,
          "merchant_name": merchant_name,
          "signing_status": signing_status,
          "business_segment": business_segment,
          "store_status": store_status,
          "expiry_start": expiry_start,
          "expiry_end": expiry_end,
      }
      page_items, total_count = await self.esign_repo.get_all_paged(page, page_size, **filters)
      return page_items, total_count
  async def mark_signed_by_phone(self, contact_phone: str, sign_flow_id: str, signing_time, contract_download_url):
      """Forward a signed-contract update to the repository.

      All four arguments are passed through positionally and the repository's
      return value is handed back unchanged.
      """
      outcome = await self.esign_repo.mark_signed_by_phone(
          contact_phone, sign_flow_id, signing_time, contract_download_url
      )
      return outcome
  async def update_sign_url(self, contact_phone: str, sign_flow_id: str, sign_url: str):
      """Store the signing URL for a contract through the repository.

      Inside this method the ``sign_url`` parameter shadows the module-level
      ``sign_url`` function imported from the esign package.
      """
      outcome = await self.esign_repo.update_sign_url(contact_phone, sign_flow_id, sign_url)
      return outcome
  async def append_contract_url(self, templates_data, contract_item: dict):
      """Append one generated contract to the store's record via the repository.

      Both arguments are forwarded unchanged and the repository's return value
      is handed back as-is.
      """
      outcome = await self.esign_repo.append_contract_url(templates_data, contract_item)
      return outcome