contract_tasks.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. """
  2. 合同相关定时任务
  3. """
  4. from datetime import datetime, timedelta
  5. from sqlalchemy import create_engine, select
  6. from sqlalchemy.orm import Session
  7. from alien_gateway.config import settings
  8. from alien_store.db.models.contract_store import ContractStore
  9. from alien_util.celery_app import celery_app
  10. import logging
  11. # 配置日志
  12. logger = logging.getLogger(__name__)
  13. logging.basicConfig(
  14. level=logging.INFO,
  15. format="%(asctime)s [%(levelname)s] %(name)s %(message)s"
  16. )
  17. # 全局数据库引擎(避免重复创建)
  18. _sync_engine = None
  19. def get_sync_db_session():
  20. """
  21. 获取同步数据库会话(用于 Celery 任务)
  22. Celery 任务通常使用同步代码,所以需要同步数据库连接
  23. """
  24. global _sync_engine
  25. if _sync_engine is None:
  26. # 使用同步数据库连接
  27. database_url = settings.SQLALCHEMY_DATABASE_URI
  28. _sync_engine = create_engine(
  29. database_url,
  30. pool_pre_ping=True,
  31. pool_size=5,
  32. max_overflow=10
  33. )
  34. return Session(_sync_engine)
  35. @celery_app.task(name="alien_util.tasks.contract_tasks.check_contract_expiry")
  36. def check_contract_expiry():
  37. """
  38. 检查合同到期时间,如果距离到期不足15天,发送提醒短信
  39. 每天凌晨0点1分执行
  40. """
  41. logger.info("开始执行合同到期检查任务")
  42. try:
  43. # 获取数据库会话
  44. db = get_sync_db_session()
  45. try:
  46. # 计算15天后的日期
  47. check_date = datetime.now() + timedelta(days=15)
  48. # 查询即将到期的合同(expiry_time 在15天内,且不为空)
  49. # 只查询已签署的合同(signing_status = '已签署')
  50. stmt = select(ContractStore).where(
  51. ContractStore.expiry_time.isnot(None),
  52. ContractStore.expiry_time <= check_date,
  53. ContractStore.expiry_time >= datetime.now(),
  54. ContractStore.signing_status == "已签署",
  55. ContractStore.delete_flag == 0
  56. )
  57. result = db.execute(stmt)
  58. contracts = result.scalars().all()
  59. logger.info(f"找到 {len(contracts)} 条即将到期的合同")
  60. # 遍历即将到期的合同,发送提醒短信
  61. for contract in contracts:
  62. try:
  63. # 计算距离到期的天数
  64. days_until_expiry = (contract.expiry_time - datetime.now()).days
  65. logger.info(
  66. f"合同ID: {contract.id}, "
  67. f"商家: {contract.merchant_name}, "
  68. f"联系电话: {contract.contact_phone}, "
  69. f"到期时间: {contract.expiry_time}, "
  70. f"距离到期: {days_until_expiry} 天"
  71. )
  72. # 发送提醒短信(暂时不实现,只记录日志)
  73. send_expiry_reminder_sms(contract.contact_phone, contract.merchant_name, days_until_expiry)
  74. except Exception as e:
  75. logger.error(f"处理合同ID {contract.id} 时出错: {e}", exc_info=True)
  76. logger.info("合同到期检查任务执行完成")
  77. finally:
  78. db.close()
  79. except Exception as e:
  80. logger.error(f"执行合同到期检查任务时出错: {e}", exc_info=True)
  81. raise
  82. def send_expiry_reminder_sms(phone: str, merchant_name: str, days_until_expiry: int):
  83. """
  84. 发送合同到期提醒短信
  85. Args:
  86. phone: 联系电话
  87. merchant_name: 商家名称
  88. days_until_expiry: 距离到期的天数
  89. """
  90. # TODO: 实现短信发送功能
  91. # 这里暂时只记录日志,后续可以接入短信服务商API
  92. message = (
  93. f"【合同到期提醒】尊敬的{merchant_name},您的合同将在{days_until_expiry}天后到期,"
  94. f"请及时续签。如有疑问,请联系客服。"
  95. )
  96. logger.info(f"准备发送短信到 {phone}: {message}")
  97. # 示例:调用短信服务API
  98. # sms_service.send(phone, message)
  99. # 暂时只记录日志
  100. logger.info(f"[模拟] 已发送提醒短信到 {phone}")