# Java异步编程
# 线程池
# Executors类
不推荐
Executors类(并发包)提供了4种创建线程池方法,这些方法最终都是通过配置ThreadPoolExecutor的不同参数,来达到不同的线程管理效果,但是这种方式并不推荐,推荐通过ThreadPoolExecutor的方式创建,可以尽量避免内存溢出的情况
newCacheTreadPool
创建一个可以缓存的线程池,如果线程池长度超过处理需要,可以灵活回收空闲线程,没回收的话就新建线程
newFixedThread
创建一个定长的线程池,可控制最大并发数,超出的线程进行队列等待
newScheduleThreadPool
可以创建定长的、支持定时任务,周期任务执行
newSingleExecutor
创建一个单线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行
# ThreadPoolExecutor创建
推荐
@Test
void threadTest() {
// 获取当前机器的核数
int cpuNum = Runtime.getRuntime().availableProcessors();
log.info("当前机器的核数:{}", cpuNum);
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
// 核心线程池大小
cpuNum,
// 最大线程池大小
cpuNum * 2,
// 线程最大空闲时间
1L,
// 时间单位
TimeUnit.SECONDS,
// 线程等待队列
new ArrayBlockingQueue<>(100),
// 线程创建工厂
Executors.defaultThreadFactory(),
// 拒绝策略
new ThreadPoolExecutor.CallerRunsPolicy()
);
threadPoolExecutor.submit(() -> {
log.info("异步线程:{}", Thread.currentThread().getName());
});
log.info("主线程:{}", Thread.currentThread().getName());
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# Future获取异步执行结果
// 缺点
// 无法方便得知任务何时完成
// 在主线程获得任务结果会导致主线程阻塞
Future<String> future = threadPoolExecutor.submit(() -> {
log.info("异步线程:{}", Thread.currentThread().getName());
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "end";
});
log.info("异步线程提交:{}", future.get());
1
2
3
4
5
6
7
8
9
10
11
12
13
2
3
4
5
6
7
8
9
10
11
12
13
#
# ScheduledThreadPoolExecutor
- 继承自ThreadPooExecutor,为任务提供延迟或周期执行
- 使用专门的ScheduledFutureTask来执行周期任务,也可以接收不需要时间调度的任务.
- 使用DelayedWorkQueue存储任务(一种无界延迟队列)
- 支持线程池关闭后可执行,可选择线程池关闭后支持继续执行周期或延迟任务
# 核心方法
schedule
- 若任务为空,则抛出异常
- 检查线程池是否关闭,若关闭,则拒绝任务
- 若没有关闭,则将任务加入到等待队列
- 再次检查线程池是否在运行,若线程池在运行或者允许线程池关闭运行,则启动新线程等待执行任务
- 若线程池已经关闭并且不允许线程池关闭后运行,则从队列中移出指定的任务,再取消任务
scheduleAtFixedRate
创建一个周期执行的任务,第一次执行延期时间为initialDelay之后,每隔period执行一次,不等待第一次执行完就开始计时
scheduleWithFixedDelay
创建一个周期执行的任务,第一次执行延期时间为initialDelay,在第一次执行完之后延迟delay后开始下一次执行
shutdown
判断是否有线程池关闭后保留的任务
- 若没有保留的任务,则依次取消任务,并清除队列
- 若有保留的任务,则对于非周期性任务,取消该任务并将其清除出队列
@Test
void scheduleThreadPool() throws InterruptedException {
ConcurrentHashMap<String, ScheduledFuture<?>> taskMap = new ConcurrentHashMap<>(4);
AtomicInteger count = new AtomicInteger(0);
// 获取当前机器的核数
int cpuNum = Runtime.getRuntime().availableProcessors();
log.info("当前机器的核数:{}", cpuNum);
ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(cpuNum);
//若任务为空,则抛出异常.
//检查线程池是否关闭.若关闭,则拒绝任务.
//若没有关闭,则将任务加入到等待队列.
//再次检查线程池是否在运行.若线程池在运行或者允许线程池关闭运行,则启动新线程等待执行任务.
//若线程池已经关闭并且不允许线程池关闭后运行,则从队列中移出指定的任务,再取消任务
scheduled.schedule(() -> {
log.info("执行次数:{}", count.incrementAndGet());
log.info("schedule-{}", Thread.currentThread().getName());
}, 3, TimeUnit.SECONDS);
// 创建一个周期执行的任务,
// 第一次执行延期时间为initialDelay之后
// 每隔 period 执行一次, 不等待第一次执行完就开始计时
ScheduledFuture<?> scheduleAtFixedRate = scheduled.scheduleAtFixedRate(() -> {
int incrementAndGet = count.incrementAndGet();
log.info("执行次数:{}", incrementAndGet);
if(incrementAndGet > 10){
log.info("remove task scheduleAtFixedRate");
taskMap.get("scheduleAtFixedRate").cancel(true);
return;
}
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("scheduleAtFixedRate-{}", Thread.currentThread().getName());
}, 5, 5, TimeUnit.SECONDS);
taskMap.put("scheduleAtFixedRate", scheduleAtFixedRate);
// 在第一次执行完之后延迟delay后开始下一次执行
ScheduledFuture<?> scheduleWithFixedDelay = scheduled.scheduleWithFixedDelay(() -> {
int incrementAndGet = count.incrementAndGet();
log.info("执行次数:{}", incrementAndGet);
if(incrementAndGet > 10){
log.info("remove task scheduleWithFixedDelay");
taskMap.get("scheduleWithFixedDelay").cancel(true);
return;
}
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("scheduleWithFixedDelay-{}", Thread.currentThread().getName());
}, 5, 5, TimeUnit.SECONDS);
taskMap.put("scheduleWithFixedDelay", scheduleWithFixedDelay);
Thread.sleep(50 * 1000);
//scheduled.shutdown();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# CompletableFuture
CompletableFuture是对Future的扩展和增强。CompletableFuture实现了Future接口,并在此基础上进行了丰富的扩展,完美弥补了Future的局限性,同时CompletableFuture实现了对任务编排的能力,默认线程池是ForkJoinPool.commonPool(),但为了业务之间互不影响,且便于定位问题,强烈推荐使用自定义线程池
# 常用方法
- supplyAsync:带返回值的异步查询
- runAsync:无返回值的异步任务
- thenApply:表示某个任务执行完之后执行的动作,也就是回调方法,将任务的执行结果传递到回调方法的参数
- thenApply与thenApplyAsync的区别:前者是同一个线程执行,后者是将第一个任务提交到线程池异步执行,实际执行的可能是另外一个线程
- thenAccept:thenAccept和thenApply一样,但无返回值
- thenReturn:对比thenApply没有入参也没有返回值
- exceptionally:指定某个任务执行异常的回调方法,会将抛出的异常作为参数传递到该回调方法中
- whenComplete:当某个任务执行完成后执行的回调方法(参数1:正常执行的结果 参数2:上个任务抛出的异常) 返回值类型继承上个任务
- handle:对比whenComplete的区别是返回值类型可以自定义
- thenCombine:将2个future组合起来,只有当2个都正常执行完了才会执行某个任务,将2个任务的执行结果作为入参,有返回值
- thenAcceptBoth:对比thenCombine,无返回值
- runAfterBoth:对比thenCombine,无入参,无返回值
- allof:组合多个future 等待所有任务完成 返回值void
- anyof:对比allof,只要有一个future结束就可以做接下来的事情
- get:等待任务完成(主线程休眠等待子任务完成 子线程执行完成后唤醒主线程) 抛出的是经过检查的异常 需要手动处理
- join:对比get 会抛出的是未经过检查的异常,get不会抛出异常。
- complete:如果尚未完成,则将 get() 和相关方法返回的值设置为给定值。参数:value - 结果值 返回:如果此调用导致此CompletableFuture 转换为已完成状态,则为 true,否则为 false
# 特征
- 以Async结尾的方法签名表示是在异步线程里执行,没有以Async结尾的方法则是由主线程调用
- 如果参数里有Runnable类型,则没有返回结果,即纯消费的方法
- 如果参数里没有指定executor则默认使用forkJoinPool线程池,指定了则以指定的线程池来执行任务
@Test
void completableFutureTest() {
CompletableFuture.supplyAsync(() -> {
boolean nextBoolean = new Random().nextBoolean();
log.info("thread:{},{}", Thread.currentThread().getName(), nextBoolean);
return nextBoolean;
}).thenApply(r -> CompletableFuture.runAsync(() -> {
log.info("接收到的结果:{}", r);
log.info("thread,{}", Thread.currentThread().getName());
}));
CompletableFuture.supplyAsync(() -> {
boolean nextBoolean = new Random().nextBoolean();
log.info("thread:{},{}", Thread.currentThread().getName(), nextBoolean);
return nextBoolean;
}).thenCompose(r -> CompletableFuture.runAsync(() -> {
log.info("接收到的结果:{}", r);
log.info("thread,{}", Thread.currentThread().getName());
}));
CompletableFuture.supplyAsync(() -> {
boolean nextBoolean = new Random().nextBoolean();
log.info("thread:{},{}", Thread.currentThread().getName(), nextBoolean);
return nextBoolean;
}).thenAccept(r -> log.info("接收到的结果:{}", r));
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29