Backend Development 15 min read

Mastering Java 8 CompletableFuture: Async Patterns and Best Practices

This article introduces Java 8's CompletableFuture class, compares it with Future, demonstrates basic Future usage, then explores advanced asynchronous patterns—including CompletionService, chaining, exception handling, combining tasks, and various utility methods—providing code examples and execution results to illustrate each concept.

Spring Full-Stack Practical Cases
Spring Full-Stack Practical Cases
Spring Full-Stack Practical Cases
Mastering Java 8 CompletableFuture: Async Patterns and Best Practices

Introduction to CompletableFuture in Java 8

Java 8 adds CompletableFuture, offering about 50 methods for asynchronous scenarios, extending Future and providing functional programming capabilities.

Basic Future usage

Future has been available since JDK 1.5, used with ExecutorService and Callable to obtain asynchronous results.

1. Future with Callable

Single task example:

<code>private static class Task implements Callable<String> {
    @Override
    public String call() throws Exception {
        TimeUnit.SECONDS.sleep(3);
        return "success";
    }
}
public static void main(String[] args) throws Exception {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 3, 60, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(10));
    Future<String> future = executor.submit(new Task());
    String result = future.get();
    System.out.println("执行结果:" + result);
}
</code>

Calling future.get() blocks until the task completes.

Multiple tasks

<code>private static class Task implements Callable<String> {
    private int sleep;
    public Task(int sleep) { this.sleep = sleep; }
    @Override
    public String call() throws Exception {
        TimeUnit.SECONDS.sleep(this.sleep);
        return "success";
    }
}
public static void main(String[] args) throws Exception {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 3, 60, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(10));
    Future<String> future1 = executor.submit(new Task(3));
    Future<String> future2 = executor.submit(new Task(2));
    Future<String> future3 = executor.submit(new Task(1));
    String result1 = future1.get();
    String result2 = future2.get();
    String result3 = future3.get();
    System.out.println("result1:" + result1 + "\t" + "result2:" + result2 + "\t" + "result3:" + result3);
}
</code>

Even though tasks finish at different times, get() waits for each future in order, which can cause unnecessary blocking.

Using CompletionService to overcome Future limitations

<code>private static class Task implements Callable<String> {
    private int time;
    private String name;
    public Task(int time, String name) {
        this.time = time;
        this.name = name;
    }
    @Override
    public String call() throws Exception {
        TimeUnit.SECONDS.sleep(this.time);
        return name;
    }
}
public static void main(String[] args) throws Exception {
    ThreadPoolExecutor pool = new ThreadPoolExecutor(3, 3, 60, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(10));
    CompletionService<String> cs = new ExecutorCompletionService<>(pool);
    cs.submit(new Task(3, "name3"));
    cs.submit(new Task(1, "name1"));
    cs.submit(new Task(2, "name2"));
    for (int i = 0; i < 3; i++) {
        System.out.println(cs.take().get());
    }
}
</code>

Tasks are processed in the order they complete, regardless of submission order.

CompletableFuture asynchronous programming

Four static methods are used to start async tasks (illustrated in the image below).

2.1 Simple async chain

<code>ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 60, TimeUnit.SECONDS,
        new ArrayBlockingQueue<>(10));
CompletableFuture.runAsync(() -> {
    try { TimeUnit.SECONDS.sleep(3);
          System.out.println(Thread.currentThread().getName() + ", 1 任务执行完成");
    } catch (InterruptedException e) { e.printStackTrace(); }
}, executor).thenRun(() -> {
    try { TimeUnit.SECONDS.sleep(2);
          System.out.println(Thread.currentThread().getName() + ", 2 任务执行完成");
    } catch (InterruptedException e) { e.printStackTrace(); }
});
System.out.println("主线程:" + Thread.currentThread().getName());
executor.shutdown();
</code>

2.2 Getting previous result and handling completion

