Schedule 调度系统设计(单机版)

栏目: Java · 发布时间: 6年前

内容简介:鉴于对Spring实现的@Scheduled的调度和SchedulerFactoryBean的研究发现,基于Spring的调度封装虽满足了大多需求,但为了简化使用方式使得Job并不容易得到控制,导致开发对Job的控制和运维成本上升;下面是本人基于Quartz和Spring及Annotation开发的单机版调度配置,满足单机调度的大部分需求和管理、运维操作并解放对配置文件的繁琐操作;通过页面可对作业进行统一的监控和管理(触发、暂停、恢复、动态添加、参数下发)及报警等操作;简要列出以下功能点:

鉴于对Spring实现的@Scheduled的调度和SchedulerFactoryBean的研究发现,基于Spring的调度封装虽满足了大多需求,但为了简化使用方式使得Job并不容易得到控制,导致开发对Job的控制和运维成本上升;下面是本人基于Quartz和Spring及Annotation开发的单机版调度配置,满足单机调度的大部分需求和管理、运维操作并解放对配置文件的繁琐操作;

功能点描述

功能点 Spring @Scheduled 自定义@SchedulerJob
可控制
可运维
可页面化
可统一跟踪业务状态
可统一跟踪调度状态
支持cron表达式
支持类似ScheduledExecutorService的定时调度

代码演示

  • 基于注解进行作业配置
@Slf4j(topic = "dynamic-datasource")
@Component
public class DetectJob {
   /**
     * 作业配置 value=作业名,group=作业所属组,init=true为容器创建完毕时立即触发
     */
    @SchedulerJob(value = "detectDataSource",cron = "${cron.detect.data.source}",group = "dynamic-datasource",
            descrption="动态数据源切换",init = true)
    public void detectDataSource(){
        log.info(VariableUtils.join(SystemConstants.Symbol.DELIMITER,"dynamic-datasource","detectDataSource"));
    }
}
复制代码
##cron表达式
cron.detect.data.source=1 * * * * ? 
复制代码
  • 代码执行效果

Schedule 调度系统设计(单机版)

页面演示

通过页面可对作业进行统一的监控和管理(触发、暂停、恢复、动态添加、参数下发)及报警等操作;

简要列出以下功能点:

  • 作业展示
    Schedule 调度系统设计(单机版)
  • 作业运维报警
    Schedule 调度系统设计(单机版)
  • 作业参数下发
    Schedule 调度系统设计(单机版)
  • 作业事件跟踪
Schedule 调度系统设计(单机版)

设计思路

  • 应当满足什么业务场景
  • 如何简化操作、降低开发成本
  • 如何对业务、系统功能进行监控、控制、运维
  • 如何设计才能便于后期业务和功能的扩展

功能设计

  • 设计思路
    • 如何获取方法上的注解及配置
    • 如何实现通过Quartz定时执行注解方法
    • 如何对每个方法上的注解进行统一的资源管理和监控、控制、运维
    • 如何对调度进行性能的优化
  • 功能点分析
    • 基本调度
      • 初始化立即调度
      • 人工或系统控制调度(任务创建后不执行调度,控制权交给外部)
      • 定时执行调度(及按照指定cron配置周期调度)
      • 是否可并发执行
    • 资源管理
      • 统一管理系统内全部的配置资源(作业所属组、描述、cron表达式、是否开启报警、是否开启监控等)
    • 调度管理
      • 调度状态管理(系统状态、业务状态)
      • 调度行为管理
      • 作业业务参数下发(弥补业务过失)
      • 调度跟踪、业务跟踪
      • 调度报警、业务报警
  • 基本功能点实现
    • 注解配置
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface SchedulerJob {
  /**
   * 作业名
   * @return
   */
  String value();

  /**
   * 表达式
   * @return
   */
  String cron();

  /**
   * 是否初始化时立即执行
   * @return
   */
  boolean init() default false;

  /**
   * 是否人为控制
   * @return
   */
  boolean control() default false;

  /**
   * 所属组
   * @return
   */
  String group() default "default";

  /**
   * 作业描述
   * @return
   */
  String descrption() default "";

  /**
   * 作业执行器
   * @return
   */
  Class jobClass() default SimpleJob.class;
}

