Using Spring TransactionSynchronizationManager for Transaction Hooks to Send Kafka Messages
This article demonstrates how to leverage Spring's TransactionSynchronizationManager to detect active transactions and register synchronization callbacks that asynchronously publish payment ledger messages to Kafka after transaction commit, while providing a starter-style library for easy integration.
In a payment system, each account's fund flow must be recorded and archived by sending the flow information as a Kafka message to a dedicated archiving service. To avoid impacting the main business logic, the message should be sent asynchronously and only after the surrounding transaction successfully commits.
Solution Overview
An internal shared library, packaged as a starter, is proposed to encapsulate the Kafka producer logic so that it does not conflict with any KafkaTemplate the integrating application already uses. The library must provide a simple API, support transaction-aware message sending, and minimize integration effort.
Transaction Hook Implementation
The core of the solution relies on TransactionSynchronizationManager, a static utility class that tracks transaction synchronization state per thread. By checking TransactionSynchronizationManager.isSynchronizationActive(), the library can determine whether a transaction is in progress.
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    public void sendLog() {
        // No active transaction: send message immediately (asynchronously)
        if (!TransactionSynchronizationManager.isSynchronizationActive()) {
            executor.submit(() -> {
                try {
                    // send message to Kafka
                } catch (Exception e) {
                    // log/handle exception
                }
            });
            return;
        }
        // Active transaction: register a synchronization callback
        TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
            @Override
            public void afterCompletion(int status) {
                if (status == TransactionSynchronization.STATUS_COMMITTED) {
                    // After commit, send message asynchronously
                    executor.submit(() -> {
                        try {
                            // send message to Kafka
                        } catch (Exception e) {
                            // log/handle exception
                        }
                    });
                }
            }
        });
    }

The method isSynchronizationActive() checks a ThreadLocal<Set<TransactionSynchronization>> that the transaction manager populates when a transaction begins (via TransactionSynchronizationManager.initSynchronization()).
    // Part of TransactionSynchronizationManager.java
    private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
            new NamedThreadLocal<>("Transaction synchronizations");

    public static boolean isSynchronizationActive() {
        return (synchronizations.get() != null);
    }

When a transaction starts, initSynchronization() creates a new LinkedHashSet and stores it in the thread-local variable, which marks synchronization as active for the current thread.
    /**
     * Activate transaction synchronization for the current thread.
     * Called by a transaction manager on transaction begin.
     */
    public static void initSynchronization() throws IllegalStateException {
        if (isSynchronizationActive()) {
            throw new IllegalStateException("Cannot activate transaction synchronization - already active");
        }
        logger.trace("Initializing transaction synchronization");
        synchronizations.set(new LinkedHashSet<>());
    }

To execute custom logic after a transaction commits, the library registers a TransactionSynchronizationAdapter via registerSynchronization(). The overridden afterCompletion(int status) method checks for STATUS_COMMITTED and then triggers the asynchronous Kafka send. (As of Spring 5.3, TransactionSynchronizationAdapter is deprecated; the TransactionSynchronization interface now has default methods and can be implemented directly.)
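    TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
        @Override
        public void afterCompletion(int status) {
            if (status == TransactionSynchronization.STATUS_COMMITTED) {
                executor.submit(() -> {
                    try {
                        // send message to Kafka
                    } catch (Exception e) {
                        // log/handle exception
                    }
                });
            }
        }
    });

Because the synchronization information is stored in a thread-local variable, sendLog() must be called on the same thread that runs the transaction. If the call is made from another thread (for example, from inside an executor task), that thread has no active synchronization, so sendLog() takes the "no transaction" branch and sends the message immediately instead of waiting for the commit. Calling registerSynchronization() directly on such a thread throws an IllegalStateException.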
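
The thread-affinity point can be seen in a small JDK-only sketch. This is a simplified model of the bookkeeping described above, not Spring's actual implementation: the class ThreadLocalHookDemo and its methods begin(), register(), commit(), and activeOnWorkerThread() are hypothetical names introduced only for illustration, and a plain Runnable stands in for TransactionSynchronization.

```java
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Simplified model of per-thread synchronization bookkeeping.
// NOT Spring's code; all names in this class are hypothetical.
class ThreadLocalHookDemo {

    // One hook set per thread; null means no "transaction" is active on that thread.
    private static final ThreadLocal<Set<Runnable>> HOOKS = new ThreadLocal<>();

    static boolean isActive() {
        return HOOKS.get() != null;
    }

    // Models transaction begin: activates synchronization for the calling thread only.
    static void begin() {
        HOOKS.set(new LinkedHashSet<>());
    }

    // Returns false when the calling thread has no active "transaction".
    static boolean register(Runnable hook) {
        Set<Runnable> hooks = HOOKS.get();
        if (hooks == null) {
            return false;
        }
        hooks.add(hook);
        return true;
    }

    // Models commit: clears the thread's state, runs its hooks, returns how many ran.
    static int commit() {
        Set<Runnable> hooks = HOOKS.get();
        HOOKS.remove();
        if (hooks == null) {
            return 0;
        }
        hooks.forEach(Runnable::run);
        return hooks.size();
    }

    // Asks a different thread whether it sees an active "transaction".
    static boolean activeOnWorkerThread() {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            Callable<Boolean> check = ThreadLocalHookDemo::isActive;
            return worker.submit(check).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        begin();
        System.out.println("caller thread active: " + isActive());
        System.out.println("worker thread active: " + activeOnWorkerThread());
        register(() -> System.out.println("hook ran after commit"));
        System.out.println("hooks run: " + commit());
    }
}
```

In this model the worker thread reports no active transaction even though the caller has begun one, which mirrors why the article's sendLog() has to run on the transactional thread. Handing work to the executor is safe only inside the callback, after registration has already happened on the right thread.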
Conclusion
The presented approach shows how to integrate Kafka message publishing into a Spring-based payment service by using TransactionSynchronizationManager to detect transaction boundaries and register post-commit callbacks. The main business flow is not blocked by the send, and messages are published only for transactions that actually committed.
Code Ape Tech Column
Former Ant Group P8 engineer, pure technologist, sharing full‑stack Java, job interview and career advice through a column. Site: java-family.cn