| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322 |
- # -*- coding: utf-8 -*-
- # @Author : YY
- from typing import List, Optional
- from flask import request
- from flask_login import login_required
- from pydantic import BeforeValidator
- from typing_extensions import Annotated
- from werkzeug.datastructures import FileStorage
- from ruoyi_common.base.model import AjaxResponse, TableResponse
- from ruoyi_common.base.transformer import ids_to_list
- from ruoyi_common.constant import UserConstants
- from ruoyi_common.descriptor.serializer import BaseSerializer, JsonSerializer
- from ruoyi_common.descriptor.validator import FileDownloadValidator, \
- FileUploadValidator, QueryValidator, BodyValidator, PathValidator
- from ruoyi_common.domain.entity import SysUser, SysRole
- from ruoyi_common.domain.enum import BusinessType
- from ruoyi_common.utils import security_util as SecurityUtil
- from ruoyi_common.utils.base import ExcelUtil
- from ruoyi_framework.descriptor.log import Log
- from ruoyi_framework.descriptor.permission import HasPerm, PreAuthorize
- from ruoyi_system.domain.entity import SysPost
- from ruoyi_system.service import SysUserService
- from ruoyi_system.service.sys_post import SysPostService
- from ruoyi_system.service.sys_role import SysRoleService
- from ... import reg
- @reg.api.route("/system/user/list", methods=["GET"])
- @QueryValidator(is_page=True)
- @PreAuthorize(HasPerm("system:user:list"))
- @JsonSerializer()
- def system_user_list(dto: SysUser):
- '''
- 获取用户列表
- '''
- rows = SysUserService.select_user_list(dto)
- table_response = TableResponse(rows=rows)
- return table_response
- @reg.api.route("/system/user/", methods=["GET"])
- @reg.api.route("/system/user/<int:id>", methods=["GET"])
- @PathValidator()
- @PreAuthorize(HasPerm("system:user:query"))
- @JsonSerializer()
- def system_get_user(id: Optional[int] = None):
- '''
- 获取用户详情
- '''
- SysUserService.check_user_data_scope(id)
- ajax_response = AjaxResponse.from_success()
- roles: List[SysRole] = SysRoleService.select_role_all()
- posts: List[SysPost] = SysPostService.select_post_all()
- if not SecurityUtil.is_admin(id):
- roles = [role for role in roles if not role.is_admin()]
- setattr(ajax_response, "roles", roles)
- setattr(ajax_response, "posts", posts)
- if id:
- user = SysUserService.select_user_by_id(id)
- setattr(ajax_response, "data", user)
- post_ids = SysPostService.select_post_list_by_user_id(id)
- setattr(ajax_response, "postIds", post_ids)
- setattr(ajax_response, "role_ids", user.role_ids)
- return ajax_response
- @reg.api.route("/system/user", methods=["POST"])
- @BodyValidator()
- @PreAuthorize(HasPerm("system:user:add"))
- @Log(title="用户管理", business_type=BusinessType.INSERT)
- @JsonSerializer()
- def system_create_user(dto: SysUser):
- '''
- 新增用户
- '''
- if SysUserService.check_user_name_unique(dto) \
- == UserConstants.NOT_UNIQUE:
- return AjaxResponse.from_error(
- f"新增用户'{dto.user_name}'失败,登录账号已存在"
- )
- elif dto.phonenumber \
- and SysUserService.check_phone_unique(dto) \
- == UserConstants.NOT_UNIQUE:
- return AjaxResponse.from_error(
- f"新增用户'{dto.phonenumber}'失败,手机号码已存在"
- )
- elif dto.email \
- and SysUserService.check_email_unique(dto) \
- == UserConstants.NOT_UNIQUE:
- return AjaxResponse.from_error(
- f"新增用户'{dto.email}'失败,邮箱已存在"
- )
- dto.create_by_user(SecurityUtil.get_username())
- flag = SysUserService.insert_user(dto)
- ajax_response = AjaxResponse.from_success() if flag else AjaxResponse.from_error()
- return ajax_response
- @reg.api.route("/system/user", methods=["PUT"])
- @BodyValidator()
- @PreAuthorize(HasPerm("system:user:edit"))
- @Log(title="用户管理", business_type=BusinessType.UPDATE)
- @JsonSerializer()
- def system_update_user(dto: SysUser):
- '''
- 修改用户
- '''
- SysUserService.check_user_allowed(dto)
- SysUserService.check_user_data_scope(dto.user_id)
- if dto.phonenumber \
- and SysUserService.check_phone_unique(dto) \
- == UserConstants.NOT_UNIQUE:
- return AjaxResponse.from_error(
- f"新增用户'{dto.phonenumber}'失败,手机号码已存在"
- )
- elif dto.email \
- and SysUserService.check_email_unique(dto) \
- == UserConstants.NOT_UNIQUE:
- return AjaxResponse.from_error(
- f"新增用户'{dto.email}'失败,邮箱已存在"
- )
- dto.update_by_user(SecurityUtil.get_username())
- flag = SysUserService.update_user(dto)
- ajax_response = AjaxResponse.from_success() if flag else AjaxResponse.from_error()
- return ajax_response
- @reg.api.route("/system/user/<ids>", methods=["DELETE"])
- @PathValidator()
- @PreAuthorize(HasPerm("system:user:remove"))
- @Log(title="用户管理", business_type=BusinessType.DELETE)
- @JsonSerializer()
- def system_delete_users(
- ids: Annotated[List[int], BeforeValidator(ids_to_list)]
- ):
- '''
- 删除用户
- '''
- if SecurityUtil.get_user_id() in ids:
- return AjaxResponse.from_error("当前用户不能删除")
- flag = SysUserService.delete_users_by_ids(ids)
- ajax_response = AjaxResponse.from_success() if flag > 0 else AjaxResponse.from_error()
- return ajax_response
- @reg.api.route("/system/user/export", methods=["POST"])
- @FileDownloadValidator()
- @PreAuthorize(HasPerm("system:user:export"))
- @Log(title="用户管理", business_type=BusinessType.EXPORT)
- @BaseSerializer()
- def system_user_export(dto: SysUser):
- '''
- 导出用户数据
- '''
- rows = SysUserService.select_user_list(dto)
- excel_util = ExcelUtil(SysUser)
- return excel_util.export_response(rows, "用户数据")
- @reg.api.route("/system/user/importData", methods=["POST"])
- @FileUploadValidator()
- @PreAuthorize(HasPerm("system:user:import"))
- @Log(title="用户管理", business_type=BusinessType.IMPORT)
- @JsonSerializer()
- def system_user_importdata(
- file: List[FileStorage],
- update_support: Annotated[bool, BeforeValidator(lambda x: x != "0")]
- ):
- '''
- 导入用户模板
- '''
- file = file[0]
- excel_util = ExcelUtil(SysUser)
- datas = excel_util.import_file(file, sheetname="用户数据")
- msg = SysUserService.import_user(datas, update_support)
- return AjaxResponse.from_success(msg=msg)
- @reg.api.route("/system/user/importTemplate", methods=["POST"])
- @login_required
- @BaseSerializer()
- def system_user_importtemplate():
- '''
- 导出模板
- '''
- excel_util = ExcelUtil(SysUser)
- return excel_util.import_template_response(sheetname="用户数据")
- @reg.api.route("/system/user/resetPwd", methods=["PUT"])
- @BodyValidator()
- @PreAuthorize(HasPerm("system:user:resetPwd"))
- @Log(title="用户管理", business_type=BusinessType.UPDATE)
- @JsonSerializer()
- def system_update_user_resetpwd(dto: SysUser):
- '''
- 重置密码
- '''
- SysUserService.check_user_allowed(dto)
- SysUserService.check_user_data_scope(dto.user_id)
- dto.password = SecurityUtil.encrypt_password(dto.password)
- dto.update_by_user(SecurityUtil.get_username())
- flag = SysUserService.reset_pwd(dto)
- ajax_response = AjaxResponse.from_success() if flag else AjaxResponse.from_error()
- return ajax_response
- @reg.api.route("/system/user/changeStatus", methods=["PUT"])
- @BodyValidator()
- @PreAuthorize(HasPerm("system:user:edit"))
- @Log(title="用户管理", business_type=BusinessType.UPDATE)
- @JsonSerializer()
- def system_update_user_changestatus(dto: SysUser):
- '''
- 修改用户状态
- '''
- SysUserService.check_user_allowed(dto)
- SysUserService.check_user_data_scope(dto.user_id)
- dto.update_by_user(SecurityUtil.get_username())
- flag = SysUserService.update_user_status(dto)
- ajax_response = AjaxResponse.from_success() if flag else AjaxResponse.from_error()
- return ajax_response
- @reg.api.route("/system/user/authRole/<int:id>", methods=["GET"])
- @PathValidator()
- @PreAuthorize(HasPerm("system:user:query"))
- @JsonSerializer()
- def system_get_user_authrole(id: int):
- '''
- 获取用户授权角色
- '''
- sysuser: SysUser = SysUserService.select_user_by_id(id)
- roles: List[SysRole] = SysRoleService.select_role_list_by_user_id(id)
- if not sysuser.is_admin():
- roles = [role for role in roles if not role.is_admin()]
- ajax_response = AjaxResponse.from_success() if sysuser else AjaxResponse.from_error()
- setattr(ajax_response, "user", sysuser)
- setattr(ajax_response, "roles", roles)
- return ajax_response
- @reg.api.route("/system/user/authRole", methods=["PUT"])
- @PreAuthorize(HasPerm("system:user:edit"))
- @Log(title="用户管理", business_type=BusinessType.GRANT)
- @JsonSerializer()
- def system_update_user_authrole():
- '''
- 授权用户角色
- '''
- user_id, role_ids = _extract_auth_role_params()
- if user_id is None:
- return AjaxResponse.from_error("userId参数不能为空")
- if role_ids is None:
- return AjaxResponse.from_error("roleIds参数格式错误")
- SysUserService.check_user_data_scope(user_id)
- SysUserService.update_user_roles(user_id, role_ids)
- return AjaxResponse.from_success()
- def _extract_auth_role_params() -> tuple[int | None, List[int] | None]:
- """
- 兼容 query/json/form 三种提交方式
- """
- payload = request.get_json(silent=True) if request.is_json else None
- data = payload if isinstance(payload, dict) else {}
- user_id = data.get("user_id") or data.get("userId")
- role_ids_raw = data.get("role_ids") or data.get("roleIds")
- if user_id is None:
- user_id = request.args.get("userId") or request.form.get("userId")
- if role_ids_raw is None:
- role_ids_list = request.args.getlist("roleIds")
- if not role_ids_list and request.form:
- role_ids_list = request.form.getlist("roleIds")
- if role_ids_list:
- role_ids_raw = role_ids_list
- else:
- role_ids_raw = request.args.get("roleIds") or request.form.get("roleIds")
- try:
- user_id_int = int(user_id) if user_id is not None else None
- except (TypeError, ValueError):
- return None, None
- try:
- role_ids_list = _normalize_role_ids(role_ids_raw)
- except ValueError:
- return user_id_int, None
- return user_id_int, role_ids_list
- def _normalize_role_ids(raw_value) -> List[int]:
- """
- 将不同格式的roleIds转换为整数列表
- """
- if raw_value is None:
- return []
- if isinstance(raw_value, list):
- normalized: List[int] = []
- for item in raw_value:
- normalized.extend(_normalize_role_ids(item))
- return normalized
- if isinstance(raw_value, (int, float)):
- return [int(raw_value)]
- if isinstance(raw_value, str):
- raw_value = raw_value.strip()
- if not raw_value:
- return []
- parts = [part.strip() for part in raw_value.split(",") if part.strip()]
- return [int(part) for part in parts]
- raise ValueError("unsupported roleIds format")
|