1. 首页
  2. 后端

CompletableFuture:强大的异步编程工具

  CompletableFuture:强大的异步编程工具

===========================

引言

在现代软件开发过程中,我们经常需要处理各种异步任务,这些任务可能是耗时的计算、I/O操作或者与其他系统的交互。为了提高系统的性能和响应速度,我们需要有效地管理和编排这些任务。CompletableFuture是Java 8引入的一个强大的异步编程工具,它可以帮助我们以非阻塞的方式编写并发代码,从而提高应用程序的吞吐量和响应性。

CompletableFuture介绍

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {

image.png

CompletableFuture实现了FutureCompletionStage接口,Future都知道是帮助获取异步执行结果的,CompletionStage则是帮助实现任务编排的接口,接口方法如下:

image.png

CompletableFuture基本用法

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
// 返回一个带执行结果的CompletableFuture,推荐使用自定义线程池
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);
public static CompletableFuture<Void> runAsync(Runnable runnable);
// 返回一个不带返回值的CompletableFuture,推荐使用自定义线程池
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);

自定义一个线程池

public class IOThreadPoolUtils {

    /**
     * 根据cpu的数量动态的配置核心线程数和最大线程数
     */
    private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();

    private static final int CORE_POOL_SIZE = CPU_COUNT + 1;

    private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;

    private static final int KEEP_ALIVE = 60;

    public static volatile ThreadPoolExecutor threadPool;


    public static void execute(Runnable runnable) {
        getThreadPool().execute(runnable);
    }
    public static <T> Future<T> submit(Callable<T> callable) {
        return getThreadPool().submit(callable);
    }

    /**
     * 获取线程池
     * @return 线程池对象
     */
    public static ThreadPoolExecutor getThreadPool() {
        if (threadPool == null) {
            synchronized (IOThreadPoolUtils.class){
                if (threadPool == null) {
                    System.out.println("获取到的CPU数量为" + CPU_COUNT);
                    threadPool = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE, TimeUnit.SECONDS,
                            new LinkedBlockingQueue<>(64), new ThreadPoolExecutor.CallerRunsPolicy());
                    return threadPool;
                }
            }

        }
        return threadPool;
    }
}

demo示例

public static void test1() throws ExecutionException, InterruptedException {
    CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(()-> "hello supplyAsync", IOThreadPoolUtils.getThreadPool());
    System.out.println("reslut: "+supplyAsync.get());
}

CompletableFuture链式调用

//获取上一步返回结果并将计算后结果返回,沿用上一线程
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
//获取上一步返回结果并将计算后结果返回,用自带线程池线程
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
//获取上一步返回结果并将计算后结果返回,用自定义线程池线程
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

//获取上一步返回结果但不会有返回值,沿用上一线程
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
//获取上一步返回结果但不会有返回值,用自带线程池线程
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) 
//获取上一步返回结果但不会有返回值,用自定义线程池线程
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor) 
 //无法获取上一步结果也不会有返回值,沿用上一线程                                           
public CompletableFuture<Void> thenRun(Runnable action) 
//无法获取上一步结果也不会有返回值,用自带线程池线程
public CompletableFuture<Void> thenRunAsync(Runnable action)
//无法获取上一步结果也不会有返回值,用自定义线程池线程
public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)                                         

demo示例

public static void test2() throws ExecutionException, InterruptedException {
    CompletableFuture.supplyAsync(() -> "hello", IOThreadPoolUtils.getThreadPool())
            .thenApplyAsync((value) -> value + "java",IOThreadPoolUtils.getThreadPool())
            .thenAcceptAsync((value) -> System.out.println("最终执行结果:"+value), IOThreadPoolUtils.getThreadPool());
}
// 执行结果--》最终执行结果:hellojava

CompletableFuture异常处理

//接收异常结果并对异常进行处理
public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)
// 接收结果和异常信息,沿用上一线程
public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
// 接收结果和异常信息,用自带线程池线程
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U>fn) 
// 接收结果和异常信息,用自定义线程池线程
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) 

demo示例

public static void test3() throws ExecutionException, InterruptedException {
    CompletableFuture<String> asyncError = CompletableFuture.supplyAsync(() -> {
        if (true) {
            throw new RuntimeException("supplyAsync error");
        }
        return "success";
    }).exceptionally(e -> {
        System.out.println("异常处理:" + e.getMessage());
        return "fair";
    });
    System.out.println("reslut: " + asyncError.get());
}
//异常处理:java.lang.RuntimeException: supplyAsync error
//reslut: fair

CompletableFuture组合操作

CompletableFuture 的 allOf()这个静态方法可以来并行运行多个 CompletableFuture 

// 必须所有的CompletableFuture都执行完成后才能继续运行
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
// 只要其中一个CompletableFuture执行完成后就可以继续运行
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) 

demo示例

public static void test4() throws ExecutionException, InterruptedException {
    CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> System.out.println("hello"));
    CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> System.out.println("java"));
    CompletableFuture<Void> completableFuture = CompletableFuture.allOf(future1, future2);
    boolean done = completableFuture.isDone();
    System.out.println("done: " + done);
}
//hello
//java
//done: true

CompletableFuture的应用场景

  1. 微服务架构下异步通信,Dubbo异步通信就是采用CompletableFuture
    dubbo异步通信
  2. 耗时操作例如文件传输等。本人实际开发中利用 CompletableFuture进行文件分片上传,上传成功后调用合片接口进行文件还原操作
    原文链接: https://juejin.cn/post/7355319119296495668

文章收集整理于网络,请勿商用,仅供个人学习使用,如有侵权,请联系作者删除,如若转载,请注明出处:http://www.cxyroad.com/17203.html

QR code