import datetime
import json
import logging
import os
import re
import urllib.parse
from typing import Any, Union, Optional

from fastapi import APIRouter, Depends, Query
from pydantic import ValidationError

from alien_store.api.deps import get_contract_service
from alien_store.schemas.request.contract_store import TemplatesCreate
from alien_store.schemas.response.contract_store import (
    ModuleStatusResponse,
    TemplatesCreateResponse,
    ErrorResponse,
    ContractStoreResponse,
    PaginatedResponse,
    SuccessResponse,
)
from alien_store.services.contract_server import ContractServer
# NOTE(review): the wildcard import is kept on purpose. The route handlers in
# this file call fill_in_template, create_by_file, sign_url and
# file_download_url, none of which are imported explicitly anywhere else in
# this file, so they presumably come from here -- confirm and replace with
# explicit names.
# NOTE(review): `import datetime` above binds the *module*, yet
# get_all_templates annotates query parameters as `Optional[datetime]`. That
# only works if this wildcard import rebinds `datetime` to the class -- confirm.
from common.esigntool.main import *
from common.esigntool import main as esign_main

# ------------------- Logging configuration -------------------
LOG_DIR = os.path.join("common", "logs", "alien_store")
# Created at import time so the file handlers below can open their log files.
os.makedirs(LOG_DIR, exist_ok=True)


def _init_logger():
    """Return the "alien_store" logger, attaching file handlers on first use.

    The logger writes INFO-and-above records to ``info.log`` and
    ERROR-and-above records to ``error.log`` inside ``LOG_DIR``. If the logger
    already has handlers it is returned untouched, so calling this function
    more than once does not duplicate handlers.
    """
    logger = logging.getLogger("alien_store")
    if logger.handlers:
        return logger

    logger.setLevel(logging.INFO)
    fmt = logging.Formatter("%(asctime)s [%(levelname)s] %(name)s %(message)s")

    info_handler = logging.FileHandler(os.path.join(LOG_DIR, "info.log"), encoding="utf-8")
    info_handler.setLevel(logging.INFO)
    info_handler.setFormatter(fmt)

    error_handler = logging.FileHandler(os.path.join(LOG_DIR, "error.log"), encoding="utf-8")
    error_handler.setLevel(logging.ERROR)
    error_handler.setFormatter(fmt)

    logger.addHandler(info_handler)
    logger.addHandler(error_handler)
    # Optional console output: logger.addHandler(logging.StreamHandler())
    return logger


logger = _init_logger()
router = APIRouter()


def _format_validation_errors(exc: ValidationError) -> list[dict[str, str]]:
    """Flatten a pydantic ``ValidationError`` into a list of plain dicts.

    Each entry carries ``field`` (the dotted error location with any "body"
    segment removed, or "body" when nothing is left), ``type`` and ``message``.
    """
    formatted = []
    for err in exc.errors():
        path = ".".join(str(part) for part in err.get("loc", ()) if part != "body")
        formatted.append(
            {
                "field": path or "body",
                "type": err.get("type", "validation_error"),
                "message": err.get("msg", "参数校验失败"),
            }
        )
    return formatted


@router.get("/", response_model=ModuleStatusResponse)
async def index() -> ModuleStatusResponse:
    """Report that the Contract module is reachable."""
    return ModuleStatusResponse(module="Contract", status="Ok")
def _total_pages(total: int, page_size: int) -> int:
    """Return how many pages of *page_size* are needed for *total* items (0 when empty)."""
    return (total + page_size - 1) // page_size if total > 0 else 0


def _set_contract_fields(items: list, sign_flow_id: str, updates: dict[str, Any]) -> None:
    """Apply *updates* in place to the first entry of *items* matching *sign_flow_id*."""
    for entry in items:
        if entry.get("sign_flow_id") == sign_flow_id:
            entry.update(updates)
            break


def _parse_callback_time(ts_ms: Any) -> Any:
    """Convert a millisecond epoch timestamp into a ``datetime``; return None if unusable.

    Accepts ints, floats and numeric strings. The class is imported locally so
    the helper does not depend on what the module-level name ``datetime`` is
    bound to (module vs. class).
    """
    from datetime import datetime as _datetime

    if not ts_ms:
        return None
    try:
        return _datetime.fromtimestamp(float(ts_ms) / 1000)
    except (TypeError, ValueError, OverflowError, OSError):
        return None


