Ensuring Idempotent Consumption in MQ: Strategies and Code Examples
To prevent duplicate processing when integrating message queues for rate limiting and decoupling, this article explains idempotency concepts, common pitfalls, and five practical solutions—including query checks, pessimistic and optimistic locking, deduplication tables, and non‑transactional approaches—complete with code snippets and diagrams.
1. Idempotency Issue
Idempotency means that executing a method multiple times yields the same result. Common idempotent operations include:
<code>1. Query idempotency
select * from user where id = 10;
2. Update idempotency
update user set name = 'zhangsan' where id = 20;
3. Insert idempotency (userId is a unique key)
insert into user(user_id, name, age) values (1, 'zhangsan', 20);
4. Delete idempotency
delete from user where id = 21;</code>Non‑idempotent examples:
<code>1. Non‑idempotent update
update user set age = age + 1 where id = 20;
2. Non‑idempotent insert
insert into user(name, age, nick_name) values ('zhangsan', 20, 'sanzhang');</code>2. Causes of MQ Idempotency Problems
(1) Producer duplicate sending
Network timeouts may cause the producer to resend a message that the MQ has already received, resulting in duplicate messages for the consumer.
(2) MQ retry mechanism
If the consumer acknowledges a message slowly, MQ may consider the consumption failed and redeliver the same message, causing the consumer to process it twice.
3. MQ Idempotency Solutions
After analyzing the root causes, the following common solutions are used on the consumer side.
(1) Query Method
Before processing, query the database using the business ID (e.g., order ID) to see if the operation has already been performed. If it exists, skip the message. This method fails under high concurrency because multiple threads may read the same state before any update.
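As a sketch, the pre-check might look like the following, assuming a hypothetical orders table where status = 1 marks a processed order (table and column names are illustrative, not from the original):

```sql
-- Hypothetical pre-check: has this order already been processed?
select count(*) from orders where order_no = 'N20240001' and status = 1;

-- If the count is 0, the consumer runs the business logic and then marks the order:
update orders set status = 1 where order_no = 'N20240001';
```

Under concurrency, two consumers can both read a count of 0 before either update commits, which is exactly the race that makes this method unsafe on its own.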
(2) Pessimistic Lock
Use SELECT ... FOR UPDATE to lock the row during the check, forcing other threads to wait until the lock is released. This guarantees correctness but reduces throughput because the lock holds the transaction.
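A minimal sketch of the same check under a pessimistic lock, using the same illustrative table as above:

```sql
begin;
-- Lock the row; concurrent consumers block here until commit/rollback
select status from orders where order_no = 'N20240001' for update;
-- If status is still 0, run the business logic inside the transaction, then:
update orders set status = 1 where order_no = 'N20240001';
commit;
```

Because the lock is held for the whole transaction, the business logic inside it should be kept short.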
(3) Optimistic Lock
Include a version field with each message. When consuming, update the record with a condition on both id and version. If the affected row count is greater than 0, the update succeeded; otherwise the message is considered already processed. This improves concurrency but requires the producer to carry extra version information.
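The conditional update could look like this sketch (the id and version values would come from the message; names are illustrative):

```sql
-- version = 5 was carried in the message
update orders set status = 1, version = version + 1
where id = 100 and version = 5;
-- 1 row affected: first consumption succeeded
-- 0 rows affected: another consumer already bumped the version
```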
(4) Deduplication Table
Insert the message’s unique key (e.g., message ID or order ID) into a dedicated message table that defines a unique constraint on msg_id. If the insert succeeds, proceed with business logic; if a DuplicateKeyException occurs, swallow the exception and treat the message as already consumed.
<code>try {
// 1. Insert into message table
messageMapper.addMessage(message);
// 2. Execute business logic
this.dealMessage(message);
// 3. Return success flag to MQ
return Boolean.TRUE;
} catch (DuplicateKeyException de) {
log.warn("Duplicate message, messageId:{}", message.getId());
return Boolean.TRUE;
}</code>This approach is independent of the specific business and can be reused across services, but it requires the entire consumption flow to be wrapped in a relational‑database transaction.
(5) Non‑Transactional Deduplication
When the consumer must also call external systems (e.g., via RPC) or work with non‑transactional stores, a pure deduplication table is insufficient. The following pattern adds an expiration flag and status field to the message record.
<code>insert into message (msg_id, `desc`, expire_time, status) values (1, 'MQ message', 5, 0);</code>Steps:
Consume the MQ message and insert a record with a short expire_time (e.g., 5 minutes) and status 0 (processing).
If the insert succeeds, execute the local business, then notify downstream systems via RPC. On success, update status to 1 and acknowledge the MQ message. On failure, roll back the business, delete the record, and signal MQ consumption failure.
If the insert fails (duplicate key), check the existing record’s status. If 0, the message is still being processed and MQ should be told to retry later; if 1, the message has already been handled and MQ can be acknowledged.
A scheduled job (e.g., XXL‑Job) periodically scans for records whose expire_time has passed and removes them; otherwise a consumer crash could leave a record stuck at status 0, blocking every redelivery of that message until it is eventually dead‑lettered.
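The cleanup job might run a statement like this, assuming expire_time stores a duration in minutes and the table also has a create_time column (an assumption not shown in the insert above):

```sql
-- Remove stale in-progress records whose processing window has elapsed,
-- so a redelivered message can insert a fresh record and be retried
delete from message
where status = 0
  and date_add(create_time, interval expire_time minute) < now();
```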
By combining retry handling, status checks, and periodic cleanup, this method achieves idempotent consumption without relying on a single database transaction.
Conclusion
The query‑before‑execute method works when concurrency is low, but high‑throughput scenarios need locking or versioning. Pessimistic locks guarantee correctness at the cost of performance; optimistic locks improve throughput but add complexity. A dedicated deduplication table offers a clean, reusable solution, while a non‑transactional pattern extends idempotency to workflows involving RPC or non‑relational stores.
Lobster Programming
Sharing insights on technical analysis and exchange, making life better through technology.