celery_app.py 1.1 KB

1234567891011121314151617181920212223242526272829303132
  1. from celery import Celery
  2. from celery.schedules import crontab
  3. from alien_gateway.config import settings
  4. # 创建 Celery 应用:broker / backend 由 settings 自动适配单机或 Sentinel
  5. celery_app = Celery(
  6. "alien_cloud",
  7. broker=settings.CELERY_BROKER_URL,
  8. backend=settings.CELERY_BACKEND_URL,
  9. include=["alien_util.tasks.contract_tasks"],
  10. )
  11. _celery_conf = dict(
  12. task_serializer="json",
  13. accept_content=["json"],
  14. result_serializer="json",
  15. timezone="Asia/Shanghai",
  16. enable_utc=True,
  17. beat_schedule={
  18. "check-contract-expiry": {
  19. "task": "alien_util.tasks.contract_tasks.check_contract_expiry",
  20. "schedule": crontab(hour=0, minute=1), # 每天凌晨 0:01 执行
  21. },
  22. },
  23. )
  24. # 哨兵模式下必须传 transport_options 让 kombu 感知 master_name 等参数
  25. if settings.CELERY_TRANSPORT_OPTIONS is not None:
  26. _celery_conf["broker_transport_options"] = settings.CELERY_TRANSPORT_OPTIONS
  27. _celery_conf["result_backend_transport_options"] = settings.CELERY_TRANSPORT_OPTIONS
  28. celery_app.conf.update(**_celery_conf)