# -*- coding: utf-8 -*-
# @Author : YY
from datetime import datetime
from typing import List, Literal, Optional
from ruoyi_common.constant import UserConstants
from ruoyi_common.exception import ServiceException
from ruoyi_common.sqlalchemy.transaction import Transactional
from ruoyi_common.domain.entity import SysRole, SysUser
from ruoyi_common.utils import security_util
from ruoyi_common.utils.base import LogUtil, StringUtil
from ruoyi_framework.descriptor.datascope import DataScope
from ruoyi_system.domain.entity import SysPost, SysUserPost, SysUserRole
from ruoyi_system.mapper import SysUserMapper
from ruoyi_system.mapper.sys_post import SysPostMapper
from ruoyi_system.mapper.sys_role import SysRoleMapper
from ruoyi_system.mapper.sys_user_post import SysUserPostMapper
from ruoyi_system.mapper.sys_user_role import SysUserRoleMapper
from ruoyi_admin.ext import db
from ruoyi_system.service.sys_config import SysConfigService
class SysUserService:
@classmethod
@DataScope(dept=True, user=True)
def select_user_list(cls, query: SysUser) -> List[SysUser]:
"""
查询用户列表
Args:
query (SysUser): 包含查询条件的传输对象
Returns:
List[SysUser]: 用户列表
"""
return SysUserMapper.select_user_list(query)
@classmethod
@DataScope(dept=True, user=True)
def select_allocated_list(cls, query: SysUser) -> List[SysUser]:
"""
查询已分配用户列表
Args:
query (SysUser): 包含查询条件的传输对象
Returns:
List[SysUser]: 已分配用户列表
"""
return SysUserMapper.select_allocated_list(query)
@classmethod
@DataScope(dept=True, user=True)
def select_unallocated_list(cls, query: SysUser) -> List[SysUser]:
"""
查询未分配用户列表
Args:
query (SysUser): 包含查询条件的传输对象
Returns:
List[SysUser]: 已分配用户列表
"""
return SysUserMapper.select_unallocated_list(query)
@classmethod
def select_user_by_user_name(cls, user_name: str) -> Optional[SysUser]:
"""
根据用户名,查询用户
Args:
user_name (str): 用户名
Returns:
Optional[SysUser]: 用户信息
"""
return SysUserMapper.select_user_by_user_name(user_name)
@classmethod
def select_user_by_id(cls, user_id: int) -> Optional[SysUser]:
"""
根据用户ID,查询用户
Args:
user_id (int): 用户ID
Returns:
Optional[SysUser]: 用户信息
"""
return SysUserMapper.select_user_by_id(user_id)
@classmethod
def select_user_role_group(cls, user_name: str) -> str:
"""
查询用户角色组
Args:
user_name (str): 用户名
Returns:
str: 角色组
"""
eos: List[SysRole] = SysRoleMapper.select_role_list_by_user_name(user_name)
if not eos:
return StringUtil.EMPTY
return ",".join([eo.role_name for eo in eos])
@classmethod
def select_user_post_group(cls, user_name: str) -> str:
"""
查询用户岗位组
Args:
user_name (str): 用户名
Returns:
str: 岗位组
"""
eos: List[SysPost] = SysPostMapper.select_posts_by_user_name(user_name)
if not eos:
return StringUtil.EMPTY
return ",".join([eo.post_name for eo in eos])
@classmethod
def check_user_name_unique(cls, user: SysUser) -> Literal["0", "1"]:
"""
校验用户名是否唯一
Args:
user (SysUser): 用户信息
Returns:
str: 唯一标识符, 0-唯一, 1-不唯一
"""
user_name = -1 if user.user_name is None else user.user_name
num = SysUserMapper.check_user_name_unique(user_name)
if num > 0:
return UserConstants.NOT_UNIQUE
return UserConstants.UNIQUE
@classmethod
def check_phone_unique(cls, user: SysUser) -> Literal["0", "1"]:
"""
校验手机号是否唯一
Args:
user (SysUser): 用户信息
Returns:
str: 唯一标识符, 0-唯一, 1-不唯一
"""
user_id = -1 if user.user_id is None else user.user_id
eo: SysUser = SysUserMapper.check_phone_unique(user.phonenumber)
if eo and eo.user_id != user_id:
return UserConstants.NOT_UNIQUE
return UserConstants.UNIQUE
@classmethod
def check_email_unique(cls, user: SysUser) -> str:
"""
校验邮箱是否唯一
Args:
user (SysUser): 用户信息
Returns:
str: 唯一标识符, 0-唯一, 1-不唯一
"""
user_email = -1 if user.email is None else user.email
eo: SysUser = SysUserMapper.check_email_unique(user.email)
if eo and eo.email != user_email:
return UserConstants.NOT_UNIQUE
return UserConstants.UNIQUE
@classmethod
def check_user_allowed(cls, user: SysUser):
"""
检查用户是否允许操作
Args:
user (SysUser): 用户信息
Raises:
ServiceException: 超级管理员用户不允许操作
"""
if user.is_admin():
raise ServiceException("不允许操作超级管理员用户")
@classmethod
def check_user_data_scope(cls, user_id: Optional[int]):
"""
检查用户数据权限
Args:
user_id (Optional[int]): 用户ID
Raises:
ServiceException: 无权限访问用户数据
"""
if not security_util.login_user_is_admin():
user = SysUser(user_id=user_id) if user_id else SysUser()
users: List[SysUser] = cls.select_user_list(user)
if not users:
raise ServiceException("没有权限访问用户数据")
@classmethod
@Transactional(db.session)
def insert_user(cls, user: SysUser) -> bool:
"""
新增用户
Args:
user (SysUser): 用户信息
Returns:
bool: 操作结果
"""
user.password = cls._build_password(user.password)
if user.create_time is None:
now = datetime.now()
user.create_time = now
user.update_time = now
last_pid = SysUserMapper.insert_user(user)
user.user_id = last_pid
cls.insert_user_post_by_user(user)
cls.insert_user_role_by_user(user)
return last_pid > 0
@classmethod
def register_user(cls, user: SysUser) -> bool:
"""
注册用户
Args:
user (SysUser): 用户信息
Returns:
bool: 操作结果
"""
user.password = cls._build_password(user.password)
if user.create_time is None:
user.create_time = datetime.now()
flag = SysUserMapper.insert_user(user)
return flag > 0
@classmethod
@Transactional(db.session)
def update_user(cls, user: SysUser) -> bool:
"""
更新用户
Args:
user (SysUser): 用户信息
Returns:
bool: 操作结果
"""
user.update_time = datetime.now()
# 删除用户角色关联
SysRoleMapper.delete_user_role_by_user_id(user.user_id)
# 新增用户和角色的关联
cls.insert_user_role_by_user(user)
# 删除用户岗位关联
SysUserPostMapper.delete_user_post_by_user_id(user.user_id)
# 新增用户岗位关联
cls.insert_user_post_by_user(user)
return SysUserMapper.update_user(user)
@classmethod
def update_user_login_info(cls, user: SysUser) -> bool:
"""
更新用户登录信息(登录IP、时间)
Args:
user (SysUser): 用户信息
Returns:
bool: 操作结果
"""
user.update_time = datetime.now()
return SysUserMapper.update_user_login_info(user) > 0
@classmethod
@Transactional(db.session)
def insert_user_auth(cls, user_id: int, role_ids: List[int]):
"""
新增用户角色
Args:
user_id: 用户id
role_ids: 角色id列表
"""
cls.insert_user_role(user_id, role_ids)
@classmethod
@Transactional(db.session)
def delete_users_by_id(cls, id: int) -> bool:
"""
根据用户ID,删除用户
Args:
id (int): 用户ID
Returns:
bool: 操作结果
"""
SysUserRoleMapper.delete_user_role_by_user_id(id)
SysUserPostMapper.delete_user_post_by_user_id(id)
return SysUserMapper.delete_user_by_id(id) > 0
@classmethod
@Transactional(db.session)
def delete_users_by_ids(cls, ids: List[int]) -> bool:
"""
根据用户ID列表,批量删除用户
Args:
ids (List[int]): 用户ID列表
Returns:
bool: 操作结果
"""
if not ids:
return False
SysUserRoleMapper.delete_user_role_by_user_ids(ids)
SysUserPostMapper.delete_user_post(ids)
return SysUserMapper.delete_user_by_ids(ids) > 0
@classmethod
def update_user_status(cls, user: SysUser) -> bool:
"""
更新用户状态
Args:
user (SysUser): 用户信息
Returns:
bool: 操作结果
"""
user.update_time = datetime.now()
return SysUserMapper.update_user(user) > 0
@classmethod
def update_user_profile(cls, user: SysUser) -> bool:
"""
更新用户个人信息
Args:
user (SysUser): 用户信息
Returns:
bool: 操作结果
"""
user.update_time = datetime.now()
return SysUserMapper.update_user(user) > 0
@classmethod
def update_user_avatar(cls, user_name: str, avatar: str) -> bool:
"""
更新用户头像
Args:
user_name (str): 用户名
avatar (str): 头像
Returns:
bool: 操作结果
"""
return SysUserMapper.update_user_avatar(user_name, avatar) > 0
@classmethod
def reset_pwd(cls, user: SysUser) -> bool:
"""
重置用户密码
Args:
user (SysUser): 用户信息
Returns:
bool: 操作结果
"""
user.password = cls._build_password(user.password)
user.update_time = datetime.now()
return SysUserMapper.update_user(user) > 0
@classmethod
def reset_user_pwd(cls, username: str, password: str) -> bool:
"""
重置用户密码
Args:
username (str): 用户名
password (str): 密码
Returns:
bool: 操作结果
"""
return SysUserMapper.reset_user_pwd(username, password) > 0
@classmethod
def insert_user_role_by_user(cls, user: SysUser):
"""
新增用户角色
Args:
user: 用户信息
"""
cls.insert_user_role(user.user_id, user.role_ids)
@classmethod
@Transactional(db.session)
def insert_user_role(cls, user_id: int, role_ids: List[int]):
"""
新增用户角色
Args:
user_id: 用户id
role_ids: 角色id列表
"""
if role_ids:
lists = [
SysUserRole(user_id=user_id, role_id=role_id)
for role_id in role_ids
]
SysUserRoleMapper.batch_user_role(lists)
@classmethod
def insert_user_post_by_user(cls, user: SysUser):
"""
新增用户岗位
Args:
user: 用户信息
"""
cls.insert_user_post(user.user_id, user.post_ids)
@classmethod
@Transactional(db.session)
def insert_user_post(cls, user_id: int, post_ids: List[int]):
"""
新增用户岗位
Args:
user_id: 用户id
post_ids: 岗位id列表
"""
if post_ids:
lists = [
SysUserPost(user_id=user_id, post_id=post_id)
for post_id in post_ids
]
SysUserPostMapper.batch_user_post(lists)
@classmethod
@Transactional(db.session)
def update_user_roles(cls, user_id: int, role_ids: List[int]) -> bool:
"""
更新用户角色
Args:
user_id: 用户id
role_ids: 角色id列表
Returns:
bool: 操作结果
"""
SysUserRoleMapper.delete_user_role_by_user_id(user_id)
cls.insert_user_role(user_id, role_ids)
return True
@classmethod
def import_user(cls, users: List[SysUser], is_update: bool = False) -> str:
"""
导入用户
Args:
users (List[SysUser]): 用户列表
is_update (bool): 是否更新
Returns:
str: 导入消息结果
"""
if not users:
raise ServiceException("导入用户不能为空")
success_count = 0
fail_count = 0
success_msg = ""
fail_msg = ""
default_password = SysConfigService.select_config_by_key("sys.user.initPassword")
for user in users:
try:
dto = SysUserMapper.select_user_by_user_name(user.user_name)
if not dto:
user.password = cls._build_password(default_password)
user.create_by_user(security_util.get_user_id())
cls.insert_user(user)
success_count += 1
success_msg += f"
第{success_count}个账号,导入成功:{user.user_name}"
elif is_update:
user.update_by_user(security_util.get_user_id())
cls.update_user(user)
success_count += 1
success_msg += f"
第{success_count}个账号,更新成功:{user.user_name}"
else:
fail_count += 1
fail_msg += f"
第{fail_count}个账号,已存在:{user.user_name}"
except Exception as e:
fail_count += 1
fail_msg += f"
第{fail_count}个账号,导入失败:{user.user_name},\
原因:{e.__class__.__name__}"
LogUtil.logger.error(f"导入用户失败,原因:{e}")
if fail_count > 0:
if success_msg:
fail_msg = f"导入成功{success_count}个,失败{fail_count}个。{success_msg} \
" + fail_msg
else:
fail_msg = f"导入成功{success_count}个,失败{fail_count}个。{fail_msg}"
raise ServiceException(fail_msg)
else:
success_msg = f"恭喜您,数据已全部导入成功!共 {success_count} 条,数据如下:" \
+ success_msg
return success_msg
@classmethod
def _build_password(cls, password: Optional[str]) -> str:
"""
构建持久化密码,若为空使用初始化密码,若未加密则加密
"""
if not password:
password = SysConfigService.select_config_by_key(
"sys.user.initPassword"
)
if not password:
raise ServiceException("初始化密码未配置")
if cls._is_encrypted(password):
return password
return security_util.encrypt_password(password)
@staticmethod
def _is_encrypted(password: Optional[str]) -> bool:
"""
判断密码是否为bcrypt加密串
"""
return bool(password and password.startswith("$2"))