| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342 |
- # -*- coding: utf-8 -*-
- # @Author : YY
- from typing import List, Optional
- from flask import g
- from sqlalchemy import and_, delete, func, insert, select, update
- from ruoyi_common.domain.entity import SysMenu
- from ruoyi_common.sqlalchemy.model import ColumnEntityList
- from ruoyi_common.sqlalchemy.transaction import Transactional
- from ruoyi_system.domain.po import SysMenuPo, SysRoleMenuPo, SysRolePo, SysUserPo, SysUserRolePo
- from ruoyi_admin.ext import db
class SysMenuMapper:
    """
    Data-access layer for menu records (``SysMenuPo``).

    Every query runs through ``db.session``. Row-returning queries convert
    each result row into a ``SysMenu`` entity through the ``cast`` method of
    the ``ColumnEntityList`` that selected it.
    """

    # Columns selected by the general-purpose menu queries.
    default_fields = {
        'menu_id', 'menu_name', 'parent_id', 'order_num', 'path', 'component',
        'query', 'is_frame', 'is_cache', 'menu_type', 'perms', 'visible',
        'status', 'icon', 'create_by', 'create_time', 'update_by',
        'update_time'
    }

    default_columns = ColumnEntityList(SysMenuPo, default_fields, False)

    # Columns selected by the two menu-tree queries.
    _tree_fields = {
        "menu_id", "parent_id", "menu_name", "path", "component",
        "query", "visible", "status", "perms", "is_frame",
        "is_cache", "menu_type", "icon", "order_num", "create_time"
    }

    @classmethod
    def select_menu_list(cls, menu: SysMenu) -> List[SysMenu]:
        """
        Query the menu list, optionally filtered.

        Args:
            menu (SysMenu): Filter values. Truthy ``menu_name`` is matched as
                a substring; truthy ``status`` / ``visible`` are matched
                exactly. Falsy values are ignored.

        Returns:
            List[SysMenu]: Matching menus ordered by parent id, then order
                number.
        """
        criterions = []
        if menu.menu_name:
            criterions.append(SysMenuPo.menu_name.like(f'%{menu.menu_name}%'))
        if menu.status:
            criterions.append(SysMenuPo.status == menu.status)
        if menu.visible:
            criterions.append(SysMenuPo.visible == menu.visible)

        stmt = (
            select(*cls.default_columns)
            .where(*criterions)
            .order_by(SysMenuPo.parent_id, SysMenuPo.order_num)
        )
        # When the request context carries paging metadata, record the
        # statement on it (presumably consumed by the paging layer - the
        # consumer is not visible in this module).
        if "criterian_meta" in g and g.criterian_meta.page:
            g.criterian_meta.page.stmt = stmt

        rows = db.session.execute(stmt).all()
        return [cls.default_columns.cast(row, SysMenu) for row in rows]

    @classmethod
    def select_menu_perms(cls) -> List[str]:
        """
        Query the distinct permission strings of all assigned menus.

        Takes no filter: the inner joins restrict the result to menus that
        are linked to a role which is in turn linked to at least one user.

        Returns:
            List[str]: Distinct ``perms`` values.
        """
        stmt = (
            select(SysMenuPo.perms)
            .distinct()
            .join(SysRoleMenuPo, SysRoleMenuPo.menu_id == SysMenuPo.menu_id)
            .join(SysUserRolePo, SysUserRolePo.role_id == SysRoleMenuPo.role_id)
        )
        return db.session.execute(stmt).scalars().all()

    @classmethod
    def select_menu_perms_by_role_id(cls, role_id: int) -> List[str]:
        """
        Query the permission strings granted to a role.

        Args:
            role_id (int): Role id.

        Returns:
            List[str]: Distinct ``perms`` values of the role's menus whose
                status is ``"0"``.
        """
        criterions = [
            SysMenuPo.status == "0",
            SysRoleMenuPo.role_id == role_id,
        ]
        stmt = (
            select(SysMenuPo.perms)
            .distinct()
            .join(SysRoleMenuPo, SysRoleMenuPo.menu_id == SysMenuPo.menu_id)
            .where(*criterions)
        )
        return db.session.execute(stmt).scalars().all()

    @classmethod
    def select_menu_perms_by_user_id(cls, user_id: int) -> List[str]:
        """
        Query the permission strings available to a user.

        Args:
            user_id (int): User id.

        Returns:
            List[str]: Distinct ``perms`` values of menus with status ``"0"``
                reached through the user's roles with status ``"0"``.
        """
        criterions = [
            SysMenuPo.status == "0",
            SysRolePo.status == "0",
            SysUserRolePo.user_id == user_id,
        ]
        stmt = (
            select(SysMenuPo.perms)
            .distinct()
            .join(SysRoleMenuPo, SysRoleMenuPo.menu_id == SysMenuPo.menu_id)
            .join(SysUserRolePo, SysUserRolePo.role_id == SysRoleMenuPo.role_id)
            .join(SysRolePo, SysRolePo.role_id == SysUserRolePo.role_id)
            .where(and_(*criterions))
        )
        return db.session.execute(stmt).scalars().all()

    @classmethod
    def select_menu_tree_all(cls) -> List[SysMenu]:
        """
        Query every menu that can appear in the menu tree.

        Returns:
            List[SysMenu]: Menus with status ``"0"`` and type ``'M'`` or
                ``'C'``, ordered by parent id, then order number.
        """
        columns = ColumnEntityList(SysMenuPo, cls._tree_fields, False)
        criterions = [
            SysMenuPo.status == "0",
            SysMenuPo.menu_type.in_(['M', 'C']),
        ]
        stmt = (
            select(*columns)
            .distinct()
            .where(*criterions)
            .order_by(SysMenuPo.parent_id, SysMenuPo.order_num)
        )
        rows = db.session.execute(stmt).all()
        return [columns.cast(row, SysMenu) for row in rows]

    @classmethod
    def select_menu_tree_by_user_id(cls, user_id: int) -> List[SysMenu]:
        """
        Query the menu-tree entries available to a user.

        Args:
            user_id (int): User id.

        Returns:
            List[SysMenu]: Distinct menus with status ``"0"`` and type
                ``'M'`` or ``'C'`` reached through the user's roles with
                status ``"0"``, ordered by parent id, then order number.
        """
        columns = ColumnEntityList(SysMenuPo, cls._tree_fields, False)
        criterions = [
            SysMenuPo.status == "0",
            SysRolePo.status == "0",
            SysMenuPo.menu_type.in_(['M', 'C']),
            SysUserPo.user_id == user_id,
        ]
        stmt = (
            select(*columns)
            .distinct()
            .join(SysRoleMenuPo, SysRoleMenuPo.menu_id == SysMenuPo.menu_id)
            .join(SysUserRolePo, SysUserRolePo.role_id == SysRoleMenuPo.role_id)
            .join(SysRolePo, SysRolePo.role_id == SysUserRolePo.role_id)
            .join(SysUserPo, SysUserPo.user_id == SysUserRolePo.user_id)
            .where(*criterions)
            .order_by(SysMenuPo.parent_id, SysMenuPo.order_num)
        )
        rows = db.session.execute(stmt).all()
        return [columns.cast(row, SysMenu) for row in rows]

    @classmethod
    def select_menu_list_by_user_id(cls, menu: SysMenu, user_id: int) -> List[SysMenu]:
        """
        Query the menu list available to a user, optionally filtered.

        Args:
            menu (SysMenu): Filter values. Non-empty ``menu_name`` is matched
                as a substring; non-empty ``status`` / ``visible`` are
                matched exactly.
            user_id (int): User id.

        Returns:
            List[SysMenu]: Distinct matching menus ordered by parent id, then
                order number.
        """
        # The user link lives on the user-role table; the menu columns used
        # by this mapper contain no user id.
        criterions = [SysUserRolePo.user_id == user_id]
        if menu.menu_name:
            # Substring match, consistent with select_menu_list.
            criterions.append(SysMenuPo.menu_name.like(f'%{menu.menu_name}%'))
        if menu.status:
            criterions.append(SysMenuPo.status == menu.status)
        if menu.visible:
            criterions.append(SysMenuPo.visible == menu.visible)

        # distinct(): a menu granted through several of the user's roles
        # would otherwise be returned once per role.
        stmt = (
            select(*cls.default_columns)
            .distinct()
            .join(SysRoleMenuPo, SysRoleMenuPo.menu_id == SysMenuPo.menu_id)
            .join(SysUserRolePo, SysUserRolePo.role_id == SysRoleMenuPo.role_id)
            .join(SysRolePo, SysRolePo.role_id == SysUserRolePo.role_id)
            .where(*criterions)
            .order_by(SysMenuPo.parent_id, SysMenuPo.order_num)
        )
        rows = db.session.execute(stmt).all()
        return [cls.default_columns.cast(row, SysMenu) for row in rows]

    @classmethod
    def select_menu_list_by_role_id(cls, role_id: int, menu_check_strictly: bool) -> List[int]:
        """
        Query the ids of the menus granted to a role.

        Args:
            role_id (int): Role id.
            menu_check_strictly (bool): When true, drop every menu that is
                the parent of another menu granted to the same role.

        Returns:
            List[int]: Menu ids ordered by parent id, then order number.
        """
        criterions = [SysRoleMenuPo.role_id == role_id]
        if menu_check_strictly:
            # Parent ids of the role's menus. Passed to notin_ as a select()
            # rather than a Subquery, which SQLAlchemy would otherwise have
            # to coerce (with a warning).
            granted_parent_ids = (
                select(SysMenuPo.parent_id)
                .join(SysRoleMenuPo, SysMenuPo.menu_id == SysRoleMenuPo.menu_id)
                .where(SysRoleMenuPo.role_id == role_id)
            )
            criterions.append(SysMenuPo.menu_id.notin_(granted_parent_ids))

        stmt = (
            select(SysMenuPo.menu_id)
            .join(SysRoleMenuPo, SysRoleMenuPo.menu_id == SysMenuPo.menu_id)
            .where(*criterions)
            .order_by(SysMenuPo.parent_id, SysMenuPo.order_num)
        )
        return db.session.execute(stmt).scalars().all()

    @classmethod
    def select_menu_by_id(cls, menu_id: int) -> Optional[SysMenu]:
        """
        Query a single menu by id.

        Args:
            menu_id (int): Menu id.

        Returns:
            Optional[SysMenu]: The menu, or ``None`` when no row matches.
        """
        stmt = select(*cls.default_columns).where(SysMenuPo.menu_id == menu_id)
        row = db.session.execute(stmt).one_or_none()
        if not row:
            return None
        return cls.default_columns.cast(row, SysMenu)

    @classmethod
    def has_child_by_menu_id(cls, menu_id: int) -> int:
        """
        Count the direct children of a menu.

        Args:
            menu_id (int): Menu id.

        Returns:
            int: Number of menus whose ``parent_id`` equals ``menu_id``
                (0 when there are none).
        """
        stmt = (
            select(func.count(SysMenuPo.menu_id))
            .where(SysMenuPo.parent_id == menu_id)
        )
        child_count = db.session.execute(stmt).scalar()
        return child_count or 0

    @classmethod
    @Transactional(db.session)
    def insert_menu(cls, menu: SysMenu) -> int:
        """
        Insert a menu.

        Only the insertable fields that are set and not ``None`` on ``menu``
        are written.

        Args:
            menu (SysMenu): Menu to insert.

        Returns:
            int: Primary key of the new row, or 0 when none is reported.
        """
        fields = {
            "menu_name", "parent_id", "order_num", "path", "component",
            "query", "is_frame", "is_cache", "menu_type", "perms", "visible",
            "status", "icon", "create_by", "create_time"
        }
        values = menu.model_dump(
            include=fields,
            exclude_none=True,
            exclude_unset=True,
        )
        stmt = insert(SysMenuPo).values(values)
        primary_key = db.session.execute(stmt).inserted_primary_key
        return primary_key[0] if primary_key else 0

    @classmethod
    @Transactional(db.session)
    def update_menu(cls, menu: SysMenu) -> int:
        """
        Update the menu identified by ``menu.menu_id``.

        Only the updatable fields that are set and not ``None`` on ``menu``
        are written.

        Args:
            menu (SysMenu): Menu carrying the id and the new values.

        Returns:
            int: Number of rows updated (0 when there is nothing to write).
        """
        fields = {
            "menu_name", "parent_id", "order_num", "path", "component",
            "query", "is_frame", "is_cache", "menu_type", "perms", "visible",
            "status", "icon", "update_by", "update_time"
        }
        values = menu.model_dump(
            include=fields,
            exclude_none=True,
            exclude_unset=True,
        )
        if not values:
            # Nothing to set: do not issue an UPDATE without explicit values.
            return 0
        stmt = (
            update(SysMenuPo)
            .values(values)
            .where(SysMenuPo.menu_id == menu.menu_id)
        )
        return db.session.execute(stmt).rowcount

    @classmethod
    @Transactional(db.session)
    def delete_menu_by_id(cls, menu_id: int) -> int:
        """
        Delete a menu by id.

        Args:
            menu_id (int): Menu id.

        Returns:
            int: Number of rows deleted.
        """
        stmt = delete(SysMenuPo).where(SysMenuPo.menu_id == menu_id)
        return db.session.execute(stmt).rowcount

    @classmethod
    def check_menu_name_unique(cls, menu_name: str, parent_id: int) -> Optional[SysMenu]:
        """
        Look up a menu by name under a given parent, for uniqueness checks.

        Args:
            menu_name (str): Menu name (exact match).
            parent_id (int): Parent menu id.

        Returns:
            Optional[SysMenu]: A menu with that name and parent, or ``None``
                when there is none.
        """
        # limit(1) + first(): if duplicates already exist, report one of
        # them instead of raising MultipleResultsFound.
        stmt = (
            select(*cls.default_columns)
            .where(SysMenuPo.menu_name == menu_name, SysMenuPo.parent_id == parent_id)
            .limit(1)
        )
        row = db.session.execute(stmt).first()
        if not row:
            return None
        return cls.default_columns.cast(row, SysMenu)
|