""" 合同相关定时任务 """ from datetime import datetime, timedelta from sqlalchemy import create_engine, select from sqlalchemy.orm import Session from alien_gateway.config import settings from alien_store.db.models.contract_store import ContractStore from alien_util.celery_app import celery_app from common.aliyun_sms_server.sms_client import send_sms import logging # 配置日志 logger = logging.getLogger(__name__) logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s %(message)s" ) # 全局数据库引擎(避免重复创建) _sync_engine = None def get_sync_db_session(): """ 获取同步数据库会话(用于 Celery 任务) Celery 任务通常使用同步代码,所以需要同步数据库连接 """ global _sync_engine if _sync_engine is None: # 使用同步数据库连接 database_url = settings.SQLALCHEMY_DATABASE_URI _sync_engine = create_engine( database_url, pool_pre_ping=True, pool_size=5, max_overflow=10 ) return Session(_sync_engine) @celery_app.task(name="alien_util.tasks.contract_tasks.check_contract_expiry") def check_contract_expiry(): """ 检查合同到期时间,如果距离到期不足15天,发送提醒短信 每天凌晨0点1分执行 """ logger.info("开始执行合同到期检查任务") try: # 获取数据库会话 db = get_sync_db_session() try: # 计算15天后的日期 check_date = datetime.now() + timedelta(days=15) # 查询即将到期的合同(expiry_time 在15天内,且不为空) # 只查询已签署的合同(signing_status = '已签署') stmt = select(ContractStore).where( ContractStore.expiry_time.isnot(None), ContractStore.expiry_time <= check_date, ContractStore.expiry_time >= datetime.now(), ContractStore.signing_status == "已签署", ContractStore.delete_flag == 0 ) result = db.execute(stmt) contracts = result.scalars().all() logger.info(f"找到 {len(contracts)} 条即将到期的合同") # 遍历即将到期的合同,发送提醒短信 for contract in contracts: try: # 计算距离到期的天数 days_until_expiry = (contract.expiry_time - datetime.now()).days logger.info( f"合同ID: {contract.id}, " f"商家: {contract.merchant_name}, " f"联系电话: {contract.contact_phone}, " f"到期时间: {contract.expiry_time}, " f"距离到期: {days_until_expiry} 天" ) send_expiry_reminder_sms(contract.contact_phone, contract.merchant_name, settings.ALIYUN_SMS_SIGN_NAME_CONTRACT, settings.ALIYUN_SMS_TEMPLATE_CODE_CONTRACT) except Exception as e: logger.error(f"处理合同ID {contract.id} 时出错: {e}", exc_info=True) logger.info("合同到期检查任务执行完成") finally: db.close() except Exception as e: logger.error(f"执行合同到期检查任务时出错: {e}", exc_info=True) raise def send_expiry_reminder_sms(phone: str, merchant_name: str, sign_name: str, template_code: str): """ 发送合同到期提醒短信 Args: phone: 联系电话 merchant_name: 商家名称 days_until_expiry: 距离到期的天数 """ template_param = { "name": merchant_name, } send_sms(phone, template_param, sign_name, template_code)