# -*- coding: utf-8 -*-
# @Author : {{ table.function_author }}
# @FileName: {{ underscore(table.class_name) }}_mapper.py
# @Time : {{ datetime }}

from typing import List, Optional

from ruoyi_generator.domain.entity import {{ table.class_name }}
from ruoyi_generator.ext import db
from sqlalchemy import select, update, delete, text
from datetime import datetime
{# Name of the primary-key attribute on the entity; empty when the table has no primary key. #}
{% set pk_field = underscore(table.pk_column.java_field) if table.pk_column else '' %}


class {{ table.class_name }}Mapper:
    """Data-access mapper for {{ table.function_name }}."""

    @staticmethod
    def select_list({{ table.business_name }}: {{ table.class_name }}) -> List[{{ table.class_name }}]:
        """
        Query the list of {{ table.function_name }} records.

        No filter conditions are applied; the argument only supplies paging.

        Args:
            {{ table.business_name }} ({{ table.class_name }}): object whose
                ``page_num`` and ``page_size`` attributes drive pagination.
                Paging is applied only when both values are truthy.

        Returns:
            List[{{ table.class_name }}]: the selected rows, or an empty list
            when the query fails (the error is printed, not raised).
        """
        try:
            stmt = select({{ table.class_name }})

            # page_num is 1-based: page 1 starts at offset 0.
            if {{ table.business_name }}.page_num and {{ table.business_name }}.page_size:
                offset = ({{ table.business_name }}.page_num - 1) * {{ table.business_name }}.page_size
                stmt = stmt.offset(offset).limit({{ table.business_name }}.page_size)

            return list(db.session.execute(stmt).scalars().all())
        except Exception as e:
            print(f"查询{{ table.function_name }}列表出错: {e}")
            return []

{# The lookup below needs a primary key; without one it would render as invalid Python, so it is omitted. #}
{% if table.pk_column %}
    @staticmethod
    def select_by_id({{ pk_field }}: int) -> Optional[{{ table.class_name }}]:
        """
        Fetch a single {{ table.function_name }} record by primary key.

        Args:
            {{ pk_field }} (int): {{ table.pk_column.column_comment }}

        Returns:
            Optional[{{ table.class_name }}]: the matching row, or ``None`` when
            no row matches or the query fails (the error is printed, not raised).
        """
        try:
            stmt = select({{ table.class_name }}).where({{ table.class_name }}.{{ pk_field }} == {{ pk_field }})
            return db.session.execute(stmt).scalar_one_or_none()
        except Exception as e:
            print(f"根据ID查询{{ table.function_name }}出错: {e}")
            return None

{% endif %}
    @staticmethod
    def insert({{ table.business_name }}: {{ table.class_name }}) -> int:
        """
        Insert a new {{ table.function_name }} record.

        ``create_time`` and ``update_time`` are both overwritten with the
        current local time before the object is added and committed.

        Args:
            {{ table.business_name }} ({{ table.class_name }}): object to persist.

        Returns:
            int: 1 on success; 0 when an error occurred (the session is rolled
            back and the error is printed, not raised).
        """
        try:
            # Use one timestamp so both columns carry exactly the same value.
            now = datetime.now()
            {{ table.business_name }}.create_time = now
            {{ table.business_name }}.update_time = now

            db.session.add({{ table.business_name }})
            db.session.commit()
            return 1
        except Exception as e:
            db.session.rollback()
            print(f"新增{{ table.function_name }}出错: {e}")
            return 0

    @staticmethod
    def update({{ table.business_name }}: {{ table.class_name }}) -> int:
        """
        Update a {{ table.function_name }} record.

        ``update_time`` is overwritten with the current local time, then the
        object is merged into the session and committed.

        Args:
            {{ table.business_name }} ({{ table.class_name }}): object carrying
                the new state.

        Returns:
            int: 1 whenever the commit succeeds (this is not the number of rows
            actually changed); 0 when an error occurred (the session is rolled
            back and the error is printed, not raised).
        """
        try:
            {{ table.business_name }}.update_time = datetime.now()

            # NOTE(review): Session.merge() is documented to insert a new row
            # when no row with this primary key exists - confirm that upsert
            # behaviour is intended here.
            db.session.merge({{ table.business_name }})
            db.session.commit()
            return 1
        except Exception as e:
            db.session.rollback()
            print(f"修改{{ table.function_name }}出错: {e}")
            return 0

{# Bulk delete filters on the primary key, so it is only generated when one exists. #}
{% if table.pk_column %}
    @staticmethod
    def delete_by_ids(ids: List[int]) -> int:
        """
        Delete several {{ table.function_name }} records by primary key.

        Args:
            ids (List[int]): primary-key values of the rows to delete.

        Returns:
            int: number of rows deleted as reported by the database; 0 when
            ``ids`` is empty or an error occurred (the session is rolled back
            and the error is printed, not raised).
        """
        # Nothing to delete: skip the statement and the commit entirely.
        if not ids:
            return 0

        try:
            stmt = delete({{ table.class_name }}).where({{ table.class_name }}.{{ pk_field }}.in_(ids))
            result = db.session.execute(stmt)
            db.session.commit()
            return result.rowcount
        except Exception as e:
            db.session.rollback()
            print(f"批量删除{{ table.function_name }}出错: {e}")
            return 0
{% endif %}