Understanding Kafka Topic Partitions, Producer Partitioning Strategies, and Consumer Assignment
This article explains how Kafka producers decide which partition to send messages to, how topic partition counts are configured, and how consumer groups assign partitions to instances using default range and round‑robin strategies, with code examples for illustration.
Producers send messages to topics, which are divided into partitions, and consumers in a consumer group read from those partitions. Two questions follow: which partition does a producer send each message to, and how are partitions assigned to the consumer instances in a group?
1. Topic Partition Count Settings
The broker-wide default partition count is set via num.partitions in server.properties (default: 1); individual topics can override it when created, e.g.:
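In server.properties, the broker-side default looks like this:

```properties
# Default number of partitions for auto-created topics (broker-wide)
num.partitions=1
```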
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic my-topic --partitions 2 --replication-factor 1

Describing a topic shows its partitions and replicas:
# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic abc
Topic:abc PartitionCount:2 ReplicationFactor:1
Topic: abc Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: abc Partition: 1 Leader: 0 Replicas: 0 Isr: 0

2. Producer Partitioning
The default partitioning strategy is:
If a partition is specified in the record, use it. If no partition is specified but a key is present, choose a partition based on the hash of the key. If neither partition nor key is present, choose a partition in a round‑robin fashion.
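Before looking at the real source, here is a rough, self-contained sketch of that three-step decision. It is not the Kafka implementation: Kafka hashes the serialized key with murmur2 and keeps a per-topic counter, both of which are simplified here.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

public class PartitionChoiceSketch {
    private final int numPartitions;
    // Stands in for Kafka's per-topic counter used on the keyless path
    private final AtomicInteger counter = new AtomicInteger(0);

    public PartitionChoiceSketch(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    /** Mirrors the default strategy: explicit partition wins, then key hash, then round-robin. */
    public int choose(Integer explicitPartition, byte[] keyBytes) {
        if (explicitPartition != null)
            return explicitPartition;                                      // 1. partition set on the record
        if (keyBytes != null)
            return toPositive(Arrays.hashCode(keyBytes)) % numPartitions;  // 2. hash of the key (murmur2 in Kafka)
        return toPositive(counter.getAndIncrement()) % numPartitions;      // 3. round-robin over partitions
    }

    // Same trick as Kafka's Utils.toPositive: clear the sign bit
    private static int toPositive(int n) {
        return n & 0x7fffffff;
    }

    public static void main(String[] args) {
        PartitionChoiceSketch sketch = new PartitionChoiceSketch(3);
        System.out.println(sketch.choose(1, null));   // explicit partition -> 1
        byte[] key = "user-42".getBytes();
        // Same key always lands on the same partition -> true
        System.out.println(sketch.choose(null, key) == sketch.choose(null, key));
        System.out.println(sketch.choose(null, null)); // first keyless record -> 0
    }
}
```

Note the property this buys you: records with the same key always land on the same partition, which is what gives Kafka per-key ordering.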
The implementation class is org.apache.kafka.clients.producer.internals.DefaultPartitioner. The source code confirms the logic:

/**
 * Compute the partition for the given record.
 *
 * @param topic The topic name
 * @param key The key to partition on (or null if no key)
 * @param keyBytes serialized key to partition on (or null if no key)
 * @param value The value to partition on or null
 * @param valueBytes serialized value to partition on or null
 * @param cluster The current cluster metadata
 */
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    if (keyBytes == null) {
        int nextValue = nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() > 0) {
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // no partitions are available, give a non-available partition
            return Utils.toPositive(nextValue) % numPartitions;
        }
    } else {
        // hash the keyBytes to choose a partition
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}

3. Consumer Partition Assignment
Consumers subscribe to a topic as a group; each partition can be consumed by only one instance in the group. If the number of partitions is less than the number of consumer instances, some consumers will be idle under the default strategy.
3.1 Range Assignor
The default assignor org.apache.kafka.clients.consumer.RangeAssignor works per topic: partitions are ordered numerically, consumers are ordered lexicographically, then partitions are divided evenly; extra partitions go to the first consumers.
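The start/length arithmetic is easy to sanity-check by hand. A minimal standalone sketch (partition indices only; the real assignor works on TopicPartition objects per topic) for, say, 7 partitions and 3 consumers gives 3, 2, and 2 partitions respectively:

```java
import java.util.Arrays;

public class RangeMathSketch {
    /** Returns, per consumer index, the partition numbers it receives under range assignment. */
    public static int[][] rangeAssign(int numPartitions, int numConsumers) {
        int perConsumer = numPartitions / numConsumers;
        int withExtra = numPartitions % numConsumers;   // first `withExtra` consumers get one extra
        int[][] result = new int[numConsumers][];
        for (int i = 0; i < numConsumers; i++) {
            int start = perConsumer * i + Math.min(i, withExtra);
            int length = perConsumer + (i + 1 > withExtra ? 0 : 1);
            int[] parts = new int[length];
            for (int j = 0; j < length; j++)
                parts[j] = start + j;
            result[i] = parts;
        }
        return result;
    }

    public static void main(String[] args) {
        // 7 partitions, 3 consumers: [0, 1, 2], [3, 4], [5, 6]
        for (int[] parts : rangeAssign(7, 3))
            System.out.println(Arrays.toString(parts));
    }
}
```

Because each consumer gets a contiguous range, the first consumers in sort order are consistently the heavier ones; over many topics with small partition counts this skew compounds, which is the usual argument for round-robin instead.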
public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                Map<String, List<String>> subscriptions) {
    Map<String, List<String>> consumersPerTopic = consumersPerTopic(subscriptions);
    Map<String, List<TopicPartition>> assignment = new HashMap<>();
    for (String memberId : subscriptions.keySet())
        assignment.put(memberId, new ArrayList<>());

    for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) {
        String topic = topicEntry.getKey();
        List<String> consumersForTopic = topicEntry.getValue();

        Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
        if (numPartitionsForTopic == null)
            continue;

        Collections.sort(consumersForTopic);

        int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
        int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();

        List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
        for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
            int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
            int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
            assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length));
        }
    }
    return assignment;
}

3.2 RoundRobin Assignor
The round‑robin assignor org.apache.kafka.clients.consumer.RoundRobinAssignor lays out all partitions and all consumers and distributes them in a round‑robin manner, achieving a more balanced distribution when subscriptions are identical.
/** The round robin assignor lays out all the available partitions and all the available consumers.
* It then proceeds to do a round robin assignment from partition to consumer.
 * ... (example omitted) ... */

When consumer subscriptions differ, the algorithm still iterates round-robin but skips consumers that do not subscribe to the topic in question, which can leave the resulting assignment unbalanced.
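For the identical-subscription case, the dealing loop can be sketched in a few lines. This is a simplification: Kafka sorts real TopicPartition objects and consumer member ids, and handles the subscription-skipping case omitted here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RoundRobinSketch {
    /** Deals partitions to consumers one at a time, wrapping around the consumer list. */
    public static Map<String, List<Integer>> assign(List<Integer> partitions, List<String> consumers) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String c : consumers)
            assignment.put(c, new ArrayList<>());
        int i = 0;
        for (int partition : partitions) {
            assignment.get(consumers.get(i)).add(partition);
            i = (i + 1) % consumers.size();   // advance to the next consumer, wrapping around
        }
        return assignment;
    }

    public static void main(String[] args) {
        // 5 partitions, 2 consumers: C0 -> [0, 2, 4], C1 -> [1, 3]
        System.out.println(assign(Arrays.asList(0, 1, 2, 3, 4), Arrays.asList("C0", "C1")));
    }
}
```

With identical subscriptions, no consumer can end up more than one partition ahead of another, which is the balance guarantee range assignment lacks across topics.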
4. Test Code
Example Maven pom.xml and simple producer/consumer Java classes are provided to demonstrate the concepts.
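For reference, the dependency block in such a pom.xml would look roughly like this; the version shown is illustrative and should match your broker:

```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <!-- illustrative version; pick the release matching your cluster -->
    <version>0.10.2.1</version>
</dependency>
```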
// HelloProducer.java (code omitted for brevity)
// HelloConsumer.java (code omitted for brevity)