# reconcile_contract_status.py
  1. """
  2. 合同签署状态对账/回刷脚本(一次性运维工具)
  3. 背景:历史回调逻辑把"平台方自动盖章(SIGN_MISSON_COMPLETE / signResult=2)"误判为整份合同已签署,
  4. 导致部分 contract_document 被错误标记为 status=1,并写入了早 8 小时(UTC)的 signing_time。
  5. 本脚本以 e签宝 流程真实状态 signFlowStatus 为准,回刷 contract_document 与 contract_bundle 状态。
  6. 判定规则(signFlowStatus: 0草稿 1签署中 2完成 3撤销 4终止 5过期 6删除 7拒签):
  7. - == 2 完成:保持/置为已签署(status=1),并用 signFlowFinishTime 重算东八区 signing_time / effective_time / expiry_time,补全 download_url
  8. - != 2 :置为未签署(status=0),清空 signing_time / effective_time / expiry_time
  9. 安全说明:
  10. - 默认 dry-run,只打印将要做的变更,不写库;加 --apply 才真正提交。
  11. - 通过 APP_ENV 选择环境(与服务一致):生产为 APP_ENV=produ。
  12. 用法示例(PowerShell):
  13. $env:APP_ENV="produ"; python scripts/reconcile_contract_status.py # 预览(dry-run)
  14. $env:APP_ENV="produ"; python scripts/reconcile_contract_status.py --apply # 实际回刷
  15. $env:APP_ENV="produ"; python scripts/reconcile_contract_status.py --status 1 # 只检查当前已签署的
  16. $env:APP_ENV="produ"; python scripts/reconcile_contract_status.py --sign-flow-id xxxx # 只处理某一条
  17. """
  18. import argparse
  19. import datetime
  20. import json
  21. import os
  22. import sys
  23. import time
  24. # 确保可以从项目根目录导入业务模块
  25. ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
  26. if ROOT_DIR not in sys.path:
  27. sys.path.insert(0, ROOT_DIR)
  28. from sqlalchemy import create_engine, select
  29. from sqlalchemy.orm import Session
  30. from alien_gateway.config import settings
  31. from alien_contract.db.models.bundle import ContractBundle
  32. from alien_contract.db.models.document import ContractDocument
  33. from alien_contract.infrastructure.esign.main import query_sign_flow_detail, file_download_url
  34. from alien_contract.services.contract_server import _ms_to_naive_cn, _extract_download_url
  35. ESIGN_FLOW_STATUS_COMPLETED = 2
  36. FLOW_STATUS_DESC = {
  37. 0: "草稿",
  38. 1: "签署中",
  39. 2: "完成",
  40. 3: "撤销",
  41. 4: "终止",
  42. 5: "过期",
  43. 6: "删除",
  44. 7: "拒签",
  45. }
  46. def _query_flow(sign_flow_id: str):
  47. """返回 (signFlowStatus, signFlowFinishTime毫秒);失败返回 (None, None)"""
  48. try:
  49. resp = query_sign_flow_detail(sign_flow_id)
  50. data = json.loads(resp)
  51. except Exception as exc: # noqa: BLE001
  52. print(f" [WARN] 查询流程失败 sign_flow_id={sign_flow_id} err={exc}")
  53. return None, None
  54. body = data.get("data") if isinstance(data, dict) else None
  55. if not isinstance(body, dict):
  56. print(f" [WARN] 流程查询无 data sign_flow_id={sign_flow_id} resp={data}")
  57. return None, None
  58. return body.get("signFlowStatus"), body.get("signFlowFinishTime")
  59. def _fetch_download_url(sign_flow_id: str):
  60. try:
  61. resp = file_download_url(sign_flow_id)
  62. url, _ = _extract_download_url(resp)
  63. return url
  64. except Exception as exc: # noqa: BLE001
  65. print(f" [WARN] 获取下载链接失败 sign_flow_id={sign_flow_id} err={exc}")
  66. return None
  67. def _derive_times(finish_ms):
  68. signing_dt = _ms_to_naive_cn(finish_ms)
  69. effective_dt = expiry_dt = None
  70. if signing_dt:
  71. effective_dt = (signing_dt + datetime.timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0)
  72. expiry_dt = effective_dt + datetime.timedelta(days=365)
  73. return signing_dt, effective_dt, expiry_dt
  74. def _recalc_bundle_status(session: Session, bundle_id: int) -> str:
  75. rows = session.execute(
  76. select(ContractDocument.status).where(
  77. ContractDocument.bundle_id == bundle_id,
  78. ContractDocument.delete_flag == 0,
  79. )
  80. ).all()
  81. statuses = [r[0] for r in rows]
  82. if not statuses:
  83. return "未签署"
  84. if all(s == 1 for s in statuses):
  85. return "已签署"
  86. if any(s == 1 for s in statuses):
  87. return "审核中"
  88. return "未签署"
  89. def main():
  90. parser = argparse.ArgumentParser(description="合同签署状态对账/回刷")
  91. parser.add_argument("--apply", action="store_true", help="实际写库(默认 dry-run 仅预览)")
  92. parser.add_argument("--status", choices=["0", "1", "all"], default="all", help="只检查指定 status 的文档,默认 all")
  93. parser.add_argument("--sign-flow-id", default=None, help="只处理指定 sign_flow_id")
  94. parser.add_argument("--sleep", type=float, default=0.1, help="每条查询之间的间隔秒数,默认0.1")
  95. args = parser.parse_args()
  96. dry_run = not args.apply
  97. print("=" * 80)
  98. print(f"环境 APP_ENV = {settings.APP_ENV}")
  99. print(f"目标数据库 = {settings.DB_HOST}:{settings.DB_PORT}/{settings.DB_NAME}")
  100. print(f"模式 = {'DRY-RUN(仅预览,不写库)' if dry_run else '!!! APPLY(实际写库)!!!'}")
  101. print("=" * 80)
  102. engine = create_engine(settings.SQLALCHEMY_DATABASE_URI, pool_pre_ping=True)
  103. conditions = [ContractDocument.delete_flag == 0]
  104. if args.sign_flow_id:
  105. conditions.append(ContractDocument.sign_flow_id == args.sign_flow_id)
  106. elif args.status != "all":
  107. conditions.append(ContractDocument.status == int(args.status))
  108. counters = {"total": 0, "to_signed": 0, "to_unsigned": 0, "time_fixed": 0, "unchanged": 0, "skipped": 0}
  109. affected_bundle_ids: set[int] = set()
  110. with Session(engine) as session:
  111. docs = session.execute(select(ContractDocument).where(*conditions).order_by(ContractDocument.id)).scalars().all()
  112. print(f"待检查文档数:{len(docs)}\n")
  113. for doc in docs:
  114. counters["total"] += 1
  115. flow_status, finish_ms = _query_flow(doc.sign_flow_id)
  116. time.sleep(args.sleep)
  117. status_desc = FLOW_STATUS_DESC.get(flow_status, "未知")
  118. head = (
  119. f"[doc#{doc.id} bundle#{doc.bundle_id}] {doc.contract_name} "
  120. f"sign_flow_id={doc.sign_flow_id} 当前status={doc.status} -> e签宝={flow_status}({status_desc})"
  121. )
  122. if flow_status is None:
  123. counters["skipped"] += 1
  124. print(f"{head} => 跳过(查询失败)")
  125. continue
  126. if flow_status == ESIGN_FLOW_STATUS_COMPLETED:
  127. signing_dt, effective_dt, expiry_dt = _derive_times(finish_ms)
  128. new_download = doc.download_url or ""
  129. if not new_download:
  130. fetched = _fetch_download_url(doc.sign_flow_id)
  131. if fetched:
  132. new_download = fetched
  133. needs_status = doc.status != 1
  134. needs_time = (
  135. doc.signing_time != signing_dt
  136. or doc.effective_time != effective_dt
  137. or doc.expiry_time != expiry_dt
  138. )
  139. needs_download = bool(new_download) and (doc.download_url or "") != new_download
  140. if not (needs_status or needs_time or needs_download):
  141. counters["unchanged"] += 1
  142. print(f"{head} => 无需变更(已正确为已签署)")
  143. continue
  144. if needs_status:
  145. counters["to_signed"] += 1
  146. if needs_time and not needs_status:
  147. counters["time_fixed"] += 1
  148. print(
  149. f"{head} => 置为已签署: status {doc.status}->1, "
  150. f"signing_time {doc.signing_time!r}->{signing_dt!r}, "
  151. f"effective {doc.effective_time!r}->{effective_dt!r}, "
  152. f"expiry {doc.expiry_time!r}->{expiry_dt!r}"
  153. + (", 补全download_url" if needs_download else "")
  154. )
  155. # 改动只作用于当前事务(dry-run 结束会 rollback),便于后续 bundle 状态准确重算
  156. doc.status = 1
  157. doc.signing_time = signing_dt
  158. doc.effective_time = effective_dt
  159. doc.expiry_time = expiry_dt
  160. if needs_download:
  161. doc.download_url = new_download
  162. affected_bundle_ids.add(doc.bundle_id)
  163. else:
  164. if doc.status == 0 and doc.signing_time is None and doc.effective_time is None and doc.expiry_time is None:
  165. counters["unchanged"] += 1
  166. print(f"{head} => 无需变更(已正确为未签署)")
  167. continue
  168. counters["to_unsigned"] += 1
  169. print(
  170. f"{head} => 回退为未签署: status {doc.status}->0, 清空 signing/effective/expiry "
  171. f"(原 signing_time={doc.signing_time!r})"
  172. )
  173. doc.status = 0
  174. doc.signing_time = None
  175. doc.effective_time = None
  176. doc.expiry_time = None
  177. affected_bundle_ids.add(doc.bundle_id)
  178. # 重算受影响合同包整体状态
  179. if affected_bundle_ids:
  180. print("\n--- 合同包状态重算 ---")
  181. for bundle_id in sorted(affected_bundle_ids):
  182. bundle = session.get(ContractBundle, bundle_id)
  183. if not bundle:
  184. continue
  185. new_status = _recalc_bundle_status(session, bundle_id)
  186. if bundle.status != new_status:
  187. print(f"[bundle#{bundle_id}] {bundle.subject_name} status {bundle.status} -> {new_status}")
  188. bundle.status = new_status
  189. else:
  190. print(f"[bundle#{bundle_id}] {bundle.subject_name} status 不变({bundle.status})")
  191. if dry_run:
  192. session.rollback()
  193. else:
  194. session.commit()
  195. print("\n" + "=" * 80)
  196. print("对账汇总:")
  197. print(f" 检查文档总数 : {counters['total']}")
  198. print(f" 置为已签署(含纠正) : {counters['to_signed']}")
  199. print(f" 仅修正签署时间 : {counters['time_fixed']}")
  200. print(f" 回退为未签署 : {counters['to_unsigned']}")
  201. print(f" 无需变更 : {counters['unchanged']}")
  202. print(f" 跳过(查询失败) : {counters['skipped']}")
  203. print(f" 受影响合同包 : {len(affected_bundle_ids)}")
  204. print(f" 模式 : {'DRY-RUN(未写库)' if dry_run else 'APPLY(已提交)'}")
  205. print("=" * 80)
  206. if dry_run:
  207. print("提示:以上为预览。确认无误后加 --apply 实际执行。")
# Script entry point: run the reconciliation only when this file is executed
# directly, not when it is imported.
if __name__ == "__main__":
    main()