Java并发——ScheduledThreadPoolExecutor分析

栏目: Java · 发布时间: 6年前

内容简介:从图中我们可以看到ScheduledThreadPoolExecutor继承ThreadPoolExecutor实现了ScheduledExecutorService接口。它相当于提供了"延迟"和"周期执行"功能的ThreadPoolExecutor,还有两个重要内部类因为其继承了ThreadPoolExecutor,调用了ThreadLocalExecutor的构造方法。当核心线程数达到ScheduledThreadPoolExecutor实现了ScheduledExecutorService接口,该接
Java并发——ScheduledThreadPoolExecutor分析

从图中我们可以看到ScheduledThreadPoolExecutor继承ThreadPoolExecutor实现了ScheduledExecutorService接口。它相当于提供了"延迟"和"周期执行"功能的ThreadPoolExecutor,还有两个重要内部类 DelayedWorkQueueScheduledFutureTask

构造方法

public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }
    
    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), threadFactory);
    }
    
    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       RejectedExecutionHandler handler) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), handler);
    }
    
    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory,
                                       RejectedExecutionHandler handler) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), threadFactory, handler);
    }
复制代码

因为其继承了ThreadPoolExecutor,调用了ThreadLocalExecutor的构造方法。当核心线程数达到 corePoolSize ,会将任务提交给 有界阻塞队列 DelayedWorkQueue。ScheduledThreadPoolExecutor线程池最大线程数为 Integer.MAX_VALUE

主要方法

ScheduledThreadPoolExecutor实现了ScheduledExecutorService接口,该接口提供了如下方法:

// 在给定延迟后,执行Runnable任务
    public ScheduledFuture schedule(Runnable command,
                                       long delay, TimeUnit unit);
    // 在给定延迟后,执行Callable任务
    public  ScheduledFuture schedule(Callable callable,
                                           long delay, TimeUnit unit);
    // 给定延迟(initialDelay)之后,随后以给定时间(period)为周期执行任务
    // 即执行将在initialDelay之后开始,然后是initialDelay+period,
    // 再是initialDelay + 2*period,依此类推
    // 如果上一个任务没有执行完毕,则需要等上一个任务执行完毕后立即执行
    public ScheduledFuture scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);
    // 创建并执行在给定的初始延迟(initialDelay)之后首先启用的定期操作
    // 随后每个任务执行的终止和下一个执行的开始之间给定的延迟(delay)
    public ScheduledFuture scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);
复制代码

第一、第二个schedule方法都是 一次性操作 只不过入参一个是Runnable,一个是callable

