contract_repo.py 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220
  1. import json
  2. import logging
  3. from datetime import datetime, timedelta
  4. from sqlalchemy.exc import DBAPIError
  5. from sqlalchemy.ext.asyncio import AsyncSession
  6. from alien_lawyer.db.models.lawyer_contract import LawyerContract
  7. logger = logging.getLogger(__name__)
  8. class LawyerContractRepository:
  9. def __init__(self, db: AsyncSession):
  10. self.db = db
  11. async def _execute_with_retry(self, statement):
  12. try:
  13. return await self.db.execute(statement)
  14. except Exception as exc:
  15. if self._should_retry(exc):
  16. logger.warning("DB connection invalidated, retrying once: %s", exc)
  17. try:
  18. await self.db.rollback()
  19. except Exception:
  20. pass
  21. return await self.db.execute(statement)
  22. raise
  23. @staticmethod
  24. def _should_retry(exc: Exception) -> bool:
  25. if isinstance(exc, DBAPIError) and getattr(exc, "connection_invalidated", False):
  26. return True
  27. txt = str(exc).lower()
  28. closed_keywords = ["closed", "lost connection", "connection was killed", "terminat"]
  29. return any(k in txt for k in closed_keywords)
  30. async def get_by_lawyer_id(self, lawyer_id: int):
  31. result = await self._execute_with_retry(
  32. LawyerContract.__table__.select().where(LawyerContract.lawyer_id == lawyer_id)
  33. )
  34. return [dict(row) for row in result.mappings().all()]
  35. async def get_contract_item_by_sign_flow_id(self, sign_flow_id: str):
  36. result = await self._execute_with_retry(LawyerContract.__table__.select())
  37. rows = result.mappings().all()
  38. for row in rows:
  39. contract_url_raw = row.get("contract_url")
  40. if not contract_url_raw:
  41. continue
  42. try:
  43. items = json.loads(contract_url_raw)
  44. except Exception:
  45. items = None
  46. if not isinstance(items, list):
  47. continue
  48. for item in items:
  49. if item.get("sign_flow_id") == sign_flow_id:
  50. return dict(row), item, items
  51. return None, None, None
  52. async def update_contract_items(self, row_id: int, items: list) -> bool:
  53. if not isinstance(items, list):
  54. return False
  55. await self._execute_with_retry(
  56. LawyerContract.__table__.update()
  57. .where(LawyerContract.id == row_id)
  58. .values(contract_url=json.dumps(items, ensure_ascii=False))
  59. )
  60. await self.db.commit()
  61. return True
  62. async def mark_signed_by_phone(
  63. self,
  64. contact_phone: str,
  65. sign_flow_id: str,
  66. signing_time: datetime | None = None,
  67. contract_download_url: str | None = None,
  68. ):
  69. result = await self._execute_with_retry(
  70. LawyerContract.__table__.select().where(LawyerContract.contact_phone == contact_phone)
  71. )
  72. rows = result.mappings().all()
  73. updated = False
  74. for row in rows:
  75. contract_url_raw = row.get("contract_url")
  76. items = None
  77. if contract_url_raw:
  78. try:
  79. items = json.loads(contract_url_raw)
  80. except Exception:
  81. items = None
  82. changed = False
  83. matched_item = None
  84. if isinstance(items, list):
  85. for item in items:
  86. if item.get("sign_flow_id") == sign_flow_id:
  87. item["status"] = 1
  88. if contract_download_url:
  89. item["contract_download_url"] = contract_download_url
  90. matched_item = item
  91. changed = True
  92. break
  93. if changed and matched_item and matched_item.get("is_master") == 1:
  94. signing_dt = signing_time
  95. effective_dt = expiry_dt = None
  96. if signing_dt:
  97. effective_dt = (signing_dt + timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0)
  98. expiry_dt = effective_dt + timedelta(days=365)
  99. matched_item["signing_time"] = signing_dt.strftime("%Y-%m-%d %H:%M:%S")
  100. matched_item["effective_time"] = effective_dt.strftime("%Y-%m-%d %H:%M:%S")
  101. matched_item["expiry_time"] = expiry_dt.strftime("%Y-%m-%d %H:%M:%S")
  102. await self._execute_with_retry(
  103. LawyerContract.__table__.update()
  104. .where(LawyerContract.id == row["id"])
  105. .values(
  106. signing_status="已签署",
  107. contract_url=json.dumps(items, ensure_ascii=False) if items else contract_url_raw,
  108. signing_time=signing_dt,
  109. effective_time=effective_dt,
  110. expiry_time=expiry_dt,
  111. )
  112. )
  113. updated = True
  114. elif changed:
  115. await self._execute_with_retry(
  116. LawyerContract.__table__.update()
  117. .where(LawyerContract.id == row["id"])
  118. .values(contract_url=json.dumps(items, ensure_ascii=False) if items else contract_url_raw)
  119. )
  120. updated = True
  121. if updated:
  122. await self.db.commit()
  123. return updated
  124. async def update_sign_url(self, contact_phone: str, sign_flow_id: str, sign_url: str):
  125. result = await self._execute_with_retry(
  126. LawyerContract.__table__.select().where(LawyerContract.contact_phone == contact_phone)
  127. )
  128. rows = result.mappings().all()
  129. updated = False
  130. for row in rows:
  131. contract_url_raw = row.get("contract_url")
  132. if not contract_url_raw:
  133. continue
  134. try:
  135. items = json.loads(contract_url_raw)
  136. except Exception:
  137. items = None
  138. if not isinstance(items, list):
  139. continue
  140. changed = False
  141. for item in items:
  142. if item.get("sign_flow_id") == sign_flow_id:
  143. item["sign_url"] = sign_url
  144. changed = True
  145. if changed:
  146. await self._execute_with_retry(
  147. LawyerContract.__table__.update()
  148. .where(LawyerContract.id == row["id"])
  149. .values(contract_url=json.dumps(items, ensure_ascii=False))
  150. )
  151. updated = True
  152. if updated:
  153. await self.db.commit()
  154. return updated
  155. async def append_contract_url(self, templates_data, contract_item: dict):
  156. lawyer_id = getattr(templates_data, "lawyer_id", None)
  157. if lawyer_id is None:
  158. return False
  159. result = await self._execute_with_retry(
  160. LawyerContract.__table__.select().where(LawyerContract.lawyer_id == lawyer_id)
  161. )
  162. rows = result.mappings().all()
  163. updated = False
  164. law_firm_name = getattr(templates_data, "law_firm_name", None)
  165. if rows:
  166. for row in rows:
  167. contract_url_raw = row.get("contract_url")
  168. try:
  169. items = json.loads(contract_url_raw) if contract_url_raw else []
  170. except Exception:
  171. items = []
  172. if not isinstance(items, list):
  173. items = []
  174. items.append(contract_item)
  175. update_values = {"contract_url": json.dumps(items, ensure_ascii=False)}
  176. if law_firm_name:
  177. update_values["law_firm_name"] = law_firm_name
  178. contact_phone = getattr(templates_data, "contact_phone", None)
  179. if contact_phone:
  180. update_values["contact_phone"] = contact_phone
  181. await self._execute_with_retry(
  182. LawyerContract.__table__.update()
  183. .where(LawyerContract.id == row["id"])
  184. .values(**update_values)
  185. )
  186. updated = True
  187. if updated:
  188. await self.db.commit()
  189. return updated
  190. new_record = LawyerContract(
  191. lawyer_id=lawyer_id,
  192. law_firm_name=law_firm_name,
  193. business_segment=getattr(templates_data, "business_segment", None),
  194. contact_name=getattr(templates_data, "contact_name", None),
  195. contact_phone=getattr(templates_data, "contact_phone", None),
  196. contract_url=json.dumps([contract_item], ensure_ascii=False),
  197. ord_id=getattr(templates_data, "ord_id", None),
  198. signing_status="未签署",
  199. )
  200. self.db.add(new_record)
  201. await self.db.commit()
  202. await self.db.refresh(new_record)
  203. return True