job.py 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281
  1. # -*- coding: utf-8 -*-
  2. # @Author : YY
  3. import atexit
  4. from datetime import datetime
  5. from typing import List, Optional
  6. from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR, \
  7. EVENT_JOB_MISSED, EVENT_JOB_SUBMITTED, EVENT_JOB_REMOVED, JobEvent
  8. from flask import Flask
  9. import importlib
  10. from ruoyi_common.base.signal import app_completed
  11. from ruoyi_common.exception import ServiceException
  12. from ruoyi_common.sqlalchemy.transaction import Transactional
  13. from ruoyi_apscheduler.constant import ScheduleStatus
  14. from ruoyi_apscheduler.domain.entity import SysJob, SysJobLog
  15. from ruoyi_apscheduler.mapper.job import SysJobMapper
  16. from ruoyi_apscheduler.util import ScheduleUtil, check_method_importable
  17. from ruoyi_apscheduler.service.job_log import SysJobLogService
  18. from ruoyi_admin.ext import db
  19. from .. import reg, scheduler
  20. class SysJobService:
  21. @classmethod
  22. def init(cls):
  23. """
  24. 初始化定时任务
  25. """
  26. scheduler.add_listener(job_listener, EVENT_JOB_EXECUTED | \
  27. EVENT_JOB_ERROR | EVENT_JOB_MISSED | EVENT_JOB_SUBMITTED | \
  28. EVENT_JOB_REMOVED)
  29. scheduler.remove_all_jobs()
  30. try:
  31. for job in SysJobMapper.select_job_all():
  32. ScheduleUtil.create_schedule_job(scheduler, job)
  33. except ImportError as e:
  34. raise ServiceException(f"导入定时任务失败,请检查表SysJob的数据:{e}")
  35. scheduler.start()
  36. atexit.register(lambda: scheduler.shutdown())
  37. @classmethod
  38. def select_job_list(cls, job: SysJob) -> List[SysJob]:
  39. """
  40. 查询定时任务列表
  41. Args:
  42. job (SysJob): 包含查询条件的任务
  43. Returns:
  44. List[SysJob]: 任务信息列表
  45. """
  46. return SysJobMapper.select_job_list(job)
  47. @classmethod
  48. def select_job_by_id(cls, job_id: int) -> Optional[SysJob]:
  49. """
  50. 查询定时任务
  51. Args:
  52. job_id (int): 任务ID
  53. Returns:
  54. Optional[SysJob]: 任务信息
  55. """
  56. return SysJobMapper.select_job_by_id(job_id)
  57. @classmethod
  58. @Transactional(db.session)
  59. def insert_job(cls, job: SysJob) -> bool:
  60. """
  61. 新增定时任务
  62. Args:
  63. job (SysJob): 任务信息
  64. Returns:
  65. bool: 操作结果
  66. """
  67. flag = SysJobMapper.insert_job(job)
  68. if flag:
  69. ScheduleUtil.create_schedule_job(scheduler, job)
  70. return flag > 0
  71. @classmethod
  72. @Transactional(db.session)
  73. def update_job(cls, job: SysJob) -> bool:
  74. """
  75. 更新定时任务
  76. Args:
  77. job (SysJob): 任务信息
  78. Returns:
  79. bool: 操作结果
  80. """
  81. db_job: SysJob = cls.select_job_by_id(job.job_id)
  82. flag = SysJobMapper.update_job(job)
  83. if flag > 0:
  84. sched_job = scheduler.get_job(db_job.job_key)
  85. if sched_job:
  86. scheduler.remove_job(db_job.job_key)
  87. ScheduleUtil.create_schedule_job(scheduler, job)
  88. return flag > 0
  89. @classmethod
  90. @Transactional(db.session)
  91. def delete_job_by_id(cls, job: SysJob) -> int:
  92. """
  93. 删除定时任务
  94. Args:
  95. job (SysJob): 任务信息
  96. Returns:
  97. int: 删除的任务数量
  98. """
  99. num = SysJobMapper.delete_job_by_id(job.job_id)
  100. if num > 0:
  101. scheduler.remove_job(job.job_key)
  102. return num
  103. @classmethod
  104. @Transactional(db.session)
  105. def delete_job_by_ids(cls, job_ids: List[int]):
  106. """
  107. 批量删除定时任务
  108. Args:
  109. job_ids (List[int]): 任务ID列表
  110. """
  111. for job_id in job_ids:
  112. job = cls.select_job_by_id(job_id)
  113. if job:
  114. cls.delete_job_by_id(job)
  115. @classmethod
  116. @Transactional(db.session)
  117. def pause_job(cls, job: SysJob) -> int:
  118. """
  119. 暂停定时任务
  120. Args:
  121. job (SysJob): 任务信息
  122. Returns:
  123. int: 操作结果
  124. """
  125. job.status = ScheduleStatus.PAUSED.value
  126. num = SysJobMapper.update_job(job)
  127. if num > 0:
  128. scheduler.pause_job(job.job_key)
  129. return num
  130. @classmethod
  131. @Transactional(db.session)
  132. def resume_job(cls, job: SysJob) -> int:
  133. """
  134. 恢复定时任务
  135. Args:
  136. job (SysJob): 任务信息
  137. Returns:
  138. int: 操作结果
  139. """
  140. job.status = ScheduleStatus.NORMAL.value
  141. num = SysJobMapper.update_job(job)
  142. if num > 0:
  143. sched_job = scheduler.get_job(job.job_key)
  144. if not sched_job:
  145. # APScheduler 中不存在该任务时重新创建
  146. ScheduleUtil.create_schedule_job(scheduler, job)
  147. else:
  148. scheduler.resume_job(job.job_key)
  149. return num
  150. @classmethod
  151. @Transactional(db.session)
  152. def change_job_status(cls, job: SysJob) -> int:
  153. """
  154. 更改定时任务状态
  155. Args:
  156. job (SysJob): 任务信息
  157. Returns:
  158. int: 操作结果
  159. """
  160. num = 0
  161. if job.status == ScheduleStatus.NORMAL.value:
  162. num = cls.resume_job(job)
  163. elif job.status == ScheduleStatus.PAUSED.value:
  164. num = cls.pause_job(job)
  165. return num
  166. @classmethod
  167. def run(cls, job: SysJob):
  168. """
  169. 立即执行定时任务
  170. Args:
  171. job (SysJob): 任务信息
  172. """
  173. # 先从数据库取全量信息,避免缺失 job_group 等导致 job_key 不匹配
  174. db_job = cls.select_job_by_id(job.job_id)
  175. if not db_job:
  176. raise ServiceException("任务不存在")
  177. print(f"[job_run] 请求立即执行,job_key={db_job.job_key}, invoke_target={db_job.invoke_target}")
  178. # 直接调用目标函数,确保“执行一次”必定执行
  179. module_name, method_name, args, kwargs = ScheduleUtil.parse_target(db_job.invoke_target)
  180. if not check_method_importable(module_name, method_name):
  181. raise ServiceException(f"方法不存在: {db_job.invoke_target}")
  182. func = getattr(importlib.import_module(module_name), method_name)
  183. try:
  184. func(*args, **kwargs)
  185. except Exception as e:
  186. raise ServiceException(f"立即执行任务失败: {e}")
  187. # 维持调度器中的任务(下一次仍按 cron 跑)
  188. sched_job = scheduler.get_job(db_job.job_key)
  189. if sched_job is None:
  190. ScheduleUtil.create_schedule_job(scheduler, db_job)
  191. def job_listener(event: JobEvent):
  192. """
  193. 任务监听器
  194. Args:
  195. event (JobEvent): 任务事件
  196. """
  197. job, _ = scheduler._lookup_job(event.job_id, event.jobstore)
  198. job_state = job.__getstate__()
  199. invoke_target = ScheduleUtil.unparse_target_by_funcname(
  200. job_state["func"],
  201. job_state["args"],
  202. job_state["kwargs"]
  203. )
  204. name = job_state["name"]
  205. _, group = job_state["id"].split("_")
  206. joblog = SysJobLog(
  207. job_name=name,
  208. job_group=group,
  209. invoke_target=invoke_target,
  210. create_time=datetime.now()
  211. )
  212. if event.code == EVENT_JOB_EXECUTED:
  213. pass
  214. elif event.code == EVENT_JOB_ERROR:
  215. if event.exception:
  216. joblog.status = "1"
  217. joblog.exception_info = str(event.exception)
  218. joblog.job_message = str(event.traceback)
  219. print(f"任务{event.job_id}异常:{event.exception}")
  220. elif event.code == EVENT_JOB_MISSED:
  221. pass
  222. elif event.code == EVENT_JOB_SUBMITTED:
  223. pass
  224. elif event.code == EVENT_JOB_REMOVED:
  225. pass
  226. with reg.app.app_context():
  227. SysJobLogService.insert_job_log(joblog)
  228. @app_completed.connect_via(reg.app)
  229. def init(sender: Flask):
  230. '''
  231. 初始化操作
  232. Args:
  233. sender (Flask): 消息发送者
  234. '''
  235. with sender.app_context():
  236. SysJobService.init()