""" 合同相关定时任务 """ from datetime import datetime, timedelta from sqlalchemy import create_engine, select from sqlalchemy.orm import Session from alien_gateway.config import settings from alien_contract.db.models.bundle import ContractBundle from alien_contract.db.models.document import ContractDocument from alien_util.celery_app import celery_app from common.aliyun_sms_server.sms_client import send_sms import logging # 配置日志 logger = logging.getLogger(__name__) logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s %(message)s" ) # 全局数据库引擎(避免重复创建) _sync_engine = None def get_sync_db_session(): """ 获取同步数据库会话(用于 Celery 任务) Celery 任务通常使用同步代码,所以需要同步数据库连接 """ global _sync_engine if _sync_engine is None: # 使用同步数据库连接 database_url = settings.SQLALCHEMY_DATABASE_URI _sync_engine = create_engine( database_url, pool_pre_ping=True, pool_size=5, max_overflow=10 ) return Session(_sync_engine) @celery_app.task(name="alien_util.tasks.contract_tasks.check_contract_expiry") def check_contract_expiry(subject_type: str | None = "store"): """ 检查合同到期时间,如果距离到期不足15天,发送提醒短信 每天凌晨0点1分执行 :param subject_type: 签约主体类型过滤,默认 "store"(仅商家);传 None 则不过滤(store+lawyer 全部) """ logger.info("开始执行合同到期检查任务 subject_type=%s", subject_type) try: # 获取数据库会话 db = get_sync_db_session() try: now = datetime.now() # 计算15天后的日期 check_date = now + timedelta(days=15) # 合同中心:仅对"主合同(is_primary=1) 且 已签署(status=1)"发提醒, # 避免同一合同包内多份子合同(支付宝授权函/微信承诺函等)重复发送 conditions = [ ContractDocument.is_primary == 1, ContractDocument.status == 1, ContractDocument.expiry_time.isnot(None), ContractDocument.expiry_time <= check_date, ContractDocument.expiry_time >= now, ContractDocument.delete_flag == 0, ContractBundle.delete_flag == 0, ] if subject_type: conditions.append(ContractBundle.subject_type == subject_type) stmt = ( select( ContractDocument.id, ContractDocument.expiry_time, ContractBundle.subject_name, ContractBundle.contact_phone, ContractBundle.subject_type, ) .join(ContractBundle, ContractDocument.bundle_id == ContractBundle.id) .where(*conditions) ) contracts = db.execute(stmt).all() logger.info(f"找到 {len(contracts)} 条即将到期的合同") # 遍历即将到期的合同,发送提醒短信 for contract in contracts: try: # 计算距离到期的天数 days_until_expiry = (contract.expiry_time - now).days logger.info( f"合同文档ID: {contract.id}, " f"主体类型: {contract.subject_type}, " f"主体名称: {contract.subject_name}, " f"联系电话: {contract.contact_phone}, " f"到期时间: {contract.expiry_time}, " f"距离到期: {days_until_expiry} 天" ) send_expiry_reminder_sms( contract.contact_phone, contract.subject_name, settings.ALIYUN_SMS_SIGN_NAME_CONTRACT, settings.ALIYUN_SMS_TEMPLATE_CODE_CONTRACT, ) except Exception as e: logger.error(f"处理合同文档ID {contract.id} 时出错: {e}", exc_info=True) logger.info("合同到期检查任务执行完成") finally: db.close() except Exception as e: logger.error(f"执行合同到期检查任务时出错: {e}", exc_info=True) raise def send_expiry_reminder_sms(phone: str, merchant_name: str, sign_name: str, template_code: str): """ 发送合同到期提醒短信 Args: phone: 联系电话 merchant_name: 商家名称 days_until_expiry: 距离到期的天数 """ template_param = { "name": merchant_name, } send_sms(phone, template_param, sign_name, template_code)