Scheduled和Async详解

Lou.Chen
大约 12 分钟

1. @Scheduled使用

1.1 概述

常用调度框架:

  • Spring Framework 的 Spring Task 模块,提供了轻量级的定时任务的实现。

    • 即**@Scheduled**方式
  • Spring Boot 2.0 版本,整合了 Quartz 做业调度框架,提供了功能强大的定时任务的实现。

  • Elastic-Job

    惟品会基于 Elastic-Job 之上,演化出了 Saturn 项目。

  • XXL-JOB

目前国内采用 Elastic-Job 和 XXL-JOB 为主。

1.2 基本使用

pom.xml

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
		<!--工具类-->
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.5.1</version>
        </dependency>

通过注解开启定时任务

在启动类上或者容器上加@EnableScheduling注解

@EnableScheduling
@Slf4j
@Component
public class ScheduledTest {}

多线程定时任务配置

⚠️**当没有指定线程数的时候,即默认配置,则使用的单线程串行执行所有的任务。**当一个任务执行时,其他任务会等待该任务执行完毕后再执行。所以一般不用单线程方式

@Configuration
public class ScheduledConfig {
    @Bean
    public SchedulingConfigurer schedulingConfigurer() {
        return new SchedulingConfigurer() {
            @Override
            public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
                //配置十个线程(视具体任务数)
                taskRegistrar.setScheduler(Executors.newScheduledThreadPool(10));
            }
        };
    }
}

也可通过配置文件中配置,任选一种即可:

  • spring.task.scheduling 配置项,Spring Task 调度任务的配置,对应 TaskSchedulingProperties 配置类。
  • Spring Boot TaskSchedulingAutoConfiguration 自动化配置类,实现 Spring Task 的自动配置,建立 ThreadPoolTaskScheduler 基于线程池的任务调度器。本质上,ThreadPoolTaskScheduler 是基于 ScheduledExecutorService 的封装,加强在调度时间上的功能。

注意spring.task.scheduling.shutdown 配置项,是为了实现 Spring Task 定时任务的优雅关闭。咱们想象一下,若是定时任务在执行的过程当中,若是应用开始关闭,把定时任务须要使用到的 Spring Bean 进行销毁,例如说数据库链接池,那么此时定时任务还在执行中,一旦须要访问数据库,可能会致使报错。

  • 因此,经过配置 await-termination = true ,实现应用关闭时,等待定时任务执行完成。这样,应用在关闭的时,Spring 会优先等待 ThreadPoolTaskScheduler 执行完任务以后,再开始 Spring Bean 的销毁。
  • 同时,又考虑到咱们不可能无限等待定时任务所有执行结束,所以能够配置 await-termination-period = 60 ,等待任务完成的最大时长,单位为秒。具体设置多少的等待时长,能够根据本身应用的须要。
# Spring Task 调度任务的配置,对应 TaskSchedulingProperties 配置类
# 线程池大小。默认为 1 ,根据本身应用来设置
spring.task.scheduling.pool.size=10
# 线程池的线程名的前缀。默认为 scheduling- ,建议根据本身应用来设置
spring.task.scheduling.thread-name-prefix=scheduled-test
# 应用关闭时,是否等待定时任务执行完成。默认为 false ,建议设置为 true
spring.task.scheduling.shutdown.await-termination=true
#  应用关闭时,等待任务完成的最大时长,单位为秒。默认为 0 ,根据本身应用来设置
spring.task.scheduling.shutdown.await-termination-period=5

基本任务示例

@EnableScheduling
@Slf4j
@Component
public class ScheduledTest {

    /**
     * 每两秒执行一次
     */
    @Scheduled(cron = "0/2 * * * * ? ")
    public void task1() {
        log.info("当前任务=>{},时间=>{}", Thread.currentThread(), DateUtil.now());
    }

    /**
     * rate速率
     * 第一次任务和第二次任务开始的时间间隔(可能第一次任务还没结束,第二次就接着执行)
     * 毫秒为单位(这里为2s)
     */
    @Scheduled(fixedRate = 2000)
    public void fixedRate() {
        log.info("当前任务=>{},时间=>{}", Thread.currentThread(), DateUtil.now());
    }

    /**
     * delay延期
     * 第一次任务的结束时间和第二次任务开始的时间间隔(即第一次任务已经执行完毕后再执行第二次任务的间隔)
     * 毫秒为单位(这里为3s)
     */
    @Scheduled(fixedDelay = 3000)
    public void fixedDelay() {
        log.info("当前任务=>{},时间=>{}", Thread.currentThread(), DateUtil.now());
    }

    /**
     * initialDelay 表示首次任务启动的延迟时间
     */
    @Scheduled(initialDelay = 2000,fixedDelay = 2000)
    public void initialDelay() {
        log.info("当前任务=>{},时间=>{}", Thread.currentThread(), DateUtil.now());
    }

