redis_client.py 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. """Redis 客户端工厂:根据 settings.REDIS_MODE 自动选择单机或 Sentinel。
  2. - 单机模式:REDIS_URL 非空时启用,直接 redis.from_url
  3. - 哨兵模式:REDIS_SENTINELS 非空时启用,通过 Sentinel 获取主从节点
  4. """
  5. from functools import lru_cache
  6. import redis
  7. from redis import Redis
  8. from redis.asyncio import Redis as AsyncRedis
  9. from redis.asyncio import from_url as async_from_url
  10. from redis.asyncio.sentinel import Sentinel as AsyncSentinel
  11. from redis.sentinel import Sentinel
  12. from alien_gateway.config import settings
  13. def _sentinel_kwargs() -> dict:
  14. return settings.REDIS_SENTINEL_KWARGS
  15. def _redis_connection_kwargs() -> dict:
  16. return {
  17. "password": settings.REDIS_PASSWORD or None,
  18. "db": settings.REDIS_DB,
  19. "socket_timeout": settings.REDIS_SOCKET_TIMEOUT,
  20. "socket_connect_timeout": settings.REDIS_CONNECT_TIMEOUT,
  21. "retry_on_timeout": True,
  22. "decode_responses": True,
  23. }
  24. def _ensure_sentinel_nodes() -> None:
  25. if not settings.REDIS_SENTINEL_NODES:
  26. raise RuntimeError("REDIS_SENTINELS 未配置,无法通过 Sentinel 连接 Redis。")
  27. def _ensure_redis_url() -> None:
  28. if not settings.REDIS_URL:
  29. raise RuntimeError("REDIS_URL 未配置,无法以单机模式连接 Redis。")
  30. # --------------------------------------------------------------- Sentinel ----
  31. @lru_cache(maxsize=1)
  32. def get_sentinel_client() -> Sentinel:
  33. _ensure_sentinel_nodes()
  34. return Sentinel(
  35. settings.REDIS_SENTINEL_NODES,
  36. socket_timeout=settings.REDIS_SOCKET_TIMEOUT,
  37. socket_connect_timeout=settings.REDIS_CONNECT_TIMEOUT,
  38. sentinel_kwargs=_sentinel_kwargs(),
  39. )
  40. @lru_cache(maxsize=1)
  41. def get_async_sentinel_client() -> AsyncSentinel:
  42. _ensure_sentinel_nodes()
  43. return AsyncSentinel(
  44. settings.REDIS_SENTINEL_NODES,
  45. socket_timeout=settings.REDIS_SOCKET_TIMEOUT,
  46. socket_connect_timeout=settings.REDIS_CONNECT_TIMEOUT,
  47. sentinel_kwargs=_sentinel_kwargs(),
  48. )
  49. def _sentinel_master() -> Redis:
  50. return get_sentinel_client().master_for(
  51. service_name=settings.REDIS_MASTER_NAME,
  52. **_redis_connection_kwargs(),
  53. )
  54. def _sentinel_slave() -> Redis:
  55. return get_sentinel_client().slave_for(
  56. service_name=settings.REDIS_MASTER_NAME,
  57. **_redis_connection_kwargs(),
  58. )
  59. def _async_sentinel_master() -> AsyncRedis:
  60. return get_async_sentinel_client().master_for(
  61. service_name=settings.REDIS_MASTER_NAME,
  62. **_redis_connection_kwargs(),
  63. )
  64. def _async_sentinel_slave() -> AsyncRedis:
  65. return get_async_sentinel_client().slave_for(
  66. service_name=settings.REDIS_MASTER_NAME,
  67. **_redis_connection_kwargs(),
  68. )
  69. # ---------------------------------------------------------------- Standalone -
  70. @lru_cache(maxsize=1)
  71. def _standalone_client() -> Redis:
  72. _ensure_redis_url()
  73. return redis.from_url(settings.REDIS_URL, decode_responses=True)
  74. @lru_cache(maxsize=1)
  75. def _async_standalone_client() -> AsyncRedis:
  76. _ensure_redis_url()
  77. return async_from_url(settings.REDIS_URL, decode_responses=True)
  78. # ---------------------------------------------------------------- Public API -
  79. def get_redis_master() -> Redis:
  80. if settings.REDIS_MODE == "sentinel":
  81. return _sentinel_master()
  82. return _standalone_client()
  83. def get_redis_slave() -> Redis:
  84. if settings.REDIS_MODE == "sentinel":
  85. return _sentinel_slave()
  86. # 单机无主从概念,复用同一连接
  87. return _standalone_client()
  88. def get_async_redis_master() -> AsyncRedis:
  89. if settings.REDIS_MODE == "sentinel":
  90. return _async_sentinel_master()
  91. return _async_standalone_client()
  92. def get_async_redis_slave() -> AsyncRedis:
  93. if settings.REDIS_MODE == "sentinel":
  94. return _async_sentinel_slave()
  95. return _async_standalone_client()
  96. def get_redis() -> Redis:
  97. """默认入口:单机直接返回,哨兵返回主节点。"""
  98. return get_redis_master()
  99. def get_async_redis() -> AsyncRedis:
  100. return get_async_redis_master()
  101. def check_redis_connection() -> dict:
  102. """执行 ping 检测连接,并附带模式信息。"""
  103. if settings.REDIS_MODE == "sentinel":
  104. sentinel = get_sentinel_client()
  105. redis_client = get_redis_master()
  106. pong = redis_client.ping()
  107. host, port = sentinel.discover_master(settings.REDIS_MASTER_NAME)
  108. return {
  109. "ok": bool(pong),
  110. "mode": "sentinel",
  111. "master_name": settings.REDIS_MASTER_NAME,
  112. "master_host": host,
  113. "master_port": port,
  114. "db": settings.REDIS_DB,
  115. }
  116. redis_client = get_redis()
  117. pong = redis_client.ping()
  118. return {
  119. "ok": bool(pong),
  120. "mode": "standalone",
  121. "url": settings.REDIS_URL,
  122. }