redis_client.py 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. from functools import lru_cache
  2. from redis import Redis
  3. from redis.asyncio import Redis as AsyncRedis
  4. from redis.asyncio.sentinel import Sentinel as AsyncSentinel
  5. from redis.sentinel import Sentinel
  6. from alien_gateway.config import settings
  7. def _sentinel_kwargs() -> dict:
  8. return settings.REDIS_SENTINEL_KWARGS
  9. def _redis_connection_kwargs() -> dict:
  10. return {
  11. "password": settings.REDIS_PASSWORD or None,
  12. "db": settings.REDIS_DB,
  13. "socket_timeout": settings.REDIS_SOCKET_TIMEOUT,
  14. "socket_connect_timeout": settings.REDIS_CONNECT_TIMEOUT,
  15. "retry_on_timeout": True,
  16. "decode_responses": True,
  17. }
  18. def _ensure_sentinel_nodes() -> None:
  19. if not settings.REDIS_SENTINEL_NODES:
  20. raise RuntimeError("REDIS_SENTINELS 未配置,无法通过 Sentinel 连接 Redis。")
  21. @lru_cache(maxsize=1)
  22. def get_sentinel_client() -> Sentinel:
  23. _ensure_sentinel_nodes()
  24. return Sentinel(
  25. settings.REDIS_SENTINEL_NODES,
  26. socket_timeout=settings.REDIS_SOCKET_TIMEOUT,
  27. socket_connect_timeout=settings.REDIS_CONNECT_TIMEOUT,
  28. sentinel_kwargs=_sentinel_kwargs(),
  29. )
  30. @lru_cache(maxsize=1)
  31. def get_async_sentinel_client() -> AsyncSentinel:
  32. _ensure_sentinel_nodes()
  33. return AsyncSentinel(
  34. settings.REDIS_SENTINEL_NODES,
  35. socket_timeout=settings.REDIS_SOCKET_TIMEOUT,
  36. socket_connect_timeout=settings.REDIS_CONNECT_TIMEOUT,
  37. sentinel_kwargs=_sentinel_kwargs(),
  38. )
  39. def get_redis_master() -> Redis:
  40. return get_sentinel_client().master_for(
  41. service_name=settings.REDIS_MASTER_NAME,
  42. **_redis_connection_kwargs(),
  43. )
  44. def get_redis_slave() -> Redis:
  45. return get_sentinel_client().slave_for(
  46. service_name=settings.REDIS_MASTER_NAME,
  47. **_redis_connection_kwargs(),
  48. )
  49. def get_async_redis_master() -> AsyncRedis:
  50. return get_async_sentinel_client().master_for(
  51. service_name=settings.REDIS_MASTER_NAME,
  52. **_redis_connection_kwargs(),
  53. )
  54. def get_async_redis_slave() -> AsyncRedis:
  55. return get_async_sentinel_client().slave_for(
  56. service_name=settings.REDIS_MASTER_NAME,
  57. **_redis_connection_kwargs(),
  58. )
  59. def get_redis() -> Redis:
  60. # 默认返回主节点客户端,统一写入入口
  61. return get_redis_master()
  62. def get_async_redis() -> AsyncRedis:
  63. # 默认返回主节点客户端,统一写入入口
  64. return get_async_redis_master()
  65. def check_redis_connection() -> dict:
  66. """
  67. 通过 Sentinel 获取当前主节点并执行 ping,返回连接状态与主节点地址。
  68. """
  69. sentinel = get_sentinel_client()
  70. redis_client = get_redis_master()
  71. pong = redis_client.ping()
  72. host, port = sentinel.discover_master(settings.REDIS_MASTER_NAME)
  73. return {
  74. "ok": bool(pong),
  75. "master_name": settings.REDIS_MASTER_NAME,
  76. "master_host": host,
  77. "master_port": port,
  78. "db": settings.REDIS_DB,
  79. }