
Understanding Custom Stream IDs and Topology Building in Apache Storm

This article explains how to construct Apache Storm topologies with custom stream IDs, demonstrates the classic WordCountTopology example, and provides detailed Java code snippets illustrating spout and bolt configurations, stream declarations, and grouping strategies for real‑time stream processing.


The article starts with a personal anecdote about encountering a complex Storm CEP engine (Flowmix) and the many groupings and stream IDs used in its topology.

It then presents a sample TopologyBuilder implementation, showing how to set spouts and bolts, declare groupings, and define output streams using Java code.

public TopologyBuilder create() {
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout(EVENT, (IRichSpout) eventsComponent, eventLoaderParallelism == -1 ? parallelismHint : eventLoaderParallelism);
    builder.setSpout(FLOW_LOADER_STREAM, (IRichSpout) flowLoaderSpout, 1);
    builder.setSpout("tick", new TickSpout(1000), 1);
    builder.setBolt(INITIALIZER, new FlowInitializerBolt(), parallelismHint)
        .localOrShuffleGrouping(EVENT)
        .allGrouping(FLOW_LOADER_STREAM, FLOW_LOADER_STREAM);
    // ... other bolt declarations ...
    return builder;
}
private static void declareBolt(TopologyBuilder builder, String boltName, IRichBolt bolt, int parallelism, boolean control) {
    BoltDeclarer declarer = builder.setBolt(boltName, bolt, parallelism)
        .allGrouping(FLOW_LOADER_STREAM, FLOW_LOADER_STREAM)
        .allGrouping("tick", "tick")
        .localOrShuffleGrouping(INITIALIZER, boltName)
        // ... other groupings ...
        .localOrShuffleGrouping(JOIN, boltName);
}
public static void declareOutputStreams(OutputFieldsDeclarer declarer, Fields fields) {
    declarer.declareStream(PARTITION, fields);
    // ... other stream declarations ...
}
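The TickSpout(1000) registered above is not shown in the excerpt. A minimal sketch of what such a spout might look like, assuming Storm 2.x package names and assuming it emits an incrementing counter on a stream named "tick" once per period (which is consistent with the allGrouping("tick", "tick") subscriptions in declareBolt):

```java
import java.util.Map;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

// Hypothetical sketch of the TickSpout referenced in the topology above:
// it emits an incrementing counter on a dedicated "tick" stream at a fixed
// interval, so every bolt subscribed via allGrouping("tick", "tick")
// receives every tick.
public class TickSpout extends BaseRichSpout {
    private final long periodMillis;
    private SpoutOutputCollector collector;
    private long tick = 0;

    public TickSpout(long periodMillis) {
        this.periodMillis = periodMillis;
    }

    @Override
    public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        Utils.sleep(periodMillis);                  // pace the ticks
        collector.emit("tick", new Values(tick++)); // custom stream ID "tick"
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream("tick", new Fields("tick"));
    }
}
```

Because the tick stream is broadcast with allGrouping, this is a common way to drive periodic flushes or window evictions in every bolt instance at once.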

Next, the classic WordCountTopology example is revisited, with full Java source for spout, split bolt, count bolt, and printer bolt, illustrating default stream usage.

public class WordCountTopologySimple {
    public static class RandomSentenceSpout extends BaseRichSpout { /* ... */ }
    public static class SplitSentenceBolt extends BaseRichBolt { /* ... */ }
    public static class WordCountBolt extends BaseBasicBolt { /* ... */ }
    public static class PrinterBolt extends BaseBasicBolt { /* ... */ }
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new RandomSentenceSpout(), 1);
        builder.setBolt("split", new SplitSentenceBolt(), 2).shuffleGrouping("spout");
        builder.setBolt("count", new WordCountBolt(), 2).fieldsGrouping("split", new Fields("word"));
        builder.setBolt("print", new PrinterBolt(), 1).shuffleGrouping("count");
        // submit topology ...
    }
}
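The elided submission step is typically a short LocalCluster run for testing. A sketch of what the body of main might continue with, assuming Storm 2.x (the topology name "word-count" and the 10-second run window are illustrative, not from the article):

```java
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;

// Inside main(String[] args), after the builder wiring above.
// Hypothetical local submission; a real deployment would use
// StormSubmitter.submitTopology(...) instead of LocalCluster.
Config conf = new Config();
conf.setDebug(false);
try (LocalCluster cluster = new LocalCluster()) {
    cluster.submitTopology("word-count", conf, builder.createTopology());
    Thread.sleep(10_000);               // let the topology process for a while
    cluster.killTopology("word-count");
}
```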

The article then explains that, when no stream ID is given, tuples travel on a stream whose ID is "default", and shows how to emit and declare custom stream IDs, accompanied by diagrams.
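To make that concrete, here is a minimal sketch (assuming Storm 2.x, where Utils.DEFAULT_STREAM_ID is the string "default"; the class and method names are hypothetical, for illustration only) showing that an emit without a stream ID and an emit on the default stream are equivalent:

```java
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

// Hypothetical helper for illustration: both emits below route the tuple
// identically, because an emit without a stream ID uses the built-in
// "default" stream (Utils.DEFAULT_STREAM_ID).
class DefaultStreamDemo {
    void emitBothWays(SpoutOutputCollector collector, String sentence) {
        collector.emit(new Values(sentence));                          // implicit "default"
        collector.emit(Utils.DEFAULT_STREAM_ID, new Values(sentence)); // explicit "default"
    }
}
```

Correspondingly, a grouping call with no stream argument, such as shuffleGrouping("spout"), subscribes to that same "default" stream, which is why the plain WordCountTopology above needed no stream IDs at all.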

It provides concrete code for a spout emitting on a custom stream, bolts receiving from those streams, and the topology wiring that specifies both component and stream IDs.

class RandomSentenceSpout {
    public void nextTuple() {
        Utils.sleep(1000);
        String sentence = sentences[rand.nextInt(sentences.length)];
        this.collector.emit("split-stream", new Values(sentence));
    }
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream("split-stream", new Fields("sentence"));
    }
}
class SplitSentenceBolt {
    public void execute(Tuple tuple) {
        String sentence = tuple.getStringByField("sentence");
        for (String word : sentence.split(" ")) {
            this.collector.emit("count-stream", new Values(word));
        }
        this.collector.ack(tuple);
    }
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream("count-stream", new Fields("word"));
    }
}
class WordCountBolt {
    public void execute(Tuple tuple) {
        String word = tuple.getString(0);
        Integer count = counts.getOrDefault(word, 0) + 1;
        counts.put(word, count);
        collector.emit("print-stream", new Values(word, count));
    }
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream("print-stream", new Fields("word", "count"));
    }
}
class Topology {
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new RandomSentenceSpout(), 1);
        builder.setBolt("split", new SplitSentenceBolt(), 2).shuffleGrouping("spout", "split-stream");
        builder.setBolt("count", new WordCountBolt(), 2).fieldsGrouping("split", "count-stream", new Fields("word"));
        builder.setBolt("print", new PrinterBolt(), 1).shuffleGrouping("count", "print-stream");
    }
}

Finally, the article distills custom stream IDs into a two-step process, namely declaring the stream ID in declareOutputFields and then referencing that same ID both in the emit call and in the grouping that wires the downstream component, and includes several illustrative images of the topology flow.

Java · big data · stream processing · Apache Storm · topology · custom stream ID
Written by

Architect

Professional architect sharing high‑quality architecture insights. Topics include high‑availability, high‑performance, high‑stability architectures, big data, machine learning, Java, system and distributed architecture, AI, and practical large‑scale architecture case studies. Open to ideas‑driven architects who enjoy sharing and learning.
