| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388 |
- # -*- coding: utf-8 -*-
- # @Author : YY
- # @FileName: __init__.py
- import json
- from typing import List, Optional, Tuple
- from datetime import datetime
- from ruoyi_common.utils import DateUtil, StringUtil
- from ruoyi_generator.domain.entity import GenTable, GenTableColumn
- from ruoyi_generator.mapper import gen_table_mapper, gen_table_column_mapper
- from ruoyi_generator.util import GenUtils, to_underscore
- from ruoyi_generator.config import GeneratorConfig
- class GenTableService:
- def select_gen_table_list(self, gen_table: GenTable) -> Tuple[List[GenTable], int]:
- """
- 查询代码生成表列表
- Args:
- gen_table (GenTable): 代码生成表对象
- Returns:
- Tuple[List[GenTable], int]: 代码生成表列表和总数
- """
- # 查询列表
- gen_tables = gen_table_mapper.select_list(gen_table)
- # 查询总数
- # 注意:这里需要根据实际需求实现总数查询逻辑
- total = len(gen_tables)
- return gen_tables, total
- def select_db_table_list(self, gen_table: GenTable) -> Tuple[List[GenTable], int]:
- """
- 查询数据库表列表
- Args:
- gen_table (GenTable): 代码生成表对象
- Returns:
- Tuple[List[GenTable], int]: 数据库表列表和总数
- """
- # 查询列表
- gen_tables = gen_table_mapper.select_db_list(gen_table)
- # 查询总数
- # 注意:这里需要根据实际需求实现总数查询逻辑
- total = len(gen_tables)
- return gen_tables, total
- def select_gen_table_by_id(self, table_id: int) -> Optional[GenTable]:
- """
- 根据ID查询代码生成表
- Args:
- table_id (int): 表ID
- Returns:
- Optional[GenTable]: 代码生成表对象
- """
- gen_table = gen_table_mapper.select_by_id(table_id)
- if gen_table:
- gen_table.columns = gen_table_column_mapper.select_list_by_table_id(table_id)
- return gen_table
- def select_gen_table_by_name(self, table_name: str) -> Optional[GenTable]:
- """
- 根据表名查询代码生成表
- Args:
- table_name (str): 表名
- Returns:
- Optional[GenTable]: 代码生成表对象
- """
- return gen_table_mapper.select_by_table_name(table_name)
- def delete_gen_table_by_id(self, table_id: int):
- """
- 根据ID删除代码生成表
- Args:
- table_id (int): 表ID
- """
- # 先删除字段信息
- gen_table_column_mapper.delete_by_table_id(table_id)
- # 再删除表信息
- gen_table_mapper.delete_by_id(table_id)
- def delete_gen_table_by_ids(self, table_ids: List[int]):
- """
- 批量删除代码生成表
- Args:
- table_ids (List[int]): 表ID列表
- """
- for table_id in table_ids:
- # 先删除字段信息
- gen_table_column_mapper.delete_by_table_id(table_id)
- # 再删除表信息
- gen_table_mapper.delete_by_id(table_id)
- def import_gen_table(self, table_names: List[str]) -> int:
- """
- 导入代码生成表
- Args:
- table_names (List[str]): 表名列表
- Returns:
- int: 导入的表数量
- """
- success_count = 0
- for table_name in table_names:
- # 检查表是否已存在
- if gen_table_mapper.exists_table(table_name):
- continue
- # 创建GenTable对象
- table = GenTable()
- # 设置默认值
- table.table_name = table_name
- clean_table_name = GenUtils.remove_table_prefix(
- table_name) if GeneratorConfig.auto_remove_pre else table_name
- table.class_name = to_underscore(clean_table_name)
- table.business_name = GenUtils.get_business_name(clean_table_name)
- table.package_name = GeneratorConfig.package_name
- # 使用配置中的 modelName 作为模块名
- table.module_name = GeneratorConfig.model_name
- # 获取表注释
- try:
- result = gen_table_mapper.select_db_table_comment_by_name(table_name)
- table.table_comment = result if result else table_name
- except:
- table.table_comment = table_name
- table.function_name = table.table_comment
- table.function_author = GeneratorConfig.author
- table.create_by = "admin"
- table.create_time = datetime.now()
- # 保存表信息
- table_id = gen_table_mapper.insert(table)
- if table_id > 0:
- # 保存列信息
- columns = gen_table_mapper.select_db_table_columns_by_name(table_name)
- if not columns:
- print(f"警告:表 {table_name} 没有字段信息,可能表不存在或查询失败")
- else:
- print(f"表 {table_name} 找到 {len(columns)} 个字段")
- for column in columns:
- try:
- column.table_id = table_id
- column.create_by = "admin"
- column.create_time = datetime.now()
- # 确保 java_field 已设置
- if not column.java_field:
- column.java_field = GenUtils.to_camel_case(column.column_name)
- gen_table_column_mapper.insert(column)
- except Exception as e:
- print(f"插入字段 {column.column_name} 失败: {e}")
- continue
- success_count += 1
- return success_count
- def update_gen_table(self, gen_table: GenTable):
- """
- 更新代码生成表
- Args:
- gen_table (GenTable): 代码生成表对象
- """
- print("更新代码生成表:", gen_table.columns)
- # 获取列信息
- columns = gen_table.columns
- if columns:
- # 处理列信息
- for column in columns:
- print("处理列信息:", column)
- # 确保column是GenTableColumn对象而不是dict
- column_obj = None
- if isinstance(column, dict):
- # 直接使用字典数据创建对象,避免手动字段映射
- # 前端传过来的已经是正确的字符串格式,直接使用
- print("原始列数据:", column)
- # 使用 model_validate 方法处理别名字段
- column_obj = GenTableColumn.model_validate(column)
- print("处理后列对象:", column_obj.model_dump())
- else:
- column_obj = column
- # 设置表ID
- column_obj.table_id = gen_table.table_id
- column_obj.update_time = datetime.now()
- # 如果有column_id则更新,否则插入
- if hasattr(column_obj, 'column_id') and column_obj.column_id:
- # 更新列信息
- gen_table_column_mapper.update(column_obj)
- else:
- # 插入列信息
- column_obj.create_by = "admin"
- column_obj.create_time = datetime.now()
- gen_table_column_mapper.insert(column_obj)
- # 更新表信息
- gen_table.update_time = datetime.now()
- gen_table_mapper.update(gen_table)
- def synch_db(self, table_name: str):
- """
- 同步数据库表结构
- Args:
- table_name (str): 表名
- """
- try:
- # 查询表信息
- gen_table = gen_table_mapper.select_by_table_name(table_name)
- if not gen_table:
- raise Exception(f"表{table_name}不存在")
- # 查询数据库表列信息
- db_columns = gen_table_mapper.select_db_table_columns_by_name(table_name)
- # 查询代码生成表列信息
- gen_columns = gen_table_column_mapper.select_list_by_table_id(gen_table.table_id)
- # 处理新增和更新的列
- for db_column in db_columns:
- exist_column = None
- for gen_column in gen_columns:
- if db_column.column_name == gen_column.column_name:
- exist_column = gen_column
- break
- if exist_column:
- # 更新列信息
- exist_column.column_comment = db_column.column_comment
- exist_column.column_type = db_column.column_type
- exist_column.java_type = db_column.java_type
- exist_column.java_field = db_column.java_field
- exist_column.is_pk = db_column.is_pk
- exist_column.is_increment = db_column.is_increment
- exist_column.is_required = db_column.is_required
- exist_column.update_by = "admin"
- exist_column.update_time = datetime.now()
- gen_table_column_mapper.update(exist_column)
- else:
- # 新增列信息
- db_column.table_id = gen_table.table_id
- db_column.create_by = "admin"
- db_column.create_time = datetime.now()
- gen_table_column_mapper.insert(db_column)
- # 处理删除的列
- for gen_column in gen_columns:
- exist_column = False
- for db_column in db_columns:
- if gen_column.column_name == db_column.column_name:
- exist_column = True
- break
- if not exist_column:
- # 删除列信息
- gen_table_column_mapper.delete_by_id(gen_column.column_id)
-
- return True
- except Exception as e:
- print(f"同步数据库失败: {e}")
- raise e
- def generator_code(self, table_name: str) -> bytes:
- """
- 生成代码
- Args:
- table_name (str): 表名
- Returns:
- bytes: 生成的代码文件
- """
- # 查询表信息
- gen_table = gen_table_mapper.select_by_table_name(table_name)
- if not gen_table:
- raise Exception(f"表{table_name}不存在")
- # 查询列信息
- gen_table.columns = gen_table_column_mapper.select_list_by_table_id(gen_table.table_id)
-
- # 设置列的 list_index 属性
- GenUtils.set_column_list_index(gen_table)
-
- # 设置主键列
- pk_columns = [column for column in gen_table.columns if column.is_pk == '1']
- if pk_columns:
- gen_table.pk_column = pk_columns[0]
- else:
- gen_table.pk_column = None
- # 生成代码
- return GenUtils.generator_code(gen_table).getvalue()
- def batch_generator_code(self, table_names: List[str]) -> bytes:
- """
- 批量生成代码
- Args:
- table_names (List[str]): 表名列表
- Returns:
- bytes: 生成的代码文件
- """
- gen_tables = []
- for table_name in table_names:
- # 查询表信息
- gen_table = gen_table_mapper.select_by_table_name(table_name)
- if gen_table:
- # 查询列信息
- gen_table.columns = gen_table_column_mapper.select_list_by_table_id(gen_table.table_id)
- # 设置列的 list_index 属性
- GenUtils.set_column_list_index(gen_table)
- # 设置主键列
- pk_columns = [column for column in gen_table.columns if column.is_pk == '1']
- if pk_columns:
- gen_table.pk_column = pk_columns[0]
- else:
- gen_table.pk_column = None
- gen_tables.append(gen_table)
- # 生成代码
- return GenUtils.batch_generator_code(gen_tables).getvalue()
- def preview_code(self, table_id: int) -> dict:
- """
- 预览代码
- Args:
- table_id (int): 表ID
- Returns:
- dict: 预览代码
- """
- # 查询表信息
- gen_table = gen_table_mapper.select_by_id(table_id)
- if not gen_table:
- raise Exception(f"表ID{table_id}不存在")
- # 查询列信息
- gen_table.columns = gen_table_column_mapper.select_list_by_table_id(table_id)
-
- # 设置列的 list_index 属性
- GenUtils.set_column_list_index(gen_table)
-
- # 设置主键列
- pk_columns = [column for column in gen_table.columns if column.is_pk == '1']
- if pk_columns:
- gen_table.pk_column = pk_columns[0]
- else:
- gen_table.pk_column = None
- # 预览代码
- return GenUtils.preview_code(gen_table)
- def select_db_table_comment_by_name(self, table_name: str) -> Optional[str]:
- """
- 根据表名查询数据库表注释
- Args:
- table_name (str): 表名
- Returns:
- Optional[str]: 表注释
- """
- try:
- result = gen_table_mapper.session.execute(
- text(
- "SELECT table_comment FROM information_schema.tables WHERE table_schema = DATABASE() AND table_name = :table_name"),
- {"table_name": table_name}
- ).fetchone()
- return result[0] if result else None
- except Exception:
- return None
- # 实例化Service
- gen_table_service = GenTableService()
|