sys_dept.py 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324
  1. # -*- coding: utf-8 -*-
  2. # @Author : YY
  3. from typing import List, Optional
  4. from flask import g
  5. from sqlalchemy import alias, and_, case, func, insert, select, update
  6. from sqlalchemy.orm import aliased
  7. from ruoyi_common.domain.entity import SysDept
  8. from ruoyi_common.sqlalchemy.model import ColumnEntityList
  9. from ruoyi_common.sqlalchemy.transaction import Transactional
  10. from ruoyi_system.domain.po import SysDeptPo, SysRoleDeptPo, SysUserPo
  11. from ruoyi_admin.ext import db
class SysDeptMapper:
    """
    Data-access layer for departments.
    """

    # Names of the department columns projected by the queries that use
    # ``default_columns`` below.
    default_fields = {
        "dept_id", "parent_id", "ancestors", "dept_name", "order_num",
        "leader", "phone", "email", "status", "del_flag", "create_by",
        "create_time"
    }
    # Column list built once at class-definition time from ``default_fields``.
    # NOTE(review): the third positional argument is presumably the
    # ``alia_prefix`` flag passed by keyword elsewhere in this class - confirm.
    default_columns = ColumnEntityList(SysDeptPo, default_fields, False)
  22. @classmethod
  23. def select_dept_list(cls, dept: SysDept) -> List[SysDept]:
  24. """
  25. 根据条件,查询部门列表
  26. Args:
  27. dept (SysDept): 部门实体
  28. Returns:
  29. List[SysDept]: 部门列表
  30. """
  31. criterions = [SysDeptPo.del_flag=="0"]
  32. if dept.dept_id:
  33. criterions.append(SysDeptPo.dept_id==dept.dept_id)
  34. if dept.parent_id:
  35. criterions.append(SysDeptPo.parent_id==dept.parent_id)
  36. if dept.dept_name:
  37. criterions.append(SysDeptPo.dept_name.like(f'%{dept.dept_name}%'))
  38. if dept.status:
  39. criterions.append(SysDeptPo.status==dept.status)
  40. if "criterian_meta" in g and g.criterian_meta.scope:
  41. criterions.append(g.criterian_meta.scope)
  42. stmt = select(*cls.default_columns).where(*criterions)
  43. if "criterian_meta" in g and g.criterian_meta.page:
  44. g.criterian_meta.page.stmt = stmt
  45. rows = db.session.execute(stmt).all()
  46. return [cls.default_columns.cast(row, SysDept) for row in rows]
  47. @classmethod
  48. def select_dept_list_by_role_id(
  49. cls,
  50. role_id: int,
  51. dept_check_strictly: bool
  52. ) -> List[int]:
  53. """
  54. 根据角色ID,查询部门Id列表
  55. Args:
  56. role_id (int): 角色ID
  57. dept_check_strictly (bool): 是否严格检查部门权限
  58. Returns:
  59. List[int]: 部门ID列表
  60. """
  61. criterions = []
  62. criterions.append(SysRoleDeptPo.role_id==role_id)
  63. if dept_check_strictly:
  64. subquery = select(SysDeptPo.parent_id) \
  65. .join(SysRoleDeptPo, and_(
  66. SysDeptPo.dept_id==SysRoleDeptPo.dept_id,
  67. SysRoleDeptPo.role_id==role_id
  68. )) \
  69. .subquery()
  70. criterions.append(SysDeptPo.dept_id.not_in(subquery))
  71. stmt = select(SysDeptPo.dept_id).outerjoin(
  72. SysRoleDeptPo, SysDeptPo.dept_id==SysRoleDeptPo.dept_id
  73. ) \
  74. .where(*criterions) \
  75. .order_by(SysDeptPo.parent_id, SysDeptPo.order_num)
  76. rows = db.session.execute(stmt).scalars()
  77. return rows
  78. @classmethod
  79. def select_dept_by_id(cls, dept_id: int) -> Optional[SysDept]:
  80. """
  81. 根据部门ID查询信息
  82. Args:
  83. dept_id (int): 部门ID
  84. Returns:
  85. Optional[SysDept]: 部门实体
  86. """
  87. fields = {
  88. "dept_id", "parent_id", "ancestors", "dept_name", "order_num", \
  89. "leader", "phone", "email", "status"
  90. }
  91. columns = ColumnEntityList(SysDeptPo, fields, alia_prefix=False)
  92. SysDeptPo_P = aliased(SysDeptPo, name="p")
  93. p_name_scalar = select(SysDeptPo_P.dept_name) \
  94. .where(SysDeptPo_P.dept_id==SysDeptPo.parent_id) \
  95. .scalar_subquery()
  96. columns.append_scalar(p_name_scalar.label("parent_name"))
  97. stmt = select(*columns).where(SysDeptPo.dept_id==dept_id)
  98. row = db.session.execute(stmt).one_or_none()
  99. return columns.cast(row, SysDept) if row else None
  100. @classmethod
  101. def select_children_dept_by_id(cls, dept_id: int) -> List[SysDept]:
  102. """
  103. 根据ID查询所有子部门
  104. Args:
  105. dept_id (int): 部门ID
  106. Returns:
  107. List[SysDept]: 部门列表
  108. """
  109. fields = {
  110. "dept_id", "parent_id", "ancestors", "dept_name", "order_num", \
  111. "leader", "phone", "email", "status", "del_flag", "create_by", \
  112. "create_time"
  113. }
  114. columns = ColumnEntityList(SysDeptPo, fields, alia_prefix=False)
  115. stmt = select(*columns) \
  116. .where(func.find_in_set(dept_id, SysDeptPo.ancestors))
  117. rows = db.session.execute(stmt).all()
  118. return [columns.cast(row, SysDept) for row in rows]
  119. @classmethod
  120. def select_normal_children_dept_by_id(cls, dept_id: int) -> int:
  121. """
  122. 根据ID,查询所有子部门(正常状态)
  123. Args:
  124. dept_id (int): 部门ID
  125. Returns:
  126. int: 部门数量
  127. """
  128. criterions = [SysDeptPo.status==0, SysDeptPo.del_flag=="0"]
  129. criterions.append(func.find_in_set(dept_id, SysDeptPo.ancestors))
  130. stmt = select(func.count()).select_from(SysDeptPo) \
  131. .where(*criterions)
  132. return db.session.execute(stmt).scalar() or 0
  133. @classmethod
  134. def has_child_by_dept_id(cls, dept_id: int) -> int:
  135. """
  136. 是否存在子节点
  137. Args:
  138. dept_id (int): 部门ID
  139. Returns:
  140. int: 数量
  141. """
  142. criterions = [SysDeptPo.parent_id==dept_id, SysDeptPo.del_flag=="0"]
  143. stmt = select(func.count()).select_from(SysDeptPo) \
  144. .where(*criterions).limit(1)
  145. return db.session.execute(stmt).scalar() or 0
  146. @classmethod
  147. def check_dept_exist_user(cls, dept_id: int) -> int:
  148. """
  149. 查询部门是否存在用户
  150. Args:
  151. dept_id (int): 部门ID
  152. Returns:
  153. int: 数量
  154. """
  155. criterions = [SysUserPo.dept_id==dept_id, SysUserPo.del_flag=="0"]
  156. stmt = select(func.count()).select_from(SysUserPo) \
  157. .where(*criterions)
  158. return db.session.execute(stmt).scalar() or 0
  159. @classmethod
  160. def check_dept_name_unique(cls, dept_name: str, parent_id: int) -> Optional[SysDept]:
  161. """
  162. 校验部门名称是否唯一
  163. Args:
  164. dept_name (str): 部门名称
  165. parent_id (int): 父部门ID
  166. Returns:
  167. Optional[SysDept]: 部门实体
  168. """
  169. criterions = [SysDeptPo.parent_id==parent_id]
  170. criterions.append(SysDeptPo.del_flag=="0")
  171. criterions.append(SysDeptPo.dept_name==dept_name)
  172. stmt = select(*cls.default_columns) \
  173. .where(*criterions)
  174. row = db.session.execute(stmt).one_or_none()
  175. return cls.default_columns.cast(row, SysDept) if row else None
  176. @classmethod
  177. @Transactional(db.session)
  178. def insert_dept(cls, dept: SysDept) -> int:
  179. """
  180. 新增部门信息
  181. Args:
  182. dept (SysDept): 部门实体
  183. Returns:
  184. int: 新增记录的ID
  185. """
  186. fields = {
  187. "dept_id", "parent_id", "ancestors", "dept_name", "order_num",
  188. "leader", "phone", "email", "status", "create_by", "create_time"
  189. }
  190. data = dept.model_dump(
  191. include=fields,
  192. exclude_unset=True,
  193. exclude_none=True
  194. )
  195. stmt = insert(SysDeptPo).values(data)
  196. out = db.session.execute(stmt).inserted_primary_key
  197. return out[0] if out else 0
  198. @classmethod
  199. @Transactional(db.session)
  200. def update_dept(cls, dept: SysDept) -> int:
  201. """
  202. 修改部门信息
  203. Args:
  204. dept (SysDept): 部门实体
  205. Returns:
  206. int: 数量
  207. """
  208. fields = {
  209. "parent_id", "ancestors", "dept_name", "order_num", "leader",
  210. "phone", "email", "status", "create_by", "create_time"
  211. }
  212. data = dept.model_dump(
  213. include=fields,
  214. exclude_unset=True,
  215. exclude_none=True
  216. )
  217. stmt = update(SysDeptPo) \
  218. .where(SysDeptPo.dept_id==dept.dept_id) \
  219. .values(data)
  220. return db.session.execute(stmt).rowcount
  221. @classmethod
  222. @Transactional(db.session)
  223. def update_dept_status_normal(cls, dept_ids: List[int]) -> int:
  224. """
  225. 修改所在部门正常状态
  226. Args:
  227. dept_ids (List[int]): 部门ID列表
  228. Returns:
  229. int: 数量
  230. """
  231. stmt = update(SysDeptPo) \
  232. .where(SysDeptPo.dept_id.in_(dept_ids)) \
  233. .values(status="0")
  234. return db.session.execute(stmt).rowcount
  235. @classmethod
  236. @Transactional(db.session)
  237. def update_dept_children(cls, depts: List[SysDept]) -> int:
  238. """
  239. 修改子元素关系
  240. Args:
  241. depts (List[SysDept]): 部门列表
  242. Returns:
  243. int: 数量
  244. """
  245. print("depts: {}".format(depts))
  246. case_expr = case(
  247. *[(SysDeptPo.dept_id==dept.dept_id, dept.ancestors) for dept in depts],
  248. else_=SysDeptPo.ancestors
  249. )
  250. stmt = update(SysDeptPo) \
  251. .where(SysDeptPo.dept_id.in_([dept.dept_id for dept in depts])) \
  252. .values(ancestors=case_expr)
  253. return db.session.execute(stmt).rowcount
  254. @classmethod
  255. @Transactional(db.session)
  256. def delete_dept_by_id(cls, dept_id: int) -> int:
  257. """
  258. 删除部门管理信息
  259. Args:
  260. dept_id (int): 部门ID
  261. Returns:
  262. int: 数量
  263. """
  264. stmt = update(SysDeptPo) \
  265. .where(SysDeptPo.dept_id==dept_id) \
  266. .values(del_flag="2")
  267. return db.session.execute(stmt).rowcount