# celery_app.py
  1. from celery import Celery
  2. from celery.schedules import crontab
  3. from alien_gateway.config import settings
  4. # Redis 配置(可以从环境变量读取,这里使用默认配置)
  5. REDIS_URL = settings.REDIS_URL
  6. # 创建 Celery 应用
  7. celery_app = Celery(
  8. "alien_cloud",
  9. broker=REDIS_URL,
  10. backend=REDIS_URL,
  11. include=["alien_util.tasks.contract_tasks"]
  12. )
  13. # Celery 配置
  14. celery_app.conf.update(
  15. task_serializer="json",
  16. accept_content=["json"],
  17. result_serializer="json",
  18. timezone="Asia/Shanghai",
  19. enable_utc=True,
  20. # 定时任务配置
  21. beat_schedule={
  22. "check-contract-expiry": {
  23. "task": "alien_util.tasks.contract_tasks.check_contract_expiry",
  24. "schedule": crontab(hour=0, minute=1), # 每天凌晨0点1分执行
  25. },
  26. },
  27. )