contract_repo.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251
  1. import logging
  2. import json
  3. from datetime import datetime, timedelta
  4. from sqlalchemy.ext.asyncio import AsyncSession
  5. from sqlalchemy.exc import DBAPIError
  6. from alien_store.db.models.contract_store import ContractStore
  7. logger = logging.getLogger(__name__)
  8. class ContractRepository:
  9. """合同数据访问层"""
  10. def __init__(self, db: AsyncSession):
  11. self.db = db
  12. async def _execute_with_retry(self, statement):
  13. """
  14. 封装一次重试:如果连接已失效(被 MySQL/网络关掉),回滚并重试一次。
  15. """
  16. try:
  17. return await self.db.execute(statement)
  18. except Exception as exc:
  19. if self._should_retry(exc):
  20. logger.warning("DB connection invalidated, retrying once: %s", exc)
  21. try:
  22. await self.db.rollback()
  23. except Exception:
  24. pass
  25. return await self.db.execute(statement)
  26. raise
  27. @staticmethod
  28. def _should_retry(exc: Exception) -> bool:
  29. # SQLAlchemy 标准:connection_invalidated=True
  30. if isinstance(exc, DBAPIError) and getattr(exc, "connection_invalidated", False):
  31. return True
  32. # 兜底判断常见“连接已关闭”文案(aiomysql/uvloop RuntimeError)
  33. txt = str(exc).lower()
  34. closed_keywords = ["closed", "lost connection", "connection was killed", "terminat"]
  35. return any(k in txt for k in closed_keywords)
  36. async def get_by_store_id(self, store_id: int):
  37. """根据店铺id查询所有合同"""
  38. result = await self._execute_with_retry(
  39. ContractStore.__table__.select().where(ContractStore.store_id == store_id)
  40. )
  41. # 返回列表[dict],避免 Pydantic 序列化 Row 对象出错
  42. return [dict(row) for row in result.mappings().all()]
  43. async def get_all(self):
  44. """查询所有合同"""
  45. result = await self._execute_with_retry(ContractStore.__table__.select())
  46. return [dict(row) for row in result.mappings().all()]
  47. async def get_all_paged(self, page: int, page_size: int = 10):
  48. """分页查询所有合同,返回 (items, total)"""
  49. offset = (page - 1) * page_size
  50. # 查询总数
  51. from sqlalchemy import func, select
  52. count_result = await self._execute_with_retry(
  53. select(func.count()).select_from(ContractStore.__table__)
  54. )
  55. total = count_result.scalar() or 0
  56. # 查询分页数据
  57. result = await self._execute_with_retry(
  58. ContractStore.__table__.select().offset(offset).limit(page_size)
  59. )
  60. items = [dict(row) for row in result.mappings().all()]
  61. return items, total
  62. async def create(self, user_data):
  63. """创建未签署合同模板"""
  64. db_templates = ContractStore(
  65. store_id=user_data.store_id,
  66. store_name=getattr(user_data, "store_name", None),
  67. merchant_name=user_data.merchant_name,
  68. business_segment=user_data.business_segment,
  69. contact_phone=user_data.contact_phone,
  70. contract_url=user_data.contract_url,
  71. seal_url='0.0',
  72. signing_status='未签署'
  73. )
  74. self.db.add(db_templates)
  75. await self.db.commit()
  76. await self.db.refresh(db_templates)
  77. return db_templates
  78. async def mark_signed_by_phone(self, contact_phone: str, sign_flow_id: str, signing_time: datetime | None = None, contract_download_url: str | None = None):
  79. """
  80. 根据手机号 + sign_flow_id 将合同标记为已签署,只更新匹配的合同项
  81. 当 is_master 为 1 时,更新签署状态和时间字段
  82. 同时写入签署/生效/到期时间(签署时间=T,生效=T+1天0点,失效=生效+365天)
  83. 同时更新 contract_download_url 到对应的字典中
  84. """
  85. result = await self._execute_with_retry(
  86. ContractStore.__table__.select().where(ContractStore.contact_phone == contact_phone)
  87. )
  88. rows = result.mappings().all()
  89. updated = False
  90. for row in rows:
  91. contract_url_raw = row.get("contract_url")
  92. items = None
  93. if contract_url_raw:
  94. try:
  95. items = json.loads(contract_url_raw)
  96. except Exception:
  97. items = None
  98. changed = False
  99. matched_item = None
  100. if isinstance(items, list):
  101. for item in items:
  102. if item.get("sign_flow_id") == sign_flow_id:
  103. item["status"] = 1
  104. # 更新 contract_download_url
  105. if contract_download_url:
  106. item["contract_download_url"] = contract_download_url
  107. matched_item = item
  108. changed = True
  109. break
  110. # 只有当 is_master 为 1 时才更新时间字段
  111. if changed and matched_item and matched_item.get("is_master") == 1:
  112. # 时间处理
  113. signing_dt = signing_time
  114. effective_dt = expiry_dt = None
  115. if signing_dt:
  116. # effective_time 是 signing_time 第二天的 0 点
  117. effective_dt = (signing_dt + timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0)
  118. expiry_dt = effective_dt + timedelta(days=365)
  119. # 更新 contract_url 中对应字典的时间字段
  120. matched_item["signing_time"] = signing_dt.strftime("%Y-%m-%d %H:%M:%S") if signing_dt else ""
  121. matched_item["effective_time"] = effective_dt.strftime("%Y-%m-%d %H:%M:%S") if effective_dt else ""
  122. matched_item["expiry_time"] = expiry_dt.strftime("%Y-%m-%d %H:%M:%S") if expiry_dt else ""
  123. await self._execute_with_retry(
  124. ContractStore.__table__.update()
  125. .where(ContractStore.id == row["id"])
  126. .values(
  127. signing_status="已签署",
  128. contract_url=json.dumps(items, ensure_ascii=False) if items else contract_url_raw,
  129. signing_time=signing_dt,
  130. effective_time=effective_dt,
  131. expiry_time=expiry_dt,
  132. )
  133. )
  134. updated = True
  135. elif changed:
  136. # is_master 不为 1 时,只更新 status,不更新时间字段
  137. await self._execute_with_retry(
  138. ContractStore.__table__.update()
  139. .where(ContractStore.id == row["id"])
  140. .values(
  141. contract_url=json.dumps(items, ensure_ascii=False) if items else contract_url_raw,
  142. )
  143. )
  144. updated = True
  145. if updated:
  146. await self.db.commit()
  147. return updated
  148. async def update_sign_url(self, contact_phone: str, sign_flow_id: str, sign_url: str):
  149. """
  150. 根据手机号 + sign_flow_id 更新 contract_url 列表中对应项的 sign_url
  151. """
  152. result = await self._execute_with_retry(
  153. ContractStore.__table__.select().where(ContractStore.contact_phone == contact_phone)
  154. )
  155. rows = result.mappings().all()
  156. updated = False
  157. for row in rows:
  158. contract_url_raw = row.get("contract_url")
  159. if not contract_url_raw:
  160. continue
  161. try:
  162. items = json.loads(contract_url_raw)
  163. except Exception:
  164. items = None
  165. if not isinstance(items, list):
  166. continue
  167. changed = False
  168. for item in items:
  169. if item.get("sign_flow_id") == sign_flow_id:
  170. item["sign_url"] = sign_url
  171. changed = True
  172. if changed:
  173. await self._execute_with_retry(
  174. ContractStore.__table__.update()
  175. .where(ContractStore.id == row["id"])
  176. .values(contract_url=json.dumps(items, ensure_ascii=False))
  177. )
  178. updated = True
  179. if updated:
  180. await self.db.commit()
  181. return updated
  182. async def append_contract_url(self, templates_data, contract_item: dict):
  183. """
  184. 根据手机号,向 contract_url(JSON 列表)追加新的合同信息;
  185. 若手机号不存在,则创建新记录。
  186. """
  187. contact_phone = getattr(templates_data, "contact_phone", None)
  188. result = await self._execute_with_retry(
  189. ContractStore.__table__.select().where(ContractStore.contact_phone == contact_phone)
  190. )
  191. rows = result.mappings().all()
  192. updated = False
  193. store_name = getattr(templates_data, "store_name", None)
  194. if rows:
  195. for row in rows:
  196. contract_url_raw = row.get("contract_url")
  197. try:
  198. items = json.loads(contract_url_raw) if contract_url_raw else []
  199. except Exception:
  200. items = []
  201. if not isinstance(items, list):
  202. items = []
  203. items.append(contract_item)
  204. update_values = {"contract_url": json.dumps(items, ensure_ascii=False)}
  205. if store_name:
  206. update_values["store_name"] = store_name
  207. await self._execute_with_retry(
  208. ContractStore.__table__.update()
  209. .where(ContractStore.id == row["id"])
  210. .values(**update_values)
  211. )
  212. updated = True
  213. if updated:
  214. await self.db.commit()
  215. return updated
  216. # 未找到则创建新记录
  217. new_record = ContractStore(
  218. store_id=getattr(templates_data, "store_id", None),
  219. store_name=store_name,
  220. business_segment=getattr(templates_data, "business_segment", None),
  221. merchant_name=getattr(templates_data, "merchant_name", None),
  222. contact_phone=contact_phone,
  223. contract_url=json.dumps([contract_item], ensure_ascii=False),
  224. seal_url='0.0',
  225. signing_status='未签署'
  226. )
  227. self.db.add(new_record)
  228. await self.db.commit()
  229. await self.db.refresh(new_record)
  230. return True