并发编程——ScheduledThreadPoolExecutor

2023-09-21 15:01:36

ScheduledThreadPoolExecutor介绍

ScheduledThreadPoolExecutor是ThreadPoolExecutor的一个子类,在线程池的基础上实现了延迟执行任务以及周期性执行任务的功能。

Java最早提供的是Timer类执行定时任务,单线程串行的,效率非常低。在不采用第三方框架时,需要执行定时任务,ScheduledThreadPoolExecutor是比较好的选择。

ScheduledThreadPoolExecutor就是在线程池的基础上实现的定时执行任务的功能。

ScheduledThreadPoolExecutor提供了比较常用的四种方法执行任务:

  • execute:跟普通线程池执行没区别。
  • schedule:可以指定延迟时间,一次性执行任务。
  • scheduleAtFixedRate:可以让任务在固定的周期下执行。
  • scheduleWithFixedDelay:可以让任务在固定的周期下执行,不同与 scheduleAtFixedRate 的是,如果一次任务执行时长超过周期时间,下一次任务会在该次任务执行结束时间基础上,计算执行延时。

代码示例:

public static void main(String[] args) throws InterruptedException {
    ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10);

    //1. execute
    executor.execute(() -> {
        System.out.println("execute");
    });

    //2. schedule
    executor.schedule(() -> {
        System.out.println("schedule");
    },2000,TimeUnit.MILLISECONDS);

    //3. AtFixedRate
    executor.scheduleAtFixedRate(() -> {
        try {
            Thread.sleep(4000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("at:" + System.currentTimeMillis());
    },3000,2000,TimeUnit.MILLISECONDS);

    //4. WithFixedDelay
    executor.scheduleWithFixedDelay(() -> {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("with:" + System.currentTimeMillis());
    },3000,2000,TimeUnit.MILLISECONDS);
}

如果实际开发应用需要使用到定时任务,推荐一些开源的框架,比如Quartz,XXL-job,Elastic-Job。

ScheduledFutureTask

ScheduledFutureTask是ScheduledThreadPoolExecutor中用于实现可延时执行任务和周期性执行任务的特性而引入的。在ScheduledThreadPoolExecutor中,当一个任务被提交后,它会被转换成ScheduledFutureTask类,该类继承了FutureTask并重写了run方法。 间接的实现了Delayed接口,让任务可以放到延迟队列中,并且基于二叉堆做排序。

private class ScheduledFutureTask<V>
        extends FutureTask<V> implements RunnableScheduledFuture<V> {

    // 就是计数器,每个任务进来时,都会有一个全局唯一的序号。
    // 如果任务的执行时间一模一样,比对sequenceNumber
    private final long sequenceNumber;

    // 任务执行的时间,单位是纳秒
    private long time;

    /*
     * period == 0:表示一次性执行的任务
     * period > 0:表示使用的是At!
     * period < 0:表示使用的是With!
     */
    private final long period;

    // 周期性实行任务时,引用具体任务,方便后面重新扔到阻塞队列
    RunnableScheduledFuture<V> outerTask = this;


    // 有参构造。schedule时使用当前有参重载封装任务!
    ScheduledFutureTask(Runnable r, V result, long ns) {
        super(r, result);
        this.time = ns;
        this.period = 0;
        this.sequenceNumber = sequencer.getAndIncrement();
    }

    // At,With时,使用当前有参重载封装任务!
    ScheduledFutureTask(Runnable r, V result, long ns, long period) {
        super(r, result);
        this.time = ns;
        this.period = period;
        this.sequenceNumber = sequencer.getAndIncrement();
    }

    // 不考虑这个,有返回结果
    ScheduledFutureTask(Callable<V> callable, long ns) {
        super(callable);
        this.time = ns;
        this.period = 0;
        this.sequenceNumber = sequencer.getAndIncrement();
    }

    // 实现Delayed接口重写的方法,执行的时间
    public long getDelay(TimeUnit unit) {
        return unit.convert(time - now(), NANOSECONDS);
    }

    // 实现Delayed接口重写的方法,比较的方式,放在二叉堆内部
    public int compareTo(Delayed other) {
        if (other == this) // compare zero if same object
            return 0;
        if (other instanceof ScheduledFutureTask) {
            ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
            long diff = time - x.time;
            if (diff < 0)
                return -1;
            else if (diff > 0)
                return 1;
            else if (sequenceNumber < x.sequenceNumber)
                return -1;
            else
                return 1;
        }
        long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
        return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
    }

    // 判断是否是周期执行
    public boolean isPeriodic() {
        return period != 0;
 
    }
    // 省略部分代码
}

四种方法执行任务

execute方法

这个方法最终会调用schedule。

public void execute(Runnable command) {
    schedule(command, 0, NANOSECONDS);
}

schedule方法

public ScheduledFuture<?> schedule(Runnable command, long delay,TimeUnit unit) {
    // 非空判断!
    if (command == null || unit == null)
        throw new NullPointerException();
    RunnableScheduledFuture<?> t = decorateTask(command,new ScheduledFutureTask<Void>(command, null,triggerTime(delay, unit)));
    // 延迟执行
    delayedExecute(t);
    return t;
}

schedule方法将普通的command封装为ScheduledFutureTask, decorateTask方法默认情况下,什么都没做,就是返回了ScheduledFutureTask,可以作为扩展方法,在这个位置修改任务需要执行的具体细节。

// 将command任务封装为ScheduledFutureTask
ScheduledFutureTask(Runnable r, V result, long ns) {
    super(r, result);
    // 任务要执行的系统时间
    this.time = ns;
    // 任务是否是周期性执行
    this.period = 0;
    // 基于AtomicLong计算序列化。
    this.sequenceNumber = sequencer.getAndIncrement();
}
protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) {
    return task;
}

