异步调用(三)@Async自定义线程池

一、为什么要给 @Async 自定义线程池

使用@Async注解,在默认情况下用的是 SimpleAsyncTaskExecutor 线程池,该线程池不是真正意义上的线程池。使用此线程池无法实现线程重用,每次调用都会新建一条线程。若系统中不断的创建线程,最终会导致系统占用内存过高,引发OOM。 因此我们需要自定义线程池来保证线程池可控。

二、@Async 如何自定义线程池

1. 配置默认线程池

创建配置类实现 AsyncConfigurer ,重写 以下两个方法来指定默认线程池:

  • getAsyncUncaughtExceptionHandler()
  • getAsyncExecutor()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@EnableAsync
@Configuration
public class AsyncExecutorConfig implements AsyncConfigurer {
private static final Logger LOGGER = LoggerFactory.getLogger(AsyncExecutorConfig.class);

@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
//核心线程数
taskExecutor.setCorePoolSize(3);
//线程池维护线程的最大数量,只有在缓冲队列满了之后才会申请超过核心线程数的线程
taskExecutor.setMaxPoolSize(10);
//缓存队列
taskExecutor.setQueueCapacity(50);
//许的空闲时间,当超过了核心线程出之外的线程在空闲时间到达之后会被销毁
taskExecutor.setKeepAliveSeconds(200);
//异步方法内部线程名称
taskExecutor.setThreadNamePrefix("async-pool-");
/*
* 当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略
* 通常有以下四种策略:
* ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
* ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
* ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
* ThreadPoolExecutor.CallerRunsPolicy:重试添加当前的任务,自动重复调用 execute() 方法,直到成功
*/
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
taskExecutor.initialize();

return taskExecutor;
}

@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return (ex, method, params) ->
LOGGER.error("线程池执行任务发生错误,执行方法{}",method.getName(),ex);
}
}

2. 为 @Async 指定线程池

修改以上配置类,添加自定义线程池 customPoolExecutor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
@EnableAsync
@Configuration
@Data
@ConfigurationProperties("spring.executor.pool")
public class AsyncExecutorConfig implements AsyncConfigurer {
private static final Logger LOGGER = LoggerFactory.getLogger(AsyncExecutorConfig.class);

private static int CORE_SIZE = 3;
private static int MAX_SIZE = 10;
private static int KEEP_ALIVE = 10;
private static int QUEUE_CAPACITY = 50;
private static String THREAD_NAME_PREFIX = "custom-pool-";

/**
* 自定义线程池
*/
@Bean("customPoolExecutor")
public ThreadPoolTaskExecutor customPoolExecutor(){
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
//核心线程数
taskExecutor.setCorePoolSize(CORE_SIZE);
//线程池维护线程的最大数量,只有在缓冲队列满了之后才会申请超过核心线程数的线程
taskExecutor.setMaxPoolSize(MAX_SIZE);
//缓存队列
taskExecutor.setQueueCapacity(KEEP_ALIVE);
//空闲时间,当超过了核心线程出之外的线程在空闲时间到达之后会被销毁
taskExecutor.setKeepAliveSeconds(QUEUE_CAPACITY);
//异步方法内部线程名称
taskExecutor.setThreadNamePrefix(THREAD_NAME_PREFIX);
/*
* 当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任y务拒绝策略
* 通常有以下四种策略:
* ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
* ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
* ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
* ThreadPoolExecutor.CallerRunsPolicy:重试添加当前的任务,自动重复调用 execute() 方法,直到成功
*/
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
taskExecutor.initialize();

return taskExecutor;
}

/**
* 默认线程池
*/
private ThreadPoolTaskExecutor asyncPoolExecutor(){
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
//核心线程数
taskExecutor.setCorePoolSize(3);
//线程池维护线程的最大数量,只有在缓冲队列满了之后才会申请超过核心线程数的线程
taskExecutor.setMaxPoolSize(10);
//缓存队列
taskExecutor.setQueueCapacity(50);
//许的空闲时间,当超过了核心线程出之外的线程在空闲时间到达之后会被销毁
taskExecutor.setKeepAliveSeconds(200);
//异步方法内部线程名称
taskExecutor.setThreadNamePrefix("async-pool-");
//线程池的拒绝策略
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
taskExecutor.initialize();

return taskExecutor;
}

@Override
public Executor getAsyncExecutor() {
return asyncPoolExecutor();
}

@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return (ex, method, params) ->
LOGGER.error("线程池执行任务发生错误,执行方法{}",method.getName(),ex);
}
}

线程池参数通过配置文件维护(这里使用了 @ConfigurationProperties 搭配 @Data 简化配置,用法可参考 @Configurationproperties注解使用姿势 ;当然你也可以使用 @Value ),

1
2
3
4
5
6
#线程池参数
spring.executor.pool.core-size=3
spring.executor.pool.max-size=10
spring.executor.pool.keep-alive=60
spring.executor.pool.queue-capacity=50
spring.executor.pool.thread-name-prefix=custom-pool-

在注解指定线程池,doTask1() 使用默认线程池,doTask2() 使用自定义线程池,用法非常灵活,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Async
public void doTask1() throws InterruptedException {
long t1 = System.currentTimeMillis();
Thread.sleep(1000);
long t2 = System.currentTimeMillis();
LOGGER.info("task1 cost {} ms" , t2-t1);
}

@Async("customPoolExecutor")
public void doTask2() throws InterruptedException {
long t1 = System.currentTimeMillis();
Thread.sleep(1000);
long t2 = System.currentTimeMillis();
LOGGER.info("task2 cost {} ms" , t2-t1);
}

controller 中调用方法进行测试,

1
2
3
4
5
6
7
8
9
10
@RequestMapping("/learn/async")
public String async(HttpServletRequest request, HttpServletResponse response) throws InterruptedException {
System.out.println("主方法执行...");

asyncTask.doTask1();
asyncTask.doTask2();

System.out.println("主方法执行完毕...");
return "hello world";
}

执行情况如下,可以看出两个方法使用不同线程池进行异步方法执行。

1657278380302

需要注意的是,一定要在外部的类中去调用这个方法,如果在本类调用则不会异步执行,比如 this.doTask1() ;当然一定要在Spring容器环境中,手动 new 一个类调用方法也不会生效


异步调用(三)@Async自定义线程池
http://dunkingcurry30.github.io/2022/07/10/异步调用(三)/
作者
Dunking Curry
发布于
2022年7月10日
许可协议