contract_tasks.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. """
  2. 合同相关定时任务
  3. """
  4. from datetime import datetime, timedelta
  5. from sqlalchemy import create_engine, select
  6. from sqlalchemy.orm import Session
  7. from alien_gateway.config import settings
  8. from alien_store.db.models.contract_store import ContractStore
  9. from alien_util.celery_app import celery_app
  10. from common.aliyun_sms_server.sms_client import send_sms
  11. import logging
  12. # 配置日志
  13. logger = logging.getLogger(__name__)
  14. logging.basicConfig(
  15. level=logging.INFO,
  16. format="%(asctime)s [%(levelname)s] %(name)s %(message)s"
  17. )
  18. # 全局数据库引擎(避免重复创建)
  19. _sync_engine = None
  20. def get_sync_db_session():
  21. """
  22. 获取同步数据库会话(用于 Celery 任务)
  23. Celery 任务通常使用同步代码,所以需要同步数据库连接
  24. """
  25. global _sync_engine
  26. if _sync_engine is None:
  27. # 使用同步数据库连接
  28. database_url = settings.SQLALCHEMY_DATABASE_URI
  29. _sync_engine = create_engine(
  30. database_url,
  31. pool_pre_ping=True,
  32. pool_size=5,
  33. max_overflow=10
  34. )
  35. return Session(_sync_engine)
  36. @celery_app.task(name="alien_util.tasks.contract_tasks.check_contract_expiry")
  37. def check_contract_expiry():
  38. """
  39. 检查合同到期时间,如果距离到期不足15天,发送提醒短信
  40. 每天凌晨0点1分执行
  41. """
  42. logger.info("开始执行合同到期检查任务")
  43. try:
  44. # 获取数据库会话
  45. db = get_sync_db_session()
  46. try:
  47. # 计算15天后的日期
  48. check_date = datetime.now() + timedelta(days=15)
  49. # 查询即将到期的合同(expiry_time 在15天内,且不为空)
  50. # 只查询已签署的合同(signing_status = '已签署')
  51. stmt = select(ContractStore).where(
  52. ContractStore.expiry_time.isnot(None),
  53. ContractStore.expiry_time <= check_date,
  54. ContractStore.expiry_time >= datetime.now(),
  55. ContractStore.signing_status == "已签署",
  56. ContractStore.delete_flag == 0
  57. )
  58. result = db.execute(stmt)
  59. contracts = result.scalars().all()
  60. logger.info(f"找到 {len(contracts)} 条即将到期的合同")
  61. # 遍历即将到期的合同,发送提醒短信
  62. for contract in contracts:
  63. try:
  64. # 计算距离到期的天数
  65. days_until_expiry = (contract.expiry_time - datetime.now()).days
  66. logger.info(
  67. f"合同ID: {contract.id}, "
  68. f"商家: {contract.merchant_name}, "
  69. f"联系电话: {contract.contact_phone}, "
  70. f"到期时间: {contract.expiry_time}, "
  71. f"距离到期: {days_until_expiry} 天"
  72. )
  73. send_expiry_reminder_sms(contract.contact_phone, contract.merchant_name, settings.ALIYUN_SMS_SIGN_NAME_CONTRACT, settings.ALIYUN_SMS_TEMPLATE_CODE_CONTRACT)
  74. except Exception as e:
  75. logger.error(f"处理合同ID {contract.id} 时出错: {e}", exc_info=True)
  76. logger.info("合同到期检查任务执行完成")
  77. finally:
  78. db.close()
  79. except Exception as e:
  80. logger.error(f"执行合同到期检查任务时出错: {e}", exc_info=True)
  81. raise
  82. def send_expiry_reminder_sms(phone: str, merchant_name: str, sign_name: str, template_code: str):
  83. """
  84. 发送合同到期提醒短信
  85. Args:
  86. phone: 联系电话
  87. merchant_name: 商家名称
  88. days_until_expiry: 距离到期的天数
  89. """
  90. template_param = {
  91. "name": merchant_name,
  92. }
  93. send_sms(phone, template_param, sign_name, template_code)
  94. send_expiry_reminder_sms("17337039317", "", settings.ALIYUN_SMS_SIGN_NAME_CONTRACT, settings.ALIYUN_SMS_TEMPLATE_CODE_CONTRACT)