# -*- coding: utf-8 -*- # @Author : {{ table.function_author }} # @FileName: {{ underscore(table.class_name) }}_mapper.py # @Time : {{ datetime }} from typing import List from datetime import datetime from sqlalchemy import select, update, delete, func from ruoyi_admin.ext import db from {{ get_import_path(table.package_name, table.module_name, 'domain') }} import {{ table.class_name }} class {{ table.class_name }}Mapper: """{{ table.function_name }}Mapper""" @staticmethod def select_list({{ table.business_name }}: {{ table.class_name }}) -> List[{{ table.class_name }}]: """ 查询{{ table.function_name }}列表 Args: {{ table.business_name }} ({{ table.class_name }}): {{ table.function_name }}对象 Returns: List[{{ table.class_name }}]: {{ table.function_name }}列表 """ try: # 构建查询条件 stmt = select({{ table.class_name }}) {% for column in table.columns %} {% if column.is_query and column.query_type == 'EQ' %} if {{ table.business_name }}.{{ underscore(column.java_field) }} is not None: stmt = stmt.where({{ table.class_name }}.{{ underscore(column.java_field) }} == {{ table.business_name }}.{{ underscore(column.java_field) }}) {% elif column.is_query and column.query_type == 'LIKE' %} if {{ table.business_name }}.{{ underscore(column.java_field) }}: stmt = stmt.where({{ table.class_name }}.{{ underscore(column.java_field) }}.like("%" + str({{ table.business_name }}.{{ underscore(column.java_field) }}) + "%")) {% endif %} {% endfor %} # 添加分页条件 if {{ table.business_name }}.page_num and {{ table.business_name }}.page_size: offset = ({{ table.business_name }}.page_num - 1) * {{ table.business_name }}.page_size stmt = stmt.offset(offset).limit({{ table.business_name }}.page_size) result = db.session.execute(stmt).scalars().all() return list(result) if result else [] except Exception as e: print(f"查询{{ table.function_name }}列表出错: {e}") return [] @staticmethod def select_count({{ table.business_name }}: {{ table.class_name }}) -> int: """ 查询{{ table.function_name }}总数 Args: {{ table.business_name }} ({{ table.class_name }}): {{ table.function_name }}对象 Returns: int: {{ table.function_name }}总数 """ try: stmt = select(func.count({{ table.class_name }}.{{ underscore(table.pk_column.java_field) if table.pk_column }})) {% for column in table.columns %} {% if column.is_query and column.query_type == 'EQ' %} if {{ table.business_name }}.{{ underscore(column.java_field) }} is not None: stmt = stmt.where({{ table.class_name }}.{{ underscore(column.java_field) }} == {{ table.business_name }}.{{ underscore(column.java_field) }}) {% elif column.is_query and column.query_type == 'LIKE' %} if {{ table.business_name }}.{{ underscore(column.java_field) }}: stmt = stmt.where({{ table.class_name }}.{{ underscore(column.java_field) }}.like("%" + str({{ table.business_name }}.{{ underscore(column.java_field) }}) + "%")) {% endif %} {% endfor %} result = db.session.execute(stmt).scalar() return result if result else 0 except Exception as e: print(f"查询{{ table.function_name }}总数出错: {e}") return 0 {% if table.pk_column %} @staticmethod def select_by_id({{ underscore(table.pk_column.java_field) }}: int) -> {{ table.class_name }}: """ 根据ID查询{{ table.function_name }} Args: {{ underscore(table.pk_column.java_field) }} (int): {{ table.pk_column.column_comment }} Returns: {{ table.class_name }}: {{ table.function_name }}对象 """ try: stmt = select({{ table.class_name }}).where({{ table.class_name }}.{{ underscore(table.pk_column.java_field) }} == {{ underscore(table.pk_column.java_field) }}) result = db.session.execute(stmt).scalar_one_or_none() return result except Exception as e: print(f"根据ID查询{{ table.function_name }}出错: {e}") return None {% endif %} @staticmethod def insert({{ table.business_name }}: {{ table.class_name }}) -> int: """ 新增{{ table.function_name }} Args: {{ table.business_name }} ({{ table.class_name }}): {{ table.function_name }}对象 Returns: int: 插入的记录数 """ try: # 设置创建时间和更新时间 now = datetime.now() {{ table.business_name }}.create_time = now {{ table.business_name }}.update_time = now db.session.add({{ table.business_name }}) db.session.commit() return 1 except Exception as e: db.session.rollback() print(f"新增{{ table.function_name }}出错: {e}") return 0 {% if table.pk_column %} @staticmethod def update({{ table.business_name }}: {{ table.class_name }}) -> int: """ 修改{{ table.function_name }} Args: {{ table.business_name }} ({{ table.class_name }}): {{ table.function_name }}对象 Returns: int: 更新的记录数 """ try: # 设置更新时间 {{ table.business_name }}.update_time = datetime.now() # 使用ORM方式更新数据 db.session.merge({{ table.business_name }}) db.session.commit() return 1 except Exception as e: db.session.rollback() print(f"修改{{ table.function_name }}出错: {e}") return 0 @staticmethod def delete_by_ids(ids: List[int]) -> int: """ 批量删除{{ table.function_name }} Args: ids (List[int]): ID列表 Returns: int: 删除的记录数 """ try: stmt = delete({{ table.class_name }}).where({{ table.class_name }}.{{ underscore(table.pk_column.java_field) }}.in_(ids)) result = db.session.execute(stmt) db.session.commit() return result.rowcount except Exception as e: db.session.rollback() print(f"批量删除{{ table.function_name }}出错: {e}") return 0 {% endif %}