celery_app.py 907 B

123456789101112131415161718192021222324252627282930313233
  1. from celery import Celery
  2. from celery.schedules import crontab
  3. from alien_gateway.config import settings
  4. # 创建 Celery 应用
  5. celery_app = Celery(
  6. "alien_cloud",
  7. broker=settings.REDIS_CELERY_SENTINEL_URL,
  8. backend=settings.REDIS_CELERY_SENTINEL_URL,
  9. include=["alien_util.tasks.contract_tasks"]
  10. )
  11. # Celery 配置
  12. celery_app.conf.update(
  13. task_serializer="json",
  14. accept_content=["json"],
  15. result_serializer="json",
  16. timezone="Asia/Shanghai",
  17. enable_utc=True,
  18. broker_transport_options=settings.REDIS_SENTINEL_TRANSPORT_OPTIONS,
  19. result_backend_transport_options=settings.REDIS_SENTINEL_TRANSPORT_OPTIONS,
  20. # 定时任务配置
  21. beat_schedule={
  22. "check-contract-expiry": {
  23. "task": "alien_util.tasks.contract_tasks.check_contract_expiry",
  24. "schedule": crontab(hour=0, minute=1), # 每天凌晨0点1分执行
  25. },
  26. },
  27. )