Просмотр исходного кода

fix(contract): 修正 e签宝签署状态误判与时区,统一走合同中心

- 合同中心回调改为查询 e签宝 signFlowStatus 权威判定,避免平台自动盖章(SIGN_MISSON_COMPLETE/signResult=2)被误判为整单已签署
- 签署时间按东八区写入,修复 signing_time 比 created_time 早 8 小时的问题
- main.py 新增 query_sign_flow_detail 流程状态查询接口
- 移除 store/lawyer 旧合同读接口与带同样 bug 的旧回调(store_contract/lawyer_contract 表保留)
- 合同到期提醒任务改读合同中心(contract_document/contract_bundle),新增 subject_type 过滤(默认 store)
- 新增 scripts/reconcile_contract_status.py 存量数据对账回刷脚本(默认 dry-run)

Co-authored-by: Cursor <cursoragent@cursor.com>
天空之城 11 часов назад
Родитель
Сommit
0b4ca8634a

+ 5 - 0
alien_contract/infrastructure/esign/main.py

@@ -199,3 +199,8 @@ def fill_in_template(store_name: str, contract_type: str = "store_agreement"):
 def get_contract_detail(file_id: str):
     detail_url = f"/v3/files/{file_id}"
     return _request("GET", detail_url)
+
+
+def query_sign_flow_detail(sign_flow_id: str):
+    """查询签署流程详情,返回含 signFlowStatus(0草稿1签署中2完成3撤销4终止5过期6删除7拒签)"""
+    return _request("GET", f"/v3/sign-flow/{sign_flow_id}/detail")

+ 49 - 16
alien_contract/services/contract_server.py

@@ -35,16 +35,31 @@ def _init_logger():
 
 logger = _init_logger()
 
+# e签宝 v3 回调 action 说明:
+#   SIGN_MISSON_COMPLETE 单个签署任务完成(按签署人维度,平台方自动盖章也会触发,signResult=2)
+#   SIGN_FLOW_COMPLETE / SIGN_FLOW_FINISH 整个签署流程结束
+# 注意:绝不能用"单个任务完成"判定整份合同已签署,必须以流程真实状态 signFlowStatus 为准。
 SIGN_FLOW_FINISH_ACTIONS = {
     "SIGN_FLOW_FINISH",
     "SIGN_FLOW_COMPLETE",
 }
 
