| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566 |
- from typing import Union
- from fastapi import APIRouter, Depends, Query
- from alien_contract.api.deps import get_contract_service
- from alien_contract.schemas.request.contract import BundleCreateRequest
- from alien_contract.schemas.response.contract import (
- ModuleStatusResponse,
- BundleCreateResponse,
- ErrorResponse,
- PaginatedBundleResponse,
- SuccessResponse,
- )
- from alien_contract.services.contract_server import ContractCenterService
- router = APIRouter()
- @router.get("/", response_model=ModuleStatusResponse)
- async def index() -> ModuleStatusResponse:
- return ModuleStatusResponse(module="ContractCenter", status="Ok")
- @router.post("/bundles", response_model=Union[BundleCreateResponse, ErrorResponse])
- async def create_bundle(
- request: BundleCreateRequest,
- service: ContractCenterService = Depends(get_contract_service),
- ) -> Union[BundleCreateResponse, ErrorResponse]:
- result = await service.create_bundle(request)
- if not result.get("success"):
- return ErrorResponse(**result)
- return BundleCreateResponse(**result)
- @router.get("/bundles", response_model=PaginatedBundleResponse)
- async def list_bundles(
- subject_type: str = Query(...),
- subject_id: int = Query(..., gt=0),
- page: int = Query(1, ge=1),
- page_size: int = Query(10, ge=1, le=100),
- service: ContractCenterService = Depends(get_contract_service),
- ) -> PaginatedBundleResponse:
- result = await service.list_bundles(subject_type, subject_id, page, page_size)
- return PaginatedBundleResponse(**result)
- @router.get("/documents/{sign_flow_id}", response_model=Union[dict, ErrorResponse])
- async def get_document_detail(
- sign_flow_id: str,
- service: ContractCenterService = Depends(get_contract_service),
- ) -> Union[dict, ErrorResponse]:
- result = await service.get_document_detail(sign_flow_id)
- if not result.get("success", True):
- return ErrorResponse(**result)
- return result
- @router.post("/esign/callback", response_model=Union[SuccessResponse, ErrorResponse])
- async def esign_callback(
- payload: dict,
- service: ContractCenterService = Depends(get_contract_service),
- ) -> Union[SuccessResponse, ErrorResponse]:
- result = await service.process_esign_callback(payload)
- if not result.get("success"):
- return ErrorResponse(**result)
- return SuccessResponse(code=result["code"], msg=result["msg"])
|