Kafka Exactly-Once Semantics and Transaction API Overview
This article explains Kafka's exactly‑once semantics and transaction support, detailing the new producer API methods, related exceptions, configuration parameters, and a sample application illustrating how to initialize, begin, process, and commit or abort transactions while ensuring idempotent and atomic message handling.
Introduction
Kafka provides exactly‑once semantics and transactional messaging. This article consolidates the related API, data flow, and configuration details.
Producer Interface
API Changes
The producer now includes five new methods for transaction handling and a new exception for the send interface.
public interface Producer<K, V> extends Closeable {

    // Initialize transactions; must be called before any other transaction method.
    void initTransactions() throws IllegalStateException;

    // Begin a new transaction.
    void beginTransaction() throws ProducerFencedException;

    // Send consumed offsets to the transaction.
    void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
                                  String consumerGroupId) throws ProducerFencedException;

    // Commit the ongoing transaction.
    void commitTransaction() throws ProducerFencedException;

    // Abort the ongoing transaction.
    void abortTransaction() throws ProducerFencedException;

    // Asynchronous send returning a future.
    public Future<RecordMetadata> send(ProducerRecord<K, V> record);

    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
}

OutOfOrderSequenceException
If the broker detects missing data, it throws an OutOfOrderSequenceException, which propagates through the returned Future and any provided callback. Subsequent producer calls such as send, beginTransaction, or commitTransaction will then raise an IllegalStateException.
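Because the error surfaces through the Future, the application sees it when it calls get(), wrapped in an ExecutionException. A minimal sketch of that pattern using a plain CompletableFuture as a stand-in for the producer's Future (the exception class here is a local stand-in, not the real Kafka type):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class FutureErrorPropagation {
    // Stand-in for Kafka's OutOfOrderSequenceException.
    static class OutOfOrderSequenceException extends RuntimeException {
        OutOfOrderSequenceException(String msg) { super(msg); }
    }

    // Simulates a send whose broker-side sequence validation fails.
    static CompletableFuture<Long> failingSend() {
        CompletableFuture<Long> future = new CompletableFuture<>();
        future.completeExceptionally(new OutOfOrderSequenceException("missing sequence number"));
        return future;
    }

    public static void main(String[] args) {
        try {
            failingSend().get(); // blocks; rethrows the broker error as the cause
        } catch (ExecutionException e) {
            // A fatal cause like this tells the application to close the producer.
            System.out.println("fatal: " + e.getCause().getMessage());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

The real producer behaves the same way: the broker error becomes the cause of the ExecutionException on get(), and is also passed to any registered Callback.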
Application Example
A minimal Java program demonstrates the transaction workflow:
public class KafkaTransactionsExample {

    public static void main(String[] args) {
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig);

        // Transactional producer requires a transactional.id.
        KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfig);
        producer.initTransactions();

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(CONSUMER_POLL_TIMEOUT);
            if (!records.isEmpty()) {
                producer.beginTransaction();
                List<ProducerRecord<String, String>> outputRecords = processRecords(records);
                for (ProducerRecord<String, String> outputRecord : outputRecords) {
                    producer.send(outputRecord);
                }
                // Send consumed offsets as part of the transaction.
                producer.sendOffsetsToTransaction(getUncommittedOffsets(), "my-group");
                // Commit the transaction (or abort on error).
                producer.commitTransaction();
            }
        }
    }
}

New Configuration Options
Broker Settings
transactional.id.timeout.ms – Time after which an idle transactional.id is considered expired (default 7 days).
max.transaction.timeout.ms – Upper bound for transaction timeout (default 15 minutes).
transaction.state.log.replication.factor – Replication factor for the transaction log (default 3).
transaction.state.log.num.partitions – Number of partitions for the transaction log (default 50).
transaction.state.log.min.isr – Minimum in‑sync replicas for the transaction log (default 2).
transaction.state.log.segment.bytes – Segment size for the transaction log (default 104857600 bytes, i.e., 100 MB).
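For reference, the defaults above correspond to the following server.properties fragment (millisecond values computed from the stated defaults of 7 days and 15 minutes):

```properties
transactional.id.timeout.ms=604800000
max.transaction.timeout.ms=900000
transaction.state.log.replication.factor=3
transaction.state.log.num.partitions=50
transaction.state.log.min.isr=2
transaction.state.log.segment.bytes=104857600
```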
Producer Settings
enable.idempotence – Enables idempotent writes (must be true for transactions).
transaction.timeout.ms – Transaction timeout; must be ≤ broker's max.transaction.timeout.ms (default 60 seconds).
transactional.id – Identifier that ties multiple producer instances to the same transaction state; requires idempotence.
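Put together, a transactional producer configuration might look like the fragment below; the transactional.id value is a placeholder to be replaced with a stable, application-specific identifier:

```properties
enable.idempotence=true
transaction.timeout.ms=60000
transactional.id=my-transactional-id
```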
Consumer Settings
isolation.level – Controls visibility of transactional messages (read_uncommitted or read_committed).
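On the consumer side, opting in to transactional guarantees is a single setting:

```properties
# Only deliver messages from committed transactions (the default is read_uncommitted).
isolation.level=read_committed
```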
Semantic Guarantees
Producer Idempotence
Each producer instance receives a unique Producer ID (PID) and a monotonically increasing sequence number per partition. The broker validates the sequence, rejecting out‑of‑order records and ensuring each message appears exactly once even after retries.
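The broker-side check can be sketched as a per-partition map of the last accepted sequence number; a record is appended only if its sequence is exactly one past the previous one. This is an illustrative simulation (it keys on partition only, whereas the broker also keys on PID), not actual broker code:

```java
import java.util.HashMap;
import java.util.Map;

public class SequenceValidator {
    // Last accepted sequence number per partition; absent means nothing accepted yet.
    private final Map<String, Long> lastSequence = new HashMap<>();

    // Accept a record only if its sequence is exactly one past the last accepted one.
    public boolean tryAppend(String topicPartition, long sequence) {
        long last = lastSequence.getOrDefault(topicPartition, -1L);
        if (sequence != last + 1) {
            return false; // duplicate (retry) or out-of-order record is rejected
        }
        lastSequence.put(topicPartition, sequence);
        return true;
    }

    public static void main(String[] args) {
        SequenceValidator broker = new SequenceValidator();
        System.out.println(broker.tryAppend("topic-0", 0)); // true: first record
        System.out.println(broker.tryAppend("topic-0", 1)); // true: next in sequence
        System.out.println(broker.tryAppend("topic-0", 1)); // false: duplicate retry discarded
        System.out.println(broker.tryAppend("topic-0", 3)); // false: gap means missing data
    }
}
```

The rejected duplicate is how a retried send becomes a no-op, and the rejected gap is the condition that surfaces to the application as OutOfOrderSequenceException.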
Transaction Guarantees
Transactions allow atomic writes to multiple partitions; either all writes succeed or all are rolled back. When combined with offset commits, the consume‑transform‑produce pipeline becomes atomic, and a TransactionalId guarantees continuity across producer restarts.
Core Concepts
Transaction Coordinator – Manages PIDs, epochs, and transaction state.
Transaction Log – Internal replicated topic storing transaction metadata.
Control Messages – Special broker‑generated messages that indicate commit or abort status to consumers.
TransactionalId – User‑provided identifier that maps to a PID and enables cross‑session transaction recovery.
Producer Epoch – Ensures only one active producer instance per TransactionalId.
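The epoch mechanism in the last item can be sketched as follows: the coordinator bumps the epoch each time a producer initializes with a given TransactionalId, and requests carrying an older epoch are rejected. An illustrative simulation, not coordinator code:

```java
import java.util.HashMap;
import java.util.Map;

public class EpochFencing {
    // Current epoch per TransactionalId.
    private final Map<String, Short> currentEpoch = new HashMap<>();

    // InitPidRequest: bump and return the new epoch, fencing older producer instances.
    public short initProducer(String transactionalId) {
        short next = (short) (currentEpoch.getOrDefault(transactionalId, (short) -1) + 1);
        currentEpoch.put(transactionalId, next);
        return next;
    }

    // A transactional request is honored only if it carries the latest epoch.
    public boolean isAllowed(String transactionalId, short epoch) {
        return currentEpoch.getOrDefault(transactionalId, (short) -1) == epoch;
    }

    public static void main(String[] args) {
        EpochFencing coordinator = new EpochFencing();
        short oldEpoch = coordinator.initProducer("order-processor"); // first instance
        short newEpoch = coordinator.initProducer("order-processor"); // restarted instance
        System.out.println(coordinator.isAllowed("order-processor", oldEpoch)); // false: fenced
        System.out.println(coordinator.isAllowed("order-processor", newEpoch)); // true
    }
}
```

In the real system, a fenced producer's requests fail with ProducerFencedException, which is why a zombie instance left over from before a restart cannot corrupt an ongoing transaction.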
Data Flow
The accompanying diagram (not shown) illustrates the interaction between producers, brokers, transaction coordinator, and consumer groups, with numbered RPCs representing each step.
Transaction Lifecycle
FindCoordinatorRequest – Locate the Transaction Coordinator.
InitPidRequest – Obtain a PID and epoch; recover any unfinished transactions.
beginTransaction – Start a new transaction.
Consume‑Transform‑Produce Loop – Produce records, registering partitions with the transaction as needed (AddPartitionsToTxnRequest), send consumed offsets (AddOffsetsToTxnRequest), and commit those offsets to the coordinator (TxnOffsetCommitRequest).
Commit or Abort – EndTxnRequest triggers PREPARE_COMMIT/ABORT, WriteTxnMarkerRequest writes control messages, and the coordinator finalizes the transaction state.
After the final commit/abort marker, the transaction log can discard most intermediate data, retaining only the PID and timestamps for cleanup.
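The effect of those control messages on a read_committed consumer can be sketched with a small simulation: transactional records are buffered per producer ID until that PID's marker arrives, then delivered only if the marker is a commit. This is illustrative only, not the actual client implementation:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ReadCommittedBuffer {
    // Records buffered per producer ID until that PID's control marker arrives.
    private final Map<Long, List<String>> pending = new HashMap<>();
    private final List<String> delivered = new ArrayList<>();

    public void onRecord(long pid, String value) {
        pending.computeIfAbsent(pid, k -> new ArrayList<>()).add(value);
    }

    // Control message: release the buffered records on commit, drop them on abort.
    public void onMarker(long pid, boolean committed) {
        List<String> buffered = pending.remove(pid);
        if (committed && buffered != null) {
            delivered.addAll(buffered);
        }
    }

    public List<String> poll() {
        List<String> out = new ArrayList<>(delivered);
        delivered.clear();
        return out;
    }

    public static void main(String[] args) {
        ReadCommittedBuffer consumer = new ReadCommittedBuffer();
        consumer.onRecord(1L, "a");
        consumer.onRecord(2L, "b");
        consumer.onMarker(1L, true);   // PID 1 committed
        consumer.onMarker(2L, false);  // PID 2 aborted
        System.out.println(consumer.poll()); // [a]
    }
}
```

Under read_uncommitted, by contrast, both records would be delivered as soon as they are fetched, markers notwithstanding.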