@router.post("/get_esign_templates", response_model=Union[TemplatesCreateResponse, ErrorResponse])
async def create_esign_templates(
    templates_data: dict[str, Any],
    templates_server: ContractServer = Depends(get_contract_service)
) -> Union[TemplatesCreateResponse, ErrorResponse]:
    """Create a contract file and signing flow via the e-sign service after AI review.

    Steps: validate the payload as ``TemplatesCreate``; call ``fill_in_template``
    with the store name; extract the download URL, file id and file name from
    its JSON response; call ``create_by_file`` to start a signing flow; then
    persist the new contract entry through
    ``templates_server.append_contract_url``.

    Returns:
        ``TemplatesCreateResponse`` on success, otherwise an ``ErrorResponse``
        describing which step failed.
    """
    try:
        template = TemplatesCreate.model_validate(templates_data)
    except ValidationError as e:
        detail = _format_validation_errors(e)
        logger.error("get_esign_templates validation failed: %s", detail)
        return ErrorResponse(
            success=False,
            message="请求参数校验失败",
            raw={"errors": detail},
        )
    logger.info(f"get_esign_templates request: {template}")

    # res_text = fill_in_template(templates_data.merchant_name)
    res_text = fill_in_template(template.store_name)
    try:
        res_data = json.loads(res_text)
    except json.JSONDecodeError:
        logger.error(f"fill_in_template non-json resp: {res_text}")
        return ErrorResponse(success=False, message="e签宝返回非 JSON", raw=res_text)

    # Extract the download link from the response; the keys must match the
    # actual response structure.
    try:
        contract_url = res_data["data"]["fileDownloadUrl"]
        file_id = res_data["data"]["fileId"]
        name_match = re.search(r'/([^/]+)\.pdf', contract_url)
    except (KeyError, IndexError, TypeError):
        logger.error(f"fill_in_template missing fileDownloadUrl: {res_data}")
        return ErrorResponse(success=False, message="e签宝返回缺少 fileDownloadUrl", raw=res_data)

    # Without this guard a URL lacking a "<name>.pdf" path segment left
    # file_name unbound and the create_by_file call below raised NameError.
    if not name_match:
        logger.error(f"fill_in_template fileDownloadUrl has no pdf file name: {contract_url}")
        return ErrorResponse(success=False, message="无法从 fileDownloadUrl 解析合同文件名", raw=res_data)
    file_name = urllib.parse.unquote(name_match.group(1))

    # sign_data = create_by_file(file_id, file_name, templates_data.contact_phone, templates_data.merchant_name)
    sign_data = create_by_file(
        file_id,
        file_name,
        template.contact_phone,
        template.store_name,
        template.merchant_name,
        template.ord_id,
    )
    logger.info("create_by_file response: %s", sign_data)
    try:
        sign_json = json.loads(sign_data)
    except json.JSONDecodeError:
        logger.error(f"create_by_file non-json resp: {sign_data}")
        return ErrorResponse(success=False, message="e签宝 create_by_file 返回非 JSON", raw=sign_data)

    sign_payload = sign_json.get("data") if isinstance(sign_json, dict) else None
    if not sign_payload or not isinstance(sign_payload, dict):
        logger.error(f"create_by_file failed or missing data: {sign_json}")
        return ErrorResponse(success=False, message="e签宝创建签署流程失败", raw=sign_json)

    sign_flow_id = sign_payload.get("signFlowId")
    if not sign_flow_id:
        logger.error(f"create_by_file missing signFlowId: {sign_json}")
        return ErrorResponse(success=False, message="e签宝返回缺少 signFlowId", raw=sign_json)

    result_contract = {
        "contract_url": contract_url,  # contract template link
        "file_name": file_name,  # name of the contract file to be signed
        "file_id": file_id,  # id of the generated file
        "status": 0,  # signing status: 0 = unsigned, 1 = signed
        "sign_flow_id": sign_flow_id,  # taken from the create_by_file response
        "sign_url": "",  # signing page generated by the e-sign service
        "signing_time": "",  # time the contract was signed
        "effective_time": "",  # time the contract takes effect
        "expiry_time": "",  # time the contract expires
        "contract_download_url": "",  # download link once signing is complete
        "is_master": 1,  # whether this is the store onboarding agreement: 1 = yes, 0 = no
    }
    await templates_server.append_contract_url(template, result_contract)
    logger.info(f"get_esign_templates success contact_phone={template.contact_phone}, sign_flow_id={sign_flow_id}")
    return TemplatesCreateResponse(
        success=True,
        message="合同模板已追加/创建",
        sign_flow_id=sign_flow_id,
        file_id=file_id,
        contract_url=contract_url
    )