执行延迟任务,先会检查线程池是否为RUNNING状态,如果不是,执行拒绝策略。否则调用阻塞队列,将任务添加进去,将任务扔到了延迟队列中(二叉堆)。在添加任务到延迟队列的数组时,会记录当前任务所在的索引位置,方便取消任务时,从数组中移除。

private void delayedExecute(RunnableScheduledFuture<?> task) {
    if (isShutdown())
        reject(task);
    else {
        super.getQueue().add(task);
        if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task))
            task.cancel(false);
        else
            ensurePrestart();
    }
}

如果任务添加到了阻塞队列中,忽然线程池不是RUNNING状态,那么此时这个任务是否执行?
periodic - true:代表是周期性执行的任务
periodic - false:代表是一次性的延迟任务


boolean canRunInCurrentRunState(boolean periodic) {
    return isRunningOrShutdown(periodic ?
                               continueExistingPeriodicTasksAfterShutdown :
                               executeExistingDelayedTasksAfterShutdown);
}

// 判断当前任务到底执行不执行
final boolean isRunningOrShutdown(boolean shutdownOK) {
    // 重新拿到线程池的ctl
    int rs = runStateOf(ctl.get());
    // 如果线程池是RUNNING,返回true
    // 如果线程池状态是SHUTDOWN,那么就配合策略返回true、false
    return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
}

// 准备执行任务
void ensurePrestart() {
    // 获取线程池中的工作线程个数。
    int wc = workerCountOf(ctl.get());
    // 如果工作线程个数,小于核心线程数,
    if (wc < corePoolSize)
        // 创建核心线程,一致在阻塞队列的位置take,等待拿任务执行
        addWorker(null, true);
    // 如果工作线程数不小于核心线程,但是值为0,创建非核心线程执行任务
    else if (wc == 0)
        // 创建非核心线程处理阻塞队列任务,而且只要阻塞队列没有任务了,当前线程立即销毁
        addWorker(null, false);
}

查看任务放到延迟队列后,是如何被工作线程取出来执行的

执行addWorker方法,会创建一个工作线程,工作线程在创建成功后,会执行start方法。在start方法执行后,会调用Worker的run方法,最终执行了runWorker方法,在runWorker方法中会在阻塞队列的位置执行take方法一直阻塞拿Runnable任务,拿到任务后就返回,然后执行。

所以需要查看的就是延迟队列的take方法,套路和DelayQueue没有区别

在拿到任务后,会执行任务,也就是执行任务的run方法。

