Parcourir la source

refactor(store): 将router的业务逻辑解耦到contract_server

xiaoqinghui il y a 1 mois
Parent
commit
4c2bb63121
2 fichiers modifiés avec 315 ajouts et 269 suppressions
  1. 22 269
      alien_store/api/router.py
  2. 293 0
      alien_store/services/contract_server.py

+ 22 - 269
alien_store/api/router.py

@@ -1,9 +1,9 @@
 import datetime
-from fastapi import APIRouter, Depends, Query
-from typing import Any,  Union, Optional
-import os
 import logging
+from fastapi import APIRouter, Depends, Query
+from typing import Any, Union, Optional
 from pydantic import ValidationError
+
 from alien_store.api.deps import get_contract_service
 from alien_store.schemas.request.contract_store import TemplatesCreate
 from alien_store.schemas.response.contract_store import (
@@ -15,35 +15,9 @@ from alien_store.schemas.response.contract_store import (
     SuccessResponse
 )
 from alien_store.services.contract_server import ContractServer
-from common.esigntool.main import *
-from common.esigntool import main as esign_main
-import re, urllib.parse
-
-# ------------------- 日志配置 -------------------
-LOG_DIR = os.path.join("common", "logs", "alien_store")
-os.makedirs(LOG_DIR, exist_ok=True)
-
-def _init_logger():
-    logger = logging.getLogger("alien_store")
-    if logger.handlers:
-        return logger
-    logger.setLevel(logging.INFO)
-    fmt = logging.Formatter("%(asctime)s [%(levelname)s] %(name)s %(message)s")
-    info_handler = logging.FileHandler(os.path.join(LOG_DIR, "info.log"), encoding="utf-8")
-    info_handler.setLevel(logging.INFO)
-    info_handler.setFormatter(fmt)
-    error_handler = logging.FileHandler(os.path.join(LOG_DIR, "error.log"), encoding="utf-8")
-    error_handler.setLevel(logging.ERROR)
-    error_handler.setFormatter(fmt)
-    logger.addHandler(info_handler)
-    logger.addHandler(error_handler)
-    # 控制台可选: logger.addHandler(logging.StreamHandler())
-    return logger
-
-logger = _init_logger()
 
 router = APIRouter()
-
+logger = logging.getLogger("alien_store")
 
 def _format_validation_errors(exc: ValidationError) -> list[dict[str, str]]:
     errors = []
@@ -65,12 +39,12 @@ async def index() -> ModuleStatusResponse:
 
 @router.post("/get_esign_templates", response_model=Union[TemplatesCreateResponse, ErrorResponse])
 async def create_esign_templates(
-    templates_data: dict[str, Any],
+    templates_data_raw: dict[str, Any],
     templates_server: ContractServer = Depends(get_contract_service)
 ) -> Union[TemplatesCreateResponse, ErrorResponse]:
     """AI审核完调用 e签宝生成文件"""
     try:
-        templates_data = TemplatesCreate.model_validate(templates_data)
+        templates_data = TemplatesCreate.model_validate(templates_data_raw)
     except ValidationError as e:
         detail = _format_validation_errors(e)
         logger.error("get_esign_templates validation failed: %s", detail)
@@ -80,66 +54,11 @@ async def create_esign_templates(
             raw={"errors": detail},
         )
 
-    logger.info(f"get_esign_templates request: {templates_data}")
-    # res_text = fill_in_template(templates_data.merchant_name)
-    res_text = fill_in_template(templates_data.store_name)
-    try:
-        res_data = json.loads(res_text)
-    except json.JSONDecodeError:
-        logger.error(f"fill_in_template non-json resp: {res_text}")
-        return ErrorResponse(success=False, message="e签宝返回非 JSON", raw=res_text)
-    # 从返回结构提取下载链接,需与实际返回字段匹配
-    try:
-        contract_url = res_data["data"]["fileDownloadUrl"]
-        file_id = res_data["data"]["fileId"]
-        m = re.search(r'/([^/]+)\.pdf', contract_url)
-        if m:
-            encoded_name = m.group(1)
-            file_name = urllib.parse.unquote(encoded_name)
-    except Exception:
-        logger.error(f"fill_in_template missing fileDownloadUrl: {res_data}")
-        return ErrorResponse(success=False, message="e签宝返回缺少 fileDownloadUrl", raw=res_data)
-
-    # sign_data = create_by_file(file_id, file_name, templates_data.contact_phone, templates_data.merchant_name)
-    sign_data = create_by_file(file_id, file_name, templates_data.contact_phone, templates_data.store_name, templates_data.merchant_name, templates_data.ord_id)
-    print(sign_data)
-    try:
-        sign_json = json.loads(sign_data)
-    except json.JSONDecodeError:
-        logger.error(f"create_by_file non-json resp: {sign_data}")
-        return ErrorResponse(success=False, message="e签宝 create_by_file 返回非 JSON", raw=sign_data)
-
-    if not sign_json.get("data"):
-        logger.error(f"create_by_file failed or missing data: {sign_json}")
-        return ErrorResponse(success=False, message="e签宝创建签署流程失败", raw=sign_json)
-
-    sing_id = sign_json["data"].get("signFlowId")
-    if not sing_id:
-        logger.error(f"create_by_file missing signFlowId: {sign_json}")
-        return ErrorResponse(success=False, message="e签宝返回缺少 signFlowId", raw=sign_json)
-
-    result_contract = {
-        "contract_url": contract_url, # 合同模版链接
-        "file_name": file_name, # 签署的合同的文件名称
-        "file_id": file_id, # 生成的文件ID
-        "status": 0, # 签署状态 0 未签署 1 已签署
-        "sign_flow_id": sing_id, # 从
-        "sign_url": "", # e签宝生成的签署页面
-        "signing_time": "", # 签署合同的时间
-        "effective_time": "", # 合同生效的时间
-        "expiry_time": "", # 合同失效的时间
-        "contract_download_url": "", # 合同签署完成后 下载文件的链接
-        "is_master": 1 # 是否是入驻店铺的协议合同 是 1 否 0
-    }
-    updated = await templates_server.append_contract_url(templates_data, result_contract)
-    logger.info(f"get_esign_templates success contact_phone={templates_data.contact_phone}, sign_flow_id={sing_id}")
-    return TemplatesCreateResponse(
-        success=True,
-        message="合同模板已追加/创建",
-        sign_flow_id=sing_id,
-        file_id=file_id,
-        contract_url=contract_url
-    )
+    result = await templates_server.create_esign_templates(templates_data)
+    if not result.get("success"):
+        return ErrorResponse(**result)
+    
+    return TemplatesCreateResponse(**result)
 
 @router.get("/contracts/{store_id}", response_model=Union[dict, Any])
 async def list_contracts(
@@ -150,68 +69,7 @@ async def list_contracts(
     templates_server: ContractServer = Depends(get_contract_service)
 ) -> Any:
     """根据 store_id 查询所有合同,支持根据 status 筛选和分页"""
-    logger.info(
-        "list_contracts request store_id=%s status=%s page=%s page_size=%s",
-        store_id,
-        status,
-        page,
-        page_size,
-    )
-    try:
-        # 1. 检查 store_info 中的审核状态
-        reason = await templates_server.get_store_reason(store_id)
-        if reason != "审核通过":
-            return {"code": 555, "msg": "先进行认证", "reason": reason}
-
-        # 2. 返回合同列表
-        rows = await templates_server.list_by_store(store_id)
-        
-        all_filtered_items = []
-        # 3. 解析并筛选所有符合条件的合同项
-        for row in rows:
-            contract_url_raw = row.get("contract_url")
-            if not contract_url_raw:
-                continue
-            try:
-                items = json.loads(contract_url_raw)
-                if not isinstance(items, list):
-                    continue
-                
-                for item in items:
-                    # 如果传了 status,则进行筛选
-                    if status is not None and item.get("status") != status:
-                        continue
-                    
-                    # 将店铺基础信息混入每个合同项中,方便前端展示
-                    item_with_info = dict(item)
-                    item_with_info["id"] = row.get("id")
-                    item_with_info["store_id"] = row.get("store_id")
-                    item_with_info["store_name"] = row.get("store_name")
-                    item_with_info["merchant_name"] = row.get("merchant_name")
-                    item_with_info["contact_phone"] = row.get("contact_phone")
-                    all_filtered_items.append(item_with_info)
-            except Exception as e:
-                logger.error(f"Error processing contracts for store_id {store_id}: {e}", exc_info=True)
-                continue
-
-        # 4. 手动分页
-        total = len(all_filtered_items)
-        start = (page - 1) * page_size
-        end = start + page_size
-        paged_items = all_filtered_items[start:end]
-        
-        total_pages = (total + page_size - 1) // page_size if total > 0 else 0
-
-        return {
-            "items": paged_items,
-            "total": total,
-            "page": page,
-            "page_size": page_size,
-            "total_pages": total_pages
-        }
-    except Exception as e:
-        logger.error(f"list_contracts failed store_id={store_id}: {e}", exc_info=True)
-        return {"code": 500, "msg": "查询合同失败", "error": str(e)}
+    return await templates_server.list_contracts(store_id, status, page, page_size)
 
 @router.get("/contracts/detail/{sign_flow_id}", response_model=Union[dict, ErrorResponse])
 async def get_contract_detail(
@@ -223,91 +81,10 @@ async def get_contract_detail(
     - status=0: 返回合同PDF链接(contract_url)和签署链接(sign_url)
     - status=1: 拉取最新下载链接并更新数据库,返回 contract_download_url
     """
-    row, item, items = await templates_server.get_contract_item_by_sign_flow_id(sign_flow_id)
-    if not item:
-        return ErrorResponse(success=False, message="未找到合同")
-
-    status = item.get("status")
-    if status == 0:
-        file_id = item.get("file_id")
-        if not file_id:
-            return ErrorResponse(success=False, message="缺少 file_id,无法获取合同详情")
-        try:
-            detail_resp = esign_main.get_contract_detail(file_id)
-            detail_json = json.loads(detail_resp)
-            data = detail_json.get("data") if isinstance(detail_json, dict) else None
-            contract_url = None
-            if isinstance(data, dict):
-                contract_url = data.get("fileDownloadUrl")
-            if not contract_url and isinstance(detail_json, dict):
-                contract_url = detail_json.get("fileDownloadUrl")
-        except Exception as e:
-            logger.error(f"get_contract_detail failed file_id={file_id}: {e}")
-            return ErrorResponse(success=False, message="获取合同链接失败", raw=str(e))
-
-        if not contract_url:
-            logger.error(f"get_contract_detail missing contract_url file_id={file_id}: {detail_resp}")
-            return ErrorResponse(success=False, message="e签宝返回缺少合同链接", raw=detail_resp)
-
-        if row and isinstance(items, list):
-            for it in items:
-                if it.get("sign_flow_id") == sign_flow_id:
-                    it["contract_url"] = contract_url
-                    break
-            await templates_server.update_contract_items(row["id"], items)
-
-        # 融合原 /esign/signurl 逻辑:调用 e签宝 获取签署链接并落库
-        contact_phone = item.get("contact_phone") or (row.get("contact_phone") if isinstance(row, dict) else None)
-        if not contact_phone:
-            return ErrorResponse(success=False, message="缺少 contact_phone,无法获取签署链接")
-        try:
-            sign_resp = sign_url(sign_flow_id, contact_phone)
-            sign_json = json.loads(sign_resp)
-        except json.JSONDecodeError:
-            logger.error(f"sign_url non-json resp: {sign_resp}")
-            return ErrorResponse(success=False, message="e签宝返回非JSON", raw=sign_resp)
-        except Exception as e:
-            logger.error(f"sign_url failed sign_flow_id={sign_flow_id}, contact_phone={contact_phone}: {e}")
-            return ErrorResponse(success=False, message="获取签署链接失败", raw=str(e))
-
-        sign_data = sign_json.get("data") if isinstance(sign_json, dict) else None
-        result_sign_url = sign_data.get("url") if isinstance(sign_data, dict) else None
-        if not result_sign_url:
-            logger.error(f"sign_url missing url: {sign_json}")
-            return ErrorResponse(success=False, message="e签宝返回缺少签署链接", raw=sign_json)
-        await templates_server.update_sign_url(contact_phone, sign_flow_id, result_sign_url)
-
-        return {
-            "status": 0,
-            "contract_url": contract_url,
-            "sign_url": result_sign_url,
-            "sign_flow_id": sign_flow_id
-        }
-
-    if status == 1:
-        try:
-            download_resp = file_download_url(sign_flow_id)
-            download_json = json.loads(download_resp)
-            contract_download_url = download_json["data"]["files"][0]["downloadUrl"]
-        except Exception as e:
-            logger.error(f"file_download_url failed sign_flow_id={sign_flow_id}: {e}")
-            return ErrorResponse(success=False, message="获取合同下载链接失败", raw=str(e))
-        if row and isinstance(items, list):
-            for it in items:
-                if it.get("sign_flow_id") == sign_flow_id:
-                    it["contract_download_url"] = contract_download_url
-                    it["contract_url"] = contract_download_url  # 与 status=0 一致,用 file_download_url 更新最新 contract_url
-                    break
-            await templates_server.update_contract_items(row["id"], items)
-
-        return {
-            "status": 1,
-            "contract_url": contract_download_url,
-            "contract_download_url": contract_download_url,
-            "sign_flow_id": sign_flow_id
-        }
-
-    return ErrorResponse(success=False, message="未知合同状态", raw={"status": status})
+    result = await templates_server.get_contract_detail(sign_flow_id)
+    if not result.get("success", True): # get_contract_detail 返回的成功结果里没有 success 键,只有 status
+        return ErrorResponse(**result)
+    return result
 
 @router.get("/get_all_templates", response_model=PaginatedResponse)
 async def get_all_templates(
@@ -318,8 +95,8 @@ async def get_all_templates(
     signing_status: Optional[str] = Query(None, description="签署状态"),
     business_segment: Optional[str] = Query(None, description="经营板块"),
     store_status: Optional[str] = Query(None, description="店铺状态:正常/禁用"),
-    expiry_start: Optional[datetime] = Query(None, description="到期时间起"),
-    expiry_end: Optional[datetime] = Query(None, description="到期时间止"),
+    expiry_start: Optional[datetime.datetime] = Query(None, description="到期时间起"),
+    expiry_end: Optional[datetime.datetime] = Query(None, description="到期时间止"),
     templates_server: ContractServer = Depends(get_contract_service)
 ) -> PaginatedResponse:
     """分页查询所有合同,支持筛选"""
@@ -354,34 +131,10 @@ async def esign_callback(
     e签宝签署结果回调
     需求:签署完成 -> 更新 signing_status=已签署,contract_url 中 status=1
     """
-    sign_result = payload.get("signResult")
-    operator = payload.get("operator") or {}
-    sign_flow_id = payload.get("signFlowId")
-    psn_account = operator.get("psnAccount") or {}
-    contact_phone = psn_account.get("accountMobile")
-    # 取回调中的毫秒时间戳,优先 operateTime,其次 timestamp
-    ts_ms = payload.get("operateTime") or payload.get("timestamp")
-    signing_dt = None
-    if ts_ms:
-        try:
-            signing_dt = datetime.fromtimestamp(ts_ms / 1000)
-        except Exception:
-            signing_dt = None
-
-    if sign_result == 2:
-        # 获取合同下载链接
-        contract_download_url = None
-        try:
-            download_resp = file_download_url(sign_flow_id)
-            download_json = json.loads(download_resp)
-            contract_download_url = download_json["data"]["files"][0]["downloadUrl"]
-        except Exception as e:
-            logger.error(f"file_download_url failed for sign_flow_id={sign_flow_id}: {e}")
-        updated = await templates_server.mark_signed_by_phone(contact_phone, sign_flow_id, signing_dt, contract_download_url)
-        logger.info(f"esign_callback success phone={contact_phone}, sign_flow_id={sign_flow_id}, updated={updated}")
-        return SuccessResponse(code="200", msg="success")
-    logger.error(f"esign_callback ignored payload: {payload}")
-    return ErrorResponse(success=False, message="未处理: signResult!=2 或手机号/签署流程缺失")
+    result = await templates_server.process_esign_callback(payload)
+    if not result.get("success"):
+        return ErrorResponse(**result)
+    return SuccessResponse(code=result["code"], msg=result["msg"])
 
 
 

+ 293 - 0
alien_store/services/contract_server.py

@@ -1,6 +1,38 @@
+import datetime
+import os
+import logging
+import json
+import re
+import urllib.parse
+from typing import Any, Union, Optional
 from sqlalchemy.ext.asyncio import AsyncSession
 from alien_store.repositories.contract_repo import ContractRepository
 from alien_store.schemas.request.contract_store import TemplatesCreate
+from common.esigntool import main as esign_main
+from common.esigntool.main import fill_in_template, create_by_file, sign_url, file_download_url
+
+
+# ------------------- 日志配置 -------------------
+LOG_DIR = os.path.join("common", "logs", "alien_store")
+os.makedirs(LOG_DIR, exist_ok=True)
+
+def _init_logger():
+    logger = logging.getLogger("alien_store_service")
+    if logger.handlers:
+        return logger
+    logger.setLevel(logging.INFO)
+    fmt = logging.Formatter("%(asctime)s [%(levelname)s] %(name)s %(message)s")
+    info_handler = logging.FileHandler(os.path.join(LOG_DIR, "info.log"), encoding="utf-8")
+    info_handler.setLevel(logging.INFO)
+    info_handler.setFormatter(fmt)
+    error_handler = logging.FileHandler(os.path.join(LOG_DIR, "error.log"), encoding="utf-8")
+    error_handler.setLevel(logging.ERROR)
+    error_handler.setFormatter(fmt)
+    logger.addHandler(info_handler)
+    logger.addHandler(error_handler)
+    return logger
+
+logger = _init_logger()
 
 
 class ContractServer:
@@ -14,6 +46,267 @@ class ContractServer:
             "message": "模板创建成功"
         }
 
+    async def create_esign_templates(self, templates_data: TemplatesCreate) -> dict:
+        """AI审核完调用 e签宝生成文件"""
+        logger.info(f"create_esign_templates request: {templates_data}")
+        
+        # 1. 填充模板
+        res_text = fill_in_template(templates_data.store_name)
+        try:
+            res_data = json.loads(res_text)
+        except json.JSONDecodeError:
+            logger.error(f"fill_in_template non-json resp: {res_text}")
+            return {"success": False, "message": "e签宝返回非 JSON", "raw": res_text}
+
+        # 2. 提取文件信息
+        try:
+            contract_url = res_data["data"]["fileDownloadUrl"]
+            file_id = res_data["data"]["fileId"]
+            m = re.search(r'/([^/]+)\.pdf', contract_url)
+            if m:
+                encoded_name = m.group(1)
+                file_name = urllib.parse.unquote(encoded_name)
+            else:
+                file_name = "contract.pdf"
+        except Exception:
+            logger.error(f"fill_in_template missing fileDownloadUrl: {res_data}")
+            return {"success": False, "message": "e签宝返回缺少 fileDownloadUrl", "raw": res_data}
+
+        # 3. 创建签署流程
+        sign_data = create_by_file(
+            file_id, 
+            file_name, 
+            templates_data.contact_phone, 
+            templates_data.store_name, 
+            templates_data.merchant_name, 
+            templates_data.ord_id
+        )
+        try:
+            sign_json = json.loads(sign_data)
+        except json.JSONDecodeError:
+            logger.error(f"create_by_file non-json resp: {sign_data}")
+            return {"success": False, "message": "e签宝 create_by_file 返回非 JSON", "raw": sign_data}
+
+        if not sign_json.get("data"):
+            logger.error(f"create_by_file failed or missing data: {sign_json}")
+            return {"success": False, "message": "e签宝创建签署流程失败", "raw": sign_json}
+
+        sign_id = sign_json["data"].get("signFlowId")
+        if not sign_id:
+            logger.error(f"create_by_file missing signFlowId: {sign_json}")
+            return {"success": False, "message": "e签宝返回缺少 signFlowId", "raw": sign_json}
+
+        # 4. 构建合同记录
+        result_contract = {
+            "contract_url": contract_url,
+            "file_name": file_name,
+            "file_id": file_id,
+            "status": 0,
+            "sign_flow_id": sign_id,
+            "sign_url": "",
+            "signing_time": "",
+            "effective_time": "",
+            "expiry_time": "",
+            "contract_download_url": "",
+            "is_master": 1
+        }
+        
+        await self.esign_repo.append_contract_url(templates_data, result_contract)
+        logger.info(f"create_esign_templates success contact_phone={templates_data.contact_phone}, sign_flow_id={sign_id}")
+        
+        return {
+            "success": True,
+            "message": "合同模板已追加/创建",
+            "sign_flow_id": sign_id,
+            "file_id": file_id,
+            "contract_url": contract_url
+        }
+
+    async def list_contracts(self, store_id: int, status: Optional[int], page: int, page_size: int) -> dict:
+        """根据 store_id 查询所有合同,支持根据 status 筛选和分页"""
+        logger.info(
+            "list_contracts request store_id=%s status=%s page=%s page_size=%s",
+            store_id, status, page, page_size
+        )
+        
+        # 1. 检查 store_info 中的审核状态
+        reason = await self.esign_repo.check_store_status(store_id)
+        if reason != "审核通过":
+            return {"code": 555, "msg": "先进行认证", "reason": reason}
+
+        # 2. 获取原始数据
+        rows = await self.esign_repo.get_by_store_id(store_id)
+        
+        all_filtered_items = []
+        # 3. 解析并筛选
+        for row in rows:
+            contract_url_raw = row.get("contract_url")
+            if not contract_url_raw:
+                continue
+            try:
+                items = json.loads(contract_url_raw)
+                if not isinstance(items, list):
+                    continue
+                
+                for item in items:
+                    if status is not None and item.get("status") != status:
+                        continue
+                    
+                    item_with_info = dict(item)
+                    item_with_info.update({
+                        "id": row.get("id"),
+                        "store_id": row.get("store_id"),
+                        "store_name": row.get("store_name"),
+                        "merchant_name": row.get("merchant_name"),
+                        "contact_phone": row.get("contact_phone")
+                    })
+                    all_filtered_items.append(item_with_info)
+            except Exception as e:
+                logger.error(f"Error processing contracts for store_id {store_id}: {e}", exc_info=True)
+                continue
+
+        # 4. 手动分页
+        total = len(all_filtered_items)
+        start = (page - 1) * page_size
+        end = start + page_size
+        paged_items = all_filtered_items[start:end]
+        total_pages = (total + page_size - 1) // page_size if total > 0 else 0
+
+        return {
+            "items": paged_items,
+            "total": total,
+            "page": page,
+            "page_size": page_size,
+            "total_pages": total_pages
+        }
+
+    async def get_contract_detail(self, sign_flow_id: str) -> dict:
+        """获取合同详情"""
+        row, item, items = await self.esign_repo.get_contract_item_by_sign_flow_id(sign_flow_id)
+        if not item:
+            return {"success": False, "message": "未找到合同"}
+
+        status = item.get("status")
+        if status == 0:
+            return await self._get_pending_contract_detail(sign_flow_id, row, item, items)
+        elif status == 1:
+            return await self._get_signed_contract_detail(sign_flow_id, row, item, items)
+        
+        return {"success": False, "message": "未知合同状态", "raw": {"status": status}}
+
+    async def _get_pending_contract_detail(self, sign_flow_id: str, row, item, items) -> dict:
+        file_id = item.get("file_id")
+        if not file_id:
+            return {"success": False, "message": "缺少 file_id,无法获取合同详情"}
+        
+        try:
+            detail_resp = esign_main.get_contract_detail(file_id)
+            detail_json = json.loads(detail_resp)
+            data = detail_json.get("data") if isinstance(detail_json, dict) else None
+            contract_url_val = None
+            if isinstance(data, dict):
+                contract_url_val = data.get("fileDownloadUrl")
+            if not contract_url_val and isinstance(detail_json, dict):
+                contract_url_val = detail_json.get("fileDownloadUrl")
+        except Exception as e:
+            logger.error(f"get_contract_detail failed file_id={file_id}: {e}")
+            return {"success": False, "message": "获取合同链接失败", "raw": str(e)}
+
+        if not contract_url_val:
+            logger.error(f"get_contract_detail missing contract_url file_id={file_id}: {detail_resp}")
+            return {"success": False, "message": "e签宝返回缺少合同链接", "raw": detail_resp}
+
+        # 更新数据库中的合同链接
+        if row and isinstance(items, list):
+            for it in items:
+                if it.get("sign_flow_id") == sign_flow_id:
+                    it["contract_url"] = contract_url_val
+                    break
+            await self.esign_repo.update_contract_items(row["id"], items)
+
+        # 获取签署链接
+        contact_phone = item.get("contact_phone") or (row.get("contact_phone") if isinstance(row, dict) else None)
+        if not contact_phone:
+            return {"success": False, "message": "缺少 contact_phone,无法获取签署链接"}
+        
+        try:
+            sign_resp = sign_url(sign_flow_id, contact_phone)
+            sign_json = json.loads(sign_resp)
+            sign_data = sign_json.get("data") if isinstance(sign_json, dict) else None
+            result_sign_url = sign_data.get("url") if isinstance(sign_data, dict) else None
+        except Exception as e:
+            logger.error(f"sign_url failed sign_flow_id={sign_flow_id}, contact_phone={contact_phone}: {e}")
+            return {"success": False, "message": "获取签署链接失败", "raw": str(e)}
+
+        if not result_sign_url:
+            logger.error(f"sign_url missing url: {sign_json}")
+            return {"success": False, "message": "e签宝返回缺少签署链接", "raw": sign_json}
+            
+        await self.esign_repo.update_sign_url(contact_phone, sign_flow_id, result_sign_url)
+
+        return {
+            "status": 0,
+            "contract_url": contract_url_val,
+            "sign_url": result_sign_url,
+            "sign_flow_id": sign_flow_id
+        }
+
+    async def _get_signed_contract_detail(self, sign_flow_id: str, row, item, items) -> dict:
+        try:
+            download_resp = file_download_url(sign_flow_id)
+            download_json = json.loads(download_resp)
+            contract_download_url = download_json["data"]["files"][0]["downloadUrl"]
+        except Exception as e:
+            logger.error(f"file_download_url failed sign_flow_id={sign_flow_id}: {e}")
+            return {"success": False, "message": "获取合同下载链接失败", "raw": str(e)}
+            
+        if row and isinstance(items, list):
+            for it in items:
+                if it.get("sign_flow_id") == sign_flow_id:
+                    it["contract_download_url"] = contract_download_url
+                    it["contract_url"] = contract_download_url
+                    break
+            await self.esign_repo.update_contract_items(row["id"], items)
+
+        return {
+            "status": 1,
+            "contract_url": contract_download_url,
+            "contract_download_url": contract_download_url,
+            "sign_flow_id": sign_flow_id
+        }
+
+    async def process_esign_callback(self, payload: dict) -> dict:
+        """处理 e签宝 回调"""
+        sign_result = payload.get("signResult")
+        sign_flow_id = payload.get("signFlowId")
+        operator = payload.get("operator") or {}
+        psn_account = operator.get("psnAccount") or {}
+        contact_phone = psn_account.get("accountMobile")
+        
+        ts_ms = payload.get("operateTime") or payload.get("timestamp")
+        signing_dt = None
+        if ts_ms:
+            try:
+                signing_dt = datetime.datetime.fromtimestamp(ts_ms / 1000)
+            except Exception:
+                signing_dt = None
+
+        if sign_result == 2:
+            contract_download_url = None
+            try:
+                download_resp = file_download_url(sign_flow_id)
+                download_json = json.loads(download_resp)
+                contract_download_url = download_json["data"]["files"][0]["downloadUrl"]
+            except Exception as e:
+                logger.error(f"file_download_url failed for sign_flow_id={sign_flow_id}: {e}")
+            
+            updated = await self.esign_repo.mark_signed_by_phone(contact_phone, sign_flow_id, signing_dt, contract_download_url)
+            logger.info(f"esign_callback success phone={contact_phone}, sign_flow_id={sign_flow_id}, updated={updated}")
+            return {"success": True, "code": "200", "msg": "success"}
+            
+        logger.error(f"esign_callback ignored payload: {payload}")
+        return {"success": False, "message": "未处理: signResult!=2 或手机号/签署流程缺失"}
+
     async def list_by_store(self, store_id: int):
         return await self.esign_repo.get_by_store_id(store_id)