| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287 |
- # -*- coding: utf-8 -*-
- # @Author : YY
- import importlib
- import re
- from types import NoneType
- from typing import Callable, Dict, List, Optional, Tuple
- from apscheduler.schedulers.base import BaseScheduler
- from apscheduler.triggers.cron import CronTrigger as _CronTrigger
- from croniter import croniter
- from ruoyi_apscheduler.constant import ScheduleConstant, ScheduleStatus
- from ruoyi_apscheduler.domain.entity import SysJob
- class TargetParser:
-
- def __init__(self, target:str):
- self.target = target
-
- def parse(self):
- self.func_str = self.target
- def check_method_importable(module: str, method: str) -> bool:
- try:
- module = importlib.import_module(module)
- if hasattr(module, method):
- return True
- else:
- return False
- except Exception as e:
- return False
- class ScheduleUtil:
-
- REPLACE_EXISTING = False
-
- DEFAULT_JOBSTORE = 'default'
-
- @classmethod
- def create_schedule_job(cls, scheduler:BaseScheduler, job:SysJob):
- """
- 创建定时任务
-
- Args:
- scheduler: 调度器
- job: 系统任务
- """
- module_name, method_name, args, kwargs = cls.parse_target(job.invoke_target)
- if not check_method_importable(module_name, method_name):
- raise ImportError(
- f"Method {method_name} not found in module {module_name}"
- )
- func = getattr(importlib.import_module(module_name), method_name)
- trigger = CronTrigger.from_crontab(job.cron_expression)
-
- misfire_time, replace = cls.get_misfire_policy(job.misfire_policy)
- concurrent_num = cls.concurrent_num(job.concurrent)
-
- scheduler.add_job(
- func=func,
- args=args,
- kwargs=kwargs,
- trigger=trigger,
- id=job.job_key,
- name=job.job_name,
- max_instances=concurrent_num,
- coalesce=True,
- misfire_grace_time=misfire_time,
- replace_existing=cls.REPLACE_EXISTING or replace,
- jobstore=cls.DEFAULT_JOBSTORE
- )
- if job.status == ScheduleStatus.PAUSED.value:
- scheduler.pause_job(job.job_key)
-
- @classmethod
- def reschedule_job(cls, scheduler:BaseScheduler, job:SysJob):
- """
- 重新调度任务
-
- Args:
- scheduler: 调度器
- job: 系统任务
- """
- if job.cron_expression:
- trigger = CronTrigger.from_crontab(job.cron_expression)
- scheduler.reschedule_job(
- job_id=job.job_key,
- trigger=trigger,
- jobstore=cls.DEFAULT_JOBSTORE
- )
- else:
- scheduler.resume_job(job.job_key, jobstore=cls.DEFAULT_JOBSTORE)
-
- @classmethod
- def concurrent_num(cls, concurrent:str) -> int:
- """
- 允许并发数量
-
- Args:
- concurrent(str): 并发策略
-
- Returns:
- int: 并发数量
- """
- if concurrent == ScheduleConstant.ALLOW_CONCURRENT:
- return 10
- else:
- return 1
-
- @classmethod
- def get_misfire_policy(cls, policy:str) -> Tuple[Optional[int], bool]:
- """
- 获取任务的过期策略
-
- Args:
- misfire_policy(str): 过期策略
-
- Returns:
- Tuple[Optional[int], bool]: 过期策略, 是否替换已存在的任务
- """
- if policy == ScheduleConstant.MISFIRE_DEFAULT:
- misfire_grace_time = None
- replace_existing = True
- elif policy == ScheduleConstant.MISFIRE_IGNORE_MISFIRES:
- misfire_grace_time = None
- replace_existing = True
- elif policy == ScheduleConstant.MISFIRE_FIRE_AND_PROCEED:
- misfire_grace_time = 1
- replace_existing = False
- elif policy == ScheduleConstant.MISFIRE_DO_NOTHING:
- misfire_grace_time = None
- replace_existing = False
- else:
- misfire_grace_time = None
- replace_existing = False
- return misfire_grace_time, replace_existing
-
- @classmethod
- def parse_target(cls, target:str) -> Tuple[Callable, List, Dict]:
- """
- 解析目标方法字符串
-
- Args:
- target(str): 目标字符串
-
- Returns:
- Tuple[Str, Str, List, Dict]: 模块名, 方法名, 参数列表, 关键字参数字典
- """
- match = re.match(r"""
- (?P<module>[a-zA-Z_][a-zA-Z0-9_\.]*)\.
- (?P<method>[a-zA-Z_][a-zA-Z0-9_]*)
- (\((?P<params>.*)\))?
- """, target, re.VERBOSE)
- method_dict = match.groupdict()
- if method_dict:
- module_name = method_dict['module']
- method_name = method_dict['method']
- params = method_dict['params']
- args = []; kwargs = {}
- if params:
- for param in params.split(','):
- _param = param.strip().strip("'").strip("\"")
- if "=" in _param:
- key,value = _param.split("=")
- kwargs[key] = value
- else:
- args.append(_param)
- return module_name, method_name, args, kwargs
- raise ValueError("Invalid target string: %s" % target)
-
- @classmethod
- def unparse_target(cls, module_name, method_name, args, kwargs) -> str:
- """
- 反解析目标方法字符串
-
- Args:
- module_name(str): 模块名
- method_name(str): 方法名
- args(list): 参数列表
- kwargs(dict): 关键字参数字典
-
- Returns:
- str: 目标字符串
- """
- funcname = f"{module_name}.{method_name}"
- return cls.unparse_target_by_funcname(funcname, args, kwargs)
-
- @classmethod
- def unparse_target_by_funcname(cls, funcname, args, kwargs) -> str:
- """
- 反解析目标方法字符串-根据带模块的方法名
-
- Args:
- funcname(str): 带模块的方法名
- args(list): 参数列表
- kwargs(dict): 关键字参数字典
-
- Returns:
- str: 目标字符串
- """
- args_list = []
- for arg in args:
- if isinstance(arg, str):
- args_list.append(f"'{arg}'")
- else:
- args_list.append(str(arg))
- for k in kwargs:
- if isinstance(kwargs[k], str):
- args_list.append(f"{k}='{kwargs[k]}'")
- elif isinstance(kwargs[k], int):
- args_list.append(f"{k}={kwargs[k]}")
- args_str = ", ".join(args_list) if args_list else ""
- return f"{funcname}({args_str})"
- @classmethod
- def white_list_check(cls, invoke_target:str) -> bool:
- """
- 白名单检查
-
- Args:
- invoke_target(str): 目标字符串
-
- Returns:
- bool: 是否允许执行
- """
- module_name, _, _, _ = cls.parse_target(invoke_target)
- for pack_name in ScheduleConstant.JOB_WHITELIST_STR:
- if module_name.startswith(pack_name):
- return True
- return False
-
- @classmethod
- def check_cron_expression(cls, expr:str) -> bool:
- """
- 校验cron表达式
-
- Args:
- cron_expression(str): cron表达式
-
- Returns:
- bool: 是否合法
- """
- expr = CronTrigger.transform_from_quartz(expr)
- return croniter.is_valid(expr)
- class CronTrigger(_CronTrigger):
-
- @classmethod
- def from_crontab(cls, expr:str, timezone=None) -> 'CronTrigger':
- """
- cron表达式转换为CronTrigger
- Args:
- expr (str): cron表达式
- timezone (_type_, optional): 时区. Defaults to None.
- Raises:
- ValueError: cron表达式错误
- Returns:
- CronTrigger: CronTrigger对象
- """
- expr = cls.transform_from_quartz(expr)
-
- if not croniter.is_valid(expr):
- raise ValueError("Invalid cron expression: %s" % expr)
-
- values = expr.split()
- return cls(second=values[0], minute=values[1], hour=values[2], day=values[3], month=values[4],
- day_of_week=values[5], timezone=timezone)
-
- @classmethod
- def transform_from_quartz(cls, expr:str) -> str:
- """
- quartz表达式转换为apscheduler表达式
- Args:
- expr (str): quartz表达式
- Returns:
- str: apscheduler表达式
- """
- return expr.replace('?', '*')
|