import datetime import os import logging import json from typing import Any, Union, Optional from sqlalchemy.ext.asyncio import AsyncSession from alien_store.repositories.contract_repo import ContractRepository from alien_store.schemas.request.contract_store import TemplatesCreate from alien_contract.infrastructure.esign import main as esign_main from alien_contract.infrastructure.esign.main import sign_url, file_download_url from alien_contract.infrastructure.esign.contract_builder import build_contract_items, ContractBuildError # ------------------- 日志配置 ------------------- LOG_DIR = os.path.join("common", "logs", "alien_store") os.makedirs(LOG_DIR, exist_ok=True) def _init_logger(): logger = logging.getLogger("alien_store_service") if logger.handlers: return logger logger.setLevel(logging.INFO) fmt = logging.Formatter("%(asctime)s [%(levelname)s] %(name)s %(message)s") info_handler = logging.FileHandler(os.path.join(LOG_DIR, "info.log"), encoding="utf-8") info_handler.setLevel(logging.INFO) info_handler.setFormatter(fmt) error_handler = logging.FileHandler(os.path.join(LOG_DIR, "error.log"), encoding="utf-8") error_handler.setLevel(logging.ERROR) error_handler.setFormatter(fmt) logger.addHandler(info_handler) logger.addHandler(error_handler) return logger logger = _init_logger() CONTRACT_CREATE_CONFIGS = [ ("store_agreement", "店铺入驻协议", 1), ("alipay_auth", "支付宝授权函", 0), ("wechat_pay_commitment", "微信支付承诺函", 0), ] class ContractServer: def __init__(self, db: AsyncSession): self.db = db self.esign_repo = ContractRepository(db) async def create_template(self, template_data: TemplatesCreate): await self.esign_repo.create(template_data) return { "message": "模板创建成功" } async def create_esign_templates(self, templates_data: TemplatesCreate) -> dict: """AI审核完调用 e签宝生成文件""" logger.info(f"create_esign_templates request: {templates_data}") try: generated_contracts = build_contract_items( configs=CONTRACT_CREATE_CONFIGS, template_name=templates_data.store_name, signer_name=templates_data.store_name, signer_id_num=templates_data.ord_id, psn_account=templates_data.contact_phone, psn_name=templates_data.merchant_name, ) except ContractBuildError as exc: return {"success": False, "message": exc.message, "raw": exc.raw} for contract_item in generated_contracts: await self.esign_repo.append_contract_url(templates_data, contract_item) master_contract = next((item for item in generated_contracts if item.get("is_master") == 1), generated_contracts[0]) logger.info( "create_esign_templates success contact_phone=%s master_sign_flow_id=%s all_sign_flow_ids=%s", templates_data.contact_phone, master_contract.get("sign_flow_id"), [item.get("sign_flow_id") for item in generated_contracts], ) return { "success": True, "message": "合同模板已追加/创建", "sign_flow_id": master_contract.get("sign_flow_id"), "file_id": master_contract.get("file_id"), "contract_url": master_contract.get("contract_url"), "created_contracts": [ { "contract_type": item["contract_type"], "contract_name": item["contract_name"], "sign_flow_id": item["sign_flow_id"], "file_id": item["file_id"], "contract_url": item["contract_url"], } for item in generated_contracts ], } async def list_contracts(self, store_id: int, status: Optional[int], page: int, page_size: int) -> dict: """根据 store_id 查询所有合同,支持根据 status 筛选和分页""" logger.info( "list_contracts request store_id=%s status=%s page=%s page_size=%s", store_id, status, page, page_size ) # 1. 检查 store_info 中的审核状态 reason = await self.esign_repo.check_store_status(store_id) if reason != "审核通过": return {"code": 555, "msg": "先进行认证", "reason": reason} # 2. 获取原始数据 rows = await self.esign_repo.get_by_store_id(store_id) all_filtered_items = [] # 3. 解析并筛选 for row in rows: contract_url_raw = row.get("contract_url") if not contract_url_raw: continue try: items = json.loads(contract_url_raw) if not isinstance(items, list): continue for item in items: if status is not None and item.get("status") != status: continue item_with_info = dict(item) item_with_info.update({ "id": row.get("id"), "store_id": row.get("store_id"), "store_name": row.get("store_name"), "merchant_name": row.get("merchant_name"), "contact_phone": row.get("contact_phone") }) all_filtered_items.append(item_with_info) except Exception as e: logger.error(f"Error processing contracts for store_id {store_id}: {e}", exc_info=True) continue # 4. 手动分页 total = len(all_filtered_items) start = (page - 1) * page_size end = start + page_size paged_items = all_filtered_items[start:end] total_pages = (total + page_size - 1) // page_size if total > 0 else 0 return { "items": paged_items, "total": total, "page": page, "page_size": page_size, "total_pages": total_pages } async def get_contract_detail(self, sign_flow_id: str) -> dict: """获取合同详情""" row, item, items = await self.esign_repo.get_contract_item_by_sign_flow_id(sign_flow_id) if not item: return {"success": False, "message": "未找到合同"} status = item.get("status") if status == 0: return await self._get_pending_contract_detail(sign_flow_id, row, item, items) elif status == 1: return await self._get_signed_contract_detail(sign_flow_id, row, item, items) return {"success": False, "message": "未知合同状态", "raw": {"status": status}} async def _get_pending_contract_detail(self, sign_flow_id: str, row, item, items) -> dict: file_id = item.get("file_id") if not file_id: return {"success": False, "message": "缺少 file_id,无法获取合同详情"} try: detail_resp = esign_main.get_contract_detail(file_id) detail_json = json.loads(detail_resp) data = detail_json.get("data") if isinstance(detail_json, dict) else None contract_url_val = None if isinstance(data, dict): contract_url_val = data.get("fileDownloadUrl") if not contract_url_val and isinstance(detail_json, dict): contract_url_val = detail_json.get("fileDownloadUrl") except Exception as e: logger.error(f"get_contract_detail failed file_id={file_id}: {e}") return {"success": False, "message": "获取合同链接失败", "raw": str(e)} if not contract_url_val: logger.error(f"get_contract_detail missing contract_url file_id={file_id}: {detail_resp}") return {"success": False, "message": "e签宝返回缺少合同链接", "raw": detail_resp} # 更新数据库中的合同链接 if row and isinstance(items, list): for it in items: if it.get("sign_flow_id") == sign_flow_id: it["contract_url"] = contract_url_val break await self.esign_repo.update_contract_items(row["id"], items) # 获取签署链接 contact_phone = item.get("contact_phone") or (row.get("contact_phone") if isinstance(row, dict) else None) if not contact_phone: return {"success": False, "message": "缺少 contact_phone,无法获取签署链接"} try: sign_resp = sign_url(sign_flow_id, contact_phone) sign_json = json.loads(sign_resp) sign_data = sign_json.get("data") if isinstance(sign_json, dict) else None result_sign_url = sign_data.get("url") if isinstance(sign_data, dict) else None except Exception as e: logger.error(f"sign_url failed sign_flow_id={sign_flow_id}, contact_phone={contact_phone}: {e}") return {"success": False, "message": "获取签署链接失败", "raw": str(e)} if not result_sign_url: logger.error(f"sign_url missing url: {sign_json}") return {"success": False, "message": "e签宝返回缺少签署链接", "raw": sign_json} await self.esign_repo.update_sign_url(contact_phone, sign_flow_id, result_sign_url) return { "status": 0, "contract_url": contract_url_val, "sign_url": result_sign_url, "sign_flow_id": sign_flow_id } async def _get_signed_contract_detail(self, sign_flow_id: str, row, item, items) -> dict: try: download_resp = file_download_url(sign_flow_id) download_json = json.loads(download_resp) contract_download_url = download_json["data"]["files"][0]["downloadUrl"] except Exception as e: logger.error(f"file_download_url failed sign_flow_id={sign_flow_id}: {e}") return {"success": False, "message": "获取合同下载链接失败", "raw": str(e)} if row and isinstance(items, list): for it in items: if it.get("sign_flow_id") == sign_flow_id: it["contract_download_url"] = contract_download_url it["contract_url"] = contract_download_url break await self.esign_repo.update_contract_items(row["id"], items) return { "status": 1, "contract_url": contract_download_url, "contract_download_url": contract_download_url, "sign_flow_id": sign_flow_id } async def process_esign_callback(self, payload: dict) -> dict: """处理 e签宝 回调""" sign_result = payload.get("signResult") sign_flow_id = payload.get("signFlowId") operator = payload.get("operator") or {} psn_account = operator.get("psnAccount") or {} contact_phone = psn_account.get("accountMobile") ts_ms = payload.get("operateTime") or payload.get("timestamp") signing_dt = None if ts_ms: try: signing_dt = datetime.datetime.fromtimestamp(ts_ms / 1000) except Exception: signing_dt = None if sign_result == 2: contract_download_url = None try: download_resp = file_download_url(sign_flow_id) download_json = json.loads(download_resp) contract_download_url = download_json["data"]["files"][0]["downloadUrl"] except Exception as e: logger.error(f"file_download_url failed for sign_flow_id={sign_flow_id}: {e}") updated = await self.esign_repo.mark_signed_by_phone(contact_phone, sign_flow_id, signing_dt, contract_download_url) logger.info(f"esign_callback success phone={contact_phone}, sign_flow_id={sign_flow_id}, updated={updated}") return {"success": True, "code": "200", "msg": "success"} logger.error(f"esign_callback ignored payload: {payload}") return {"success": False, "message": "未处理: signResult!=2 或手机号/签署流程缺失"} async def list_by_store(self, store_id: int): return await self.esign_repo.get_by_store_id(store_id) async def get_store_reason(self, store_id: int) -> str | None: return await self.esign_repo.check_store_status(store_id) async def get_contract_item_by_sign_flow_id(self, sign_flow_id: str): return await self.esign_repo.get_contract_item_by_sign_flow_id(sign_flow_id) async def update_contract_items(self, row_id: int, items: list) -> bool: return await self.esign_repo.update_contract_items(row_id, items) async def list_all_paged( self, page: int, page_size: int = 10, store_name: str | None = None, merchant_name: str | None = None, signing_status: str | None = None, business_segment: str | None = None, store_status: str | None = None, expiry_start=None, expiry_end=None, ): items, total = await self.esign_repo.get_all_paged( page, page_size, store_name=store_name, merchant_name=merchant_name, signing_status=signing_status, business_segment=business_segment, store_status=store_status, expiry_start=expiry_start, expiry_end=expiry_end, ) return items, total async def mark_signed_by_phone(self, contact_phone: str, sign_flow_id: str, signing_time, contract_download_url): return await self.esign_repo.mark_signed_by_phone(contact_phone, sign_flow_id, signing_time, contract_download_url) async def update_sign_url(self, contact_phone: str, sign_flow_id: str, sign_url: str): return await self.esign_repo.update_sign_url(contact_phone, sign_flow_id, sign_url) async def append_contract_url(self, templates_data, contract_item: dict): return await self.esign_repo.append_contract_url(templates_data, contract_item)