# Java异步编程

# 线程池

# Executors类

不推荐

Executors类(并发包)提供了4种创建线程池方法,这些方法最终都是通过配置ThreadPoolExecutor的不同参数,来达到不同的线程管理效果,但是这种方式并不推荐,推荐通过ThreadPoolExecutor的方式创建,可以尽量避免内存溢出的情况

  • newCacheTreadPool

    创建一个可以缓存的线程池,如果线程池长度超过处理需要,可以灵活回收空闲线程,没回收的话就新建线程

  • newFixedThread

    创建一个定长的线程池,可控制最大并发数,超出的线程进行队列等待

  • newScheduleThreadPool

    可以创建定长的、支持定时任务,周期任务执行

  • newSingleExecutor

    创建一个单线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行

# ThreadPoolExecutor创建

推荐

@Test
void threadTest() {
  // 获取当前机器的核数
  int cpuNum = Runtime.getRuntime().availableProcessors();
  log.info("当前机器的核数:{}", cpuNum);
  ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
    // 核心线程池大小
    cpuNum,
    // 最大线程池大小
    cpuNum * 2,
    // 线程最大空闲时间
    1L,
    // 时间单位
    TimeUnit.SECONDS,
    // 线程等待队列
    new ArrayBlockingQueue<>(100),
    // 线程创建工厂
    Executors.defaultThreadFactory(),
    // 拒绝策略
    new ThreadPoolExecutor.CallerRunsPolicy()
  );

  threadPoolExecutor.submit(() -> {
    log.info("异步线程:{}", Thread.currentThread().getName());
  });
  log.info("主线程:{}", Thread.currentThread().getName());
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# Future获取异步执行结果
// 缺点 
// 无法方便得知任务何时完成 
// 在主线程获得任务结果会导致主线程阻塞
Future<String> future = threadPoolExecutor.submit(() -> {
            log.info("异步线程:{}", Thread.currentThread().getName());
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "end";
        });
 log.info("异步线程提交:{}", future.get());
1
2
3
4
5
6
7
8
9
10
11
12
13
#
# ScheduledThreadPoolExecutor
  • 继承自ThreadPooExecutor,为任务提供延迟或周期执行
  • 使用专门的ScheduledFutureTask来执行周期任务,也可以接收不需要时间调度的任务.
  • 使用DelayedWorkQueue存储任务(一种无界延迟队列)
  • 支持线程池关闭后可执行,可选择线程池关闭后支持继续执行周期或延迟任务
# 核心方法
  • schedule

    1. 若任务为空,则抛出异常
    2. 检查线程池是否关闭,若关闭,则拒绝任务
    3. 若没有关闭,则将任务加入到等待队列
    4. 再次检查线程池是否在运行,若线程池在运行或者允许线程池关闭运行,则启动新线程等待执行任务
    5. 若线程池已经关闭并且不允许线程池关闭后运行,则从队列中移出指定的任务,再取消任务
  • scheduleAtFixedRate

    创建一个周期执行的任务,第一次执行延期时间为initialDelay之后,每隔period执行一次,不等待第一次执行完就开始计时

  • scheduleWithFixedDelay

    创建一个周期执行的任务,第一次执行延期时间为initialDelay,在第一次执行完之后延迟delay后开始下一次执行

  • shutdown

    判断是否有线程池关闭后保留的任务

    1. 若没有保留的任务,则依次取消任务,并清除队列
    2. 若有保留的任务,则对于非周期性任务,取消该任务并将其清除出队列