    /**
     * cron表达式
     * [秒] [分] [小时] [日] [月] [周] [年](可省略)
     * 在这里 / 用于递增触发,如在秒上面设置”0/5” 表示从0秒开始,每增5秒触发(0,5,10,15……)
     */
    @Scheduled(cron = "0/5 * * * * *")
    public void cron() {
        log.info("当前任务=>{},时间=>{}", Thread.currentThread(), DateUtil.now());
    }

}

1.3 注解参数详解

常用属性:

  • 首先使用 @Scheduled 注解开启一个定时任务。

  • fixedRate 表示任务执行之间的时间间隔,具体是指两次任务的开始时间间隔,即第二次任务开始时,第一次任务可能还没结束。

  • fixedDelay 表示任务执行之间的时间间隔,具体是指本次任务结束到下次任务开始之间的时间间隔。

  • initialDelay 表示首次任务启动的延迟时间。

  • 所有时间的单位都是毫秒。

其他属性:

  • fixedDelayString:与fixedDelay含义一样,只是参数类型变为String;
  • fixedRateString: 与fixedRate的含义一样,只是将参数类型变为String;
  • initialDelayString:与initialDelay的含义一样,只是将参数类型变为String;
  • zone :解析 Spring Cron 表达式的所属的时区。默认状况下,使用服务器的本地时区。
@Scheduled(fixedRate=2000):上一次开始执行时间点后2秒再次执行;
@Scheduled(fixedDelay=2000):上一次执行完毕时间点后2秒再次执行;
@Scheduled(initialDelay=1000, fixedDelay=2000):第一次延迟1秒执行,然后在上一次执行完毕时间点后2秒再次执行;
@Scheduled(cron="* * * * * ?"):按cron规则执行。

cron表达式

[秒] [分] [小时] [日] [月] [周] [年](可省略)

序号说明是否必填允许填写的值允许的通配符
10-59- * /
20-59- * /
30-23- * /
41-31- * ? / L W
51-12 or JAN-DEC- * /
61-7 or SUN-SAT- * ? / L #
71970-2099- * /

月份中的日期和星期可能会起冲突,因此在配置时这两个得有一个是 ?

在线Cron表达式生成器open in new window

通配符含义:

  • ? 表示不指定值,即不关心某个字段的取值时使用。需要注意的是,月份中的日期和星期可能会起冲突,因此在配置时这两个得有一个是 ?
  • * 表示所有值,例如:在秒的字段上设置 *,表示每一秒都会触发
  • , 用来分开多个值,例如在周字段上设置 “MON,WED,FRI” 表示周一,周三和周五触发
  • - 表示区间,例如在秒上设置 “10-12”,表示 10,11,12秒都会触发
  • / 用于递增触发,如在秒上面设置”5/15” 表示从5秒开始,每增15秒触发(5,20,35,50)
  • # 序号(表示每月的第几个周几),例如在周字段上设置”6#3”表示在每月的第三个周六,(用 在母亲节和父亲节再合适不过了)
  • 周字段的设置,若使用英文字母是不区分大小写的 ,即 MON 与mon相同
  • L 表示最后的意思。在日字段设置上,表示当月的最后一天(依据当前月份,如果是二月还会自动判断是否是润年), 在周字段上表示星期六,相当于”7”或”SAT”(注意周日算是第一天)。如果在”L”前加上数字,则表示该数据的最后一个。例如在周字段上设置”6L”这样的格式,则表示”本月最后一个星期五”
  • W 表示离指定日期的最近工作日(周一至周五),例如在日字段上设置”15W”,表示离每月15号最近的那个工作日触发。如果15号正好是周六,则找最近的周五(14号)触发, 如果15号是周未,则找最近的下周一(16号)触发,如果15号正好在工作日(周一至周五),则就在该天触发。如果指定格式为 “1W”,它则表示每月1号往后最近的工作日触发。如果1号正是周六,则将在3号下周一触发。(注,”W”前只能设置具体的数字,不允许区间”-“)
  • LW 可以一组合使用。如果在日字段上设置”LW”,则表示在本月的最后一个工作日触发(一般指发工资 )

2. @Async使用

2.1 概述

在Java应用中,绝大多数情况下都是通过同步的方式来实现交互处理的;但是在处理与第三方系统交互的时候,容易造成响应迟缓的情况,之前大部分都是使用多线程来完成此类任务,其实,在spring 3.x之后,就已经内置了**@Async**来完美解决这个问题

何为异步调用?

在解释异步调用之前,我们先来看同步调用的定义;同步就是整个处理过程顺序执行,当各个过程都执行完毕,并返回结果。 异步调用则是只是发送了调用的指令,调用者无需等待被调用的方法完全执行完毕;而是继续执行下面的流程。

