| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180 |
- from sqlalchemy.ext.asyncio import AsyncSession
- from alien_store.db.models.contract_store import ContractStore
- import json
- from datetime import datetime, timedelta
- class ContractRepository:
- """合同数据访问层"""
- def __init__(self, db: AsyncSession):
- self.db = db
- async def get_by_store_id(self, store_id: int):
- """根据店铺id查询所有合同"""
- result = await self.db.execute(
- ContractStore.__table__.select().where(ContractStore.store_id == store_id)
- )
- # 返回列表[dict],避免 Pydantic 序列化 Row 对象出错
- return [dict(row) for row in result.mappings().all()]
- async def get_all(self):
- """查询所有合同"""
- result = await self.db.execute(ContractStore.__table__.select())
- return [dict(row) for row in result.mappings().all()]
- async def get_all_paged(self, page: int, page_size: int = 10):
- """分页查询所有合同"""
- offset = (page - 1) * page_size
- result = await self.db.execute(
- ContractStore.__table__.select().offset(offset).limit(page_size)
- )
- return [dict(row) for row in result.mappings().all()]
- async def create(self, user_data):
- """创建未签署合同模板"""
- db_templates = ContractStore(
- store_id=user_data.store_id,
- merchant_name=user_data.merchant_name,
- business_segment=user_data.business_segment,
- contact_phone=user_data.contact_phone,
- contract_url=user_data.contract_url,
- seal_url='0.0',
- signing_status='未签署'
- )
- self.db.add(db_templates)
- await self.db.commit()
- await self.db.refresh(db_templates)
- return db_templates
- async def mark_signed_by_phone(self, contact_phone: str, sign_flow_id: str, signing_time: datetime | None = None):
- """
- 根据手机号 + sign_flow_id 将合同标记为已签署,只更新匹配的合同项
- 同时写入签署/生效/到期时间(签署时间=T,生效=T+1天,失效=生效+365天)
- """
- result = await self.db.execute(
- ContractStore.__table__.select().where(ContractStore.contact_phone == contact_phone)
- )
- rows = result.mappings().all()
- updated = False
- for row in rows:
- contract_url_raw = row.get("contract_url")
- items = None
- if contract_url_raw:
- try:
- items = json.loads(contract_url_raw)
- except Exception:
- items = None
- changed = False
- if isinstance(items, list):
- for item in items:
- if item.get("sign_flow_id") == sign_flow_id:
- item["status"] = 1
- changed = True
- # 时间处理
- signing_dt = signing_time
- effective_dt = expiry_dt = None
- if signing_dt:
- effective_dt = signing_dt + timedelta(days=1)
- expiry_dt = effective_dt + timedelta(days=365)
- if changed:
- await self.db.execute(
- ContractStore.__table__.update()
- .where(ContractStore.id == row["id"])
- .values(
- signing_status="已签署",
- contract_url=json.dumps(items, ensure_ascii=False) if items else contract_url_raw,
- signing_time=signing_dt,
- effective_time=effective_dt,
- expiry_time=expiry_dt,
- )
- )
- updated = True
- if updated:
- await self.db.commit()
- return updated
- async def update_sign_url(self, contact_phone: str, sign_flow_id: str, sign_url: str):
- """
- 根据手机号 + sign_flow_id 更新 contract_url 列表中对应项的 sign_url
- """
- result = await self.db.execute(
- ContractStore.__table__.select().where(ContractStore.contact_phone == contact_phone)
- )
- rows = result.mappings().all()
- updated = False
- for row in rows:
- contract_url_raw = row.get("contract_url")
- if not contract_url_raw:
- continue
- try:
- items = json.loads(contract_url_raw)
- except Exception:
- items = None
- if not isinstance(items, list):
- continue
- changed = False
- for item in items:
- if item.get("sign_flow_id") == sign_flow_id:
- item["sign_url"] = sign_url
- changed = True
- if changed:
- await self.db.execute(
- ContractStore.__table__.update()
- .where(ContractStore.id == row["id"])
- .values(contract_url=json.dumps(items, ensure_ascii=False))
- )
- updated = True
- if updated:
- await self.db.commit()
- return updated
- async def append_contract_url(self, templates_data, contract_item: dict):
- """
- 根据手机号,向 contract_url(JSON 列表)追加新的合同信息;
- 若手机号不存在,则创建新记录。
- """
- contact_phone = getattr(templates_data, "contact_phone", None)
- result = await self.db.execute(
- ContractStore.__table__.select().where(ContractStore.contact_phone == contact_phone)
- )
- rows = result.mappings().all()
- updated = False
- if rows:
- for row in rows:
- contract_url_raw = row.get("contract_url")
- try:
- items = json.loads(contract_url_raw) if contract_url_raw else []
- except Exception:
- items = []
- if not isinstance(items, list):
- items = []
- items.append(contract_item)
- await self.db.execute(
- ContractStore.__table__.update()
- .where(ContractStore.id == row["id"])
- .values(contract_url=json.dumps(items, ensure_ascii=False))
- )
- updated = True
- if updated:
- await self.db.commit()
- return updated
- # 未找到则创建新记录
- new_record = ContractStore(
- store_id=getattr(templates_data, "store_id", None),
- business_segment=getattr(templates_data, "business_segment", None),
- merchant_name=getattr(templates_data, "merchant_name", None),
- contact_phone=contact_phone,
- contract_url=json.dumps([contract_item], ensure_ascii=False),
- seal_url='0.0',
- signing_status='未签署'
- )
- self.db.add(new_record)
- await self.db.commit()
- await self.db.refresh(new_record)
- return True
|