sys_dept.py 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316
  1. # -*- coding: utf-8 -*-
  2. # @Author : YY
  3. from types import NoneType
  4. from typing import Any, List, Literal
  5. from ruoyi_common.sqlalchemy.transaction import Transactional
  6. from ruoyi_common.utils import security_util as SecurityUtil
  7. from ruoyi_common.constant import UserConstants
  8. from ruoyi_common.domain.entity import SysDept, SysRole, TreeSelect
  9. from ruoyi_common.exception import ServiceException
  10. from ruoyi_system.mapper import SysDeptMapper, SysRoleMapper
  11. from ruoyi_admin.ext import db
  12. class SysDeptService:
  13. @classmethod
  14. def select_dept_list(cls, dept:SysDept) -> List[SysDept]:
  15. """
  16. 查询部门列表
  17. Args:
  18. dept (SysDept): 包含查询条件的传输对象
  19. Returns:
  20. List[SysDept]: 部门列表
  21. """
  22. return SysDeptMapper.select_dept_list(dept or SysDept())
  23. @classmethod
  24. def select_dept_tree_list(cls, dept:SysDept):
  25. """
  26. 查询部门树列表
  27. Args:
  28. dept (SysDept): 包含查询条件的传输对象
  29. Returns:
  30. List[TreeSelect]: 部门树列表
  31. """
  32. eos: List[SysDept] = cls.select_dept_list(dept)
  33. return cls.build_dept_tree_select(eos)
  34. @classmethod
  35. def build_dept_tree(cls, depts:List[SysDept]) -> List[SysDept]:
  36. """
  37. 构建部门树
  38. Args:
  39. depts (List[SysDept]): 部门列表
  40. Returns:
  41. List[SysDept]: 部门树列表
  42. """
  43. return_list = []
  44. temp_list = []
  45. for dept in depts:
  46. temp_list.append(dept.dept_id)
  47. for dept in depts:
  48. if dept.parent_id not in temp_list:
  49. cls.recursion_fn(depts, dept)
  50. return_list.append(dept)
  51. if not return_list:
  52. return_list = depts
  53. return return_list
  54. @classmethod
  55. def build_dept_tree_select(cls, depts:List[SysDept]) -> List[TreeSelect]:
  56. """
  57. 构建部门树选择框
  58. Args:
  59. depts (List[SysDept]): 部门列表
  60. Returns:
  61. List[TreeSelect]: 部门树选择框列表
  62. """
  63. dept_trees = cls.build_dept_tree(depts)
  64. return [TreeSelect.from_dept(dept) for dept in dept_trees]
  65. @classmethod
  66. def select_dept_list_by_role_id(cls, role_id:int) -> List[int]:
  67. """
  68. 根据角色ID,查询部门列表
  69. Args:
  70. role_id (int): 角色ID
  71. Returns:
  72. List[int]: 部门列表
  73. """
  74. role:SysRole = SysRoleMapper.select_role_by_id(role_id)
  75. return SysDeptMapper.select_dept_list_by_role_id(
  76. role_id, role.dept_check_strictly
  77. )
  78. @classmethod
  79. def select_dept_by_id(cls, dept_id:int) -> SysDept|NoneType:
  80. """
  81. 根据部门ID,查询部门信息
  82. Args:
  83. dept_id (int): 部门ID
  84. Returns:
  85. SysDept|NoneType: 部门信息
  86. """
  87. return SysDeptMapper.select_dept_by_id(dept_id)
  88. @classmethod
  89. def select_normal_children_dept_by_id(cls, dept_id:int) -> int:
  90. """
  91. 根据部门ID,查询正常部门的数量
  92. Args:
  93. dept_id (int): 部门ID
  94. Returns:
  95. int: 正常部门的数量
  96. """
  97. return SysDeptMapper.select_normal_children_dept_by_id(dept_id)
  98. @classmethod
  99. def has_child_by_dept_id(cls, dept_id:int) -> int:
  100. """
  101. 根据部门ID,判断是否有子部门
  102. Args:
  103. dept_id (int): 部门ID
  104. Returns:
  105. int: 数量
  106. """
  107. return SysDeptMapper.has_child_by_dept_id(dept_id)
  108. @classmethod
  109. def check_dept_exist_user(cls, dept_id:int) -> int:
  110. """
  111. 判断部门是否存在用户
  112. Args:
  113. dept_id (int): 部门ID
  114. Returns:
  115. int: 数量
  116. """
  117. return SysDeptMapper.check_dept_exist_user(dept_id)
  118. @classmethod
  119. def check_dept_name_unique(cls, dept:SysDept) -> Literal["0","1"]:
  120. """
  121. 判断部门名称是否唯一
  122. Args:
  123. dept (SysDept): 部门信息
  124. Returns:
  125. Literal["0","1"]: 唯一或不唯一, 0: 唯一, 1: 不唯一
  126. """
  127. dept_id = -1 if dept.dept_id is None else dept.dept_id
  128. eo:SysDept = SysDeptMapper.check_dept_name_unique(dept.dept_name, dept.parent_id)
  129. if eo is None:
  130. return UserConstants.UNIQUE
  131. if eo.dept_id != dept_id:
  132. return UserConstants.NOT_UNIQUE
  133. return UserConstants.UNIQUE
  134. @classmethod
  135. def check_dept_data_scope(cls, dept_id:int):
  136. """
  137. 判断部门数据权限
  138. Args:
  139. dept_id (int): 部门ID
  140. Raises:
  141. ServiceException: 无权限访问部门数据
  142. """
  143. if not SecurityUtil.login_user_is_admin():
  144. dept = SysDept(dept_id=dept_id)
  145. depts = cls.select_dept_list(dept)
  146. if not depts:
  147. raise ServiceException("没有权限访问部门数据!")
  148. @classmethod
  149. def insert_dept(cls, dept:SysDept) -> int:
  150. """
  151. 新增部门信息
  152. Args:
  153. dept (SysDept): 部门信息
  154. Returns:
  155. int: 部门ID
  156. """
  157. parent = SysDeptMapper.select_dept_by_id(dept.parent_id)
  158. if not UserConstants.DEPT_NORMAL == parent.status:
  159. raise ServiceException("部门停用,不允许新增")
  160. dept.ancestors = f"{parent.ancestors},{dept.parent_id}"
  161. return SysDeptMapper.insert_dept(dept)
  162. @classmethod
  163. @Transactional(db.session)
  164. def update_dept(cls, dept:SysDept) -> int:
  165. """
  166. 修改部门信息
  167. Args:
  168. dept (SysDept): 部门信息
  169. Returns:
  170. int: 数量
  171. """
  172. parent_dept = SysDeptMapper.select_dept_by_id(dept.parent_id)
  173. old_dept = SysDeptMapper.select_dept_by_id(dept.dept_id)
  174. if parent_dept is not None and old_dept is not None:
  175. new_ancestors = f"{parent_dept.ancestors},{parent_dept.dept_id}"
  176. old_ancestors = old_dept.ancestors
  177. dept.ancestors = new_ancestors
  178. cls.update_dept_children(dept.dept_id, new_ancestors, old_ancestors)
  179. num = SysDeptMapper.update_dept(dept)
  180. if UserConstants.DEPT_NORMAL == dept.status and dept.ancestors and dept.ancestors != "0":
  181. # 如果该部门是启用状态,则启用该部门的所有上级部门
  182. cls.update_parent_dept_status_normal(dept)
  183. return num
  184. @classmethod
  185. def update_parent_dept_status_normal(cls, dept:SysDept):
  186. """
  187. 修改该部门的父级部门
  188. Args:
  189. dept (SysDept): 部门信息
  190. """
  191. dept_ids = dept.ancestors.split(",")
  192. SysDeptMapper.update_dept_status_normal(dept_ids)
  193. @classmethod
  194. def update_dept_children(cls, dept_id, new_ancestors, old_ancestors):
  195. """
  196. 修改子元素关系
  197. Args:
  198. dept_id (int): 部门ID
  199. new_ancestors (str): 新的祖先路径
  200. old_ancestors (str): 旧的祖先路径
  201. """
  202. children:List[SysDept] = SysDeptMapper.select_children_dept_by_id(dept_id)
  203. for child in children:
  204. child.ancestors = (child.ancestors.replace(old_ancestors, new_ancestors, 1))
  205. if children:
  206. SysDeptMapper.update_dept_children(children)
  207. @classmethod
  208. def delete_dept_by_id(cls, dept_id) -> int:
  209. """
  210. 删除部门管理信息
  211. Args:
  212. dept_id (int): 部门ID
  213. Returns:
  214. int: 数量
  215. """
  216. return SysDeptMapper.delete_dept_by_id(dept_id)
  217. @classmethod
  218. def recursion_fn(cls, list:List[SysDept], t:SysDept):
  219. """
  220. 递归列表
  221. Args:
  222. list (List[SysDept]): 部门列表
  223. t (SysDept): 父节点
  224. """
  225. child_list = cls.get_child_list(list, t)
  226. t.children = child_list
  227. for t_child in child_list:
  228. if cls.has_child(list, t_child):
  229. cls.recursion_fn(list, t_child)
  230. @classmethod
  231. def get_child_list(cls, list:List[SysDept], t:SysDept) -> List[Any]:
  232. """
  233. 获取子节点列表
  234. Args:
  235. list (List[SysDept]): 部门列表
  236. t (SysDept): 父节点
  237. Returns:
  238. List[Any]: 子节点列表
  239. """
  240. tlist = []
  241. for n in list:
  242. if n.parent_id is not None and n.parent_id==t.dept_id:
  243. tlist.append(n)
  244. return tlist
  245. @classmethod
  246. def has_child(cls, list:List[SysDept], t:SysDept) -> bool:
  247. """
  248. 判断是否有子节点
  249. Args:
  250. list (List[SysDept]): 部门列表
  251. t (SysDept): 父节点
  252. Returns:
  253. bool: 是否有子节点
  254. """
  255. for n in list:
  256. if n.parent_id is not None and n.parent_id==t.dept_id:
  257. return True
  258. return False