异步调用(一)线程与线程池的创建

异步编程允许多个事情同时发生, 当程序调用需要长时间运行的方法时,它不会阻塞当前的执行流程,程序可以继续运行,当方法执行完成时通知给主线程根据需要获取其执行结果或者失败异常的原因。使用异步编程可以大大提高我们程序的吞吐量,可以更好的面对更高的并发场景并更好的利用现有的系统资源,同时也会一定程度上减少用户的等待时间等。

一、 异步调用概念

异步调用 是相对于同步调用 而言的,同步调用是指程序按预定顺序一步步执行,每一步必须等到上一步执行完后才能执行,异步调用则无需等待上一步程序执行完即可执行。 而在Java中异步调用的实现方式通常为——多线程

二、Java 的 异步实现

1. 直接创建新线程

使用以下方式(lambda表达式需 jdk 8及以上)直接创建 Thread 对象并调用新线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static void main(String[] args) throws Exception {
System.out.println("主线程方法===>开始:"+System.currentTimeMillis());
new Thread(()->{
System.out.println("异步线程方法===>开始:"+System.currentTimeMillis());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("异步线程方法===>结束:"+System.currentTimeMillis());
}).start();
Thread.sleep(1000);
System.out.println("主线程方法===>结束:"+System.currentTimeMillis());
}

但这种方式并不推荐使用,因为存在两个显著问题:

  • 创建的线程没有复用。我们知道频繁的线程创建与销毁是需要一部分开销的,而且示例里也没有限制线程的个数,如果使用不当可能会耗尽系统资源,从而导致生产事故。使用线程池可以解决该问题。

  • 无法获取异步任务执行结果。使用 FutureTask 解决该问题

2. 使用线程池

2.1 通过 Excutors 类创建线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static void main(String[] args) throws Exception {
System.out.println("主线程方法===>开始:"+System.currentTimeMillis());

//创建线程池
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.submit(()->{
System.out.println("异步线程方法===>开始:"+System.currentTimeMillis());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("异步线程方法===>结束:"+System.currentTimeMillis());
});
//回收线程池
executorService.shutdown();

Thread.sleep(1000);
System.out.println("主线程方法===>结束:"+System.currentTimeMillis());
}

线程池类型

Executors 类可以创建我们常用的四种线程池:

  • newCachedThreadPool :创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程,不设上限,提交的任务将立即执行;

  • newFixedThreadPool :创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待;

  • newScheduledThreadPool :创建一个定长线程池,支持定时及周期性任务执行;

  • newSingleThreadExecutor :创建一个单线程化的线程池执行任务,如果只是要异步执行,而非多线程并发,一般使用这个就行。

线程池的submit方法:

线程池建立完毕之后,我们就需要往线程池提交任务。通过线程池的submit方法即可,submit方法接收两种RunableCallable (这里采用的 lambda 表达式,方法体中若有返回值或异常抛出实现 Callablecall() 方法;否则实现 Runablerun() 方法)。

实际上,以上创建线程池的方式并不推荐,因为Executors返回的线程池对象的弊端如下:

  • FixedThreadPool和SingleThreadPool:
      允许的请求队列长度为 Integer.MAX_VALUE ,可能会堆积大量的请求,从而导致OOM;
  • CachedThreadPool:
      允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM

2.2 手动创建线程池 (推荐)

使用线程池实现类 ThreadPoolExecutor

1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) { ... ... }

线程池的参数

根据以上定义,线程池有以下7大参数:

  • corePoolSize :线程池中的常驻核心线程数
  • maximumPoolSize :线程池能够容纳同时执行的最大线程数,此值必须大于等于1
  • keepAliveTime :多余的空闲线程的存活时间
  • unit :keepAliveTime 的时间单位
  • workQueue :任务队列,被提交但尚未被执行的任务。
  • threadFactory :表示生成线程池中工作线程的线程工厂,用于创建线程一般用默认
  • handler:拒绝策略,表示当队列满了并且工作线程大于等于线程池的最大线程数

