Integrating SpringBoot with Canal and RabbitMQ for Decoupled Data Change Capture
This guide explains how to use Canal to listen to MySQL binlog changes, forward those events through RabbitMQ, and process them in a SpringBoot application, providing a decoupled solution for recording new and old data on insert, update, and delete operations.
Requirement
I need a way in SpringBoot to record data changes in a way that is decoupled from business code, storing the new row data and, for updates, also the old row data.
After researching, I found that Canal can listen to MySQL binlog changes, and I plan to use RabbitMQ to immediately persist the change records.
Steps
Start a MySQL container with binlog enabled.
Start a Canal container, create a MySQL account for it, and connect as a slave.
Set Canal server mode to TCP, write a Java client to listen to binlog modifications.
Switch Canal server mode to RabbitMQ, configure RabbitMQ, and let the message queue receive binlog events.
Environment Setup
The environment is built with docker-compose :
version: "3"
services:
mysql:
network_mode: mynetwork
container_name: mymysql
ports:
- 3306:3306
restart: always
volumes:
- /etc/localtime:/etc/localtime
- /home/mycontainers/mymysql/data:/data
- /home/mycontainers/mymysql/mysql:/var/lib/mysql
- /home/mycontainers/mymysql/conf:/etc/mysql
environment:
- MYSQL_ROOT_PASSWORD=root
command: --character-set-server=utf8mb4 \
--collation-server=utf8mb4_unicode_ci \
--log-bin=/var/lib/mysql/mysql-bin \
--server-id=1 \
--binlog-format=ROW \
--expire_logs_days=7 \
--max_binlog_size=500M
image: mysql:5.7.20
rabbitmq:
container_name: myrabbit
ports:
- 15672:15672
- 5672:5672
restart: always
volumes:
- /etc/localtime:/etc/localtime
- /home/mycontainers/myrabbit/rabbitmq:/var/lib/rabbitmq
network_mode: mynetwork
environment:
- RABBITMQ_DEFAULT_USER=admin
- RABBITMQ_DEFAULT_PASS=123456
image: rabbitmq:3.8-management
canal-server:
container_name: canal-server
restart: always
ports:
- 11110:11110
- 11111:11111
- 11112:11112
volumes:
- /home/mycontainers/canal-server/conf/canal.properties:/home/admin/canal-server/conf/canal.properties
- /home/mycontainers/canal-server/conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties
- /home/mycontainers/canal-server/logs:/home/admin/canal-server/logs
network_mode: mynetwork
depends_on:
- mysql
- rabbitmq
image: canal/canal-server:v1.1.5Modify canal.properties and instance.properties to point to the correct paths and MySQL credentials.
Integrating SpringBoot with Canal (Client)
Add Maven dependencies:
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>${canal.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.protocol</artifactId>
<version>${canal.version}</version>
</dependency>Create a Spring component that connects to Canal and processes messages:
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
import java.util.List;
@Component
public class CanalClient {
private static final int BATCH_SIZE = 1000;
public void run() {
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress("localhost", 11111), "canal-exchange", "canal", "canal");
try {
connector.connect();
connector.subscribe(".*..*");
connector.rollback();
while (true) {
Message message = connector.getWithoutAck(BATCH_SIZE);
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
Thread.sleep(2000);
} else {
printEntry(message.getEntries());
}
connector.ack(batchId);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
connector.disconnect();
}
}
private static void printEntry(List
entries) {
for (CanalEntry.Entry entry : entries) {
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN ||
entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
continue;
}
try {
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
CanalEntry.EventType eventType = rowChange.getEventType();
System.out.println(String.format("binlog[%s:%s], schema[%s], table[%s], event:%s",
entry.getHeader().getLogfileName(),
entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(),
entry.getHeader().getTableName(),
eventType));
if (rowChange.getIsDdl()) {
System.out.println("DDL sql:" + rowChange.getSql());
}
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
if (eventType == CanalEntry.EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
} else if (eventType == CanalEntry.EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
} else {
System.out.println("--- before ---");
printColumn(rowData.getBeforeColumnsList());
System.out.println("--- after ---");
printColumn(rowData.getAfterColumnsList());
}
}
} catch (Exception e) {
throw new RuntimeException("Parse error", e);
}
}
}
private static void printColumn(List
columns) {
for (CanalEntry.Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}
}Wire the client into the SpringBoot main class:
@SpringBootApplication
public class BaseApplication implements CommandLineRunner {
@Autowired
private CanalClient canalClient;
@Override
public void run(String... args) throws Exception {
canalClient.run();
}
}Canal Integration with RabbitMQ
Change canal.serverMode to rabbitMQ and set canal.mq.topic to the routing key.
Configure RabbitMQ connection in canal.properties :
rabbitmq.host = myrabbit
rabbitmq.virtual.host = /
rabbitmq.exchange = canal-exchange
rabbitmq.username = admin
rabbitmq.password = 123456Create the exchange and queue (or let SpringBoot create them automatically).
SpringBoot Integration with RabbitMQ
Add the AMQP starter dependency:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>${amqp.version}</version>
</dependency>Configure connection in application.yml :
spring:
rabbitmq:
host: 192.168.0.108
port: 5672
username: admin
password: 123456
publisher-confirm-type: correlated
publisher-returns: trueDefine RabbitMQ beans:
@Configuration
public class RabbitConfig {
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate();
template.setConnectionFactory(connectionFactory);
template.setMessageConverter(new Jackson2JsonMessageConverter());
return template;
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
return factory;
}
}Declare the queue, exchange, and binding:
@Configuration
public class CanalProvider {
public static final String CanalQueue = "canal-queue";
public static final String CanalExchange = "canal-exchange";
public static final String CanalRouting = "canal-routing-key";
@Bean
public Queue canalQueue() {
return new Queue(CanalQueue, true);
}
@Bean
public DirectExchange canalExchange() {
return new DirectExchange(CanalExchange, true, false);
}
@Bean
public Binding bindingCanal() {
return BindingBuilder.bind(canalQueue()).to(canalExchange()).with(CanalRouting);
}
}Consume Canal messages from the queue:
@Component
@RabbitListener(queues = RabbitConstant.CanalQueue)
public class CanalConsumer {
private final SysBackupService sysBackupService;
public CanalConsumer(SysBackupService sysBackupService) {
this.sysBackupService = sysBackupService;
}
@RabbitHandler
public void process(Map
msg) {
System.out.println("Received canal message: " + msg);
boolean isDdl = (boolean) msg.get("isDdl");
if (isDdl) return;
String table = (String) msg.get("table");
if ("sys_backup".equalsIgnoreCase(table)) return;
String type = (String) msg.get("type");
if (!"INSERT".equalsIgnoreCase(type) && !"UPDATE".equalsIgnoreCase(type) && !"DELETE".equalsIgnoreCase(type)) {
return;
}
// further processing logic here
}
}Now, when a row in MySQL changes, Canal publishes the event to RabbitMQ, and the SpringBoot consumer processes it, achieving a decoupled change‑recording mechanism.
Architect's Guide
Dedicated to sharing programmer-architect skills—Java backend, system, microservice, and distributed architectures—to help you become a senior architect.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.