| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161 |
- # -*- coding: utf-8 -*-
- # @Author : YY
- from enum import Enum
- from functools import wraps
- from typing import Any, Literal
- from flask import g
- from pydantic import ConfigDict, Field, validate_call
- from pydantic.dataclasses import dataclass
- from sqlalchemy.orm.util import AliasedClass
- from sqlalchemy.sql.expression import or_
- from sqlalchemy import func
- from ruoyi_common.base.model import CriterianMeta
- from ruoyi_common.descriptor.validator import ValidatorScopeFunction
- from ruoyi_common.base.schema_vo import BaseEntity
- from ruoyi_common.domain.entity import LoginUser, SysUser
- from ruoyi_common.utils import security_util as SecurityUtil
- from ruoyi_system.domain.po import SysDeptPo, SysRoleDeptPo, SysUserPo
- class DataScopeEnum(Enum):
-
- ALL = "1"
-
- CUSTOM = "2"
-
- DEPT = "3"
-
- DEPT_AND_CHILD = "4"
-
- SELF = "5"
-
-
- @dataclass
- class DataScope:
- """
- 数据权限范围
- """
- model_config = ConfigDict(
- frozen = True,
- extra = "forbid",
- strict = True,
- populate_by_name = True
- )
-
- # DATA_SCOPE: Literal["data_scope"] = Field(init=False, exclude=True, repr=False)
-
- dept : bool = True
- user : bool = False
-
-
- def __call__(self, func) -> Any:
-
- vsfunc = ValidatorScopeFunction(func)
-
- @wraps(func)
- def wrapper(*args, **kwargs):
- unbound_model = vsfunc.unbound_model
- if unbound_model:
- key,_ = unbound_model
- # bo = kwargs.get(key)
- # self.handle_data_scope(bo)
- return vsfunc(*args, **kwargs)
- return wrapper
- def handle_data_scope(self, bo: BaseEntity):
- """
- 处理数据权限范围
-
- Args:
- bo (BaseEntity): 校验对象
- """
- login_user:LoginUser = SecurityUtil.get_login_user()
- if login_user:
- current_user:SysUser = login_user.user
- # 检查用户是否为超级管理员(用户ID为1或者具有admin角色)
- if not SecurityUtil.is_admin(login_user.user_id) and not self.is_user_admin(current_user):
- self.filter_data_scope(current_user)
-
- def is_user_admin(self, user: SysUser) -> bool:
- """
- 判断用户是否为管理员(通过角色判断)
-
- Args:
- user (SysUser): 用户对象
-
- Returns:
- bool: 是否为管理员
- """
- if user.roles:
- for role in user.roles:
- # 如果用户有任何一个角色的role_key为admin,则认为是超级管理员
- if hasattr(role, 'role_key') and role.role_key == 'admin':
- return True
- return False
-
- def filter_data_scope(self,user: SysUser):
- """
- 过滤数据权限范围
-
- Args:
- bo (BaseEntity): 校验对象
- user (SysUser): 当前用户
- """
- criterian_meta:CriterianMeta = g.criterian_meta
-
- criterions = []
- for role in user.roles:
- if role.data_scope == DataScopeEnum.ALL.value:
- # 全部数据权限
- criterions = []
- break
- elif role.data_scope == DataScopeEnum.CUSTOM.value:
- # 自定义数据权限
- subquery = SysRoleDeptPo.query(SysRoleDeptPo.dept_id) \
- .filter(
- SysRoleDeptPo.role_id == role.role_id
- ).subquery()
- if self.dept is True:
- criterion = SysDeptPo.dept_id.in_(subquery)
- elif isinstance(self.dept, AliasedClass):
- criterion = self.dept.dept_id.in_(subquery)
- if criterion:
- criterions.append(criterion)
- elif role.data_scope == DataScopeEnum.DEPT.value:
- # 本部门数据权限
- if self.dept is True:
- criterion = SysDeptPo.dept_id == user.dept_id
- elif isinstance(self.dept, AliasedClass):
- criterion = self.dept.dept_id == user.dept_id
- if criterion:
- criterions.append(criterion)
- elif role.data_scope == DataScopeEnum.DEPT_AND_CHILD.value:
- # 本部门及子部门数据权限
- subquery = SysDeptPo.query(SysDeptPo.dept_id) \
- .filter(
- or_(
- SysDeptPo.dept_id == user.dept_id,
- func.find_in_set(user.dept_id, SysDeptPo.ancestors)
- )
- ).subquery()
- if self.dept is True:
- criterion = SysDeptPo.dept_id.in_(subquery)
- elif isinstance(self.dept, AliasedClass):
- criterion = self.dept.dept_id.in_(subquery)
- if criterion:
- criterions.append(criterion)
- elif role.data_scope == DataScopeEnum.SELF.value:
- # 仅本人数据权限
- if self.user is True:
- criterion = SysUserPo.user_id == user.user_id
- elif isinstance(self.user, AliasedClass):
- criterion = self.user.user_id == user.user_id
- else:
- criterion = "1=0"
- criterions.append(criterion)
- else:
- print(ValueError("Invalid data_scope value: {}".format(role.data_scope)))
- data_scope = or_(*criterions)
- criterian_meta.scope = data_scope
|