// 执行任务
public void run() {
    // 获取任务是否是周期执行
    // true:周期执行
    // false:一次的延迟执行
    boolean periodic = isPeriodic();
    // 再次判断线程池状态是否不是RUNNING,如果不是RUNNING,并且SHUTDOWN情况也不允许执行,或者是STOP状态
    if (!canRunInCurrentRunState(periodic))
        // 取消任务
        cancel(false);
    else if (!periodic)
        // 当前任务是一次性的延迟执行。执行任务具体的run方法,执行完,没了………………
        ScheduledFutureTask.super.run();
    // 后面是周期执行、省略部分代码…………
}

scheduleAtFixedRate和scheduleWithFixedDelay分析

在执行方法的初期,封装任务时:

  • At会将period设置为正数,代表固定周期执行表
  • With会将period设置为负数,代表在执行任务完毕后,再计算下次执行的时间
// 固定周期执行任务,如果任务的执行时间,超过周期,任务执行完,立即执行下一次任务。
public ScheduledFuture<?> scheduleAtFixedRate(
                        Runnable command,    // 具体任务
                        long initialDelay,   // 第一次执行的时间
                        long period,         // 周期执行时间
                        TimeUnit unit) {     // 时间单位
    // 阿巴阿巴~~~
    if (command == null || unit == null)
        throw new NullPointerException();
    // 如果传递的周期小于等于0,直接抛异常
    if (period <= 0)
        throw new IllegalArgumentException();
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit),
                                      unit.toNanos(period));
    // 扩展
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    // 将任务设置给outerTask属性,方便后期重新扔到延迟队列
    sft.outerTask = t;
    // 嗯哼~
    delayedExecute(t);
    return t;
}
// 固定周期执行任务,会在任务执行完毕后,再计算下次执行的时间。
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                 long initialDelay,
                                                 long delay,
                                                 TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (delay <= 0)
        throw new IllegalArgumentException();
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit),
                                      unit.toNanos(-delay));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}

最终两个方法都会调用delayedExecute方法区将任务扔到阻塞队列,并尝试是否需要构建工作线程,从而执行任务

工作线程会监听延迟队列,拿到任务后会调用任务的run方法

public void run() {
    // 查看At和With可确定任务是周期执行
    boolean periodic = isPeriodic();
    // 线程池状态对不!!
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    else if (!periodic)
        // 一次性的延迟执行
        ScheduledFutureTask.super.run();
    // 到这,先执行任务
    else if (ScheduledFutureTask.super.runAndReset()) {
        // 设置下一次任务的运行时间
        setNextRunTime();
        reExecutePeriodic(outerTask);
    }
}

// 计算任务下次执行时间,time是任务执行的时间,而这里是time的上次的执行时间
private void setNextRunTime() {
    // 拿到当前任务的period
    long p = period;
    // period > 0:At
    if (p > 0)
        // 直接拿上次执行的时间,添加上周期时间,来计算下次执行的时间。
        time = time + p;
    else
        // period < 0:With
        // 任务执行完,拿当前系统时间计算下次执行的时间点
        time = now() + p;
}

// 重新将任务扔到延迟队列中
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
    // 线程池状态的判断
    if (canRunInCurrentRunState(true)) {
        // 将任务扔到了延迟队列中
        super.getQueue().add(task);
        // 扔到延迟队列后,再次判断线程池状态,是否需要取消任务!
        if (!canRunInCurrentRunState(true) && remove(task))
            task.cancel(false);
        else
            // 需要创建线程不~
            ensurePrestart();
    }
}
更多推荐

SpringBoot文件上传-阿里云OSS

1.打开阿里云说明:登录阿里云账号2.点击AccessKey管理3.创建AccessKey说明:记得复制accessKeyId,accessKeySecret并保存起来4.点击对象存储OSS5.创建Bucket说明:创建储存桶6.查看SDK示例7.Java简单上传8.上传文件流说明:以上传图片为例9.copy代码说明:

自动化测试(五):自动化测试框架的搭建和基于yaml热加载的测试用例的设计

