
Design and Implementation of a Transactional Message Module with Spring Boot, RabbitMQ, and MySQL

This article details the design principles, implementation steps, and code examples for a lightweight transactional message module in a microservices environment, leveraging Spring Boot, RabbitMQ, MySQL, and Redisson to achieve low‑intrusion, eventual‑consistency messaging with compensation and retry mechanisms.

Architecture Digest

Premise

Distributed transactions are a tricky problem in microservice architectures. The author adopts a compromise solution based on the local‑message‑table pattern originally proposed by eBay, using RabbitMQ and MySQL to build a lightweight, low‑intrusion transactional message module.

Design Idea

Transactional messages suit weak-consistency (eventually consistent) scenarios, such as sending a marketing SMS after a user registers or notifying an approval service after an order is saved. Strong-consistency scenarios should not use this pattern: they need strict synchronous guarantees, and layering a transactional message on top would only add overhead.

The module records a message in a local table together with the business logic inside the same transaction. After the transaction commits, the message is pushed to RabbitMQ, ensuring that the two actions are atomic from the business perspective.

In short: the upstream service guarantees its own business correctness, pushes a correct message to RabbitMQ, and then its responsibility ends.

Transactional Logic Unit

The saving of the transactional message record and the business logic are bound to the same transaction. The actual push to RabbitMQ is delayed until after the transaction commits, using Spring's TransactionSynchronization callbacks (e.g., afterCommit() or afterCompletion(int status) ).

@Transactional
public Dto businessMethod() {
    // business transaction code block ...
    // save transaction message
    saveTransactionMessageRecord();
    // register transaction synchronizer – push message in afterCommit()
    registerTransactionSynchronization();
    // other business code ...
}
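The ordering described above can be illustrated without Spring. What follows is a minimal sketch in plain Java, not the module's code: ToyTransaction and AfterCommitDemo are hypothetical names, and the "transaction" is only an event log. It shows the one property the module relies on, namely that the push callback runs after a successful commit and never after a rollback.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy stand-in for a transaction manager. Hypothetical; this is NOT Spring's API. */
class ToyTransaction {
    private final List<Runnable> afterCommitCallbacks = new ArrayList<>();
    private final List<String> events = new ArrayList<>();

    void record(String event) {
        events.add(event);
    }

    void registerAfterCommit(Runnable callback) {
        afterCommitCallbacks.add(callback);
    }

    /** Runs the work; callbacks fire only if the work completes without throwing. */
    List<String> execute(Runnable work) {
        try {
            work.run();
            events.add("commit");
        } catch (RuntimeException e) {
            // In a real transaction both writes would be undone together here.
            events.add("rollback");
            afterCommitCallbacks.clear();
            return new ArrayList<>(events);
        }
        afterCommitCallbacks.forEach(Runnable::run);
        return new ArrayList<>(events);
    }
}

class AfterCommitDemo {
    static List<String> run(boolean failBusiness) {
        ToyTransaction tx = new ToyTransaction();
        return tx.execute(() -> {
            tx.record("save business row");
            tx.record("save message record");
            tx.registerAfterCommit(() -> tx.record("push to broker"));
            if (failBusiness) {
                throw new IllegalStateException("business failure");
            }
        });
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // ends with: commit, push to broker
        System.out.println(run(true));  // ends with: rollback (no push)
    }
}
```

In the real module the same role is played by a TransactionSynchronization registered inside the @Transactional method; the sketch only makes the commit-then-push ordering visible.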

Compensation

If the afterCommit() callback does not complete (e.g., the service crashes before the callback runs) or RabbitMQ itself is unavailable, the system must retry. The implementation marks the message as pending ("processing") when it is saved, updates the status to "success" after a successful push, and on failure marks the message as failed and reschedules it using exponential back-off, up to a maximum retry count.
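The back-off arithmetic can be worked through with the defaults used later in the article (initial back-off of 10 seconds, factor 2, at most 5 retries). The sketch below assumes the delay for a given round is initBackoff * factor^round and that each delay is added to the previous schedule time, which matches the calculateNextScheduleTime method shown in the implementation section. BackoffSchedule and its method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class BackoffSchedule {
    /** Delay in seconds for one round: initBackoff * backoffFactor^round. */
    static long delaySeconds(long initBackoff, long backoffFactor, long round) {
        return (long) (initBackoff * Math.pow(backoffFactor, round));
    }

    /** Delays for rounds 0..maxRetryTimes (round 0 is the initial schedule on save). */
    static List<Long> delays(long initBackoff, long backoffFactor, int maxRetryTimes) {
        List<Long> result = new ArrayList<>();
        for (int round = 0; round <= maxRetryTimes; round++) {
            result.add(delaySeconds(initBackoff, backoffFactor, round));
        }
        return result;
    }

    /** Seconds since creation at which each round is scheduled, when delays accumulate. */
    static List<Long> cumulativeOffsets(long initBackoff, long backoffFactor, int maxRetryTimes) {
        List<Long> result = new ArrayList<>();
        long total = 0;
        for (long delay : delays(initBackoff, backoffFactor, maxRetryTimes)) {
            total += delay;
            result.add(total);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(delays(10, 2, 5));            // [10, 20, 40, 80, 160, 320]
        System.out.println(cumulativeOffsets(10, 2, 5)); // [10, 30, 70, 150, 310, 630]
    }
}
```

With these defaults a message that keeps failing is last rescheduled roughly ten and a half minutes after it was created, which gives a feel for how long a broker outage the defaults can absorb.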

Implementation

The project introduces the following core dependencies:

<properties>
    <spring.boot.version>2.2.4.RELEASE</spring.boot.version>
    <redisson.version>3.12.1</redisson.version>
    <mysql.connector.version>5.1.48</mysql.connector.version>
</properties>

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>${spring.boot.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

<dependencies>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>${mysql.connector.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-jdbc</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-aop</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.redisson</groupId>
        <artifactId>redisson</artifactId>
        <version>${redisson.version}</version>
    </dependency>
</dependencies>

Table Design

Two MySQL tables are created: t_transactional_message for message metadata and t_transactional_message_content for the actual payload. Keeping the potentially large payload out of the metadata table keeps its rows small, which avoids heavy I/O when large numbers of records are queried.

CREATE TABLE `t_transactional_message` (
    id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
    create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
    edit_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    creator VARCHAR(20) NOT NULL DEFAULT 'admin',
    editor VARCHAR(20) NOT NULL DEFAULT 'admin',
    deleted TINYINT NOT NULL DEFAULT 0,
    current_retry_times TINYINT NOT NULL DEFAULT 0 COMMENT 'Current retry count',
    max_retry_times TINYINT NOT NULL DEFAULT 5 COMMENT 'Maximum retry count',
    queue_name VARCHAR(255) NOT NULL COMMENT 'Queue name',
    exchange_name VARCHAR(255) NOT NULL COMMENT 'Exchange name',
    exchange_type VARCHAR(8) NOT NULL COMMENT 'Exchange type',
    routing_key VARCHAR(255) COMMENT 'Routing key',
    business_module VARCHAR(32) NOT NULL COMMENT 'Business module',
    business_key VARCHAR(255) NOT NULL COMMENT 'Business key',
    next_schedule_time DATETIME NOT NULL COMMENT 'Next schedule time',
    message_status TINYINT NOT NULL DEFAULT 0 COMMENT 'Message status',
    init_backoff BIGINT UNSIGNED NOT NULL DEFAULT 10 COMMENT 'Initial backoff, in seconds',
    backoff_factor TINYINT NOT NULL DEFAULT 2 COMMENT 'Backoff factor (base of the exponential)',
    INDEX idx_queue_name (queue_name),
    INDEX idx_create_time (create_time),
    INDEX idx_next_schedule_time (next_schedule_time),
    INDEX idx_business_key (business_key)
) COMMENT 'Transactional message table';

CREATE TABLE `t_transactional_message_content` (
    id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
    message_id BIGINT UNSIGNED NOT NULL COMMENT 'Transactional message record ID',
    content TEXT COMMENT 'Message content'
) COMMENT 'Transactional message content table';

Code Design

Domain models TransactionalMessage and TransactionalMessageContent are defined with Lombok @Data . DAO interfaces provide CRUD operations. The service interface TransactionalMessageService exposes a single method to send a transactional message.

@Data
public class TransactionalMessage {
    private Long id;
    private LocalDateTime createTime;
    private LocalDateTime editTime;
    private String creator;
    private String editor;
    private Integer deleted;
    private Integer currentRetryTimes;
    private Integer maxRetryTimes;
    private String queueName;
    private String exchangeName;
    private String exchangeType;
    private String routingKey;
    private String businessModule;
    private String businessKey;
    private LocalDateTime nextScheduleTime;
    private Integer messageStatus;
    private Long initBackoff;
    private Integer backoffFactor;
}

@Data
public class TransactionalMessageContent {
    private Long id;
    private Long messageId;
    private String content;
}
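The service code that follows refers to several types the article does not list: TransactionalMessageService, Destination, TxMessage, ExchangeType, and TxMessageStatus. The sketch below is one plausible shape for them, inferred only from how they are called. The accessor names come from those call sites; the numeric status codes are assumptions (0 for PENDING is chosen to agree with the column default), and the exchange type strings are RabbitMQ's built-in type names.

```java
/** Exchange kinds; each string is a RabbitMQ built-in exchange type name. */
enum ExchangeType {
    DIRECT("direct"), FANOUT("fanout"), TOPIC("topic"), HEADERS("headers");

    private final String type;

    ExchangeType(String type) {
        this.type = type;
    }

    String getType() {
        return type;
    }
}

/** Message states. The numeric codes are ASSUMED; the article does not list them. */
enum TxMessageStatus {
    PENDING(0), SUCCESS(1), FAIL(-1);

    private final int status;

    TxMessageStatus(int status) {
        this.status = status;
    }

    int getStatus() {
        return status;
    }
}

/** Where a message should be routed. */
interface Destination {
    ExchangeType exchangeType();
    String queueName();
    String exchangeName();
    String routingKey();
}

/** What is being sent, plus the business identifiers stored with the record. */
interface TxMessage {
    String businessModule();
    String businessKey();
    String content();
}

/** The single entry point the module exposes to business code. */
interface TransactionalMessageService {
    void sendTransactionalMessage(Destination destination, TxMessage message);
}
```

The DefaultDestination and DefaultTxMessage builders used by the mock business service at the end of the article would then be simple implementations of Destination and TxMessage.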

The implementation RabbitTransactionalMessageService registers a Spring TransactionSynchronization that pushes the message to RabbitMQ in afterCommit() . It also ensures that the queue and exchange are declared only once using a concurrent map.

@Slf4j
@Service
@RequiredArgsConstructor
public class RabbitTransactionalMessageService implements TransactionalMessageService {
    private final AmqpAdmin amqpAdmin;
    private final TransactionalMessageManagementService managementService;
    private static final ConcurrentMap<String, Boolean> QUEUE_ALREADY_DECLARE = new ConcurrentHashMap<>();

    @Override
    public void sendTransactionalMessage(Destination destination, TxMessage message) {
        String queueName = destination.queueName();
        String exchangeName = destination.exchangeName();
        String routingKey = destination.routingKey();
        ExchangeType exchangeType = destination.exchangeType();
        // atomic pre‑declaration
        QUEUE_ALREADY_DECLARE.computeIfAbsent(queueName, k -> {
            Queue queue = new Queue(queueName);
            amqpAdmin.declareQueue(queue);
            Exchange exchange = new CustomExchange(exchangeName, exchangeType.getType());
            amqpAdmin.declareExchange(exchange);
            Binding binding = BindingBuilder.bind(queue).to(exchange).with(routingKey).noargs();
            amqpAdmin.declareBinding(binding);
            return true;
        });
        TransactionalMessage record = new TransactionalMessage();
        record.setQueueName(queueName);
        record.setExchangeName(exchangeName);
        record.setExchangeType(exchangeType.getType());
        record.setRoutingKey(routingKey);
        record.setBusinessModule(message.businessModule());
        record.setBusinessKey(message.businessKey());
        String content = message.content();
        // save record
        managementService.saveTransactionalMessageRecord(record, content);
        // register synchronizer
        TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
            @Override
            public void afterCommit() {
                managementService.sendMessageSync(record, content);
            }
        });
    }
}
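The declare-once behaviour of the concurrent map can be checked in isolation. The sketch below is a plain-Java stand-in: DeclareOnce is a hypothetical class, and declareOnBroker replaces the real AmqpAdmin calls with a counter. It also shows a property of computeIfAbsent worth knowing here: if the mapping function throws, no entry is stored, so the next call attempts the declaration again.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

class DeclareOnce {
    private final ConcurrentMap<String, Boolean> declared = new ConcurrentHashMap<>();
    private final AtomicInteger brokerCalls = new AtomicInteger();

    /** Hypothetical stand-in for the queue, exchange, and binding declarations. */
    private void declareOnBroker(String queueName, boolean brokerUp) {
        if (!brokerUp) {
            throw new IllegalStateException("broker unavailable for " + queueName);
        }
        brokerCalls.incrementAndGet();
    }

    /** Declares the queue at most once per successful attempt. */
    void ensureDeclared(String queueName, boolean brokerUp) {
        declared.computeIfAbsent(queueName, k -> {
            declareOnBroker(k, brokerUp);
            return true;
        });
    }

    int brokerCalls() {
        return brokerCalls.get();
    }

    public static void main(String[] args) {
        DeclareOnce d = new DeclareOnce();
        d.ensureDeclared("tm.test.queue", true);
        d.ensureDeclared("tm.test.queue", true);
        System.out.println(d.brokerCalls()); // 1
    }
}
```

Because the map in the article is a static field, the guarantee is per JVM: each application instance declares a given queue once, and the broker is expected to treat a repeated identical declaration from another instance as a no-op.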

The management service handles persistence, success/failure marking, exponential back‑off calculation, and compensation processing. Compensation runs in a scheduled job protected by a Redisson distributed lock.

@Slf4j
@RequiredArgsConstructor
@Service
public class TransactionalMessageManagementService {
    private final TransactionalMessageDao messageDao;
    private final TransactionalMessageContentDao contentDao;
    private final RabbitTemplate rabbitTemplate;
    private static final LocalDateTime END = LocalDateTime.of(2999, 1, 1, 0, 0, 0);
    private static final long DEFAULT_INIT_BACKOFF = 10L;
    private static final int DEFAULT_BACKOFF_FACTOR = 2;
    private static final int DEFAULT_MAX_RETRY_TIMES = 5;
    private static final int LIMIT = 100;

    public void saveTransactionalMessageRecord(TransactionalMessage record, String content) {
        record.setMessageStatus(TxMessageStatus.PENDING.getStatus());
        record.setNextScheduleTime(calculateNextScheduleTime(LocalDateTime.now(), DEFAULT_INIT_BACKOFF, DEFAULT_BACKOFF_FACTOR, 0));
        record.setCurrentRetryTimes(0);
        record.setInitBackoff(DEFAULT_INIT_BACKOFF);
        record.setBackoffFactor(DEFAULT_BACKOFF_FACTOR);
        record.setMaxRetryTimes(DEFAULT_MAX_RETRY_TIMES);
        messageDao.insertSelective(record);
        TransactionalMessageContent messageContent = new TransactionalMessageContent();
        messageContent.setContent(content);
        messageContent.setMessageId(record.getId());
        contentDao.insert(messageContent);
    }

    public void sendMessageSync(TransactionalMessage record, String content) {
        try {
            rabbitTemplate.convertAndSend(record.getExchangeName(), record.getRoutingKey(), content);
            if (log.isDebugEnabled()) {
                log.debug("Message sent successfully, target queue: {}, content: {}", record.getQueueName(), content);
            }
            markSuccess(record);
        } catch (Exception e) {
            markFail(record, e);
        }
    }

    private void markSuccess(TransactionalMessage record) {
        record.setNextScheduleTime(END);
        record.setCurrentRetryTimes(Math.min(record.getCurrentRetryTimes() + 1, record.getMaxRetryTimes()));
        record.setMessageStatus(TxMessageStatus.SUCCESS.getStatus());
        record.setEditTime(LocalDateTime.now());
        messageDao.updateStatusSelective(record);
    }

    private void markFail(TransactionalMessage record, Exception e) {
        log.error("Failed to send message, target queue: {}", record.getQueueName(), e);
        record.setCurrentRetryTimes(Math.min(record.getCurrentRetryTimes() + 1, record.getMaxRetryTimes()));
        LocalDateTime next = calculateNextScheduleTime(record.getNextScheduleTime(), record.getInitBackoff(), record.getBackoffFactor(), record.getCurrentRetryTimes());
        record.setNextScheduleTime(next);
        record.setMessageStatus(TxMessageStatus.FAIL.getStatus());
        record.setEditTime(LocalDateTime.now());
        messageDao.updateStatusSelective(record);
    }

    private LocalDateTime calculateNextScheduleTime(LocalDateTime base, long initBackoff, long backoffFactor, long round) {
        double delta = initBackoff * Math.pow(backoffFactor, round);
        return base.plusSeconds((long) delta);
    }

    public void processPendingCompensationRecords() {
        LocalDateTime max = LocalDateTime.now().plusSeconds(-DEFAULT_INIT_BACKOFF);
        LocalDateTime min = max.plusHours(-1);
        Map<Long, TransactionalMessage> map = messageDao.queryPendingCompensationRecords(min, max, LIMIT)
                .stream()
                .collect(Collectors.toMap(TransactionalMessage::getId, m -> m));
        if (!map.isEmpty()) {
            StringJoiner joiner = new StringJoiner(",", "(", ")");
            map.keySet().forEach(id -> joiner.add(id.toString()));
            contentDao.queryByMessageIds(joiner.toString())
                    .forEach(item -> {
                        TransactionalMessage msg = map.get(item.getMessageId());
                        sendMessageSync(msg, item.getContent());
                    });
        }
    }
}
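The article does not show the SQL behind queryPendingCompensationRecords, so the selection rule has to be inferred. The sketch below models one plausible rule in memory, and every part of it is an assumption: records that are not yet successful, still have retries left, and whose next schedule time falls inside the one-hour window ending DEFAULT_INIT_BACKOFF seconds before now. PendingRecord, CompensationWindow, the inclusive bounds, and the status code 1 for success are all hypothetical.

```java
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Minimal view of a message row; a hypothetical type for this sketch only. */
class PendingRecord {
    final long id;
    final int status;
    final int currentRetryTimes;
    final int maxRetryTimes;
    final LocalDateTime nextScheduleTime;

    PendingRecord(long id, int status, int currentRetryTimes, int maxRetryTimes,
                  LocalDateTime nextScheduleTime) {
        this.id = id;
        this.status = status;
        this.currentRetryTimes = currentRetryTimes;
        this.maxRetryTimes = maxRetryTimes;
        this.nextScheduleTime = nextScheduleTime;
    }
}

class CompensationWindow {
    static final int SUCCESS = 1; // assumed status code

    /** Returns ids of records due for a compensation push, at most 'limit' of them. */
    static List<Long> select(List<PendingRecord> all, LocalDateTime now,
                             long initBackoffSeconds, int limit) {
        LocalDateTime max = now.minusSeconds(initBackoffSeconds);
        LocalDateTime min = max.minusHours(1);
        return all.stream()
                .filter(r -> r.status != SUCCESS)
                .filter(r -> r.currentRetryTimes < r.maxRetryTimes)
                .filter(r -> !r.nextScheduleTime.isBefore(min) && !r.nextScheduleTime.isAfter(max))
                .limit(limit)
                .map(r -> r.id)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        LocalDateTime now = LocalDateTime.of(2020, 1, 1, 12, 0, 0);
        List<PendingRecord> rows = Arrays.asList(
                new PendingRecord(1, 0, 0, 5, now.minusSeconds(30)),          // due
                new PendingRecord(2, 1, 1, 5, LocalDateTime.of(2999, 1, 1, 0, 0)), // already sent
                new PendingRecord(3, -1, 5, 5, now.minusSeconds(30)),         // retries exhausted
                new PendingRecord(4, 0, 0, 5, now.minusSeconds(5)));          // too recent
        System.out.println(select(rows, now, 10, 100)); // [1]
    }
}
```

Under this reading, the lower bound of the window means a record that stays unsent for more than an hour past its schedule time is no longer picked up automatically and would need manual handling or monitoring.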

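The scheduled job runs with a fixed delay of 10 seconds. It tries to acquire a Redisson lock named transactionalMessageCompensationTask (waiting up to 5 seconds, with a 300-second lease), executes the compensation logic, and, if the work finishes in under 5 seconds, sleeps for the remainder so that the lock is held for at least 5 seconds before it is released.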

@Slf4j
@RequiredArgsConstructor
@Configuration
@EnableScheduling
public class ScheduleJobAutoConfiguration {
    private final TransactionalMessageManagementService managementService;
    private final RedissonClient redisson = Redisson.create();

    @Scheduled(fixedDelay = 10000)
    public void transactionalMessageCompensationTask() throws Exception {
        RLock lock = redisson.getLock("transactionalMessageCompensationTask");
        boolean tryLock = lock.tryLock(5, 300, TimeUnit.SECONDS);
        if (tryLock) {
            try {
                long start = System.currentTimeMillis();
                log.info("Starting the transactional message compensation task...");
                managementService.processPendingCompensationRecords();
                long end = System.currentTimeMillis();
                long delta = end - start;
                if (delta < 5000) {
                    Thread.sleep(5000 - delta);
                }
                log.info("Transactional message compensation task finished, elapsed: {} ms...", delta);
            } finally {
                lock.unlock();
            }
        }
    }
}

Finally, a mock business runner demonstrates how to save an order inside a Spring @Transactional method and then send a transactional message using the module.

@Component
@RequiredArgsConstructor
public class MockBusinessRunner implements CommandLineRunner {
    private final MockBusinessService mockBusinessService;
    @Override
    public void run(String... args) throws Exception {
        mockBusinessService.saveOrder();
    }
}

@Slf4j
@Service
@RequiredArgsConstructor
public class MockBusinessService {
    private final JdbcTemplate jdbcTemplate;
    private final TransactionalMessageService transactionalMessageService;
    private final ObjectMapper objectMapper;

    @Transactional(rollbackFor = Exception.class)
    public void saveOrder() throws Exception {
        String orderId = UUID.randomUUID().toString();
        BigDecimal amount = BigDecimal.valueOf(100L);
        Map<String, Object> message = new HashMap<>();
        message.put("orderId", orderId);
        message.put("amount", amount);
        jdbcTemplate.update("INSERT INTO t_order(order_id,amount) VALUES (?,?)", ps -> {
            ps.setString(1, orderId);
            ps.setBigDecimal(2, amount);
        });
        String content = objectMapper.writeValueAsString(message);
        transactionalMessageService.sendTransactionalMessage(
                DefaultDestination.builder()
                        .exchangeName("tm.test.exchange")
                        .queueName("tm.test.queue")
                        .routingKey("tm.test.key")
                        .exchangeType(ExchangeType.DIRECT)
                        .build(),
                DefaultTxMessage.builder()
                        .businessKey(orderId)
                        .businessModule("SAVE_ORDER")
                        .content(content)
                        .build()
        );
        log.info("Order {} saved successfully...", orderId);
    }
}

The article concludes that a well‑designed transactional message module can greatly improve system throughput by offloading work to asynchronous messaging, while still providing compensation and monitoring capabilities that can be integrated with Micrometer, Prometheus, and Grafana.

