datascope.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. # -*- coding: utf-8 -*-
  2. # @Author : YY
  3. from enum import Enum
  4. from functools import wraps
  5. from typing import Any, Literal
  6. from flask import g
  7. from pydantic import ConfigDict, Field, validate_call
  8. from pydantic.dataclasses import dataclass
  9. from sqlalchemy.orm.util import AliasedClass
  10. from sqlalchemy.sql.expression import or_
  11. from sqlalchemy import func
  12. from ruoyi_common.base.model import CriterianMeta
  13. from ruoyi_common.descriptor.validator import ValidatorScopeFunction
  14. from ruoyi_common.base.schema_vo import BaseEntity
  15. from ruoyi_common.domain.entity import LoginUser, SysUser
  16. from ruoyi_common.utils import security_util as SecurityUtil
  17. from ruoyi_system.domain.po import SysDeptPo, SysRoleDeptPo, SysUserPo
  18. class DataScopeEnum(Enum):
  19. ALL = "1"
  20. CUSTOM = "2"
  21. DEPT = "3"
  22. DEPT_AND_CHILD = "4"
  23. SELF = "5"
  24. @dataclass
  25. class DataScope:
  26. """
  27. 数据权限范围
  28. """
  29. model_config = ConfigDict(
  30. frozen = True,
  31. extra = "forbid",
  32. strict = True,
  33. populate_by_name = True
  34. )
  35. # DATA_SCOPE: Literal["data_scope"] = Field(init=False, exclude=True, repr=False)
  36. dept : bool = True
  37. user : bool = False
  38. def __call__(self, func) -> Any:
  39. vsfunc = ValidatorScopeFunction(func)
  40. @wraps(func)
  41. def wrapper(*args, **kwargs):
  42. unbound_model = vsfunc.unbound_model
  43. if unbound_model:
  44. key,_ = unbound_model
  45. # bo = kwargs.get(key)
  46. # self.handle_data_scope(bo)
  47. return vsfunc(*args, **kwargs)
  48. return wrapper
  49. def handle_data_scope(self, bo: BaseEntity):
  50. """
  51. 处理数据权限范围
  52. Args:
  53. bo (BaseEntity): 校验对象
  54. """
  55. login_user:LoginUser = SecurityUtil.get_login_user()
  56. if login_user:
  57. current_user:SysUser = login_user.user
  58. if not SecurityUtil.is_admin(login_user.user_id):
  59. self.filter_data_scope(current_user)
  60. def filter_data_scope(self,user: SysUser):
  61. """
  62. 过滤数据权限范围
  63. Args:
  64. bo (BaseEntity): 校验对象
  65. user (SysUser): 当前用户
  66. """
  67. criterian_meta:CriterianMeta = g.criterian_meta
  68. criterions = []
  69. for role in user.roles:
  70. if role.data_scope == DataScopeEnum.ALL.value:
  71. # 全部数据权限
  72. criterions = []
  73. break
  74. elif role.data_scope == DataScopeEnum.CUSTOM.value:
  75. # 自定义数据权限
  76. subquery = SysRoleDeptPo.query(SysRoleDeptPo.dept_id) \
  77. .filter(
  78. SysRoleDeptPo.role_id == role.role_id
  79. ).subquery()
  80. if self.dept is True:
  81. criterion = SysDeptPo.dept_id.in_(subquery)
  82. elif isinstance(self.dept, AliasedClass):
  83. criterion = self.dept.dept_id.in_(subquery)
  84. if criterion:
  85. criterions.append(criterion)
  86. elif role.data_scope == DataScopeEnum.DEPT.value:
  87. # 本部门数据权限
  88. if self.dept is True:
  89. criterion = SysDeptPo.dept_id == user.dept_id
  90. elif isinstance(self.dept, AliasedClass):
  91. criterion = self.dept.dept_id == user.dept_id
  92. if criterion:
  93. criterions.append(criterion)
  94. elif role.data_scope == DataScopeEnum.DEPT_AND_CHILD.value:
  95. # 本部门及子部门数据权限
  96. subquery = SysDeptPo.query(SysDeptPo.dept_id) \
  97. .filter(
  98. or_(
  99. SysDeptPo.dept_id == user.dept_id,
  100. func.find_in_set(user.dept_id, SysDeptPo.ancestors)
  101. )
  102. ).subquery()
  103. if self.dept is True:
  104. criterion = SysDeptPo.dept_id.in_(subquery)
  105. elif isinstance(self.dept, AliasedClass):
  106. criterion = self.dept.dept_id.in_(subquery)
  107. if criterion:
  108. criterions.append(criterion)
  109. elif role.data_scope == DataScopeEnum.SELF.value:
  110. # 仅本人数据权限
  111. if self.user is True:
  112. criterion = SysUserPo.user_id == user.user_id
  113. elif isinstance(self.user, AliasedClass):
  114. criterion = self.user.user_id == user.user_id
  115. else:
  116. criterion = "1=0"
  117. criterions.append(criterion)
  118. else:
  119. print(ValueError("Invalid data_scope value: {}".format(role.data_scope)))
  120. data_scope = or_(*criterions)
  121. criterian_meta.scope = data_scope