Scheduled和Async详解
1. @Scheduled使用
1.1 概述
常用调度框架:
Spring Framework 的 Spring Task 模块,提供了轻量级的定时任务的实现。
- 即**@Scheduled**方式
Spring Boot 2.0 版本,整合了 Quartz 做业调度框架,提供了功能强大的定时任务的实现。
Elastic-Job
惟品会基于 Elastic-Job 之上,演化出了 Saturn 项目。
XXL-JOB
目前国内采用 Elastic-Job 和 XXL-JOB 为主。
1.2 基本使用
pom.xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--工具类-->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.5.1</version>
</dependency>
通过注解开启定时任务
在启动类上或者容器上加
@EnableScheduling
注解
@EnableScheduling
@Slf4j
@Component
public class ScheduledTest {}
多线程定时任务配置
⚠️**当没有指定线程数的时候,即默认配置,则使用的单线程串行执行所有的任务。**当一个任务执行时,其他任务会等待该任务执行完毕后再执行。所以一般不用单线程方式
@Configuration
public class ScheduledConfig {
@Bean
public SchedulingConfigurer schedulingConfigurer() {
return new SchedulingConfigurer() {
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
//配置十个线程(视具体任务数)
taskRegistrar.setScheduler(Executors.newScheduledThreadPool(10));
}
};
}
}
也可通过配置文件中配置,任选一种即可:
- 在
spring.task.scheduling
配置项,Spring Task 调度任务的配置,对应 TaskSchedulingProperties 配置类。- Spring Boot TaskSchedulingAutoConfiguration 自动化配置类,实现 Spring Task 的自动配置,建立 ThreadPoolTaskScheduler 基于线程池的任务调度器。本质上,ThreadPoolTaskScheduler 是基于 ScheduledExecutorService 的封装,加强在调度时间上的功能。
注意,
spring.task.scheduling.shutdown
配置项,是为了实现 Spring Task 定时任务的优雅关闭。咱们想象一下,若是定时任务在执行的过程当中,若是应用开始关闭,把定时任务须要使用到的 Spring Bean 进行销毁,例如说数据库链接池,那么此时定时任务还在执行中,一旦须要访问数据库,可能会致使报错。
- 因此,经过配置
await-termination = true
,实现应用关闭时,等待定时任务执行完成。这样,应用在关闭的时,Spring 会优先等待 ThreadPoolTaskScheduler 执行完任务以后,再开始 Spring Bean 的销毁。- 同时,又考虑到咱们不可能无限等待定时任务所有执行结束,所以能够配置
await-termination-period = 60
,等待任务完成的最大时长,单位为秒。具体设置多少的等待时长,能够根据本身应用的须要。
# Spring Task 调度任务的配置,对应 TaskSchedulingProperties 配置类
# 线程池大小。默认为 1 ,根据本身应用来设置
spring.task.scheduling.pool.size=10
# 线程池的线程名的前缀。默认为 scheduling- ,建议根据本身应用来设置
spring.task.scheduling.thread-name-prefix=scheduled-test
# 应用关闭时,是否等待定时任务执行完成。默认为 false ,建议设置为 true
spring.task.scheduling.shutdown.await-termination=true
# 应用关闭时,等待任务完成的最大时长,单位为秒。默认为 0 ,根据本身应用来设置
spring.task.scheduling.shutdown.await-termination-period=5
基本任务示例
@EnableScheduling
@Slf4j
@Component
public class ScheduledTest {
/**
* 每两秒执行一次
*/
@Scheduled(cron = "0/2 * * * * ? ")
public void task1() {
log.info("当前任务=>{},时间=>{}", Thread.currentThread(), DateUtil.now());
}
/**
* rate速率
* 第一次任务和第二次任务开始的时间间隔(可能第一次任务还没结束,第二次就接着执行)
* 毫秒为单位(这里为2s)
*/
@Scheduled(fixedRate = 2000)
public void fixedRate() {
log.info("当前任务=>{},时间=>{}", Thread.currentThread(), DateUtil.now());
}
/**
* delay延期
* 第一次任务的结束时间和第二次任务开始的时间间隔(即第一次任务已经执行完毕后再执行第二次任务的间隔)
* 毫秒为单位(这里为3s)
*/
@Scheduled(fixedDelay = 3000)
public void fixedDelay() {
log.info("当前任务=>{},时间=>{}", Thread.currentThread(), DateUtil.now());
}
/**
* initialDelay 表示首次任务启动的延迟时间
*/
@Scheduled(initialDelay = 2000,fixedDelay = 2000)
public void initialDelay() {
log.info("当前任务=>{},时间=>{}", Thread.currentThread(), DateUtil.now());
}
/**
* cron表达式
* [秒] [分] [小时] [日] [月] [周] [年](可省略)
* 在这里 / 用于递增触发,如在秒上面设置”0/5” 表示从0秒开始,每增5秒触发(0,5,10,15……)
*/
@Scheduled(cron = "0/5 * * * * *")
public void cron() {
log.info("当前任务=>{},时间=>{}", Thread.currentThread(), DateUtil.now());
}
}
1.3 注解参数详解
常用属性:
首先使用 @Scheduled 注解开启一个定时任务。
fixedRate 表示任务执行之间的时间间隔,具体是指两次任务的开始时间间隔,即第二次任务开始时,第一次任务可能还没结束。
fixedDelay 表示任务执行之间的时间间隔,具体是指本次任务结束到下次任务开始之间的时间间隔。
initialDelay 表示首次任务启动的延迟时间。
所有时间的单位都是毫秒。
其他属性:
- fixedDelayString:与fixedDelay含义一样,只是参数类型变为String;
- fixedRateString: 与fixedRate的含义一样,只是将参数类型变为String;
- initialDelayString:与initialDelay的含义一样,只是将参数类型变为String;
- zone :解析 Spring Cron 表达式的所属的时区。默认状况下,使用服务器的本地时区。
@Scheduled(fixedRate=2000):上一次开始执行时间点后2秒再次执行;
@Scheduled(fixedDelay=2000):上一次执行完毕时间点后2秒再次执行;
@Scheduled(initialDelay=1000, fixedDelay=2000):第一次延迟1秒执行,然后在上一次执行完毕时间点后2秒再次执行;
@Scheduled(cron="* * * * * ?"):按cron规则执行。
cron表达式
[秒] [分] [小时] [日] [月] [周] [年](可省略)
序号 | 说明 | 是否必填 | 允许填写的值 | 允许的通配符 |
---|---|---|---|---|
1 | 秒 | 是 | 0-59 | - * / |
2 | 分 | 是 | 0-59 | - * / |
3 | 时 | 是 | 0-23 | - * / |
4 | 日 | 是 | 1-31 | - * ? / L W |
5 | 月 | 是 | 1-12 or JAN-DEC | - * / |
6 | 周 | 是 | 1-7 or SUN-SAT | - * ? / L # |
7 | 年 | 否 | 1970-2099 | - * / |
月份中的日期和星期可能会起冲突,因此在配置时这两个得有一个是 ?
通配符含义:
?
表示不指定值,即不关心某个字段的取值时使用。需要注意的是,月份中的日期和星期可能会起冲突,因此在配置时这两个得有一个是?
*
表示所有值,例如:在秒的字段上设置*
,表示每一秒都会触发,
用来分开多个值,例如在周字段上设置 “MON,WED,FRI” 表示周一,周三和周五触发-
表示区间,例如在秒上设置 “10-12”,表示 10,11,12秒都会触发/
用于递增触发,如在秒上面设置”5/15” 表示从5秒开始,每增15秒触发(5,20,35,50)#
序号(表示每月的第几个周几),例如在周字段上设置”6#3”表示在每月的第三个周六,(用 在母亲节和父亲节再合适不过了)- 周字段的设置,若使用英文字母是不区分大小写的 ,即 MON 与mon相同
L
表示最后的意思。在日字段设置上,表示当月的最后一天(依据当前月份,如果是二月还会自动判断是否是润年), 在周字段上表示星期六,相当于”7”或”SAT”(注意周日算是第一天)。如果在”L”前加上数字,则表示该数据的最后一个。例如在周字段上设置”6L”这样的格式,则表示”本月最后一个星期五”W
表示离指定日期的最近工作日(周一至周五),例如在日字段上设置”15W”,表示离每月15号最近的那个工作日触发。如果15号正好是周六,则找最近的周五(14号)触发, 如果15号是周未,则找最近的下周一(16号)触发,如果15号正好在工作日(周一至周五),则就在该天触发。如果指定格式为 “1W”,它则表示每月1号往后最近的工作日触发。如果1号正是周六,则将在3号下周一触发。(注,”W”前只能设置具体的数字,不允许区间”-“)L
和W
可以一组合使用。如果在日字段上设置”LW”,则表示在本月的最后一个工作日触发(一般指发工资 )
2. @Async使用
2.1 概述
在Java应用中,绝大多数情况下都是通过同步的方式来实现交互处理的;但是在处理与第三方系统交互的时候,容易造成响应迟缓的情况,之前大部分都是使用多线程来完成此类任务,其实,在spring 3.x之后,就已经内置了**@Async**来完美解决这个问题
何为异步调用?
在解释异步调用之前,我们先来看同步调用的定义;同步就是整个处理过程顺序执行,当各个过程都执行完毕,并返回结果。 异步调用则是只是发送了调用的指令,调用者无需等待被调用的方法完全执行完毕;而是继续执行下面的流程。
例如, 在某个调用中,需要顺序调用 A, B, C三个过程方法;如他们都是同步调用,则需要将他们都顺序执行完毕之后,方算作过程执行完毕; 如B为一个异步的调用方法,则在执行完A之后,调用B,并不等待B完成,而是执行开始调用C,待C执行完毕之后,就意味着这个过程执行完毕了。
2.2 案例准备
pom.xml
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.1.23</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--工具类-->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.5.1</version>
</dependency>
application.yaml
# 应用名称
spring:
application:
name: demo
# 数据库配置
datasource:
url: jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai
password: 123456
username: root
driver-class-name: com.mysql.cj.jdbc.Driver
type: com.alibaba.druid.pool.DruidDataSource
#显示sql
jpa:
show-sql: true
#数据库为mysql
database: mysql
#数据库平台
database-platform: org.hibernate.dialect.MySQL57Dialect
# 应用服务 WEB 访问端口
server:
port: 8080
Person实体
@Data
@Entity(name = "person")
public class Person {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Integer id;
private String name;
private Integer age;
}
Dao实体
public interface PersonDao extends JpaRepository<Person,Integer> {
}
Service接口
public interface PersonService {
/**
* 异步调用,无返回值
*/
void asyncTask();
/**
* 异步调用,有返回值
* @param s
* @return
*/
Future<String> asyncTask(String s);
/**
* 异步调用,无返回值,事务测试
* @param exFlag
*/
void asyncTaskForTransaction(Boolean exFlag);
}
Service实现类
@Service
public class PersonServiceImp implements PersonService{
@Autowired
private PersonDao personDao;
/**
* 异步调用,无返回值
*/
@Async("asyncTaskExecutor")
@Override
public void asyncTask() {
long startTime = System.currentTimeMillis();
try {
//模拟耗时
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
long endTime = System.currentTimeMillis();
System.out.println(Thread.currentThread() + ":void asyncTask(),耗时:" + (endTime - startTime));
}
/**
* 异步调用,有返回值
*
* @param s
* @return
*/
@Async("asyncTaskExecutor")
@Override
public Future<String> asyncTask(String s) {
long startTime = System.currentTimeMillis();
try {
//模拟耗时
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
long endTime = System.currentTimeMillis();
System.out.println(Thread.currentThread() + ":Future<String> asyncTask,耗时:" + (endTime - startTime));
return AsyncResult.forValue(s);
}
/**
* 异步调用,无返回值,事务测试
*
* @param exFlag
*/
//开启事务
@Transactional(rollbackFor = Exception.class)
//开启异步任务
@Async("asyncTaskExecutor")
@Override
public void asyncTaskForTransaction(Boolean exFlag) {
try {
//模拟耗时操作
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Person person = new Person();
person.setName("王五");
person.setAge(22);
personDao.save(person);
//模拟异常
if (exFlag) {
throw new RuntimeException("数据库新增异常");
}
System.out.println("新增数据成功");
}
}
异步任务配置类
//开启异步处理
@EnableAsync
//扫描jpa dao层
@EnableJpaRepositories(basePackages = "org.lc.test.demo.async_demo.dao")
@Configuration
public class AsyncApplication {
@Autowired
private PersonService personService;
/**
* 异步线程池配置
* @return
*/
@Order(1)
@Bean("asyncTaskExecutor")
public AsyncTaskExecutor asyncTaskExecutor() {
ThreadPoolTaskExecutor asyncTaskExecutor = new ThreadPoolTaskExecutor();
asyncTaskExecutor.setMaxPoolSize(50);
asyncTaskExecutor.setCorePoolSize(10);
asyncTaskExecutor.setThreadNamePrefix("async-task-thread-pool");
asyncTaskExecutor.initialize();
return asyncTaskExecutor;
}
/**
* spring启动后的一些初始化操作
* @return
*/
@Order(2)
@Bean
public ApplicationRunner applicationRunner() {
return new ApplicationRunner() {
@Override
public void run(ApplicationArguments args) throws Exception {
long startTime = System.currentTimeMillis();
System.out.println(Thread.currentThread() + ":开始调用异步业务");
//异步任务,开启新的线程处理,不阻塞主线程
// personService.asyncTask();
//Thread[main,5,main]:开始调用异步业务
// Thread[main,5,main]:调用异步业务结束,耗时:3
// Thread[async-task-thread-pool1,5,main]:void asyncTask(),耗时:3009
//有返回值,但主线程不执行get获取值的操作,也不会阻塞主线程
// Future<String> future = personService.asyncTask("hello-world");
//有返回值,但主线程执行get获取值的操作,会阻塞主线程,直到该任务执行完毕
// String result = personService.asyncTask("hello-world").get();
// try {
// //有返回值,但主线程执行get获取值的操作,会阻塞主线程,并设置主线程等待的超时时间,否则中断异步任务
// String result = personService.asyncTask("hello-world").get(2, TimeUnit.SECONDS);
// } catch (Exception e) {
// System.out.println("异步任务执行超时,已中断");
// }
//事务测试,事务正常提交
// personService.asyncTaskForTransaction(false);
//事务测试,事务回滚
try {
personService.asyncTaskForTransaction(true);
} catch (Exception e) {
System.out.println("数据新增异常,失败!");
}
long endTime = System.currentTimeMillis();
System.out.println(Thread.currentThread() + ":调用异步业务结束,耗时:" + (endTime - startTime));
}
};
}
}
2.3 案例详解及注意事项
- 使用**@EnableAsync**注解开启异步任务,在组件中或者启动配置类中配置均可
- 配置异步线程池,开启**@Async("asyncTaskExecutor")的异步方法会从指定配置的异步线程池**获取线程执行此任务
- 无返回值的异步任务不会阻塞主线程
- 有返回值的异步任务,但不执行get获取值的操作,也不会阻塞主线程
- 有返回值的异步任务,但执行get获取值的操作,会阻塞主线程直到异步任务执行完毕,若配置了超时等待时间,那么主线程会强制中断异步任务,继续执行主线程任务。
2.4 使用线程池对象提交异步任务
- ⚠️根据任务需要配置线程池参数
@Configuration
@Slf4j
public class AsyncTask {
@Qualifier("asyncTaskExecutor")
@Autowired
private AsyncTaskExecutor asyncTaskExecutor;
@Bean
public ApplicationRunner applicationRunner1() {
return new ApplicationRunner() {
@Override
public void run(ApplicationArguments args) throws Exception {
//无返回值异步任务
asyncTaskExecutor.execute(new Runnable() {
@Override
public void run() {
log.info("当前任务=>{},时间=>{}", Thread.currentThread(), new Date());
}
});
//有返回值异步任务
Object o = asyncTaskExecutor.submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
log.info("当前任务=>{},时间=>{}", Thread.currentThread(), new Date());
return "hell-world";
}
}).get();
System.out.println(o);
}
};
}
}
3. @Async和@Scheduled联合使用
@Scheduled 任务调度注解,主要用于配置定时任务,SprinBoot默认的调度器线程池大小为 1,即若不配做线程池大小,则所有定时任务会串行执行
@Async 任务异步执行注解,主要用于方法上,表示当前方法会使用新线程异步执行;Springboot默认执行器线程池大小为100,需要根据任务情况,重新配置线程池
线程池配置:
//开启异步处理
@EnableAsync
//开启定时任务
@EnableScheduling
@Configuration
public class AsyncApplication {
@Bean("asyncTaskExecutor")
public AsyncTaskExecutor asyncTaskExecutor() {
ThreadPoolTaskExecutor asyncTaskExecutor = new ThreadPoolTaskExecutor();
asyncTaskExecutor.setMaxPoolSize(50);
asyncTaskExecutor.setCorePoolSize(10);
asyncTaskExecutor.setThreadNamePrefix("async-task-thread-pool");
asyncTaskExecutor.initialize();
return asyncTaskExecutor;
}
}
定时任务示例:
@Slf4j
@Component
public class ScheduledTest {
@Async("asyncTaskExecutor")
@Scheduled(cron = "0/2 * * * * ? ")
public void task1() throws InterruptedException {
log.info("当前任务=>{},时间=>{}", Thread.currentThread(), DateUtil.now());
}
@Async("asyncTaskExecutor")
@Scheduled(cron = "0/2 * * * * ? ")
public void task2() throws InterruptedException {
log.info("当前任务=>{},时间=>{}", Thread.currentThread(), DateUtil.now());
}
@Async("asyncTaskExecutor")
@Scheduled(cron = "0/2 * * * * ? ")
public void task3() throws InterruptedException {
log.info("当前任务=>{},时间=>{}", Thread.currentThread(), DateUtil.now());
}
}
- 上述任务若没有配置线程池和加**@Async注解,则三个定时任务会串行执行**
- 联合使用**@Async**和@Scheduled注解后三个任务分别使用不同的线程执行