@Test
void scheduleThreadPool() throws InterruptedException {
  ConcurrentHashMap<String, ScheduledFuture<?>> taskMap = new ConcurrentHashMap<>(4);

  AtomicInteger count = new AtomicInteger(0);

  // 获取当前机器的核数
  int cpuNum = Runtime.getRuntime().availableProcessors();
  log.info("当前机器的核数:{}", cpuNum);
  ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(cpuNum);

  //若任务为空,则抛出异常.
  //检查线程池是否关闭.若关闭,则拒绝任务.
  //若没有关闭,则将任务加入到等待队列.
  //再次检查线程池是否在运行.若线程池在运行或者允许线程池关闭运行,则启动新线程等待执行任务.
  //若线程池已经关闭并且不允许线程池关闭后运行,则从队列中移出指定的任务,再取消任务
  scheduled.schedule(() -> {
    log.info("执行次数:{}", count.incrementAndGet());
    log.info("schedule-{}", Thread.currentThread().getName());
  }, 3, TimeUnit.SECONDS);
  // 创建一个周期执行的任务,
  // 第一次执行延期时间为initialDelay之后
  // 每隔 period 执行一次, 不等待第一次执行完就开始计时
  ScheduledFuture<?> scheduleAtFixedRate = scheduled.scheduleAtFixedRate(() -> {
    int incrementAndGet = count.incrementAndGet();
    log.info("执行次数:{}", incrementAndGet);
    if(incrementAndGet > 10){
      log.info("remove task scheduleAtFixedRate");
      taskMap.get("scheduleAtFixedRate").cancel(true);
      return;
    }
    try {
      Thread.sleep(2000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    log.info("scheduleAtFixedRate-{}", Thread.currentThread().getName());
  }, 5, 5, TimeUnit.SECONDS);

  taskMap.put("scheduleAtFixedRate", scheduleAtFixedRate);


  // 在第一次执行完之后延迟delay后开始下一次执行
  ScheduledFuture<?> scheduleWithFixedDelay = scheduled.scheduleWithFixedDelay(() -> {
    int incrementAndGet = count.incrementAndGet();
    log.info("执行次数:{}", incrementAndGet);
    if(incrementAndGet > 10){
      log.info("remove task scheduleWithFixedDelay");
      taskMap.get("scheduleWithFixedDelay").cancel(true);
      return;
    }
    try {
      Thread.sleep(2000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    log.info("scheduleWithFixedDelay-{}", Thread.currentThread().getName());
  }, 5, 5, TimeUnit.SECONDS);

  taskMap.put("scheduleWithFixedDelay", scheduleWithFixedDelay);

  Thread.sleep(50 * 1000);
  //scheduled.shutdown();
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65

# CompletableFuture

CompletableFuture是对Future的扩展和增强。CompletableFuture实现了Future接口,并在此基础上进行了丰富的扩展,完美弥补了Future的局限性,同时CompletableFuture实现了对任务编排的能力,默认线程池是ForkJoinPool.commonPool(),但为了业务之间互不影响,且便于定位问题,强烈推荐使用自定义线程池

# 常用方法
  • supplyAsync:带返回值的异步查询
  • runAsync:无返回值的异步任务
  • thenApply:表示某个任务执行完之后执行的动作,也就是回调方法,将任务的执行结果传递到回调方法的参数
  • thenApply与thenApplyAsync的区别:前者是同一个线程执行,后者是将第一个任务提交到线程池异步执行,实际执行的可能是另外一个线程
  • thenAccept:thenAccept和thenApply一样,但无返回值
  • thenReturn:对比thenApply没有入参也没有返回值
  • exceptionally:指定某个任务执行异常的回调方法,会将抛出的异常作为参数传递到该回调方法中
  • whenComplete:当某个任务执行完成后执行的回调方法(参数1:正常执行的结果 参数2:上个任务抛出的异常) 返回值类型继承上个任务
  • handle:对比whenComplete的区别是返回值类型可以自定义
  • thenCombine:将2个future组合起来,只有当2个都正常执行完了才会执行某个任务,将2个任务的执行结果作为入参,有返回值
  • thenAcceptBoth:对比thenCombine,无返回值
  • runAfterBoth:对比thenCombine,无入参,无返回值
  • allof:组合多个future 等待所有任务完成 返回值void
  • anyof:对比allof,只要有一个future结束就可以做接下来的事情
  • get:等待任务完成(主线程休眠等待子任务完成 子线程执行完成后唤醒主线程) 抛出的是经过检查的异常 需要手动处理
  • join:对比get 会抛出的是未经过检查的异常,get不会抛出异常。
  • complete:如果尚未完成,则将 get() 和相关方法返回的值设置为给定值。参数:value - 结果值 返回:如果此调用导致此CompletableFuture 转换为已完成状态,则为 true,否则为 false
# 特征
  • 以Async结尾的方法签名表示是在异步线程里执行,没有以Async结尾的方法则是由主线程调用
  • 如果参数里有Runnable类型,则没有返回结果,即纯消费的方法
  • 如果参数里没有指定executor则默认使用forkJoinPool线程池,指定了则以指定的线程池来执行任务
@Test
void completableFutureTest() {
  
  CompletableFuture.supplyAsync(() -> {
    boolean nextBoolean = new Random().nextBoolean();
    log.info("thread:{},{}", Thread.currentThread().getName(), nextBoolean);
    return nextBoolean;
  }).thenApply(r -> CompletableFuture.runAsync(() -> {
    log.info("接收到的结果:{}", r);
    log.info("thread,{}", Thread.currentThread().getName());
  }));

  CompletableFuture.supplyAsync(() -> {
    boolean nextBoolean = new Random().nextBoolean();
    log.info("thread:{},{}", Thread.currentThread().getName(), nextBoolean);
    return nextBoolean;
  }).thenCompose(r -> CompletableFuture.runAsync(() -> {
    log.info("接收到的结果:{}", r);
    log.info("thread,{}", Thread.currentThread().getName());
  }));


  CompletableFuture.supplyAsync(() -> {
    boolean nextBoolean = new Random().nextBoolean();
    log.info("thread:{},{}", Thread.currentThread().getName(), nextBoolean);
    return nextBoolean;
  }).thenAccept(r -> log.info("接收到的结果:{}", r));

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
最后更新于: 2022-08-27 21:46:48