scheduleAtFixedRate、scheduleWithFixedDelay方法可以看如下示例

  • 示例:
  • public static void main(String[] args) {
            SimpleDateFormat sdf = new SimpleDateFormat("hh:MM:ss");
            ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);
            Runnable task1 = () -> {
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + "测试" + sdf.format(new Date()));
            };
            Runnable task2 = () -> {
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + "测试" + sdf.format(new Date()));
            };
            executorService.scheduleAtFixedRate(task1, 0, 2, TimeUnit.SECONDS);
            executorService.scheduleWithFixedDelay(task2, 0, 2, TimeUnit.SECONDS);
        }
        
        输出:
        pool-1-thread-1测试11:12:37
        pool-1-thread-2测试11:12:37
        pool-1-thread-1测试11:12:40
        pool-1-thread-2测试11:12:42
        pool-1-thread-1测试11:12:43
        pool-1-thread-1测试11:12:46
        pool-1-thread-2测试11:12:47
    复制代码

    周期间隔2秒,任务耗时3秒

    scheduleAtFixedRate方法:

    1.若任务耗时超过周期间隔,则需要等待上个任务完成下个任务才能执行

    2.若任务耗时小于周期间隔,则下个任务按周期间隔执行任务

    scheduleWithFixedDelay方法:

    1.下任务等到上个任务执行完成+周期间隔之后才执行任务

  • schedule方法

    逻辑处理相差不多,以schedule方法为例分析

    public  ScheduledFuture schedule(Callable callable,
                                               long delay,
                                               TimeUnit unit) {
            if (callable == null || unit == null)
                throw new NullPointerException();
            RunnableScheduledFuture t = decorateTask(callable,
                new ScheduledFutureTask(callable,
                                           triggerTime(delay, unit)));
            delayedExecute(t);
            return t;
        }
    复制代码

    先参数校验,再构造task,最后调用delayedExecute()方法延迟执行任务

    private void delayedExecute(RunnableScheduledFuture task) {
            // 判断线程池是否处于RUNNING状态,不处于则根据相应拒绝策略拒绝任务
            if (isShutdown())
                reject(task);
            else {
                // 往阻塞队列中添加任务
                super.getQueue().add(task);
                if (isShutdown() &&
                    !canRunInCurrentRunState(task.isPeriodic()) &&
                    remove(task))
                    task.cancel(false);
                else
                    ensurePrestart();
            }
        }
    void ensurePrestart() {
        int wc = workerCountOf(ctl.get());
        if (wc < corePoolSize)
            addWorker(null, true);
        else if (wc == 0)
            addWorker(null, false);
    }
    复制代码

    复制代码

    复制代码

    ensurePrestart主要调用了addWorker方法,此方法主要做了两件事:
    1.循环CAS将线程池中的线程数加一
    2.新建一个线程并启用

    当线程执行任务,都会调用到任务的run()方法

    public void run() {
                // 判断是否是周期任务
                boolean periodic = isPeriodic();
                // 判断当前线程状态是否能执行任务
                if (!canRunInCurrentRunState(periodic))
                    cancel(false);
                // 不是周期性任务,直接执行任务    
                else if (!periodic)
                    ScheduledFutureTask.super.run();
                // 若是周期性任务,设置下次执行任务的时间    
                else if (ScheduledFutureTask.super.runAndReset()) {
                    // 设置任务下次执行时间
                    setNextRunTime();
                    // 将下次任务往阻塞队列中添加
                    reExecutePeriodic(outerTask);
                }
            }
    复制代码

    1.先判断该任务是否可以执行,若不能执行则调用cancel方法取消

    2.再判断是否是周期性任务,若不是直接执行

    3.最后调用runAndReset方法执行任务并重置,setNextRunTime方法设置任务下次的执行时间,reExecutePeriodic方法重新把任务添加到队列中.

    private void setNextRunTime() {
            long p = period;
            if (p > 0)
                time += p;
            else
                time = triggerTime(-p);
        }
        
        void reExecutePeriodic(RunnableScheduledFuture task) {
            if (canRunInCurrentRunState(true)) {
                super.getQueue().add(task);
                if (!canRunInCurrentRunState(true) && remove(task))
                    task.cancel(false);
                else
                    ensurePrestart();
            }
        }
    复制代码

    DelayedWorkQueue

    ScheduledThreadPoolExecutor是把任务添加到DelayedWorkQueue中,它是一个基于堆的数据结构,通过ScheduledFutureTask的compareTo方法比较大小,小的排在前面,大的排在后面

    public int compareTo(Delayed other) {
            if (other == this) // compare zero if same object
                return 0;
            if (other instanceof ScheduledFutureTask) {
                ScheduledFutureTask x = (ScheduledFutureTask)other;
                long diff = time - x.time;
                if (diff < 0)
                    return -1;
                else if (diff > 0)
                    return 1;
                else if (sequenceNumber < x.sequenceNumber)
                    return -1;
                else
                    return 1;
            }
            long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
            return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
        }
    复制代码

    首先按照time排序,time小的排在前面,大的排在后面,若time相同,则使用sequenceNumber排序,小的排在前面,大的排在后面


  • 以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

    查看所有标签

    猜你喜欢:

    本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

    Zero to One

    Zero to One

    Peter Thiel、Blake Masters / Crown Business / 2014-9-16 / USD 27.00

    “This book delivers completely new and refreshing ideas on how to create value in the world.” - Mark Zuckerberg, CEO of Facebook “Peter Thiel has built multiple breakthrough companies, and ......一起来看看 《Zero to One》 这本书的介绍吧!

    Markdown 在线编辑器
    Markdown 在线编辑器

    Markdown 在线编辑器

    html转js在线工具
    html转js在线工具

    html转js在线工具

    RGB CMYK 转换工具
    RGB CMYK 转换工具

    RGB CMYK 互转工具