""" 合同签署状态对账/回刷脚本(一次性运维工具) 背景:历史回调逻辑把"平台方自动盖章(SIGN_MISSON_COMPLETE / signResult=2)"误判为整份合同已签署, 导致部分 contract_document 被错误标记为 status=1,并写入了早 8 小时(UTC)的 signing_time。 本脚本以 e签宝 流程真实状态 signFlowStatus 为准,回刷 contract_document 与 contract_bundle 状态。 判定规则(signFlowStatus: 0草稿 1签署中 2完成 3撤销 4终止 5过期 6删除 7拒签): - == 2 完成:保持/置为已签署(status=1),并用 signFlowFinishTime 重算东八区 signing_time / effective_time / expiry_time,补全 download_url - != 2 :置为未签署(status=0),清空 signing_time / effective_time / expiry_time 安全说明: - 默认 dry-run,只打印将要做的变更,不写库;加 --apply 才真正提交。 - 通过 APP_ENV 选择环境(与服务一致):生产为 APP_ENV=produ。 用法示例(PowerShell): $env:APP_ENV="produ"; python scripts/reconcile_contract_status.py # 预览(dry-run) $env:APP_ENV="produ"; python scripts/reconcile_contract_status.py --apply # 实际回刷 $env:APP_ENV="produ"; python scripts/reconcile_contract_status.py --status 1 # 只检查当前已签署的 $env:APP_ENV="produ"; python scripts/reconcile_contract_status.py --sign-flow-id xxxx # 只处理某一条 """ import argparse import datetime import json import os import sys import time # 确保可以从项目根目录导入业务模块 ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) if ROOT_DIR not in sys.path: sys.path.insert(0, ROOT_DIR) from sqlalchemy import create_engine, select from sqlalchemy.orm import Session from alien_gateway.config import settings from alien_contract.db.models.bundle import ContractBundle from alien_contract.db.models.document import ContractDocument from alien_contract.infrastructure.esign.main import query_sign_flow_detail, file_download_url from alien_contract.services.contract_server import _ms_to_naive_cn, _extract_download_url ESIGN_FLOW_STATUS_COMPLETED = 2 FLOW_STATUS_DESC = { 0: "草稿", 1: "签署中", 2: "完成", 3: "撤销", 4: "终止", 5: "过期", 6: "删除", 7: "拒签", } def _query_flow(sign_flow_id: str): """返回 (signFlowStatus, signFlowFinishTime毫秒);失败返回 (None, None)""" try: resp = query_sign_flow_detail(sign_flow_id) data = json.loads(resp) except Exception as exc: # noqa: BLE001 print(f" [WARN] 查询流程失败 sign_flow_id={sign_flow_id} err={exc}") return None, None body = data.get("data") if isinstance(data, dict) else None if not isinstance(body, dict): print(f" [WARN] 流程查询无 data sign_flow_id={sign_flow_id} resp={data}") return None, None return body.get("signFlowStatus"), body.get("signFlowFinishTime") def _fetch_download_url(sign_flow_id: str): try: resp = file_download_url(sign_flow_id) url, _ = _extract_download_url(resp) return url except Exception as exc: # noqa: BLE001 print(f" [WARN] 获取下载链接失败 sign_flow_id={sign_flow_id} err={exc}") return None def _derive_times(finish_ms): signing_dt = _ms_to_naive_cn(finish_ms) effective_dt = expiry_dt = None if signing_dt: effective_dt = (signing_dt + datetime.timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0) expiry_dt = effective_dt + datetime.timedelta(days=365) return signing_dt, effective_dt, expiry_dt def _recalc_bundle_status(session: Session, bundle_id: int) -> str: rows = session.execute( select(ContractDocument.status).where( ContractDocument.bundle_id == bundle_id, ContractDocument.delete_flag == 0, ) ).all() statuses = [r[0] for r in rows] if not statuses: return "未签署" if all(s == 1 for s in statuses): return "已签署" if any(s == 1 for s in statuses): return "审核中" return "未签署" def main(): parser = argparse.ArgumentParser(description="合同签署状态对账/回刷") parser.add_argument("--apply", action="store_true", help="实际写库(默认 dry-run 仅预览)") parser.add_argument("--status", choices=["0", "1", "all"], default="all", help="只检查指定 status 的文档,默认 all") parser.add_argument("--sign-flow-id", default=None, help="只处理指定 sign_flow_id") parser.add_argument("--sleep", type=float, default=0.1, help="每条查询之间的间隔秒数,默认0.1") args = parser.parse_args() dry_run = not args.apply print("=" * 80) print(f"环境 APP_ENV = {settings.APP_ENV}") print(f"目标数据库 = {settings.DB_HOST}:{settings.DB_PORT}/{settings.DB_NAME}") print(f"模式 = {'DRY-RUN(仅预览,不写库)' if dry_run else '!!! APPLY(实际写库)!!!'}") print("=" * 80) engine = create_engine(settings.SQLALCHEMY_DATABASE_URI, pool_pre_ping=True) conditions = [ContractDocument.delete_flag == 0] if args.sign_flow_id: conditions.append(ContractDocument.sign_flow_id == args.sign_flow_id) elif args.status != "all": conditions.append(ContractDocument.status == int(args.status)) counters = {"total": 0, "to_signed": 0, "to_unsigned": 0, "time_fixed": 0, "unchanged": 0, "skipped": 0} affected_bundle_ids: set[int] = set() with Session(engine) as session: docs = session.execute(select(ContractDocument).where(*conditions).order_by(ContractDocument.id)).scalars().all() print(f"待检查文档数:{len(docs)}\n") for doc in docs: counters["total"] += 1 flow_status, finish_ms = _query_flow(doc.sign_flow_id) time.sleep(args.sleep) status_desc = FLOW_STATUS_DESC.get(flow_status, "未知") head = ( f"[doc#{doc.id} bundle#{doc.bundle_id}] {doc.contract_name} " f"sign_flow_id={doc.sign_flow_id} 当前status={doc.status} -> e签宝={flow_status}({status_desc})" ) if flow_status is None: counters["skipped"] += 1 print(f"{head} => 跳过(查询失败)") continue if flow_status == ESIGN_FLOW_STATUS_COMPLETED: signing_dt, effective_dt, expiry_dt = _derive_times(finish_ms) new_download = doc.download_url or "" if not new_download: fetched = _fetch_download_url(doc.sign_flow_id) if fetched: new_download = fetched needs_status = doc.status != 1 needs_time = ( doc.signing_time != signing_dt or doc.effective_time != effective_dt or doc.expiry_time != expiry_dt ) needs_download = bool(new_download) and (doc.download_url or "") != new_download if not (needs_status or needs_time or needs_download): counters["unchanged"] += 1 print(f"{head} => 无需变更(已正确为已签署)") continue if needs_status: counters["to_signed"] += 1 if needs_time and not needs_status: counters["time_fixed"] += 1 print( f"{head} => 置为已签署: status {doc.status}->1, " f"signing_time {doc.signing_time!r}->{signing_dt!r}, " f"effective {doc.effective_time!r}->{effective_dt!r}, " f"expiry {doc.expiry_time!r}->{expiry_dt!r}" + (", 补全download_url" if needs_download else "") ) # 改动只作用于当前事务(dry-run 结束会 rollback),便于后续 bundle 状态准确重算 doc.status = 1 doc.signing_time = signing_dt doc.effective_time = effective_dt doc.expiry_time = expiry_dt if needs_download: doc.download_url = new_download affected_bundle_ids.add(doc.bundle_id) else: if doc.status == 0 and doc.signing_time is None and doc.effective_time is None and doc.expiry_time is None: counters["unchanged"] += 1 print(f"{head} => 无需变更(已正确为未签署)") continue counters["to_unsigned"] += 1 print( f"{head} => 回退为未签署: status {doc.status}->0, 清空 signing/effective/expiry " f"(原 signing_time={doc.signing_time!r})" ) doc.status = 0 doc.signing_time = None doc.effective_time = None doc.expiry_time = None affected_bundle_ids.add(doc.bundle_id) # 重算受影响合同包整体状态 if affected_bundle_ids: print("\n--- 合同包状态重算 ---") for bundle_id in sorted(affected_bundle_ids): bundle = session.get(ContractBundle, bundle_id) if not bundle: continue new_status = _recalc_bundle_status(session, bundle_id) if bundle.status != new_status: print(f"[bundle#{bundle_id}] {bundle.subject_name} status {bundle.status} -> {new_status}") bundle.status = new_status else: print(f"[bundle#{bundle_id}] {bundle.subject_name} status 不变({bundle.status})") if dry_run: session.rollback() else: session.commit() print("\n" + "=" * 80) print("对账汇总:") print(f" 检查文档总数 : {counters['total']}") print(f" 置为已签署(含纠正) : {counters['to_signed']}") print(f" 仅修正签署时间 : {counters['time_fixed']}") print(f" 回退为未签署 : {counters['to_unsigned']}") print(f" 无需变更 : {counters['unchanged']}") print(f" 跳过(查询失败) : {counters['skipped']}") print(f" 受影响合同包 : {len(affected_bundle_ids)}") print(f" 模式 : {'DRY-RUN(未写库)' if dry_run else 'APPLY(已提交)'}") print("=" * 80) if dry_run: print("提示:以上为预览。确认无误后加 --apply 实际执行。") if __name__ == "__main__": main()