custom_cache_evict.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. # -*- coding: utf-8 -*-
  2. """
  3. 自定义缓存清理装饰器,对应 Java 版本的 `@CustomCacheEvict`。
  4. 执行目标函数后,根据前缀/字段路径/参数组合构造通配符 Key,并批量删除 Redis 缓存。
  5. """
  6. from __future__ import annotations
  7. import functools
  8. import inspect
  9. import logging
  10. from typing import Any, Callable, Iterable, Mapping, MutableMapping, Sequence
  11. from werkzeug.local import LocalProxy
  12. from .custom_cacheable import (
  13. ARGS_HASH_PREFIX,
  14. COMMON_SEPARATOR,
  15. _get_value_by_field_path,
  16. _hash_arguments,
  17. _resolve_redis_client,
  18. )
  19. logger = logging.getLogger(__name__)
  20. __all__ = ["custom_cache_evict"]
  21. def custom_cache_evict(
  22. key_prefixes: Sequence[str],
  23. key_fields: Sequence[str] | None = None,
  24. use_query_params_as_key: bool = False,
  25. ) -> Callable:
  26. """
  27. Redis 缓存清理装饰器。
  28. Args:
  29. key_prefixes: 缓存前缀数组,必填,对应待清理的一组 Key。
  30. key_fields: 与前缀一一对应的字段路径,允许缺省;缺省时直接按前缀通配符清理。
  31. use_query_params_as_key: 是否将函数参数序列化为 Key 的一部分(需与存储端保持一致)。
  32. """
  33. if not key_prefixes:
  34. raise ValueError("key_prefixes 不能为空")
  35. def decorator(func: Callable) -> Callable:
  36. signature = inspect.signature(func)
  37. @functools.wraps(func)
  38. def wrapper(*args: Any, **kwargs: Any) -> Any:
  39. result = func(*args, **kwargs)
  40. client = _resolve_redis_client()
  41. if client is None:
  42. return result
  43. params = _bind_arguments(signature, *args, **kwargs)
  44. args_hash = _hash_arguments(params) if use_query_params_as_key else None
  45. for idx, prefix in enumerate(key_prefixes):
  46. if not prefix:
  47. continue
  48. pattern = prefix
  49. field_value = _extract_field_value(params, key_fields, idx)
  50. if field_value not in (None, ""):
  51. pattern = f"{pattern}{COMMON_SEPARATOR}{field_value}"
  52. if use_query_params_as_key and args_hash:
  53. pattern = f"{pattern}{COMMON_SEPARATOR}{ARGS_HASH_PREFIX}:{args_hash}"
  54. pattern = f"{pattern}*"
  55. _delete_keys_by_pattern(client, pattern)
  56. return result
  57. return wrapper
  58. return decorator
  59. def _bind_arguments(signature: inspect.Signature, *args: Any, **kwargs: Any) -> MutableMapping[str, Any]:
  60. """
  61. 对函数参数做一次绑定,得到“参数名 -> 值”的映射,便于后续取字段。
  62. """
  63. bound_args = signature.bind_partial(*args, **kwargs)
  64. bound_args.apply_defaults()
  65. return bound_args.arguments
  66. def _extract_field_value(
  67. params: Mapping[str, Any],
  68. key_fields: Sequence[str] | None,
  69. index: int,
  70. ) -> Any:
  71. """
  72. 根据 key_fields 配置提取对应的嵌套字段值,超过范围时返回 None。
  73. """
  74. if not key_fields or index >= len(key_fields):
  75. return None
  76. field_path = key_fields[index]
  77. if not field_path:
  78. return None
  79. return _get_value_by_field_path(params, field_path)
  80. def _delete_keys_by_pattern(client: LocalProxy, pattern: str) -> None:
  81. """
  82. 使用 scan_iter 增量拉取匹配 Key 并删除,避免阻塞 Redis。
  83. """
  84. try:
  85. pipeline = client.pipeline(transaction=False)
  86. batch: list[str] = []
  87. for key in client.scan_iter(match=pattern, count=200):
  88. batch.append(key)
  89. if len(batch) >= 200:
  90. _execute_delete_batch(pipeline, batch)
  91. batch.clear()
  92. if batch:
  93. _execute_delete_batch(pipeline, batch)
  94. except Exception as exc: # noqa: BLE001
  95. logger.warning("按模式删除缓存失败 %s: %s", pattern, exc)
  96. def _execute_delete_batch(pipeline: Any, batch: Iterable[str]) -> None:
  97. """
  98. 批量删除 Key 并立即执行 pipeline。
  99. """
  100. try:
  101. for key in batch:
  102. pipeline.delete(key)
  103. pipeline.execute()
  104. except Exception as exc: # noqa: BLE001
  105. logger.warning("批量删除缓存失败: %s", exc)