@router.get("/contracts/{store_id}", response_model=Union[dict, Any])
async def list_contracts(
    store_id: int,
    status: Optional[int] = Query(None, description="筛选合同状态:0 未签署,1 已签署"),
    page: int = Query(1, ge=1, description="页码,从1开始"),
    page_size: int = Query(10, ge=1, le=100, description="每页条数,默认10"),
    templates_server: ContractServer = Depends(get_contract_service)
) -> Any:
    """List a store's contracts, optionally filtered by ``status``, with paging.

    Returns a dict with ``items``, ``total``, ``page``, ``page_size`` and
    ``total_pages``. When the store's review reason is not "审核通过" a
    ``code: 555`` dict is returned instead; on any unexpected failure a
    ``code: 500`` dict is returned.
    """
    logger.info(
        "list_contracts request store_id=%s status=%s page=%s page_size=%s",
        store_id,
        status,
        page,
        page_size,
    )
    try:
        # 1. Check the review state stored with the store info.
        reason = await templates_server.get_store_reason(store_id)
        if reason != "审核通过":
            return {"code": 555, "msg": "先进行认证", "reason": reason}

        # 2. Load the contract rows for the store.
        rows = await templates_server.list_by_store(store_id)
        all_filtered_items = []

        # 3. Parse each row's JSON contract list and keep the matching entries.
        for row in rows:
            contract_url_raw = row.get("contract_url")
            if not contract_url_raw:
                continue
            try:
                items = json.loads(contract_url_raw)
                if not isinstance(items, list):
                    continue
                for item in items:
                    # Filter only when a status was supplied.
                    if status is not None and item.get("status") != status:
                        continue
                    # Mix the store's basic info into each contract entry so
                    # the front end can display it directly.
                    item_with_info = dict(item)
                    item_with_info["id"] = row.get("id")
                    item_with_info["store_id"] = row.get("store_id")
                    item_with_info["store_name"] = row.get("store_name")
                    item_with_info["merchant_name"] = row.get("merchant_name")
                    item_with_info["contact_phone"] = row.get("contact_phone")
                    all_filtered_items.append(item_with_info)
            except Exception as e:
                # Best effort: a malformed row is logged and skipped rather
                # than failing the whole listing.
                logger.error(f"Error processing contracts for store_id {store_id}: {e}", exc_info=True)
                continue

        # 4. Paginate in memory.
        total = len(all_filtered_items)
        start = (page - 1) * page_size
        paged_items = all_filtered_items[start:start + page_size]
        return {
            "items": paged_items,
            "total": total,
            "page": page,
            "page_size": page_size,
            "total_pages": _total_pages(total, page_size)
        }
    except Exception as e:
        logger.error(f"list_contracts failed store_id={store_id}: {e}", exc_info=True)
        return {"code": 500, "msg": "查询合同失败", "error": str(e)}