-NON_FINAL_SIGN_ACTIONS = {
-    "OPERATOR_COMPLETE_SIGN",
-    "OPERATOR_READ",
-    "OPERATOR_VIEW",
-}
+# e签宝 流程状态 signFlowStatus: 0草稿 1签署中 2完成 3撤销 4终止 5过期 6删除 7拒签
+ESIGN_FLOW_STATUS_COMPLETED = 2
+
+# 与数据库 NOW() 时区(东八区)保持一致,避免应用容器在 UTC 时写入早 8 小时的时间
+CN_TZ = datetime.timezone(datetime.timedelta(hours=8))
+
+
+def _ms_to_naive_cn(ms) -> "datetime.datetime | None":
+    """e签宝时间戳(epoch毫秒)转换为东八区(无时区)datetime"""
+    if not ms:
+        return None
+    try:
+        return datetime.datetime.fromtimestamp(ms / 1000, tz=CN_TZ).replace(tzinfo=None)
+    except Exception:
+        return None
+
 
 BUNDLE_CONFIGS = {
     "STORE_STANDARD": [
@@ -328,7 +343,17 @@ class ContractCenterService:
         event_type = f"esign_callback:{action or 'UNKNOWN'}"
         await self.repo.create_event(bundle.id, document.id, sign_flow_id, event_type[:50], payload)
 
-        mark_signed = bool(action in SIGN_FLOW_FINISH_ACTIONS or (sign_result == 2 and action not in NON_FINAL_SIGN_ACTIONS))
+        # 不能凭"单个签署任务完成(SIGN_MISSON_COMPLETE / signResult=2)"判定整份合同已签署,
+        # 因为平台方自动盖章也会触发该回调。以 e签宝 流程真实状态 signFlowStatus 为准。
+        flow_status, finish_ms = self._query_flow_status(sign_flow_id)
+
+        # 兜底:查询失败时,仅当为"流程结束"类回调且报文明确为已完成(2)才认定完成
+        if flow_status is None and action in SIGN_FLOW_FINISH_ACTIONS:
+            payload_status = payload.get("signFlowStatus") or payload.get("flowStatus")
+            if payload_status in (2, "2"):
+                flow_status = ESIGN_FLOW_STATUS_COMPLETED
+
+        mark_signed = flow_status == ESIGN_FLOW_STATUS_COMPLETED
 
         logger.info(
             "esign_callback_event %s",
@@ -341,6 +366,7 @@ class ContractCenterService:
                     "contract_type": document.contract_type,
                     "action": action,
                     "sign_result": sign_result,
+                    "flow_status": flow_status,
                     "operator_mobile": operator_mobile,
                 },
                 ensure_ascii=False,
@@ -348,13 +374,8 @@ class ContractCenterService:
         )
 
         if mark_signed:
-            ts_ms = payload.get("operateTime") or payload.get("timestamp")
-            signing_dt = None
-            if ts_ms:
-                try:
-                    signing_dt = datetime.datetime.fromtimestamp(ts_ms / 1000)
-                except Exception:
-                    signing_dt = None
+            ts_ms = finish_ms or payload.get("operateTime") or payload.get("timestamp")
+            signing_dt = _ms_to_naive_cn(ts_ms)
             effective_dt = expiry_dt = None
             if signing_dt:
                 effective_dt = (signing_dt + datetime.timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0)
@@ -377,6 +398,18 @@ class ContractCenterService:
             return {"success": True, "code": "200", "msg": "success"}
 
         await self.repo.commit()
-        if action:
-            return {"success": True, "code": "200", "msg": f"ignored_action_{action}"}
-        return {"success": True, "code": "200", "msg": f"ignored_signResult_{sign_result}"}
+        return {"success": True, "code": "200", "msg": f"ignored_flow_status_{flow_status}"}
+
+    def _query_flow_status(self, sign_flow_id: str) -> tuple[int | None, int | None]:
+        """查询 e签宝 流程真实状态,返回 (signFlowStatus, signFlowFinishTime毫秒);失败返回 (None, None)"""
+        try:
+            detail_resp = esign_main.query_sign_flow_detail(sign_flow_id)
+            detail_json = json.loads(detail_resp)
+        except Exception as exc:
+            logger.error("query_sign_flow_detail error sign_flow_id=%s err=%s", sign_flow_id, exc)
+            return None, None
+        data = detail_json.get("data") if isinstance(detail_json, dict) else None
+        if not isinstance(data, dict):
+            logger.error("query_sign_flow_detail no data sign_flow_id=%s resp=%s", sign_flow_id, detail_json)
+            return None, None
+        return data.get("signFlowStatus"), data.get("signFlowFinishTime")

+ 0 - 5
alien_lawyer/api/deps.py

@@ -3,11 +3,6 @@ from sqlalchemy.ext.asyncio import AsyncSession
 
 from alien_database.session import get_db
 from alien_contract.services.contract_server import ContractCenterService
-from alien_lawyer.services.contract_server import LawyerContractServer
-
-
-def get_contract_service(db: AsyncSession = Depends(get_db)) -> LawyerContractServer:
-    return LawyerContractServer(db)
 
 
 def get_contract_center_service(db: AsyncSession = Depends(get_db)) -> ContractCenterService:

+ 4 - 40
alien_lawyer/api/router.py

@@ -1,9 +1,9 @@
-from typing import Any, Union, Optional
+from typing import Any, Union
 
-from fastapi import APIRouter, Depends, Query
+from fastapi import APIRouter, Depends
 from pydantic import ValidationError
 
-from alien_lawyer.api.deps import get_contract_service, get_contract_center_service
+from alien_lawyer.api.deps import get_contract_center_service
 from alien_contract.schemas.request.contract import BundleCreateRequest
 from alien_contract.services.contract_server import ContractCenterService
 from alien_lawyer.schemas.request.contract_lawyer import LawyerTemplatesCreate
@@ -11,10 +11,7 @@ from alien_lawyer.schemas.response.contract_lawyer import (
     ModuleStatusResponse,
     TemplatesCreateResponse,
     ErrorResponse,
-    PaginatedResponse,
-    SuccessResponse,
 )
-from alien_lawyer.services.contract_server import LawyerContractServer
 
 router = APIRouter()
 
@@ -44,6 +41,7 @@ async def create_esign_templates(
     templates_data_raw: dict[str, Any],
     templates_server: ContractCenterService = Depends(get_contract_center_service),
 ) -> Union[TemplatesCreateResponse, ErrorResponse]:
+    """AI审核完调用 e签宝生成文件(统一走合同中心)"""
     try:
         templates_data = LawyerTemplatesCreate.model_validate(templates_data_raw)
     except ValidationError as exc:
@@ -68,37 +66,3 @@ async def create_esign_templates(
         contract_url=result.get("created_contracts", [{}])[0].get("contract_url") if result.get("created_contracts") else None,
         created_contracts=result.get("created_contracts"),
     )
-
-
-@router.get("/contracts/{lawyer_id}", response_model=PaginatedResponse)
-async def list_contracts(
-    lawyer_id: int,
-    status: Optional[int] = Query(None, description="筛选合同状态:0 未签署,1 已签署"),
-    page: int = Query(1, ge=1, description="页码,从1开始"),
-    page_size: int = Query(10, ge=1, le=100, description="每页条数,默认10"),
-    templates_server: LawyerContractServer = Depends(get_contract_service),
-) -> PaginatedResponse:
-    result = await templates_server.list_contracts(lawyer_id, status, page, page_size)
-    return PaginatedResponse(**result)
-
-
-@router.get("/contracts/detail/{sign_flow_id}", response_model=Union[dict, ErrorResponse])
-async def get_contract_detail(
-    sign_flow_id: str,
-    templates_server: LawyerContractServer = Depends(get_contract_service),
-) -> Union[dict, ErrorResponse]:
-    result = await templates_server.get_contract_detail(sign_flow_id)
-    if not result.get("success", True):
-        return ErrorResponse(**result)
-    return result
-
-
-@router.post("/esign/callback", response_model=Union[SuccessResponse, ErrorResponse])
-async def esign_callback(
-    payload: dict,
-    templates_server: LawyerContractServer = Depends(get_contract_service),
-) -> Union[SuccessResponse, ErrorResponse]:
-    result = await templates_server.process_esign_callback(payload)
-    if not result.get("success"):
-        return ErrorResponse(**result)
-    return SuccessResponse(code=result["code"], msg=result["msg"])

+ 0 - 220
alien_lawyer/repositories/contract_repo.py

@@ -1,220 +0,0 @@
-import json
-import logging
-from datetime import datetime, timedelta
-
-from sqlalchemy.exc import DBAPIError
-from sqlalchemy.ext.asyncio import AsyncSession
-
-from alien_lawyer.db.models.lawyer_contract import LawyerContract
-
-logger = logging.getLogger(__name__)
-
-
-class LawyerContractRepository:
-    def __init__(self, db: AsyncSession):
-        self.db = db
-
-    async def _execute_with_retry(self, statement):
-        try:
-            return await self.db.execute(statement)
-        except Exception as exc:
-            if self._should_retry(exc):
-                logger.warning("DB connection invalidated, retrying once: %s", exc)
-                try:
-                    await self.db.rollback()
-                except Exception:
-                    pass
-                return await self.db.execute(statement)
-            raise
-
-    @staticmethod
-    def _should_retry(exc: Exception) -> bool:
-        if isinstance(exc, DBAPIError) and getattr(exc, "connection_invalidated", False):
-            return True
-        txt = str(exc).lower()
-        closed_keywords = ["closed", "lost connection", "connection was killed", "terminat"]
-        return any(k in txt for k in closed_keywords)
-
-    async def get_by_lawyer_id(self, lawyer_id: int):
-        result = await self._execute_with_retry(
-            LawyerContract.__table__.select().where(LawyerContract.lawyer_id == lawyer_id)
-        )
-        return [dict(row) for row in result.mappings().all()]
-
-    async def get_contract_item_by_sign_flow_id(self, sign_flow_id: str):
-        result = await self._execute_with_retry(LawyerContract.__table__.select())
-        rows = result.mappings().all()
-        for row in rows:
-            contract_url_raw = row.get("contract_url")
-            if not contract_url_raw:
-                continue
-            try:
-                items = json.loads(contract_url_raw)
-            except Exception:
-                items = None
-            if not isinstance(items, list):
-                continue
-            for item in items:
-                if item.get("sign_flow_id") == sign_flow_id:
-                    return dict(row), item, items
-        return None, None, None
-
-    async def update_contract_items(self, row_id: int, items: list) -> bool:
-        if not isinstance(items, list):
-            return False
-        await self._execute_with_retry(
-            LawyerContract.__table__.update()
-            .where(LawyerContract.id == row_id)
-            .values(contract_url=json.dumps(items, ensure_ascii=False))
-        )
-        await self.db.commit()
-        return True
-
-    async def mark_signed_by_phone(
-        self,
-        contact_phone: str,
-        sign_flow_id: str,
-        signing_time: datetime | None = None,
-        contract_download_url: str | None = None,
-    ):
-        result = await self._execute_with_retry(
-            LawyerContract.__table__.select().where(LawyerContract.contact_phone == contact_phone)
-        )
-        rows = result.mappings().all()
-        updated = False
-        for row in rows:
-            contract_url_raw = row.get("contract_url")
-            items = None
-            if contract_url_raw:
-                try:
-                    items = json.loads(contract_url_raw)
-                except Exception:
-                    items = None
-            changed = False
-            matched_item = None
-            if isinstance(items, list):
-                for item in items:
-                    if item.get("sign_flow_id") == sign_flow_id:
-                        item["status"] = 1
-                        if contract_download_url:
-                            item["contract_download_url"] = contract_download_url
-                        matched_item = item
-                        changed = True
-                        break
-
-            if changed and matched_item and matched_item.get("is_master") == 1:
-                signing_dt = signing_time
-                effective_dt = expiry_dt = None
-                if signing_dt:
-                    effective_dt = (signing_dt + timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0)
-                    expiry_dt = effective_dt + timedelta(days=365)
-                    matched_item["signing_time"] = signing_dt.strftime("%Y-%m-%d %H:%M:%S")
-                    matched_item["effective_time"] = effective_dt.strftime("%Y-%m-%d %H:%M:%S")
-                    matched_item["expiry_time"] = expiry_dt.strftime("%Y-%m-%d %H:%M:%S")
-
-                await self._execute_with_retry(
-                    LawyerContract.__table__.update()
-                    .where(LawyerContract.id == row["id"])
-                    .values(
-                        signing_status="已签署",
-                        contract_url=json.dumps(items, ensure_ascii=False) if items else contract_url_raw,
-                        signing_time=signing_dt,
-                        effective_time=effective_dt,
-                        expiry_time=expiry_dt,
-                    )
-                )
-                updated = True
-            elif changed:
-                await self._execute_with_retry(
-                    LawyerContract.__table__.update()
-                    .where(LawyerContract.id == row["id"])
-                    .values(contract_url=json.dumps(items, ensure_ascii=False) if items else contract_url_raw)
-                )
-                updated = True
-        if updated:
-            await self.db.commit()
-        return updated
-
-    async def update_sign_url(self, contact_phone: str, sign_flow_id: str, sign_url: str):
-        result = await self._execute_with_retry(
-            LawyerContract.__table__.select().where(LawyerContract.contact_phone == contact_phone)
-        )
-        rows = result.mappings().all()
-        updated = False
-        for row in rows:
-            contract_url_raw = row.get("contract_url")
-            if not contract_url_raw:
-                continue
-            try:
-                items = json.loads(contract_url_raw)
-            except Exception:
-                items = None
-            if not isinstance(items, list):
-                continue
-            changed = False
-            for item in items:
-                if item.get("sign_flow_id") == sign_flow_id:
-                    item["sign_url"] = sign_url
-                    changed = True
-            if changed:
-                await self._execute_with_retry(
-                    LawyerContract.__table__.update()
-                    .where(LawyerContract.id == row["id"])
-                    .values(contract_url=json.dumps(items, ensure_ascii=False))
-                )
-                updated = True
-        if updated:
-            await self.db.commit()
-        return updated
-
-    async def append_contract_url(self, templates_data, contract_item: dict):
-        lawyer_id = getattr(templates_data, "lawyer_id", None)
-        if lawyer_id is None:
-            return False
-
-        result = await self._execute_with_retry(
-            LawyerContract.__table__.select().where(LawyerContract.lawyer_id == lawyer_id)
-        )
-        rows = result.mappings().all()
-        updated = False
-        law_firm_name = getattr(templates_data, "law_firm_name", None)
-        if rows:
-            for row in rows:
-                contract_url_raw = row.get("contract_url")
-                try:
-                    items = json.loads(contract_url_raw) if contract_url_raw else []
-                except Exception:
-                    items = []
-                if not isinstance(items, list):
-                    items = []
-                items.append(contract_item)
-                update_values = {"contract_url": json.dumps(items, ensure_ascii=False)}
-                if law_firm_name:
-                    update_values["law_firm_name"] = law_firm_name
-                contact_phone = getattr(templates_data, "contact_phone", None)
-                if contact_phone:
-                    update_values["contact_phone"] = contact_phone
-                await self._execute_with_retry(
-                    LawyerContract.__table__.update()
-                    .where(LawyerContract.id == row["id"])
-                    .values(**update_values)
-                )
-                updated = True
-            if updated:
-                await self.db.commit()
-            return updated
-
-        new_record = LawyerContract(
-            lawyer_id=lawyer_id,
-            law_firm_name=law_firm_name,
-            business_segment=getattr(templates_data, "business_segment", None),
-            contact_name=getattr(templates_data, "contact_name", None),
-            contact_phone=getattr(templates_data, "contact_phone", None),
-            contract_url=json.dumps([contract_item], ensure_ascii=False),
-            ord_id=getattr(templates_data, "ord_id", None),
-            signing_status="未签署",
-        )
-        self.db.add(new_record)
-        await self.db.commit()
-        await self.db.refresh(new_record)
-        return True

+ 0 - 229
alien_lawyer/services/contract_server.py

@@ -1,229 +0,0 @@
-import datetime
-import json
-import logging
-import os
-from typing import Optional, Any
-
-from sqlalchemy.ext.asyncio import AsyncSession
-
-from alien_lawyer.repositories.contract_repo import LawyerContractRepository
-from alien_lawyer.schemas.request.contract_lawyer import LawyerTemplatesCreate
-from alien_contract.infrastructure.esign import main as esign_main
-from alien_contract.infrastructure.esign.contract_builder import build_contract_items, ContractBuildError
-from alien_contract.infrastructure.esign.main import sign_url, file_download_url
-
-LOG_DIR = os.path.join("common", "logs", "alien_lawyer")
-os.makedirs(LOG_DIR, exist_ok=True)
-
-
-def _init_logger():
-    logger = logging.getLogger("alien_lawyer_service")
-    if logger.handlers:
-        return logger
-    logger.setLevel(logging.INFO)
-    fmt = logging.Formatter("%(asctime)s [%(levelname)s] %(name)s %(message)s")
-    info_handler = logging.FileHandler(os.path.join(LOG_DIR, "info.log"), encoding="utf-8")
-    info_handler.setLevel(logging.INFO)
-    info_handler.setFormatter(fmt)
-    error_handler = logging.FileHandler(os.path.join(LOG_DIR, "error.log"), encoding="utf-8")
-    error_handler.setLevel(logging.ERROR)
-    error_handler.setFormatter(fmt)
-    logger.addHandler(info_handler)
-    logger.addHandler(error_handler)
-    return logger
-
-
-logger = _init_logger()
-
-LAWYER_CONTRACT_CREATE_CONFIGS = [
-    ("lawyer_agreement", "律所入驻协议", 1),
-    ("alipay_auth", "支付宝授权函", 0),
-    ("wechat_pay_commitment", "微信支付承诺函", 0),
-]
-
-
-class LawyerContractServer:
-    def __init__(self, db: AsyncSession):
-        self.db = db
-        self.repo = LawyerContractRepository(db)
-
-    async def create_esign_templates(self, templates_data: LawyerTemplatesCreate) -> dict:
-        logger.info("create lawyer esign templates request: %s", templates_data)
-        try:
-            generated_contracts = build_contract_items(
-                configs=LAWYER_CONTRACT_CREATE_CONFIGS,
-                template_name=templates_data.law_firm_name,
-                signer_name=templates_data.law_firm_name,
-                signer_id_num=templates_data.ord_id,
-                psn_account=templates_data.contact_phone,
-                psn_name=templates_data.contact_name,
-            )
-        except ContractBuildError as exc:
-            return {"success": False, "message": exc.message, "raw": exc.raw}
-
-        for contract_item in generated_contracts:
-            await self.repo.append_contract_url(templates_data, contract_item)
-
-        master_contract = next((item for item in generated_contracts if item.get("is_master") == 1), generated_contracts[0])
-        return {
-            "success": True,
-            "message": "律所合同模板已追加/创建",
-            "sign_flow_id": master_contract.get("sign_flow_id"),
-            "file_id": master_contract.get("file_id"),
-            "contract_url": master_contract.get("contract_url"),
-            "created_contracts": [
-                {
-                    "contract_type": item["contract_type"],
-                    "contract_name": item["contract_name"],
-                    "sign_flow_id": item["sign_flow_id"],
-                    "file_id": item["file_id"],
-                    "contract_url": item["contract_url"],
-                }
-                for item in generated_contracts
-            ],
-        }
-
-    async def list_contracts(self, lawyer_id: int, status: Optional[int], page: int, page_size: int) -> dict:
-        rows = await self.repo.get_by_lawyer_id(lawyer_id)
-        all_filtered_items: list[dict[str, Any]] = []
-        for row in rows:
-            contract_url_raw = row.get("contract_url")
-            if not contract_url_raw:
-                continue
-            try:
-                items = json.loads(contract_url_raw)
-                if not isinstance(items, list):
-                    continue
-                for item in items:
-                    if status is not None and item.get("status") != status:
-                        continue
-                    item_with_info = dict(item)
-                    item_with_info.update(
-                        {
-                            "id": row.get("id"),
-                            "lawyer_id": row.get("lawyer_id"),
-                            "law_firm_name": row.get("law_firm_name"),
-                            "contact_name": row.get("contact_name"),
-                            "contact_phone": row.get("contact_phone"),
-                        }
-                    )
-                    all_filtered_items.append(item_with_info)
-            except Exception:
-                continue
-
-        total = len(all_filtered_items)
-        start = (page - 1) * page_size
-        end = start + page_size
-        paged_items = all_filtered_items[start:end]
-        total_pages = (total + page_size - 1) // page_size if total > 0 else 0
-        return {
-            "items": paged_items,
-            "total": total,
-            "page": page,
-            "page_size": page_size,
-            "total_pages": total_pages,
-        }
-
-    async def get_contract_detail(self, sign_flow_id: str) -> dict:
-        row, item, items = await self.repo.get_contract_item_by_sign_flow_id(sign_flow_id)
-        if not item:
-            return {"success": False, "message": "未找到合同"}
-        status = item.get("status")
-        if status == 0:
-            return await self._get_pending_contract_detail(sign_flow_id, row, item, items)
-        if status == 1:
-            return await self._get_signed_contract_detail(sign_flow_id, row, item, items)
-        return {"success": False, "message": "未知合同状态", "raw": {"status": status}}
-
-    async def _get_pending_contract_detail(self, sign_flow_id: str, row, item, items) -> dict:
-        file_id = item.get("file_id")
-        if not file_id:
-            return {"success": False, "message": "缺少 file_id,无法获取合同详情"}
-        try:
-            detail_resp = esign_main.get_contract_detail(file_id)
-            detail_json = json.loads(detail_resp)
-            data = detail_json.get("data") if isinstance(detail_json, dict) else None
-            contract_url_val = None
-            if isinstance(data, dict):
-                contract_url_val = data.get("fileDownloadUrl")
-            if not contract_url_val and isinstance(detail_json, dict):
-                contract_url_val = detail_json.get("fileDownloadUrl")
-        except Exception as exc:
-            return {"success": False, "message": "获取合同链接失败", "raw": str(exc)}
-
-        if row and isinstance(items, list):
-            for it in items:
-                if it.get("sign_flow_id") == sign_flow_id:
-                    it["contract_url"] = contract_url_val
-                    break
-            await self.repo.update_contract_items(row["id"], items)
-
-        contact_phone = item.get("contact_phone") or (row.get("contact_phone") if isinstance(row, dict) else None)
-        if not contact_phone:
-            return {"success": False, "message": "缺少 contact_phone,无法获取签署链接"}
-        try:
-            sign_resp = sign_url(sign_flow_id, contact_phone)
-            sign_json = json.loads(sign_resp)
-            sign_data = sign_json.get("data") if isinstance(sign_json, dict) else None
-            result_sign_url = sign_data.get("url") if isinstance(sign_data, dict) else None
-        except Exception as exc:
-            return {"success": False, "message": "获取签署链接失败", "raw": str(exc)}
-
-        if not result_sign_url:
-            return {"success": False, "message": "e签宝返回缺少签署链接", "raw": sign_json}
-        await self.repo.update_sign_url(contact_phone, sign_flow_id, result_sign_url)
-        return {
-            "status": 0,
-            "contract_url": contract_url_val,
-            "sign_url": result_sign_url,
-            "sign_flow_id": sign_flow_id,
-        }
-
-    async def _get_signed_contract_detail(self, sign_flow_id: str, row, item, items) -> dict:
-        try:
-            download_resp = file_download_url(sign_flow_id)
-            download_json = json.loads(download_resp)
-            contract_download_url = download_json["data"]["files"][0]["downloadUrl"]
-        except Exception as exc:
-            return {"success": False, "message": "获取合同下载链接失败", "raw": str(exc)}
-
-        if row and isinstance(items, list):
-            for it in items:
-                if it.get("sign_flow_id") == sign_flow_id:
-                    it["contract_download_url"] = contract_download_url
-                    it["contract_url"] = contract_download_url
-                    break
-            await self.repo.update_contract_items(row["id"], items)
-
-        return {
-            "status": 1,
-            "contract_url": contract_download_url,
-            "contract_download_url": contract_download_url,
-            "sign_flow_id": sign_flow_id,
-        }
-
-    async def process_esign_callback(self, payload: dict) -> dict:
-        sign_result = payload.get("signResult")
-        sign_flow_id = payload.get("signFlowId")
-        operator = payload.get("operator") or {}
-        psn_account = operator.get("psnAccount") or {}
-        contact_phone = psn_account.get("accountMobile")
-        ts_ms = payload.get("operateTime") or payload.get("timestamp")
-        signing_dt = None
-        if ts_ms:
-            try:
-                signing_dt = datetime.datetime.fromtimestamp(ts_ms / 1000)
-            except Exception:
-                signing_dt = None
-
-        if sign_result == 2:
-            contract_download_url = None
-            try:
-                download_resp = file_download_url(sign_flow_id)
-                download_json = json.loads(download_resp)
-                contract_download_url = download_json["data"]["files"][0]["downloadUrl"]
-            except Exception:
-                contract_download_url = None
-            await self.repo.mark_signed_by_phone(contact_phone, sign_flow_id, signing_dt, contract_download_url)
-            return {"success": True, "code": "200", "msg": "success"}
-        return {"success": False, "message": "未处理: signResult!=2 或手机号/签署流程缺失"}

+ 0 - 5
alien_store/api/deps.py

@@ -1,13 +1,8 @@
 from fastapi import Depends
 from sqlalchemy.ext.asyncio import AsyncSession
 from alien_database.session import get_db
-from alien_store.services.contract_server import ContractServer
 from alien_contract.services.contract_server import ContractCenterService
 
 
-def get_contract_service(db: AsyncSession = Depends(get_db)) -> ContractServer:
-    return ContractServer(db)
-
-
 def get_contract_center_service(db: AsyncSession = Depends(get_db)) -> ContractCenterService:
     return ContractCenterService(db)

+ 7 - 97
alien_store/api/router.py

@@ -1,10 +1,9 @@
-import datetime
 import logging
-from fastapi import APIRouter, Depends, Query
-from typing import Any, Union, Optional
+from fastapi import APIRouter, Depends
+from typing import Any, Union
 from pydantic import ValidationError
 
-from alien_store.api.deps import get_contract_service, get_contract_center_service
+from alien_store.api.deps import get_contract_center_service
 from alien_contract.schemas.request.contract import BundleCreateRequest
 from alien_contract.services.contract_server import ContractCenterService
 from alien_store.schemas.request.contract_store import TemplatesCreate
@@ -12,15 +11,12 @@ from alien_store.schemas.response.contract_store import (
     ModuleStatusResponse,
     TemplatesCreateResponse,
     ErrorResponse,
-    ContractStoreResponse,
-    PaginatedResponse,
-    SuccessResponse
 )
-from alien_store.services.contract_server import ContractServer
 
 router = APIRouter()
 logger = logging.getLogger("alien_store")
 
+
 def _format_validation_errors(exc: ValidationError) -> list[dict[str, str]]:
     errors = []
     for err in exc.errors():
@@ -35,16 +31,18 @@ def _format_validation_errors(exc: ValidationError) -> list[dict[str, str]]:
         )
     return errors
 
