| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116 |
- from pydantic_settings import BaseSettings, SettingsConfigDict
- from typing import Any, Dict, List
- from dotenv import load_dotenv
- from urllib.parse import quote
- import os
- load_dotenv()
- class Settings(BaseSettings):
- # 基础配置
- PROJECT_NAME: str = "Alien Cloud Python"
- API_V1_STR: str = "/api/v1"
-
- # 鉴权配置 (原 alien-gateway 职责)
- SECRET_KEY: str = os.getenv("SECRET_KEY")
- ALGORITHM: str = os.getenv("ALGORITHM")
- ACCESS_TOKEN_EXPIRE_MINUTES: int = os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES") # 7天
-
- # 数据库配置
- DB_USER: str = os.getenv("DB_USER")
- DB_PASSWORD: str = os.getenv("DB_PASSWORD")
- DB_HOST: str = os.getenv("DB_HOST")
- DB_PORT: int = os.getenv("DB_PORT")
- DB_NAME: str = os.getenv("DB_NAME")
- # Redis Sentinel 高可用配置
- # 例: REDIS_SENTINELS=192.168.2.251:36379,192.168.2.252:36379,192.168.2.253:36379
- REDIS_SENTINELS: str = os.getenv("REDIS_SENTINELS", "")
- REDIS_MASTER_NAME: str = os.getenv("REDIS_MASTER_NAME", "mymaster")
- REDIS_PASSWORD: str = os.getenv("REDIS_PASSWORD", "")
- REDIS_DB: int = int(os.getenv("REDIS_DB", "0"))
- REDIS_SENTINEL_USERNAME: str = os.getenv("REDIS_SENTINEL_USERNAME", "")
- REDIS_SENTINEL_PASSWORD: str = os.getenv("REDIS_SENTINEL_PASSWORD", "")
- REDIS_SOCKET_TIMEOUT: float = float(os.getenv("REDIS_SOCKET_TIMEOUT", "0.5"))
- REDIS_CONNECT_TIMEOUT: float = float(os.getenv("REDIS_CONNECT_TIMEOUT", "1.0"))
- # 下游服务地址
- STORE_BASE_URL: str = os.getenv("STORE_BASE_URL") # alien_store 服务地址
- # 阿里云短信配置
- ALIYUN_SMS_SIGN_NAME_CONTRACT: str = os.getenv("ALIYUN_SMS_SIGN_NAME_CONTRACT")
- ALIYUN_SMS_TEMPLATE_CODE_CONTRACT: str = os.getenv("ALIYUN_SMS_TEMPLATE_CODE_CONTRACT")
- ALIYUN_ACCESS_KEY_ID: str = os.getenv("ALIYUN_ACCESS_KEY_ID")
- ALIYUN_ACCESS_KEY_SECRET: str = os.getenv("ALIYUN_ACCESS_KEY_SECRET")
- @property
- def SQLALCHEMY_DATABASE_URI(self) -> str:
- return f"mysql+pymysql://{self.DB_USER}:{self.DB_PASSWORD}@{self.DB_HOST}:{self.DB_PORT}/{self.DB_NAME}"
- @property
- def REDIS_SENTINEL_NODES(self) -> List[tuple[str, int]]:
- nodes: List[tuple[str, int]] = []
- for item in self.REDIS_SENTINELS.split(","):
- entry = item.strip()
- if not entry:
- continue
- if ":" not in entry:
- raise ValueError(f"Invalid REDIS_SENTINELS entry: {entry}")
- host, port = entry.split(":", 1)
- nodes.append((host.strip(), int(port.strip())))
- return nodes
- @property
- def REDIS_SENTINEL_KWARGS(self) -> Dict[str, Any]:
- kwargs: Dict[str, Any] = {}
- if self.REDIS_SENTINEL_USERNAME:
- kwargs["username"] = self.REDIS_SENTINEL_USERNAME
- if self.REDIS_SENTINEL_PASSWORD:
- kwargs["password"] = self.REDIS_SENTINEL_PASSWORD
- return kwargs
- @property
- def REDIS_SENTINEL_URL(self) -> str:
- # 标准 Sentinel URL:
- # redis+sentinel://[username[:password]@]host1:port1,host2:port2/db?sentinel_master=mymaster
- if self.REDIS_SENTINEL_USERNAME:
- user = quote(self.REDIS_SENTINEL_USERNAME, safe="")
- pwd = quote(self.REDIS_PASSWORD, safe="")
- auth = f"{user}:{pwd}@"
- elif self.REDIS_PASSWORD:
- pwd = quote(self.REDIS_PASSWORD, safe="")
- auth = f":{pwd}@"
- else:
- auth = ""
- hosts = ",".join(f"{host}:{port}" for host, port in self.REDIS_SENTINEL_NODES)
- master = quote(self.REDIS_MASTER_NAME, safe="")
- return f"redis+sentinel://{auth}{hosts}/{self.REDIS_DB}?sentinel_master={master}"
- @property
- def REDIS_CELERY_SENTINEL_URL(self) -> str:
- # Celery/Kombu Sentinel URL 格式:
- # sentinel://:redis_password@sentinel_host:port/db;...
- # URL 中的 auth 密码 = Redis 主从节点密码
- # (Kombu 通过 Sentinel 发现主节点地址后,用此密码向 Redis 主节点认证)
- # Sentinel 自身的密码(如有)通过 broker_transport_options["sentinel_kwargs"]["password"] 传递
- auth = f":{quote(self.REDIS_PASSWORD, safe='')}@" if self.REDIS_PASSWORD else ""
- return ";".join(
- f"sentinel://{auth}{host}:{port}/{self.REDIS_DB}"
- for host, port in self.REDIS_SENTINEL_NODES
- )
- @property
- def REDIS_SENTINEL_TRANSPORT_OPTIONS(self) -> Dict[str, Any]:
- return {
- "master_name": self.REDIS_MASTER_NAME,
- "sentinel_kwargs": self.REDIS_SENTINEL_KWARGS,
- "password": self.REDIS_PASSWORD,
- "db": self.REDIS_DB,
- }
- model_config = SettingsConfigDict(
- case_sensitive=True,
- env_file=".env",
- )
- settings = Settings()
|