@router.get("/contracts/detail/{sign_flow_id}", response_model=Union[dict, ErrorResponse])
async def get_contract_detail(
    sign_flow_id: str,
    templates_server: ContractServer = Depends(get_contract_service)
) -> Union[dict, ErrorResponse]:
    """Return contract details for ``sign_flow_id``.

    - status=0: refresh and return the contract PDF link (``contract_url``)
      together with a signing link (``sign_url``); both are persisted.
    - status=1: fetch the latest download link, persist it, and return it as
      ``contract_download_url`` (also mirrored into ``contract_url``).

    Any other status, or any failing step, yields an ``ErrorResponse``.
    """
    row, item, items = await templates_server.get_contract_item_by_sign_flow_id(sign_flow_id)
    if not item:
        return ErrorResponse(success=False, message="未找到合同")

    status = item.get("status")

    if status == 0:
        file_id = item.get("file_id")
        if not file_id:
            return ErrorResponse(success=False, message="缺少 file_id,无法获取合同详情")
        try:
            detail_resp = esign_main.get_contract_detail(file_id)
            detail_json = json.loads(detail_resp)
            data = detail_json.get("data") if isinstance(detail_json, dict) else None
            contract_url = None
            if isinstance(data, dict):
                contract_url = data.get("fileDownloadUrl")
            # Fall back to a top-level fileDownloadUrl when "data" has none.
            if not contract_url and isinstance(detail_json, dict):
                contract_url = detail_json.get("fileDownloadUrl")
        except Exception as e:
            logger.error(f"get_contract_detail failed file_id={file_id}: {e}")
            return ErrorResponse(success=False, message="获取合同链接失败", raw=str(e))
        if not contract_url:
            logger.error(f"get_contract_detail missing contract_url file_id={file_id}: {detail_resp}")
            return ErrorResponse(success=False, message="e签宝返回缺少合同链接", raw=detail_resp)

        if row and isinstance(items, list):
            _set_contract_fields(items, sign_flow_id, {"contract_url": contract_url})
            await templates_server.update_contract_items(row["id"], items)

        # Merged from the former /esign/signurl endpoint: ask the e-sign
        # service for a signing link and store it.
        contact_phone = item.get("contact_phone") or (row.get("contact_phone") if isinstance(row, dict) else None)
        if not contact_phone:
            return ErrorResponse(success=False, message="缺少 contact_phone,无法获取签署链接")

        # Pre-bind so the JSONDecodeError handler can never hit an unbound name.
        sign_resp = None
        try:
            sign_resp = sign_url(sign_flow_id, contact_phone)
            sign_json = json.loads(sign_resp)
        except json.JSONDecodeError:
            logger.error(f"sign_url non-json resp: {sign_resp}")
            return ErrorResponse(success=False, message="e签宝返回非JSON", raw=sign_resp)
        except Exception as e:
            logger.error(f"sign_url failed sign_flow_id={sign_flow_id}, contact_phone={contact_phone}: {e}")
            return ErrorResponse(success=False, message="获取签署链接失败", raw=str(e))

        sign_data = sign_json.get("data") if isinstance(sign_json, dict) else None
        result_sign_url = sign_data.get("url") if isinstance(sign_data, dict) else None
        if not result_sign_url:
            logger.error(f"sign_url missing url: {sign_json}")
            return ErrorResponse(success=False, message="e签宝返回缺少签署链接", raw=sign_json)

        await templates_server.update_sign_url(contact_phone, sign_flow_id, result_sign_url)
        return {
            "status": 0,
            "contract_url": contract_url,
            "sign_url": result_sign_url,
            "sign_flow_id": sign_flow_id
        }

    if status == 1:
        try:
            download_resp = file_download_url(sign_flow_id)
            download_json = json.loads(download_resp)
            contract_download_url = download_json["data"]["files"][0]["downloadUrl"]
        except Exception as e:
            logger.error(f"file_download_url failed sign_flow_id={sign_flow_id}: {e}")
            return ErrorResponse(success=False, message="获取合同下载链接失败", raw=str(e))

        if row and isinstance(items, list):
            # As in the status=0 branch, contract_url is refreshed too, here
            # with the link obtained from file_download_url.
            _set_contract_fields(
                items,
                sign_flow_id,
                {
                    "contract_download_url": contract_download_url,
                    "contract_url": contract_download_url,
                },
            )
            await templates_server.update_contract_items(row["id"], items)

        return {
            "status": 1,
            "contract_url": contract_download_url,
            "contract_download_url": contract_download_url,
            "sign_flow_id": sign_flow_id
        }

    return ErrorResponse(success=False, message="未知合同状态", raw={"status": status})


