| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190 |
- # -*- coding: utf-8 -*-
- # @Author : {{ table.function_author }}
- # @FileName: {{ underscore(table.class_name) }}_mapper.py
- # @Time : {{ datetime }}
- from typing import List
- from datetime import datetime
- from flask import g
- from sqlalchemy import select, update, delete
- from ruoyi_admin.ext import db
- from {{ get_import_path(table.package_name, table.module_name, 'domain') }} import {{ class_name_pascal }}
- from {{ get_import_path(table.package_name, table.module_name, 'domain', table.class_name) }} import {{ class_name_pascal }}Po
- class {{ class_name_pascal }}Mapper:
- """{{ table.function_name }}Mapper"""
- @staticmethod
- def select_{{ underscore(table.class_name) }}_list({{ underscore(table.business_name) }}: {{ class_name_pascal }}) -> List[{{ class_name_pascal }}]:
- """
- 查询{{ table.function_name }}列表
- Args:
- {{ underscore(table.business_name) }} ({{ table.class_name }}): {{ table.function_name }}对象
- Returns:
- List[{{ table.class_name }}]: {{ table.function_name }}列表
- """
- try:
- # 构建查询条件
- stmt = select({{ class_name_pascal }}Po)
- {% for column in table.columns %}
- {% if column.is_query %}
- {%- set field_name = underscore(column.java_field) %}
- {%- if column.query_type == 'EQ' %}
- if {{ underscore(table.business_name) }}.{{ field_name }} is not None:
- stmt = stmt.where({{ class_name_pascal }}Po.{{ field_name }} == {{ underscore(table.business_name) }}.{{ field_name }})
- {%- elif column.query_type == 'NE' %}
- if {{ underscore(table.business_name) }}.{{ field_name }} is not None:
- stmt = stmt.where({{ class_name_pascal }}Po.{{ field_name }} != {{ underscore(table.business_name) }}.{{ field_name }})
- {%- elif column.query_type == 'GT' %}
- if {{ underscore(table.business_name) }}.{{ field_name }} is not None:
- stmt = stmt.where({{ class_name_pascal }}Po.{{ field_name }} > {{ underscore(table.business_name) }}.{{ field_name }})
- {%- elif column.query_type == 'GTE' %}
- if {{ underscore(table.business_name) }}.{{ field_name }} is not None:
- stmt = stmt.where({{ class_name_pascal }}Po.{{ field_name }} >= {{ underscore(table.business_name) }}.{{ field_name }})
- {%- elif column.query_type == 'LT' %}
- if {{ underscore(table.business_name) }}.{{ field_name }} is not None:
- stmt = stmt.where({{ class_name_pascal }}Po.{{ field_name }} < {{ underscore(table.business_name) }}.{{ field_name }})
- {%- elif column.query_type == 'LTE' %}
- if {{ underscore(table.business_name) }}.{{ field_name }} is not None:
- stmt = stmt.where({{ class_name_pascal }}Po.{{ field_name }} <= {{ underscore(table.business_name) }}.{{ field_name }})
- {%- elif column.query_type == 'LIKE' %}
- if {{ underscore(table.business_name) }}.{{ field_name }}:
- stmt = stmt.where({{ class_name_pascal }}Po.{{ field_name }}.like("%" + str({{ underscore(table.business_name) }}.{{ field_name }}) + "%"))
- {%- elif column.query_type == 'BETWEEN' %}
- _params = getattr({{ underscore(table.business_name) }}, "params", {}) or {}
- begin_val = _params.get("begin{{ capitalize_first(column.java_field) }}")
- end_val = _params.get("end{{ capitalize_first(column.java_field) }}")
- if begin_val is not None:
- stmt = stmt.where({{ class_name_pascal }}Po.{{ field_name }} >= begin_val)
- if end_val is not None:
- stmt = stmt.where({{ class_name_pascal }}Po.{{ field_name }} <= end_val)
- {%- endif %}
- {% endif %}
- {% endfor %}
- if "criterian_meta" in g and g.criterian_meta.page:
- g.criterian_meta.page.stmt = stmt
- result = db.session.execute(stmt).scalars().all()
- return [{{ class_name_pascal }}.model_validate(item) for item in result] if result else []
- except Exception as e:
- print(f"查询{{ table.function_name }}列表出错: {e}")
- return []
- {% if table.pk_column %}
- @staticmethod
- def select_{{ underscore(table.class_name) }}_by_id({{ underscore(table.pk_column.java_field) }}: int) -> {{ class_name_pascal }}:
- """
- 根据ID查询{{ table.function_name }}
- Args:
- {{ underscore(table.pk_column.java_field) }} (int): {{ table.pk_column.column_comment }}
- Returns:
- {{ table.class_name }}: {{ table.function_name }}对象
- """
- try:
- result = db.session.get({{ class_name_pascal }}Po, {{ underscore(table.pk_column.java_field) }})
- return {{ class_name_pascal }}.model_validate(result) if result else None
- except Exception as e:
- print(f"根据ID查询{{ table.function_name }}出错: {e}")
- return None
- {% endif %}
- @staticmethod
- def insert_{{ underscore(table.class_name) }}({{ underscore(table.business_name) }}: {{ class_name_pascal }}) -> int:
- """
- 新增{{ table.function_name }}
- Args:
- {{ underscore(table.business_name) }} ({{ table.class_name }}): {{ table.function_name }}对象
- Returns:
- int: 插入的记录数
- """
- try:
- now = datetime.now()
- new_po = {{ class_name_pascal }}Po()
- {%- for column in table.columns %}
- {%- set attr = underscore(column.java_field) %}
- {%- if column.column_name in ['create_time', 'update_time'] %}
- new_po.{{ attr }} = {{ underscore(table.business_name) }}.{{ attr }} or now
- {%- else %}
- new_po.{{ attr }} = {{ underscore(table.business_name) }}.{{ attr }}
- {%- endif %}
- {%- endfor %}
- db.session.add(new_po)
- db.session.commit()
- {%- if table.pk_column %}
- {{ underscore(table.business_name) }}.{{ underscore(table.pk_column.java_field) }} = new_po.{{ underscore(table.pk_column.java_field) }}
- {%- endif %}
- return 1
- except Exception as e:
- db.session.rollback()
- print(f"新增{{ table.function_name }}出错: {e}")
- return 0
- {% if table.pk_column %}
- @staticmethod
- def update_{{ underscore(table.class_name) }}({{ underscore(table.business_name) }}: {{ class_name_pascal }}) -> int:
- """
- 修改{{ table.function_name }}
- Args:
- {{ underscore(table.business_name) }} ({{ table.class_name }}): {{ table.function_name }}对象
- Returns:
- int: 更新的记录数
- """
- try:
- {% if table.pk_column %}
- existing = db.session.get({{ class_name_pascal }}Po, {{ underscore(table.business_name) }}.{{ underscore(table.pk_column.java_field) }})
- if not existing:
- return 0
- now = datetime.now()
- {%- for column in table.columns %}
- {%- set attr = underscore(column.java_field) %}
- {%- if column.is_pk == '1' %}
- # 主键不参与更新
- {%- elif column.column_name == 'update_time' %}
- existing.{{ attr }} = {{ underscore(table.business_name) }}.{{ attr }} or now
- {%- else %}
- existing.{{ attr }} = {{ underscore(table.business_name) }}.{{ attr }}
- {%- endif %}
- {%- endfor %}
- db.session.commit()
- return 1
- {% else %}
- db.session.merge({{ underscore(table.business_name) }})
- db.session.commit()
- return 1
- {% endif %}
- except Exception as e:
- db.session.rollback()
- print(f"修改{{ table.function_name }}出错: {e}")
- return 0
- @staticmethod
- def delete_{{ underscore(table.class_name) }}_by_ids(ids: List[int]) -> int:
- """
- 批量删除{{ table.function_name }}
- Args:
- ids (List[int]): ID列表
- Returns:
- int: 删除的记录数
- """
- try:
- stmt = delete({{ class_name_pascal }}Po).where({{ class_name_pascal }}Po.{{ underscore(table.pk_column.java_field) }}.in_(ids))
- result = db.session.execute(stmt)
- db.session.commit()
- return result.rowcount
- except Exception as e:
- db.session.rollback()
- print(f"批量删除{{ table.function_name }}出错: {e}")
- return 0
- {% endif %}
|