该部分是对自动化测试专栏前四篇的一个补充,本次参考以下文章实现一个完整的谷歌翻译接口自动化测试:[1]【python小脚本】Yaml配置文件动态加载[2]【python做接口测试的学习记录day8——pytest自动化测试框架之热加载和断言封装】目标:框架封装完成后,不需要写python脚本,只需要增加yaml测试用例

LuatOS-SOC接口文档(air780E)--adc - 数模转换

常量常量类型解释adc.ADC_RANGE_3_6numberair105的ADC分压电阻开启,范围0~3.76Vadc.ADC_RANGE_1_8numberair105的ADC分压电阻关闭,范围0~1.88Vadc.ADC_RANGE_3_8numberair780E开启ADC0,1分压电阻,范围0~3.8Vadc

深度融入垂直行业是物联网未来发展必由之路

三年疫情,打断了很多企业的发展进程。但是疫情已过似乎整个业界生态有了一个很大变化。有一个朋友前一段时间参加深圳电子展后有一个感悟,说的很好:“疫情后有很大变化,疫情后,整个环境状态和疫情前有很大不同。无论企业,个人,还是外部环境,感觉都有变化。这种变化是无形的,但有时又感觉能感同身受。”其实这种感觉,就如人大病初愈或者

【计算机网络】——应用层

//图片取自王道仅做交流学习一、基本概念应用层概述协议是网络层次模型中多台主机之间同层之间进行通信的规则。是一个水平概念垂直空间上,向下屏蔽下层细节,向上提供服务接入,多台主机之间同层之间形成一条逻辑信道。应用层的功能:应用层的重要协议:文件传输、访问和管理FTP电子邮件SMTP、POP3虚拟终端HTTP查询服务和远程

easyExcel读取excel文件

简介Java解析、生成Excel比较有名的框架有Apachepoi、jxl。但他们都存在一个严重的问题就是非常的耗内存,poi有一套SAX模式的API可以一定程度的解决一些内存溢出的问题,但POI还是有一些缺陷,比如07版Excel解压缩以及解压后存储都是在内存中完成的,内存消耗依然很大。easyexcel重写了poi

Django系列:Django简介与MTV架构体系概述

Django系列Django简介与MTV架构体系概述作者:李俊才(jcLee95):https://blog.csdn.net/qq_28550263邮箱:291148484@163.com本文地址:https://blog.csdn.net/qq_28550263/article/details/132890054【

本地电脑搭建SFTP服务器,并实现公网访问

本地电脑搭建SFTP服务器,并实现公网访问文章目录本地电脑搭建SFTP服务器,并实现公网访问1.搭建SFTP服务器1.1下载freesshd服务器软件1.3启动SFTP服务1.4添加用户1.5保存所有配置2.安装SFTP客户端FileZilla测试2.1配置一个本地SFTP站点2.2内网连接测试成功3.使用cpolar

react | react-router-dom v6 结合 antd 面包屑 |嵌套路由

大致需求图示如上:需求:1.点击page2默认进入`/page2/中国`2.在中国界面选择省份,进入浙江省3.在浙江省中选择市,进入杭州市4.选择大学,进入浙江大学5.点击面包屑中某个tab,进入对应tab界面,组件/路由切换6.路由携带参数,如面包屑中的数据实现过程:在page2组件内(仅供参考):<divclass

代理IP和Socks5代理:跨界电商与爬虫的智能引擎

跨界电商,作为全球市场的一部分,对数据的需求越来越大。同时,随着互联网的发展,爬虫技术也在不断演进,成为了跨界电商的关键工具之一。然而,随之而来的是网站的反爬虫机制和网络安全风险。在这种情况下,代理IP和Socks5代理应运而生,为企业提供了数据采集的解决方案和网络安全的保护。本文将深入研究代理IP和Socks5代理在

Python爬虫有哪些库,分别怎么用

目录Python常用爬虫库代码示例requests+BeautifulSoupScrapySeleniumPyQueryAxiosrequests-htmlpyppeteer总结Python是一种非常流行的编程语言,因其易学易用和广泛的应用而受到开发者的喜爱。在Python中,有许多库可以用于爬虫程序的开发,这些库可以

热文推荐