| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105 |
- from functools import lru_cache
- from redis import Redis
- from redis.asyncio import Redis as AsyncRedis
- from redis.asyncio.sentinel import Sentinel as AsyncSentinel
- from redis.sentinel import Sentinel
- from alien_gateway.config import settings
def _sentinel_kwargs() -> dict:
    """Return the extra keyword arguments configured for Sentinel connections.

    The value is read from ``settings.REDIS_SENTINEL_KWARGS`` and handed back
    as-is; no copy is made.
    """
    sentinel_options = settings.REDIS_SENTINEL_KWARGS
    return sentinel_options
def _redis_connection_kwargs() -> dict:
    """Build the keyword arguments passed to ``master_for`` / ``slave_for``.

    Returns:
        A new dict containing the password, database index and socket
        timeouts taken from ``settings``, plus ``retry_on_timeout=True`` and
        ``decode_responses=True``.
    """
    # A falsy password (for example an empty string) is normalised to None.
    password = settings.REDIS_PASSWORD or None
    return dict(
        password=password,
        db=settings.REDIS_DB,
        socket_timeout=settings.REDIS_SOCKET_TIMEOUT,
        socket_connect_timeout=settings.REDIS_CONNECT_TIMEOUT,
        retry_on_timeout=True,
        decode_responses=True,
    )
def _ensure_sentinel_nodes() -> None:
    """Fail fast when no Sentinel nodes are configured.

    Raises:
        RuntimeError: If ``settings.REDIS_SENTINEL_NODES`` is empty or
            otherwise falsy.
    """
    configured_nodes = settings.REDIS_SENTINEL_NODES
    if configured_nodes:
        return
    raise RuntimeError("REDIS_SENTINELS 未配置,无法通过 Sentinel 连接 Redis。")
@lru_cache(maxsize=1)
def get_sentinel_client() -> Sentinel:
    """Return the synchronous ``Sentinel`` client.

    The function is wrapped in ``lru_cache(maxsize=1)``, so after the first
    successful call the same ``Sentinel`` instance is returned.

    Raises:
        RuntimeError: If no Sentinel nodes are configured.
    """
    _ensure_sentinel_nodes()
    nodes = settings.REDIS_SENTINEL_NODES
    options = {
        "socket_timeout": settings.REDIS_SOCKET_TIMEOUT,
        "socket_connect_timeout": settings.REDIS_CONNECT_TIMEOUT,
        "sentinel_kwargs": _sentinel_kwargs(),
    }
    return Sentinel(nodes, **options)
@lru_cache(maxsize=1)
def get_async_sentinel_client() -> AsyncSentinel:
    """Return the asyncio ``Sentinel`` client.

    The function is wrapped in ``lru_cache(maxsize=1)``, so after the first
    successful call the same ``AsyncSentinel`` instance is returned.

    Raises:
        RuntimeError: If no Sentinel nodes are configured.
    """
    _ensure_sentinel_nodes()
    nodes = settings.REDIS_SENTINEL_NODES
    options = {
        "socket_timeout": settings.REDIS_SOCKET_TIMEOUT,
        "socket_connect_timeout": settings.REDIS_CONNECT_TIMEOUT,
        "sentinel_kwargs": _sentinel_kwargs(),
    }
    return AsyncSentinel(nodes, **options)
def get_redis_master() -> Redis:
    """Return a synchronous client for the master of the configured service.

    ``master_for`` is invoked on the cached Sentinel client on every call;
    the returned client itself is not cached by this function.
    """
    sentinel = get_sentinel_client()
    return sentinel.master_for(
        service_name=settings.REDIS_MASTER_NAME,
        **_redis_connection_kwargs(),
    )
def get_redis_slave() -> Redis:
    """Return a synchronous client obtained through ``slave_for``.

    ``slave_for`` is invoked on the cached Sentinel client on every call;
    the returned client itself is not cached by this function.
    """
    sentinel = get_sentinel_client()
    return sentinel.slave_for(
        service_name=settings.REDIS_MASTER_NAME,
        **_redis_connection_kwargs(),
    )
def get_async_redis_master() -> AsyncRedis:
    """Return an asyncio client for the master of the configured service.

    ``master_for`` is invoked on the cached async Sentinel client on every
    call; the returned client itself is not cached by this function.
    """
    sentinel = get_async_sentinel_client()
    return sentinel.master_for(
        service_name=settings.REDIS_MASTER_NAME,
        **_redis_connection_kwargs(),
    )
def get_async_redis_slave() -> AsyncRedis:
    """Return an asyncio client obtained through ``slave_for``.

    ``slave_for`` is invoked on the cached async Sentinel client on every
    call; the returned client itself is not cached by this function.
    """
    sentinel = get_async_sentinel_client()
    return sentinel.slave_for(
        service_name=settings.REDIS_MASTER_NAME,
        **_redis_connection_kwargs(),
    )
def get_redis() -> Redis:
    """Return the default synchronous client.

    Defaults to the master-node client so that writes go through a single
    entry point.
    """
    master_client = get_redis_master()
    return master_client
def get_async_redis() -> AsyncRedis:
    """Return the default asyncio client.

    Defaults to the master-node client so that writes go through a single
    entry point.
    """
    master_client = get_async_redis_master()
    return master_client
def check_redis_connection() -> dict:
    """Ping the current master via Sentinel and report the connection status.

    Returns:
        A dict with the keys ``ok`` (truthiness of the ping reply),
        ``master_name``, ``master_host``, ``master_port`` and ``db``.

    Any exception raised while pinging or while discovering the master is
    not caught here and propagates to the caller.
    """
    sentinel = get_sentinel_client()
    master_client = get_redis_master()

    # Ping first: the master address is only looked up after the ping returns.
    ping_reply = master_client.ping()
    master_host, master_port = sentinel.discover_master(settings.REDIS_MASTER_NAME)

    status = {"ok": bool(ping_reply)}
    status["master_name"] = settings.REDIS_MASTER_NAME
    status["master_host"] = master_host
    status["master_port"] = master_port
    status["db"] = settings.REDIS_DB
    return status
|