Backend Development 20 min read

Integrating SpringBoot with Canal and RabbitMQ for Decoupled Data Change Capture

This guide explains how to use Canal to listen to MySQL binlog changes, forward those events through RabbitMQ, and process them in a SpringBoot application, providing a decoupled solution for recording new and old data on insert, update, and delete operations.

Architect's Guide
Architect's Guide
Architect's Guide
Integrating SpringBoot with Canal and RabbitMQ for Decoupled Data Change Capture

Requirement

I need a way in SpringBoot to record data changes in a way that is decoupled from business code, storing the new row data and, for updates, also the old row data.

After researching, I found that Canal can listen to MySQL binlog changes, and I plan to use RabbitMQ to immediately persist the change records.

Steps

Start a MySQL container with binlog enabled.

Start a Canal container, create a MySQL account for it, and connect as a slave.

Set Canal server mode to TCP, write a Java client to listen to binlog modifications.

Switch Canal server mode to RabbitMQ, configure RabbitMQ, and let the message queue receive binlog events.

Environment Setup

The environment is built with docker-compose :

version: "3"
services:
  mysql:
    network_mode: mynetwork
    container_name: mymysql
    ports:
      - 3306:3306
    restart: always
    volumes:
      - /etc/localtime:/etc/localtime
      - /home/mycontainers/mymysql/data:/data
      - /home/mycontainers/mymysql/mysql:/var/lib/mysql
      - /home/mycontainers/mymysql/conf:/etc/mysql
    environment:
      - MYSQL_ROOT_PASSWORD=root
    command: --character-set-server=utf8mb4 \
             --collation-server=utf8mb4_unicode_ci \
             --log-bin=/var/lib/mysql/mysql-bin \
             --server-id=1 \
             --binlog-format=ROW \
             --expire_logs_days=7 \
             --max_binlog_size=500M
    image: mysql:5.7.20
  rabbitmq:
    container_name: myrabbit
    ports:
      - 15672:15672
      - 5672:5672
    restart: always
    volumes:
      - /etc/localtime:/etc/localtime
      - /home/mycontainers/myrabbit/rabbitmq:/var/lib/rabbitmq
    network_mode: mynetwork
    environment:
      - RABBITMQ_DEFAULT_USER=admin
      - RABBITMQ_DEFAULT_PASS=123456
    image: rabbitmq:3.8-management
  canal-server:
    container_name: canal-server
    restart: always
    ports:
      - 11110:11110
      - 11111:11111
      - 11112:11112
    volumes:
      - /home/mycontainers/canal-server/conf/canal.properties:/home/admin/canal-server/conf/canal.properties
      - /home/mycontainers/canal-server/conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties
      - /home/mycontainers/canal-server/logs:/home/admin/canal-server/logs
    network_mode: mynetwork
    depends_on:
      - mysql
      - rabbitmq
    image: canal/canal-server:v1.1.5

Modify canal.properties and instance.properties to point to the correct paths and MySQL credentials.

Integrating SpringBoot with Canal (Client)

Add Maven dependencies:

<dependency>
  <groupId>com.alibaba.otter</groupId>
  <artifactId>canal.client</artifactId>
  <version>${canal.version}</version>
</dependency>
<dependency>
  <groupId>com.alibaba.otter</groupId>
  <artifactId>canal.protocol</artifactId>
  <version>${canal.version}</version>
</dependency>

Create a Spring component that connects to Canal and processes messages:

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
import java.util.List;

@Component
public class CanalClient {
    private static final int BATCH_SIZE = 1000;

