from functools import lru_cache from redis import Redis from redis.asyncio import Redis as AsyncRedis from redis.asyncio.sentinel import Sentinel as AsyncSentinel from redis.sentinel import Sentinel from alien_gateway.config import settings def _sentinel_kwargs() -> dict: return settings.REDIS_SENTINEL_KWARGS def _redis_connection_kwargs() -> dict: return { "password": settings.REDIS_PASSWORD or None, "db": settings.REDIS_DB, "socket_timeout": settings.REDIS_SOCKET_TIMEOUT, "socket_connect_timeout": settings.REDIS_CONNECT_TIMEOUT, "retry_on_timeout": True, "decode_responses": True, } def _ensure_sentinel_nodes() -> None: if not settings.REDIS_SENTINEL_NODES: raise RuntimeError("REDIS_SENTINELS 未配置,无法通过 Sentinel 连接 Redis。") @lru_cache(maxsize=1) def get_sentinel_client() -> Sentinel: _ensure_sentinel_nodes() return Sentinel( settings.REDIS_SENTINEL_NODES, socket_timeout=settings.REDIS_SOCKET_TIMEOUT, socket_connect_timeout=settings.REDIS_CONNECT_TIMEOUT, sentinel_kwargs=_sentinel_kwargs(), ) @lru_cache(maxsize=1) def get_async_sentinel_client() -> AsyncSentinel: _ensure_sentinel_nodes() return AsyncSentinel( settings.REDIS_SENTINEL_NODES, socket_timeout=settings.REDIS_SOCKET_TIMEOUT, socket_connect_timeout=settings.REDIS_CONNECT_TIMEOUT, sentinel_kwargs=_sentinel_kwargs(), ) def get_redis_master() -> Redis: return get_sentinel_client().master_for( service_name=settings.REDIS_MASTER_NAME, **_redis_connection_kwargs(), ) def get_redis_slave() -> Redis: return get_sentinel_client().slave_for( service_name=settings.REDIS_MASTER_NAME, **_redis_connection_kwargs(), ) def get_async_redis_master() -> AsyncRedis: return get_async_sentinel_client().master_for( service_name=settings.REDIS_MASTER_NAME, **_redis_connection_kwargs(), ) def get_async_redis_slave() -> AsyncRedis: return get_async_sentinel_client().slave_for( service_name=settings.REDIS_MASTER_NAME, **_redis_connection_kwargs(), ) def get_redis() -> Redis: # 默认返回主节点客户端,统一写入入口 return get_redis_master() def get_async_redis() -> AsyncRedis: # 默认返回主节点客户端,统一写入入口 return get_async_redis_master() def check_redis_connection() -> dict: """ 通过 Sentinel 获取当前主节点并执行 ping,返回连接状态与主节点地址。 """ sentinel = get_sentinel_client() redis_client = get_redis_master() pong = redis_client.ping() host, port = sentinel.discover_master(settings.REDIS_MASTER_NAME) return { "ok": bool(pong), "master_name": settings.REDIS_MASTER_NAME, "master_host": host, "master_port": port, "db": settings.REDIS_DB, }