""" 合同相关定时任务 """ from datetime import datetime, timedelta from sqlalchemy import create_engine, select from sqlalchemy.orm import Session from alien_gateway.config import settings from alien_store.db.models.contract_store import ContractStore from alien_util.celery_app import celery_app import logging # 配置日志 logger = logging.getLogger(__name__) logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s %(message)s" ) # 全局数据库引擎(避免重复创建) _sync_engine = None def get_sync_db_session(): """ 获取同步数据库会话(用于 Celery 任务) Celery 任务通常使用同步代码,所以需要同步数据库连接 """ global _sync_engine if _sync_engine is None: # 使用同步数据库连接 database_url = settings.SQLALCHEMY_DATABASE_URI _sync_engine = create_engine( database_url, pool_pre_ping=True, pool_size=5, max_overflow=10 ) return Session(_sync_engine) @celery_app.task(name="alien_util.tasks.contract_tasks.check_contract_expiry") def check_contract_expiry(): """ 检查合同到期时间,如果距离到期不足15天,发送提醒短信 每天凌晨0点1分执行 """ logger.info("开始执行合同到期检查任务") try: # 获取数据库会话 db = get_sync_db_session() try: # 计算15天后的日期 check_date = datetime.now() + timedelta(days=15) # 查询即将到期的合同(expiry_time 在15天内,且不为空) # 只查询已签署的合同(signing_status = '已签署') stmt = select(ContractStore).where( ContractStore.expiry_time.isnot(None), ContractStore.expiry_time <= check_date, ContractStore.expiry_time >= datetime.now(), ContractStore.signing_status == "已签署", ContractStore.delete_flag == 0 ) result = db.execute(stmt) contracts = result.scalars().all() logger.info(f"找到 {len(contracts)} 条即将到期的合同") # 遍历即将到期的合同,发送提醒短信 for contract in contracts: try: # 计算距离到期的天数 days_until_expiry = (contract.expiry_time - datetime.now()).days logger.info( f"合同ID: {contract.id}, " f"商家: {contract.merchant_name}, " f"联系电话: {contract.contact_phone}, " f"到期时间: {contract.expiry_time}, " f"距离到期: {days_until_expiry} 天" ) # 发送提醒短信(暂时不实现,只记录日志) send_expiry_reminder_sms(contract.contact_phone, contract.merchant_name, days_until_expiry) except Exception as e: logger.error(f"处理合同ID {contract.id} 时出错: {e}", exc_info=True) logger.info("合同到期检查任务执行完成") finally: db.close() except Exception as e: logger.error(f"执行合同到期检查任务时出错: {e}", exc_info=True) raise def send_expiry_reminder_sms(phone: str, merchant_name: str, days_until_expiry: int): """ 发送合同到期提醒短信 Args: phone: 联系电话 merchant_name: 商家名称 days_until_expiry: 距离到期的天数 """ # TODO: 实现短信发送功能 # 这里暂时只记录日志,后续可以接入短信服务商API message = ( f"【合同到期提醒】尊敬的{merchant_name},您的合同将在{days_until_expiry}天后到期," f"请及时续签。如有疑问,请联系客服。" ) logger.info(f"准备发送短信到 {phone}: {message}") # 示例:调用短信服务API # sms_service.send(phone, message) # 暂时只记录日志 logger.info(f"[模拟] 已发送提醒短信到 {phone}")