SpringSunYY 4 месяцев назад
Родитель
Сommit
7fc484156a

+ 12 - 0
ruoyi_common/base/model.py

@@ -251,6 +251,18 @@ class BaseEntity(BaseModel):
                     val = None
             else:
                 val = data.get(k)
+            
+            # 如果设置了字典类型,将字典值转换为标签(导出时显示标签)
+            if access.dict_type and val is not None:
+                try:
+                    from ruoyi_system.service.sys_dict_type import DictCacheUtil
+                    dict_label = DictCacheUtil.get_dict_label(access.dict_type, str(val))
+                    if dict_label:
+                        val = dict_label
+                except Exception:
+                    # 如果字典转换失败,使用原值
+                    pass
+            
             access.val = val
             yield k,access
     

+ 4 - 2
ruoyi_common/base/schema_excel.py

@@ -22,7 +22,8 @@ def ExcelField(
     background_color:str='FFFFFFFF',
     color:str='FF000000',
     align:str='left',
-    action:Literal['import', 'export', 'both']='both'
+    action:Literal['import', 'export', 'both']='both',
+    dict_type:str=''
 ):
     excel_access = ExcelAccess(
         name=name,
@@ -39,7 +40,8 @@ def ExcelField(
         background_color=background_color,
         color=color,
         align=align,
-        action=action
+        action=action,
+        dict_type=dict_type
     )
     return Field(excel_access=excel_access)
 

+ 13 - 0
ruoyi_common/utils/base.py

@@ -1394,6 +1394,19 @@ class ExcelUtil:
         meta = column_meta.get(header)
         if not meta:
             return value
+        
+        access = meta.get("access")
+        # 如果设置了字典类型,将标签转换为字典值(导入时将标签转换为值)
+        if access and access.dict_type and value not in (None, ""):
+            try:
+                from ruoyi_system.service.sys_dict_type import DictCacheUtil
+                dict_value = DictCacheUtil.get_dict_value(access.dict_type, str(value))
+                if dict_value:
+                    value = dict_value
+            except Exception:
+                # 如果字典转换失败,使用原值
+                pass
+        
         target_type = meta.get("target_type")
         if target_type is None:
             target_type = self._resolve_field_type(meta["path"])

+ 124 - 8
ruoyi_system/service/sys_dict_type.py

@@ -3,9 +3,9 @@
 
 from itertools import groupby
 from types import NoneType
-from typing import List
+from typing import List, Optional
 from flask import Flask
-from pydantic_core import to_json
+from pydantic_core import to_json, from_json
 
 from ruoyi_common.constant import Constants, UserConstants
 from ruoyi_common.base.signal import app_completed
@@ -198,19 +198,135 @@ class DictCacheUtil:
     
     @classmethod
     def get_dict_value_sep(cls, dict_type, dict_label, sep):
-        pass
+        """
+        根据字典类型和标签(支持分隔符),获取字典值
+        
+        Args:
+            dict_type: 字典类型
+            dict_label: 字典标签(支持分隔符分隔的多个标签)
+            sep: 分隔符
+            
+        Returns:
+            str: 字典值(多个值用分隔符连接)
+        """
+        if not dict_type or not dict_label:
+            return dict_label
+        labels = dict_label.split(sep) if sep else [dict_label]
+        values = []
+        for label in labels:
+            value = cls.get_dict_value(dict_type, label.strip())
+            if value:
+                values.append(value)
+            else:
+                values.append(label.strip())
+        return sep.join(values)
     
     @classmethod
     def get_dict_label_sep(cls, dict_type, dict_value, sep):
-        pass
+        """
+        根据字典类型和值(支持分隔符),获取字典标签
+        
+        Args:
+            dict_type: 字典类型
+            dict_value: 字典值(支持分隔符分隔的多个值)
+            sep: 分隔符
+            
+        Returns:
+            str: 字典标签(多个标签用分隔符连接)
+        """
+        if not dict_type or not dict_value:
+            return dict_value
+        values = dict_value.split(sep) if sep else [dict_value]
+        labels = []
+        for value in values:
+            label = cls.get_dict_label(dict_type, value.strip())
+            if label:
+                labels.append(label)
+            else:
+                labels.append(value.strip())
+        return sep.join(labels)
     
     @classmethod
-    def get_dict_value(cls, dict_type, dict_label):
-        pass
+    def get_dict_value(cls, dict_type: str, dict_label: str) -> Optional[str]:
+        """
+        根据字典类型和标签,获取字典值
+        
+        Args:
+            dict_type: 字典类型
+            dict_label: 字典标签
+            
+        Returns:
+            Optional[str]: 字典值,如果找不到则返回None
+        """
+        if not dict_type or not dict_label:
+            return None
+        dict_data_list = cls._get_dict_data_list(dict_type)
+        if not dict_data_list:
+            return None
+        for dict_data in dict_data_list:
+            if dict_data.dict_label == dict_label:
+                return dict_data.dict_value
+        return None
     
     @classmethod
-    def get_dict_label(cls, dict_type, dict_value):
-        pass
+    def get_dict_label(cls, dict_type: str, dict_value: str) -> Optional[str]:
+        """
+        根据字典类型和值,获取字典标签
+        
+        Args:
+            dict_type: 字典类型
+            dict_value: 字典值
+            
+        Returns:
+            Optional[str]: 字典标签,如果找不到则返回None
+        """
+        if not dict_type or not dict_value:
+            return None
+        dict_data_list = cls._get_dict_data_list(dict_type)
+        if not dict_data_list:
+            return None
+        for dict_data in dict_data_list:
+            if dict_data.dict_value == str(dict_value):
+                return dict_data.dict_label
+        return None
+    
+    @classmethod
+    def _get_dict_data_list(cls, dict_type: str) -> List[SysDictData]:
+        """
+        从缓存中获取字典数据列表
+        
+        Args:
+            dict_type: 字典类型
+            
+        Returns:
+            List[SysDictData]: 字典数据列表
+        """
+        try:
+            cache_key = cls.get_cache_key(dict_type)
+            cached_data = redis_cache.get(cache_key)
+            if cached_data:
+                if isinstance(cached_data, bytes):
+                    cached_data = cached_data.decode('utf-8')
+                # 使用 from_json 解析列表,需要指定 List[SysDictData] 类型
+                import json
+                json_data = json.loads(cached_data)
+                if isinstance(json_data, list):
+                    dict_data_list = [SysDictData.model_validate(item) for item in json_data]
+                else:
+                    dict_data_list = [SysDictData.model_validate(json_data)] if json_data else []
+                return dict_data_list
+            # 如果缓存中没有,从数据库查询
+            dict_data_list = SysDictDataMapper.select_dict_data_by_type(dict_type)
+            if dict_data_list:
+                cls.set_dict_cache(dict_type, dict_data_list)
+            return dict_data_list
+        except Exception:
+            # 如果缓存读取失败,从数据库查询
+            try:
+                dict_data_list = SysDictDataMapper.select_dict_data_by_type(dict_type)
+                return dict_data_list
+            except Exception:
+                return []
     
     @classmethod
     def set_dict_cache(cls, key, dict_data_list:List[SysDictData]):