+
 @router.get("/", response_model=ModuleStatusResponse)
 async def index() -> ModuleStatusResponse:
     return ModuleStatusResponse(module="Contract", status="Ok")
 
+
 @router.post("/get_esign_templates", response_model=Union[TemplatesCreateResponse, ErrorResponse])
 async def create_esign_templates(
     templates_data_raw: dict[str, Any],
     templates_server: ContractCenterService = Depends(get_contract_center_service)
 ) -> Union[TemplatesCreateResponse, ErrorResponse]:
-    """AI审核完调用 e签宝生成文件"""
+    """AI审核完调用 e签宝生成文件(统一走合同中心)"""
     try:
         templates_data = TemplatesCreate.model_validate(templates_data_raw)
     except ValidationError as e:
@@ -76,91 +74,3 @@ async def create_esign_templates(
         contract_url=result.get("created_contracts", [{}])[0].get("contract_url") if result.get("created_contracts") else None,
         created_contracts=result.get("created_contracts"),
     )
-
-@router.get("/contracts/{store_id}", response_model=Union[dict, Any])
-async def list_contracts(
-    store_id: int, 
-    status: Optional[int] = Query(None, description="筛选合同状态:0 未签署,1 已签署"),
-    page: int = Query(1, ge=1, description="页码,从1开始"),
-    page_size: int = Query(10, ge=1, le=100, description="每页条数,默认10"),
-    templates_server: ContractServer = Depends(get_contract_service)
-) -> Any:
-    """根据 store_id 查询所有合同,支持根据 status 筛选和分页"""
-    return await templates_server.list_contracts(store_id, status, page, page_size)
-
-@router.get("/contracts/detail/{sign_flow_id}", response_model=Union[dict, ErrorResponse])
-async def get_contract_detail(
-    sign_flow_id: str,
-    templates_server: ContractServer = Depends(get_contract_service)
-) -> Union[dict, ErrorResponse]:
-    """
-    根据 sign_flow_id 获取合同详情
-    - status=0: 返回合同PDF链接(contract_url)和签署链接(sign_url)
-    - status=1: 拉取最新下载链接并更新数据库,返回 contract_download_url
-    """
-    result = await templates_server.get_contract_detail(sign_flow_id)
-    if not result.get("success", True): # get_contract_detail 返回的成功结果里没有 success 键,只有 status
-        return ErrorResponse(**result)
-    return result
-
-@router.get("/get_all_templates", response_model=PaginatedResponse)
-async def get_all_templates(
-    page: int = Query(1, ge=1, description="页码,从1开始"),
-    page_size: int = Query(10, ge=1, le=100, description="每页条数,默认10"),
-    store_name: Optional[str] = Query(None, description="店铺名称(模糊查询)"),
-    merchant_name: Optional[str] = Query(None, description="商家姓名(模糊查询)"),
-    signing_status: Optional[str] = Query(None, description="签署状态"),
-    business_segment: Optional[str] = Query(None, description="经营板块"),
-    store_status: Optional[str] = Query(None, description="店铺状态:正常/禁用"),
-    expiry_start: Optional[datetime.datetime] = Query(None, description="到期时间起"),
-    expiry_end: Optional[datetime.datetime] = Query(None, description="到期时间止"),
-    templates_server: ContractServer = Depends(get_contract_service)
-) -> PaginatedResponse:
-    """分页查询所有合同,支持筛选"""
-    rows, total = await templates_server.list_all_paged(
-        page,
-        page_size,
-        store_name=store_name,
-        merchant_name=merchant_name,
-        signing_status=signing_status,
-        business_segment=business_segment,
-        store_status=store_status,
-        expiry_start=expiry_start,
-        expiry_end=expiry_end,
-    )
-    total_pages = (total + page_size - 1) // page_size if total > 0 else 0
-    
-    items = [ContractStoreResponse(**row) for row in rows]
-    return PaginatedResponse(
-        items=items,
-        total=total,
-        page=page,
-        page_size=page_size,
-        total_pages=total_pages
-    )
-
-@router.post("/esign/callback", response_model=Union[SuccessResponse, ErrorResponse])
-async def esign_callback(
-    payload: dict, 
-    templates_server: ContractServer = Depends(get_contract_service)
-) -> Union[SuccessResponse, ErrorResponse]:
-    """
-    e签宝签署结果回调
-    需求:签署完成 -> 更新 signing_status=已签署,contract_url 中 status=1
-    """
-    result = await templates_server.process_esign_callback(payload)
-    if not result.get("success"):
-        return ErrorResponse(**result)
-    return SuccessResponse(code=result["code"], msg=result["msg"])
-
-
-
-# @router.post("/esign/callback_auth", response_model=SuccessResponse)
-# async def esign_callback_auth(
-#     payload: dict,
-#     templates_server: ContractServer = Depends(get_contract_service)
-# ) -> SuccessResponse:
-#     logger.info(f"esign_callback_auth payload: {payload}")
-#     return SuccessResponse(code="200", msg="success")
-
-