<code>CompletableFuture.supplyAsync(() -> {
    try { TimeUnit.SECONDS.sleep(3);
          System.out.println(Thread.currentThread().getName() + ", 1 任务执行完成");
    } catch (InterruptedException e) { e.printStackTrace(); }
    return "1";
}, executor).thenApply(res -> {
    System.out.println("获取到上一步任务执行结果:" + res);
    try { TimeUnit.SECONDS.sleep(2);
          System.out.println(Thread.currentThread().getName() + ", 2 任务执行完成");
    } catch (InterruptedException e) { e.printStackTrace(); }
    return "2";
}).whenComplete((res, tx) -> {
    System.out.println("获取到结果:" + res);
    if (tx != null) { System.err.println("发生错误了:" + tx.getMessage()); }
    executor.shutdown();
});
System.out.println("主线程:" + Thread.currentThread().getName());
</code>

2.3 Asynchronous exception handling

<code>CompletableFuture.supplyAsync(() -> {
    try { TimeUnit.SECONDS.sleep(3);
          System.out.println(Thread.currentThread().getName() + ", 1 任务执行完成");
    } catch (InterruptedException e) { e.printStackTrace(); }
    return "1";
}, executor).thenApply(res -> {
    System.out.println("获取到上一步任务执行结果:" + res);
    try { TimeUnit.SECONDS.sleep(2);
          System.out.println(Thread.currentThread().getName() + ", 2 任务执行完成");
          System.out.println(1 / 0);
    } catch (InterruptedException e) { e.printStackTrace(); }
    return "2";
}).exceptionally(tx -> {
    System.out.println(Thread.currentThread().getName() + ", 任务执行发生了异常");
    return "error";
}).whenComplete((res, tx) -> {
    System.out.println("获取到结果:" + res);
    if (tx != null) { System.err.println("发生错误了:" + tx.getMessage()); }
    executor.shutdown();
});
System.out.println("主线程:" + Thread.currentThread().getName());
</code>

2.4 CompletableFuture.allOf

<code>CompletableFuture<Double> calc1 = CompletableFuture.supplyAsync(() -> {
    try { TimeUnit.SECONDS.sleep(2);
          System.out.println(Thread.currentThread().getName() + ", calc1任务执行完成");
    } catch (InterruptedException e) { e.printStackTrace(); }
    return 10D;
}, executor);
CompletableFuture<Double> calc2 = CompletableFuture.supplyAsync(() -> {
    try { TimeUnit.SECONDS.sleep(5);
          System.out.println(Thread.currentThread().getName() + ", calc2任务执行完成");
    } catch (InterruptedException e) { e.printStackTrace(); }
    return 20D;
}, executor);
CompletableFuture.allOf(calc1, calc2).whenComplete((res, tx) -> {
    System.out.println("获取到结果:" + res + ", " + tx);
    try {
        System.out.println(calc1.get());
        System.out.println(calc2.get());
    } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }
    executor.shutdown();
});
</code>

2.5 handle method

<code>CompletableFuture.supplyAsync(() -> {
    try { TimeUnit.SECONDS.sleep(2);
          System.out.println(Thread.currentThread().getName() + ", 1 任务执行完成");
    } catch (InterruptedException e) { e.printStackTrace(); }
    return "0";
}, executor).handle((res, tx) -> {
    return res + "1";
}).whenComplete((res, tx) -> {
    System.out.println("获取到结果:" + res);
    if (tx != null) { System.err.println("发生错误了:" + tx.getMessage()); }
    executor.shutdown();
});
</code>

2.6 thenCombine