    public void run() {
        CanalConnector connector = CanalConnectors.newSingleConnector(
            new InetSocketAddress("localhost", 11111), "canal-exchange", "canal", "canal");
        try {
            connector.connect();
            connector.subscribe(".*..*");
            connector.rollback();
            while (true) {
                Message message = connector.getWithoutAck(BATCH_SIZE);
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    Thread.sleep(2000);
                } else {
                    printEntry(message.getEntries());
                }
                connector.ack(batchId);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List
entries) {
        for (CanalEntry.Entry entry : entries) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN ||
                entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }
            try {
                CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                CanalEntry.EventType eventType = rowChange.getEventType();
                System.out.println(String.format("binlog[%s:%s], schema[%s], table[%s], event:%s",
                    entry.getHeader().getLogfileName(),
                    entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(),
                    entry.getHeader().getTableName(),
                    eventType));
                if (rowChange.getIsDdl()) {
                    System.out.println("DDL sql:" + rowChange.getSql());
                }
                for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                    if (eventType == CanalEntry.EventType.DELETE) {
                        printColumn(rowData.getBeforeColumnsList());
                    } else if (eventType == CanalEntry.EventType.INSERT) {
                        printColumn(rowData.getAfterColumnsList());
                    } else {
                        System.out.println("--- before ---");
                        printColumn(rowData.getBeforeColumnsList());
                        System.out.println("--- after ---");
                        printColumn(rowData.getAfterColumnsList());
                    }
                }
            } catch (Exception e) {
                throw new RuntimeException("Parse error", e);
            }
        }
    }

    private static void printColumn(List
columns) {
        for (CanalEntry.Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "   update=" + column.getUpdated());
        }
    }
}

Wire the client into the SpringBoot main class:

@SpringBootApplication
public class BaseApplication implements CommandLineRunner {
    @Autowired
    private CanalClient canalClient;

    @Override
    public void run(String... args) throws Exception {
        canalClient.run();
    }
}

Canal Integration with RabbitMQ

Change canal.serverMode to rabbitMQ and set canal.mq.topic to the routing key.

Configure RabbitMQ connection in canal.properties :

rabbitmq.host = myrabbit
rabbitmq.virtual.host = /
rabbitmq.exchange = canal-exchange
rabbitmq.username = admin
rabbitmq.password = 123456

Create the exchange and queue (or let SpringBoot create them automatically).

SpringBoot Integration with RabbitMQ

Add the AMQP starter dependency:

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
  <version>${amqp.version}</version>
</dependency>

Configure connection in application.yml :

spring:
  rabbitmq:
    host: 192.168.0.108
    port: 5672
    username: admin
    password: 123456
    publisher-confirm-type: correlated
    publisher-returns: true

Define RabbitMQ beans:

@Configuration
public class RabbitConfig {
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate();
        template.setConnectionFactory(connectionFactory);
        template.setMessageConverter(new Jackson2JsonMessageConverter());
        return template;
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        return factory;
    }
}

Declare the queue, exchange, and binding:

@Configuration
public class CanalProvider {
    public static final String CanalQueue = "canal-queue";
    public static final String CanalExchange = "canal-exchange";
    public static final String CanalRouting = "canal-routing-key";

    @Bean
    public Queue canalQueue() {
        return new Queue(CanalQueue, true);
    }

    @Bean
    public DirectExchange canalExchange() {
        return new DirectExchange(CanalExchange, true, false);
    }

    @Bean
    public Binding bindingCanal() {
        return BindingBuilder.bind(canalQueue()).to(canalExchange()).with(CanalRouting);
    }
}

Consume Canal messages from the queue:

@Component
@RabbitListener(queues = RabbitConstant.CanalQueue)
public class CanalConsumer {
    private final SysBackupService sysBackupService;

    public CanalConsumer(SysBackupService sysBackupService) {
        this.sysBackupService = sysBackupService;
    }

    @RabbitHandler
    public void process(Map
msg) {
        System.out.println("Received canal message: " + msg);
        boolean isDdl = (boolean) msg.get("isDdl");
        if (isDdl) return;
        String table = (String) msg.get("table");
        if ("sys_backup".equalsIgnoreCase(table)) return;
        String type = (String) msg.get("type");
        if (!"INSERT".equalsIgnoreCase(type) && !"UPDATE".equalsIgnoreCase(type) && !"DELETE".equalsIgnoreCase(type)) {
            return;
        }
        // further processing logic here
    }
}

Now, when a row in MySQL changes, Canal publishes the event to RabbitMQ, and the SpringBoot consumer processes it, achieving a decoupled change‑recording mechanism.

backendDockerMySQLRabbitMQCanalSpringBootdata-capture
Architect's Guide
Written by

Architect's Guide

Dedicated to sharing programmer-architect skills—Java backend, system, microservice, and distributed architectures—to help you become a senior architect.

0 followers
Reader feedback

How this landed with the community

login Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.