# -*- coding: utf-8 -*-
# @Author : {{ table.function_author }}
# @FileName: {{ underscore(table.class_name) }}_mapper.py
# @Time : {{ datetime }}

from typing import List, Optional
from datetime import datetime

from flask import g
from sqlalchemy import select, update, delete

from ruoyi_admin.ext import db
from {{ get_import_path(table.package_name, table.module_name, 'domain') }} import {{ table.class_name }}
from {{ get_import_path(table.package_name, table.module_name, 'domain', table.class_name) }} import {{ underscore(table.class_name) }}_po


class {{ underscore(table.class_name) }}_mapper:
    """Data-access mapper for {{ table.function_name }}.

    Every method is a ``@staticmethod`` that talks to ``db.session`` and
    converts between the ``{{ underscore(table.class_name) }}_po`` ORM entity and
    the ``{{ table.class_name }}`` domain model. Errors are caught, printed and
    turned into an "empty" return value (``[]``, ``None`` or ``0``); write
    operations roll the session back before returning.
    """

    @staticmethod
    def select_{{ underscore(table.class_name) }}_list({{ underscore(table.business_name) }}: {{ table.class_name }}) -> List[{{ table.class_name }}]:
        """
        Query the list of {{ table.function_name }} records matching a filter object.

        One ``WHERE`` clause is generated per column flagged as a query
        column; a clause is only applied at runtime when the corresponding
        filter value is set.

        Args:
            {{ underscore(table.business_name) }} ({{ table.class_name }}): Object carrying the filter values.

        Returns:
            List[{{ table.class_name }}]: Matching records; an empty list when
            nothing matches or when the query fails.
        """
        try:
            # Build the statement incrementally from the populated filters.
            stmt = select({{ underscore(table.class_name) }}_po)
            {% for column in table.columns %}
            {% if column.is_query %}
            {%- set field_name = underscore(column.java_field) %}
            {%- if column.query_type == 'EQ' %}
            if {{ underscore(table.business_name) }}.{{ field_name }} is not None:
                stmt = stmt.where({{ underscore(table.class_name) }}_po.{{ field_name }} == {{ underscore(table.business_name) }}.{{ field_name }})
            {%- elif column.query_type == 'NE' %}
            if {{ underscore(table.business_name) }}.{{ field_name }} is not None:
                stmt = stmt.where({{ underscore(table.class_name) }}_po.{{ field_name }} != {{ underscore(table.business_name) }}.{{ field_name }})
            {%- elif column.query_type == 'GT' %}
            if {{ underscore(table.business_name) }}.{{ field_name }} is not None:
                stmt = stmt.where({{ underscore(table.class_name) }}_po.{{ field_name }} > {{ underscore(table.business_name) }}.{{ field_name }})
            {%- elif column.query_type == 'GTE' %}
            if {{ underscore(table.business_name) }}.{{ field_name }} is not None:
                stmt = stmt.where({{ underscore(table.class_name) }}_po.{{ field_name }} >= {{ underscore(table.business_name) }}.{{ field_name }})
            {%- elif column.query_type == 'LT' %}
            if {{ underscore(table.business_name) }}.{{ field_name }} is not None:
                stmt = stmt.where({{ underscore(table.class_name) }}_po.{{ field_name }} < {{ underscore(table.business_name) }}.{{ field_name }})
            {%- elif column.query_type == 'LTE' %}
            if {{ underscore(table.business_name) }}.{{ field_name }} is not None:
                stmt = stmt.where({{ underscore(table.class_name) }}_po.{{ field_name }} <= {{ underscore(table.business_name) }}.{{ field_name }})
            {%- elif column.query_type == 'LIKE' %}
            if {{ underscore(table.business_name) }}.{{ field_name }}:
                stmt = stmt.where({{ underscore(table.class_name) }}_po.{{ field_name }}.like("%" + str({{ underscore(table.business_name) }}.{{ field_name }}) + "%"))
            {%- elif column.query_type == 'BETWEEN' %}
            # Range bounds are read from the optional ``params`` mapping.
            _params = getattr({{ underscore(table.business_name) }}, "params", {}) or {}
            begin_val = _params.get("begin{{ capitalize_first(column.java_field) }}")
            end_val = _params.get("end{{ capitalize_first(column.java_field) }}")
            if begin_val is not None:
                stmt = stmt.where({{ underscore(table.class_name) }}_po.{{ field_name }} >= begin_val)
            if end_val is not None:
                stmt = stmt.where({{ underscore(table.class_name) }}_po.{{ field_name }} <= end_val)
            {%- endif %}
            {% endif %}
            {% endfor %}
            # Expose the final statement on the request-scoped page object.
            # NOTE(review): presumably consumed by pagination code elsewhere - confirm.
            if "criterian_meta" in g and g.criterian_meta.page:
                g.criterian_meta.page.stmt = stmt

            result = db.session.execute(stmt).scalars().all()
            return [{{ table.class_name }}.model_validate(item) for item in result]
        except Exception as e:
            print(f"查询{{ table.function_name }}列表出错: {e}")
            return []

    {% if table.pk_column %}
    @staticmethod
    def select_{{ underscore(table.class_name) }}_by_id({{ underscore(table.pk_column.java_field) }}: int) -> Optional[{{ table.class_name }}]:
        """
        Look up a single {{ table.function_name }} record by primary key.

        Args:
            {{ underscore(table.pk_column.java_field) }} (int): {{ table.pk_column.column_comment }}

        Returns:
            Optional[{{ table.class_name }}]: The record, or ``None`` when it does
            not exist or the lookup fails.
        """
        try:
            result = db.session.get({{ underscore(table.class_name) }}_po, {{ underscore(table.pk_column.java_field) }})
            return {{ table.class_name }}.model_validate(result) if result else None
        except Exception as e:
            print(f"根据ID查询{{ table.function_name }}出错: {e}")
            return None
    {% endif %}

    @staticmethod
    def insert_{{ underscore(table.class_name) }}({{ underscore(table.business_name) }}: {{ table.class_name }}) -> int:
        """
        Insert a new {{ table.function_name }} record.

        ``create_time`` / ``update_time`` columns default to the current time
        when the incoming object leaves them empty. When the table has a
        primary key, the generated key is written back onto the incoming
        object after a successful commit.

        Args:
            {{ underscore(table.business_name) }} ({{ table.class_name }}): Object to persist.

        Returns:
            int: ``1`` on success, ``0`` when the insert fails (the session is
            rolled back).
        """
        try:
            now = datetime.now()
            new_po = {{ underscore(table.class_name) }}_po()
            {%- for column in table.columns %}
            {%- set attr = underscore(column.java_field) %}
            {%- if column.column_name in ['create_time', 'update_time'] %}
            new_po.{{ attr }} = {{ underscore(table.business_name) }}.{{ attr }} or now
            {%- else %}
            new_po.{{ attr }} = {{ underscore(table.business_name) }}.{{ attr }}
            {%- endif %}
            {%- endfor %}
            db.session.add(new_po)
            {%- if table.pk_column %}
            # Flush so the generated key is available before the commit;
            # reading it afterwards could need an extra refresh query, and a
            # failure there would report an already-committed insert as failed.
            db.session.flush()
            generated_pk = new_po.{{ underscore(table.pk_column.java_field) }}
            {%- endif %}
            db.session.commit()
            {%- if table.pk_column %}
            {{ underscore(table.business_name) }}.{{ underscore(table.pk_column.java_field) }} = generated_pk
            {%- endif %}
            return 1
        except Exception as e:
            db.session.rollback()
            print(f"新增{{ table.function_name }}出错: {e}")
            return 0

    {# update/delete address rows by primary key, so they are only generated when the table has one. #}
    {% if table.pk_column %}
    @staticmethod
    def update_{{ underscore(table.class_name) }}({{ underscore(table.business_name) }}: {{ table.class_name }}) -> int:
        """
        Update an existing {{ table.function_name }} record.

        The row is loaded by primary key and every non-key column is
        overwritten with the value from the incoming object (including
        ``None``); ``update_time`` falls back to the current time when empty.

        Args:
            {{ underscore(table.business_name) }} ({{ table.class_name }}): Object holding the primary key and new values.

        Returns:
            int: ``1`` on success, ``0`` when the row does not exist or the
            update fails (the session is rolled back on failure).
        """
        try:
            existing = db.session.get({{ underscore(table.class_name) }}_po, {{ underscore(table.business_name) }}.{{ underscore(table.pk_column.java_field) }})
            if not existing:
                return 0
            now = datetime.now()
            {%- for column in table.columns %}
            {%- set attr = underscore(column.java_field) %}
            {%- if column.is_pk == '1' %}
            # The primary key is not part of the update.
            {%- elif column.column_name == 'update_time' %}
            existing.{{ attr }} = {{ underscore(table.business_name) }}.{{ attr }} or now
            {%- else %}
            existing.{{ attr }} = {{ underscore(table.business_name) }}.{{ attr }}
            {%- endif %}
            {%- endfor %}
            db.session.commit()
            return 1
        except Exception as e:
            db.session.rollback()
            print(f"修改{{ table.function_name }}出错: {e}")
            return 0

    @staticmethod
    def delete_{{ underscore(table.class_name) }}_by_ids(ids: List[int]) -> int:
        """
        Delete several {{ table.function_name }} records by primary key.

        Args:
            ids (List[int]): Primary keys of the rows to delete.

        Returns:
            int: Number of rows deleted; ``0`` when ``ids`` is empty or the
            delete fails (the session is rolled back on failure).
        """
        # Nothing to delete: skip the round trip with an empty IN () clause.
        if not ids:
            return 0
        try:
            stmt = delete({{ underscore(table.class_name) }}_po).where({{ underscore(table.class_name) }}_po.{{ underscore(table.pk_column.java_field) }}.in_(ids))
            result = db.session.execute(stmt)
            db.session.commit()
            return result.rowcount
        except Exception as e:
            db.session.rollback()
            print(f"批量删除{{ table.function_name }}出错: {e}")
            return 0
    {% endif %}