A Comprehensive Guide to Java CompletableFuture: Replacing Future and CountDownLatch with Elegant Asynchronous Patterns
This article explains Java's Future and CompletableFuture APIs, demonstrates how to replace blocking Future.get() and CountDownLatch patterns with CompletableFuture, shows creation methods, result retrieval options, callback chains, exception handling, task composition techniques, and provides best‑practice recommendations for thread‑pool usage.
A Example Review of Future
Some business scenarios require multithreaded asynchronous execution to speed up tasks.
JDK5 introduced the Future interface to represent the result of an asynchronous computation.
Although Future provides asynchronous execution, retrieving the result is inconvenient; you must call Future.get() which blocks the calling thread, or poll Future.isDone to check completion.
Both approaches are not elegant. Example code:
@Test
public void testFuture() throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(5);
Future
future = executorService.submit(() -> {
Thread.sleep(2000);
return "hello";
});
System.out.println(future.get());
System.out.println("end");
}Future also cannot handle dependent asynchronous tasks where the main thread must wait for subtasks. CountDownLatch can solve this, as shown below.
Define two Futures: one to fetch user information by user ID, another to fetch product information by product ID.
@Test
public void testCountDownLatch() throws InterruptedException, ExecutionException {
ExecutorService executorService = Executors.newFixedThreadPool(5);
CountDownLatch downLatch = new CountDownLatch(2);
long startTime = System.currentTimeMillis();
Future
userFuture = executorService.submit(() -> {
// simulate query user, 500 ms
Thread.sleep(500);
downLatch.countDown();
return "用户A";
});
Future
goodsFuture = executorService.submit(() -> {
// simulate query product, 400 ms
Thread.sleep(400);
downLatch.countDown();
return "商品A";
});
downLatch.await();
// simulate main thread work
Thread.sleep(600);
System.out.println("获取用户信息:" + userFuture.get());
System.out.println("获取商品信息:" + goodsFuture.get());
System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms");
}Running results show both results are obtained and total time drops from 1500 ms (500+400+600) to about 1110 ms.
Since Java 8, CompletableFuture offers a more elegant solution.
Implementing the Example with CompletableFuture
@Test
public void testCompletableInfo() throws InterruptedException, ExecutionException {
long startTime = System.currentTimeMillis();
// user service
CompletableFuture
userFuture = CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); }
return "用户A";
});
// product service
CompletableFuture
goodsFuture = CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(400); } catch (InterruptedException e) { e.printStackTrace(); }
return "商品A";
});
System.out.println("获取用户信息:" + userFuture.get());
System.out.println("获取商品信息:" + goodsFuture.get());
// main thread work
Thread.sleep(600);
System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms");
}Result is similar, but CompletableFuture can replace CountDownLatch and provides richer APIs.
CompletableFuture Creation Methods
1. Four common creation methods
CompletableFuture provides four static methods to start asynchronous tasks:
public static
CompletableFuture
supplyAsync(Supplier
supplier) {..}
public static
CompletableFuture
supplyAsync(Supplier
supplier, Executor executor) {..}
public static CompletableFuture
runAsync(Runnable runnable) {..}
public static CompletableFuture
runAsync(Runnable runnable, Executor executor) {..}supplyAsync executes a task with a return value; runAsync executes a task without a return value.
2. Four ways to obtain results
CompletableFuture offers get() , get(long timeout, TimeUnit unit) , getNow(T valueIfAbsent) , and join() . The first two behave like Future , getNow returns immediately, and join does not wrap checked exceptions.
@Test
public void testCompletableGet() throws InterruptedException, ExecutionException {
CompletableFuture
cp1 = CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }
return "商品A";
});
// getNow example
System.out.println(cp1.getNow("商品B"));
// join example with exception
CompletableFuture
cp2 = CompletableFuture.supplyAsync(() -> 1 / 0);
System.out.println(cp2.join());
// get example with exception
CompletableFuture
cp3 = CompletableFuture.supplyAsync(() -> 1 / 0);
System.out.println(cp3.get());
}Running this shows getNow returns the default when the task is not finished, join throws CompletionException , and get throws ExecutionException .
Asynchronous Callback Methods
1. thenRun / thenRunAsync
Executes a second task after the first completes; the second task has no return value.
@Test
public void testCompletableThenRunAsync() throws InterruptedException, ExecutionException {
long startTime = System.currentTimeMillis();
CompletableFuture
cp1 = CompletableFuture.runAsync(() -> {
try { Thread.sleep(600); } catch (InterruptedException e) { e.printStackTrace(); }
});
CompletableFuture
cp2 = cp1.thenRun(() -> {
try { Thread.sleep(400); } catch (InterruptedException e) { e.printStackTrace(); }
});
System.out.println(cp2.get());
Thread.sleep(600);
System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms");
}If a custom executor is supplied to the first task, thenRun reuses it, while thenRunAsync uses the common ForkJoinPool.
2. thenAccept / thenAcceptAsync
The second task receives the result of the first as a parameter but does not return a value.
@Test
public void testCompletableThenAccept() throws ExecutionException, InterruptedException {
long startTime = System.currentTimeMillis();
CompletableFuture
cp1 = CompletableFuture.supplyAsync(() -> "dev");
CompletableFuture
cp2 = cp1.thenAccept(a -> System.out.println("上一个任务的返回结果为: " + a));
cp2.get();
}3. thenApply / thenApplyAsync
The second task receives the first result and returns a new value.
@Test
public void testCompletableThenApply() throws ExecutionException, InterruptedException {
CompletableFuture
cp1 = CompletableFuture.supplyAsync(() -> "dev")
.thenApply(a -> {
if (Objects.equals(a, "dev")) {
return "dev";
}
return "prod";
});
System.out.println("当前环境为:" + cp1.get());
}Exception Callbacks
whenComplete is invoked whether the task finishes normally or exceptionally.
@Test
public void testCompletableWhenComplete() throws ExecutionException, InterruptedException {
CompletableFuture
future = CompletableFuture.supplyAsync(() -> {
if (Math.random() < 0.5) {
throw new RuntimeException("出错了");
}
System.out.println("正常结束");
return 0.11;
}).whenComplete((aDouble, throwable) -> {
if (aDouble == null) {
System.out.println("whenComplete aDouble is null");
} else {
System.out.println("whenComplete aDouble is " + aDouble);
}
if (throwable == null) {
System.out.println("whenComplete throwable is null");
} else {
System.out.println("whenComplete throwable is " + throwable.getMessage());
}
});
System.out.println("最终返回的结果 = " + future.get());
}When an exception occurs, exceptionally can provide a fallback value.
@Test
public void testWhenCompleteExceptionally() throws ExecutionException, InterruptedException {
CompletableFuture
future = CompletableFuture.supplyAsync(() -> {
if (Math.random() < 0.5) {
throw new RuntimeException("出错了");
}
System.out.println("正常结束");
return 0.11;
}).whenComplete((aDouble, throwable) -> {
// same logging as above
}).exceptionally(throwable -> {
System.out.println("exceptionally中异常:" + throwable.getMessage());
return 0.0;
});
System.out.println("最终返回的结果 = " + future.get());
}Multiple Task Composition
1. AND composition (thenCombine, thenAcceptBoth, runAfterBoth)
Execute a third task after both tasks finish. runAfterBoth ignores results, thenAcceptBoth consumes both results without returning, thenCombine combines them and returns a value.
@Test
public void testCompletableThenCombine() throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
CompletableFuture
task = CompletableFuture.supplyAsync(() -> {
System.out.println("异步任务1,当前线程是:" + Thread.currentThread().getId());
int result = 1 + 1;
System.out.println("异步任务1结束");
return result;
}, executorService);
CompletableFuture
task2 = CompletableFuture.supplyAsync(() -> {
System.out.println("异步任务2,当前线程是:" + Thread.currentThread().getId());
int result = 1 + 1;
System.out.println("异步任务2结束");
return result;
}, executorService);
CompletableFuture
task3 = task.thenCombineAsync(task2, (f1, f2) -> {
System.out.println("执行任务3,当前线程是:" + Thread.currentThread().getId());
System.out.println("任务1返回值:" + f1);
System.out.println("任务2返回值:" + f2);
return f1 + f2;
}, executorService);
Integer res = task3.get();
System.out.println("最终结果:" + res);
}2. OR composition (applyToEither, acceptEither, runAfterEither)
Execute a third task when any one of the two tasks finishes.
@Test
public void testCompletableEitherAsync() {
ExecutorService executorService = Executors.newFixedThreadPool(10);
CompletableFuture
task = CompletableFuture.supplyAsync(() -> {
System.out.println("异步任务1,当前线程是:" + Thread.currentThread().getId());
int result = 1 + 1;
System.out.println("异步任务1结束");
return result;
}, executorService);
CompletableFuture
task2 = CompletableFuture.supplyAsync(() -> {
System.out.println("异步任务2,当前线程是:" + Thread.currentThread().getId());
int result = 1 + 2;
try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println("异步任务2结束");
return result;
}, executorService);
task.acceptEitherAsync(task2, res -> {
System.out.println("执行任务3,当前线程是:" + Thread.currentThread().getId());
System.out.println("上一个任务的结果为:" + res);
}, executorService);
}3. allOf and anyOf
allOf waits for all tasks; anyOf proceeds when any task finishes.
@Test
public void testCompletableAallOf() throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
CompletableFuture
task = CompletableFuture.supplyAsync(() -> {
System.out.println("异步任务1,当前线程是:" + Thread.currentThread().getId());
int result = 1 + 1;
System.out.println("异步任务1结束");
return result;
}, executorService);
CompletableFuture
task2 = CompletableFuture.supplyAsync(() -> {
System.out.println("异步任务2,当前线程是:" + Thread.currentThread().getId());
int result = 1 + 2;
try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println("异步任务2结束");
return result;
}, executorService);
CompletableFuture
task3 = CompletableFuture.supplyAsync(() -> {
System.out.println("异步任务3,当前线程是:" + Thread.currentThread().getId());
int result = 1 + 3;
try { Thread.sleep(4000); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println("异步任务3结束");
return result;
}, executorService);
CompletableFuture
allOf = CompletableFuture.allOf(task, task2, task3);
allOf.get();
System.out.println("task结果为:" + task.get());
System.out.println("task2结果为:" + task2.get());
System.out.println("task3结果为:" + task3.get());
} @Test
public void testCompletableAnyOf() throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
CompletableFuture
task = CompletableFuture.supplyAsync(() -> 1 + 1, executorService);
CompletableFuture
task2 = CompletableFuture.supplyAsync(() -> 1 + 2, executorService);
CompletableFuture
task3 = CompletableFuture.supplyAsync(() -> 1 + 3, executorService);
CompletableFuture
anyOf = CompletableFuture.anyOf(task, task2, task3);
Object o = anyOf.get();
System.out.println("完成的任务的结果:" + o);
}Best Practices and Pitfalls
Future must call get() or join() to surface exceptions.
CompletableFuture.get() is blocking; consider using the timeout overload.
Avoid the default ForkJoinPool for high‑load services; provide a custom Executor with appropriate pool size.
When configuring a custom thread pool, prefer AbortPolicy over DiscardPolicy or DiscardOldestPolicy to avoid silent task loss.
Selected Java Interview Questions
A professional Java tech channel sharing common knowledge to help developers fill gaps. Follow us!
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.