sys_role.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384
  1. # -*- coding: utf-8 -*-
  2. # @Author : YY
  3. from typing import List, Literal, Optional
  4. from ruoyi_common.constant import UserConstants
  5. from ruoyi_common.domain.entity import SysRole
  6. from ruoyi_common.sqlalchemy.transaction import Transactional
  7. from ruoyi_common.exception import ServiceException
  8. from ruoyi_common.utils import security_util as SecurityUtil
  9. from ruoyi_system.domain.entity import SysRoleDept, SysRoleMenu, SysUserRole
  10. from ruoyi_system.mapper import SysRoleDeptMapper,SysRoleMenuMapper, \
  11. SysUserRoleMapper,SysRoleMapper
  12. from ruoyi_framework.descriptor.datascope import DataScope
  13. from ruoyi_admin.ext import db
  14. class SysRoleService:
  15. @classmethod
  16. @DataScope(dept=True)
  17. def select_role_list(cls, query:SysRole) -> List[SysRole]:
  18. """
  19. 根据条件,查询角色数据
  20. Args:
  21. query (SysRole): 包含查询条件的传输对象
  22. Returns:
  23. List[SysRole]: 角色列表
  24. """
  25. return SysRoleMapper.select_role_list(query)
  26. @classmethod
  27. def select_role_all(cls) -> List[SysRole]:
  28. """
  29. 查询所有角色数据
  30. Args:
  31. query (SysRole): 包含查询条件的传输对象
  32. Returns:
  33. List[SysRole]: 角色列表
  34. """
  35. return SysRoleMapper.select_role_all()
  36. @classmethod
  37. def select_roles_by_user_id(cls, user_id:int) -> List[SysRole]:
  38. """
  39. 根据用户ID查询角色列表
  40. Args:
  41. user_id(int): 用户ID
  42. Returns:
  43. List[SysRole]: 角色列表
  44. """
  45. eos: List[SysRole] = SysRoleMapper.select_role_permission_by_user_id(user_id)
  46. scoped_roles: List[SysRole] = cls.select_role_list(None)
  47. for role in scoped_roles:
  48. if role.role_id in [eo.role_id for eo in eos]:
  49. role.flag = True
  50. return scoped_roles
  51. @classmethod
  52. def select_role_permission_by_user_id(cls, user_id:int) -> List[str]:
  53. """
  54. 根据用户ID,查询角色权限
  55. Args:
  56. user_id(int): 用户ID
  57. Returns:
  58. List[str]: 角色权限列表
  59. """
  60. eos = SysRoleMapper.select_role_permission_by_user_id(user_id)
  61. permset = set()
  62. for eo in eos:
  63. permset = permset | set(eo.role_key.strip().split(","))
  64. return list(permset)
  65. @classmethod
  66. def select_role_list_by_user_id(cls, user_id:int) -> List[SysRole]:
  67. """
  68. 根据用户ID,查询角色选择框列表
  69. Args:
  70. user_id(int): 用户ID
  71. Returns:
  72. List[SysRole]: 角色选择框列表
  73. """
  74. user_eos:List[SysRole] = SysRoleMapper. \
  75. select_role_permission_by_user_id(user_id)
  76. eos:List[SysRole] = cls.select_role_list(SysRole())
  77. for eo in eos:
  78. for user_eo in user_eos:
  79. if eo.role_id == user_eo.role_id:
  80. eo.flag = True
  81. break
  82. return eos
  83. @classmethod
  84. def select_role_by_id(cls, role_id:int) -> Optional[SysRole]:
  85. """
  86. 根据角色ID,查询角色
  87. Args:
  88. role_id(int): 角色ID
  89. Returns:
  90. SysRole: 角色信息
  91. """
  92. role = SysRoleMapper.select_role_by_id(role_id)
  93. if role:
  94. role.menu_ids = SysRoleMenuMapper.select_menu_ids_by_role_id(role_id)
  95. role.dept_ids = SysRoleDeptMapper.select_dept_ids_by_role_id(role_id)
  96. return role
  97. @classmethod
  98. def check_role_name_unique(cls, role: SysRole) -> Literal['0', '1']:
  99. """
  100. 校验角色名称是否唯一
  101. Args:
  102. role(SysRole): 角色信息
  103. Returns:
  104. Literal['0', '1'] : 唯一性校验结果, 0:唯一, 1:不唯一
  105. """
  106. eo:SysRole = SysRoleMapper.check_role_name_unique(role.role_name)
  107. if eo and eo.role_id != role.role_id:
  108. return UserConstants.NOT_UNIQUE
  109. return UserConstants.UNIQUE
  110. @classmethod
  111. def check_role_key_unique(cls, role: SysRole) -> Literal['0', '1']:
  112. """校验角色权限是否唯一
  113. Args:
  114. role(SysRole): 角色信息
  115. Returns:
  116. Literal['0', '1'] : 唯一性校验结果, 0:唯一, 1:不唯一
  117. """
  118. eo: SysRole = SysRoleMapper.check_role_key_unique(role.role_key)
  119. if eo and eo.role_id != role.role_id:
  120. return UserConstants.NOT_UNIQUE
  121. return UserConstants.UNIQUE
  122. @classmethod
  123. def check_role_allowed(cls, role: SysRole):
  124. """
  125. 校验角色是否允许操作
  126. Args:
  127. role(SysRole): 角色信息
  128. Raises:
  129. ServiceException: 超级管理员角色不允许操作
  130. """
  131. if role and role.role_id and role.is_admin():
  132. raise ServiceException("不允许操作超级管理员角色")
  133. @classmethod
  134. def check_role_data_scope(cls, role_id: int):
  135. """
  136. 校验角色是否有数据权限
  137. Args:
  138. role_id(int): 角色ID
  139. Raises:
  140. ServiceException: 超级管理员角色不允许操作
  141. """
  142. if not SecurityUtil.is_admin(SecurityUtil.get_user_id()):
  143. role = SysRole(role_id=role_id)
  144. roles: List[SysRole] = cls.select_role_list(role)
  145. if not roles:
  146. raise ServiceException("没有权限访问角色数据")
  147. @classmethod
  148. def count_user_role_by_role_id(cls, role_id:int) -> int:
  149. """
  150. 通过角色ID查询角色使用数量
  151. Args:
  152. role_id(int): 角色ID
  153. Returns:
  154. int: 角色使用数量
  155. """
  156. return SysRoleMapper.count_user_role_by_role_id(role_id)
  157. @classmethod
  158. @Transactional(db.session)
  159. def insert_role(cls, role:SysRole) -> int:
  160. """
  161. 新增角色信息
  162. Args:
  163. role(SysRole): 角色信息
  164. Returns:
  165. int: 新增角色ID
  166. """
  167. last_id = SysRoleMapper.insert_role(role)
  168. role.role_id = last_id
  169. return cls.insert_role_menu(role)
  170. @classmethod
  171. @Transactional(db.session)
  172. def update_role(cls, role:SysRole) -> bool:
  173. """
  174. 修改保存角色信息
  175. Args:
  176. role(SysRole): 角色信息
  177. Returns:
  178. bool: 修改结果
  179. """
  180. SysRoleMapper.update_role(role)
  181. SysRoleMenuMapper.delete_role_menu_by_role_id(role.role_id)
  182. return cls.insert_role_menu(role) > 0
  183. @classmethod
  184. @Transactional(db.session)
  185. def update_role_status(cls, role:SysRole) -> bool:
  186. """
  187. 修改角色状态
  188. Args:
  189. role(SysRole): 角色信息
  190. Returns:
  191. bool: 修改结果
  192. """
  193. return SysRoleMapper.update_role(role) > 0
  194. @classmethod
  195. @Transactional(db.session)
  196. def auth_data_scope(cls, role:SysRole):
  197. """
  198. 修改数据权限信息
  199. Args:
  200. role(SysRole): 角色信息
  201. Returns:
  202. bool: 修改结果
  203. """
  204. SysRoleMapper.update_role(role)
  205. SysRoleDeptMapper.delete_role_dept_by_role_id(role.role_id)
  206. return cls.insert_role_dept(role)
  207. @classmethod
  208. @Transactional(db.session)
  209. def insert_role_menu(cls, role: SysRole) -> int:
  210. """
  211. 新增角色菜单信息
  212. Args:
  213. role(SysRole): 角色信息
  214. Returns:
  215. int: 新增角色菜单数量
  216. """
  217. menus = []
  218. for menu_id in role.menu_ids:
  219. menu = SysRoleMenu(role_id=role.role_id, menu_id=menu_id)
  220. menus.append(menu)
  221. if len(menus) > 0:
  222. SysRoleMenuMapper.batch_role_menu(menus)
  223. return len(menus)
  224. @classmethod
  225. @Transactional(db.session)
  226. def insert_role_dept(cls, role: SysRole):
  227. """
  228. 新增角色部门信息
  229. Args:
  230. role(SysRole): 角色信息
  231. Returns:
  232. int: 新增角色部门数量
  233. """
  234. depts = []
  235. for dept_id in role.dept_ids:
  236. dept = SysRoleDept(role_id=role.role_id, dept_id=dept_id)
  237. depts.append(dept)
  238. if len(depts) > 0:
  239. SysRoleDeptMapper.batch_role_dept(depts)
  240. return len(depts)
  241. @classmethod
  242. @Transactional(db.session)
  243. def delete_role_by_id(cls, role_id:int) -> bool:
  244. """
  245. 通过角色ID删除角色
  246. Args:
  247. role_id(int): 角色ID
  248. Returns:
  249. bool: 删除结果
  250. """
  251. SysRoleMenuMapper.delete_role_menu_by_role_id(role_id)
  252. SysRoleDeptMapper.delete_role_dept_by_role_id(role_id)
  253. return SysRoleMapper.delete_role_by_id(role_id) > 0
  254. @classmethod
  255. @Transactional(db.session)
  256. def delete_role_by_ids(cls, role_ids:List[int]) -> bool:
  257. """
  258. 批量删除角色信息
  259. Args:
  260. role_ids(List[int]): 角色ID列表
  261. Returns:
  262. bool: 删除结果
  263. """
  264. for role_id in role_ids:
  265. cls.check_role_allowed(SysRole(role_id=role_id))
  266. # cls.check_role_data_scope(role_id) # todo
  267. role: SysRole = cls.select_role_by_id(role_id)
  268. if cls.count_user_role_by_role_id(role_id) > 0:
  269. raise ServiceException("角色{}已分配用户,不能删除".format(role.role_name))
  270. SysRoleMenuMapper.delete_role_menu(role_ids)
  271. SysRoleDeptMapper.delete_role_dept(role_ids)
  272. return SysRoleMapper.delete_role_by_ids(role_ids) > 0
  273. @classmethod
  274. @Transactional(db.session)
  275. def delete_auth_user(cls, user_role:SysUserRole) -> bool:
  276. """
  277. 取消授权用户角色
  278. Args:
  279. user_role(SysUserRole): 用户角色信息
  280. Returns:
  281. bool: 取消授权结果
  282. """
  283. return SysUserRoleMapper.delete_user_role_info(user_role) > 0
  284. @classmethod
  285. @Transactional(db.session)
  286. def delete_auth_users(cls, role_id, user_ids) -> bool:
  287. """
  288. 批量取消授权用户角色
  289. Args:
  290. role_id(int): 角色ID
  291. user_ids(List[int]): 用户ID列表
  292. Returns:
  293. bool: 取消授权结果
  294. """
  295. return SysUserRoleMapper.delete_user_role_infos(role_id, user_ids) > 0
  296. @classmethod
  297. @Transactional(db.session)
  298. def insert_auth_users(cls, role_id, user_ids) -> bool:
  299. """
  300. 批量选择授权用户角色
  301. Args:
  302. role_id(int): 角色ID
  303. user_ids(List[int]): 用户ID列表
  304. Returns:
  305. bool: 选择授权结果
  306. """
  307. user_roles = [SysUserRole(user_id=user_id, role_id=role_id) \
  308. for user_id in user_ids]
  309. num = SysUserRoleMapper.batch_user_role(user_roles)
  310. return num > 0