| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394 |
- # -*- coding: utf-8 -*-
- # @Author : YY
- from typing import List, Literal, Optional
- from ruoyi_common.constant import UserConstants
- from ruoyi_common.domain.entity import SysRole
- from ruoyi_common.sqlalchemy.transaction import Transactional
- from ruoyi_common.exception import ServiceException
- from ruoyi_common.utils import security_util as SecurityUtil
- from ruoyi_system.domain.entity import SysRoleDept, SysRoleMenu, SysUserRole
- from ruoyi_system.mapper import SysRoleDeptMapper,SysRoleMenuMapper, \
- SysUserRoleMapper,SysRoleMapper
- from ruoyi_system.service.sys_dept import SysDeptService
- from ruoyi_framework.descriptor.datascope import DataScope
- from ruoyi_admin.ext import db
- class SysRoleService:
-
- @classmethod
- @DataScope(dept=True)
- def select_role_list(cls, query:SysRole) -> List[SysRole]:
- """
- 根据条件,查询角色数据
- Args:
- query (SysRole): 包含查询条件的传输对象
- Returns:
- List[SysRole]: 角色列表
- """
- return SysRoleMapper.select_role_list(query)
- @classmethod
- def select_role_all(cls) -> List[SysRole]:
- """
- 查询所有角色数据
- Args:
- query (SysRole): 包含查询条件的传输对象
- Returns:
- List[SysRole]: 角色列表
- """
- return SysRoleMapper.select_role_all()
-
- @classmethod
- def select_roles_by_user_id(cls, user_id:int) -> List[SysRole]:
- """
- 根据用户ID查询角色列表
- Args:
- user_id(int): 用户ID
- Returns:
- List[SysRole]: 角色列表
- """
- eos: List[SysRole] = SysRoleMapper.select_role_permission_by_user_id(user_id)
- scoped_roles: List[SysRole] = cls.select_role_list(None)
- for role in scoped_roles:
- if role.role_id in [eo.role_id for eo in eos]:
- role.flag = True
- return scoped_roles
- @classmethod
- def select_role_permission_by_user_id(cls, user_id:int) -> List[str]:
- """
- 根据用户ID,查询角色权限
- Args:
- user_id(int): 用户ID
- Returns:
- List[str]: 角色权限列表
- """
- eos = SysRoleMapper.select_role_permission_by_user_id(user_id)
- permset = set()
- for eo in eos:
- permset = permset | set(eo.role_key.strip().split(","))
- return list(permset)
- @classmethod
- def select_role_list_by_user_id(cls, user_id:int) -> List[SysRole]:
- """
- 根据用户ID,查询角色选择框列表
- Args:
- user_id(int): 用户ID
- Returns:
- List[SysRole]: 角色选择框列表
- """
- user_eos:List[SysRole] = SysRoleMapper. \
- select_role_permission_by_user_id(user_id)
- eos:List[SysRole] = cls.select_role_list(SysRole())
- for eo in eos:
- for user_eo in user_eos:
- if eo.role_id == user_eo.role_id:
- eo.flag = True
- break
- return eos
- @classmethod
- def select_role_by_id(cls, role_id:int) -> Optional[SysRole]:
- """
- 根据角色ID,查询角色
- Args:
- role_id(int): 角色ID
- Returns:
- SysRole: 角色信息
- """
- role = SysRoleMapper.select_role_by_id(role_id)
- if role:
- role.menu_ids = SysRoleMenuMapper.select_menu_ids_by_role_id(role_id)
- role.dept_ids = list(SysDeptService.select_dept_list_by_role_id(role_id)) \
- if role.data_scope == "2" else []
- return role
-
- @classmethod
- def check_role_name_unique(cls, role: SysRole) -> Literal['0', '1']:
- """
- 校验角色名称是否唯一
-
- Args:
- role(SysRole): 角色信息
- Returns:
- Literal['0', '1'] : 唯一性校验结果, 0:唯一, 1:不唯一
- """
- eo:SysRole = SysRoleMapper.check_role_name_unique(role.role_name)
- if eo and eo.role_id != role.role_id:
- return UserConstants.NOT_UNIQUE
- return UserConstants.UNIQUE
- @classmethod
- def check_role_key_unique(cls, role: SysRole) -> Literal['0', '1']:
- """校验角色权限是否唯一
-
- Args:
- role(SysRole): 角色信息
- Returns:
- Literal['0', '1'] : 唯一性校验结果, 0:唯一, 1:不唯一
- """
- eo: SysRole = SysRoleMapper.check_role_key_unique(role.role_key)
- if eo and eo.role_id != role.role_id:
- return UserConstants.NOT_UNIQUE
- return UserConstants.UNIQUE
- @classmethod
- def check_role_allowed(cls, role: SysRole):
- """
- 校验角色是否允许操作
-
- Args:
- role(SysRole): 角色信息
- Raises:
- ServiceException: 超级管理员角色不允许操作
- """
- if role and role.role_id and role.is_admin():
- raise ServiceException("不允许操作超级管理员角色")
-
- @classmethod
- def check_role_data_scope(cls, role_id: int):
- """
- 校验角色是否有数据权限
-
- Args:
- role_id(int): 角色ID
- Raises:
- ServiceException: 超级管理员角色不允许操作
- """
- if not SecurityUtil.is_admin(SecurityUtil.get_user_id()):
- role = SysRole(role_id=role_id)
- roles: List[SysRole] = cls.select_role_list(role)
- if not roles:
- raise ServiceException("没有权限访问角色数据")
- @classmethod
- def count_user_role_by_role_id(cls, role_id:int) -> int:
- """
- 通过角色ID查询角色使用数量
- Args:
- role_id(int): 角色ID
- Returns:
- int: 角色使用数量
- """
- return SysRoleMapper.count_user_role_by_role_id(role_id)
- @classmethod
- @Transactional(db.session)
- def insert_role(cls, role:SysRole) -> int:
- """
- 新增角色信息
-
- Args:
- role(SysRole): 角色信息
- Returns:
- int: 新增角色ID
- """
- last_id = SysRoleMapper.insert_role(role)
- role.role_id = last_id
- return cls.insert_role_menu(role)
- @classmethod
- @Transactional(db.session)
- def update_role(cls, role:SysRole) -> bool:
- """
- 修改保存角色信息
- Args:
- role(SysRole): 角色信息
-
- Returns:
- bool: 修改结果
- """
- SysRoleMapper.update_role(role)
- SysRoleMenuMapper.delete_role_menu_by_role_id(role.role_id)
- return cls.insert_role_menu(role) > 0
- @classmethod
- @Transactional(db.session)
- def update_role_status(cls, role:SysRole) -> bool:
- """
- 修改角色状态
- Args:
- role(SysRole): 角色信息
-
- Returns:
- bool: 修改结果
- """
- return SysRoleMapper.update_role(role) > 0
- @classmethod
- @Transactional(db.session)
- def auth_data_scope(cls, role:SysRole) -> bool:
- """
- 修改数据权限信息
- Args:
- role(SysRole): 角色信息
-
- Returns:
- bool: 修改结果(True 表示成功)
- """
- # 1. 先更新角色本身的数据范围等信息
- SysRoleMapper.update_role(role)
- # 2. 清理原有角色-部门关联
- SysRoleDeptMapper.delete_role_dept_by_role_id(role.role_id)
- # 3. 只有“自定义数据权限”(通常为 '2') 时才维护角色-部门表
- # 其他类型的数据范围(全部、本部门、本部门及以下、仅本人)都不需要 sys_role_dept 记录
- if role.data_scope == "2" and role.dept_ids:
- num = cls.insert_role_dept(role)
- return num > 0
- # 非自定义范围,即使没有部门记录也视为成功
- return True
-
- @classmethod
- @Transactional(db.session)
- def insert_role_menu(cls, role: SysRole) -> int:
- """
- 新增角色菜单信息
- Args:
- role(SysRole): 角色信息
- Returns:
- int: 新增角色菜单数量
- """
- menus = []
- for menu_id in role.menu_ids:
- menu = SysRoleMenu(role_id=role.role_id, menu_id=menu_id)
- menus.append(menu)
- if len(menus) > 0:
- SysRoleMenuMapper.batch_role_menu(menus)
- return len(menus)
-
- @classmethod
- @Transactional(db.session)
- def insert_role_dept(cls, role: SysRole):
- """
- 新增角色部门信息
- Args:
- role(SysRole): 角色信息
- Returns:
- int: 新增角色部门数量
- """
- depts = []
- for dept_id in role.dept_ids:
- dept = SysRoleDept(role_id=role.role_id, dept_id=dept_id)
- depts.append(dept)
- if len(depts) > 0:
- SysRoleDeptMapper.batch_role_dept(depts)
- return len(depts)
-
- @classmethod
- @Transactional(db.session)
- def delete_role_by_id(cls, role_id:int) -> bool:
- """
- 通过角色ID删除角色
- Args:
- role_id(int): 角色ID
- Returns:
- bool: 删除结果
- """
- SysRoleMenuMapper.delete_role_menu_by_role_id(role_id)
- SysRoleDeptMapper.delete_role_dept_by_role_id(role_id)
- return SysRoleMapper.delete_role_by_id(role_id) > 0
- @classmethod
- @Transactional(db.session)
- def delete_role_by_ids(cls, role_ids:List[int]) -> bool:
- """
- 批量删除角色信息
- Args:
- role_ids(List[int]): 角色ID列表
- Returns:
- bool: 删除结果
- """
- for role_id in role_ids:
- cls.check_role_allowed(SysRole(role_id=role_id))
- # cls.check_role_data_scope(role_id) # todo
- role: SysRole = cls.select_role_by_id(role_id)
- if cls.count_user_role_by_role_id(role_id) > 0:
- raise ServiceException("角色{}已分配用户,不能删除".format(role.role_name))
- SysRoleMenuMapper.delete_role_menu(role_ids)
- SysRoleDeptMapper.delete_role_dept(role_ids)
- return SysRoleMapper.delete_role_by_ids(role_ids) > 0
- @classmethod
- @Transactional(db.session)
- def delete_auth_user(cls, user_role:SysUserRole) -> bool:
- """
- 取消授权用户角色
- Args:
- user_role(SysUserRole): 用户角色信息
- Returns:
- bool: 取消授权结果
- """
- return SysUserRoleMapper.delete_user_role_info(user_role) > 0
- @classmethod
- @Transactional(db.session)
- def delete_auth_users(cls, role_id, user_ids) -> bool:
- """
- 批量取消授权用户角色
- Args:
- role_id(int): 角色ID
- user_ids(List[int]): 用户ID列表
- Returns:
- bool: 取消授权结果
- """
- return SysUserRoleMapper.delete_user_role_infos(role_id, user_ids) > 0
- @classmethod
- @Transactional(db.session)
- def insert_auth_users(cls, role_id, user_ids) -> bool:
- """
- 批量选择授权用户角色
- Args:
- role_id(int): 角色ID
- user_ids(List[int]): 用户ID列表
- Returns:
- bool: 选择授权结果
- """
- user_roles = [SysUserRole(user_id=user_id, role_id=role_id) \
- for user_id in user_ids]
- num = SysUserRoleMapper.batch_user_role(user_roles)
- return num > 0
-
|