sys_logininfor.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. # -*- coding: utf-8 -*-
  2. # @Author : YY
  3. from typing import List
  4. from flask import g
  5. from sqlalchemy import delete, insert, select
  6. from ruoyi_common.base.model import ExtraModel
  7. from ruoyi_common.sqlalchemy.model import ColumnEntityList
  8. from ruoyi_common.sqlalchemy.transaction import Transactional
  9. from ruoyi_system.domain.entity import SysLogininfor
  10. from ruoyi_admin.ext import db
  11. from ruoyi_system.domain.po import SysLogininforPo
  12. class SysLogininforMapper:
  13. """
  14. 登录日志的数据访问层
  15. """
  16. default_fields = {
  17. "info_id", "user_name", "ipaddr", "login_location", "browser", "os",
  18. "status", "msg", "login_time", "status"
  19. }
  20. default_columns = ColumnEntityList(SysLogininforPo, default_fields, False)
  21. @classmethod
  22. @Transactional(db.session)
  23. def insert_logininfor(cls, info: SysLogininfor) -> int:
  24. """
  25. 新增系统登录日志
  26. Args:
  27. info (SysLogininfor): 系统登录日志对象
  28. Returns:
  29. int: 新增记录的ID
  30. """
  31. fields = {
  32. "user_name", "status", "ipaddr", "login_location", "browser", "os",
  33. "msg", "login_time"
  34. }
  35. data = info.model_dump(
  36. include=fields, exclude_none=True, exclude_unset=True,
  37. )
  38. stmt = insert(SysLogininforPo).values(data)
  39. out = db.session.execute(stmt).inserted_primary_key
  40. return out[0] if out else 0
  41. @classmethod
  42. def select_logininfor_list(cls, info: SysLogininfor) -> List[SysLogininforPo]:
  43. """
  44. 查询系统登录日志集合
  45. Args:
  46. info (SysLogininfor): 系统登录日志对象
  47. Returns:
  48. List[SysLogininforPo]: 系统登录日志集合
  49. """
  50. criterions = []
  51. if info.ipaddr:
  52. criterions.append(SysLogininforPo.ipaddr.like(f"%{info.ipaddr}%"))
  53. if info.status:
  54. criterions.append(SysLogininforPo.status == info.status)
  55. if info.user_name:
  56. criterions.append(SysLogininforPo.user_name.like(f"%{info.user_name}%"))
  57. if "criterian_meta" in g and g.criterian_meta.extra:
  58. extra:ExtraModel = g.criterian_meta.extra
  59. if extra.start_time and extra.end_time:
  60. criterions.append(SysLogininforPo.create_time >= extra.start_time)
  61. criterions.append(SysLogininforPo.create_time <= extra.end_time)
  62. stmt = select(*cls.default_columns) \
  63. .where(*criterions)
  64. if "criterian_meta" in g and g.criterian_meta.page:
  65. g.criterian_meta.page.stmt = stmt
  66. rows = db.session.execute(stmt).all()
  67. return [cls.default_columns.cast(row, SysLogininfor) for row in rows]
  68. @classmethod
  69. @Transactional(db.session)
  70. def delete_logininfor_by_ids(cls, ids: List[int]) -> int:
  71. """
  72. 批量删除系统登录日志
  73. Args:
  74. ids (List[int]): 系统登录日志ID列表
  75. Returns:
  76. int: 删除的行数
  77. """
  78. stmt = delete(SysLogininforPo).where(SysLogininforPo.info_id.in_(ids))
  79. return db.session.execute(stmt).rowcount
  80. @classmethod
  81. @Transactional(db.session)
  82. def clean_logininfor(cls) -> int:
  83. """
  84. 清空系统登录日志
  85. Returns:
  86. int: 删除的行数
  87. """
  88. stmt = delete(SysLogininforPo)
  89. return db.session.execute(stmt).rowcount