线程池和异步编排

发布于 2023-01-13  143 次阅读


1.为什么使用线程池?

  1. 降低资源的消耗【减少创建销毁操作】通过重复利用已经创建好的线程降低线程的创建和销毁带来的损耗高并发状态下过多创建线程可能将资源耗尽
  2. 提高响应速度【控制线程个数】因为线程池中的线程数没有超过线程池的最大上限时,有的线程处于等待分配任务的状态,当任务来时无需创建新的线程就能执行(线程个数过多导致CPU调度慢)
  3. 提高线程的可管理性【例如系统中可以创建两个线程池,核心线程池、非核心线程池【例如发送短信】,显存告警时关闭非核心线程池释放内存资源】线程池会根据当前系统特点对池内的线程进行优化处理,减少创建和销毁线程带来的系统开销。无限的创建和销毁线程不仅消耗系统资源,还降低系统的稳定性,使用线程池进行统一分配

2.常见的4种默认线程池

注意:
    回收线程 = maximumPoolSize - corePoolSize
​
可缓冲线程池【CachedThreadPool】:corePoolSize=0, maximumPoolSize=Integer.MAX_VALUE
定长线程池【FixedThreadPool】:corePoolSize=maximumPoolSize
周期线程池【ScheduledThreadPool】:指定核心线程数,maximumPoolSize=Integer.MAX_VALUE,支持定时及周期性任务执行(一段时间之后再执行)
单任务线程池【SingleThreadPool】:corePoolSize=maximumPoolSize=1,从队列中获取任务(一个核心线程)
  
Executors.newCachedThreadPool();
Executors.newFixedThreadPool(10);
Executors.newScheduledThreadPool(10);
Executors.newSingleThreadExecutor();

3. 创建原生线程池ThreadPoolExecutor

new ThreadPoolExecutor(5,
        200,
        10,
        TimeUnit.SECONDS,
        new LinkedBlockingDeque<>(100000),
        Executors.defaultThreadFactory(),
        new ThreadPoolExecutor.AbortPolicy());
​
7个参数:
    corePoolSize:   核心线程数,不会被回收,接收异步任务时才会创建
    maximumPoolSize:最大线程数量,控制资源
    keepAliveime:   maximumPoolSize-corePoolSize 无任务存活超过空闲时间则线程被释放
    TimeUnitunit:   时间单位
    workQueue:      阻塞队列,任务被执行之前保存在任务队列中,只要有线程空闲,就会从队列取出任务执行
    threadFactory:  线程的创建工厂【可以自定义】
    RejectedExecutionHandler handler:队列满后执行的拒绝策略
    
线程池任务执行流程
    当线程池小于corePoolSize时,新提交任务将创建一个新线程执行任务,即使此时线程池中存在空闲线程。
    当线程池达到corePoolSize时,新提交任务将被放入workQueue中,等待线程池中任务调度执行
    当workQueue已满,且maximumPoolSize>corePoolSize时,新提交任务会创建新线程执行任务
    当提交任务数超过maximumPoolSize时,新提交任务由RejectedExecutionHandler处理(默认策略抛出异常)
    当线程池中超过corePoolSize线程,空闲时间达到keepAliveTime时,释放空闲线程
    当设置allowCoreThreadTimeOut(true)时,该参数默认false,线程池中corePoolSize线程空闲时间达到keepAliveTime也将关闭

拒绝策略

DiscardOldestPolicy:丢弃最老的任务
AbortPolicy:丢弃当前任务,抛出异常【默认策略】
CallerRunsPolicy:同步执行run方法
DiscardPolicy:丢弃当前任务,不抛出异常

异步编排CompletableFuture

1.常用的异步操作

1.1 开始异步任务

supplyAsync() : 提交任务异步执行,有返回值

runAsync(): 提交任务异步执行,无返回值

可以传入自定义线程池,否则使用默认线程池。

如果方法后面没有Async,说明不是异步执行

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "测试使用", executor);
System.out.println(future1.get());

1.2 方法完成后的处理

whenComplete:获取上一步执行结果并获取异常信息,无法修改返回数据

exceptionally:获取上一步异常,如果出现异常可返回默认值,不出现异常保持原值

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
    System.out.println("当前线程:" + Thread.currentThread().getId());
    int i = 10 / 0;
    System.out.println("运行结果:" + i);
    return i;
}, executor).whenComplete((res,exception) -> {
    //虽然能得到异常信息,但是没法修改返回数据
    System.out.println("异步任务成功完成了...结果是:" + res + "异常是:" + exception);
}).exceptionally(throwable -> {
    //可以感知异常,同时返回默认值
    return 10;
});

handle:获取上一步执行结果并获取异常信息,可修改返回数据

         CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
             System.out.println("当前线程:" + Thread.currentThread().getId());
             int i = 10 / 2;
             System.out.println("运行结果:" + i);
             return i;
         }, executor).handle((result,thr) -> {
             if (result != null) {
                 return result * 2;
             }
             if (thr != null) {
                 System.out.println("异步任务成功完成了...结果是:" + result + "异常是:" + thr);
                 return 0;
             }
             return 0;
         });

