# -*- coding: utf-8 -*-
# @Author : YY
import logging
from typing import List, Optional

from flask import g
from sqlalchemy import and_, or_, func, insert, select, update

from ruoyi_common.base.model import ExtraModel
from ruoyi_common.domain.entity import SysDept, SysRole, SysUser
from ruoyi_common.sqlalchemy.model import ColumnEntityList
from ruoyi_common.sqlalchemy.transaction import Transactional
from ruoyi_system.domain.po import SysDeptPo, SysRolePo, SysUserPo, \
    SysUserRolePo
from ruoyi_admin.ext import db
from ruoyi_common.utils import security_util as SecurityUtil
class SysUserMapper:
    """Data-access layer for user (``SysUserPo``) records.

    Every public method is a classmethod that builds a SQLAlchemy statement
    and executes it through ``db.session``. Write operations are wrapped in
    ``@Transactional(db.session)``.
    """

    # Field names selected whenever a full user record is loaded.
    default_fields = {
        "user_id", "dept_id", "user_name", "nick_name", "email", "phonenumber",
        "avatar", "status", "password", "sex", "del_flag", "login_ip",
        "login_date", "create_by", "create_time", "update_by", "update_time",
        "remark"
    }

    default_columns = ColumnEntityList(SysUserPo, default_fields, False)

    _logger = logging.getLogger(__name__)

    @classmethod
    def _scope_criterion(cls):
        """Return the request's data-scope criterion, or ``None`` to skip it.

        The criterion is read from ``g.criterian_meta.scope``. It is skipped
        when no criterion metadata is attached to ``g``, when the scope is
        empty, or when the logged-in user is an administrator.
        """
        meta = getattr(g, "criterian_meta", None)
        if meta is None or not meta.scope:
            return None
        login_user = SecurityUtil.get_login_user()
        if login_user and SecurityUtil.is_user_admin(login_user.user):
            return None
        return meta.scope

    @classmethod
    def select_user_list(cls, user: SysUser) -> List[SysUser]:
        """Query users matching the given filter conditions.

        Args:
            user: Filter carrier; ``user_id``, ``user_name``, ``status``,
                ``phonenumber`` and ``dept_id`` are applied when set.

        Returns:
            Matching users, each with ``dept`` populated from the joined
            department columns.
        """
        dept_vo_fields = {"dept_name", "leader"}
        user_columns = ColumnEntityList(SysUserPo, cls.default_fields)
        dept_columns = ColumnEntityList(SysDeptPo, dept_vo_fields)

        criterions = [SysUserPo.del_flag == "0"]
        if user.user_id is not None and user.user_id != 0:
            criterions.append(SysUserPo.user_id == user.user_id)
        if user.user_name:
            criterions.append(SysUserPo.user_name.like(f"%{user.user_name}%"))
        # An empty-string status would otherwise become `status = ''` and
        # match nothing, so it is treated as "no filter" like None and 0.
        if user.status not in (None, 0, ""):
            criterions.append(SysUserPo.status == user.status)
        if user.phonenumber:
            criterions.append(
                SysUserPo.phonenumber.like(f"%{user.phonenumber}%")
            )
        if user.dept_id is not None and user.dept_id != 0:
            # The department itself plus every department listing it in
            # its ancestors column.
            subquery = select(SysDeptPo.dept_id).where(or_(
                SysDeptPo.dept_id == user.dept_id,
                func.find_in_set(user.dept_id, SysDeptPo.ancestors)
            )).subquery()
            criterions.append(SysUserPo.dept_id.in_(subquery))

        # Guarded lookup, consistent with the other list queries, so the
        # method does not fail when no criterion metadata is attached to g.
        meta = getattr(g, "criterian_meta", None)
        if meta is not None and meta.extra:
            extra: ExtraModel = meta.extra
            if extra.start_time and extra.end_time:
                criterions.append(SysUserPo.create_time >= extra.start_time)
                criterions.append(SysUserPo.create_time <= extra.end_time)

        scope = cls._scope_criterion()
        if scope is not None:
            criterions.append(scope)

        stmt = select(*user_columns, *dept_columns) \
            .join(
                SysDeptPo,
                SysUserPo.dept_id == SysDeptPo.dept_id,
                isouter=True
            ) \
            .where(*criterions)

        # Lazy debug logging of the parameterised statement; bound values
        # are not inlined, so filter values do not end up in the log.
        cls._logger.debug("select_user_list SQL: %s", stmt)

        if meta is not None and meta.page:
            meta.page.stmt = stmt

        rows = db.session.execute(stmt).all()
        users = []
        for row in rows:
            entity = user_columns.cast(row, SysUser)
            entity.dept = dept_columns.cast(row, SysDept)
            users.append(entity)
        return users

    @classmethod
    def select_allocated_list(cls, user: SysUser) -> List[SysUser]:
        """Query users that already hold a role.

        Args:
            user (SysUser): Filter carrier; ``role_id`` (when provided),
                ``user_name`` and ``phonenumber`` are applied.

        Returns:
            List[SysUser]: Matching users.
        """
        fields = {
            "user_id", "dept_id", "user_name", "nick_name", "email",
            "phonenumber", "status", "create_time"
        }
        columns = ColumnEntityList(SysUserPo, fields, alia_prefix=False)

        criterions = [SysUserPo.del_flag == "0"]
        # Restrict to the requested role; without this the query returns
        # users of every role. Skipped when no role_id is supplied so that
        # existing callers keep the previous result.
        if user.role_id is not None:
            criterions.append(SysRolePo.role_id == user.role_id)
        if user.user_name:
            criterions.append(SysUserPo.user_name.like(f"%{user.user_name}%"))
        if user.phonenumber:
            criterions.append(
                SysUserPo.phonenumber.like(f"%{user.phonenumber}%")
            )

        scope = cls._scope_criterion()
        if scope is not None:
            criterions.append(scope)

        # The department join is outer so users without a department are
        # not dropped; the role joins stay inner because a role is required.
        stmt = select(*columns).distinct() \
            .join(
                SysDeptPo,
                SysUserPo.dept_id == SysDeptPo.dept_id,
                isouter=True
            ) \
            .join(SysUserRolePo, SysUserPo.user_id == SysUserRolePo.user_id) \
            .join(SysRolePo, SysUserRolePo.role_id == SysRolePo.role_id) \
            .where(*criterions)

        rows = db.session.execute(stmt).all()
        return [columns.cast(row, SysUser) for row in rows]

    @classmethod
    def select_unallocated_list(cls, user: SysUser) -> List[SysUser]:
        """Query users that do not hold the role ``user.role_id``.

        Args:
            user (SysUser): Filter carrier; ``role_id``, ``user_name`` and
                ``phonenumber`` are applied.

        Returns:
            List[SysUser]: Matching users.
        """
        fields = {
            "user_id", "dept_id", "user_name", "nick_name", "email",
            "phonenumber", "status", "create_time"
        }
        columns = ColumnEntityList(SysUserPo, fields, False)

        # Users that already have the role; excluded below.
        subquery = select(SysUserPo.user_id) \
            .join(SysUserRolePo, and_(
                SysUserPo.user_id == SysUserRolePo.user_id,
                SysUserRolePo.role_id == user.role_id
            )) \
            .subquery()

        criterions = [
            SysUserPo.del_flag == "0",
            or_(
                SysRolePo.role_id != user.role_id,
                SysRolePo.role_id.is_(None)
            ),
            SysUserPo.user_id.notin_(subquery),
        ]
        if user.user_name:
            criterions.append(SysUserPo.user_name.like(f"%{user.user_name}%"))
        if user.phonenumber:
            criterions.append(
                SysUserPo.phonenumber.like(f"%{user.phonenumber}%")
            )

        scope = cls._scope_criterion()
        if scope is not None:
            criterions.append(scope)

        # Outer joins are required here: with inner joins the
        # `role_id IS NULL` branch above can never match, so users that
        # have no role at all would be missing from the result.
        stmt = select(*columns).distinct() \
            .join(
                SysDeptPo,
                SysUserPo.dept_id == SysDeptPo.dept_id,
                isouter=True
            ) \
            .join(
                SysUserRolePo,
                SysUserPo.user_id == SysUserRolePo.user_id,
                isouter=True
            ) \
            .join(
                SysRolePo,
                SysUserRolePo.role_id == SysRolePo.role_id,
                isouter=True
            ) \
            .where(*criterions)

        rows = db.session.execute(stmt).all()
        return [columns.cast(row, SysUser) for row in rows]

    @classmethod
    def select_user_by_user_name(cls, user_name: str) -> Optional[SysUser]:
        """Load a user by user name.

        Args:
            user_name (str): User name.

        Returns:
            Optional[SysUser]: The user, or ``None`` when not found.
        """
        return cls.select_user_by_unique_map("user_name", user_name)

    @classmethod
    def select_user_by_id(cls, user_id: int) -> Optional[SysUser]:
        """Load a user by user ID.

        Args:
            user_id (int): User ID.

        Returns:
            Optional[SysUser]: The user, or ``None`` when not found.
        """
        return cls.select_user_by_unique_map("user_id", user_id)

    @classmethod
    def select_user_by_unique_map(
        cls,
        key: str,
        value: int | str
    ) -> Optional[SysUser]:
        """Load a user, with department and roles, by a unique column.

        Args:
            key (str): Attribute name on ``SysUserPo`` to match.
            value (int|str): Value the column must equal.

        Returns:
            Optional[SysUser]: The user built from the first row, with one
            entry in ``roles`` per row that carries role data; ``None`` when
            no row matches.
        """
        dept_vo_fields = {
            "dept_id", "parent_id", "dept_name", "order_num", "leader",
            "status"
        }
        role_vo_fields = {
            "role_id", "role_name", "role_key", "role_sort", "data_scope",
            "status"
        }
        user_columns = ColumnEntityList(SysUserPo, cls.default_fields)
        dept_columns = ColumnEntityList(SysDeptPo, dept_vo_fields)
        role_columns = ColumnEntityList(SysRolePo, role_vo_fields)

        column = getattr(SysUserPo, key)
        stmt = select(*user_columns, *dept_columns, *role_columns) \
            .distinct() \
            .join(
                SysDeptPo,
                SysUserPo.dept_id == SysDeptPo.dept_id,
                isouter=True
            ) \
            .join(
                SysUserRolePo,
                SysUserPo.user_id == SysUserRolePo.user_id,
                isouter=True
            ) \
            .join(
                SysRolePo,
                SysUserRolePo.role_id == SysRolePo.role_id,
                isouter=True
            ) \
            .where(column == value)

        rows = db.session.execute(stmt).all()
        if not rows:
            return None

        # The user and department come from the first row; the remaining
        # rows only differ in their role columns.
        first_row = rows[0]
        found = user_columns.cast(first_row, SysUser)
        found.dept = dept_columns.cast(first_row, SysDept)

        # NOTE(review): the first role column is used as the "row has a
        # role" marker; role_vo_fields is a set, so confirm this is always a
        # column that is non-null for real roles.
        role_pk_label = role_columns[0].key if role_columns else None
        for row in rows:
            if role_pk_label and getattr(row, role_pk_label) is not None:
                found.roles.append(role_columns.cast(row, SysRole))
        return found

    @classmethod
    @Transactional(db.session)
    def insert_user(cls, user: SysUser) -> int:
        """Insert a new user.

        Args:
            user (SysUser): User data; only set, non-None fields are written.

        Returns:
            int: Primary key of the inserted row, or 0 when none is reported.
        """
        fields = {
            "user_id", "dept_id", "user_name", "nick_name", "email", "avatar",
            "phonenumber", "sex", "password", "status", "create_by", "remark",
            "create_time", "update_time"
        }
        data = user.model_dump(
            include=fields,
            exclude_unset=True,
            exclude_none=True
        )
        # Written explicitly from the attribute, independent of whether the
        # dump above included it.
        if user.password is not None:
            data["password"] = user.password

        stmt = insert(SysUserPo).values(data)
        out = db.session.execute(stmt).inserted_primary_key
        return out[0] if out else 0

    @classmethod
    @Transactional(db.session)
    def update_user(cls, user: SysUser) -> int:
        """Update an existing user identified by ``user.user_id``.

        Args:
            user (SysUser): User data; only set, non-None fields are written.

        Returns:
            int: Number of rows updated (0 when there is nothing to update).
        """
        fields = {
            "dept_id", "user_name", "nick_name", "email", "avatar", "login_ip",
            "phonenumber", "sex", "password", "login_date", "status",
            "update_by", "remark", "update_time"
        }
        data = user.model_dump(
            include=fields,
            exclude_unset=True,
            exclude_none=True
        )
        # An empty or None password means "leave the password unchanged";
        # never overwrite the stored password with a blank value.
        if not user.password:
            data.pop("password", None)
        else:
            data["password"] = user.password

        # Same guard as update_user_login_info: do not issue an UPDATE
        # without any column to set.
        if not data:
            return 0

        stmt = update(SysUserPo) \
            .where(SysUserPo.user_id == user.user_id) \
            .values(data)
        return db.session.execute(stmt).rowcount

    @classmethod
    @Transactional(db.session)
    def update_user_login_info(cls, user: SysUser) -> int:
        """Update login metadata (login IP, login date, update time).

        Args:
            user (SysUser): User data; must carry ``user_id``.

        Returns:
            int: Number of rows updated (0 when there is nothing to update).
        """
        fields = {"login_ip", "login_date", "update_time"}
        data = user.model_dump(
            include=fields,
            exclude_unset=True,
            exclude_none=True
        )
        if not data:
            return 0

        stmt = update(SysUserPo) \
            .where(SysUserPo.user_id == user.user_id) \
            .values(data)
        return db.session.execute(stmt).rowcount

    @classmethod
    @Transactional(db.session)
    def update_user_avatar(cls, user_name: str, avatar: str) -> int:
        """Set the avatar of the user with the given user name.

        Args:
            user_name (str): User name.
            avatar (str): Avatar location.

        Returns:
            int: Number of rows updated.
        """
        stmt = update(SysUserPo) \
            .where(SysUserPo.user_name == user_name) \
            .values(avatar=avatar)
        return db.session.execute(stmt).rowcount

    @classmethod
    @Transactional(db.session)
    def reset_user_pwd(cls, user_name: str, password: str) -> int:
        """Set the password of the user with the given user name.

        Args:
            user_name (str): User name.
            password (str): Password value to store.

        Returns:
            int: Number of rows updated.
        """
        stmt = update(SysUserPo) \
            .where(SysUserPo.user_name == user_name) \
            .values(password=password)
        return db.session.execute(stmt).rowcount

    @classmethod
    @Transactional(db.session)
    def delete_user_by_id(cls, user_id: int) -> int:
        """Soft-delete one user by setting ``del_flag`` to ``"2"``.

        Args:
            user_id (int): User ID.

        Returns:
            int: Number of rows updated.
        """
        stmt = update(SysUserPo) \
            .where(SysUserPo.user_id == user_id) \
            .values(del_flag="2")
        return db.session.execute(stmt).rowcount

    @classmethod
    @Transactional(db.session)
    def delete_user_by_ids(cls, user_ids: List[int]) -> int:
        """Soft-delete several users by setting ``del_flag`` to ``"2"``.

        Args:
            user_ids (List[int]): User IDs.

        Returns:
            int: Number of rows updated (0 for an empty ID list).
        """
        # Nothing to delete; skip the database call entirely.
        if not user_ids:
            return 0

        stmt = update(SysUserPo) \
            .where(SysUserPo.user_id.in_(user_ids)) \
            .values(del_flag="2")
        return db.session.execute(stmt).rowcount

    @classmethod
    def check_user_name_unique(cls, user_name: str) -> int:
        """Count users with the given user name.

        Args:
            user_name (str): User name.

        Returns:
            int: 0 when the name is unused, otherwise the number of matches.
        """
        stmt = select(func.count()).select_from(SysUserPo) \
            .where(SysUserPo.user_name == user_name)
        return db.session.execute(stmt).scalar() or 0

    @classmethod
    def check_phone_unique(cls, phone_number: str) -> Optional[SysUser]:
        """Find a user that already uses the given phone number.

        Args:
            phone_number (str): Phone number.

        Returns:
            Optional[SysUser]: A user carrying ``user_id`` and
            ``phonenumber``, or ``None`` when the number is unused.
        """
        fields = {"user_id", "phonenumber"}
        columns = ColumnEntityList(
            SysUserPo, fields, alia_prefix=False
        )
        # limit(1) + first(): one match is enough to answer the question,
        # and duplicate rows must not raise MultipleResultsFound.
        stmt = select(*columns) \
            .where(SysUserPo.phonenumber == phone_number) \
            .limit(1)
        row = db.session.execute(stmt).first()
        return columns.cast(row, SysUser) if row is not None else None

    @classmethod
    def check_email_unique(cls, email: str) -> Optional[SysUser]:
        """Find a user that already uses the given email address.

        Args:
            email (str): Email address.

        Returns:
            Optional[SysUser]: A user carrying ``user_id`` and ``email``,
            or ``None`` when the address is unused.
        """
        fields = {"user_id", "email"}
        columns = ColumnEntityList(
            SysUserPo, fields, alia_prefix=False
        )
        # limit(1) + first(): one match is enough to answer the question,
        # and duplicate rows must not raise MultipleResultsFound.
        stmt = select(*columns) \
            .where(SysUserPo.email == email) \
            .limit(1)
        row = db.session.execute(stmt).first()
        return columns.cast(row, SysUser) if row is not None else None
|