job.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. # -*- coding: utf-8 -*-
  2. # @Author : YY
  3. from typing import List, Optional
  4. from sqlalchemy import insert,update,delete,select
  5. from ruoyi_common.sqlalchemy.model import ColumnEntityList
  6. from ruoyi_apscheduler.domain.po import SysJobPo
  7. from ruoyi_apscheduler.domain.entity import SysJob
  8. from ruoyi_admin.ext import db
  class SysJobMapper:
      """Data-access helper for scheduled-job rows stored via ``SysJobPo``.

      Every method is a classmethod that builds a SQLAlchemy statement and
      executes it on ``db.session``. None of the methods commits or rolls
      back; transaction handling is left to the caller.
      """

      # Names of the SysJobPo columns projected by the select_* queries.
      default_fields = {
          "job_id", "job_name", "job_group", "invoke_target", "cron_expression",
          "misfire_policy", "concurrent", "status", "create_by", "create_time",
          "remark",
      }

      # Column list built from ``default_fields``; also used to cast result
      # rows into ``SysJob`` entities.
      default_columns = ColumnEntityList(SysJobPo, default_fields, False)

      @classmethod
      def select_job_list(cls, job: SysJob) -> List[SysJob]:
          """Query jobs that match the filters carried by ``job``.

          Args:
              job (SysJob): Filter values. Falsy attributes are ignored.
                  ``job_name`` and ``invoke_target`` are matched with
                  ``LIKE %value%``; ``job_group`` and ``status`` are matched
                  by equality.

          Returns:
              List[SysJob]: The matching jobs. With no filter set, the query
              has no WHERE criteria.
          """
          filters = []
          if job.job_name:
              filters.append(SysJobPo.job_name.like(f"%{job.job_name}%"))
          if job.job_group:
              filters.append(SysJobPo.job_group == job.job_group)
          if job.status:
              filters.append(SysJobPo.status == job.status)
          if job.invoke_target:
              filters.append(
                  SysJobPo.invoke_target.like(f"%{job.invoke_target}%")
              )

          # Name the FROM target explicitly, as select_job_all does, so the
          # unfiltered query does not rely on FROM inference.
          stmt = (
              select(*cls.default_columns)
              .select_from(SysJobPo)
              .where(*filters)
          )
          rows = db.session.execute(stmt).all()
          return [cls.default_columns.cast(row, SysJob) for row in rows]

      @classmethod
      def select_job_all(cls) -> List[SysJob]:
          """Query every job.

          Returns:
              List[SysJob]: All jobs, each cast to ``SysJob``.
          """
          stmt = select(*cls.default_columns).select_from(SysJobPo)
          rows = db.session.execute(stmt).all()
          return [cls.default_columns.cast(row, SysJob) for row in rows]

      @classmethod
      def select_job_by_id(cls, job_id: int) -> Optional[SysJob]:
          """Query a single job by its ID.

          Args:
              job_id (int): Job ID to look up.

          Returns:
              Optional[SysJob]: The job, or ``None`` when no row matches.
          """
          stmt = select(*cls.default_columns).where(SysJobPo.job_id == job_id)
          row = db.session.execute(stmt).one_or_none()
          if not row:
              return None
          return cls.default_columns.cast(row, SysJob)

      @classmethod
      def insert_job(cls, job: SysJob) -> Optional[int]:
          """Insert a new job row.

          Only the listed fields that were set on ``job`` and are not ``None``
          are written (``exclude_unset`` / ``exclude_none``).

          Args:
              job (SysJob): Job to insert.

          Returns:
              Optional[int]: The first value of the inserted primary key when
              the result reports one; otherwise the ``job_id`` supplied by
              the caller, or ``None`` if none was supplied.
          """
          fields = {
              "job_id", "job_name", "job_group", "invoke_target",
              "cron_expression", "misfire_policy", "concurrent", "status",
              "create_by", "create_time", "remark",
          }
          data = job.model_dump(
              include=fields, exclude_unset=True, exclude_none=True
          )
          # No job_id supplied: drop the key so the database can assign one
          # (auto-increment).
          if not data.get("job_id"):
              data.pop("job_id", None)

          result = db.session.execute(insert(SysJobPo).values(data))
          pk_values = result.inserted_primary_key
          if pk_values:
              return pk_values[0]
          return data.get("job_id")

      @classmethod
      def update_job(cls, job: SysJob) -> int:
          """Update the row whose ``job_id`` equals ``job.job_id``.

          Only the listed fields that were set on ``job`` and are not ``None``
          are written (``exclude_unset`` / ``exclude_none``).

          Args:
              job (SysJob): Job carrying the target ``job_id`` and the new
                  values.

          Returns:
              int: Number of affected rows as reported by ``rowcount``.
          """
          fields = {
              "job_name", "job_group", "invoke_target", "cron_expression",
              "misfire_policy", "concurrent", "status", "update_by",
              "update_time", "remark",
          }
          data = job.model_dump(
              include=fields, exclude_unset=True, exclude_none=True
          )
          stmt = (
              update(SysJobPo)
              .where(SysJobPo.job_id == job.job_id)
              .values(data)
          )
          return db.session.execute(stmt).rowcount

      @classmethod
      def delete_job_by_id(cls, job_id: int) -> int:
          """Delete a job by its ID.

          Args:
              job_id (int): ID of the job to delete.

          Returns:
              int: Number of affected rows as reported by ``rowcount``.
          """
          stmt = delete(SysJobPo).where(SysJobPo.job_id == job_id)
          return db.session.execute(stmt).rowcount

      @classmethod
      def delete_job_by_ids(cls, job_ids: List[int]) -> int:
          """Delete every job whose ID is in ``job_ids``.

          Args:
              job_ids (List[int]): IDs of the jobs to delete.

          Returns:
              int: Number of affected rows as reported by ``rowcount``;
              ``0`` when ``job_ids`` is empty.
          """
          # Nothing to delete: skip the statement instead of sending a
          # DELETE with an empty IN list.
          if not job_ids:
              return 0
          stmt = delete(SysJobPo).where(SysJobPo.job_id.in_(job_ids))
          return db.session.execute(stmt).rowcount