sys_role.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380
  1. # -*- coding: utf-8 -*-
  2. # @Author : YY
  3. from typing import List, Literal, Optional
  4. from ruoyi_common.constant import UserConstants
  5. from ruoyi_common.domain.entity import SysRole
  6. from ruoyi_common.sqlalchemy.transaction import Transactional
  7. from ruoyi_common.exception import ServiceException
  8. from ruoyi_common.utils import security_util as SecurityUtil
  9. from ruoyi_system.domain.entity import SysRoleDept, SysRoleMenu, SysUserRole
  10. from ruoyi_system.mapper import SysRoleDeptMapper,SysRoleMenuMapper, \
  11. SysUserRoleMapper,SysRoleMapper
  12. from ruoyi_framework.descriptor.datascope import DataScope
  13. from ruoyi_admin.ext import db
  14. class SysRoleService:
  15. @classmethod
  16. @DataScope(dept=True)
  17. def select_role_list(cls, query:SysRole) -> List[SysRole]:
  18. """
  19. 根据条件,查询角色数据
  20. Args:
  21. query (SysRole): 包含查询条件的传输对象
  22. Returns:
  23. List[SysRole]: 角色列表
  24. """
  25. return SysRoleMapper.select_role_list(query)
  26. @classmethod
  27. def select_role_all(cls) -> List[SysRole]:
  28. """
  29. 查询所有角色数据
  30. Args:
  31. query (SysRole): 包含查询条件的传输对象
  32. Returns:
  33. List[SysRole]: 角色列表
  34. """
  35. return SysRoleMapper.select_role_all()
  36. @classmethod
  37. def select_roles_by_user_id(cls, user_id:int) -> List[SysRole]:
  38. """
  39. 根据用户ID查询角色列表
  40. Args:
  41. user_id(int): 用户ID
  42. Returns:
  43. List[SysRole]: 角色列表
  44. """
  45. eos: List[SysRole] = SysRoleMapper.select_role_permission_by_user_id(user_id)
  46. scoped_roles: List[SysRole] = cls.select_role_list(None)
  47. for role in scoped_roles:
  48. if role.role_id in [eo.role_id for eo in eos]:
  49. role.flag = True
  50. return scoped_roles
  51. @classmethod
  52. def select_role_permission_by_user_id(cls, user_id:int) -> List[str]:
  53. """
  54. 根据用户ID,查询角色权限
  55. Args:
  56. user_id(int): 用户ID
  57. Returns:
  58. List[str]: 角色权限列表
  59. """
  60. eos = SysRoleMapper.select_role_permission_by_user_id(user_id)
  61. permset = set()
  62. for eo in eos:
  63. permset = permset | set(eo.role_key.strip().split(","))
  64. return list(permset)
  65. @classmethod
  66. def select_role_list_by_user_id(cls, user_id:int) -> List[SysRole]:
  67. """
  68. 根据用户ID,查询角色选择框列表
  69. Args:
  70. user_id(int): 用户ID
  71. Returns:
  72. List[SysRole]: 角色选择框列表
  73. """
  74. user_eos:List[SysRole] = SysRoleMapper. \
  75. select_role_permission_by_user_id(user_id)
  76. eos:List[SysRole] = cls.select_role_list(SysRole())
  77. for eo in eos:
  78. for user_eo in user_eos:
  79. if eo.role_id == user_eo.role_id:
  80. eo.flag = True
  81. break
  82. return eos
  83. @classmethod
  84. def select_role_by_id(cls, role_id:int) -> Optional[SysRole]:
  85. """
  86. 根据角色ID,查询角色
  87. Args:
  88. role_id(int): 角色ID
  89. Returns:
  90. SysRole: 角色信息
  91. """
  92. return SysRoleMapper.select_role_by_id(role_id)
  93. @classmethod
  94. def check_role_name_unique(cls, role: SysRole) -> Literal['0', '1']:
  95. """
  96. 校验角色名称是否唯一
  97. Args:
  98. role(SysRole): 角色信息
  99. Returns:
  100. Literal['0', '1'] : 唯一性校验结果, 0:唯一, 1:不唯一
  101. """
  102. eo:SysRole = SysRoleMapper.check_role_name_unique(role.role_name)
  103. if eo and eo.role_id != role.role_id:
  104. return UserConstants.NOT_UNIQUE
  105. return UserConstants.UNIQUE
  106. @classmethod
  107. def check_role_key_unique(cls, role: SysRole) -> Literal['0', '1']:
  108. """校验角色权限是否唯一
  109. Args:
  110. role(SysRole): 角色信息
  111. Returns:
  112. Literal['0', '1'] : 唯一性校验结果, 0:唯一, 1:不唯一
  113. """
  114. eo:SysRole = SysRoleMapper.check_role_key_unique(role.role_key)
  115. if eo and eo.role_id == role.role_id:
  116. return UserConstants.NOT_UNIQUE
  117. return UserConstants.UNIQUE
  118. @classmethod
  119. def check_role_allowed(cls, role: SysRole):
  120. """
  121. 校验角色是否允许操作
  122. Args:
  123. role(SysRole): 角色信息
  124. Raises:
  125. ServiceException: 超级管理员角色不允许操作
  126. """
  127. if role and role.role_id and role.is_admin():
  128. raise ServiceException("不允许操作超级管理员角色")
  129. @classmethod
  130. def check_role_data_scope(cls, role_id: int):
  131. """
  132. 校验角色是否有数据权限
  133. Args:
  134. role_id(int): 角色ID
  135. Raises:
  136. ServiceException: 超级管理员角色不允许操作
  137. """
  138. if not SecurityUtil.is_admin(SecurityUtil.get_user_id()):
  139. role = SysRole(role_id=role_id)
  140. roles: List[SysRole] = cls.select_role_list(role)
  141. if not roles:
  142. raise ServiceException("没有权限访问角色数据")
  143. @classmethod
  144. def count_user_role_by_role_id(cls, role_id:int) -> int:
  145. """
  146. 通过角色ID查询角色使用数量
  147. Args:
  148. role_id(int): 角色ID
  149. Returns:
  150. int: 角色使用数量
  151. """
  152. return SysRoleMapper.count_user_role_by_role_id(role_id)
  153. @classmethod
  154. @Transactional(db.session)
  155. def insert_role(cls, role:SysRole) -> int:
  156. """
  157. 新增角色信息
  158. Args:
  159. role(SysRole): 角色信息
  160. Returns:
  161. int: 新增角色ID
  162. """
  163. last_id = SysRoleMapper.insert_role(role)
  164. role.role_id = last_id
  165. return cls.insert_role_menu(role)
  166. @classmethod
  167. @Transactional(db.session)
  168. def update_role(cls, role:SysRole) -> bool:
  169. """
  170. 修改保存角色信息
  171. Args:
  172. role(SysRole): 角色信息
  173. Returns:
  174. bool: 修改结果
  175. """
  176. SysRoleMapper.update_role(role)
  177. SysRoleMenuMapper.delete_role_menu_by_role_id(role.role_id)
  178. return cls.insert_role_menu(role) > 0
  179. @classmethod
  180. @Transactional(db.session)
  181. def update_role_status(cls, role:SysRole) -> bool:
  182. """
  183. 修改角色状态
  184. Args:
  185. role(SysRole): 角色信息
  186. Returns:
  187. bool: 修改结果
  188. """
  189. return SysRoleMapper.update_role(role) > 0
  190. @classmethod
  191. @Transactional(db.session)
  192. def auth_data_scope(cls, role:SysRole):
  193. """
  194. 修改数据权限信息
  195. Args:
  196. role(SysRole): 角色信息
  197. Returns:
  198. bool: 修改结果
  199. """
  200. SysRoleMapper.update_role(role)
  201. SysRoleDeptMapper.delete_role_dept_by_role_id(role.role_id)
  202. return cls.insert_role_dept(role)
  203. @classmethod
  204. @Transactional(db.session)
  205. def insert_role_menu(cls, role: SysRole) -> int:
  206. """
  207. 新增角色菜单信息
  208. Args:
  209. role(SysRole): 角色信息
  210. Returns:
  211. int: 新增角色菜单数量
  212. """
  213. menus = []
  214. for menu_id in role.menu_ids:
  215. menu = SysRoleMenu(role_id=role.role_id, menu_id=menu_id)
  216. menus.append(menu)
  217. if len(menus) > 0:
  218. SysRoleMenuMapper.batch_role_menu(menus)
  219. return len(menus)
  220. @classmethod
  221. @Transactional(db.session)
  222. def insert_role_dept(cls, role: SysRole):
  223. """
  224. 新增角色部门信息
  225. Args:
  226. role(SysRole): 角色信息
  227. Returns:
  228. int: 新增角色部门数量
  229. """
  230. depts = []
  231. for dept_id in role.dept_ids:
  232. dept = SysRoleDept(role_id=role.role_id, dept_id=dept_id)
  233. depts.append(dept)
  234. if len(depts) > 0:
  235. SysRoleDeptMapper.batch_role_dept(depts)
  236. return len(depts)
  237. @classmethod
  238. @Transactional(db.session)
  239. def delete_role_by_id(cls, role_id:int) -> bool:
  240. """
  241. 通过角色ID删除角色
  242. Args:
  243. role_id(int): 角色ID
  244. Returns:
  245. bool: 删除结果
  246. """
  247. SysRoleMenuMapper.delete_role_menu_by_role_id(role_id)
  248. SysRoleDeptMapper.delete_role_dept_by_role_id(role_id)
  249. return SysRoleMapper.delete_role_by_id(role_id) > 0
  250. @classmethod
  251. @Transactional(db.session)
  252. def delete_role_by_ids(cls, role_ids:List[int]) -> bool:
  253. """
  254. 批量删除角色信息
  255. Args:
  256. role_ids(List[int]): 角色ID列表
  257. Returns:
  258. bool: 删除结果
  259. """
  260. for role_id in role_ids:
  261. cls.check_role_allowed(SysRole(role_id=role_id))
  262. # cls.check_role_data_scope(role_id) # todo
  263. role: SysRole = cls.select_role_by_id(role_id)
  264. if cls.count_user_role_by_role_id(role_id) > 0:
  265. raise ServiceException("角色{}已分配用户,不能删除".format(role.role_name))
  266. SysRoleMenuMapper.delete_role_menu(role_ids)
  267. SysRoleDeptMapper.delete_role_dept(role_ids)
  268. return SysRoleMapper.delete_role_by_ids(role_ids) > 0
  269. @classmethod
  270. @Transactional(db.session)
  271. def delete_auth_user(cls, user_role:SysUserRole) -> bool:
  272. """
  273. 取消授权用户角色
  274. Args:
  275. user_role(SysUserRole): 用户角色信息
  276. Returns:
  277. bool: 取消授权结果
  278. """
  279. return SysUserRoleMapper.delete_user_role_info(user_role) > 0
  280. @classmethod
  281. @Transactional(db.session)
  282. def delete_auth_users(cls, role_id, user_ids) -> bool:
  283. """
  284. 批量取消授权用户角色
  285. Args:
  286. role_id(int): 角色ID
  287. user_ids(List[int]): 用户ID列表
  288. Returns:
  289. bool: 取消授权结果
  290. """
  291. return SysUserRoleMapper.delete_user_role_infos(role_id, user_ids) > 0
  292. @classmethod
  293. @Transactional(db.session)
  294. def insert_auth_users(cls, role_id, user_ids) -> bool:
  295. """
  296. 批量选择授权用户角色
  297. Args:
  298. role_id(int): 角色ID
  299. user_ids(List[int]): 用户ID列表
  300. Returns:
  301. bool: 选择授权结果
  302. """
  303. user_roles = [SysUserRole(user_id=user_id, role_id=role_id) \
  304. for user_id in user_ids]
  305. num = SysUserRoleMapper.batch_user_role(user_roles)
  306. return num > 0