1.3 线程串行化

  1. thenRunAsync:不能获取上一步的执行结果
  2. thenAcceptAsync:能接受上一步结果,但是无返回值
  3. henApplyAsync:能接受上一步结果,有返回值
        /**
         * 线程串行化
         * 1、thenRunAsync:不能获取上一步的执行结果
         * 2、thenAcceptAsync:能接受上一步结果,但是无返回值
         * 3、thenApplyAsync:能接受上一步结果,有返回值
         *  后面加Async: 异步执行
         */
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("运行结果:" + i);
            return i;
        }, executor).thenApplyAsync(res -> {
            System.out.println("任务2启动了..." + res);
            return "Hello" + res;
        }, executor);
        System.out.println(future.get());

1.4 两任务组合

1.4.1 都要完成

  1. runAfterBoth:组合两个 future,不需要获取 future 的结果,只需两个 future 处理完任务后,处理该任务。
  2. thenAcceptBoth:组合两个 future,获取两个 future 任务的返回结果,然后处理任务,没有返回值。
  3. thenCombine:组合两个 future,获取两个 future 的返回结果,并返回当前任务的返回值

runAfterBothAsync

// 5.6.1.二者都要完成,组合【不获取前两个任务返回值,且自己无返回值】
CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {
    System.out.println("任务1执行");
    return 10 / 2;
}, executor);
CompletableFuture<String> future02 = CompletableFuture.supplyAsync(() -> {
    System.out.println("任务2执行");
    return "hello";
}, executor);
CompletableFuture<Void> future03 = future01.runAfterBothAsync(future02, () -> {
    System.out.println("任务3执行");
}, executor);

thenAcceptBothAsync

// 5.6.2.二者都要完成,组合【获取前两个任务返回值,自己无返回值】
CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {
    System.out.println("任务1执行");
    return 10 / 2;
}, executor);
CompletableFuture<String> future02 = CompletableFuture.supplyAsync(() -> {
    System.out.println("任务2执行");
    return "hello";
}, executor);
CompletableFuture<Void> future03 = future01.thenAcceptBothAsync(future02,
        (result1, result2) -> {
            System.out.println("任务3执行");
            System.out.println("任务1返回值:" + result1);
            System.out.println("任务2返回值:" + result2);
        }, executor);

thenCombineAsync

// 二者都要完成,组合【获取前两个任务返回值,自己有返回值】
CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {
    System.out.println("任务1执行");
    return 10 / 2;
}, executor);
CompletableFuture<String> future02 = CompletableFuture.supplyAsync(() -> {
    System.out.println("任务2执行");
    return "hello";
}, executor);
CompletableFuture<String> future03 = future01.thenCombineAsync(future02,
        (result1, result2) -> {
            System.out.println("任务3执行");
            System.out.println("任务1返回值:" + result1);
            System.out.println("任务2返回值:" + result2);
            return "任务3返回值";
        }, executor);
System.out.println(future03.get());

1.4.2 任一完成

  1. runAfterEither:两个任务有一个执行完成,不需要获取 future 的结果,处理任务,也没有返回值。
  2. acceptEither:两个任务有一个执行完成,获取它的返回值,处理任务,没有新的返回值。
  3. applyToEither:两个任务有一个执行完成,获取它的返回值,处理任务并有新的返回值。

runAfterEitherAsync

// 不获取前任务返回值,且当前任务无返回值
future01.runAfterEitherAsync(future02,()->{
    System.out.println("任务3结束");
},executor);

acceptEitherAsync

// 获取前任务返回值,但当前任务无返回值
future01.acceptEitherAsync(future02,f1->{
    System.out.println("任务3结束:"+f1);
},executor);

applyToEitherAsync

// 获取前任务返回值,当前任务有返回值
CompletableFuture<String> future = future01.applyToEitherAsync(future02, f1 -> {
    return f1 + "==>haha";
}, executor);
System.out.println(future.get());

1.5 多任务组合

CompletableFuture<String> futureImg = CompletableFuture.supplyAsync(() -> {
    System.out.println("查询商品图片");
    return "hello.jpg";
},executor);
CompletableFuture<String> futureAttr = CompletableFuture.supplyAsync(() -> {
    System.out.println("查询商品信息");
    return "黑色";
},executor);
CompletableFuture<String> futureBrand = CompletableFuture.supplyAsync(() -> {
    System.out.println("查询品牌信息");
    return "小米";
},executor);

allOf

// 等待所有任务完成
CompletableFuture<Void> future = CompletableFuture.allOf(futureImg, futureAttr, futureBrand);
System.out.println(futureImg.get()+"--"+futureAttr.get()+"--"+futureBrand.get());

anyOf

CompletableFuture<Object> future = CompletableFuture.anyOf(futureImg, futureAttr, futureBrand);
System.out.println(future.get());