sys_menu.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342
  1. # -*- coding: utf-8 -*-
  2. # @Author : YY
  3. from typing import List, Optional
  4. from flask import g
  5. from sqlalchemy import and_, delete, func, insert, select, update
  6. from ruoyi_common.domain.entity import SysMenu
  7. from ruoyi_common.sqlalchemy.model import ColumnEntityList
  8. from ruoyi_common.sqlalchemy.transaction import Transactional
  9. from ruoyi_system.domain.po import SysMenuPo, SysRoleMenuPo, SysRolePo, SysUserPo, SysUserRolePo
  10. from ruoyi_admin.ext import db
  11. class SysMenuMapper:
  12. """
  13. 菜单数据访问层
  14. """
  15. default_fields = {
  16. 'menu_id','menu_name', 'parent_id', 'order_num', 'path', 'component',
  17. 'query', 'is_frame', 'is_cache', 'menu_type', 'perms', 'visible',
  18. 'status', 'icon', 'create_by', 'create_time', 'update_by',
  19. 'update_time'
  20. }
  21. default_columns = ColumnEntityList(SysMenuPo, default_fields, False)
  22. @classmethod
  23. def select_menu_list(cls, menu: SysMenu) -> List[SysMenu]:
  24. """
  25. 查询系统菜单列表
  26. Args:
  27. menu (SysMenu): 查询条件
  28. Returns:
  29. List[SysMenu]: 菜单列表
  30. """
  31. criterions = []
  32. if menu.menu_name:
  33. criterions.append(SysMenuPo.menu_name.like(f'%{menu.menu_name}%'))
  34. if menu.status:
  35. criterions.append(SysMenuPo.status == menu.status)
  36. if menu.visible:
  37. criterions.append(SysMenuPo.visible == menu.visible)
  38. # columns.append_scalar(func.ifnull(SysMenuPo.perms, '').label('perms'))
  39. stmt = select(*cls.default_columns) \
  40. .where(*criterions) \
  41. .order_by(SysMenuPo.parent_id, SysMenuPo.order_num)
  42. if "criterian_meta" in g and g.criterian_meta.page:
  43. g.criterian_meta.page.stmt = stmt
  44. rows = db.session.execute(stmt).all()
  45. return [cls.default_columns.cast(row, SysMenu) for row in rows]
  46. @classmethod
  47. def select_menu_perms(cls) -> List[str]:
  48. """
  49. 根据用户,查询所有权限
  50. Args:
  51. user_id (int): 用户ID
  52. Returns:
  53. List[str]: 权限列表
  54. """
  55. stmt = select(SysMenuPo.perms).distinct() \
  56. .join(SysRoleMenuPo, SysRoleMenuPo.menu_id == SysMenuPo.menu_id) \
  57. .join(SysUserRolePo, SysUserRolePo.role_id == SysRoleMenuPo.role_id)
  58. return db.session.execute(stmt).scalars().all()
  59. @classmethod
  60. def select_menu_perms_by_role_id(cls, role_id: int) -> List[str]:
  61. """
  62. 根据角色ID,查询权限
  63. Args:
  64. role_id (int): 角色ID
  65. Returns:
  66. List[str]: 权限列表
  67. """
  68. coritations = [SysMenuPo.status == "0"]
  69. coritations.append(SysRoleMenuPo.role_id == role_id)
  70. stmt = select(SysMenuPo.perms).distinct() \
  71. .join(SysRoleMenuPo, SysRoleMenuPo.menu_id == SysMenuPo.menu_id) \
  72. .where(*coritations)
  73. return db.session.execute(stmt).scalars().all()
  74. @classmethod
  75. def select_menu_perms_by_user_id(cls, user_id: int) -> List[str]:
  76. """
  77. 根据用户ID,查询权限
  78. Args:
  79. user_id (int): 用户ID
  80. Returns:
  81. List[str]: 权限列表
  82. """
  83. coritations = [SysMenuPo.status == "0"]
  84. coritations.append(SysRolePo.status == "0")
  85. coritations.append(SysUserRolePo.user_id == user_id)
  86. stmt = select(SysMenuPo.perms).distinct() \
  87. .join(SysRoleMenuPo, SysRoleMenuPo.menu_id == SysMenuPo.menu_id) \
  88. .join(SysUserRolePo, SysUserRolePo.role_id == SysRoleMenuPo.role_id) \
  89. .join(SysRolePo, SysRolePo.role_id == SysUserRolePo.role_id) \
  90. .where(and_(*coritations))
  91. return db.session.execute(stmt).scalars().all()
  92. @classmethod
  93. def select_menu_tree_all(cls) -> List[SysMenu]:
  94. """
  95. 查询所有菜单
  96. Returns:
  97. List[SysMenu]: 菜单列表
  98. """
  99. fields = {
  100. "menu_id", "parent_id", "menu_name", "path", "component",
  101. "query", "visible", "status", "perms", "is_frame",
  102. "is_cache", "menu_type", "icon", "order_num", "create_time"
  103. }
  104. columns = ColumnEntityList(SysMenuPo, fields, False)
  105. coritations = [SysMenuPo.status == "0"]
  106. coritations.append(SysMenuPo.menu_type.in_(['M', 'C']))
  107. stmt = select(*columns).distinct() \
  108. .where(*coritations) \
  109. .order_by(SysMenuPo.parent_id, SysMenuPo.order_num)
  110. rows = db.session.execute(stmt).all()
  111. return [columns.cast(row, SysMenu) for row in rows]
  112. @classmethod
  113. def select_menu_tree_by_user_id(cls, user_id: int) -> List[SysMenu]:
  114. """
  115. 根据用户ID,查询菜单树
  116. Args:
  117. user_id (int): 用户ID
  118. Returns:
  119. List[SysMenuPo]: 菜单列表
  120. """
  121. fields = {
  122. "menu_id", "parent_id", "menu_name", "path", "component",
  123. "query", "visible", "status", "perms", "is_frame",
  124. "is_cache", "menu_type", "icon", "order_num", "create_time"
  125. }
  126. columns = ColumnEntityList(SysMenuPo, fields, False)
  127. coritations = [SysMenuPo.status == "0"]
  128. coritations.append(SysRolePo.status=="0")
  129. coritations.append(SysMenuPo.menu_type.in_(['M', 'C']))
  130. coritations.append(SysUserPo.user_id==user_id)
  131. stmt = select(*columns).distinct() \
  132. .join(SysRoleMenuPo, SysRoleMenuPo.menu_id == SysMenuPo.menu_id) \
  133. .join(SysUserRolePo, SysUserRolePo.role_id == SysRoleMenuPo.role_id) \
  134. .join(SysRolePo, SysRolePo.role_id == SysUserRolePo.role_id) \
  135. .join(SysUserPo, SysUserPo.user_id == SysUserRolePo.user_id) \
  136. .where(*coritations) \
  137. .order_by(SysMenuPo.parent_id, SysMenuPo.order_num)
  138. rows = db.session.execute(stmt).all()
  139. return [columns.cast(row, SysMenu) for row in rows]
  140. @classmethod
  141. def select_menu_list_by_user_id(cls, menu: SysMenu, user_id: int) -> List[SysMenu]:
  142. """
  143. 根据用户ID,查询菜单列表
  144. Args:
  145. menu (SysMenu): 查询条件
  146. user_id (int): 用户ID
  147. Returns:
  148. List[SysMenu]: 菜单列表
  149. """
  150. coritations = [SysMenuPo.user_id == user_id]
  151. if menu.menu_name is not None and menu.menu_name != '':
  152. coritations.append(SysMenuPo.menu_name.like(menu.menu_name))
  153. if menu.status is not None and menu.status != '':
  154. coritations.append(SysMenuPo.status == menu.status)
  155. if menu.visible is not None and menu.visible != '':
  156. coritations.append(SysMenuPo.visible == menu.visible)
  157. stmt = select(*cls.default_columns) \
  158. .join(SysRoleMenuPo, SysRoleMenuPo.menu_id == SysMenuPo.menu_id) \
  159. .join(SysUserRolePo, SysUserRolePo.role_id == SysRoleMenuPo.role_id) \
  160. .join(SysRolePo, SysRolePo.role_id == SysUserRolePo.role_id) \
  161. .where(*coritations) \
  162. .order_by(SysMenuPo.parent_id, SysMenuPo.order_num)
  163. rows = db.session.execute(stmt).all()
  164. return [cls.default_columns.cast(row, SysMenu) for row in rows]
  165. @classmethod
  166. def select_menu_list_by_role_id(cls, role_id: int, menu_check_strictly: bool) -> List[int]:
  167. """
  168. 根据角色ID,查询菜单列表
  169. Args:
  170. role_id (int): 角色ID
  171. menu_check_strictly (bool): 是否严格检查菜单
  172. Returns:
  173. List[int]: 菜单ID列表
  174. """
  175. coritations = [SysRoleMenuPo.role_id == role_id]
  176. if menu_check_strictly:
  177. subquery = select(SysMenuPo.parent_id) \
  178. .join(SysRoleMenuPo, SysMenuPo.menu_id == SysRoleMenuPo.menu_id) \
  179. .filter(SysRoleMenuPo.role_id == role_id) \
  180. .subquery()
  181. coritations.append(SysMenuPo.menu_id.notin_(subquery))
  182. stmt = select(SysMenuPo.menu_id) \
  183. .join(SysRoleMenuPo, SysRoleMenuPo.menu_id == SysMenuPo.menu_id) \
  184. .where(*coritations) \
  185. .order_by(SysMenuPo.parent_id, SysMenuPo.order_num)
  186. return db.session.execute(stmt).scalars().all()
  187. @classmethod
  188. def select_menu_by_id(cls, menu_id: int) -> Optional[SysMenuPo]:
  189. """
  190. 根据ID,查询菜单
  191. Args:
  192. menu_id (int): 菜单ID
  193. Returns:
  194. Optional[SysMenuPo]: 菜单信息
  195. """
  196. stmt = select(*cls.default_columns) \
  197. .where(SysMenuPo.menu_id == menu_id)
  198. row = db.session.execute(stmt).one_or_none()
  199. return cls.default_columns.cast(row, SysMenu) if row else None
  200. @classmethod
  201. def has_child_by_menu_id(cls, menu_id: int) -> int:
  202. """
  203. 是否存在菜单子节点
  204. Args:
  205. menu_id (int): 菜单ID
  206. Returns:
  207. int: 子节点数量
  208. """
  209. stmt = select(func.count(SysMenuPo.menu_id)) \
  210. .where(SysMenuPo.parent_id == menu_id)
  211. return db.session.execute(stmt).scalar() or 0
  212. @classmethod
  213. @Transactional(db.session)
  214. def insert_menu(cls, menu: SysMenu) -> int:
  215. """
  216. 新增菜单信息
  217. Args:
  218. menu (SysMenu): 菜单信息
  219. Returns:
  220. int: 新增记录的ID
  221. """
  222. fields = {
  223. "menu_name", "parent_id", "order_num", "path", "component",
  224. "query", "is_frame", "is_cache", "menu_type", "perms", "visible",
  225. "status", "icon", "create_by", "create_time"
  226. }
  227. stmt = insert(SysMenuPo).values(menu.model_dump(
  228. include=fields,
  229. exclude_none=True,
  230. exclude_unset=True,
  231. ))
  232. out = db.session.execute(stmt).inserted_primary_key
  233. return out[0] if out else 0
  234. @classmethod
  235. @Transactional(db.session)
  236. def update_menu(cls, menu: SysMenu) -> int:
  237. """
  238. 修改菜单信息
  239. Args:
  240. menu (SysMenu): 菜单信息
  241. Returns:
  242. int: 修改记录数
  243. """
  244. fields = {
  245. "menu_name", "parent_id", "order_num", "path", "component",
  246. "query", "is_frame", "is_cache", "menu_type", "perms", "visible",
  247. "status", "icon", "update_by", "update_time"
  248. }
  249. stmt = update(SysMenuPo).values(menu.model_dump(
  250. include=fields,
  251. exclude_none=True,
  252. exclude_unset=True,
  253. )) \
  254. .where(SysMenuPo.menu_id == menu.menu_id)
  255. return db.session.execute(stmt).rowcount
  256. @classmethod
  257. @Transactional(db.session)
  258. def delete_menu_by_id(cls, menu_id: int) -> int:
  259. """
  260. 删除菜单信息
  261. Args:
  262. menu_id (int): 菜单ID
  263. Returns:
  264. int: 删除记录数
  265. """
  266. stmt = delete(SysMenuPo).where(SysMenuPo.menu_id == menu_id)
  267. return db.session.execute(stmt).rowcount
  268. @classmethod
  269. def check_menu_name_unique(cls, menu_name: str, parent_id: int) -> Optional[SysMenu]:
  270. """
  271. 校验菜单名称是否唯一
  272. Args:
  273. menu_name (str): 菜单名称
  274. parent_id (int): 父菜单ID
  275. Returns:
  276. Optional[SysMenu]: 菜单信息
  277. """
  278. stmt = select(*cls.default_columns) \
  279. .where(SysMenuPo.menu_name == menu_name, SysMenuPo.parent_id == parent_id)
  280. row = db.session.execute(stmt).one_or_none()
  281. return cls.default_columns.cast(row, SysMenu) if row else None