| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126 |
- # -*- coding: utf-8 -*-
- # @Author : YY
- from typing import List
- from flask import g
- from sqlalchemy import insert,delete,select
- from ruoyi_common.base.model import ExtraModel
- from ruoyi_common.sqlalchemy.model import ColumnEntityList
- from ruoyi_apscheduler.domain.po import SysJobLogPo
- from ruoyi_apscheduler.domain.entity import SysJobLog
- from ruoyi_admin.ext import db
- class SysJobLogMapper:
- default_fields = {
- "job_log_id", "job_name", "job_group", "invoke_target", "job_message",
- "status", "exception_info", "create_time"
- }
-
- default_columns = ColumnEntityList(SysJobLogPo, default_fields, False)
-
- @classmethod
- def select_job_log_list(cls, joblog:SysJobLog) -> List[SysJobLog]:
- """
- 查询任务日志列表
- Args:
- joblog (SysJobLog): 包含查询条件的数据传输对象
- Returns:
- List[SysJobLog]: 任务日志列表
- """
- criterions = []
- if joblog.job_name:
- criterions.append(SysJobLogPo.job_name.like(f"%{joblog.job_name}%"))
- if joblog.job_group:
- criterions.append(SysJobLogPo.job_group==joblog.job_group)
- if joblog.status:
- criterions.append(SysJobLogPo.status == joblog.status)
- if joblog.invoke_target:
- criterions.append(
- SysJobLogPo.invoke_target.like(f"%{joblog.invoke_target}%")
- )
- if "criterian_meta" in g and g.criterian_meta.extra:
- extra:ExtraModel = g.criterian_meta.extra
- if extra.start_time and extra.end_time:
- criterions.append(SysJobLogPo.create_time >= extra.start_time)
- criterions.append(SysJobLogPo.create_time <= extra.end_time)
- stmt = select(*cls.default_columns) \
- .where(*criterions)
- rows = db.session.execute(stmt).all()
- return [cls.default_columns.cast(row,SysJobLog) for row in rows]
-
- @classmethod
- def select_job_log_all(cls) -> List[SysJobLog]:
- """
- 查询所有任务日志
- Returns:
- List[SysJobLog]: 任务日志列表
- """
- stmt = select(*cls.default_columns)
- rows = db.session.execute(stmt).all()
- return [cls.default_columns.cast(row,SysJobLog) for row in rows]
-
- @classmethod
- def insert_job_log(cls, joblog:SysJobLog):
- """
- 新增任务日志
- Args:
- joblog (SysJobLog): 任务日志信息
- Returns:
- int: 影响的行数
- """
- fields = {
- "job_log_id", "job_name", "job_group", "invoke_target", "job_message",
- "status", "exception_info", "create_time"
- }
- data = joblog.model_dump(
- include=fields,exclude_unset=True,exclude_none=True
- )
- stmt = insert(SysJobLogPo).values(data)
- return db.session.execute(stmt).rowcount
- @classmethod
- def delete_job_log_by_id(cls, job_id:int):
- """
- 根据任务日志ID,删除任务日志
- Args:
- job_id (int): 任务日志ID
- Returns:
- int: 影响的行数
- """
- stmt = delete(SysJobLogPo).where(SysJobLogPo.job_log_id==job_id)
- return db.session.execute(stmt).rowcount
-
- @classmethod
- def delete_job_log_by_ids(cls, job_ids:List[int]):
- """
- 根据任务日志ID列表,删除任务日志
- Args:
- job_ids (List[int]): 任务日志ID列表
- Returns:
- int: 影响的行数
- """
- stmt = delete(SysJobLogPo).where(SysJobLogPo.job_log_id.in_(job_ids))
- return db.session.execute(stmt).rowcount
-
- @classmethod
- def clean_job_logs(cls):
- """
- 清空任务日志
- Returns:
- int: 影响的行数
- """
- stmt = delete(SysJobLogPo)
- return db.session.execute(stmt).rowcount
|