+ 1 - 0
alien_store/db/models/__init__.py

@@ -0,0 +1 @@
+from .contract_store import ContractStore

+ 0 - 342
alien_store/repositories/contract_repo.py

@@ -1,342 +0,0 @@
-import logging
-import json
-from datetime import datetime, timedelta
-from sqlalchemy import func, select, text, or_
-from sqlalchemy.ext.asyncio import AsyncSession
-from sqlalchemy.exc import DBAPIError
-from alien_store.db.models.contract_store import ContractStore
-
-logger = logging.getLogger(__name__)
-
-
-class ContractRepository:
-    """合同数据访问层"""
-
-    def __init__(self, db: AsyncSession):
-        self.db = db
-
-    async def _execute_with_retry(self, statement):
-        """
-        封装一次重试:如果连接已失效(被 MySQL/网络关掉),回滚并重试一次。
-        """
-        try:
-            return await self.db.execute(statement)
-        except Exception as exc:
-            if self._should_retry(exc):
-                logger.warning("DB connection invalidated, retrying once: %s", exc)
-                try:
-                    await self.db.rollback()
-                except Exception:
-                    pass
-                return await self.db.execute(statement)
-            raise
-
-    @staticmethod
-    def _should_retry(exc: Exception) -> bool:
-        # SQLAlchemy 标准:connection_invalidated=True
-        if isinstance(exc, DBAPIError) and getattr(exc, "connection_invalidated", False):
-            return True
-        # 兜底判断常见“连接已关闭”文案(aiomysql/uvloop RuntimeError)
-        txt = str(exc).lower()
-        closed_keywords = ["closed", "lost connection", "connection was killed", "terminat"]
-        return any(k in txt for k in closed_keywords)
-
-    async def get_by_store_id(self, store_id: int):
-        """根据店铺id查询所有合同"""
-        result = await self._execute_with_retry(
-            ContractStore.__table__.select().where(ContractStore.store_id == store_id)
-        )
-        # 返回列表[dict],避免 Pydantic 序列化 Row 对象出错
-        return [dict(row) for row in result.mappings().all()]
-
-    async def check_store_status(self, store_id: int) -> str | None:
-        """
-        检查 store_info 表中对应 store_id 的 reason 字段
-        """
-        query = text("SELECT reason FROM store_info WHERE id = :store_id")
-        result = await self._execute_with_retry(query.bindparams(store_id=store_id))
-        row = result.fetchone()
-        return row[0] if row else None
-
-    async def get_contract_item_by_sign_flow_id(self, sign_flow_id: str):
-        """
-        根据 sign_flow_id 查找合同项,返回 (row, item, items)
-        """
-        result = await self._execute_with_retry(ContractStore.__table__.select())
-        rows = result.mappings().all()
-        for row in rows:
-            contract_url_raw = row.get("contract_url")
-            if not contract_url_raw:
-                continue
-            try:
-                items = json.loads(contract_url_raw)
-            except Exception:
-                items = None
-            if not isinstance(items, list):
-                continue
-            for item in items:
-                if item.get("sign_flow_id") == sign_flow_id:
-                    return dict(row), item, items
-        return None, None, None
-
-    async def update_contract_items(self, row_id: int, items: list) -> bool:
-        """
-        更新指定记录的 contract_url 列表
-        """
-        if not isinstance(items, list):
-            return False
-        await self._execute_with_retry(
-            ContractStore.__table__.update()
-            .where(ContractStore.id == row_id)
-            .values(contract_url=json.dumps(items, ensure_ascii=False))
-        )
-        await self.db.commit()
-        return True
-
-    async def get_all(self):
-        """查询所有合同"""
-        result = await self._execute_with_retry(ContractStore.__table__.select())
-        return [dict(row) for row in result.mappings().all()]
-
-    async def get_all_paged(
-        self,
-        page: int,
-        page_size: int = 10,
-        store_name: str | None = None,
-        merchant_name: str | None = None,
-        signing_status: str | None = None,
-        business_segment: str | None = None,
-        store_status: str | None = None,
-        expiry_start: datetime | None = None,
-        expiry_end: datetime | None = None,
-    ):
-        """分页查询所有合同,支持筛选,返回 (items, total)"""
-        offset = (page - 1) * page_size
-        table = ContractStore.__table__
-        conditions = []
-
-        if store_name:
-            conditions.append(table.c.store_name.like(f"%{store_name}%"))
-        if merchant_name:
-            conditions.append(table.c.merchant_name.like(f"%{merchant_name}%"))
-        if signing_status:
-            conditions.append(table.c.signing_status == signing_status)
-        if business_segment:
-            conditions.append(table.c.business_segment == business_segment)
-
-        if store_status:
-            if store_status == "正常":
-                conditions.append(table.c.signing_status == "已签署")
-            elif store_status == "禁用":
-                conditions.append(
-                    or_(table.c.signing_status != "已签署", table.c.signing_status.is_(None))
-                )
-
-        if expiry_start:
-            conditions.append(table.c.expiry_time >= expiry_start)
-        if expiry_end:
-            conditions.append(table.c.expiry_time <= expiry_end)
-
-        # 查询总数
-        count_stmt = select(func.count()).select_from(table)
-        if conditions:
-            count_stmt = count_stmt.where(*conditions)
-        count_result = await self._execute_with_retry(count_stmt)
-        total = count_result.scalar() or 0
-
-        # 查询分页数据
-        data_stmt = table.select()
-        if conditions:
-            data_stmt = data_stmt.where(*conditions)
-        data_stmt = data_stmt.offset(offset).limit(page_size)
-        result = await self._execute_with_retry(data_stmt)
-        items = [dict(row) for row in result.mappings().all()]
-        return items, total
-
-    async def create(self, user_data):
-        """创建未签署合同模板"""
-        db_templates = ContractStore(
-            store_id=user_data.store_id,
-            store_name=getattr(user_data, "store_name", None),
-            merchant_name=user_data.merchant_name,
-            business_segment=user_data.business_segment,
-            contact_phone=user_data.contact_phone,
-            contract_url=user_data.contract_url,
-            ord_id=user_data.ord_id,
-            signing_status='未签署'
-        )
-        self.db.add(db_templates)
-        await self.db.commit()
-        await self.db.refresh(db_templates)
-        return db_templates
-
-    async def mark_signed_by_phone(self, contact_phone: str, sign_flow_id: str, signing_time: datetime | None = None, contract_download_url: str | None = None):
-        """
-        根据手机号 + sign_flow_id 将合同标记为已签署,只更新匹配的合同项
-        当 is_master 为 1 时,更新签署状态和时间字段
-        同时写入签署/生效/到期时间(签署时间=T,生效=T+1天0点,失效=生效+365天)
-        同时更新 contract_download_url 到对应的字典中
-        """
-        result = await self._execute_with_retry(
-            ContractStore.__table__.select().where(ContractStore.contact_phone == contact_phone)
-        )
-        rows = result.mappings().all()
-        updated = False
-        for row in rows:
-            contract_url_raw = row.get("contract_url")
-            items = None
-            if contract_url_raw:
-                try:
-                    items = json.loads(contract_url_raw)
-                except Exception:
-                    items = None
-            changed = False
-            matched_item = None
-            if isinstance(items, list):
-                for item in items:
-                    if item.get("sign_flow_id") == sign_flow_id:
-                        item["status"] = 1
-                        # 更新 contract_download_url
-                        if contract_download_url:
-                            item["contract_download_url"] = contract_download_url
-                        matched_item = item
-                        changed = True
-                        break
-            
-            # 只有当 is_master 为 1 时才更新时间字段
-            if changed and matched_item and matched_item.get("is_master") == 1:
-                # 时间处理
-                signing_dt = signing_time
-                effective_dt = expiry_dt = None
-                if signing_dt:
-                    # effective_time 是 signing_time 第二天的 0 点
-                    effective_dt = (signing_dt + timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0)
-                    expiry_dt = effective_dt + timedelta(days=365)
-                    
-                    # 更新 contract_url 中对应字典的时间字段
-                    matched_item["signing_time"] = signing_dt.strftime("%Y-%m-%d %H:%M:%S") if signing_dt else ""
-                    matched_item["effective_time"] = effective_dt.strftime("%Y-%m-%d %H:%M:%S") if effective_dt else ""
-                    matched_item["expiry_time"] = expiry_dt.strftime("%Y-%m-%d %H:%M:%S") if expiry_dt else ""
-                
-                await self._execute_with_retry(
-                    ContractStore.__table__.update()
-                    .where(ContractStore.id == row["id"])
-                    .values(
-                        signing_status="已签署",
-                        contract_url=json.dumps(items, ensure_ascii=False) if items else contract_url_raw,
-                        signing_time=signing_dt,
-                        effective_time=effective_dt,
-                        expiry_time=expiry_dt,
-                    )
-                )
-                updated = True
-            elif changed:
-                # is_master 不为 1 时,只更新 status,不更新时间字段
-                await self._execute_with_retry(
-                    ContractStore.__table__.update()
-                    .where(ContractStore.id == row["id"])
-                    .values(
-                        contract_url=json.dumps(items, ensure_ascii=False) if items else contract_url_raw,
-                    )
-                )
-                updated = True
-        if updated:
-            await self.db.commit()
-        return updated
-
-    async def update_sign_url(self, contact_phone: str, sign_flow_id: str, sign_url: str):
-        """
-        根据手机号 + sign_flow_id 更新 contract_url 列表中对应项的 sign_url
-        """
-        result = await self._execute_with_retry(
-            ContractStore.__table__.select().where(ContractStore.contact_phone == contact_phone)
-        )
-        rows = result.mappings().all()
-        updated = False
-        for row in rows:
-            contract_url_raw = row.get("contract_url")
-            if not contract_url_raw:
-                continue
-            try:
-                items = json.loads(contract_url_raw)
-            except Exception:
-                items = None
-            if not isinstance(items, list):
-                continue
-            changed = False
-            for item in items:
-                if item.get("sign_flow_id") == sign_flow_id:
-                    item["sign_url"] = sign_url
-                    changed = True
-            if changed:
-                await self._execute_with_retry(
-                    ContractStore.__table__.update()
-                    .where(ContractStore.id == row["id"])
-                    .values(contract_url=json.dumps(items, ensure_ascii=False))
-                )
-                updated = True
-        if updated:
-            await self.db.commit()
-        return updated
-
-    async def append_contract_url(self, templates_data, contract_item: dict):
-        """
-        根据 store_id,向 contract_url(JSON 列表)追加新的合同信息;
-        若 store_id 不存在,则创建新记录。
-        """
-        store_id = getattr(templates_data, "store_id", None)
-        if store_id is None:
-            logger.error("append_contract_url missing store_id")
-            return False
-
-        result = await self._execute_with_retry(
-            ContractStore.__table__.select().where(ContractStore.store_id == store_id)
-        )
-        rows = result.mappings().all()
-        updated = False
-        store_name = getattr(templates_data, "store_name", None)
-        if rows:
-            for row in rows:
-                contract_url_raw = row.get("contract_url")
-                try:
-                    items = json.loads(contract_url_raw) if contract_url_raw else []
-                except Exception:
-                    items = []
-                if not isinstance(items, list):
-                    items = []
-                items.append(contract_item)
-                update_values = {"contract_url": json.dumps(items, ensure_ascii=False)}
-                if store_name:
-                    update_values["store_name"] = store_name
-                contact_phone = getattr(templates_data, "contact_phone", None)
-                if contact_phone:
-                    update_values["contact_phone"] = contact_phone
-                await self._execute_with_retry(
-                    ContractStore.__table__.update()
-                    .where(ContractStore.id == row["id"])
-                    .values(**update_values)
-                )
-                updated = True
-            if updated:
-                await self.db.commit()
-            return updated
-        # 未找到则创建新记录
-        new_record = ContractStore(
-            store_id=store_id,
-            store_name=store_name,
-            business_segment=getattr(templates_data, "business_segment", None),
-            merchant_name=getattr(templates_data, "merchant_name", None),
-            contact_phone=getattr(templates_data, "contact_phone", None),
-            contract_url=json.dumps([contract_item], ensure_ascii=False),
-            ord_id=getattr(templates_data, "ord_id", None),
-            signing_status='未签署'
-        )
-        self.db.add(new_record)
-        await self.db.commit()
-        await self.db.refresh(new_record)
-        return True
-
-
-
-
-

