util.py 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287
  1. # -*- coding: utf-8 -*-
  2. # @Author : YY
  3. import importlib
  4. import re
  5. from types import NoneType
  6. from typing import Callable, Dict, List, Optional, Tuple
  7. from apscheduler.schedulers.base import BaseScheduler
  8. from apscheduler.triggers.cron import CronTrigger as _CronTrigger
  9. from croniter import croniter
  10. from ruoyi_apscheduler.constant import ScheduleConstant, ScheduleStatus
  11. from ruoyi_apscheduler.domain.entity import SysJob
  12. class TargetParser:
  13. def __init__(self, target:str):
  14. self.target = target
  15. def parse(self):
  16. self.func_str = self.target
  17. def check_method_importable(module: str, method: str) -> bool:
  18. try:
  19. module = importlib.import_module(module)
  20. if hasattr(module, method):
  21. return True
  22. else:
  23. return False
  24. except Exception as e:
  25. return False
  26. class ScheduleUtil:
  27. REPLACE_EXISTING = False
  28. DEFAULT_JOBSTORE = 'default'
  29. @classmethod
  30. def create_schedule_job(cls, scheduler:BaseScheduler, job:SysJob):
  31. """
  32. 创建定时任务
  33. Args:
  34. scheduler: 调度器
  35. job: 系统任务
  36. """
  37. module_name, method_name, args, kwargs = cls.parse_target(job.invoke_target)
  38. if not check_method_importable(module_name, method_name):
  39. raise ImportError(
  40. f"Method {method_name} not found in module {module_name}"
  41. )
  42. func = getattr(importlib.import_module(module_name), method_name)
  43. trigger = CronTrigger.from_crontab(job.cron_expression)
  44. misfire_time, replace = cls.get_misfire_policy(job.misfire_policy)
  45. concurrent_num = cls.concurrent_num(job.concurrent)
  46. scheduler.add_job(
  47. func=func,
  48. args=args,
  49. kwargs=kwargs,
  50. trigger=trigger,
  51. id=job.job_key,
  52. name=job.job_name,
  53. max_instances=concurrent_num,
  54. coalesce=True,
  55. misfire_grace_time=misfire_time,
  56. replace_existing=cls.REPLACE_EXISTING or replace,
  57. jobstore=cls.DEFAULT_JOBSTORE
  58. )
  59. if job.status == ScheduleStatus.PAUSED.value:
  60. scheduler.pause_job(job.job_key)
  61. @classmethod
  62. def reschedule_job(cls, scheduler:BaseScheduler, job:SysJob):
  63. """
  64. 重新调度任务
  65. Args:
  66. scheduler: 调度器
  67. job: 系统任务
  68. """
  69. if job.cron_expression:
  70. trigger = CronTrigger.from_crontab(job.cron_expression)
  71. scheduler.reschedule_job(
  72. job_id=job.job_key,
  73. trigger=trigger,
  74. jobstore=cls.DEFAULT_JOBSTORE
  75. )
  76. else:
  77. scheduler.resume_job(job.job_key, jobstore=cls.DEFAULT_JOBSTORE)
  78. @classmethod
  79. def concurrent_num(cls, concurrent:str) -> int:
  80. """
  81. 允许并发数量
  82. Args:
  83. concurrent(str): 并发策略
  84. Returns:
  85. int: 并发数量
  86. """
  87. if concurrent == ScheduleConstant.ALLOW_CONCURRENT:
  88. return 10
  89. else:
  90. return 1
  91. @classmethod
  92. def get_misfire_policy(cls, policy:str) -> Tuple[Optional[int], bool]:
  93. """
  94. 获取任务的过期策略
  95. Args:
  96. misfire_policy(str): 过期策略
  97. Returns:
  98. Tuple[Optional[int], bool]: 过期策略, 是否替换已存在的任务
  99. """
  100. if policy == ScheduleConstant.MISFIRE_DEFAULT:
  101. misfire_grace_time = None
  102. replace_existing = True
  103. elif policy == ScheduleConstant.MISFIRE_IGNORE_MISFIRES:
  104. misfire_grace_time = None
  105. replace_existing = True
  106. elif policy == ScheduleConstant.MISFIRE_FIRE_AND_PROCEED:
  107. misfire_grace_time = 1
  108. replace_existing = False
  109. elif policy == ScheduleConstant.MISFIRE_DO_NOTHING:
  110. misfire_grace_time = None
  111. replace_existing = False
  112. else:
  113. misfire_grace_time = None
  114. replace_existing = False
  115. return misfire_grace_time, replace_existing
  116. @classmethod
  117. def parse_target(cls, target:str) -> Tuple[Callable, List, Dict]:
  118. """
  119. 解析目标方法字符串
  120. Args:
  121. target(str): 目标字符串
  122. Returns:
  123. Tuple[Str, Str, List, Dict]: 模块名, 方法名, 参数列表, 关键字参数字典
  124. """
  125. match = re.match(r"""
  126. (?P<module>[a-zA-Z_][a-zA-Z0-9_\.]*)\.
  127. (?P<method>[a-zA-Z_][a-zA-Z0-9_]*)
  128. (\((?P<params>.*)\))?
  129. """, target, re.VERBOSE)
  130. method_dict = match.groupdict()
  131. if method_dict:
  132. module_name = method_dict['module']
  133. method_name = method_dict['method']
  134. params = method_dict['params']
  135. args = []; kwargs = {}
  136. if params:
  137. for param in params.split(','):
  138. _param = param.strip().strip("'").strip("\"")
  139. if "=" in _param:
  140. key,value = _param.split("=")
  141. kwargs[key] = value
  142. else:
  143. args.append(_param)
  144. return module_name, method_name, args, kwargs
  145. raise ValueError("Invalid target string: %s" % target)
  146. @classmethod
  147. def unparse_target(cls, module_name, method_name, args, kwargs) -> str:
  148. """
  149. 反解析目标方法字符串
  150. Args:
  151. module_name(str): 模块名
  152. method_name(str): 方法名
  153. args(list): 参数列表
  154. kwargs(dict): 关键字参数字典
  155. Returns:
  156. str: 目标字符串
  157. """
  158. funcname = f"{module_name}.{method_name}"
  159. return cls.unparse_target_by_funcname(funcname, args, kwargs)
  160. @classmethod
  161. def unparse_target_by_funcname(cls, funcname, args, kwargs) -> str:
  162. """
  163. 反解析目标方法字符串-根据带模块的方法名
  164. Args:
  165. funcname(str): 带模块的方法名
  166. args(list): 参数列表
  167. kwargs(dict): 关键字参数字典
  168. Returns:
  169. str: 目标字符串
  170. """
  171. args_list = []
  172. for arg in args:
  173. if isinstance(arg, str):
  174. args_list.append(f"'{arg}'")
  175. else:
  176. args_list.append(str(arg))
  177. for k in kwargs:
  178. if isinstance(kwargs[k], str):
  179. args_list.append(f"{k}='{kwargs[k]}'")
  180. elif isinstance(kwargs[k], int):
  181. args_list.append(f"{k}={kwargs[k]}")
  182. args_str = ", ".join(args_list) if args_list else ""
  183. return f"{funcname}({args_str})"
  184. @classmethod
  185. def white_list_check(cls, invoke_target:str) -> bool:
  186. """
  187. 白名单检查
  188. Args:
  189. invoke_target(str): 目标字符串
  190. Returns:
  191. bool: 是否允许执行
  192. """
  193. module_name, _, _, _ = cls.parse_target(invoke_target)
  194. for pack_name in ScheduleConstant.JOB_WHITELIST_STR:
  195. if module_name.startswith(pack_name):
  196. return True
  197. return False
  198. @classmethod
  199. def check_cron_expression(cls, expr:str) -> bool:
  200. """
  201. 校验cron表达式
  202. Args:
  203. cron_expression(str): cron表达式
  204. Returns:
  205. bool: 是否合法
  206. """
  207. expr = CronTrigger.transform_from_quartz(expr)
  208. return croniter.is_valid(expr)
  209. class CronTrigger(_CronTrigger):
  210. @classmethod
  211. def from_crontab(cls, expr:str, timezone=None) -> 'CronTrigger':
  212. """
  213. cron表达式转换为CronTrigger
  214. Args:
  215. expr (str): cron表达式
  216. timezone (_type_, optional): 时区. Defaults to None.
  217. Raises:
  218. ValueError: cron表达式错误
  219. Returns:
  220. CronTrigger: CronTrigger对象
  221. """
  222. expr = cls.transform_from_quartz(expr)
  223. if not croniter.is_valid(expr):
  224. raise ValueError("Invalid cron expression: %s" % expr)
  225. values = expr.split()
  226. return cls(second=values[0], minute=values[1], hour=values[2], day=values[3], month=values[4],
  227. day_of_week=values[5], timezone=timezone)
  228. @classmethod
  229. def transform_from_quartz(cls, expr:str) -> str:
  230. """
  231. quartz表达式转换为apscheduler表达式
  232. Args:
  233. expr (str): quartz表达式
  234. Returns:
  235. str: apscheduler表达式
  236. """
  237. return expr.replace('?', '*')