@Target({ElementType.METHOD,ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface SchedulerJobs {
    /**
     * 注解集
     * @return
     */
    SchedulerJob[] value();
}
复制代码
  • 调度创建
@Slf4j
@Configuration
public class SchedulerBean implements InitializingBean, DisposableBean {

    private Scheduler scheduler;

   @Value("#{schdulerProperties['quartz.thread.count']}")
    private String threadCount;

    @Override
    public void destroy() throws Exception {
        scheduler.shutdown();
    }
    @Override
    public void afterPropertiesSet() throws Exception {
        createScheduler();
    }

    /**
     * 创建调度
     * @throws SchedulerException
     */
    public void createScheduler() throws SchedulerException {
        StdSchedulerFactory factory = new StdSchedulerFactory();
        factory.initialize(getBaseQuartzProperties());
        this.scheduler = factory.getScheduler();
    }

    /**
     * 作业配置
     * @return
     */
    private Properties getBaseQuartzProperties() {
        Properties result = new Properties();
        result.put("org.quartz.threadPool.class", org.quartz.simpl.SimpleThreadPool.class.getName());
        result.put("org.quartz.threadPool.threadCount", threadCount);
        result.put("org.quartz.scheduler.threadName", "baiyunpeng-scheduler");
        result.put("org.quartz.scheduler.instanceName", "baiyunpeng-scheduler");
        result.put("org.quartz.jobStore.misfireThreshold", "1");
        return result;
    }

    /**
     * 创建作业
     * @param jobParam
     * @throws SchedulerException
     */
    public void createJob(JobParam jobParam) throws SchedulerException {
        SchedulerJob schedulerJob = jobParam.getSchedulerJob();
        JobDetail jobDetail = JobBuilder.newJob(schedulerJob.jobClass())
                .withIdentity(jobParam.getJobKey())
                .withDescription(jobParam.getJobKey().getName())
                .build();
        addJobDataMap(jobDetail,jobParam.getTarget(),jobParam.getTargetMethod());
        this.scheduler.scheduleJob(jobDetail,createTrigger(jobParam.getJobKey(),jobParam.getCron()));
    }

    /**
     * 创建触发器
     * @param jobKey
     * @param cron
     * @return
     */
    private Trigger createTrigger(JobKey jobKey, String cron) {
        return TriggerBuilder.newTrigger().withIdentity(jobKey.getName(),jobKey.getGroup())
                .withSchedule(CronScheduleBuilder.cronSchedule(cron)
                .withMisfireHandlingInstructionDoNothing()).build();
    }

    /**
     * 添加作业map
     * @param jobDetail
     * @param target
     * @param targetMethod
     */
    private void addJobDataMap(JobDetail jobDetail, Object target, Method targetMethod) {
        JobDataMap jobDataMap = jobDetail.getJobDataMap();
        jobDataMap.put("executeJob",target);
        jobDataMap.put("executeMethod",targetMethod);
    }

    public Scheduler getScheduler() {
        return scheduler;
    }

    public void start() throws SchedulerException {
        this.scheduler.start();
    }
}
复制代码
  • 简单的作业执行器创建
/**
 * 作业抽象类
 * @author baiyunpeng
 */
public abstract class ExecuteJob implements Job {

    protected Object executeJob;

    protected Method executeMethod;

    public void setExecuteJob(Object executeJob) {
        this.executeJob = executeJob;
    }

    public void setExecuteMethod(Method executeMethod) {
        this.executeMethod = executeMethod;
    }
}  

/**
 * 非并发执行
 * @author baiyunpeng
 */
@Slf4j
public class SimpleJob extends ExecuteJob {
    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        try {
            executeMethod.invoke(executeJob);
        } catch (IllegalAccessException | InvocationTargetException e) {
            log.error(VariableUtils.join(SystemConstants.Symbol.DELIMITER,this.getClass().getName(), ExceptionUtils.getRootCauseMessage(e)));
        }
    }
}

/**
 * 可并发执行
 * @author baiyunpeng
 */
@Slf4j
@PersistJobDataAfterExecution
@DisallowConcurrentExecution
public class ConcurrentJob extends ExecuteJob{
    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        try {
            executeMethod.invoke(executeJob);
        } catch (IllegalAccessException | InvocationTargetException e) {
            log.error(VariableUtils.join(SystemConstants.Symbol.DELIMITER,this.getClass().getName(), ExceptionUtils.getRootCauseMessage(e)));
        }
    }
}
复制代码
  • 作业创建
