Implementing Cache‑Database Consistency with Canal, MySQL, Redis and RabbitMQ
This article demonstrates how to achieve cache‑database consistency by updating MySQL first, deleting Redis cache asynchronously via Canal, publishing change events to RabbitMQ, and handling acknowledgments manually in a Spring Boot application, complete with configuration steps and troubleshooting tips.
Recently I was researching Canal and read an article about solving cache‑database consistency problems; it suggested a solution that combines Canal with other components, so I decided to implement it manually.
Architecture
The proposed approach includes three ideas:
Update the database first, then delete the cache to avoid concurrency issues.
Use asynchronous retries to ensure both steps succeed.
Subscribe to change logs to clear Redis cache.
Based on this, the architecture is:
APP reads data from Redis and writes updates to MySQL.
Canal captures MySQL binlog via dump protocol.
Canal pushes the captured data to an MQ (RabbitMQ).
The MQ consumer deletes the corresponding Redis cache entry.
Environment Preparation
Install MySQL, Canal and RabbitMQ (the server already has RabbitMQ installed). The following configuration files need to be edited.
Canal Configuration
Modify conf/canal.properties :
# Specify mode
canal.serverMode = rabbitMQ
# Specify instance (multiple instances separated by commas)
canal.destinations = example
# RabbitMQ server IP
rabbitmq.host = 127.0.0.1
# Virtual host
rabbitmq.virtual.host = /
# Exchange
rabbitmq.exchange = xxx
# Username
rabbitmq.username = xxx
# Password
rabbitmq.password = xxx
rabbitmq.deliveryMode =Modify the instance file conf/example/instance.properties :
# Slave ID, must differ from MySQL server ID
canal.instance.mysql.slaveId=10
# Database address
canal.instance.master.address=ip:port
# Credentials
canal.instance.dbUsername=xxx
canal.instance.dbPassword=xxx
# Filter all databases and tables
canal.instance.filter.regex=.*\..*
# RabbitMQ routing key
canal.mq.topic=xxxAfter editing, restart the Canal service.
Database
Create the table:
CREATE TABLE `product_info` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`name` varchar(255) DEFAULT NULL,
`price` decimal(10,4) DEFAULT NULL,
`create_date` datetime DEFAULT NULL,
`update_date` datetime DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;Insert sample data:
INSERT INTO product_info (id, name, price, create_date, update_date) VALUES
(1, '从你的全世界路过', 14.0000, '2020-11-21 21:26:12', '2021-03-27 22:17:39'),
(2, '乔布斯传', 25.0000, '2020-11-21 21:26:42', '2021-03-27 22:17:42'),
(3, 'java开发', 87.0000, '2021-03-27 22:43:31', '2021-03-27 22:43:34');Practical Implementation
The project contains many dependencies; the source code can be obtained by replying “canal” to the public account.
RabbitMQ Configuration
@Configuration
public class RabbitMQConfig {
public static final String CANAL_QUEUE = "canal_queue"; // queue
public static final String DIRECT_EXCHANGE = "canal"; // exchange, must match Canal config
public static final String ROUTING_KEY = "routingkey"; // routing key, must match Canal config
@Bean
public Queue canalQueue() {
return new Queue(CANAL_QUEUE, true);
}
@Bean
public DirectExchange directExchange() {
return new DirectExchange(DIRECT_EXCHANGE);
}
@Bean
public Binding orderBinding() {
return BindingBuilder.bind(canalQueue()).to(directExchange()).with(ROUTING_KEY);
}
}Cache‑First Retrieval
@Override
public ProductInfo findProductInfo(Long productInfoId) {
// 1. Try Redis
Object object = redisTemplate.opsForValue().get(REDIS_PRODUCT_KEY + productInfoId);
if (ObjectUtil.isNotEmpty(object)) {
return (ProductInfo) object;
}
// 2. Fallback to MySQL
ProductInfo productInfo = this.baseMapper.selectById(productInfoId);
if (productInfo != null) {
// 3. Cache the result
redisTemplate.opsForValue().set(REDIS_PRODUCT_KEY + productInfoId, productInfo,
REDIS_PRODUCT_KEY_EXPIRE, TimeUnit.SECONDS);
return productInfo;
}
return null;
}Update Method (publishes to MQ)
@PostMapping("/update")
public AjaxResult update(@RequestBody ProductInfo productInfo) {
productInfoService.updateById(productInfo);
return AjaxResult.success();
}After calling the update endpoint, the message appears in RabbitMQ Management, confirming that the change was sent.
Problem Diagnosis
Initially no message was observed because the Canal instance’s meta.dat file stored a binlog position that did not match the MySQL server. The fix was to stop Canal, delete meta.dat , ensure canal.instance.master.address points to the correct MySQL host, and restart Canal.
MQ Consumer
@RabbitListener(queues = "canal_queue")
public void getMsg(Message message, Channel channel, String msg) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
log.info("Consumer queue: " + message.getMessageProperties().getConsumerQueue());
ProductInfoDetail productInfoDetail = JSON.parseObject(msg, ProductInfoDetail.class);
if (productInfoDetail != null && productInfoDetail.getData() != null) {
ProductInfo productInfo = productInfoDetail.getData().get(0);
if (productInfo != null) {
Long id = productInfo.getId();
redisTemplate.delete(REDIS_PRODUCT_KEY + id);
channel.basicAck(deliveryTag, true);
return;
}
}
channel.basicReject(deliveryTag, true);
} catch (Exception e) {
channel.basicReject(deliveryTag, false);
e.printStackTrace();
}
}When the update endpoint is invoked again, the consumer logs the database and table names and deletes the corresponding Redis key.
Acknowledgment Modes
RabbitMQ can acknowledge messages in three ways:
manual : the consumer explicitly calls basicAck after business logic succeeds.
auto : Spring automatically acknowledges if no exception is thrown.
none : acknowledgments are disabled; messages are removed immediately, which is unsafe.
In this tutorial the manual mode is used to avoid message loss, configured as:
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual # enable manual ackConsequently the consumer code calls channel.basicAck for successful processing and channel.basicReject for failures.
The article concludes with a reminder to try the implementation and provides reference links.
IT Services Circle
Delivering cutting-edge internet insights and practical learning resources. We're a passionate and principled IT media platform.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.