LMAX Disruptor: High‑Performance In‑Memory Queue and Its Typical Use Cases
The article introduces LMAX Disruptor, a high‑performance, lock‑free in‑memory queue for the JVM, compares it with JDK thread‑safe queues, and demonstrates five practical usage scenarios—including broadcast, log collection, chain of responsibility, multi‑task coordination, and multi‑consumer groups—through detailed Java code examples.
Disruptor is an open-source, high-performance in-memory queue developed by LMAX, a UK forex trading company. The project won Oracle's Duke's Choice Award in 2011. Its role is similar to that of distributed message queues such as Kafka or RocketMQ, but it operates entirely within the memory of a single JVM.
The thread-safe queues built into the JDK are either lock-based and bounded (e.g., ArrayBlockingQueue, or LinkedBlockingQueue when given a capacity) or lock-free and unbounded (e.g., LinkedTransferQueue, ConcurrentLinkedQueue). Disruptor, by contrast, is lock-free while remaining bounded and thread-safe, so it avoids both the cost of lock contention and the out-of-memory risk of an unbounded queue.
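To make the bounded versus unbounded distinction concrete, here is a minimal sketch that uses only JDK classes (no Disruptor API). The class and method names are hypothetical and chosen for illustration. It offers more items than a bounded queue can hold and counts how many are accepted.

```java
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical helper class, for illustration only.
final class JdkQueueBounds {

    /** Offers `attempts` items to a bounded, lock-based queue and counts the accepted ones. */
    static int acceptedByBounded(int capacity, int attempts) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        int accepted = 0;
        for (int i = 0; i < attempts; i++) {
            // offer() returns false (rather than blocking) once the queue is full.
            if (queue.offer(i)) {
                accepted++;
            }
        }
        return accepted;
    }

    /** Offers `attempts` items to an unbounded, lock-free queue and counts the accepted ones. */
    static int acceptedByUnbounded(int attempts) {
        Queue<Integer> queue = new ConcurrentLinkedQueue<>();
        int accepted = 0;
        for (int i = 0; i < attempts; i++) {
            // An unbounded queue never rejects; it grows until memory runs out.
            if (queue.offer(i)) {
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println("bounded accepted:   " + acceptedByBounded(4, 10));
        System.out.println("unbounded accepted: " + acceptedByUnbounded(10));
    }
}
```

With a capacity of 4 and 10 offers, the bounded queue accepts 4 items and the unbounded queue accepts all 10. A slow consumer therefore shows up as back-pressure in the first case and as memory growth in the second.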
Many large‑scale systems have adopted Disruptor, as illustrated by case studies from DeWu and Vivo.
1. Broadcast Scenario
In a broadcast pattern, a single event from an upstream system is delivered to every downstream consumer. Disruptor supports this directly: each EventHandler passed to a single handleEventsWith call receives every published event.
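import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory;
import java.nio.ByteBuffer;

public class Broadcast {
    public static void main(String[] args) throws InterruptedException {
        int bufferSize = 1024; // ring buffer size; must be a power of two
        Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);
        EventHandler<LongEvent> consumer1 = new LongEventHandler("consumer1");
        EventHandler<LongEvent> consumer2 = new LongEventHandler("consumer2");
        EventHandler<LongEvent> consumer3 = new LongEventHandler("consumer3");
        // Handlers registered together each receive every event (broadcast).
        disruptor.handleEventsWith(consumer1, consumer2, consumer3);
        disruptor.start();
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
        ByteBuffer bb = ByteBuffer.allocate(8);
        // Publish an increasing counter once per second, indefinitely.
        for (long l = 0; ; l++) {
            bb.putLong(0, l);
            ringBuffer.publishEvent((event, sequence, buffer) -> event.set(buffer.getLong(0)), bb);
            Thread.sleep(1000);
        }
    }
}

2. Log Collection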
In this log-collection scenario, three producer nodes send logs to Disruptor and three consumer workers process them, with each log entry handled by exactly one worker. The snippet shows the consumer-side wiring.
public class LogCollectHandler implements WorkHandler<LongEvent> {
    private String consumer;

    public LogCollectHandler(String consumer) { this.consumer = consumer; }

    @Override
    public void onEvent(LongEvent event) {
        System.out.println("consumer: " + consumer + ",Event: " + event);
    }
}

public static void main(String[] args) throws InterruptedException {
    int bufferSize = 1024;
    Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);
    WorkHandler<LongEvent> consumer1 = new LogCollectHandler("consumer1");
    WorkHandler<LongEvent> consumer2 = new LogCollectHandler("consumer2");
    WorkHandler<LongEvent> consumer3 = new LogCollectHandler("consumer3");
    // Workers in one pool compete: each event goes to exactly one of them.
    disruptor.handleEventsWithWorkerPool(consumer1, consumer2, consumer3);
    disruptor.start();
}

Note: With handleEventsWithWorkerPool, the consumers are WorkHandler instances that compete for events, so each event is processed by only one consumer. Registering EventHandler instances with handleEventsWith instead produces a fan-out in which every consumer receives every event.
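The difference between competing consumers and fan-out can be illustrated without Disruptor at all. The following sketch swaps in plain JDK queues purely to show the two delivery semantics. It is not how Disruptor implements them, and the class and method names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration using JDK queues, not the Disruptor API.
final class DeliverySemantics {

    /** Competing consumers: workers drain one shared queue, so each event is handled once. */
    static int competing(int events, int workers) {
        BlockingQueue<Integer> shared = new LinkedBlockingQueue<>();
        for (int i = 0; i < events; i++) {
            shared.add(i);
        }
        AtomicInteger handled = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int w = 0; w < workers; w++) {
            pool.execute(() -> {
                // The queue is pre-filled, so a null poll means it is drained.
                while (shared.poll() != null) {
                    handled.incrementAndGet();
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return handled.get();
    }

    /** Fan-out: every consumer owns a queue and receives a copy of every event. */
    static int fanOut(int events, int consumers) {
        List<Queue<Integer>> queues = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            queues.add(new ArrayDeque<>());
        }
        for (int i = 0; i < events; i++) {
            for (Queue<Integer> q : queues) {
                q.add(i);
            }
        }
        int handled = 0;
        for (Queue<Integer> q : queues) {
            while (q.poll() != null) {
                handled++;
            }
        }
        return handled;
    }

    public static void main(String[] args) {
        System.out.println("competing handled: " + competing(6, 3));
        System.out.println("fan-out handled:   " + fanOut(6, 3));
    }
}
```

For 6 events and 3 consumers, the competing variant performs 6 handler invocations in total, while the fan-out variant performs 18, because each of the 3 consumers sees all 6 events.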
3. Chain of Responsibility
Disruptor can model a chain of processing steps, similar to the classic Chain of Responsibility pattern.
public static void main(String[] args) throws InterruptedException {
    int bufferSize = 1024;
    Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);
    EventHandler<LongEvent> consumer1 = new LongEventHandler("consumer1");
    EventHandler<LongEvent> consumer2 = new LongEventHandler("consumer2");
    EventHandler<LongEvent> consumer3 = new LongEventHandler("consumer3");
    disruptor.handleEventsWith(consumer1).then(consumer2).then(consumer3);
    disruptor.start();
}

Multiple parallel chains are also supported by configuring separate groups of handlers.
public static void main(String[] args) throws InterruptedException {
    int bufferSize = 1024;
    Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);
    EventHandler<LongEvent> c1 = new LongEventHandler("consumer1");
    EventHandler<LongEvent> c2 = new LongEventHandler("consumer2");
    EventHandler<LongEvent> c3 = new LongEventHandler("consumer3");
    EventHandler<LongEvent> c4 = new LongEventHandler("consumer4");
    EventHandler<LongEvent> c5 = new LongEventHandler("consumer5");
    EventHandler<LongEvent> c6 = new LongEventHandler("consumer6");
    disruptor.handleEventsWith(c1).then(c2).then(c3);
    disruptor.handleEventsWith(c4).then(c5).then(c6);
    disruptor.start();
}

4. Multi-Task Coordination
The classic coffee‑making analogy demonstrates parallel tasks (boiling water, washing cup, grinding coffee) that must all finish before proceeding. The same logic can be expressed with Java's CompletableFuture or with Disruptor.
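The CompletableFuture variant that follows leaves the executor and the task methods unspecified. As a self-contained sketch under stated assumptions (a fixed pool of three threads, and task bodies that only record their own name; both are hypothetical), the same "wait for all, then continue" logic might look like this:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical, simplified version of the coffee-making example.
final class CoffeeTasks {

    /** Runs three preparation tasks in parallel, then brews; returns the order of completion. */
    static List<String> makeCoffee() {
        List<String> log = new CopyOnWriteArrayList<>();
        // Assumption: a fixed pool with one thread per preparation task.
        ExecutorService executor = Executors.newFixedThreadPool(3);
        try {
            CompletableFuture<Void> f1 = CompletableFuture.runAsync(() -> log.add("washCup"), executor);
            CompletableFuture<Void> f2 = CompletableFuture.runAsync(() -> log.add("hotWater"), executor);
            CompletableFuture<Void> f3 = CompletableFuture.runAsync(() -> log.add("grindCoffee"), executor);
            // allOf completes only after all three tasks have finished,
            // so the brewing step is always recorded last.
            CompletableFuture.allOf(f1, f2, f3)
                    .thenRun(() -> log.add("brewCoffee"))
                    .join();
        } finally {
            executor.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(makeCoffee());
    }
}
```

The first three entries may appear in any order, since the tasks run concurrently, but "brewCoffee" is always the fourth. Calling join() and shutting the executor down keeps the program from exiting early or hanging on idle pool threads.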
public static void main(String[] args) {
    ExecutorService executor = ...;
    // washCup(), hotWater(), and grindCoffee() are task methods defined elsewhere.
    CompletableFuture<Void> f1 = CompletableFuture.runAsync(() -> { try { washCup(); } catch (InterruptedException e) { e.printStackTrace(); } }, executor);
    CompletableFuture<Void> f2 = CompletableFuture.runAsync(() -> { try { hotWater(); } catch (InterruptedException e) { e.printStackTrace(); } }, executor);
    CompletableFuture<Void> f3 = CompletableFuture.runAsync(() -> { try { grindCoffee(); } catch (InterruptedException e) { e.printStackTrace(); } }, executor);
    // Runs only after all three tasks have completed.
    CompletableFuture.allOf(f1, f2, f3).thenAccept(r -> System.out.println("Brew coffee"));
    System.out.println("I am the main thread");
}

public static void main(String[] args) throws InterruptedException {
    int bufferSize = 1024;
    Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);
    EventHandler<LongEvent> c1 = new LongEventHandler("consumer1");
    EventHandler<LongEvent> c2 = new LongEventHandler("consumer2");
    EventHandler<LongEvent> c3 = new LongEventHandler("consumer3");
    EventHandler<LongEvent> c4 = new LongEventHandler("consumer4");
    disruptor.handleEventsWith(c1, c2, c3).then(c4);
    disruptor.start();
}

5. Multiple Consumer Groups
Similar to mainstream message-queue systems, Disruptor can create independent consumer groups. Every group receives every event and the groups run in parallel, while the consumers within a group compete so that each event is handled by only one of them.
public static void main(String[] args) throws InterruptedException {
    int bufferSize = 1024;
    Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);
    WorkHandler<LongEvent> c1 = new LogWorkHandler("consumer1");
    WorkHandler<LongEvent> c2 = new LogWorkHandler("consumer2");
    WorkHandler<LongEvent> c3 = new LogWorkHandler("consumer3");
    WorkHandler<LongEvent> c4 = new LogWorkHandler("consumer4");
    WorkHandler<LongEvent> c5 = new LogWorkHandler("consumer5");
    WorkHandler<LongEvent> c6 = new LogWorkHandler("consumer6");
    disruptor.handleEventsWithWorkerPool(c1, c2, c3);
    disruptor.handleEventsWithWorkerPool(c4, c5, c6);
    disruptor.start();
}

6. Summary
By flexibly combining consumers, Disruptor offers a rich set of usage scenarios. When selecting a solution, consider not only the functional requirements but also Disruptor's core advantage as a high‑performance in‑memory queue.
IT Services Circle