Backend Development 9 min read

Local Message Table Principle and Practical Implementation for Distributed Transaction Consistency

This article explains the concept of a local message table, shows how it can be used to achieve reliable distributed transaction consistency with MQ, and provides complete MySQL DDL and Java service code along with a retry mechanism and analysis of failure scenarios.

IT Services Circle
IT Services Circle
IT Services Circle
Local Message Table Principle and Practical Implementation for Distributed Transaction Consistency

This article revisits distributed transaction concepts and introduces the local message table as a simple, reliable solution for ensuring that database changes and MQ messages stay consistent.

Local Message Table Principle

The local message table stores a record of the message inside the same database transaction that modifies business data. If the transaction commits, the message record is persisted; a background thread later sends the message and updates the record status. If the transaction fails, no message is stored, and nothing is sent.

A background task periodically scans records whose next_time has arrived and whose status is still "unsent", then retries sending them.

Practical Implementation

First, create the message table:

CREATE TABLE `message` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT 'id',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
  `status_delete` tinyint NOT NULL DEFAULT '0' COMMENT '删除标记 0正常 1删除',
  `topic` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT 'topic',
  `tag` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT 'tag',
  `msg_id` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '' COMMENT '消息id',
  `msg_key` varchar(64) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '消息key',
  `data` text CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '消息json串',
  `try_num` int NOT NULL DEFAULT '0' COMMENT '重试次数',
  `status` tinyint NOT NULL DEFAULT '0' COMMENT '发送状态 0-未发送 1-已发送',
  `next_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '下次驱动开始时间',
  PRIMARY KEY (`id`),
  KEY `idx_key` (`msg_key`),
  KEY `idx_nexttime_status` (`next_time`,`status`),
  KEY `idx_msgid` (`msg_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC COMMENT='本地消息记录表';

Then implement a MessageService that wraps the sending logic:

@Service
public class MessageService implements IMessageService {
    @Resource
    private Producer producer;
    @Resource
    private MessageMapper messageMapper;

    @Override
    @Transactional(rollbackFor = Exception.class)
    public void send(String topic, String tag, String key, Object obj) {
        sendDelay(topic, tag, key, obj, 0L);
    }

    @Override
    @Transactional(rollbackFor = Exception.class)
    public void sendDelay(String topic, String tag, String key, Object obj, Long period) {
        int time = (period == 0L ? 10 : period.intValue() / 1000);
        Date nextTime = DateUtil.getAfterNewDateSecond(new Date(), time);
        String data = JSON.toJSONString(obj);
        Message message = new Message()
                .setStatusDelete(0)
                .setTopic(topic)
                .setTag(tag)
                .setMsgId("")
                .setMsgKey(key)
                .setData(data)
                .setTryNum(0)
                .setStatus(0)
                .setNextTime(nextTime);
        // Save local message record
        messageMapper.save(message);
        // After the surrounding transaction commits, send the MQ message and update status
        TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
            @Override
            public void afterCommit() {
                String messageId;
                try {
                    if (period == 0L) {
                        messageId = producer.send(topic, tag, key, data);
                    } else {
                        messageId = producer.sendDelay(topic, tag, key, data, period);
                    }
                    Message update = new Message()
                            .setId(message.getId())
                            .setMsgId(messageId)
                            .setStatus(1);
                    messageMapper.updateById(update);
                } catch (Exception e) {
                    log.error("..", e);
                }
            }
        });
    }
}

Usage is straightforward:

@Transactional(rollbackFor = Exception.class)
public void doSth(xx) {
    saveA();
    saveB();
    messageService.send(xxx);
}

The article then analyses three possible failure scenarios (transaction failure, service crash after commit, MQ failure) and shows how the local message table with its retry task gracefully handles each case, preventing data loss or business interruption.

In conclusion, adopting a local message table requires only one database table and a reusable service, yet it dramatically improves the reliability of distributed transactions involving MQ.

javaSpringMySQLrocketmqdistributed transactionlocal message table
IT Services Circle
Written by

IT Services Circle

Delivering cutting-edge internet insights and practical learning resources. We're a passionate and principled IT media platform.

0 followers
Reader feedback

How this landed with the community

login Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.