datascope.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. # -*- coding: utf-8 -*-
  2. # @Author : YY
  3. from enum import Enum
  4. from functools import wraps
  5. from typing import Any, Literal
  6. from flask import g
  7. from pydantic import ConfigDict, Field, validate_call
  8. from pydantic.dataclasses import dataclass
  9. from sqlalchemy.orm.util import AliasedClass
  10. from sqlalchemy.sql.expression import or_
  11. from sqlalchemy import func
  12. from ruoyi_common.base.model import CriterianMeta
  13. from ruoyi_common.descriptor.validator import ValidatorScopeFunction
  14. from ruoyi_common.base.schema_vo import BaseEntity
  15. from ruoyi_common.domain.entity import LoginUser, SysUser
  16. from ruoyi_common.utils import security_util as SecurityUtil
  17. from ruoyi_system.domain.po import SysDeptPo, SysRoleDeptPo, SysUserPo
  18. class DataScopeEnum(Enum):
  19. ALL = "1"
  20. CUSTOM = "2"
  21. DEPT = "3"
  22. DEPT_AND_CHILD = "4"
  23. SELF = "5"
  24. @dataclass
  25. class DataScope:
  26. """
  27. 数据权限范围
  28. """
  29. model_config = ConfigDict(
  30. frozen = True,
  31. extra = "forbid",
  32. strict = True,
  33. populate_by_name = True
  34. )
  35. # DATA_SCOPE: Literal["data_scope"] = Field(init=False, exclude=True, repr=False)
  36. dept : bool = True
  37. user : bool = False
  38. def __call__(self, func) -> Any:
  39. vsfunc = ValidatorScopeFunction(func)
  40. @wraps(func)
  41. def wrapper(*args, **kwargs):
  42. unbound_model = vsfunc.unbound_model
  43. if unbound_model:
  44. key,_ = unbound_model
  45. # bo = kwargs.get(key)
  46. # self.handle_data_scope(bo)
  47. return vsfunc(*args, **kwargs)
  48. return wrapper
  49. def handle_data_scope(self, bo: BaseEntity):
  50. """
  51. 处理数据权限范围
  52. Args:
  53. bo (BaseEntity): 校验对象
  54. """
  55. login_user:LoginUser = SecurityUtil.get_login_user()
  56. if login_user:
  57. current_user:SysUser = login_user.user
  58. # 检查用户是否为超级管理员(用户ID为1或者具有admin角色)
  59. if not SecurityUtil.is_admin(login_user.user_id) and not self.is_user_admin(current_user):
  60. self.filter_data_scope(current_user)
  61. def is_user_admin(self, user: SysUser) -> bool:
  62. """
  63. 判断用户是否为管理员(通过角色判断)
  64. Args:
  65. user (SysUser): 用户对象
  66. Returns:
  67. bool: 是否为管理员
  68. """
  69. if user.roles:
  70. for role in user.roles:
  71. # 如果用户有任何一个角色的role_key为admin,则认为是超级管理员
  72. if hasattr(role, 'role_key') and role.role_key == 'admin':
  73. return True
  74. return False
  75. def filter_data_scope(self,user: SysUser):
  76. """
  77. 过滤数据权限范围
  78. Args:
  79. bo (BaseEntity): 校验对象
  80. user (SysUser): 当前用户
  81. """
  82. criterian_meta:CriterianMeta = g.criterian_meta
  83. criterions = []
  84. for role in user.roles:
  85. if role.data_scope == DataScopeEnum.ALL.value:
  86. # 全部数据权限
  87. criterions = []
  88. break
  89. elif role.data_scope == DataScopeEnum.CUSTOM.value:
  90. # 自定义数据权限
  91. subquery = SysRoleDeptPo.query(SysRoleDeptPo.dept_id) \
  92. .filter(
  93. SysRoleDeptPo.role_id == role.role_id
  94. ).subquery()
  95. if self.dept is True:
  96. criterion = SysDeptPo.dept_id.in_(subquery)
  97. elif isinstance(self.dept, AliasedClass):
  98. criterion = self.dept.dept_id.in_(subquery)
  99. if criterion:
  100. criterions.append(criterion)
  101. elif role.data_scope == DataScopeEnum.DEPT.value:
  102. # 本部门数据权限
  103. if self.dept is True:
  104. criterion = SysDeptPo.dept_id == user.dept_id
  105. elif isinstance(self.dept, AliasedClass):
  106. criterion = self.dept.dept_id == user.dept_id
  107. if criterion:
  108. criterions.append(criterion)
  109. elif role.data_scope == DataScopeEnum.DEPT_AND_CHILD.value:
  110. # 本部门及子部门数据权限
  111. subquery = SysDeptPo.query(SysDeptPo.dept_id) \
  112. .filter(
  113. or_(
  114. SysDeptPo.dept_id == user.dept_id,
  115. func.find_in_set(user.dept_id, SysDeptPo.ancestors)
  116. )
  117. ).subquery()
  118. if self.dept is True:
  119. criterion = SysDeptPo.dept_id.in_(subquery)
  120. elif isinstance(self.dept, AliasedClass):
  121. criterion = self.dept.dept_id.in_(subquery)
  122. if criterion:
  123. criterions.append(criterion)
  124. elif role.data_scope == DataScopeEnum.SELF.value:
  125. # 仅本人数据权限
  126. if self.user is True:
  127. criterion = SysUserPo.user_id == user.user_id
  128. elif isinstance(self.user, AliasedClass):
  129. criterion = self.user.user_id == user.user_id
  130. else:
  131. criterion = "1=0"
  132. criterions.append(criterion)
  133. else:
  134. print(ValueError("Invalid data_scope value: {}".format(role.data_scope)))
  135. data_scope = or_(*criterions)
  136. criterian_meta.scope = data_scope