| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167 |
- """Redis 客户端工厂:根据 settings.REDIS_MODE 自动选择单机或 Sentinel。
- - 单机模式:REDIS_URL 非空时启用,直接 redis.from_url
- - 哨兵模式:REDIS_SENTINELS 非空时启用,通过 Sentinel 获取主从节点
- """
- from functools import lru_cache
- import redis
- from redis import Redis
- from redis.asyncio import Redis as AsyncRedis
- from redis.asyncio import from_url as async_from_url
- from redis.asyncio.sentinel import Sentinel as AsyncSentinel
- from redis.sentinel import Sentinel
- from alien_gateway.config import settings
- def _sentinel_kwargs() -> dict:
- return settings.REDIS_SENTINEL_KWARGS
- def _redis_connection_kwargs() -> dict:
- return {
- "password": settings.REDIS_PASSWORD or None,
- "db": settings.REDIS_DB,
- "socket_timeout": settings.REDIS_SOCKET_TIMEOUT,
- "socket_connect_timeout": settings.REDIS_CONNECT_TIMEOUT,
- "retry_on_timeout": True,
- "decode_responses": True,
- }
- def _ensure_sentinel_nodes() -> None:
- if not settings.REDIS_SENTINEL_NODES:
- raise RuntimeError("REDIS_SENTINELS 未配置,无法通过 Sentinel 连接 Redis。")
- def _ensure_redis_url() -> None:
- if not settings.REDIS_URL:
- raise RuntimeError("REDIS_URL 未配置,无法以单机模式连接 Redis。")
- # --------------------------------------------------------------- Sentinel ----
- @lru_cache(maxsize=1)
- def get_sentinel_client() -> Sentinel:
- _ensure_sentinel_nodes()
- return Sentinel(
- settings.REDIS_SENTINEL_NODES,
- socket_timeout=settings.REDIS_SOCKET_TIMEOUT,
- socket_connect_timeout=settings.REDIS_CONNECT_TIMEOUT,
- sentinel_kwargs=_sentinel_kwargs(),
- )
- @lru_cache(maxsize=1)
- def get_async_sentinel_client() -> AsyncSentinel:
- _ensure_sentinel_nodes()
- return AsyncSentinel(
- settings.REDIS_SENTINEL_NODES,
- socket_timeout=settings.REDIS_SOCKET_TIMEOUT,
- socket_connect_timeout=settings.REDIS_CONNECT_TIMEOUT,
- sentinel_kwargs=_sentinel_kwargs(),
- )
- def _sentinel_master() -> Redis:
- return get_sentinel_client().master_for(
- service_name=settings.REDIS_MASTER_NAME,
- **_redis_connection_kwargs(),
- )
- def _sentinel_slave() -> Redis:
- return get_sentinel_client().slave_for(
- service_name=settings.REDIS_MASTER_NAME,
- **_redis_connection_kwargs(),
- )
- def _async_sentinel_master() -> AsyncRedis:
- return get_async_sentinel_client().master_for(
- service_name=settings.REDIS_MASTER_NAME,
- **_redis_connection_kwargs(),
- )
- def _async_sentinel_slave() -> AsyncRedis:
- return get_async_sentinel_client().slave_for(
- service_name=settings.REDIS_MASTER_NAME,
- **_redis_connection_kwargs(),
- )
- # ---------------------------------------------------------------- Standalone -
- @lru_cache(maxsize=1)
- def _standalone_client() -> Redis:
- _ensure_redis_url()
- return redis.from_url(settings.REDIS_URL, decode_responses=True)
- @lru_cache(maxsize=1)
- def _async_standalone_client() -> AsyncRedis:
- _ensure_redis_url()
- return async_from_url(settings.REDIS_URL, decode_responses=True)
- # ---------------------------------------------------------------- Public API -
- def get_redis_master() -> Redis:
- if settings.REDIS_MODE == "sentinel":
- return _sentinel_master()
- return _standalone_client()
- def get_redis_slave() -> Redis:
- if settings.REDIS_MODE == "sentinel":
- return _sentinel_slave()
- # 单机无主从概念,复用同一连接
- return _standalone_client()
- def get_async_redis_master() -> AsyncRedis:
- if settings.REDIS_MODE == "sentinel":
- return _async_sentinel_master()
- return _async_standalone_client()
- def get_async_redis_slave() -> AsyncRedis:
- if settings.REDIS_MODE == "sentinel":
- return _async_sentinel_slave()
- return _async_standalone_client()
- def get_redis() -> Redis:
- """默认入口:单机直接返回,哨兵返回主节点。"""
- return get_redis_master()
- def get_async_redis() -> AsyncRedis:
- return get_async_redis_master()
- def check_redis_connection() -> dict:
- """执行 ping 检测连接,并附带模式信息。"""
- if settings.REDIS_MODE == "sentinel":
- sentinel = get_sentinel_client()
- redis_client = get_redis_master()
- pong = redis_client.ping()
- host, port = sentinel.discover_master(settings.REDIS_MASTER_NAME)
- return {
- "ok": bool(pong),
- "mode": "sentinel",
- "master_name": settings.REDIS_MASTER_NAME,
- "master_host": host,
- "master_port": port,
- "db": settings.REDIS_DB,
- }
- redis_client = get_redis()
- pong = redis_client.ping()
- return {
- "ok": bool(pong),
- "mode": "standalone",
- "url": settings.REDIS_URL,
- }
|