| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172 |
- # -*- coding: utf-8 -*-
- # @Author : {{ table.function_author }}
- # @FileName: {{ underscore(table.class_name) }}_mapper.py
- # @Time : {{ datetime }}
- from typing import List
- from datetime import datetime
- from sqlalchemy import select, update, delete, func
- from ruoyi_admin.ext import db
- from {{ get_import_path(table.package_name, table.module_name, 'domain') }} import {{ table.class_name }}
class {{ table.class_name }}Mapper:
    """Data-access layer for {{ table.function_name }} records.

    All methods are static and operate on the shared ``db.session``.
    Exceptions are caught, reported with ``print`` and converted into a
    neutral return value (empty list, ``0`` or ``None``); the write
    operations additionally roll the session back.
    """

    @staticmethod
    def _apply_query_filters(stmt, {{ table.business_name }}: {{ table.class_name }}):
        """Add the generated WHERE conditions to ``stmt``.

        Shared by ``select_list`` and ``select_count`` so that the page of
        rows and the total count are always filtered the same way.

        Args:
            stmt: statement to narrow down.
            {{ table.business_name }} ({{ table.class_name }}): object carrying the filter values.

        Returns:
            The statement with one ``where`` clause per populated query field.
        """
{% for column in table.columns %}
{% if column.is_query and column.query_type == 'EQ' %}
        # Exact match: only None disables the filter, so 0 and "" are still compared.
        if {{ table.business_name }}.{{ underscore(column.java_field) }} is not None:
            stmt = stmt.where({{ table.class_name }}.{{ underscore(column.java_field) }} == {{ table.business_name }}.{{ underscore(column.java_field) }})
{% elif column.is_query and column.query_type == 'LIKE' %}
        # Substring match: None and empty values are skipped.
        if {{ table.business_name }}.{{ underscore(column.java_field) }}:
            stmt = stmt.where({{ table.class_name }}.{{ underscore(column.java_field) }}.like("%" + str({{ table.business_name }}.{{ underscore(column.java_field) }}) + "%"))
{% endif %}
{% endfor %}
        return stmt

    @staticmethod
    def select_list({{ table.business_name }}: {{ table.class_name }}) -> List[{{ table.class_name }}]:
        """Query the {{ table.function_name }} records that match the given filters.

        Args:
            {{ table.business_name }} ({{ table.class_name }}): object whose populated query fields
                become WHERE conditions. Its ``page_num`` and ``page_size``
                attributes select a page when both are truthy.

        Returns:
            List[{{ table.class_name }}]: matching records; an empty list when nothing
            matches or when the query raises.
        """
        try:
            stmt = {{ table.class_name }}Mapper._apply_query_filters(select({{ table.class_name }}), {{ table.business_name }})
            # Paging is applied only when both values are set and non-zero;
            # page_num 1 maps to offset 0.
            # NOTE(review): no ORDER BY is added, so the content of a page
            # depends on the row order the database happens to return.
            if {{ table.business_name }}.page_num and {{ table.business_name }}.page_size:
                offset = ({{ table.business_name }}.page_num - 1) * {{ table.business_name }}.page_size
                stmt = stmt.offset(offset).limit({{ table.business_name }}.page_size)
            return list(db.session.execute(stmt).scalars().all())
        except Exception as e:
            print(f"查询{{ table.function_name }}列表出错: {e}")
            return []

    @staticmethod
    def select_count({{ table.business_name }}: {{ table.class_name }}) -> int:
        """Count the {{ table.function_name }} records that match the given filters.

        Uses the same filter conditions as ``select_list`` and ignores paging.

        Args:
            {{ table.business_name }} ({{ table.class_name }}): object carrying the filter values.

        Returns:
            int: number of matching records; ``0`` when nothing matches or
            when the query raises.
        """
        try:
{% if table.pk_column %}
            stmt = select(func.count({{ table.class_name }}.{{ underscore(table.pk_column.java_field) }}))
{% else %}
            # This table has no primary key column, so count rows with COUNT(*).
            stmt = select(func.count()).select_from({{ table.class_name }})
{% endif %}
            stmt = {{ table.class_name }}Mapper._apply_query_filters(stmt, {{ table.business_name }})
            return db.session.execute(stmt).scalar() or 0
        except Exception as e:
            print(f"查询{{ table.function_name }}总数出错: {e}")
            return 0