+ 0 - 327
alien_store/services/contract_server.py

@@ -1,327 +0,0 @@
-import datetime
-import os
-import logging
-import json
-from typing import Any, Union, Optional
-from sqlalchemy.ext.asyncio import AsyncSession
-from alien_store.repositories.contract_repo import ContractRepository
-from alien_store.schemas.request.contract_store import TemplatesCreate
-from alien_contract.infrastructure.esign import main as esign_main
-from alien_contract.infrastructure.esign.main import sign_url, file_download_url
-from alien_contract.infrastructure.esign.contract_builder import build_contract_items, ContractBuildError
-
-
-# ------------------- 日志配置 -------------------
-LOG_DIR = os.path.join("common", "logs", "alien_store")
-os.makedirs(LOG_DIR, exist_ok=True)
-
-def _init_logger():
-    logger = logging.getLogger("alien_store_service")
-    if logger.handlers:
-        return logger
-    logger.setLevel(logging.INFO)
-    fmt = logging.Formatter("%(asctime)s [%(levelname)s] %(name)s %(message)s")
-    info_handler = logging.FileHandler(os.path.join(LOG_DIR, "info.log"), encoding="utf-8")
-    info_handler.setLevel(logging.INFO)
-    info_handler.setFormatter(fmt)
-    error_handler = logging.FileHandler(os.path.join(LOG_DIR, "error.log"), encoding="utf-8")
-    error_handler.setLevel(logging.ERROR)
-    error_handler.setFormatter(fmt)
-    logger.addHandler(info_handler)
-    logger.addHandler(error_handler)
-    return logger
-
-logger = _init_logger()
-
-CONTRACT_CREATE_CONFIGS = [
-    ("store_agreement", "店铺入驻协议", 1),
-    ("alipay_auth", "支付宝授权函", 0),
-    ("wechat_pay_commitment", "微信支付承诺函", 0),
-]
-
-
-class ContractServer:
-    def __init__(self, db: AsyncSession):
-        self.db = db
-        self.esign_repo = ContractRepository(db)
-
-    async def create_template(self, template_data: TemplatesCreate):
-        await self.esign_repo.create(template_data)
-        return {
-            "message": "模板创建成功"
-        }
-
-    async def create_esign_templates(self, templates_data: TemplatesCreate) -> dict:
-        """AI审核完调用 e签宝生成文件"""
-        logger.info(f"create_esign_templates request: {templates_data}")
-        try:
-            generated_contracts = build_contract_items(
-                configs=CONTRACT_CREATE_CONFIGS,
-                template_name=templates_data.store_name,
-                signer_name=templates_data.store_name,
-                signer_id_num=templates_data.ord_id,
-                psn_account=templates_data.contact_phone,
-                psn_name=templates_data.merchant_name,
-            )
-        except ContractBuildError as exc:
-            return {"success": False, "message": exc.message, "raw": exc.raw}
-
-        for contract_item in generated_contracts:
-            await self.esign_repo.append_contract_url(templates_data, contract_item)
-
-        master_contract = next((item for item in generated_contracts if item.get("is_master") == 1), generated_contracts[0])
-        logger.info(
-            "create_esign_templates success contact_phone=%s master_sign_flow_id=%s all_sign_flow_ids=%s",
-            templates_data.contact_phone,
-            master_contract.get("sign_flow_id"),
-            [item.get("sign_flow_id") for item in generated_contracts],
-        )
-
-        return {
-            "success": True,
-            "message": "合同模板已追加/创建",
-            "sign_flow_id": master_contract.get("sign_flow_id"),
-            "file_id": master_contract.get("file_id"),
-            "contract_url": master_contract.get("contract_url"),
-            "created_contracts": [
-                {
-                    "contract_type": item["contract_type"],
-                    "contract_name": item["contract_name"],
-                    "sign_flow_id": item["sign_flow_id"],
-                    "file_id": item["file_id"],
-                    "contract_url": item["contract_url"],
-                }
-                for item in generated_contracts
-            ],
-        }
-
-    async def list_contracts(self, store_id: int, status: Optional[int], page: int, page_size: int) -> dict:
-        """根据 store_id 查询所有合同,支持根据 status 筛选和分页"""
-        logger.info(
-            "list_contracts request store_id=%s status=%s page=%s page_size=%s",
-            store_id, status, page, page_size
-        )
-        
-        # 1. 检查 store_info 中的审核状态
-        reason = await self.esign_repo.check_store_status(store_id)
-        if reason != "审核通过":
-            return {"code": 555, "msg": "先进行认证", "reason": reason}
-
-        # 2. 获取原始数据
-        rows = await self.esign_repo.get_by_store_id(store_id)
-        
-        all_filtered_items = []
-        # 3. 解析并筛选
-        for row in rows:
-            contract_url_raw = row.get("contract_url")
-            if not contract_url_raw:
-                continue
-            try:
-                items = json.loads(contract_url_raw)
-                if not isinstance(items, list):
-                    continue
-                
-                for item in items:
-                    if status is not None and item.get("status") != status:
-                        continue
-                    
-                    item_with_info = dict(item)
-                    item_with_info.update({
-                        "id": row.get("id"),
-                        "store_id": row.get("store_id"),
-                        "store_name": row.get("store_name"),
-                        "merchant_name": row.get("merchant_name"),
-                        "contact_phone": row.get("contact_phone")
-                    })
-                    all_filtered_items.append(item_with_info)
-            except Exception as e:
-                logger.error(f"Error processing contracts for store_id {store_id}: {e}", exc_info=True)
-                continue
-
-        # 4. 手动分页
-        total = len(all_filtered_items)
-        start = (page - 1) * page_size
-        end = start + page_size
-        paged_items = all_filtered_items[start:end]
-        total_pages = (total + page_size - 1) // page_size if total > 0 else 0
-
-        return {
-            "items": paged_items,
-            "total": total,
-            "page": page,
-            "page_size": page_size,
-            "total_pages": total_pages
-        }
-
-    async def get_contract_detail(self, sign_flow_id: str) -> dict:
-        """获取合同详情"""
-        row, item, items = await self.esign_repo.get_contract_item_by_sign_flow_id(sign_flow_id)
-        if not item:
-            return {"success": False, "message": "未找到合同"}
-
-        status = item.get("status")
-        if status == 0:
-            return await self._get_pending_contract_detail(sign_flow_id, row, item, items)
-        elif status == 1:
-            return await self._get_signed_contract_detail(sign_flow_id, row, item, items)
-        
-        return {"success": False, "message": "未知合同状态", "raw": {"status": status}}
-
-    async def _get_pending_contract_detail(self, sign_flow_id: str, row, item, items) -> dict:
-        file_id = item.get("file_id")
-        if not file_id:
-            return {"success": False, "message": "缺少 file_id,无法获取合同详情"}
-        
-        try:
-            detail_resp = esign_main.get_contract_detail(file_id)
-            detail_json = json.loads(detail_resp)
-            data = detail_json.get("data") if isinstance(detail_json, dict) else None
-            contract_url_val = None
-            if isinstance(data, dict):
-                contract_url_val = data.get("fileDownloadUrl")
-            if not contract_url_val and isinstance(detail_json, dict):
-                contract_url_val = detail_json.get("fileDownloadUrl")
-        except Exception as e:
-            logger.error(f"get_contract_detail failed file_id={file_id}: {e}")
-            return {"success": False, "message": "获取合同链接失败", "raw": str(e)}
-
-        if not contract_url_val:
-            logger.error(f"get_contract_detail missing contract_url file_id={file_id}: {detail_resp}")
-            return {"success": False, "message": "e签宝返回缺少合同链接", "raw": detail_resp}
-
-        # 更新数据库中的合同链接
-        if row and isinstance(items, list):
-            for it in items:
-                if it.get("sign_flow_id") == sign_flow_id:
-                    it["contract_url"] = contract_url_val
-                    break
-            await self.esign_repo.update_contract_items(row["id"], items)
-
-        # 获取签署链接
-        contact_phone = item.get("contact_phone") or (row.get("contact_phone") if isinstance(row, dict) else None)
-        if not contact_phone:
-            return {"success": False, "message": "缺少 contact_phone,无法获取签署链接"}
-        
-        try:
-            sign_resp = sign_url(sign_flow_id, contact_phone)
-            sign_json = json.loads(sign_resp)
-            sign_data = sign_json.get("data") if isinstance(sign_json, dict) else None
-            result_sign_url = sign_data.get("url") if isinstance(sign_data, dict) else None
-        except Exception as e:
-            logger.error(f"sign_url failed sign_flow_id={sign_flow_id}, contact_phone={contact_phone}: {e}")
-            return {"success": False, "message": "获取签署链接失败", "raw": str(e)}
-
-        if not result_sign_url:
-            logger.error(f"sign_url missing url: {sign_json}")
-            return {"success": False, "message": "e签宝返回缺少签署链接", "raw": sign_json}
-            
-        await self.esign_repo.update_sign_url(contact_phone, sign_flow_id, result_sign_url)
-
-        return {
-            "status": 0,
-            "contract_url": contract_url_val,
-            "sign_url": result_sign_url,
-            "sign_flow_id": sign_flow_id
-        }
-
-    async def _get_signed_contract_detail(self, sign_flow_id: str, row, item, items) -> dict:
-        try:
-            download_resp = file_download_url(sign_flow_id)
-            download_json = json.loads(download_resp)
-            contract_download_url = download_json["data"]["files"][0]["downloadUrl"]
-        except Exception as e:
-            logger.error(f"file_download_url failed sign_flow_id={sign_flow_id}: {e}")
-            return {"success": False, "message": "获取合同下载链接失败", "raw": str(e)}
-            
-        if row and isinstance(items, list):
-            for it in items:
-                if it.get("sign_flow_id") == sign_flow_id:
-                    it["contract_download_url"] = contract_download_url
-                    it["contract_url"] = contract_download_url
-                    break
-            await self.esign_repo.update_contract_items(row["id"], items)
-
-        return {
-            "status": 1,
-            "contract_url": contract_download_url,
-            "contract_download_url": contract_download_url,
-            "sign_flow_id": sign_flow_id
-        }
-
-    async def process_esign_callback(self, payload: dict) -> dict:
-        """处理 e签宝 回调"""
-        sign_result = payload.get("signResult")
-        sign_flow_id = payload.get("signFlowId")
-        operator = payload.get("operator") or {}
-        psn_account = operator.get("psnAccount") or {}
-        contact_phone = psn_account.get("accountMobile")
-        
-        ts_ms = payload.get("operateTime") or payload.get("timestamp")
-        signing_dt = None
-        if ts_ms:
-            try:
-                signing_dt = datetime.datetime.fromtimestamp(ts_ms / 1000)
-            except Exception:
-                signing_dt = None
-
-        if sign_result == 2:
-            contract_download_url = None
-            try:
-                download_resp = file_download_url(sign_flow_id)
-                download_json = json.loads(download_resp)
-                contract_download_url = download_json["data"]["files"][0]["downloadUrl"]
-            except Exception as e:
-                logger.error(f"file_download_url failed for sign_flow_id={sign_flow_id}: {e}")
-            
-            updated = await self.esign_repo.mark_signed_by_phone(contact_phone, sign_flow_id, signing_dt, contract_download_url)
-            logger.info(f"esign_callback success phone={contact_phone}, sign_flow_id={sign_flow_id}, updated={updated}")
-            return {"success": True, "code": "200", "msg": "success"}
-            
-        logger.error(f"esign_callback ignored payload: {payload}")
-        return {"success": False, "message": "未处理: signResult!=2 或手机号/签署流程缺失"}
-
-    async def list_by_store(self, store_id: int):
-        return await self.esign_repo.get_by_store_id(store_id)
-
-    async def get_store_reason(self, store_id: int) -> str | None:
-        return await self.esign_repo.check_store_status(store_id)
-
-    async def get_contract_item_by_sign_flow_id(self, sign_flow_id: str):
-        return await self.esign_repo.get_contract_item_by_sign_flow_id(sign_flow_id)
-
-    async def update_contract_items(self, row_id: int, items: list) -> bool:
-        return await self.esign_repo.update_contract_items(row_id, items)
-
-    async def list_all_paged(
-        self,
-        page: int,
-        page_size: int = 10,
-        store_name: str | None = None,
-        merchant_name: str | None = None,
-        signing_status: str | None = None,
-        business_segment: str | None = None,
-        store_status: str | None = None,
-        expiry_start=None,
-        expiry_end=None,
-    ):
-        items, total = await self.esign_repo.get_all_paged(
-            page,
-            page_size,
-            store_name=store_name,
-            merchant_name=merchant_name,
-            signing_status=signing_status,
-            business_segment=business_segment,
-            store_status=store_status,
-            expiry_start=expiry_start,
-            expiry_end=expiry_end,
-        )
-        return items, total
-
-    async def mark_signed_by_phone(self, contact_phone: str, sign_flow_id: str, signing_time, contract_download_url):
-        return await self.esign_repo.mark_signed_by_phone(contact_phone, sign_flow_id, signing_time, contract_download_url)
-
-    async def update_sign_url(self, contact_phone: str, sign_flow_id: str, sign_url: str):
-        return await self.esign_repo.update_sign_url(contact_phone, sign_flow_id, sign_url)
-
-    async def append_contract_url(self, templates_data, contract_item: dict):
-        return await self.esign_repo.append_contract_url(templates_data, contract_item)