例如, 在某个调用中,需要顺序调用 A, B, C三个过程方法;如他们都是同步调用,则需要将他们都顺序执行完毕之后,方算作过程执行完毕; 如B为一个异步的调用方法,则在执行完A之后,调用B,并不等待B完成,而是执行开始调用C,待C执行完毕之后,就意味着这个过程执行完毕了。

2.2 案例准备

pom.xml

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>1.1.23</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!--工具类-->
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.5.1</version>
        </dependency>

application.yaml

# 应用名称
spring:
  application:
    name: demo
  #      数据库配置
  datasource:
    url: jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai
    password: 123456
    username: root
    driver-class-name: com.mysql.cj.jdbc.Driver
    type: com.alibaba.druid.pool.DruidDataSource
  #显示sql
  jpa:
    show-sql: true
    #数据库为mysql
    database: mysql
    #数据库平台
    database-platform: org.hibernate.dialect.MySQL57Dialect
# 应用服务 WEB 访问端口
server:
  port: 8080

Person实体

@Data
@Entity(name = "person")
public class Person {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Integer id;
    private String name;
    private Integer age;
}

Dao实体

public interface PersonDao extends JpaRepository<Person,Integer> {
}

Service接口

public interface PersonService {
    /**
     * 异步调用,无返回值
     */
    void asyncTask();

    /**
     * 异步调用,有返回值
     * @param s
     * @return
     */
    Future<String> asyncTask(String s);

    /**
     * 异步调用,无返回值,事务测试
     * @param exFlag
     */
    void asyncTaskForTransaction(Boolean exFlag);
}

Service实现类

@Service
public class PersonServiceImp implements PersonService{

    @Autowired
    private PersonDao personDao;

    /**
     * 异步调用,无返回值
     */
    @Async("asyncTaskExecutor")
    @Override
    public void asyncTask() {
        long startTime = System.currentTimeMillis();
        try {
            //模拟耗时
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        long endTime = System.currentTimeMillis();
        System.out.println(Thread.currentThread() + ":void asyncTask(),耗时:" + (endTime - startTime));
    }

    /**
     * 异步调用,有返回值
     *
     * @param s
     * @return
     */
    @Async("asyncTaskExecutor")
    @Override
    public Future<String> asyncTask(String s) {
        long startTime = System.currentTimeMillis();
        try {
            //模拟耗时
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        long endTime = System.currentTimeMillis();
        System.out.println(Thread.currentThread() + ":Future<String> asyncTask,耗时:" + (endTime - startTime));
        return AsyncResult.forValue(s);
    }

    /**
     * 异步调用,无返回值,事务测试
     *
     * @param exFlag
     */
    //开启事务
    @Transactional(rollbackFor = Exception.class)
    //开启异步任务
    @Async("asyncTaskExecutor")
    @Override
    public void asyncTaskForTransaction(Boolean exFlag) {
        try {
            //模拟耗时操作
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        Person person = new Person();
        person.setName("王五");
        person.setAge(22);
        personDao.save(person);
        //模拟异常
        if (exFlag) {
            throw new RuntimeException("数据库新增异常");
        }
        System.out.println("新增数据成功");
    }
}

异步任务配置类

//开启异步处理
@EnableAsync
//扫描jpa dao层
@EnableJpaRepositories(basePackages = "org.lc.test.demo.async_demo.dao")
@Configuration
public class AsyncApplication {

    @Autowired
    private PersonService personService;

    /**
     * 异步线程池配置
     * @return
     */
    @Order(1)
    @Bean("asyncTaskExecutor")
    public AsyncTaskExecutor asyncTaskExecutor() {
        ThreadPoolTaskExecutor asyncTaskExecutor = new ThreadPoolTaskExecutor();
        asyncTaskExecutor.setMaxPoolSize(50);
        asyncTaskExecutor.setCorePoolSize(10);
        asyncTaskExecutor.setThreadNamePrefix("async-task-thread-pool");
        asyncTaskExecutor.initialize();
        return asyncTaskExecutor;
    }

    /**
     * spring启动后的一些初始化操作
     * @return
     */
    @Order(2)
    @Bean
    public ApplicationRunner applicationRunner() {
        return new ApplicationRunner() {
            @Override
            public void run(ApplicationArguments args) throws Exception {
                long startTime = System.currentTimeMillis();
                System.out.println(Thread.currentThread() + ":开始调用异步业务");

                //异步任务,开启新的线程处理,不阻塞主线程
                // personService.asyncTask();
                //Thread[main,5,main]:开始调用异步业务
                // Thread[main,5,main]:调用异步业务结束,耗时:3
                // Thread[async-task-thread-pool1,5,main]:void asyncTask(),耗时:3009

                //有返回值,但主线程不执行get获取值的操作,也不会阻塞主线程
                // Future<String> future = personService.asyncTask("hello-world");
                //有返回值,但主线程执行get获取值的操作,会阻塞主线程,直到该任务执行完毕
                // String result = personService.asyncTask("hello-world").get();
                // try {
                //     //有返回值,但主线程执行get获取值的操作,会阻塞主线程,并设置主线程等待的超时时间,否则中断异步任务
                //     String result = personService.asyncTask("hello-world").get(2, TimeUnit.SECONDS);
                // } catch (Exception e) {
                //     System.out.println("异步任务执行超时,已中断");
                // }

                //事务测试,事务正常提交
                // personService.asyncTaskForTransaction(false);
                //事务测试,事务回滚
                try {
                    personService.asyncTaskForTransaction(true);
                } catch (Exception e) {
                    System.out.println("数据新增异常,失败!");
                }

                long endTime = System.currentTimeMillis();
                System.out.println(Thread.currentThread() + ":调用异步业务结束,耗时:" + (endTime - startTime));
            }
        };
    }
}

2.3 案例详解及注意事项

  • 使用**@EnableAsync**注解开启异步任务,在组件中或者启动配置类中配置均可
  • 配置异步线程池,开启**@Async("asyncTaskExecutor")的异步方法会从指定配置的异步线程池**获取线程执行此任务
  • 无返回值的异步任务不会阻塞主线程
  • 有返回值的异步任务,但不执行get获取值的操作,也不会阻塞主线程
  • 有返回值的异步任务,但执行get获取值的操作,会阻塞主线程直到异步任务执行完毕,若配置了超时等待时间,那么主线程会强制中断异步任务,继续执行主线程任务。

2.4 使用线程池对象提交异步任务

  • ⚠️根据任务需要配置线程池参数
@Configuration
@Slf4j
public class AsyncTask {

    @Qualifier("asyncTaskExecutor")
    @Autowired
    private AsyncTaskExecutor asyncTaskExecutor;

    @Bean
    public ApplicationRunner applicationRunner1() {
        return new ApplicationRunner() {
            @Override
            public void run(ApplicationArguments args) throws Exception {
                //无返回值异步任务
                asyncTaskExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        log.info("当前任务=>{},时间=>{}", Thread.currentThread(), new Date());
                    }
                });
                //有返回值异步任务
                Object o = asyncTaskExecutor.submit(new Callable<Object>() {
                    @Override
                    public Object call() throws Exception {
                        log.info("当前任务=>{},时间=>{}", Thread.currentThread(), new Date());
                        return "hell-world";
                    }
                }).get();
                System.out.println(o);
            }
        };
    }
}

