Millisecond‑Level Real‑Time Sync from MySQL to Elasticsearch with Flink CDC

This guide walks through setting up a Spring Boot 3.5 environment, configuring Flink 1.20 and Flink CDC 3.5, preparing MySQL tables, and using both the Flink CDC CLI and SQL client to achieve near‑millisecond synchronization of data from MySQL to Elasticsearch, including custom sink programming and real‑time monitoring via the Flink Web UI.

Spring Full-Stack Practical Cases

1. Environment preparation

Spring Boot 3.5.0 is used as the application framework. Flink 1.20.1 is downloaded from the Apache archive:

https://archive.apache.org/dist/flink/flink-1.20.1/flink-1.20.1-bin-scala_2.12.tgz

After extracting the archive, edit conf/config.yaml to configure the JobManager, TaskManager and checkpoint interval (3 s):

jobmanager:
  bind-host: 0.0.0.0
  rpc:
    address: localhost
    port: 6123

taskmanager:
  bind-host: 0.0.0.0
  host: localhost
  numberOfTaskSlots: 4

rest:
  address: localhost
  bind-address: 0.0.0.0
  port: 8088

execution:
  checkpointing:
    interval: 3s

Add a hosts entry for the machine (for example in /etc/hosts), replacing the IP address and hostname with your own:

192.168.1.8 pack_host

Download the MySQL JDBC driver and place it in FLINK_HOME/lib:

https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.27/mysql-connector-java-8.0.27.jar

Start the Flink cluster:

./bin/start-cluster.sh

The Web UI becomes reachable at http://192.168.1.8:8088/.
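
One way to confirm the cluster is up without opening a browser is to call Flink's REST API, which is served on the same port as the Web UI. The sketch below requests the /overview endpoint with the JDK's built-in HTTP client. The class name FlinkClusterCheck is made up for illustration, and the host and port are simply the values used in this guide.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

/** Illustrative helper (not part of Flink): asks the Flink REST API for the cluster overview. */
final class FlinkClusterCheck {

    /** Builds the URI of the REST endpoint that summarizes task managers, slots and jobs. */
    static URI overviewUri(String host, int port) {
        return URI.create("http://" + host + ":" + port + "/overview");
    }

    /** Returns the raw JSON body of /overview, or throws if the cluster does not answer with 200. */
    static String fetchOverview(String host, int port) throws Exception {
        HttpClient client = HttpClient.newBuilder()
                .connectTimeout(Duration.ofSeconds(3))
                .build();
        HttpRequest request = HttpRequest.newBuilder(overviewUri(host, port))
                .timeout(Duration.ofSeconds(3))
                .GET()
                .build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() != 200) {
            throw new IllegalStateException("Unexpected HTTP status: " + response.statusCode());
        }
        return response.body();
    }

    public static void main(String[] args) throws Exception {
        // Host and port from this guide; adjust to your environment.
        System.out.println(fetchOverview("192.168.1.8", 8088));
    }
}
```

If the call succeeds, the returned JSON should report the number of task managers and the total and available slots, which can be compared with the numberOfTaskSlots setting.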

2. Flink CDC setup

Download Flink CDC 3.5.0:

https://www.apache.org/dyn/closer.lua/flink/flink-cdc-3.5.0/flink-cdc-3.5.0-bin.tar.gz

After extraction the directory contains bin, lib, log and conf. Place the MySQL CDC pipeline connector and the Elasticsearch pipeline connector into FLINK_CDC_HOME/lib:

https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-mysql/3.5.0/flink-cdc-pipeline-connector-mysql-3.5.0.jar
https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-elasticsearch/3.5.0/flink-cdc-pipeline-connector-elasticsearch-3.5.0.jar

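Create a MySQL table msg.employees (sample data: 5 rows). Flink CDC reads changes from the MySQL binlog, so binary logging must be enabled with binlog_format=ROW (the MySQL 8 default). Then write a CDC job configuration mysql-to-es.yaml: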

source:
  type: mysql
  name: source_msg
  hostname: localhost
  port: 3306
  username: root
  password: 123123
  tables: msg.employees

sink:
  type: elasticsearch
  name: elasticsearch_sink
  hosts: http://localhost:9200
  version: 8
  batch.size.max.bytes: 50485760
  record.size.max.bytes: 10485760

route:
  - source-table: msg.employees
    sink-table: msg_index

pipeline:
  name: Sync MySQL To ES
  parallelism: 1

Submit the job with the CDC CLI, pointing to the Flink installation:

./bin/flink-cdc.sh --flink-home=/opt/cdc/flink-1.20.1 mysql-to-es.yaml

The console prints a Job ID and description, confirming submission. In the Flink Web UI the job appears as running, and Elasticsearch‑head shows the initial 5 rows synchronized from MySQL.

A new row inserted into MySQL (e.g., via the MySQL client) appears in Elasticsearch almost immediately, which is the millisecond-level end-to-end latency this setup targets.
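
To put a number on that latency rather than judging it by eye, a small polling helper can time how long a change takes to become visible. The sketch below is a generic stopwatch under stated assumptions: SyncLatencyProbe and awaitVisible are hypothetical names, and the visibility check itself (for example, a lookup of the new document in msg_index) is supplied by the caller and is not implemented here. The measured value includes the polling interval, and search results in Elasticsearch only reflect writes after an index refresh, which by default happens about once per second.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

/** Illustrative stopwatch for sync latency; not part of Flink or Elasticsearch. */
final class SyncLatencyProbe {

    /**
     * Calls the check repeatedly until it returns true.
     * Returns the elapsed time in milliseconds, or -1 if the timeout expires
     * or the thread is interrupted first.
     */
    static long awaitVisible(BooleanSupplier visible, Duration timeout, Duration interval) {
        long start = System.nanoTime();
        while (true) {
            if (visible.getAsBoolean()) {
                return Duration.ofNanos(System.nanoTime() - start).toMillis();
            }
            if (System.nanoTime() - start >= timeout.toNanos()) {
                return -1; // change never became visible within the timeout
            }
            try {
                Thread.sleep(interval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                return -1;
            }
        }
    }
}
```

A typical use would be to insert a row in MySQL, then immediately call awaitVisible with a check that looks for the row in Elasticsearch and a short interval such as 5 ms.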

3. Using the JDBC SQL connector

Download the JDBC connector JAR and add it to FLINK_HOME/lib:

https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.3.0-1.20/flink-connector-jdbc-3.3.0-1.20.jar

Start the Flink SQL client:

./bin/sql-client.sh

Create a JDBC table that reads from the same MySQL database:

CREATE TABLE employees (
  emp_id INT,
  emp_name STRING,
  dept_name VARCHAR(50),
  hire_date DATE,
  `position` STRING,
  PRIMARY KEY (emp_id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/msg',
  'table-name' = 'employees',
  'username' = 'root',
  'password' = '123123'
);

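Query the table to verify that the data matches the source MySQL table. The JDBC connector performs a one-off scan of the table, so later changes in MySQL are not streamed: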

SELECT * FROM employees;

4. Using the MySQL‑CDC connector via SQL

Download the MySQL‑CDC connector JAR and place it in FLINK_HOME/lib:

https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-mysql-cdc/3.5.0/flink-sql-connector-mysql-cdc-3.5.0.jar

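Create a CDC source table and a JDBC sink table. The source reuses the name employees, so if the JDBC table from section 3 still exists in the same SQL client session, drop it first (DROP TABLE employees;) or start a fresh session: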

CREATE TABLE employees (
  emp_id INT,
  emp_name STRING,
  dept_name VARCHAR(50),
  hire_date DATE,
  `position` STRING,
  PRIMARY KEY (emp_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'root',
  'password' = '123123',
  'database-name' = 'msg',
  'table-name' = 'employees'
);

CREATE TABLE all_employees_sink (
  emp_id INT,
  emp_name STRING,
  dept_name VARCHAR(50),
  hire_date DATE,
  `position` STRING,
  PRIMARY KEY (emp_id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/msg_bak',
  'table-name' = 'employees',
  'username' = 'root',
  'password' = '123123'
);

Insert data from the CDC source into the sink:

INSERT INTO all_employees_sink SELECT * FROM employees;

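The job appears in the Flink UI, and any INSERT, UPDATE or DELETE on msg.employees is replicated to msg_bak.employees within about a second (the JDBC sink buffers writes and by default flushes every 1 s or every 100 rows). The target table msg_bak.employees must already exist in MySQL, since the JDBC connector does not create it.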
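
To make the replication semantics concrete, the sketch below models how a sink with a primary key can apply a changelog stream: inserts and update-after rows are upserted by key, deletes remove the key, and update-before rows are ignored. ChangelogTable and its Kind enum are hypothetical names for illustration, not Flink classes; the enum constants simply mirror the four row kinds of a Flink changelog (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE).

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Toy model of an upsert sink keyed by primary key (illustrative, not a Flink class). */
final class ChangelogTable {

    /** Mirrors the four row kinds of a Flink changelog stream. */
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // Primary key (emp_id) -> row content, reduced to emp_name to keep the example small.
    private final Map<Integer, String> rows = new LinkedHashMap<>();

    /** Applies one change event for the given primary key. */
    void apply(Kind kind, int empId, String empName) {
        switch (kind) {
            case INSERT:
            case UPDATE_AFTER:
                rows.put(empId, empName); // upsert: insert or overwrite by key
                break;
            case DELETE:
                rows.remove(empId);
                break;
            case UPDATE_BEFORE:
                break; // the old image is not needed when writing by key
        }
    }

    /** Returns a copy of the current table contents. */
    Map<Integer, String> snapshot() {
        return new LinkedHashMap<>(rows);
    }
}
```

Because every event is applied by key, replaying the same event twice leaves the table unchanged, which is why a primary key on the sink table matters for a CDC pipeline.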

5. Programmatic change listening

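Add the following Maven dependencies (excerpt). This excerpt uses the older com.ververica coordinates at version 3.0.1; from Flink CDC 3.1 onward the artifacts are published under the org.apache.flink groupId, as in the 3.5.0 download URLs above: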

<dependency>
  <groupId>com.ververica</groupId>
  <artifactId>flink-sql-connector-mysql-cdc</artifactId>
  <version>3.0.1</version>
</dependency>
... (other Flink client and connector dependencies)

Implement a custom sink that parses Debezium JSON events:

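import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.util.Map;

public class PackSinkFunction extends RichSinkFunction<String> {
  private static final long serialVersionUID = 1L;
  // Static so the mapper is shared and not serialized together with the function.
  private static final ObjectMapper MAPPER = new ObjectMapper();
  private static final TypeReference<Map<String, Object>> MAP_TYPE = new TypeReference<>() {};

  @Override
  @SuppressWarnings("unchecked")
  public void invoke(String value, Context context) throws Exception {
    System.out.printf("Data changed: %s%n", value);
    Map<String, Object> result = MAPPER.readValue(value, MAP_TYPE);
    // With schema output enabled, each record is wrapped as {"schema": ..., "payload": ...}.
    Map<String, Object> payload = (Map<String, Object>) result.get("payload");
    if (payload == null) {
      return; // no payload to inspect
    }
    // op is c (create), u (update), d (delete) or r (snapshot read); it may be null for schema change events.
    String op = (String) payload.get("op");
    System.err.println("Operation: %s, Data: %s".formatted(op, payload));
    // custom logic to write to Elasticsearch could be added here
  }
}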
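
The placeholder comment in invoke leaves open how a change event becomes an Elasticsearch write. One possible mapping is sketched below: creates, snapshot reads and updates index the "after" row image, deletes remove the document identified by the "before" image, and events without an op are skipped. EsActionMapper, EsAction and Kind are hypothetical names, and using emp_id as the document id is an assumption; the sketch only decides what to do and leaves the actual Elasticsearch client call out.

```java
import java.util.Map;

/** Maps a parsed Debezium payload to an index/delete decision (illustrative names). */
final class EsActionMapper {

    enum Kind { INDEX, DELETE, SKIP }

    /** What to do, for which document id, and with which row image (null unless indexing). */
    record EsAction(Kind kind, String id, Map<String, Object> document) {}

    @SuppressWarnings("unchecked")
    static EsAction toAction(Map<String, Object> payload) {
        Object op = payload.get("op");
        if (op == null) {
            return new EsAction(Kind.SKIP, null, null); // e.g. a schema change event
        }
        switch ((String) op) {
            case "c": // row created
            case "r": // row read during the initial snapshot
            case "u": // row updated
                Map<String, Object> after = (Map<String, Object>) payload.get("after");
                // Assumption: emp_id is the primary key and doubles as the document id.
                return new EsAction(Kind.INDEX, String.valueOf(after.get("emp_id")), after);
            case "d": // row deleted: only the "before" image is populated
                Map<String, Object> before = (Map<String, Object>) payload.get("before");
                return new EsAction(Kind.DELETE, String.valueOf(before.get("emp_id")), null);
            default:
                return new EsAction(Kind.SKIP, null, null);
        }
    }
}
```

Inside the sink, the returned action could then be translated into an index or delete request of whichever Elasticsearch client the application already uses.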

Build a streaming job that reads CDC events and forwards them to the custom sink:

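import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

// Imports match the com.ververica dependency; Flink CDC 3.1+ uses org.apache.flink.cdc.* packages instead.
public class SyncMySqlJob { // class name chosen for illustration
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    Properties jdbcProperties = new Properties();
    jdbcProperties.setProperty("useSSL", "false");

    MySqlSource<String> source = MySqlSource.<String>builder()
        .hostname("10.100.101.227") // MySQL host; adjust to your environment
        .port(3306)
        .databaseList("msg")
        .tableList("msg.employees") // tables are given as database.table
        .username("root")
        .password("123123")
        .jdbcProperties(jdbcProperties)
        .includeSchemaChanges(true) // also emit DDL (schema change) events
        .deserializer(new JsonDebeziumDeserializationSchema(true)) // true = include the schema next to the payload
        .startupOptions(StartupOptions.initial()) // snapshot existing rows, then follow the binlog
        .build();

    env.enableCheckpointing(6000); // checkpoint every 6 s
    env.setParallelism(4);

    env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL")
       .addSink(new PackSinkFunction());

    env.execute("Sync MySQL");
  }
}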

When rows are inserted, updated or deleted in MySQL, the console prints the corresponding change events. This demonstrates a fully programmable, millisecond‑level real‑time synchronization pipeline from MySQL to downstream systems such as Elasticsearch.