线程池工作原理

  1. 当线程数 > corePoolSize 时,多余线程将转入阻塞队列;
  2. 阻塞队列也满了之后线程数将扩容到 maximumPoolSize
  3. maximumPoolSize 也满了之后将采取拒绝策略 handler
  4. 当线程数小了之后,根据设定的空闲线程存活时间将线程总容量回缩到 corePoolSize

使用线程工厂创建线程池

线程工厂需要先借助谷歌工具库 guava ,在pom.xml 中引入如下依赖

1
2
3
4
5
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>20.0</version>
</dependency>

使用线程工厂自定义创建线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private static final int THREAD_CORE_SIZE = 3;
private static final int THREAD_MAX_SIZE = 10;

public static void main(String[] args) throws Exception {
System.out.println("------Start------");
// 使用 ThreadFactoryBuilder 创建自定义线程名称的 ThreadFactory
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build();

// 创建线程池,其中任务队列需要结合实际情况设置合理的容量
ThreadPoolExecutor executor = new ThreadPoolExecutor(THREAD_CORE_SIZE,
THREAD_MAX_SIZE,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingDeque<>(1024),
namedThreadFactory,
new ThreadPoolExecutor.AbortPolicy());

// 新建 1000 个任务,每个任务是打印当前线程名称
for (int i = 0; i < 1000; i++) {
executor.execute(()->{
System.out.println(Thread.currentThread().getName());
});
}
// 优雅关闭线程池
executor.shutdown();
executor.awaitTermination(1000L, TimeUnit.SECONDS);
// 任务执行完毕后打印done
System.out.println("------Done------");
}

这里用到了线程池的优雅关闭,详细内容在 异步调用(二)线程池的优雅关闭

3. @Async注解

3.1 开启异步功能支持

使用 @EnableAsync 来开启异步任务支持,@EnableAsync 注解可以直接放在SpringBoot启动类上,也可以单独放在其他配置类上,主要是为了扫描范围包下所有的 @Async 注解

1
2
3
4
@EnableAsync
@Configuration
public class AsyncConfiguraion {
}

3.2 标记异步方法

在需要异步调用的方法上添加 @Async 注解,如果在类上标注,代表该类的所有方法都需要异步处理,但并不推荐这么做(需注意方法所在类需要注册为spring组件)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Component
public class AsyncTask {

@Async
public void doTask1() {
long t1 = System.currentTimeMillis();
Thread.sleep(2000);
long t2 = System.currentTimeMillis();
log.info("task1 cost {} ms" , t2-t1);
}

@Async
public void doTask2() {
long t1 = System.currentTimeMillis();
Thread.sleep(3000);
long t2 = System.currentTimeMillis();
log.info("task2 cost {} ms" , t2-t1);
}
}

3.3 调用异步方法

调用异步方法同样需要在Spring注册的组件中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@RestController
@RequestMapping("/async")
public class AsyncController {
@Autowired
private AsyncTask asyncTask;

@RequestMapping("/task")
public void task() throws InterruptedException {
long t1 = System.currentTimeMillis();
asyncTask.doTask1();
asyncTask.doTask2();
Thread.sleep(1000);
long t2 = System.currentTimeMillis();
log.info("main cost {} ms", t2-t1);
}
}

@Async注解,在默认情况下用的是SimpleAsyncTaskExecutor线程池,该线程池不是真正意义上的线程池 。使用此线程池无法实现线程重用,每次调用都会新建一条线程。若系统中不断的创建线程,最终耗尽系统资源,引发OOM,因此该注解的真正使用场景需要自定义线程池,如何实现在 异步调用(三)@Async自定义线程池


异步调用(一)线程与线程池的创建
http://dunkingcurry30.github.io/2022/07/06/异步调用(一)/
作者
Dunking Curry
发布于
2022年7月6日
许可协议