{% if table.pk_column %}
    @staticmethod
    def select_by_id({{ underscore(table.pk_column.java_field) }}: int) -> {{ table.class_name }}:
        """Load a single {{ table.function_name }} record by primary key.

        Args:
            {{ underscore(table.pk_column.java_field) }} (int): {{ table.pk_column.column_comment }}

        Returns:
            {{ table.class_name }}: the matching record, or ``None`` when no row has this
            key or when the query raises.
        """
        try:
            stmt = select({{ table.class_name }}).where({{ table.class_name }}.{{ underscore(table.pk_column.java_field) }} == {{ underscore(table.pk_column.java_field) }})
            return db.session.execute(stmt).scalar_one_or_none()
        except Exception as e:
            print(f"根据ID查询{{ table.function_name }}出错: {e}")
            return None

{% endif %}
    @staticmethod
    def insert({{ table.business_name }}: {{ table.class_name }}) -> int:
        """Insert a new {{ table.function_name }} record and commit.

        ``create_time`` and ``update_time`` on the object are overwritten with
        the current local time before it is added to the session.

        Args:
            {{ table.business_name }} ({{ table.class_name }}): object to persist.

        Returns:
            int: ``1`` when the commit succeeds, ``0`` when it fails (the
            session is rolled back in that case).
        """
        try:
            # A single timestamp keeps both columns identical on creation.
            now = datetime.now()
            {{ table.business_name }}.create_time = now
            {{ table.business_name }}.update_time = now
            db.session.add({{ table.business_name }})
            db.session.commit()
            return 1
        except Exception as e:
            db.session.rollback()
            print(f"新增{{ table.function_name }}出错: {e}")
            return 0

{% if table.pk_column %}
    @staticmethod
    def update({{ table.business_name }}: {{ table.class_name }}) -> int:
        """Persist changes to an existing {{ table.function_name }} record and commit.

        ``update_time`` on the object is overwritten with the current local
        time, then the object is merged into the session.

        Args:
            {{ table.business_name }} ({{ table.class_name }}): object carrying the new state.

        Returns:
            int: ``1`` whenever the commit succeeds (this is not the affected
            row count), ``0`` when it fails (the session is rolled back).
        """
        try:
            {{ table.business_name }}.update_time = datetime.now()
            # NOTE(review): per the SQLAlchemy docs Session.merge() creates a
            # new row when no row with this primary key exists - confirm that
            # upsert behaviour is acceptable for callers.
            db.session.merge({{ table.business_name }})
            db.session.commit()
            return 1
        except Exception as e:
            db.session.rollback()
            print(f"修改{{ table.function_name }}出错: {e}")
            return 0

    @staticmethod
    def delete_by_ids(ids: List[int]) -> int:
        """Delete all {{ table.function_name }} records whose primary key is in ``ids``.

        Args:
            ids (List[int]): primary-key values of the records to delete.

        Returns:
            int: ``rowcount`` reported for the DELETE statement; ``0`` when
            the operation fails (the session is rolled back).
        """
        try:
            stmt = delete({{ table.class_name }}).where({{ table.class_name }}.{{ underscore(table.pk_column.java_field) }}.in_(ids))
            result = db.session.execute(stmt)
            db.session.commit()
            return result.rowcount
        except Exception as e:
            db.session.rollback()
            print(f"批量删除{{ table.function_name }}出错: {e}")
            return 0
{% endif %}
|