"""Celery application setup: broker/backend wiring and the beat schedule."""

from celery import Celery
from celery.schedules import crontab

from alien_gateway.config import settings

# Redis connection URL, taken from the project settings object.
# NOTE(review): the original comment says this may come from environment
# variables with a default fallback -- confirm against the settings module.
REDIS_URL = settings.REDIS_URL

# Modules the Celery app is told to include so their tasks get registered.
_TASK_MODULES = ["alien_util.tasks.contract_tasks"]

# Periodic tasks dispatched by celery beat.
_BEAT_SCHEDULE = {
    "check-contract-expiry": {
        "task": "alien_util.tasks.contract_tasks.check_contract_expiry",
        # Every day at 00:01.
        "schedule": crontab(hour=0, minute=1),
    },
}

# Create the Celery application; the same Redis URL serves as both the
# message broker and the result backend.
celery_app = Celery(
    "alien_cloud",
    broker=REDIS_URL,
    backend=REDIS_URL,
    include=_TASK_MODULES,
)

# Celery configuration: JSON-only (de)serialization, timezone settings,
# and the periodic task schedule.
celery_app.conf.update(
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="Asia/Shanghai",
    enable_utc=True,
    beat_schedule=_BEAT_SCHEDULE,
)