|
|
@@ -1,9 +1,9 @@
|
|
|
|
|
|
from abc import ABC, abstractmethod
|
|
|
from dataclasses import dataclass
|
|
|
-from typing import ClassVar, Dict
|
|
|
+from typing import ClassVar, Dict, Iterable, Set, Type
|
|
|
from flask import g, request
|
|
|
-from pydantic import BaseModel
|
|
|
+from pydantic import AliasChoices, AliasPath, BaseModel
|
|
|
from werkzeug.datastructures import ImmutableMultiDict
|
|
|
from werkzeug.exceptions import BadRequest,UnsupportedMediaType
|
|
|
|
|
|
@@ -89,20 +89,65 @@ class QueryReqParser(BaseReqParser):
|
|
|
page = PageModel.model_validate(data,context=self.context)
|
|
|
if page.model_fields_set:
|
|
|
self.criterian_meta.page = page
|
|
|
+ self._remove_model_aliases(data, PageModel)
|
|
|
if self.context.is_sort:
|
|
|
sort = OrderModel.model_validate(data,context=self.context)
|
|
|
if sort.model_fields_set:
|
|
|
self.criterian_meta.sort = sort
|
|
|
+ self._remove_model_aliases(data, OrderModel)
|
|
|
if self.extra_model:
|
|
|
extra = self.extra_model.model_validate(data,context=self.context)
|
|
|
if extra.model_fields_set:
|
|
|
self.criterian_meta.extra = extra
|
|
|
+ self._remove_model_aliases(data, self.extra_model)
|
|
|
return data
|
|
|
|
|
|
def cast_model(self, bo_model:BaseEntity) -> BaseModel:
|
|
|
data = self.data()
|
|
|
bo = bo_model.model_validate(data)
|
|
|
return bo
|
|
|
+
|
|
|
+ def _remove_model_aliases(
|
|
|
+ self,
|
|
|
+ data: Dict[str, str],
|
|
|
+ model_cls: Type[BaseModel]
|
|
|
+ ) -> None:
|
|
|
+ """
|
|
|
+ 删除已经用于解析的模型字段别名,避免后续模型校验时报额外字段错误
|
|
|
+ """
|
|
|
+ if not data:
|
|
|
+ return
|
|
|
+ for alias in self._collect_aliases(model_cls):
|
|
|
+ data.pop(alias, None)
|
|
|
+
|
|
|
+ def _collect_aliases(self, model_cls: Type[BaseModel]) -> Set[str]:
|
|
|
+ alias_set: Set[str] = set()
|
|
|
+ populate_by_name = getattr(model_cls, "model_config", {}).get(
|
|
|
+ "populate_by_name", False
|
|
|
+ )
|
|
|
+ for name, info in model_cls.model_fields.items():
|
|
|
+ alias_set.update(self._field_aliases(name, info, populate_by_name))
|
|
|
+ return alias_set
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def _field_aliases(
|
|
|
+ name: str,
|
|
|
+ info,
|
|
|
+ populate_by_name: bool
|
|
|
+ ) -> Iterable[str]:
|
|
|
+ aliases: Set[str] = set()
|
|
|
+ if getattr(info, "alias", None):
|
|
|
+ aliases.add(info.alias)
|
|
|
+ validation_alias = getattr(info, "validation_alias", None)
|
|
|
+ if isinstance(validation_alias, str):
|
|
|
+ aliases.add(validation_alias)
|
|
|
+ elif isinstance(validation_alias, AliasChoices):
|
|
|
+ aliases.update(validation_alias.choices)
|
|
|
+ elif isinstance(validation_alias, AliasPath):
|
|
|
+ pass
|
|
|
+ if populate_by_name:
|
|
|
+ aliases.add(name)
|
|
|
+ return aliases
|
|
|
|
|
|
|
|
|
@dataclass
|