| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157 |
- # -*- coding: utf-8 -*-
- # @Author : YY
- from typing import List, Optional
- from sqlalchemy import insert,update,delete,select
- from ruoyi_common.sqlalchemy.model import ColumnEntityList
- from ruoyi_apscheduler.domain.po import SysJobPo
- from ruoyi_apscheduler.domain.entity import SysJob
- from ruoyi_admin.ext import db
- class SysJobMapper:
-
- default_fields = {
- "job_id","job_name","job_group","invoke_target","cron_expression", \
- "misfire_policy","concurrent","status","create_by","create_time", \
- "remark"
- }
-
- default_columns = ColumnEntityList(SysJobPo, default_fields, False)
-
- @classmethod
- def select_job_list(cls, job:SysJob) -> List[SysJob]:
- """
- 有条件查询任务列表
- Args:
- job (SysJob): 查询条件
- Returns:
- List[SysJob]: 任务列表
- """
- criterions = []
- if job.job_name:
- criterions.append(SysJobPo.job_name.like(f"%{job.job_name}%"))
- if job.job_group:
- criterions.append(SysJobPo.job_group==job.job_group)
- if job.status:
- criterions.append(SysJobPo.status == job.status)
- if job.invoke_target:
- criterions.append(
- SysJobPo.invoke_target.like(f"%{job.invoke_target}%")
- )
- stmt = select(*cls.default_columns) \
- .where(*criterions)
- rows = db.session.execute(stmt).all()
- return [cls.default_columns.cast(row,SysJob) for row in rows]
-
- @classmethod
- def select_job_all(cls) -> List[SysJob]:
- """
- 查询所有任务
- Returns:
- List[SysJob]: 任务列表
- """
- stmt = select(*cls.default_columns).select_from(SysJobPo)
- rows = db.session.execute(stmt).all()
- return [cls.default_columns.cast(row,SysJob) for row in rows]
-
- @classmethod
- def select_job_by_id(cls, job_id:int) -> Optional[SysJob]:
- """
- 根据任务ID,查询任务
- Args:
- job_id (int): 任务ID
- Returns:
- Optional[SysJob]: 任务
- """
- stmt = select(*cls.default_columns) \
- .where(SysJobPo.job_id==job_id)
- row = db.session.execute(stmt).one_or_none()
- return cls.default_columns.cast(row,SysJob) if row else None
-
- @classmethod
- def insert_job(cls, job:SysJob) -> Optional[int]:
- """
- 新增任务
- Args:
- job (SysJob): 任务
- Returns:
- Optional[int]: 任务ID
- """
- fields = {
- "job_id","job_name","job_group","invoke_target","cron_expression", \
- "misfire_policy","concurrent","status","create_by","create_time", \
- "remark"
- }
- data = job.model_dump(
- include=fields,exclude_unset=True,exclude_none=True
- )
- # 如果未指定 job_id,则交由数据库自增
- if not data.get("job_id"):
- data.pop("job_id", None)
- stmt = insert(SysJobPo).values(data)
- result = db.session.execute(stmt)
- pk_values = result.inserted_primary_key
- if pk_values:
- return pk_values[0]
- return data.get("job_id")
-
- @classmethod
- def update_job(cls, job:SysJob) -> int:
- """
- 更新任务
- Args:
- job (SysJob): 任务
- Returns:
- int: 影响行数
- """
- fields = {
- "job_name","job_group","invoke_target","cron_expression", \
- "misfire_policy","concurrent","status","update_by","update_time", \
- "remark"
- }
- data = job.model_dump(
- include=fields,exclude_unset=True,exclude_none=True
- )
- stmt = update(SysJobPo) \
- .where(SysJobPo.job_id==job.job_id) \
- .values(data)
- return db.session.execute(stmt).rowcount
-
- @classmethod
- def delete_job_by_id(cls, job_id:int) -> int:
- """
- 根据任务ID,删除任务
- Args:
- job_id (int): 任务ID
- Returns:
- int: 影响行数
- """
- stmt = delete(SysJobPo).where(SysJobPo.job_id==job_id)
- return db.session.execute(stmt).rowcount
-
- @classmethod
- def delete_job_by_ids(cls, job_ids:List[int]) -> int:
- """
- 根据任务ID列表,删除任务
- Args:
- job_ids (List[int]): 任务ID列表
- Returns:
- int: 影响行数
- """
- stmt = delete(SysJobPo).where(SysJobPo.job_id.in_(job_ids))
- return db.session.execute(stmt).rowcount
|