Millisecond‑Level Real‑Time Sync from MySQL to Elasticsearch with Flink CDC
This guide walks through setting up a Spring Boot 3.5 environment, configuring Flink 1.20 and Flink CDC 3.5, preparing MySQL tables, and using both the Flink CDC CLI and SQL client to achieve near‑millisecond synchronization of data from MySQL to Elasticsearch, including custom sink programming and real‑time monitoring via the Flink Web UI.
1. Environment preparation
Spring Boot 3.5.0 is used as the application framework. Flink 1.20.1 is downloaded from the Apache archive:
https://archive.apache.org/dist/flink/flink-1.20.1/flink-1.20.1-bin-scala_2.12.tgz
After extracting the archive, edit conf/config.yaml to configure the JobManager, TaskManager and checkpoint interval (3 s):
jobmanager:
  bind-host: 0.0.0.0
  rpc:
    address: localhost
    port: 6123
taskmanager:
  bind-host: 0.0.0.0
  host: localhost
  numberOfTaskSlots: 4
rest:
  address: localhost
  bind-address: 0.0.0.0
  port: 8088
execution:
  checkpointing:
    interval: 3s
Add the host entry for the machine (replace with the actual hostname):
192.168.1.8 pack_host
Download the MySQL JDBC driver and place it in FLINK_HOME/lib:
https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.27/mysql-connector-java-8.0.27.jar
Start the Flink cluster:
./bin/start-cluster.sh
The Web UI becomes reachable at http://192.168.1.8:8088/.
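Reachability can also be checked without a browser, since the Web UI is served by the same REST endpoint that exposes cluster information under /overview. The following is a minimal sketch using only the JDK's HTTP client; the class name FlinkRestCheck and the timeout values are illustrative assumptions, while the host and port come from the configuration above.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

// Hypothetical helper class, not part of Flink.
class FlinkRestCheck {
    /** Builds the URI of Flink's REST "/overview" endpoint for the given host and port. */
    static URI overviewUri(String host, int port) {
        return URI.create("http://" + host + ":" + port + "/overview");
    }

    /** Sends a GET request and returns the raw JSON body; throws if the status is not 200. */
    static String fetchOverview(String host, int port) throws Exception {
        HttpClient client = HttpClient.newBuilder()
                .connectTimeout(Duration.ofSeconds(3)) // assumed timeout
                .build();
        HttpRequest request = HttpRequest.newBuilder(overviewUri(host, port))
                .timeout(Duration.ofSeconds(5))        // assumed timeout
                .GET()
                .build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() != 200) {
            throw new IllegalStateException("Unexpected HTTP status " + response.statusCode());
        }
        return response.body();
    }

    public static void main(String[] args) throws Exception {
        if (args.length == 2) {
            // Only contacts the cluster when host and port are passed explicitly.
            System.out.println(fetchOverview(args[0], Integer.parseInt(args[1])));
        } else {
            System.out.println("usage: java FlinkRestCheck.java <host> <port>  (would query "
                    + overviewUri("192.168.1.8", 8088) + ")");
        }
    }
}
```

Running it as java FlinkRestCheck.java 192.168.1.8 8088 should print a JSON document describing the cluster once the JobManager is up.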
2. Flink CDC setup
Download Flink CDC 3.5.0:
https://www.apache.org/dyn/closer.lua/flink/flink-cdc-3.5.0/flink-cdc-3.5.0-bin.tar.gz
After extraction the directory contains bin, lib, log and conf. Place the MySQL CDC pipeline connector and the Elasticsearch pipeline connector into FLINK_CDC_HOME/lib:
https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-mysql/3.5.0/flink-cdc-pipeline-connector-mysql-3.5.0.jar
https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-elasticsearch/3.5.0/flink-cdc-pipeline-connector-elasticsearch-3.5.0.jar
Create a MySQL table msg.employees (sample data: 5 rows). Then write a CDC job configuration mysql-to-es.yaml:
source:
  type: mysql
  name: source_msg
  hostname: localhost
  port: 3306
  username: root
  password: 123123
  tables: msg.employees
sink:
  type: elasticsearch
  name: elasticsearch_sink
  hosts: http://localhost:9200
  version: 8
  batch.size.max.bytes: 50485760
  record.size.max.bytes: 10485760
route:
  - source-table: msg.employees
    sink-table: msg_index
pipeline:
  name: Sync MySQL To ES
  parallelism: 1
Submit the job with the CDC CLI, pointing to the Flink installation:
./bin/flink-cdc.sh --flink-home=/opt/cdc/flink-1.20.1 mysql-to-es.yaml
The console prints a Job ID and description, confirming submission. In the Flink Web UI the job appears as running, and Elasticsearch‑head shows the initial 5 rows synchronized from MySQL.
A new row inserted into MySQL (e.g., via the MySQL client) is reflected in Elasticsearch within milliseconds in this local setup. Actual end‑to‑end latency depends on the network, sink batching and the index refresh interval.
3. Using the JDBC SQL connector
Download the JDBC connector JAR and add it to FLINK_HOME/lib:
https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.3.0-1.20/flink-connector-jdbc-3.3.0-1.20.jar
Start the Flink SQL client:
./bin/sql-client.sh
Create a JDBC table that reads from the same MySQL database:
CREATE TABLE employees (
  emp_id INT,
  emp_name STRING,
  dept_name VARCHAR(50),
  hire_date DATE,
  `position` STRING,
  PRIMARY KEY (emp_id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/msg',
  'table-name' = 'employees',
  'username' = 'root',
  'password' = '123123'
);
Query the table to verify that the data matches the source MySQL table:
SELECT * FROM employees;
4. Using the MySQL‑CDC connector via SQL
Download the MySQL‑CDC connector JAR and place it in FLINK_HOME/lib:
https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-mysql-cdc/3.5.0/flink-sql-connector-mysql-cdc-3.5.0.jar
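Create a CDC source table and a JDBC sink table. The source reuses the name employees, so if the JDBC table from the previous section still exists in the same SQL client session, remove it first with DROP TABLE employees; before running these statements: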
CREATE TABLE employees (
  emp_id INT,
  emp_name STRING,
  dept_name VARCHAR(50),
  hire_date DATE,
  `position` STRING,
  PRIMARY KEY (emp_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'root',
  'password' = '123123',
  'database-name' = 'msg',
  'table-name' = 'employees'
);
CREATE TABLE all_employees_sink (
  emp_id INT,
  emp_name STRING,
  dept_name VARCHAR(50),
  hire_date DATE,
  `position` STRING,
  PRIMARY KEY (emp_id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/msg_bak',
  'table-name' = 'employees',
  'username' = 'root',
  'password' = '123123'
);
Insert data from the CDC source into the sink:
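INSERT INTO all_employees_sink SELECT * FROM employees;
The job appears as running in the Flink Web UI, and any INSERT, UPDATE or DELETE on msg.employees is replicated to msg_bak.employees in near real time. The target table msg_bak.employees must already exist in MySQL, because the JDBC connector does not create it.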
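Because all_employees_sink declares emp_id as its primary key, the JDBC connector writes by key (upsert mode) rather than appending rows. The toy model below illustrates that idea with a plain map. It is a conceptual sketch, not the connector's implementation: the class UpsertModel and the sample names are made up for illustration, and the enum merely borrows the names of Flink's RowKind values.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical toy model of a keyed (upsert) sink; not Flink code.
class UpsertModel {
    /** Change kinds, named after Flink's RowKind values. */
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** The "sink table": one row (here just the employee name) per primary key. */
    private final Map<Integer, String> rows = new LinkedHashMap<>();

    /** Applies one change event keyed by emp_id. */
    void apply(Kind kind, int empId, String empName) {
        switch (kind) {
            case INSERT:
            case UPDATE_AFTER:
                rows.put(empId, empName); // insert, or overwrite the row with the same key
                break;
            case DELETE:
                rows.remove(empId);
                break;
            case UPDATE_BEFORE:
                break; // the old row image is not needed when writing by key
        }
    }

    /** Returns a copy of the current table contents. */
    Map<Integer, String> snapshot() {
        return new LinkedHashMap<>(rows);
    }

    public static void main(String[] args) {
        UpsertModel sink = new UpsertModel();
        sink.apply(Kind.INSERT, 1, "Alice");        // sample data, made up
        sink.apply(Kind.INSERT, 2, "Bob");
        sink.apply(Kind.UPDATE_BEFORE, 1, "Alice");
        sink.apply(Kind.UPDATE_AFTER, 1, "Alicia");
        sink.apply(Kind.DELETE, 2, "Bob");
        System.out.println(sink.snapshot());        // prints {1=Alicia}
    }
}
```

Replaying the same events twice leaves the map unchanged, which is why writing by key stays safe when a job restarts from a checkpoint and re-emits recent changes.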
5. Programmatic change listening
Add the following Maven dependencies (excerpt):
<dependency>
  <groupId>com.ververica</groupId>
  <artifactId>flink-sql-connector-mysql-cdc</artifactId>
  <version>3.0.1</version>
</dependency>
... (other Flink client and connector dependencies)
Implement a custom sink that parses Debezium JSON events:
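// Assumes plain Jackson (com.fasterxml) is on the classpath.
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.util.Map;

public class PackSinkFunction extends RichSinkFunction<String> {
    private static final long serialVersionUID = 1L;
    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    @SuppressWarnings("unchecked")
    public void invoke(String value, Context context) throws Exception {
        System.out.printf("Data changed: %s%n", value);
        TypeReference<Map<String, Object>> valueType = new TypeReference<>() {};
        Map<String, Object> result = mapper.readValue(value, valueType);
        // With the schema included, each event is wrapped as {"schema": ..., "payload": ...}.
        Map<String, Object> payload = (Map<String, Object>) result.get("payload");
        // op is c (create), u (update), d (delete) or r (snapshot read);
        // schema-change events carry no op, so the value may be null.
        String op = (String) payload.get("op");
        System.err.println("Operation: %s, Data: %s".formatted(op, payload));
        // custom logic to write to Elasticsearch could be added here
    }
}
Build a streaming job that reads CDC events and forwards them to the custom sink: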
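// Package names match the com.ververica 3.0.1 artifact listed above.
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Properties;

// The class name is illustrative; the original shows only the method body.
public class SyncMySqlJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties jdbcProperties = new Properties();
        jdbcProperties.setProperty("useSSL", "false");
        MySqlSource<String> source = MySqlSource.<String>builder()
                .hostname("10.100.101.227") // replace with your MySQL host
                .port(3306)
                .databaseList("msg")
                .tableList("msg.employees")
                .username("root")
                .password("123123")
                .jdbcProperties(jdbcProperties)
                .includeSchemaChanges(true) // also emit DDL events
                .deserializer(new JsonDebeziumDeserializationSchema(true)) // true = include schema
                .startupOptions(StartupOptions.initial()) // snapshot first, then read the binlog
                .build();
        env.enableCheckpointing(6000); // checkpoint every 6 s
        env.setParallelism(4);
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL")
                .addSink(new PackSinkFunction());
        env.execute("Sync MySQL");
    }
}
When rows are inserted, updated or deleted in MySQL, the console prints the corresponding change events (the original article shows screenshots of a delete and an insert). This demonstrates a fully programmable, millisecond‑level real‑time synchronization pipeline from MySQL to downstream systems such as Elasticsearch.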
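The custom sink leaves the Elasticsearch write as a comment. One possible way to fill that gap is to translate each Debezium op code into an action of the Elasticsearch _bulk API, which takes newline-delimited JSON: an action line, followed by the document for index actions. The sketch below uses only the JDK and stops at producing the request lines. The class BulkActionSketch, its method names and the sample document are illustrative assumptions, and sending the lines to /_bulk is left out.

```java
import java.util.Optional;

// Hypothetical helper, not part of Flink CDC or the Elasticsearch client.
class BulkActionSketch {
    /** Minimal JSON string quoting: escapes backslash and double quote only. */
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    /**
     * Maps a Debezium op code to the NDJSON lines of one _bulk action.
     * c (create), r (snapshot read) and u (update) become "index"; d becomes "delete".
     * Returns empty for anything else, e.g. schema-change events that carry no op.
     */
    static Optional<String> toBulkLines(String op, String index, String id, String docJson) {
        if (op == null) {
            return Optional.empty();
        }
        String meta = "{\"_index\":" + quote(index) + ",\"_id\":" + quote(id) + "}";
        switch (op) {
            case "c":
            case "r":
            case "u":
                // Action line plus the document (the event's "after" image).
                return Optional.of("{\"index\":" + meta + "}\n" + docJson + "\n");
            case "d":
                // Delete actions have no document line.
                return Optional.of("{\"delete\":" + meta + "}\n");
            default:
                return Optional.empty();
        }
    }

    public static void main(String[] args) {
        // Sample values are made up for illustration.
        String doc = "{\"emp_id\":1,\"emp_name\":\"Alice\"}";
        System.out.print(toBulkLines("c", "msg_index", "1", doc).orElse(""));
        System.out.print(toBulkLines("d", "msg_index", "1", null).orElse(""));
    }
}
```

Inside invoke, the id would come from the primary key in the event's after image (or before image for deletes), and batching several actions into one request keeps the number of HTTP round trips low.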
