Request Collapsing Techniques: Hystrix Collapser, Custom BatchCollapser, and ConcurrentHashMultiset
This article introduces three request‑collapsing techniques—Hystrix Collapser, a custom BatchCollapser implementation, and Guava's ConcurrentHashMultiset—explaining their design, configuration, code examples, and suitable scenarios for reducing downstream load and improving system throughput.
Preface
In typical request‑response models each request occupies its own thread and memory, and performing I/O for every request can be costly. Merging similar or duplicate requests upstream before sending them downstream can dramatically reduce the load on downstream services and increase overall throughput.
The author spent considerable time investigating this problem, comparing several libraries and implementing a simple collapser to address a specific business need.
Hystrix Collapser
Hystrix
Hystrix, an open‑source library from Netflix, provides circuit‑breaker functionality to keep web servers stable under high concurrency. It also includes a request‑collapsing feature, HystrixCollapser, which aggregates multiple similar requests into a single batch request.
When using Hystrix, the hystrix-javanica module allows developers to write concise, annotation‑driven code. The required dependencies are hystrix-core and hystrix-javanica. Javanica relies on AOP, so a HystrixCommandAspect bean must be declared in the Spring XML configuration:
<aop:aspectj-autoproxy/>
<bean id="hystrixAspect" class="com.netflix.hystrix.contrib.javanica.aop.aspectj.HystrixCommandAspect"/>
Collapser
Hystrix Collapser can be configured via annotations. The method to be collapsed is annotated with @HystrixCollapser, and the batch method with @HystrixCommand. Important points:
The single method may accept only one parameter; to pass several, wrap them in a custom class. The batch method receives a java.util.List<SingleParam>.
The single method returns java.util.concurrent.Future<SingleReturn>, while the batch method returns java.util.List<SingleReturn>; the results must be in the same order as the inputs.
Simple example:
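import com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser;
import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand;
import java.util.List;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

public class HystrixCollapserSample {
    // Callers invoke single(); the aspect intercepts the call and routes it to batch().
    @HystrixCollapser(batchMethod = "batch")
    public Future<Boolean> single(String input) {
        return null; // the body of the single method is never executed
    }

    // Receives every collapsed input; results must be in the same order as the inputs.
    @HystrixCommand
    public List<Boolean> batch(List<String> inputs) {
        return inputs.stream().map(it -> Boolean.TRUE).collect(Collectors.toList());
    }
}
Source‑code Implementation Details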
The collapser works as follows:
A Spring bean registers the aspect that intercepts methods annotated with @HystrixCollapser.
When such a method is invoked, Hystrix creates (or retrieves) a collapser instance.
The request arguments are stored in a ConcurrentHashMap<RequestArgumentType, CollapsedRequest>, and an Observable is returned to the caller in the form of a Future.
A timer thread periodically consumes the stored requests, builds a batch request, executes it, and maps the results back to the original futures.
Because each request must wait for the next timer tick before the real request executes, the collapser adds about timerDelayInMilliseconds / 2 of latency per request on average.
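The steps above can be illustrated with a small JDK-only sketch. It is not Hystrix's source: the class name TimerCollapserSketch, the Pending holder, and the use of CompletableFuture in place of an Observable are all assumptions made for illustration. It only shows the general shape of the technique: queue the argument, hand back a future, and let a timer drain the queue into one batch call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

/** Hypothetical JDK-only sketch of timer-driven collapsing; not Hystrix code. */
final class TimerCollapserSketch<A, R> implements AutoCloseable {

    /** One pending call: its argument plus the future handed back to the caller. */
    private static final class Pending<A, R> {
        final A argument;
        final CompletableFuture<R> result = new CompletableFuture<>();
        Pending(A argument) { this.argument = argument; }
    }

    private final LinkedBlockingQueue<Pending<A, R>> queue = new LinkedBlockingQueue<>();
    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "collapser-timer");
                t.setDaemon(true); // do not keep the JVM alive for the timer
                return t;
            });
    private final Function<List<A>, List<R>> batchFunction;

    TimerCollapserSketch(Function<List<A>, List<R>> batchFunction, long timerDelayMillis) {
        this.batchFunction = batchFunction;
        timer.scheduleWithFixedDelay(this::flush, timerDelayMillis, timerDelayMillis,
                TimeUnit.MILLISECONDS);
    }

    /** Queues the argument and returns at once; the timer completes the future later. */
    CompletableFuture<R> single(A argument) {
        Pending<A, R> pending = new Pending<>(argument);
        queue.add(pending);
        return pending.result;
    }

    /** Drains the queue, runs one batch call, and maps results back by position. */
    private void flush() {
        List<Pending<A, R>> batch = new ArrayList<>();
        queue.drainTo(batch);
        if (batch.isEmpty()) { return; }
        try {
            List<A> arguments = new ArrayList<>(batch.size());
            for (Pending<A, R> p : batch) { arguments.add(p.argument); }
            List<R> results = batchFunction.apply(arguments);
            if (results.size() != batch.size()) {
                throw new IllegalStateException("batch must return one result per input");
            }
            for (int i = 0; i < batch.size(); i++) {
                batch.get(i).result.complete(results.get(i));
            }
        } catch (RuntimeException e) {
            // Fail every caller of this batch instead of killing the timer task.
            for (Pending<A, R> p : batch) { p.result.completeExceptionally(e); }
        }
    }

    @Override
    public void close() { timer.shutdownNow(); }
}
```

A caller would construct the sketch with a batch function and a delay, call single(...) from business threads, and block on the returned future only when the result is needed. Mapping results back by position is why the batch method must preserve input order.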
Configuration
Configuration is provided via the @HystrixCollapser annotation. Key attributes include:
collapserKey – optional, defaults to the method name.
batchMethod – name of the batch method.
scope – either REQUEST (the default), which collapses only within a single request context, or GLOBAL, which shares a collapser across all requests.
collapserProperties – allows setting Hystrix collapser properties such as maxRequestsInBatch, timerDelayInMilliseconds, and requestCache.enabled.
Full example configuration:
@HystrixCollapser(
    batchMethod = "batch",
    collapserKey = "single",
    scope = com.netflix.hystrix.HystrixCollapser.Scope.GLOBAL,
    collapserProperties = {
        @HystrixProperty(name = "maxRequestsInBatch", value = "100"),
        @HystrixProperty(name = "timerDelayInMilliseconds", value = "1000"),
        @HystrixProperty(name = "requestCache.enabled", value = "true")
    })
BatchCollapser (Custom Implementation)
Design
To avoid the overhead of Hystrix futures, the author built a simple collapser that stores incoming requests in a container and triggers batch execution either after a time interval or when a request count threshold is reached. The business thread submits the request and immediately returns success without waiting for the batch result.
Implementation
The implementation uses a LinkedBlockingDeque as the container and a ScheduledExecutorService to run a timer thread that periodically drains the deque and processes the batch via a user‑provided Handler<List<E>, Boolean> .
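import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;

// Handler<T, R> is the user-provided callback interface exposing handle(T).
public class BatchCollapser<E> implements InitializingBean {
    private static final Logger logger = LoggerFactory.getLogger(BatchCollapser.class);
    // One collapser per handler class.
    private static final Map<Class, BatchCollapser> instance = Maps.newConcurrentMap();
    private static final ScheduledExecutorService SCHEDULE_EXECUTOR = Executors.newScheduledThreadPool(1);

    private final LinkedBlockingDeque<E> batchContainer = new LinkedBlockingDeque<>();
    private final Handler<List<E>, Boolean> cleaner;
    private final long interval;
    private final int threshHold;

    private BatchCollapser(Handler<List<E>, Boolean> cleaner, int threshHold, long interval) {
        this.cleaner = cleaner;
        this.threshHold = threshHold;
        this.interval = interval;
    }

    // Time trigger: drain the container every `interval` milliseconds.
    @Override
    public void afterPropertiesSet() throws Exception {
        SCHEDULE_EXECUTOR.scheduleAtFixedRate(() -> {
            try {
                this.clean();
            } catch (Exception e) {
                logger.error("clean container exception", e);
            }
        }, 0, interval, TimeUnit.MILLISECONDS);
    }

    // Count trigger: flush on the caller's thread once the threshold is reached.
    public void submit(E event) {
        batchContainer.add(event);
        if (batchContainer.size() >= threshHold) {
            clean();
        }
    }

    private void clean() {
        List<E> transferList = Lists.newArrayListWithExpectedSize(threshHold);
        // Drain at most one threshold's worth (the original hard-coded 100 here).
        batchContainer.drainTo(transferList, threshHold);
        if (transferList.isEmpty()) {
            return;
        }
        try {
            cleaner.handle(transferList);
        } catch (Exception e) {
            logger.error("batch execute error, transferList:{}", transferList, e);
        }
    }

    // Note: instances created here are not Spring-managed, so afterPropertiesSet()
    // must still be invoked (by Spring or by the caller) to start the timer.
    public static <E> BatchCollapser<E> getInstance(Handler<List<E>, Boolean> cleaner, int threshHold, long interval) {
        Class jobClass = cleaner.getClass();
        if (instance.get(jobClass) == null) {
            synchronized (BatchCollapser.class) {
                if (instance.get(jobClass) == null) {
                    instance.put(jobClass, new BatchCollapser<>(cleaner, threshHold, interval));
                }
            }
        }
        return instance.get(jobClass);
    }
}
Key points: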
The collapser is a singleton per handler class to support global usage.
A ScheduledExecutorService is used instead of java.util.Timer: a Timer runs every task on one thread, so a slow task blocks the others, and an uncaught exception terminates the timer.
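For readers who want to try the two triggers without Spring, Guava, or SLF4J, here is a stripped-down stand-in. MiniBatchCollapser is a hypothetical name, and it swaps the article's Handler for java.util.function.Consumer and drops the per-handler singleton, so treat it as a sketch of the idea rather than a replacement for the class above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical JDK-only stand-in for BatchCollapser (assumed names throughout). */
final class MiniBatchCollapser<E> implements AutoCloseable {
    private final LinkedBlockingDeque<E> container = new LinkedBlockingDeque<>();
    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "mini-batch-timer");
                t.setDaemon(true); // do not keep the JVM alive for the timer
                return t;
            });
    private final Consumer<List<E>> handler;
    private final int threshold;

    MiniBatchCollapser(Consumer<List<E>> handler, int threshold, long intervalMillis) {
        this.handler = handler;
        this.threshold = threshold;
        // Time trigger: flush whatever has accumulated every intervalMillis.
        timer.scheduleAtFixedRate(this::flush, intervalMillis, intervalMillis,
                TimeUnit.MILLISECONDS);
    }

    /** Fire-and-forget: the caller never waits for the batch result. */
    void submit(E event) {
        container.add(event);
        // Count trigger: flush on the caller's thread once the threshold is hit.
        if (container.size() >= threshold) {
            flush();
        }
    }

    /** Drains at most one threshold's worth of events and hands them to the handler. */
    private void flush() {
        List<E> batch = new ArrayList<>(threshold);
        container.drainTo(batch, threshold);
        if (batch.isEmpty()) { return; }
        try {
            handler.accept(batch);
        } catch (RuntimeException e) {
            // Swallow so that one bad batch does not cancel the periodic task.
            System.err.println("batch execute error: " + e);
        }
    }

    @Override
    public void close() { timer.shutdownNow(); }
}
```

With a threshold of 3, the third submit(...) call flushes synchronously on the caller's thread, while a fourth event waits for the next timer tick. Events lost on failure are not retried here, which matches the fire-and-forget design described earlier.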
ConcurrentHashMultiset
Design
Guava's ConcurrentHashMultiset stores each element with a count, incrementing the count on duplicate inserts. It provides lock‑free thread‑safe add/remove operations using a CAS‑style while(true) loop.
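To make the CAS-style loop concrete, the following JDK-only sketch counts elements with a ConcurrentHashMap of AtomicInteger values. It is not Guava's source; CasCounterSketch and its method bodies are assumptions written for illustration, with add and remove returning the previous count in the same spirit as the multiset operations.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical JDK-only sketch of lock-free counting; not Guava's implementation. */
final class CasCounterSketch<E> {
    private final ConcurrentMap<E, AtomicInteger> counts = new ConcurrentHashMap<>();

    /** Adds one occurrence and returns the count before the operation. */
    int add(E element) {
        while (true) {
            AtomicInteger counter = counts.get(element);
            if (counter == null) {
                counter = counts.putIfAbsent(element, new AtomicInteger(1));
                if (counter == null) { return 0; } // we inserted the first occurrence
            }
            while (true) {
                int current = counter.get();
                if (current == 0) { break; } // dead counter, start over with a fresh one
                if (counter.compareAndSet(current, current + 1)) { return current; }
                // CAS lost a race with another thread: re-read and try again.
            }
            counts.remove(element, counter); // drop the dead counter if still mapped
        }
    }

    /** Removes up to `occurrences` and returns the count before the operation. */
    int remove(E element, int occurrences) {
        AtomicInteger counter = counts.get(element);
        if (counter == null) { return 0; }
        while (true) {
            int current = counter.get();
            if (current == 0) { return 0; }
            int next = Math.max(0, current - occurrences);
            if (counter.compareAndSet(current, next)) {
                if (next == 0) { counts.remove(element, counter); }
                return current;
            }
        }
    }

    /** Returns the current count, or 0 if the element is absent. */
    int count(E element) {
        AtomicInteger counter = counts.get(element);
        return counter == null ? 0 : counter.get();
    }
}
```

No thread ever blocks: a failed compareAndSet simply means another thread changed the count first, so the loop re-reads and retries. A counter that has reached zero is never incremented again, which keeps a concurrent add from reviving an entry that remove is discarding.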
Implementation
It can be used to aggregate high‑frequency duplicate requests before persisting them, reducing downstream pressure.
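// multiset is the shared ConcurrentHashMultiset<Request> that business threads add to.
if (multiset.isEmpty()) { return; }
List<Request> transferList = Lists.newArrayList();
multiset.elementSet().forEach(request -> {
    int count = multiset.count(request);
    if (count <= 0) { return; }
    // Collapse duplicates into one request whose increment is scaled by the count.
    transferList.add(count == 1 ? request : new Request(request.getIncrement() * count));
    // Remove only the occurrences just counted; later arrivals stay for the next run.
    multiset.remove(request, count);
});
Summary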
Hystrix Collapser: suitable when each request needs an individual result and extra latency is acceptable.
BatchCollapser: appropriate when results are not needed and batch execution can be triggered by time or count thresholds.
ConcurrentHashMultiset: ideal for high‑frequency duplicate‑counting scenarios such as statistics aggregation.
Combining BatchCollapser with ConcurrentHashMultiset can leverage both time‑based batching and efficient duplicate counting.
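One way to picture that combination is the sketch below, which counts duplicates as they arrive and hands the aggregated counts downstream on a timer. It is a JDK-only approximation under stated assumptions: CountingCollapserSketch is a hypothetical name, String keys stand in for the Request objects, and ConcurrentHashMap.merge stands in for ConcurrentHashMultiset.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical sketch: duplicate counting plus time-based flushing, JDK only. */
final class CountingCollapserSketch implements AutoCloseable {
    private final ConcurrentHashMap<String, Integer> counts = new ConcurrentHashMap<>();
    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "counting-collapser-timer");
                t.setDaemon(true); // do not keep the JVM alive for the timer
                return t;
            });
    private final Consumer<Map<String, Integer>> handler;

    CountingCollapserSketch(Consumer<Map<String, Integer>> handler, long intervalMillis) {
        this.handler = handler;
        timer.scheduleAtFixedRate(this::flush, intervalMillis, intervalMillis,
                TimeUnit.MILLISECONDS);
    }

    /** Records one occurrence; duplicates only bump a counter. */
    void record(String key) {
        counts.merge(key, 1, Integer::sum);
    }

    /** Moves the accumulated counts into one batch and passes it downstream. */
    void flush() {
        Map<String, Integer> batch = new HashMap<>();
        for (String key : counts.keySet()) {
            // remove(key) atomically takes the count; later arrivals start a new entry.
            Integer taken = counts.remove(key);
            if (taken != null) { batch.put(key, taken); }
        }
        if (batch.isEmpty()) { return; }
        try {
            handler.accept(batch);
        } catch (RuntimeException e) {
            // Swallow so that one bad batch does not cancel the periodic task.
            System.err.println("flush failed: " + e);
        }
    }

    @Override
    public void close() { timer.shutdownNow(); }
}
```

Three record("a") calls and one record("b") call before a flush reach the handler as a single map with a count of 3 for "a" and 1 for "b", so downstream sees one write per distinct key per interval instead of one per request.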
Code Ape Tech Column
Former Ant Group P8 engineer, pure technologist, sharing full‑stack Java, job interview and career advice through a column. Site: java-family.cn