|
|
@@ -3,8 +3,10 @@
|
|
|
|
|
|
from typing import List, Optional
|
|
|
|
|
|
+from flask import request
|
|
|
+
|
|
|
from flask_login import login_required
|
|
|
-from pydantic import BeforeValidator, Field
|
|
|
+from pydantic import BeforeValidator
|
|
|
from typing_extensions import Annotated
|
|
|
from werkzeug.datastructures import FileStorage
|
|
|
|
|
|
@@ -243,17 +245,78 @@ def system_get_user_authrole(id: int):
|
|
|
|
|
|
|
|
|
@reg.api.route("/system/user/authRole", methods=["PUT"])
|
|
|
-@BodyValidator()
|
|
|
@PreAuthorize(HasPerm("system:user:edit"))
|
|
|
@Log(title="用户管理", business_type=BusinessType.GRANT)
|
|
|
@JsonSerializer()
|
|
|
-def system_update_user_authrole(
|
|
|
- user_id: Annotated[int, Field(gt=0)],
|
|
|
- role_ids: Annotated[List[int], Field(default_factory=List)]
|
|
|
-):
|
|
|
+def system_update_user_authrole():
|
|
|
'''
|
|
|
授权用户角色
|
|
|
'''
|
|
|
+ user_id, role_ids = _extract_auth_role_params()
|
|
|
+ if user_id is None:
|
|
|
+ return AjaxResponse.from_error("userId参数不能为空")
|
|
|
+ if role_ids is None:
|
|
|
+ return AjaxResponse.from_error("roleIds参数格式错误")
|
|
|
SysUserService.check_user_data_scope(user_id)
|
|
|
SysUserService.update_user_roles(user_id, role_ids)
|
|
|
return AjaxResponse.from_success()
|
|
|
+
|
|
|
+
|
|
|
+def _extract_auth_role_params() -> tuple[int | None, List[int] | None]:
|
|
|
+ """
|
|
|
+ 兼容 query/json/form 三种提交方式
|
|
|
+ """
|
|
|
+ payload = request.get_json(silent=True) if request.is_json else None
|
|
|
+ data = payload if isinstance(payload, dict) else {}
|
|
|
+
|
|
|
+ user_id = data.get("user_id") or data.get("userId")
|
|
|
+ role_ids_raw = data.get("role_ids") or data.get("roleIds")
|
|
|
+
|
|
|
+ if user_id is None:
|
|
|
+ user_id = request.args.get("userId") or request.form.get("userId")
|
|
|
+ if role_ids_raw is None:
|
|
|
+ role_ids_list = request.args.getlist("roleIds")
|
|
|
+ if not role_ids_list and request.form:
|
|
|
+ role_ids_list = request.form.getlist("roleIds")
|
|
|
+ if role_ids_list:
|
|
|
+ role_ids_raw = role_ids_list
|
|
|
+ else:
|
|
|
+ role_ids_raw = request.args.get("roleIds") or request.form.get("roleIds")
|
|
|
+
|
|
|
+ try:
|
|
|
+ user_id_int = int(user_id) if user_id is not None else None
|
|
|
+ except (TypeError, ValueError):
|
|
|
+ return None, None
|
|
|
+
|
|
|
+ try:
|
|
|
+ role_ids_list = _normalize_role_ids(role_ids_raw)
|
|
|
+ except ValueError:
|
|
|
+ return user_id_int, None
|
|
|
+
|
|
|
+ return user_id_int, role_ids_list
|
|
|
+
|
|
|
+
|
|
|
+def _normalize_role_ids(raw_value) -> List[int]:
|
|
|
+ """
|
|
|
+ 将不同格式的roleIds转换为整数列表
|
|
|
+ """
|
|
|
+ if raw_value is None:
|
|
|
+ return []
|
|
|
+
|
|
|
+ if isinstance(raw_value, list):
|
|
|
+ normalized: List[int] = []
|
|
|
+ for item in raw_value:
|
|
|
+ normalized.extend(_normalize_role_ids(item))
|
|
|
+ return normalized
|
|
|
+
|
|
|
+ if isinstance(raw_value, (int, float)):
|
|
|
+ return [int(raw_value)]
|
|
|
+
|
|
|
+ if isinstance(raw_value, str):
|
|
|
+ raw_value = raw_value.strip()
|
|
|
+ if not raw_value:
|
|
|
+ return []
|
|
|
+ parts = [part.strip() for part in raw_value.split(",") if part.strip()]
|
|
|
+ return [int(part) for part in parts]
|
|
|
+
|
|
|
+ raise ValueError("unsupported roleIds format")
|