from celery import Celery from celery.schedules import crontab from alien_gateway.config import settings # 创建 Celery 应用 celery_app = Celery( "alien_cloud", broker=settings.REDIS_SENTINEL_URL, backend=settings.REDIS_SENTINEL_URL, include=["alien_util.tasks.contract_tasks"] ) # Celery 配置 celery_app.conf.update( task_serializer="json", accept_content=["json"], result_serializer="json", timezone="Asia/Shanghai", enable_utc=True, broker_transport_options=settings.REDIS_SENTINEL_TRANSPORT_OPTIONS, result_backend_transport_options=settings.REDIS_SENTINEL_TRANSPORT_OPTIONS, # 定时任务配置 beat_schedule={ "check-contract-expiry": { "task": "alien_util.tasks.contract_tasks.check_contract_expiry", "schedule": crontab(hour=0, minute=1), # 每天凌晨0点1分执行 }, }, )