<code>CompletableFuture<Double> task1 = CompletableFuture.supplyAsync(() -> {
    try { TimeUnit.SECONDS.sleep(2);
          System.out.println(Thread.currentThread().getName() + ", 任务1执行完成");
    } catch (InterruptedException e) { e.printStackTrace(); }
    return 10d;
}, executor);
CompletableFuture<Double> task2 = CompletableFuture.supplyAsync(() -> {
    try { TimeUnit.SECONDS.sleep(2);
          System.out.println(Thread.currentThread().getName() + ", 任务2执行完成");
    } catch (InterruptedException e) { e.printStackTrace(); }
    return 20d;
}, executor);
task1.thenCombine(task2, (t1, t2) -> {
    System.out.println(Thread.currentThread().getName() + ", 合并任务完成");
    return t1 + "," + t2;
}).whenComplete((res, tx) -> {
    System.out.println("获取到结果:" + res);
    if (tx != null) { System.err.println("发生错误了:" + tx.getMessage()); }
    executor.shutdown();
});
</code>

2.7 applyToEither

<code>CompletableFuture<Double> task1 = CompletableFuture.supplyAsync(() -> {
    try { TimeUnit.SECONDS.sleep(2);
          System.out.println(Thread.currentThread().getName() + ", 任务1执行完成");
    } catch (InterruptedException e) { e.printStackTrace(); }
    return 10d;
}, executor);
CompletableFuture<Double> task2 = CompletableFuture.supplyAsync(() -> {
    try { TimeUnit.SECONDS.sleep(2);
          System.out.println(Thread.currentThread().getName() + ", 任务2执行完成");
    } catch (InterruptedException e) { e.printStackTrace(); }
    return 20d;
}, executor);
task1.applyToEither(task2, res -> res)
    .whenComplete((res, tx) -> {
        System.out.println("获取到结果:" + res);
        if (tx != null) { System.err.println("发生错误了:" + tx.getMessage()); }
        executor.shutdown();
    });
</code>

2.8 runAfterBoth

<code>CompletableFuture<Double> task1 = CompletableFuture.supplyAsync(() -> {
    try { TimeUnit.SECONDS.sleep(2);
          System.out.println(Thread.currentThread().getName() + ", 任务1执行完成");
    } catch (InterruptedException e) { e.printStackTrace(); }
    return 10d;
}, executor);
CompletableFuture<Double> task2 = CompletableFuture.supplyAsync(() -> {
    try { TimeUnit.SECONDS.sleep(2);
          System.out.println(Thread.currentThread().getName() + ", 任务2执行完成");
    } catch (InterruptedException e) { e.printStackTrace(); }
    return 20d;
}, executor);
task1.runAfterBoth(task2, () -> {
    System.out.println("任务都执行完成了...");
}).whenComplete((res, tx) -> {
    System.out.println("获取到结果:" + res);
    if (tx != null) { System.err.println("发生错误了:" + tx.getMessage()); }
    executor.shutdown();
});
</code>

2.9 anyOf

<code>CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
    sleep(1000);
    System.out.println("我是任务1");
    return "Task1";
}, executor);
CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
    sleep(3000);
    System.out.println("我是任务2");
    System.out.println(1 / 0);
    return "Task2";
}, executor);
CompletableFuture.anyOf(task1, task2).whenCompleteAsync((v, th) -> {
    System.out.println("v = " + v);
    System.out.println("th = " + th);
}, executor);
</code>

2.10 thenAcceptAsync to receive previous result

<code>CompletableFuture.supplyAsync(() -> {
    sleep(2000);
    System.out.println("第一个任务执行完成...");
    return new Random().nextInt(10000);
}, executor).thenAcceptAsync(res -> {
    System.out.println("任务执行结果:" + res);
}, executor);
</code>

The article concludes with a brief summary and wishes the reader success.

JavaconcurrencyasynchronousCompletableFutureExecutorServiceFuture
Spring Full-Stack Practical Cases
Written by

Spring Full-Stack Practical Cases

Full-stack Java development with Vue 2/3 front-end suite; hands-on examples and source code analysis for Spring, Spring Boot 2/3, and Spring Cloud.

0 followers
Reader feedback

How this landed with the community

login Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.