| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324 |
- # -*- coding: utf-8 -*-
- # @Author : YY
- from typing import List, Optional
- from flask import g
- from sqlalchemy import alias, and_, case, func, insert, select, update
- from sqlalchemy.orm import aliased
- from ruoyi_common.domain.entity import SysDept
- from ruoyi_common.sqlalchemy.model import ColumnEntityList
- from ruoyi_common.sqlalchemy.transaction import Transactional
- from ruoyi_system.domain.po import SysDeptPo, SysRoleDeptPo, SysUserPo
- from ruoyi_admin.ext import db
class SysDeptMapper:
    """
    Data-access layer for departments (``SysDeptPo``).

    Every method is a classmethod that builds a SQLAlchemy statement and
    executes it on ``db.session``. The write methods are additionally
    wrapped with ``Transactional(db.session)``.
    """

    # Entity field names selected by the generic department queries.
    default_fields = {
        "dept_id", "parent_id", "ancestors", "dept_name", "order_num",
        "leader", "phone", "email", "status", "del_flag", "create_by",
        "create_time",
    }

    # Column list built from ``default_fields``; also used to cast result
    # rows into ``SysDept`` entities.
    default_columns = ColumnEntityList(SysDeptPo, default_fields, False)

    @classmethod
    def select_dept_list(cls, dept: SysDept) -> List[SysDept]:
        """
        Query departments that match the given filter entity.

        Only rows with ``del_flag == "0"`` are selected. ``dept_id``,
        ``parent_id``, ``dept_name`` (substring ``LIKE`` match) and
        ``status`` are each applied only when truthy on ``dept``. When the
        request context ``g`` carries ``criterian_meta``, its ``scope``
        criterion is appended to the filters and the final statement is
        stored on its ``page`` object.

        Args:
            dept (SysDept): Entity holding the filter values.

        Returns:
            List[SysDept]: Matching departments.
        """
        criterions = [SysDeptPo.del_flag == "0"]
        if dept.dept_id:
            criterions.append(SysDeptPo.dept_id == dept.dept_id)
        if dept.parent_id:
            criterions.append(SysDeptPo.parent_id == dept.parent_id)
        if dept.dept_name:
            criterions.append(SysDeptPo.dept_name.like(f'%{dept.dept_name}%'))
        if dept.status:
            criterions.append(SysDeptPo.status == dept.status)
        # NOTE(review): ``scope`` is truth-tested here; if it is a SQLAlchemy
        # clause element an ``is not None`` check would be safer - confirm
        # what the code that populates ``criterian_meta`` stores in it.
        if "criterian_meta" in g and g.criterian_meta.scope:
            criterions.append(g.criterian_meta.scope)

        stmt = select(*cls.default_columns).where(*criterions)
        if "criterian_meta" in g and g.criterian_meta.page:
            # Hand the statement to the page object found on the context.
            g.criterian_meta.page.stmt = stmt

        rows = db.session.execute(stmt).all()
        return [cls.default_columns.cast(row, SysDept) for row in rows]

    @classmethod
    def select_dept_list_by_role_id(
        cls,
        role_id: int,
        dept_check_strictly: bool
    ) -> List[int]:
        """
        Query the IDs of the departments bound to a role.

        Args:
            role_id (int): Role ID.
            dept_check_strictly (bool): When true, departments that are the
                parent of another department bound to the same role are
                excluded from the result.

        Returns:
            List[int]: Department IDs ordered by ``parent_id`` and
            ``order_num``.
        """
        criterions = [SysRoleDeptPo.role_id == role_id]
        if dept_check_strictly:
            # parent_id of every department bound to this role.
            subquery = (
                select(SysDeptPo.parent_id)
                .join(
                    SysRoleDeptPo,
                    and_(
                        SysDeptPo.dept_id == SysRoleDeptPo.dept_id,
                        SysRoleDeptPo.role_id == role_id,
                    ),
                )
                .subquery()
            )
            criterions.append(SysDeptPo.dept_id.not_in(subquery))

        stmt = (
            select(SysDeptPo.dept_id)
            .outerjoin(
                SysRoleDeptPo, SysDeptPo.dept_id == SysRoleDeptPo.dept_id
            )
            .where(*criterions)
            .order_by(SysDeptPo.parent_id, SysDeptPo.order_num)
        )
        # Materialize the result: ``scalars()`` alone yields a one-shot
        # iterator, not the list promised by the signature.
        return list(db.session.execute(stmt).scalars())

    @classmethod
    def select_dept_by_id(cls, dept_id: int) -> Optional[SysDept]:
        """
        Fetch one department by ID, including its parent's name.

        Args:
            dept_id (int): Department ID.

        Returns:
            Optional[SysDept]: The department with ``parent_name`` selected
            via a correlated scalar subquery, or ``None`` when no row
            matches.
        """
        fields = {
            "dept_id", "parent_id", "ancestors", "dept_name", "order_num",
            "leader", "phone", "email", "status",
        }
        columns = ColumnEntityList(SysDeptPo, fields, alia_prefix=False)

        # Self-join through an alias to look up the parent's dept_name.
        SysDeptPo_P = aliased(SysDeptPo, name="p")
        p_name_scalar = (
            select(SysDeptPo_P.dept_name)
            .where(SysDeptPo_P.dept_id == SysDeptPo.parent_id)
            .scalar_subquery()
        )
        columns.append_scalar(p_name_scalar.label("parent_name"))

        stmt = select(*columns).where(SysDeptPo.dept_id == dept_id)
        row = db.session.execute(stmt).one_or_none()
        return columns.cast(row, SysDept) if row else None

    @classmethod
    def select_children_dept_by_id(cls, dept_id: int) -> List[SysDept]:
        """
        Query every department that lists ``dept_id`` in its ancestors.

        Args:
            dept_id (int): Department ID.

        Returns:
            List[SysDept]: Departments whose ``ancestors`` value contains
            ``dept_id`` according to the SQL ``find_in_set`` function.
        """
        fields = {
            "dept_id", "parent_id", "ancestors", "dept_name", "order_num",
            "leader", "phone", "email", "status", "del_flag", "create_by",
            "create_time",
        }
        columns = ColumnEntityList(SysDeptPo, fields, alia_prefix=False)

        stmt = select(*columns).where(
            func.find_in_set(dept_id, SysDeptPo.ancestors)
        )
        rows = db.session.execute(stmt).all()

        return [columns.cast(row, SysDept) for row in rows]

    @classmethod
    def select_normal_children_dept_by_id(cls, dept_id: int) -> int:
        """
        Count the descendants of a department that are in normal status.

        Args:
            dept_id (int): Department ID.

        Returns:
            int: Number of rows with ``status == "0"`` and
            ``del_flag == "0"`` whose ``ancestors`` contains ``dept_id``.
        """
        # Flags are compared as strings, consistent with the other queries
        # and updates in this class.
        criterions = [
            SysDeptPo.status == "0",
            SysDeptPo.del_flag == "0",
            func.find_in_set(dept_id, SysDeptPo.ancestors),
        ]

        stmt = select(func.count()).select_from(SysDeptPo).where(*criterions)
        return db.session.execute(stmt).scalar() or 0

    @classmethod
    def has_child_by_dept_id(cls, dept_id: int) -> int:
        """
        Count the direct, non-deleted children of a department.

        Args:
            dept_id (int): Department ID.

        Returns:
            int: Number of rows with ``parent_id == dept_id`` and
            ``del_flag == "0"``.
        """
        criterions = [
            SysDeptPo.parent_id == dept_id,
            SysDeptPo.del_flag == "0",
        ]

        stmt = (
            select(func.count())
            .select_from(SysDeptPo)
            .where(*criterions)
            .limit(1)
        )
        return db.session.execute(stmt).scalar() or 0

    @classmethod
    def check_dept_exist_user(cls, dept_id: int) -> int:
        """
        Count the non-deleted users that belong to a department.

        Args:
            dept_id (int): Department ID.

        Returns:
            int: Number of ``SysUserPo`` rows with this ``dept_id`` and
            ``del_flag == "0"``.
        """
        criterions = [
            SysUserPo.dept_id == dept_id,
            SysUserPo.del_flag == "0",
        ]

        stmt = select(func.count()).select_from(SysUserPo).where(*criterions)
        return db.session.execute(stmt).scalar() or 0

    @classmethod
    def check_dept_name_unique(
        cls, dept_name: str, parent_id: int
    ) -> Optional[SysDept]:
        """
        Look up a non-deleted department by name under a given parent.

        Args:
            dept_name (str): Department name.
            parent_id (int): Parent department ID.

        Returns:
            Optional[SysDept]: A department with the same name and parent,
            or ``None`` when there is none.
        """
        criterions = [
            SysDeptPo.parent_id == parent_id,
            SysDeptPo.del_flag == "0",
            SysDeptPo.dept_name == dept_name,
        ]

        # Fetch at most one row: if duplicates already exist the check must
        # still report a conflict instead of raising MultipleResultsFound.
        stmt = select(*cls.default_columns).where(*criterions).limit(1)
        row = db.session.execute(stmt).first()
        return cls.default_columns.cast(row, SysDept) if row else None

    @classmethod
    @Transactional(db.session)
    def insert_dept(cls, dept: SysDept) -> int:
        """
        Insert a new department.

        Only whitelisted fields that were explicitly set and are not
        ``None`` are written.

        Args:
            dept (SysDept): Department entity.

        Returns:
            int: Primary key of the inserted row, or 0 when the result
            exposes no primary key.
        """
        fields = {
            "dept_id", "parent_id", "ancestors", "dept_name", "order_num",
            "leader", "phone", "email", "status", "create_by", "create_time",
        }
        data = dept.model_dump(
            include=fields,
            exclude_unset=True,
            exclude_none=True
        )
        stmt = insert(SysDeptPo).values(data)
        out = db.session.execute(stmt).inserted_primary_key
        return out[0] if out else 0

    @classmethod
    @Transactional(db.session)
    def update_dept(cls, dept: SysDept) -> int:
        """
        Update the department identified by ``dept.dept_id``.

        Only whitelisted fields that were explicitly set and are not
        ``None`` are written.

        Args:
            dept (SysDept): Department entity.

        Returns:
            int: Number of rows matched by the update.
        """
        # NOTE(review): the whitelist contains create_by/create_time rather
        # than update_by/update_time - confirm this is intentional.
        fields = {
            "parent_id", "ancestors", "dept_name", "order_num", "leader",
            "phone", "email", "status", "create_by", "create_time",
        }
        data = dept.model_dump(
            include=fields,
            exclude_unset=True,
            exclude_none=True
        )
        stmt = (
            update(SysDeptPo)
            .where(SysDeptPo.dept_id == dept.dept_id)
            .values(data)
        )
        return db.session.execute(stmt).rowcount

    @classmethod
    @Transactional(db.session)
    def update_dept_status_normal(cls, dept_ids: List[int]) -> int:
        """
        Set the given departments to normal status (``status = "0"``).

        Args:
            dept_ids (List[int]): Department IDs.

        Returns:
            int: Number of rows matched by the update.
        """
        stmt = (
            update(SysDeptPo)
            .where(SysDeptPo.dept_id.in_(dept_ids))
            .values(status="0")
        )
        return db.session.execute(stmt).rowcount

    @classmethod
    @Transactional(db.session)
    def update_dept_children(cls, depts: List[SysDept]) -> int:
        """
        Rewrite the ``ancestors`` value of several departments at once.

        A single ``UPDATE`` with a ``CASE`` expression maps each
        ``dept_id`` to the ``ancestors`` value carried by its entity.

        Args:
            depts (List[SysDept]): Departments carrying the new
                ``ancestors`` values.

        Returns:
            int: Number of rows matched by the update; 0 when ``depts`` is
            empty.
        """
        if not depts:
            # Nothing to update; a CASE without WHEN branches would be
            # invalid, so skip the statement entirely.
            return 0

        case_expr = case(
            *[
                (SysDeptPo.dept_id == dept.dept_id, dept.ancestors)
                for dept in depts
            ],
            else_=SysDeptPo.ancestors
        )
        stmt = (
            update(SysDeptPo)
            .where(SysDeptPo.dept_id.in_([dept.dept_id for dept in depts]))
            .values(ancestors=case_expr)
        )
        return db.session.execute(stmt).rowcount

    @classmethod
    @Transactional(db.session)
    def delete_dept_by_id(cls, dept_id: int) -> int:
        """
        Soft-delete a department by setting ``del_flag`` to ``"2"``.

        Args:
            dept_id (int): Department ID.

        Returns:
            int: Number of rows matched by the update.
        """
        stmt = (
            update(SysDeptPo)
            .where(SysDeptPo.dept_id == dept_id)
            .values(del_flag="2")
        )
        return db.session.execute(stmt).rowcount
|