job.py 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262
  1. # -*- coding: utf-8 -*-
  2. # @Author : YY
  3. import atexit
  4. from datetime import datetime
  5. from typing import List, Optional
  6. from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR, \
  7. EVENT_JOB_MISSED, EVENT_JOB_SUBMITTED, EVENT_JOB_REMOVED, JobEvent
  8. from flask import Flask
  9. from ruoyi_common.base.signal import app_completed
  10. from ruoyi_common.exception import ServiceException
  11. from ruoyi_common.sqlalchemy.transaction import Transactional
  12. from ruoyi_apscheduler.constant import ScheduleStatus
  13. from ruoyi_apscheduler.domain.entity import SysJob, SysJobLog
  14. from ruoyi_apscheduler.mapper.job import SysJobMapper
  15. from ruoyi_apscheduler.util import ScheduleUtil
  16. from ruoyi_apscheduler.service.job_log import SysJobLogService
  17. from ruoyi_admin.ext import db
  18. from .. import reg,scheduler
  19. class SysJobService:
  20. @classmethod
  21. def init(cls):
  22. """
  23. 初始化定时任务
  24. """
  25. scheduler.add_listener(job_listener, EVENT_JOB_EXECUTED | \
  26. EVENT_JOB_ERROR | EVENT_JOB_MISSED | EVENT_JOB_SUBMITTED | \
  27. EVENT_JOB_REMOVED)
  28. scheduler.remove_all_jobs()
  29. try:
  30. for job in SysJobMapper.select_job_all():
  31. ScheduleUtil.create_schedule_job(scheduler,job)
  32. except ImportError as e:
  33. raise ServiceException(f"导入定时任务失败,请检查表SysJob的数据:{e}")
  34. scheduler.start()
  35. atexit.register(lambda: scheduler.shutdown())
  36. @classmethod
  37. def select_job_list(cls, job:SysJob) -> List[SysJob]:
  38. """
  39. 查询定时任务列表
  40. Args:
  41. job (SysJob): 包含查询条件的任务
  42. Returns:
  43. List[SysJob]: 任务信息列表
  44. """
  45. return SysJobMapper.select_job_list(job)
  46. @classmethod
  47. def select_job_by_id(cls, job_id:int) -> Optional[SysJob]:
  48. """
  49. 查询定时任务
  50. Args:
  51. job_id (int): 任务ID
  52. Returns:
  53. Optional[SysJob]: 任务信息
  54. """
  55. return SysJobMapper.select_job_by_id(job_id)
  56. @classmethod
  57. @Transactional(db.session)
  58. def insert_job(cls, job:SysJob) -> bool:
  59. """
  60. 新增定时任务
  61. Args:
  62. job (SysJob): 任务信息
  63. Returns:
  64. bool: 操作结果
  65. """
  66. flag = SysJobMapper.insert_job(job)
  67. if flag:
  68. ScheduleUtil.create_schedule_job(scheduler,job)
  69. return flag > 0
  70. @classmethod
  71. @Transactional(db.session)
  72. def update_job(cls, job:SysJob) -> bool:
  73. """
  74. 更新定时任务
  75. Args:
  76. job (SysJob): 任务信息
  77. Returns:
  78. bool: 操作结果
  79. """
  80. db_job:SysJob = cls.select_job_by_id(job.job_id)
  81. flag = SysJobMapper.update_job(job)
  82. if flag > 0:
  83. sched_job = scheduler.get_job(db_job.job_key)
  84. if sched_job:
  85. scheduler.remove_job(db_job.job_key)
  86. ScheduleUtil.create_schedule_job(scheduler,job)
  87. return flag > 0
  88. @classmethod
  89. @Transactional(db.session)
  90. def delete_job_by_id(cls, job:SysJob) -> int:
  91. """
  92. 删除定时任务
  93. Args:
  94. job (SysJob): 任务信息
  95. Returns:
  96. int: 删除的任务数量
  97. """
  98. num = SysJobMapper.delete_job_by_id(job.job_id)
  99. if num > 0:
  100. scheduler.remove_job(job.job_key)
  101. return num
  102. @classmethod
  103. @Transactional(db.session)
  104. def delete_job_by_ids(cls, job_ids:List[int]):
  105. """
  106. 批量删除定时任务
  107. Args:
  108. job_ids (List[int]): 任务ID列表
  109. """
  110. for job_id in job_ids:
  111. job = cls.select_job_by_id(job_id)
  112. if job:
  113. cls.delete_job_by_id(job)
  114. @classmethod
  115. @Transactional(db.session)
  116. def pause_job(cls, job:SysJob) -> int:
  117. """
  118. 暂停定时任务
  119. Args:
  120. job (SysJob): 任务信息
  121. Returns:
  122. int: 操作结果
  123. """
  124. job.status = ScheduleStatus.PAUSED.value
  125. num = SysJobMapper.update_job(job)
  126. if num > 0:
  127. scheduler.pause_job(job.job_key)
  128. return num
  129. @classmethod
  130. @Transactional(db.session)
  131. def resume_job(cls, job:SysJob) -> int:
  132. """
  133. 恢复定时任务
  134. Args:
  135. job (SysJob): 任务信息
  136. Returns:
  137. int: 操作结果
  138. """
  139. job.status = ScheduleStatus.NORMAL.value
  140. num = SysJobMapper.update_job(job)
  141. if num > 0:
  142. sched_job = scheduler.get_job(job.job_key)
  143. if not sched_job:
  144. # APScheduler 中不存在该任务时重新创建
  145. ScheduleUtil.create_schedule_job(scheduler, job)
  146. else:
  147. scheduler.resume_job(job.job_key)
  148. return num
  149. @classmethod
  150. @Transactional(db.session)
  151. def change_job_status(cls, job:SysJob) -> int:
  152. """
  153. 更改定时任务状态
  154. Args:
  155. job (SysJob): 任务信息
  156. Returns:
  157. int: 操作结果
  158. """
  159. num = 0
  160. if job.status == ScheduleStatus.NORMAL.value:
  161. num = cls.resume_job(job)
  162. elif job.status == ScheduleStatus.PAUSED.value:
  163. num =cls.pause_job(job)
  164. return num
  165. @classmethod
  166. def run(cls, job:SysJob):
  167. """
  168. 立即执行定时任务
  169. Args:
  170. job (SysJob): 任务信息
  171. """
  172. ScheduleUtil.reschedule_job(scheduler,job)
  173. def job_listener(event:JobEvent):
  174. """
  175. 任务监听器
  176. Args:
  177. event (JobEvent): 任务事件
  178. """
  179. job , _ = scheduler._lookup_job(event.job_id,event.jobstore)
  180. job_state = job.__getstate__()
  181. invoke_target = ScheduleUtil.unparse_target_by_funcname(
  182. job_state["func"],
  183. job_state["args"],
  184. job_state["kwargs"]
  185. )
  186. name = job_state["name"]
  187. _,group = job_state["id"].split("_")
  188. joblog = SysJobLog(
  189. job_name=name,
  190. job_group=group,
  191. invoke_target=invoke_target,
  192. create_time=datetime.now()
  193. )
  194. if event.code == EVENT_JOB_EXECUTED:
  195. pass
  196. elif event.code == EVENT_JOB_ERROR:
  197. if event.exception:
  198. joblog.status = "1"
  199. joblog.exception_info = str(event.exception)
  200. joblog.job_message = str(event.traceback)
  201. print(f"任务{event.job_id}异常:{event.exception}")
  202. elif event.code == EVENT_JOB_MISSED:
  203. pass
  204. elif event.code == EVENT_JOB_SUBMITTED:
  205. pass
  206. elif event.code == EVENT_JOB_REMOVED:
  207. pass
  208. with reg.app.app_context():
  209. SysJobLogService.insert_job_log(joblog)
  210. @app_completed.connect_via(reg.app)
  211. def init(sender:Flask):
  212. '''
  213. 初始化操作
  214. Args:
  215. sender (Flask): 消息发送者
  216. '''
  217. with sender.app_context():
  218. SysJobService.init()