"""Redis 客户端工厂:根据 settings.REDIS_MODE 自动选择单机或 Sentinel。 - 单机模式:REDIS_URL 非空时启用,直接 redis.from_url - 哨兵模式:REDIS_SENTINELS 非空时启用,通过 Sentinel 获取主从节点 """ from functools import lru_cache import redis from redis import Redis from redis.asyncio import Redis as AsyncRedis from redis.asyncio import from_url as async_from_url from redis.asyncio.sentinel import Sentinel as AsyncSentinel from redis.sentinel import Sentinel from alien_gateway.config import settings def _sentinel_kwargs() -> dict: return settings.REDIS_SENTINEL_KWARGS def _redis_connection_kwargs() -> dict: return { "password": settings.REDIS_PASSWORD or None, "db": settings.REDIS_DB, "socket_timeout": settings.REDIS_SOCKET_TIMEOUT, "socket_connect_timeout": settings.REDIS_CONNECT_TIMEOUT, "retry_on_timeout": True, "decode_responses": True, } def _ensure_sentinel_nodes() -> None: if not settings.REDIS_SENTINEL_NODES: raise RuntimeError("REDIS_SENTINELS 未配置,无法通过 Sentinel 连接 Redis。") def _ensure_redis_url() -> None: if not settings.REDIS_URL: raise RuntimeError("REDIS_URL 未配置,无法以单机模式连接 Redis。") # --------------------------------------------------------------- Sentinel ---- @lru_cache(maxsize=1) def get_sentinel_client() -> Sentinel: _ensure_sentinel_nodes() return Sentinel( settings.REDIS_SENTINEL_NODES, socket_timeout=settings.REDIS_SOCKET_TIMEOUT, socket_connect_timeout=settings.REDIS_CONNECT_TIMEOUT, sentinel_kwargs=_sentinel_kwargs(), ) @lru_cache(maxsize=1) def get_async_sentinel_client() -> AsyncSentinel: _ensure_sentinel_nodes() return AsyncSentinel( settings.REDIS_SENTINEL_NODES, socket_timeout=settings.REDIS_SOCKET_TIMEOUT, socket_connect_timeout=settings.REDIS_CONNECT_TIMEOUT, sentinel_kwargs=_sentinel_kwargs(), ) def _sentinel_master() -> Redis: return get_sentinel_client().master_for( service_name=settings.REDIS_MASTER_NAME, **_redis_connection_kwargs(), ) def _sentinel_slave() -> Redis: return get_sentinel_client().slave_for( service_name=settings.REDIS_MASTER_NAME, **_redis_connection_kwargs(), ) def _async_sentinel_master() -> AsyncRedis: return get_async_sentinel_client().master_for( service_name=settings.REDIS_MASTER_NAME, **_redis_connection_kwargs(), ) def _async_sentinel_slave() -> AsyncRedis: return get_async_sentinel_client().slave_for( service_name=settings.REDIS_MASTER_NAME, **_redis_connection_kwargs(), ) # ---------------------------------------------------------------- Standalone - @lru_cache(maxsize=1) def _standalone_client() -> Redis: _ensure_redis_url() return redis.from_url(settings.REDIS_URL, decode_responses=True) @lru_cache(maxsize=1) def _async_standalone_client() -> AsyncRedis: _ensure_redis_url() return async_from_url(settings.REDIS_URL, decode_responses=True) # ---------------------------------------------------------------- Public API - def get_redis_master() -> Redis: if settings.REDIS_MODE == "sentinel": return _sentinel_master() return _standalone_client() def get_redis_slave() -> Redis: if settings.REDIS_MODE == "sentinel": return _sentinel_slave() # 单机无主从概念,复用同一连接 return _standalone_client() def get_async_redis_master() -> AsyncRedis: if settings.REDIS_MODE == "sentinel": return _async_sentinel_master() return _async_standalone_client() def get_async_redis_slave() -> AsyncRedis: if settings.REDIS_MODE == "sentinel": return _async_sentinel_slave() return _async_standalone_client() def get_redis() -> Redis: """默认入口:单机直接返回,哨兵返回主节点。""" return get_redis_master() def get_async_redis() -> AsyncRedis: return get_async_redis_master() def check_redis_connection() -> dict: """执行 ping 检测连接,并附带模式信息。""" if settings.REDIS_MODE == "sentinel": sentinel = get_sentinel_client() redis_client = get_redis_master() pong = redis_client.ping() host, port = sentinel.discover_master(settings.REDIS_MASTER_NAME) return { "ok": bool(pong), "mode": "sentinel", "master_name": settings.REDIS_MASTER_NAME, "master_host": host, "master_port": port, "db": settings.REDIS_DB, } redis_client = get_redis() pong = redis_client.ping() return { "ok": bool(pong), "mode": "standalone", "url": settings.REDIS_URL, }