Understanding Java CompletionService: Implementation, Usage, and Performance Benefits
This article explains the purpose and implementation of Java's CompletionService, compares it with ExecutorService, shows how it retrieves task results in completion order, provides detailed source code analysis, and discusses typical use cases such as load‑balancing and fast‑result retrieval in concurrent applications.
ExecutorService vs CompletionService Comparison
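When four tasks (A, B, C, D) are submitted to an ExecutorService and their Futures are read back from a list, results are retrieved in submission order. The loop can therefore stall on a long-running task that was submitted first, even though later tasks have already finished.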
ExecutorService executorService = Executors.newFixedThreadPool(4);
List<Future<Integer>> futures = new ArrayList<>();
futures.add(executorService.submit(A));
futures.add(executorService.submit(B));
futures.add(executorService.submit(C));
futures.add(executorService.submit(D));
// iterate over the Future list and get each result, in submission order
for (Future<Integer> future : futures) {
    Integer result = future.get();
    // other business logic
}
With CompletionService, the same tasks are submitted, but results are taken from the completion queue as soon as each task finishes.
ExecutorService executorService = Executors.newFixedThreadPool(4);
// ExecutorCompletionService is the only CompletionService implementation shipped with the JDK
CompletionService<Integer> executorCompletionService = new ExecutorCompletionService<>(executorService);
List<Future<Integer>> futures = new ArrayList<>();
futures.add(executorCompletionService.submit(A));
futures.add(executorCompletionService.submit(B));
futures.add(executorCompletionService.submit(C));
futures.add(executorCompletionService.submit(D));
// take one completed Future per submitted task, in completion order
for (int i = 0; i < futures.size(); i++) {
    Integer result = executorCompletionService.take().get();
    // other business logic
}
The main drawback of iterating over the Future list is that Future.get() blocks the calling thread until that specific task completes, so results from tasks that finished earlier sit unprocessed in the meantime.
“If the Future result is not ready, calling get() blocks until it is returned.”
CompletionService decouples task production from result consumption: the Future of each finished task is placed on a blocking queue, so the consumer always receives whichever result became available first.
“It is a service that decouples asynchronous task production from result consumption.”
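The difference in ordering can be observed with a small experiment. The following is a minimal sketch, not taken from the original article: the class name, the sleepyTask helper, and the sleep durations are all made up for illustration. The slowest task is submitted first, yet it is consumed last.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CompletionOrderDemo {

    // Hypothetical task: sleeps for the given time, then returns its own name.
    static Callable<String> sleepyTask(String name, long millis) {
        return () -> {
            Thread.sleep(millis);
            return name;
        };
    }

    // Returns the task names in the order in which their results became available.
    static List<String> completionOrder() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            CompletionService<String> service = new ExecutorCompletionService<>(pool);
            service.submit(sleepyTask("A", 600)); // slowest task is submitted first
            service.submit(sleepyTask("B", 50));
            service.submit(sleepyTask("C", 300));
            int submitted = 3;

            List<String> order = new ArrayList<>();
            for (int i = 0; i < submitted; i++) {
                // take() blocks only until some task has finished, whichever it is
                order.add(service.take().get());
            }
            return order;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(completionOrder());
    }
}
```

With delays this far apart, the list should come back as B, C, A. Reading the same three Futures from a list in submission order would instead wait about 600 ms for A before touching B or C.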
High‑Level Overview of CompletionService
It works like a message queue: completed tasks are enqueued and can be taken in completion order.
CompletionService Interface
The interface defines five methods: submit(Callable), submit(Runnable, V), take(), poll(), and poll(timeout, unit).
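The three retrieval methods differ only in how long they wait. The sketch below exercises all five methods; the class name and the CountDownLatch used to hold the task back are illustrative assumptions, not part of the original article.

```java
import java.util.concurrent.CompletionService;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class PollDemo {

    // Returns a short summary of what each retrieval call observed.
    static String run() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            CompletionService<Integer> service = new ExecutorCompletionService<>(pool);
            CountDownLatch release = new CountDownLatch(1);

            // submit(Callable): the task cannot finish until the latch is released
            service.submit(() -> {
                release.await();
                return 42;
            });

            // poll() never blocks: nothing has completed yet, so it returns null
            Future<Integer> early = service.poll();

            // poll(timeout, unit) waits up to the timeout, then also returns null
            Future<Integer> timed = service.poll(100, TimeUnit.MILLISECONDS);

            release.countDown(); // let the task finish

            // take() blocks until a completed Future is available
            Future<Integer> first = service.take();

            // submit(Runnable, V): the given value becomes the result on completion
            service.submit(() -> { }, 7);
            Future<Integer> second = service.take();

            return (early == null) + " " + (timed == null) + " " + first.get() + " " + second.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run()); // prints "true true 42 7"
    }
}
```

poll() suits consumers that have other work to do between checks, while take() suits a dedicated consumer loop.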
ExecutorCompletionService Implementation
It requires an Executor and optionally a BlockingQueue (default LinkedBlockingQueue). Submitted tasks are wrapped in a QueueingFuture which, upon completion, adds the task to the completion queue via the overridden done() method.
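The mechanism can be imitated in a few lines. What follows is a simplified sketch, not the JDK code: EnqueueOnDone is a hypothetical class that enqueues itself, whereas the JDK's private QueueingFuture wraps another RunnableFuture and enqueues that one. The same-thread Executor is also just a shortcut to keep the example small.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;

class DoneHookSketch {

    // Hypothetical stand-in for QueueingFuture: a FutureTask that puts itself
    // on a queue once it has completed.
    static class EnqueueOnDone<V> extends FutureTask<V> {
        private final BlockingQueue<Future<V>> completed;

        EnqueueOnDone(Callable<V> task, BlockingQueue<Future<V>> completed) {
            super(task);
            this.completed = completed;
        }

        @Override
        protected void done() {
            // FutureTask calls done() after normal completion, failure, or cancellation.
            completed.add(this);
        }
    }

    static int run() throws Exception {
        BlockingQueue<Future<Integer>> completed = new LinkedBlockingQueue<>();
        Executor executor = Runnable::run; // runs the task on the calling thread, for brevity
        executor.execute(new EnqueueOnDone<Integer>(() -> 1 + 1, completed));
        // The Future is already on the queue by the time execute() returns.
        return completed.take().get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run()); // prints 2
    }
}
```

Because done() also runs for failed and cancelled tasks, those Futures reach the queue too, and the consumer sees the failure only when it calls get().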
The fields and constructors in the JDK source are:
private final Executor executor;
private final AbstractExecutorService aes;
private final BlockingQueue<Future<V>> completionQueue;
public ExecutorCompletionService(Executor executor) {
    if (executor == null)
        throw new NullPointerException();
    this.executor = executor;
    this.aes = (executor instanceof AbstractExecutorService) ?
        (AbstractExecutorService) executor : null;
    this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}
public ExecutorCompletionService(Executor executor,
                                 BlockingQueue<Future<V>> completionQueue) {
    if (executor == null || completionQueue == null)
        throw new NullPointerException();
    this.executor = executor;
    this.aes = (executor instanceof AbstractExecutorService) ?
        (AbstractExecutorService) executor : null;
    this.completionQueue = completionQueue;
}
The submit method wraps the task in a RunnableFuture, then hands a QueueingFuture built around it to the executor:
public Future<V> submit(Callable<V> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<V> f = newTaskFor(task);
    executor.execute(new QueueingFuture(f));
    return f;
}
QueueingFuture extends FutureTask. When a task finishes, FutureTask's finishCompletion() wakes every thread waiting in get() and then calls done():
private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }
    // important step
    done();
    callable = null; // reduce footprint
}
QueueingFuture overrides done() to put the finished task on the completion queue:
protected void done() {
    completionQueue.add(task);
}
Main Use Cases
Typical scenarios include Dubbo's Forking Cluster, parallel file or image download from several sources (cancel the remaining downloads once the fastest source finishes), and querying multiple equivalent services, such as weather APIs, where only the first successful result is needed.
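The "first successful result wins" pattern shared by these scenarios can be sketched as follows. This is a hedged example rather than Dubbo's or any library's actual code: firstResult and fetchFrom are hypothetical names, and the sleeping tasks stand in for real downloads or remote calls.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FirstResultWins {

    // Hypothetical stand-in for a remote call: waits, then returns the source name.
    static Callable<String> fetchFrom(String source, long millis) {
        return () -> {
            Thread.sleep(millis);
            return source;
        };
    }

    // Runs all tasks concurrently and returns the first result that completes successfully.
    static String firstResult(List<Callable<String>> tasks) throws Exception {
        if (tasks.isEmpty()) {
            throw new IllegalArgumentException("no tasks");
        }
        ExecutorService pool = Executors.newFixedThreadPool(tasks.size());
        CompletionService<String> service = new ExecutorCompletionService<>(pool);
        List<Future<String>> futures = new ArrayList<>();
        try {
            for (Callable<String> task : tasks) {
                futures.add(service.submit(task));
            }
            ExecutionException lastFailure = null;
            for (int i = 0; i < tasks.size(); i++) {
                try {
                    return service.take().get(); // first task to finish successfully wins
                } catch (ExecutionException e) {
                    lastFailure = e;             // this source failed; wait for the next one
                }
            }
            throw lastFailure;                   // every source failed
        } finally {
            for (Future<String> f : futures) {
                f.cancel(true); // interrupts tasks still running; no effect on finished ones
            }
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<Callable<String>> sources = new ArrayList<>();
        sources.add(fetchFrom("slow-mirror", 2000));
        sources.add(fetchFrom("fast-mirror", 50));
        System.out.println(firstResult(sources));
    }
}
```

Note that cancel(true) only interrupts the worker thread. A real download or RPC task has to respond to interruption for the cancellation to actually stop its work.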
Conclusion
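CompletionService provides a simple way to consume results in completion order and to obtain the fastest result among concurrent tasks. When the same request is forked to several providers and only the first answer is kept, it can also serve as a lightweight form of load balancing.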
Further Questions
What considerations are needed when processing results asynchronously?
Would you choose an unbounded queue and why?
Code Ape Tech Column
Former Ant Group P8 engineer, pure technologist, sharing full‑stack Java, job interview and career advice through a column. Site: java-family.cn