| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245 |
- """
- 合同签署状态对账/回刷脚本(一次性运维工具)
- 背景:历史回调逻辑把"平台方自动盖章(SIGN_MISSON_COMPLETE / signResult=2)"误判为整份合同已签署,
- 导致部分 contract_document 被错误标记为 status=1,并写入了早 8 小时(UTC)的 signing_time。
- 本脚本以 e签宝 流程真实状态 signFlowStatus 为准,回刷 contract_document 与 contract_bundle 状态。
- 判定规则(signFlowStatus: 0草稿 1签署中 2完成 3撤销 4终止 5过期 6删除 7拒签):
- - == 2 完成:保持/置为已签署(status=1),并用 signFlowFinishTime 重算东八区 signing_time / effective_time / expiry_time,补全 download_url
- - != 2 :置为未签署(status=0),清空 signing_time / effective_time / expiry_time
- 安全说明:
- - 默认 dry-run,只打印将要做的变更,不写库;加 --apply 才真正提交。
- - 通过 APP_ENV 选择环境(与服务一致):生产为 APP_ENV=produ。
- 用法示例(PowerShell):
- $env:APP_ENV="produ"; python scripts/reconcile_contract_status.py # 预览(dry-run)
- $env:APP_ENV="produ"; python scripts/reconcile_contract_status.py --apply # 实际回刷
- $env:APP_ENV="produ"; python scripts/reconcile_contract_status.py --status 1 # 只检查当前已签署的
- $env:APP_ENV="produ"; python scripts/reconcile_contract_status.py --sign-flow-id xxxx # 只处理某一条
- """
- import argparse
- import datetime
- import json
- import os
- import sys
- import time
- # 确保可以从项目根目录导入业务模块
- ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
- if ROOT_DIR not in sys.path:
- sys.path.insert(0, ROOT_DIR)
- from sqlalchemy import create_engine, select
- from sqlalchemy.orm import Session
- from alien_gateway.config import settings
- from alien_contract.db.models.bundle import ContractBundle
- from alien_contract.db.models.document import ContractDocument
- from alien_contract.infrastructure.esign.main import query_sign_flow_detail, file_download_url
- from alien_contract.services.contract_server import _ms_to_naive_cn, _extract_download_url
- ESIGN_FLOW_STATUS_COMPLETED = 2
- FLOW_STATUS_DESC = {
- 0: "草稿",
- 1: "签署中",
- 2: "完成",
- 3: "撤销",
- 4: "终止",
- 5: "过期",
- 6: "删除",
- 7: "拒签",
- }
- def _query_flow(sign_flow_id: str):
- """返回 (signFlowStatus, signFlowFinishTime毫秒);失败返回 (None, None)"""
- try:
- resp = query_sign_flow_detail(sign_flow_id)
- data = json.loads(resp)
- except Exception as exc: # noqa: BLE001
- print(f" [WARN] 查询流程失败 sign_flow_id={sign_flow_id} err={exc}")
- return None, None
- body = data.get("data") if isinstance(data, dict) else None
- if not isinstance(body, dict):
- print(f" [WARN] 流程查询无 data sign_flow_id={sign_flow_id} resp={data}")
- return None, None
- return body.get("signFlowStatus"), body.get("signFlowFinishTime")
- def _fetch_download_url(sign_flow_id: str):
- try:
- resp = file_download_url(sign_flow_id)
- url, _ = _extract_download_url(resp)
- return url
- except Exception as exc: # noqa: BLE001
- print(f" [WARN] 获取下载链接失败 sign_flow_id={sign_flow_id} err={exc}")
- return None
- def _derive_times(finish_ms):
- signing_dt = _ms_to_naive_cn(finish_ms)
- effective_dt = expiry_dt = None
- if signing_dt:
- effective_dt = (signing_dt + datetime.timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0)
- expiry_dt = effective_dt + datetime.timedelta(days=365)
- return signing_dt, effective_dt, expiry_dt
- def _recalc_bundle_status(session: Session, bundle_id: int) -> str:
- rows = session.execute(
- select(ContractDocument.status).where(
- ContractDocument.bundle_id == bundle_id,
- ContractDocument.delete_flag == 0,
- )
- ).all()
- statuses = [r[0] for r in rows]
- if not statuses:
- return "未签署"
- if all(s == 1 for s in statuses):
- return "已签署"
- if any(s == 1 for s in statuses):
- return "审核中"
- return "未签署"
- def main():
- parser = argparse.ArgumentParser(description="合同签署状态对账/回刷")
- parser.add_argument("--apply", action="store_true", help="实际写库(默认 dry-run 仅预览)")
- parser.add_argument("--status", choices=["0", "1", "all"], default="all", help="只检查指定 status 的文档,默认 all")
- parser.add_argument("--sign-flow-id", default=None, help="只处理指定 sign_flow_id")
- parser.add_argument("--sleep", type=float, default=0.1, help="每条查询之间的间隔秒数,默认0.1")
- args = parser.parse_args()
- dry_run = not args.apply
- print("=" * 80)
- print(f"环境 APP_ENV = {settings.APP_ENV}")
- print(f"目标数据库 = {settings.DB_HOST}:{settings.DB_PORT}/{settings.DB_NAME}")
- print(f"模式 = {'DRY-RUN(仅预览,不写库)' if dry_run else '!!! APPLY(实际写库)!!!'}")
- print("=" * 80)
- engine = create_engine(settings.SQLALCHEMY_DATABASE_URI, pool_pre_ping=True)
- conditions = [ContractDocument.delete_flag == 0]
- if args.sign_flow_id:
- conditions.append(ContractDocument.sign_flow_id == args.sign_flow_id)
- elif args.status != "all":
- conditions.append(ContractDocument.status == int(args.status))
- counters = {"total": 0, "to_signed": 0, "to_unsigned": 0, "time_fixed": 0, "unchanged": 0, "skipped": 0}
- affected_bundle_ids: set[int] = set()
- with Session(engine) as session:
- docs = session.execute(select(ContractDocument).where(*conditions).order_by(ContractDocument.id)).scalars().all()
- print(f"待检查文档数:{len(docs)}\n")
- for doc in docs:
- counters["total"] += 1
- flow_status, finish_ms = _query_flow(doc.sign_flow_id)
- time.sleep(args.sleep)
- status_desc = FLOW_STATUS_DESC.get(flow_status, "未知")
- head = (
- f"[doc#{doc.id} bundle#{doc.bundle_id}] {doc.contract_name} "
- f"sign_flow_id={doc.sign_flow_id} 当前status={doc.status} -> e签宝={flow_status}({status_desc})"
- )
- if flow_status is None:
- counters["skipped"] += 1
- print(f"{head} => 跳过(查询失败)")
- continue
- if flow_status == ESIGN_FLOW_STATUS_COMPLETED:
- signing_dt, effective_dt, expiry_dt = _derive_times(finish_ms)
- new_download = doc.download_url or ""
- if not new_download:
- fetched = _fetch_download_url(doc.sign_flow_id)
- if fetched:
- new_download = fetched
- needs_status = doc.status != 1
- needs_time = (
- doc.signing_time != signing_dt
- or doc.effective_time != effective_dt
- or doc.expiry_time != expiry_dt
- )
- needs_download = bool(new_download) and (doc.download_url or "") != new_download
- if not (needs_status or needs_time or needs_download):
- counters["unchanged"] += 1
- print(f"{head} => 无需变更(已正确为已签署)")
- continue
- if needs_status:
- counters["to_signed"] += 1
- if needs_time and not needs_status:
- counters["time_fixed"] += 1
- print(
- f"{head} => 置为已签署: status {doc.status}->1, "
- f"signing_time {doc.signing_time!r}->{signing_dt!r}, "
- f"effective {doc.effective_time!r}->{effective_dt!r}, "
- f"expiry {doc.expiry_time!r}->{expiry_dt!r}"
- + (", 补全download_url" if needs_download else "")
- )
- # 改动只作用于当前事务(dry-run 结束会 rollback),便于后续 bundle 状态准确重算
- doc.status = 1
- doc.signing_time = signing_dt
- doc.effective_time = effective_dt
- doc.expiry_time = expiry_dt
- if needs_download:
- doc.download_url = new_download
- affected_bundle_ids.add(doc.bundle_id)
- else:
- if doc.status == 0 and doc.signing_time is None and doc.effective_time is None and doc.expiry_time is None:
- counters["unchanged"] += 1
- print(f"{head} => 无需变更(已正确为未签署)")
- continue
- counters["to_unsigned"] += 1
- print(
- f"{head} => 回退为未签署: status {doc.status}->0, 清空 signing/effective/expiry "
- f"(原 signing_time={doc.signing_time!r})"
- )
- doc.status = 0
- doc.signing_time = None
- doc.effective_time = None
- doc.expiry_time = None
- affected_bundle_ids.add(doc.bundle_id)
- # 重算受影响合同包整体状态
- if affected_bundle_ids:
- print("\n--- 合同包状态重算 ---")
- for bundle_id in sorted(affected_bundle_ids):
- bundle = session.get(ContractBundle, bundle_id)
- if not bundle:
- continue
- new_status = _recalc_bundle_status(session, bundle_id)
- if bundle.status != new_status:
- print(f"[bundle#{bundle_id}] {bundle.subject_name} status {bundle.status} -> {new_status}")
- bundle.status = new_status
- else:
- print(f"[bundle#{bundle_id}] {bundle.subject_name} status 不变({bundle.status})")
- if dry_run:
- session.rollback()
- else:
- session.commit()
- print("\n" + "=" * 80)
- print("对账汇总:")
- print(f" 检查文档总数 : {counters['total']}")
- print(f" 置为已签署(含纠正) : {counters['to_signed']}")
- print(f" 仅修正签署时间 : {counters['time_fixed']}")
- print(f" 回退为未签署 : {counters['to_unsigned']}")
- print(f" 无需变更 : {counters['unchanged']}")
- print(f" 跳过(查询失败) : {counters['skipped']}")
- print(f" 受影响合同包 : {len(affected_bundle_ids)}")
- print(f" 模式 : {'DRY-RUN(未写库)' if dry_run else 'APPLY(已提交)'}")
- print("=" * 80)
- if dry_run:
- print("提示:以上为预览。确认无误后加 --apply 实际执行。")
- if __name__ == "__main__":
- main()
|