config.py 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. from pydantic_settings import BaseSettings, SettingsConfigDict
  2. from typing import List
  3. from dotenv import load_dotenv
  4. import os
# Load variables from a .env file into the process environment (python-dotenv)
# at import time, before the Settings class below reads the environment.
load_dotenv()
  6. class Settings(BaseSettings):
  7. # 基础配置
  8. PROJECT_NAME: str = "Alien Cloud Python"
  9. API_V1_STR: str = "/api/v1"
  10. # 鉴权配置 (原 alien-gateway 职责)
  11. SECRET_KEY: str = os.getenv("SECRET_KEY")
  12. ALGORITHM: str = os.getenv("ALGORITHM")
  13. ACCESS_TOKEN_EXPIRE_MINUTES: int = os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES") # 7天
  14. # 数据库配置
  15. DB_USER: str = os.getenv("DB_USER")
  16. DB_PASSWORD: str = os.getenv("DB_PASSWORD")
  17. DB_HOST: str = os.getenv("DB_HOST")
  18. DB_PORT: int = os.getenv("DB_PORT")
  19. DB_NAME: str = os.getenv("DB_NAME")
  20. # Redis Sentinel 高可用配置
  21. # 例: REDIS_SENTINELS=192.168.2.251:36379,192.168.2.252:36379,192.168.2.253:36379
  22. REDIS_SENTINELS: str = os.getenv("REDIS_SENTINELS", "")
  23. REDIS_MASTER_NAME: str = os.getenv("REDIS_MASTER_NAME", "uatmaster")
  24. REDIS_PASSWORD: str = os.getenv("REDIS_PASSWORD", "")
  25. REDIS_DB: int = int(os.getenv("REDIS_DB", "0"))
  26. # 下游服务地址
  27. STORE_BASE_URL: str = os.getenv("STORE_BASE_URL") # alien_store 服务地址
  28. # 阿里云短信配置
  29. ALIYUN_SMS_SIGN_NAME_CONTRACT: str = os.getenv("ALIYUN_SMS_SIGN_NAME_CONTRACT")
  30. ALIYUN_SMS_TEMPLATE_CODE_CONTRACT: str = os.getenv("ALIYUN_SMS_TEMPLATE_CODE_CONTRACT")
  31. ALIYUN_ACCESS_KEY_ID: str = os.getenv("ALIYUN_ACCESS_KEY_ID")
  32. ALIYUN_ACCESS_KEY_SECRET: str = os.getenv("ALIYUN_ACCESS_KEY_SECRET")
  33. @property
  34. def SQLALCHEMY_DATABASE_URI(self) -> str:
  35. return f"mysql+pymysql://{self.DB_USER}:{self.DB_PASSWORD}@{self.DB_HOST}:{self.DB_PORT}/{self.DB_NAME}"
  36. @property
  37. def REDIS_SENTINEL_NODES(self) -> List[tuple[str, int]]:
  38. nodes: List[tuple[str, int]] = []
  39. for item in self.REDIS_SENTINELS.split(","):
  40. entry = item.strip()
  41. if not entry:
  42. continue
  43. host, port = entry.split(":")
  44. nodes.append((host.strip(), int(port.strip())))
  45. return nodes
  46. @property
  47. def REDIS_SENTINEL_URL(self) -> str:
  48. # Celery sentinel transport 需要 "sentinel://host:port;sentinel://host:port"
  49. return ";".join(
  50. f"sentinel://{host}:{port}" for host, port in self.REDIS_SENTINEL_NODES
  51. )
  52. model_config = SettingsConfigDict(
  53. case_sensitive=True,
  54. env_file=".env",
  55. )
# Module-level Settings instance, constructed once when this module is imported.
settings = Settings()