| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133 |
- # -*- coding: utf-8 -*-
- # @Author : YY
- import re
- from typing import List
- from flask import g
- from sqlalchemy import delete, insert, select
- from ruoyi_common.base.model import ExtraModel
- from ruoyi_common.sqlalchemy.model import ColumnEntityList
- from ruoyi_common.sqlalchemy.transaction import Transactional
- from ruoyi_system.domain.entity import SysLogininfor
- from ruoyi_admin.ext import db
- from ruoyi_system.domain.po import SysLogininforPo
- class SysLogininforMapper:
- """
- 登录日志的数据访问层
- """
-
- default_fields = {
- "info_id", "user_name", "ipaddr", "login_location", "browser", "os",
- "status", "msg", "login_time", "status"
- }
-
- default_columns = ColumnEntityList(SysLogininforPo, default_fields, False)
-
- @classmethod
- @Transactional(db.session)
- def insert_logininfor(cls, info: SysLogininfor) -> int:
- """
- 新增系统登录日志
-
- Args:
- info (SysLogininfor): 系统登录日志对象
-
- Returns:
- int: 新增记录的ID
- """
- fields = {
- "user_name", "status", "ipaddr", "login_location", "browser", "os",
- "msg", "login_time"
- }
- data = info.model_dump(
- include=fields, exclude_none=True, exclude_unset=True,
- )
- stmt = insert(SysLogininforPo).values(data)
- out = db.session.execute(stmt).inserted_primary_key
- return out[0] if out else 0
-
- @classmethod
- def select_logininfor_list(cls, info: SysLogininfor) -> List[SysLogininforPo]:
- """
- 查询系统登录日志集合
-
- Args:
- info (SysLogininfor): 系统登录日志对象
-
- Returns:
- List[SysLogininforPo]: 系统登录日志集合
- """
- criterions = []
- if info.ipaddr:
- criterions.append(SysLogininforPo.ipaddr.like(f"%{info.ipaddr}%"))
- if info.status:
- criterions.append(SysLogininforPo.status == info.status)
- if info.user_name:
- criterions.append(SysLogininforPo.user_name.like(f"%{info.user_name}%"))
- if "criterian_meta" in g and g.criterian_meta.extra:
- extra:ExtraModel = g.criterian_meta.extra
- if extra.start_time and extra.end_time:
- criterions.append(SysLogininforPo.create_time >= extra.start_time)
- criterions.append(SysLogininforPo.create_time <= extra.end_time)
- stmt = select(*cls.default_columns) \
- .where(*criterions)
- stmt = cls._apply_sorting(stmt)
- if "criterian_meta" in g and g.criterian_meta.page:
- g.criterian_meta.page.stmt = stmt
-
- rows = db.session.execute(stmt).all()
- return [cls.default_columns.cast(row, SysLogininfor) for row in rows]
- @classmethod
- def _apply_sorting(cls, stmt):
- sort = getattr(getattr(g, "criterian_meta", None), "sort", None)
- order_columns = []
- if sort and sort.order_by_column:
- for alias in sort.order_by_column:
- field_name = cls._camel_to_snake(alias)
- column = getattr(SysLogininforPo, field_name, None)
- if not column:
- continue
- if sort.is_asc == "asc":
- order_columns.append(column.asc())
- else:
- order_columns.append(column.desc())
- if not order_columns:
- order_columns.append(SysLogininforPo.login_time.desc())
- return stmt.order_by(*order_columns)
- @staticmethod
- def _camel_to_snake(value:str) -> str:
- s1 = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', value)
- return re.sub('([a-z0-9])([A-Z])', r'\1_\2', s1).lower()
- @classmethod
- @Transactional(db.session)
- def delete_logininfor_by_ids(cls, ids: List[int]) -> int:
- """
- 批量删除系统登录日志
-
- Args:
- ids (List[int]): 系统登录日志ID列表
-
- Returns:
- int: 删除的行数
- """
- stmt = delete(SysLogininforPo).where(SysLogininforPo.info_id.in_(ids))
- return db.session.execute(stmt).rowcount
- @classmethod
- @Transactional(db.session)
- def clean_logininfor(cls) -> int:
- """
- 清空系统登录日志
-
- Returns:
- int: 删除的行数
- """
- stmt = delete(SysLogininforPo)
- return db.session.execute(stmt).rowcount
|