# -*- coding: utf-8 -*-
# @Author : YY
# File: sys_oper_log.py
import re
from typing import List, Optional

from flask import g
from sqlalchemy import delete, insert, select

from ruoyi_common.base.model import ExtraModel
from ruoyi_common.sqlalchemy.model import ColumnEntityList
from ruoyi_common.sqlalchemy.transaction import Transactional
from ruoyi_system.domain.entity import SysOperLog
from ruoyi_admin.ext import db
from ruoyi_system.domain.po import SysOperLogPo


  13. class SysOperLogMapper:
  14. """
  15. 操作日志的数据访问层
  16. """
  17. default_fields = {
  18. "oper_id", "title", "business_type", "method", "request_method",
  19. "operator_type", "oper_name", "dept_name", "oper_url", "oper_ip",
  20. "oper_location", "oper_param", "json_result", "status", "error_msg",
  21. "oper_time"
  22. }
  23. default_columns = ColumnEntityList(SysOperLogPo, default_fields)
  24. @classmethod
  25. def select_operlog_list(cls, oper: Optional[SysOperLog])-> List[SysOperLog]:
  26. '''
  27. 查询系统操作日志集合
  28. Args:
  29. oper (Optional[SysOperLog]): 操作日志对象
  30. Returns:
  31. List[SysOperLog]: 操作日志集合
  32. '''
  33. criterions = []
  34. if oper:
  35. if oper.oper_ip:
  36. criterions.append(SysOperLogPo.oper_ip.like(f'%{oper.oper_ip}%'))
  37. if oper.status is not None:
  38. criterions.append(SysOperLogPo.status == oper.status)
  39. if oper.oper_name:
  40. criterions.append(SysOperLogPo.oper_name.like(f'%{oper.oper_name}%'))
  41. cls._append_extra_criterions(criterions)
  42. stmt = select(*cls.default_columns)
  43. if criterions:
  44. stmt = stmt.where(*criterions)
  45. stmt = cls._apply_sorting(stmt)
  46. if "criterian_meta" in g and g.criterian_meta.page:
  47. g.criterian_meta.page.stmt = stmt
  48. rows = db.session.execute(stmt).all()
  49. return [cls.default_columns.cast(row, SysOperLog) for row in rows]
  50. @classmethod
  51. @Transactional(db.session)
  52. def insert_operlog(cls, oper: SysOperLog) -> int:
  53. '''
  54. 新增操作日志
  55. Args:
  56. oper (SysOperLog): 操作日志对象
  57. Returns:
  58. int: 新增记录的ID
  59. '''
  60. fields = {
  61. "title", "business_type", "method", "request_method","oper_time",
  62. "operator_type", "oper_name", "dept_name", "oper_url", "oper_ip",
  63. "oper_location", "oper_param", "json_result", "status", "error_msg"
  64. }
  65. data = oper.model_dump(
  66. include=fields, exclude_none=True
  67. )
  68. stmt = insert(SysOperLogPo).values(data)
  69. out = db.session.execute(stmt).inserted_primary_key
  70. return out[0] if out else 0
  71. @classmethod
  72. def _append_extra_criterions(cls, criterions:list):
  73. if "criterian_meta" not in g or not g.criterian_meta.extra:
  74. return
  75. extra:ExtraModel = g.criterian_meta.extra
  76. if getattr(extra, "begin_time", None):
  77. criterions.append(SysOperLogPo.oper_time >= extra.begin_time)
  78. if getattr(extra, "end_time", None):
  79. criterions.append(SysOperLogPo.oper_time <= extra.end_time)
  80. @classmethod
  81. def _apply_sorting(cls, stmt):
  82. sort = getattr(getattr(g, "criterian_meta", None), "sort", None)
  83. order_columns = []
  84. if sort and sort.order_by_column:
  85. for alias in sort.order_by_column:
  86. field_name = cls._camel_to_snake(alias)
  87. column = getattr(SysOperLogPo, field_name, None)
  88. if not column:
  89. continue
  90. if sort.is_asc == "asc":
  91. order_columns.append(column.asc())
  92. else:
  93. order_columns.append(column.desc())
  94. if not order_columns:
  95. order_columns.append(SysOperLogPo.oper_time.desc())
  96. return stmt.order_by(*order_columns)
  97. @staticmethod
  98. def _camel_to_snake(value:str) -> str:
  99. s1 = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', value)
  100. return re.sub('([a-z0-9])([A-Z])', r'\1_\2', s1).lower()
  101. @classmethod
  102. @Transactional(db.session)
  103. def delete_operlog_by_ids(cls, ids: List[int]) -> int:
  104. '''
  105. 批量删除系统操作日志
  106. Args:
  107. ids (List[int]): 操作日志ID列表
  108. Returns:
  109. int: 删除的记录数
  110. '''
  111. stmt = delete(SysOperLogPo).where(SysOperLogPo.oper_id.in_(ids))
  112. return db.session.execute(stmt).rowcount
  113. @classmethod
  114. @Transactional(db.session)
  115. def clean_operlog(cls) -> int:
  116. '''
  117. 清空操作日志
  118. Returns:
  119. int: 删除的记录数
  120. '''
  121. stmt = delete(SysOperLogPo)
  122. return db.session.execute(stmt).rowcount
  123. @classmethod
  124. def select_operlog_by_id(cls, id:int) -> Optional[SysOperLog]:
  125. '''
  126. 查询操作日志详细
  127. Args:
  128. id (int): 操作日志ID
  129. Returns:
  130. Optional[SysOperLog]: 操作日志对象
  131. '''
  132. stmt = select(*cls.default_columns).where(SysOperLogPo.oper_id == id)
  133. row = db.session.execute(stmt).one_or_none()
  134. return cls.default_columns.cast(row, SysOperLog) if row else None