sys_oper_log.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. # -*- coding: utf-8 -*-
  2. # @Author : YY
  3. from typing import List, Optional
  4. from flask import g
  5. from sqlalchemy import delete, insert, select
  6. from ruoyi_common.base.model import ExtraModel
  7. from ruoyi_common.sqlalchemy.model import ColumnEntityList
  8. from ruoyi_common.sqlalchemy.transaction import Transactional
  9. from ruoyi_system.domain.entity import SysOperLog
  10. from ruoyi_admin.ext import db
  11. from ruoyi_system.domain.po import SysOperLogPo
  12. class SysOperLogMapper:
  13. """
  14. 操作日志的数据访问层
  15. """
  16. default_fields = {
  17. "oper_id", "title", "business_type", "method", "request_method",
  18. "operator_type", "oper_name", "dept_name", "oper_url", "oper_ip",
  19. "oper_location", "oper_param", "json_result", "status", "error_msg",
  20. "oper_time"
  21. }
  22. default_columns = ColumnEntityList(SysOperLogPo, default_fields)
  23. @classmethod
  24. def select_operlog_list(cls, oper: Optional[SysOperLog])-> List[SysOperLog]:
  25. '''
  26. 查询系统操作日志集合
  27. Args:
  28. oper (Optional[SysOperLog]): 操作日志对象
  29. Returns:
  30. List[SysOperLog]: 操作日志集合
  31. '''
  32. if oper:
  33. criterions = []
  34. if oper.oper_ip:
  35. criterions.append(SysOperLogPo.ipaddr.like(f'%{oper.oper_ip}%'))
  36. if oper.status:
  37. criterions.append(SysOperLogPo.status == oper.status)
  38. if oper.oper_name:
  39. criterions.append(SysOperLogPo.oper_name.like(f'%{oper.oper_name}%'))
  40. if "criterian_meta" in g and g.criterian_meta.extra:
  41. extra:ExtraModel = g.criterian_meta.extra
  42. if extra.start_time and extra.end_time:
  43. criterions.append(SysOperLogPo.oper_time >= extra.start_time)
  44. criterions.append(SysOperLogPo.oper_time <= extra.end_time)
  45. stmt = select(*cls.default_columns) \
  46. .where(*criterions)
  47. else:
  48. stmt = select(*cls.default_columns)
  49. if "criterian_meta" in g and g.criterian_meta.page:
  50. g.criterian_meta.page.stmt = stmt
  51. rows = db.session.execute(stmt).all()
  52. return [cls.default_columns.cast(row, SysOperLog) for row in rows]
  53. @classmethod
  54. @Transactional(db.session)
  55. def insert_operlog(cls, oper: SysOperLog) -> int:
  56. '''
  57. 新增操作日志
  58. Args:
  59. oper (SysOperLog): 操作日志对象
  60. Returns:
  61. int: 新增记录的ID
  62. '''
  63. fields = {
  64. "title", "business_type", "method", "request_method","oper_time",
  65. "operator_type", "oper_name", "dept_name", "oper_url", "oper_ip",
  66. "oper_location", "oper_param", "json_result", "status", "error_msg"
  67. }
  68. data = oper.model_dump(
  69. include=fields, exclude_none=True
  70. )
  71. stmt = insert(SysOperLogPo).values(data)
  72. out = db.session.execute(stmt).inserted_primary_key
  73. return out[0] if out else 0
  74. @classmethod
  75. @Transactional(db.session)
  76. def delete_operlog_by_ids(cls, ids: List[int]) -> int:
  77. '''
  78. 批量删除系统操作日志
  79. Args:
  80. ids (List[int]): 操作日志ID列表
  81. Returns:
  82. int: 删除的记录数
  83. '''
  84. stmt = delete(SysOperLogPo).where(SysOperLogPo.oper_id.in_(ids))
  85. return db.session.execute(stmt).rowcount
  86. @classmethod
  87. @Transactional(db.session)
  88. def clean_operlog(cls) -> int:
  89. '''
  90. 清空操作日志
  91. Returns:
  92. int: 删除的记录数
  93. '''
  94. stmt = delete(SysOperLogPo)
  95. return db.session.execute(stmt).rowcount
  96. @classmethod
  97. def select_operlog_by_id(cls, id:int) -> Optional[SysOperLog]:
  98. '''
  99. 查询操作日志详细
  100. Args:
  101. id (int): 操作日志ID
  102. Returns:
  103. Optional[SysOperLog]: 操作日志对象
  104. '''
  105. stmt = select(*cls.default_columns).where(SysOperLogPo.oper_id == id)
  106. row = db.session.execute(stmt).one_or_none()
  107. return cls.default_columns.cast(row, SysOperLog) if row else None