| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139 |
- """
- 合同相关定时任务
- """
- from datetime import datetime, timedelta
- from sqlalchemy import create_engine, select
- from sqlalchemy.orm import Session
- from alien_gateway.config import settings
- from alien_contract.db.models.bundle import ContractBundle
- from alien_contract.db.models.document import ContractDocument
- from alien_util.celery_app import celery_app
- from common.aliyun_sms_server.sms_client import send_sms
- import logging
- # 配置日志
- logger = logging.getLogger(__name__)
- logging.basicConfig(
- level=logging.INFO,
- format="%(asctime)s [%(levelname)s] %(name)s %(message)s"
- )
- # 全局数据库引擎(避免重复创建)
- _sync_engine = None
- def get_sync_db_session():
- """
- 获取同步数据库会话(用于 Celery 任务)
- Celery 任务通常使用同步代码,所以需要同步数据库连接
- """
- global _sync_engine
- if _sync_engine is None:
- # 使用同步数据库连接
- database_url = settings.SQLALCHEMY_DATABASE_URI
- _sync_engine = create_engine(
- database_url,
- pool_pre_ping=True,
- pool_size=5,
- max_overflow=10
- )
- return Session(_sync_engine)
- @celery_app.task(name="alien_util.tasks.contract_tasks.check_contract_expiry")
- def check_contract_expiry(subject_type: str | None = "store"):
- """
- 检查合同到期时间,如果距离到期不足15天,发送提醒短信
- 每天凌晨0点1分执行
- :param subject_type: 签约主体类型过滤,默认 "store"(仅商家);传 None 则不过滤(store+lawyer 全部)
- """
- logger.info("开始执行合同到期检查任务 subject_type=%s", subject_type)
-
- try:
- # 获取数据库会话
- db = get_sync_db_session()
-
- try:
- now = datetime.now()
- # 计算15天后的日期
- check_date = now + timedelta(days=15)
- # 合同中心:仅对"主合同(is_primary=1) 且 已签署(status=1)"发提醒,
- # 避免同一合同包内多份子合同(支付宝授权函/微信承诺函等)重复发送
- conditions = [
- ContractDocument.is_primary == 1,
- ContractDocument.status == 1,
- ContractDocument.expiry_time.isnot(None),
- ContractDocument.expiry_time <= check_date,
- ContractDocument.expiry_time >= now,
- ContractDocument.delete_flag == 0,
- ContractBundle.delete_flag == 0,
- ]
- if subject_type:
- conditions.append(ContractBundle.subject_type == subject_type)
- stmt = (
- select(
- ContractDocument.id,
- ContractDocument.expiry_time,
- ContractBundle.subject_name,
- ContractBundle.contact_phone,
- ContractBundle.subject_type,
- )
- .join(ContractBundle, ContractDocument.bundle_id == ContractBundle.id)
- .where(*conditions)
- )
- contracts = db.execute(stmt).all()
- logger.info(f"找到 {len(contracts)} 条即将到期的合同")
- # 遍历即将到期的合同,发送提醒短信
- for contract in contracts:
- try:
- # 计算距离到期的天数
- days_until_expiry = (contract.expiry_time - now).days
- logger.info(
- f"合同文档ID: {contract.id}, "
- f"主体类型: {contract.subject_type}, "
- f"主体名称: {contract.subject_name}, "
- f"联系电话: {contract.contact_phone}, "
- f"到期时间: {contract.expiry_time}, "
- f"距离到期: {days_until_expiry} 天"
- )
- send_expiry_reminder_sms(
- contract.contact_phone,
- contract.subject_name,
- settings.ALIYUN_SMS_SIGN_NAME_CONTRACT,
- settings.ALIYUN_SMS_TEMPLATE_CODE_CONTRACT,
- )
- except Exception as e:
- logger.error(f"处理合同文档ID {contract.id} 时出错: {e}", exc_info=True)
-
- logger.info("合同到期检查任务执行完成")
-
- finally:
- db.close()
-
- except Exception as e:
- logger.error(f"执行合同到期检查任务时出错: {e}", exc_info=True)
- raise
- def send_expiry_reminder_sms(phone: str, merchant_name: str, sign_name: str, template_code: str):
- """
- 发送合同到期提醒短信
- Args:
- phone: 联系电话
- merchant_name: 商家名称
- days_until_expiry: 距离到期的天数
- """
- template_param = {
- "name": merchant_name,
- }
- send_sms(phone, template_param, sign_name, template_code)
|