from sqlalchemy.ext.asyncio import AsyncSession from alien_store.db.models.contract_store import ContractStore import json from datetime import datetime, timedelta class ContractRepository: """合同数据访问层""" def __init__(self, db: AsyncSession): self.db = db async def get_by_store_id(self, store_id: int): """根据店铺id查询所有合同""" result = await self.db.execute( ContractStore.__table__.select().where(ContractStore.store_id == store_id) ) # 返回列表[dict],避免 Pydantic 序列化 Row 对象出错 return [dict(row) for row in result.mappings().all()] async def get_all(self): """查询所有合同""" result = await self.db.execute(ContractStore.__table__.select()) return [dict(row) for row in result.mappings().all()] async def get_all_paged(self, page: int, page_size: int = 10): """分页查询所有合同""" offset = (page - 1) * page_size result = await self.db.execute( ContractStore.__table__.select().offset(offset).limit(page_size) ) return [dict(row) for row in result.mappings().all()] async def create(self, user_data): """创建未签署合同模板""" db_templates = ContractStore( store_id=user_data.store_id, merchant_name=user_data.merchant_name, business_segment=user_data.business_segment, contact_phone=user_data.contact_phone, contract_url=user_data.contract_url, seal_url='0.0', signing_status='未签署' ) self.db.add(db_templates) await self.db.commit() await self.db.refresh(db_templates) return db_templates async def mark_signed_by_phone(self, contact_phone: str, sign_flow_id: str, signing_time: datetime | None = None): """ 根据手机号 + sign_flow_id 将合同标记为已签署,只更新匹配的合同项 同时写入签署/生效/到期时间(签署时间=T,生效=T+1天,失效=生效+365天) """ result = await self.db.execute( ContractStore.__table__.select().where(ContractStore.contact_phone == contact_phone) ) rows = result.mappings().all() updated = False for row in rows: contract_url_raw = row.get("contract_url") items = None if contract_url_raw: try: items = json.loads(contract_url_raw) except Exception: items = None changed = False if isinstance(items, list): for item in items: if item.get("sign_flow_id") == sign_flow_id: item["status"] = 1 changed = True # 时间处理 signing_dt = signing_time effective_dt = expiry_dt = None if signing_dt: effective_dt = signing_dt + timedelta(days=1) expiry_dt = effective_dt + timedelta(days=365) if changed: await self.db.execute( ContractStore.__table__.update() .where(ContractStore.id == row["id"]) .values( signing_status="已签署", contract_url=json.dumps(items, ensure_ascii=False) if items else contract_url_raw, signing_time=signing_dt, effective_time=effective_dt, expiry_time=expiry_dt, ) ) updated = True if updated: await self.db.commit() return updated async def update_sign_url(self, contact_phone: str, sign_flow_id: str, sign_url: str): """ 根据手机号 + sign_flow_id 更新 contract_url 列表中对应项的 sign_url """ result = await self.db.execute( ContractStore.__table__.select().where(ContractStore.contact_phone == contact_phone) ) rows = result.mappings().all() updated = False for row in rows: contract_url_raw = row.get("contract_url") if not contract_url_raw: continue try: items = json.loads(contract_url_raw) except Exception: items = None if not isinstance(items, list): continue changed = False for item in items: if item.get("sign_flow_id") == sign_flow_id: item["sign_url"] = sign_url changed = True if changed: await self.db.execute( ContractStore.__table__.update() .where(ContractStore.id == row["id"]) .values(contract_url=json.dumps(items, ensure_ascii=False)) ) updated = True if updated: await self.db.commit() return updated async def append_contract_url(self, templates_data, contract_item: dict): """ 根据手机号,向 contract_url(JSON 列表)追加新的合同信息; 若手机号不存在,则创建新记录。 """ contact_phone = getattr(templates_data, "contact_phone", None) result = await self.db.execute( ContractStore.__table__.select().where(ContractStore.contact_phone == contact_phone) ) rows = result.mappings().all() updated = False if rows: for row in rows: contract_url_raw = row.get("contract_url") try: items = json.loads(contract_url_raw) if contract_url_raw else [] except Exception: items = [] if not isinstance(items, list): items = [] items.append(contract_item) await self.db.execute( ContractStore.__table__.update() .where(ContractStore.id == row["id"]) .values(contract_url=json.dumps(items, ensure_ascii=False)) ) updated = True if updated: await self.db.commit() return updated # 未找到则创建新记录 new_record = ContractStore( store_id=getattr(templates_data, "store_id", None), business_segment=getattr(templates_data, "business_segment", None), merchant_name=getattr(templates_data, "merchant_name", None), contact_phone=contact_phone, contract_url=json.dumps([contract_item], ensure_ascii=False), seal_url='0.0', signing_status='未签署' ) self.db.add(new_record) await self.db.commit() await self.db.refresh(new_record) return True