/**
       * 作业配置解析
       * @param scheduled
       * @param method
       * @param bean
      */
    protected void processScheduled(SchedulerJob scheduled, Method method, Object bean) {
            Method invocableMethod = AopUtils.selectInvocableMethod(method, bean.getClass());
            String cron = scheduled.cron();
            if(StringUtils.hasText(cron)){
                if(Objects.nonNull(this.embeddedValueResolver)){
                    cron = this.embeddedValueResolver.resolveStringValue(cron);
                }
                jobParams.add(new JobParam(scheduled,bean,invocableMethod,new JobKey(scheduled.value(),scheduled.group()),cron));
            }
        }
    
    /**
     * 作业初始化
     */
    private void finishRegister() {
        if(Objects.isNull(this.schedulerBean)){
            SchedulerBean schedulerBean = beanFactory.getBean(SCHEDULER_BEAN, SchedulerBean.class);
            AssertUtil.assertNull(schedulerBean, SystemErrorCode.NS000000,"the scheduler bean init error");
            this.schedulerBean = schedulerBean;
            try {
                jobParams.parallelStream().forEach(jobParam -> {
                    try {
                        this.schedulerBean.createJob(jobParam);
                        SchedulerJob schedulerJob = jobParam.getSchedulerJob();
                        if(!schedulerJob.control()){
                            if (schedulerJob.init()){
                                this.schedulerBean.getScheduler().triggerJob(jobParam.getJobKey());
                            }
                        }else {
                            this.schedulerBean.getScheduler().pauseJob(jobParam.getJobKey());
                        }
                    } catch (SchedulerException e) {
                        log.error(VariableUtils.join(SystemConstants.Symbol.DELIMITER,"the scheduler job init error", ExceptionUtils.getRootCauseMessage(e)));
                       System.exit(1);
                    }
                });
                schedulerBean.start();
            }catch (Exception e){
                log.error(VariableUtils.join(SystemConstants.Symbol.DELIMITER,"the scheduler job init error", ExceptionUtils.getRootCauseMessage(e)));
                System.exit(1);
            }
        }
    }
    
复制代码

总结

  • 如何异步执行方法,首先得获取该方法的实例
  • 如何定时执行,首先创建并获取定时器
  • 如何基于Quarzt监控作业执行,需获Schedule和Jobkey等

后续更新

  • 如何统一监控调度状态和业务状态
  • 如何解决work线程池被任务阻塞的问题
  • 如何做任务补发(note:除了misfire机制外还有哪些做法)
  • 如何基于单机调度实现基本的分布式调度
  • 分布式调度需要考虑的点有哪些

以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Web Applications (Hacking Exposed)

Web Applications (Hacking Exposed)

Joel Scambray、Mike Shema / McGraw-Hill Osborne Media / 2002-06-19 / USD 49.99

Get in-depth coverage of Web application platforms and their vulnerabilities, presented the same popular format as the international bestseller, Hacking Exposed. Covering hacking scenarios across diff......一起来看看 《Web Applications (Hacking Exposed)》 这本书的介绍吧!

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

HEX CMYK 转换工具
HEX CMYK 转换工具

HEX CMYK 互转工具

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具