3. @Async和@Scheduled联合使用

  • @Scheduled 任务调度注解,主要用于配置定时任务,SprinBoot默认的调度器线程池大小为 1,即若不配做线程池大小,则所有定时任务会串行执行

  • @Async 任务异步执行注解,主要用于方法上,表示当前方法会使用新线程异步执行;Springboot默认执行器线程池大小为100,需要根据任务情况,重新配置线程池

线程池配置:

//开启异步处理
@EnableAsync
//开启定时任务
@EnableScheduling
@Configuration
public class AsyncApplication {
    @Bean("asyncTaskExecutor")
    public AsyncTaskExecutor asyncTaskExecutor() {
        ThreadPoolTaskExecutor asyncTaskExecutor = new ThreadPoolTaskExecutor();
        asyncTaskExecutor.setMaxPoolSize(50);
        asyncTaskExecutor.setCorePoolSize(10);
        asyncTaskExecutor.setThreadNamePrefix("async-task-thread-pool");
        asyncTaskExecutor.initialize();
        return asyncTaskExecutor;
    }
}

定时任务示例:

@Slf4j
@Component
public class ScheduledTest {

    @Async("asyncTaskExecutor")
    @Scheduled(cron = "0/2 * * * * ? ")
    public void task1() throws InterruptedException {
        log.info("当前任务=>{},时间=>{}", Thread.currentThread(), DateUtil.now());
    }
    @Async("asyncTaskExecutor")
    @Scheduled(cron = "0/2 * * * * ? ")
    public void task2() throws InterruptedException {
        log.info("当前任务=>{},时间=>{}", Thread.currentThread(), DateUtil.now());
    }
    @Async("asyncTaskExecutor")
    @Scheduled(cron = "0/2 * * * * ? ")
    public void task3() throws InterruptedException {
        log.info("当前任务=>{},时间=>{}", Thread.currentThread(), DateUtil.now());
    }
}
  • 上述任务若没有配置线程池和加**@Async注解,则三个定时任务会串行执行**
  • 联合使用**@Async**和@Scheduled注解后三个任务分别使用不同的线程执行