+ 49 - 25
alien_util/tasks/contract_tasks.py

@@ -5,7 +5,8 @@ from datetime import datetime, timedelta
 from sqlalchemy import create_engine, select
 from sqlalchemy.orm import Session
 from alien_gateway.config import settings
-from alien_store.db.models.contract_store import ContractStore
+from alien_contract.db.models.bundle import ContractBundle
+from alien_contract.db.models.document import ContractDocument
 from alien_util.celery_app import celery_app
 from common.aliyun_sms_server.sms_client import send_sms
 import logging
@@ -40,54 +41,77 @@ def get_sync_db_session():
 
 
 @celery_app.task(name="alien_util.tasks.contract_tasks.check_contract_expiry")
-def check_contract_expiry():
+def check_contract_expiry(subject_type: str | None = "store"):
     """
     检查合同到期时间,如果距离到期不足15天,发送提醒短信
     每天凌晨0点1分执行
+
+    :param subject_type: 签约主体类型过滤,默认 "store"(仅商家);传 None 则不过滤(store+lawyer 全部)
     """
-    logger.info("开始执行合同到期检查任务")
+    logger.info("开始执行合同到期检查任务 subject_type=%s", subject_type)
     
     try:
         # 获取数据库会话
         db = get_sync_db_session()
         
         try:
+            now = datetime.now()
             # 计算15天后的日期
-            check_date = datetime.now() + timedelta(days=15)
-            
-            # 查询即将到期的合同(expiry_time 在15天内,且不为空)
-            # 只查询已签署的合同(signing_status = '已签署')
-            stmt = select(ContractStore).where(
-                ContractStore.expiry_time.isnot(None),
-                ContractStore.expiry_time <= check_date,
-                ContractStore.expiry_time >= datetime.now(),
-                ContractStore.signing_status == "已签署",
-                ContractStore.delete_flag == 0
+            check_date = now + timedelta(days=15)
+
+            # 合同中心:仅对"主合同(is_primary=1) 且 已签署(status=1)"发提醒,
+            # 避免同一合同包内多份子合同(支付宝授权函/微信承诺函等)重复发送
+            conditions = [
+                ContractDocument.is_primary == 1,
+                ContractDocument.status == 1,
+                ContractDocument.expiry_time.isnot(None),
+                ContractDocument.expiry_time <= check_date,
+                ContractDocument.expiry_time >= now,
+                ContractDocument.delete_flag == 0,
+                ContractBundle.delete_flag == 0,
+            ]
+            if subject_type:
+                conditions.append(ContractBundle.subject_type == subject_type)
+
+            stmt = (
+                select(
+                    ContractDocument.id,
+                    ContractDocument.expiry_time,
+                    ContractBundle.subject_name,
+                    ContractBundle.contact_phone,
+                    ContractBundle.subject_type,
+                )
+                .join(ContractBundle, ContractDocument.bundle_id == ContractBundle.id)
+                .where(*conditions)
             )
-            
-            result = db.execute(stmt)
-            contracts = result.scalars().all()
-            
+
+            contracts = db.execute(stmt).all()
+
             logger.info(f"找到 {len(contracts)} 条即将到期的合同")
-            
+
             # 遍历即将到期的合同,发送提醒短信
             for contract in contracts:
                 try:
                     # 计算距离到期的天数
-                    days_until_expiry = (contract.expiry_time - datetime.now()).days
-                    
+                    days_until_expiry = (contract.expiry_time - now).days
+
                     logger.info(
-                        f"合同ID: {contract.id}, "
-                        f"商家: {contract.merchant_name}, "
+                        f"合同文档ID: {contract.id}, "
+                        f"主体类型: {contract.subject_type}, "
+                        f"主体名称: {contract.subject_name}, "
                         f"联系电话: {contract.contact_phone}, "
                         f"到期时间: {contract.expiry_time}, "
                         f"距离到期: {days_until_expiry} 天"
                     )
 
-                    send_expiry_reminder_sms(contract.contact_phone, contract.merchant_name, settings.ALIYUN_SMS_SIGN_NAME_CONTRACT, settings.ALIYUN_SMS_TEMPLATE_CODE_CONTRACT)
-                    
+                    send_expiry_reminder_sms(
+                        contract.contact_phone,
+                        contract.subject_name,
+                        settings.ALIYUN_SMS_SIGN_NAME_CONTRACT,
+                        settings.ALIYUN_SMS_TEMPLATE_CODE_CONTRACT,
+                    )
                 except Exception as e:
-                    logger.error(f"处理合同ID {contract.id} 时出错: {e}", exc_info=True)
+                    logger.error(f"处理合同文档ID {contract.id} 时出错: {e}", exc_info=True)
             
             logger.info("合同到期检查任务执行完成")
             

+ 245 - 0
scripts/reconcile_contract_status.py

@@ -0,0 +1,245 @@
+"""
+合同签署状态对账/回刷脚本(一次性运维工具)
+
+背景:历史回调逻辑把"平台方自动盖章(SIGN_MISSON_COMPLETE / signResult=2)"误判为整份合同已签署,
+导致部分 contract_document 被错误标记为 status=1,并写入了早 8 小时(UTC)的 signing_time。
+本脚本以 e签宝 流程真实状态 signFlowStatus 为准,回刷 contract_document 与 contract_bundle 状态。
+
+判定规则(signFlowStatus: 0草稿 1签署中 2完成 3撤销 4终止 5过期 6删除 7拒签):
+  - == 2 完成:保持/置为已签署(status=1),并用 signFlowFinishTime 重算东八区 signing_time / effective_time / expiry_time,补全 download_url
+  - != 2     :置为未签署(status=0),清空 signing_time / effective_time / expiry_time
+
+安全说明:
+  - 默认 dry-run,只打印将要做的变更,不写库;加 --apply 才真正提交。
+  - 通过 APP_ENV 选择环境(与服务一致):生产为 APP_ENV=produ。
+
+用法示例(PowerShell):
+  $env:APP_ENV="produ"; python scripts/reconcile_contract_status.py                 # 预览(dry-run)
+  $env:APP_ENV="produ"; python scripts/reconcile_contract_status.py --apply         # 实际回刷
+  $env:APP_ENV="produ"; python scripts/reconcile_contract_status.py --status 1      # 只检查当前已签署的
+  $env:APP_ENV="produ"; python scripts/reconcile_contract_status.py --sign-flow-id xxxx  # 只处理某一条
+"""
+import argparse
+import datetime
+import json
+import os
+import sys
+import time
+
+# 确保可以从项目根目录导入业务模块
+ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
+if ROOT_DIR not in sys.path:
+    sys.path.insert(0, ROOT_DIR)
+
+from sqlalchemy import create_engine, select
+from sqlalchemy.orm import Session
+
+from alien_gateway.config import settings
+from alien_contract.db.models.bundle import ContractBundle
+from alien_contract.db.models.document import ContractDocument
+from alien_contract.infrastructure.esign.main import query_sign_flow_detail, file_download_url
+from alien_contract.services.contract_server import _ms_to_naive_cn, _extract_download_url
+
+ESIGN_FLOW_STATUS_COMPLETED = 2
+FLOW_STATUS_DESC = {
+    0: "草稿",
+    1: "签署中",
+    2: "完成",
+    3: "撤销",
+    4: "终止",
+    5: "过期",
+    6: "删除",
+    7: "拒签",
+}
+
+
+def _query_flow(sign_flow_id: str):
+    """返回 (signFlowStatus, signFlowFinishTime毫秒);失败返回 (None, None)"""
+    try:
+        resp = query_sign_flow_detail(sign_flow_id)
+        data = json.loads(resp)
+    except Exception as exc:  # noqa: BLE001
+        print(f"    [WARN] 查询流程失败 sign_flow_id={sign_flow_id} err={exc}")
+        return None, None
+    body = data.get("data") if isinstance(data, dict) else None
+    if not isinstance(body, dict):
+        print(f"    [WARN] 流程查询无 data sign_flow_id={sign_flow_id} resp={data}")
+        return None, None
+    return body.get("signFlowStatus"), body.get("signFlowFinishTime")
+
+
+def _fetch_download_url(sign_flow_id: str):
+    try:
+        resp = file_download_url(sign_flow_id)
+        url, _ = _extract_download_url(resp)
+        return url
+    except Exception as exc:  # noqa: BLE001
+        print(f"    [WARN] 获取下载链接失败 sign_flow_id={sign_flow_id} err={exc}")
+        return None
+
+
+def _derive_times(finish_ms):
+    signing_dt = _ms_to_naive_cn(finish_ms)
+    effective_dt = expiry_dt = None
+    if signing_dt:
+        effective_dt = (signing_dt + datetime.timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0)
+        expiry_dt = effective_dt + datetime.timedelta(days=365)
+    return signing_dt, effective_dt, expiry_dt
+
+
+def _recalc_bundle_status(session: Session, bundle_id: int) -> str:
+    rows = session.execute(
+        select(ContractDocument.status).where(
+            ContractDocument.bundle_id == bundle_id,
+            ContractDocument.delete_flag == 0,
+        )
+    ).all()
+    statuses = [r[0] for r in rows]
+    if not statuses:
+        return "未签署"
+    if all(s == 1 for s in statuses):
+        return "已签署"
+    if any(s == 1 for s in statuses):
+        return "审核中"
+    return "未签署"
+
+
+def main():
+    parser = argparse.ArgumentParser(description="合同签署状态对账/回刷")
+    parser.add_argument("--apply", action="store_true", help="实际写库(默认 dry-run 仅预览)")
+    parser.add_argument("--status", choices=["0", "1", "all"], default="all", help="只检查指定 status 的文档,默认 all")
+    parser.add_argument("--sign-flow-id", default=None, help="只处理指定 sign_flow_id")
+    parser.add_argument("--sleep", type=float, default=0.1, help="每条查询之间的间隔秒数,默认0.1")
+    args = parser.parse_args()
+
+    dry_run = not args.apply
+
+    print("=" * 80)
+    print(f"环境 APP_ENV = {settings.APP_ENV}")
+    print(f"目标数据库   = {settings.DB_HOST}:{settings.DB_PORT}/{settings.DB_NAME}")
+    print(f"模式         = {'DRY-RUN(仅预览,不写库)' if dry_run else '!!! APPLY(实际写库)!!!'}")
+    print("=" * 80)
+
+    engine = create_engine(settings.SQLALCHEMY_DATABASE_URI, pool_pre_ping=True)
+
+    conditions = [ContractDocument.delete_flag == 0]
+    if args.sign_flow_id:
+        conditions.append(ContractDocument.sign_flow_id == args.sign_flow_id)
+    elif args.status != "all":
+        conditions.append(ContractDocument.status == int(args.status))
+
+    counters = {"total": 0, "to_signed": 0, "to_unsigned": 0, "time_fixed": 0, "unchanged": 0, "skipped": 0}
+    affected_bundle_ids: set[int] = set()
+
+    with Session(engine) as session:
+        docs = session.execute(select(ContractDocument).where(*conditions).order_by(ContractDocument.id)).scalars().all()
+        print(f"待检查文档数:{len(docs)}\n")
+
+        for doc in docs:
+            counters["total"] += 1
+            flow_status, finish_ms = _query_flow(doc.sign_flow_id)
+            time.sleep(args.sleep)
+            status_desc = FLOW_STATUS_DESC.get(flow_status, "未知")
+            head = (
+                f"[doc#{doc.id} bundle#{doc.bundle_id}] {doc.contract_name} "
+                f"sign_flow_id={doc.sign_flow_id} 当前status={doc.status} -> e签宝={flow_status}({status_desc})"
+            )
+
+            if flow_status is None:
+                counters["skipped"] += 1
+                print(f"{head}  => 跳过(查询失败)")
+                continue
+
+            if flow_status == ESIGN_FLOW_STATUS_COMPLETED:
+                signing_dt, effective_dt, expiry_dt = _derive_times(finish_ms)
+                new_download = doc.download_url or ""
+                if not new_download:
+                    fetched = _fetch_download_url(doc.sign_flow_id)
+                    if fetched:
+                        new_download = fetched
+
+                needs_status = doc.status != 1
+                needs_time = (
+                    doc.signing_time != signing_dt
+                    or doc.effective_time != effective_dt
+                    or doc.expiry_time != expiry_dt
+                )
+                needs_download = bool(new_download) and (doc.download_url or "") != new_download
+
+                if not (needs_status or needs_time or needs_download):
+                    counters["unchanged"] += 1
+                    print(f"{head}  => 无需变更(已正确为已签署)")
+                    continue
+
+                if needs_status:
+                    counters["to_signed"] += 1
+                if needs_time and not needs_status:
+                    counters["time_fixed"] += 1
+
+                print(
+                    f"{head}  => 置为已签署: status {doc.status}->1, "
+                    f"signing_time {doc.signing_time!r}->{signing_dt!r}, "
+                    f"effective {doc.effective_time!r}->{effective_dt!r}, "
+                    f"expiry {doc.expiry_time!r}->{expiry_dt!r}"
+                    + (", 补全download_url" if needs_download else "")
+                )
+                # 改动只作用于当前事务(dry-run 结束会 rollback),便于后续 bundle 状态准确重算
+                doc.status = 1
+                doc.signing_time = signing_dt
+                doc.effective_time = effective_dt
+                doc.expiry_time = expiry_dt
+                if needs_download:
+                    doc.download_url = new_download
+                affected_bundle_ids.add(doc.bundle_id)
+            else:
+                if doc.status == 0 and doc.signing_time is None and doc.effective_time is None and doc.expiry_time is None:
+                    counters["unchanged"] += 1
+                    print(f"{head}  => 无需变更(已正确为未签署)")
+                    continue
+                counters["to_unsigned"] += 1
+                print(
+                    f"{head}  => 回退为未签署: status {doc.status}->0, 清空 signing/effective/expiry "
+                    f"(原 signing_time={doc.signing_time!r})"
+                )
+                doc.status = 0
+                doc.signing_time = None
+                doc.effective_time = None
+                doc.expiry_time = None
+                affected_bundle_ids.add(doc.bundle_id)
+
+        # 重算受影响合同包整体状态
+        if affected_bundle_ids:
+            print("\n--- 合同包状态重算 ---")
+        for bundle_id in sorted(affected_bundle_ids):
+            bundle = session.get(ContractBundle, bundle_id)
+            if not bundle:
+                continue
+            new_status = _recalc_bundle_status(session, bundle_id)
+            if bundle.status != new_status:
+                print(f"[bundle#{bundle_id}] {bundle.subject_name} status {bundle.status} -> {new_status}")
+                bundle.status = new_status
+            else:
+                print(f"[bundle#{bundle_id}] {bundle.subject_name} status 不变({bundle.status})")
+
+        if dry_run:
+            session.rollback()
+        else:
+            session.commit()
+
+    print("\n" + "=" * 80)
+    print("对账汇总:")
+    print(f"  检查文档总数        : {counters['total']}")
+    print(f"  置为已签署(含纠正)  : {counters['to_signed']}")
+    print(f"  仅修正签署时间      : {counters['time_fixed']}")
+    print(f"  回退为未签署        : {counters['to_unsigned']}")
+    print(f"  无需变更            : {counters['unchanged']}")
+    print(f"  跳过(查询失败)      : {counters['skipped']}")
+    print(f"  受影响合同包        : {len(affected_bundle_ids)}")
+    print(f"  模式                : {'DRY-RUN(未写库)' if dry_run else 'APPLY(已提交)'}")
+    print("=" * 80)
+    if dry_run:
+        print("提示:以上为预览。确认无误后加 --apply 实际执行。")
+
+
+if __name__ == "__main__":
+    main()