# NOTE(review): `Optional[datetime]` below requires the module-level name
# `datetime` to be the class, not the module -- confirm what the file's
# imports actually bind.
@router.get("/get_all_templates", response_model=PaginatedResponse)
async def get_all_templates(
    page: int = Query(1, ge=1, description="页码,从1开始"),
    page_size: int = Query(10, ge=1, le=100, description="每页条数,默认10"),
    store_name: Optional[str] = Query(None, description="店铺名称(模糊查询)"),
    merchant_name: Optional[str] = Query(None, description="商家姓名(模糊查询)"),
    signing_status: Optional[str] = Query(None, description="签署状态"),
    business_segment: Optional[str] = Query(None, description="经营板块"),
    store_status: Optional[str] = Query(None, description="店铺状态:正常/禁用"),
    expiry_start: Optional[datetime] = Query(None, description="到期时间起"),
    expiry_end: Optional[datetime] = Query(None, description="到期时间止"),
    templates_server: ContractServer = Depends(get_contract_service)
) -> PaginatedResponse:
    """Return one page of all contracts, forwarding the optional filters to the service."""
    rows, total = await templates_server.list_all_paged(
        page,
        page_size,
        store_name=store_name,
        merchant_name=merchant_name,
        signing_status=signing_status,
        business_segment=business_segment,
        store_status=store_status,
        expiry_start=expiry_start,
        expiry_end=expiry_end,
    )
    items = [ContractStoreResponse(**row) for row in rows]
    return PaginatedResponse(
        items=items,
        total=total,
        page=page,
        page_size=page_size,
        total_pages=_total_pages(total, page_size)
    )


@router.post("/esign/callback", response_model=Union[SuccessResponse, ErrorResponse])
async def esign_callback(
    payload: dict,
    templates_server: ContractServer = Depends(get_contract_service)
) -> Union[SuccessResponse, ErrorResponse]:
    """Handle the e-sign service's signing-result callback.

    When ``signResult`` is 2 and both the signer's phone and the sign flow id
    are present, the contract is marked as signed via
    ``templates_server.mark_signed_by_phone``. Fetching the download link is
    best effort: a failure is logged and ``None`` is stored. Every other
    payload is logged and answered with an ``ErrorResponse``.
    """
    sign_result = payload.get("signResult")
    sign_flow_id = payload.get("signFlowId")
    operator = payload.get("operator")
    psn_account = operator.get("psnAccount") if isinstance(operator, dict) else None
    contact_phone = psn_account.get("accountMobile") if isinstance(psn_account, dict) else None

    # Millisecond timestamp from the callback: operateTime first, then timestamp.
    signing_dt = _parse_callback_time(payload.get("operateTime") or payload.get("timestamp"))

    # The error message below promises that a missing phone / sign flow is not
    # processed, so enforce that here instead of passing None downstream.
    if sign_result == 2 and contact_phone and sign_flow_id:
        contract_download_url = None
        try:
            download_resp = file_download_url(sign_flow_id)
            download_json = json.loads(download_resp)
            contract_download_url = download_json["data"]["files"][0]["downloadUrl"]
        except Exception as e:
            logger.error(f"file_download_url failed for sign_flow_id={sign_flow_id}: {e}")

        updated = await templates_server.mark_signed_by_phone(
            contact_phone, sign_flow_id, signing_dt, contract_download_url
        )
        logger.info(f"esign_callback success phone={contact_phone}, sign_flow_id={sign_flow_id}, updated={updated}")
        return SuccessResponse(code="200", msg="success")

    logger.error(f"esign_callback ignored payload: {payload}")
    return ErrorResponse(success=False, message="未处理: signResult!=2 或手机号/签署流程缺失")


# @router.post("/esign/callback_auth", response_model=SuccessResponse)
# async def esign_callback_auth(
#     payload: dict,
#     templates_server: ContractServer = Depends(get_contract_service)
# ) -> SuccessResponse:
#     logger.info(f"esign_callback_auth payload: {payload}")
#     return SuccessResponse(code="200", msg="success")