Big Data 11 min read

Building a Real-Time MySQL and PostgreSQL Streaming ETL with Flink CDC

This tutorial shows how to quickly construct a streaming ETL pipeline that captures changes from MySQL and PostgreSQL using Flink CDC, enriches order data with product and shipment information, and writes the results into Elasticsearch for real‑time visualization in Kibana.

Big Data Technology Architecture
Big Data Technology Architecture
Big Data Technology Architecture
Building a Real-Time MySQL and PostgreSQL Streaming ETL with Flink CDC

This tutorial demonstrates how to quickly build a streaming ETL pipeline that captures changes from MySQL and PostgreSQL using Flink CDC and writes enriched order data to Elasticsearch.

Prerequisites: Docker installed on a Linux or macOS machine.

1. Prepare components – a docker-compose.yml file starts MySQL, PostgreSQL, Elasticsearch, Kibana and Flink containers.

version: '2.1'
services:
  postgres:
    image: debezium/example-postgres:1.1
    ports:
      - "5432:5432"
    environment:
      - POSTGRES_PASSWORD=1234
      - POSTGRES_DB=postgres
      - POSTGRES_USER=postgres
  mysql:
    image: debezium/example-mysql:1.1
    ports:
      - "3306:3306"
    environment:
      - MYSQL_ROOT_PASSWORD=123456
      - MYSQL_USER=mysqluser
      - MYSQL_PASSWORD=mysqlpw
  elasticsearch:
    image: elastic/elasticsearch:7.6.0
    environment:
      - cluster.name=docker-cluster
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
      - discovery.type=single-node
    ports:
      - "9200:9200"
      - "9300:9300"
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
  kibana:
    image: elastic/kibana:7.6.0
    ports:
      - "5601:5601"

2. Load sample data into MySQL (tables products and orders ) and PostgreSQL (table shipments ) using the SQL statements below.

-- MySQL
CREATE DATABASE mydb;
USE mydb;
CREATE TABLE products (
  id INT AUTO_INCREMENT PRIMARY KEY,
  name VARCHAR(255) NOT NULL,
  description VARCHAR(512)
);
INSERT INTO products VALUES
  (DEFAULT,'scooter','Small 2‑wheel scooter'),
  (DEFAULT,'car battery','12V car battery'),
  ...;

CREATE TABLE orders (
  order_id INT AUTO_INCREMENT PRIMARY KEY,
  order_date DATETIME NOT NULL,
  customer_name VARCHAR(255) NOT NULL,
  price DECIMAL(10,5) NOT NULL,
  product_id INT NOT NULL,
  order_status BOOLEAN NOT NULL
) AUTO_INCREMENT=10001;
INSERT INTO orders VALUES
  (DEFAULT,'2020-07-30 10:08:22','Jark',50.50,102,FALSE),
  ...;

-- PostgreSQL
CREATE TABLE shipments (
  shipment_id SERIAL PRIMARY KEY,
  order_id INT NOT NULL,
  origin VARCHAR(255) NOT NULL,
  destination VARCHAR(255) NOT NULL,
  is_arrived BOOLEAN NOT NULL
);
INSERT INTO shipments VALUES
  (DEFAULT,10001,'Beijing','Shanghai',FALSE),
  ...;

3. Start Flink – run ./bin/start-cluster.sh to launch the Flink cluster and then ./bin/sql-client.sh to open the Flink SQL CLI.

4. Create CDC source tables and Elasticsearch sink in the SQL CLI.

-- Flink SQL
CREATE TABLE products (
  id INT,
  name STRING,
  description STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'root',
  'password' = '123456',
  'database-name' = 'mydb',
  'table-name' = 'products'
);

CREATE TABLE orders (
  order_id INT,
  order_date TIMESTAMP(0),
  customer_name STRING,
  price DECIMAL(10,5),
  product_id INT,
  order_status BOOLEAN,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  ...
);

CREATE TABLE shipments (
  shipment_id INT,
  order_id INT,
  origin STRING,
  destination STRING,
  is_arrived BOOLEAN,
  PRIMARY KEY (shipment_id) NOT ENFORCED
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = 'localhost',
  'port' = '5432',
  'username' = 'postgres',
  'password' = 'postgres',
  'database-name' = 'postgres',
  'schema-name' = 'public',
  'table-name' = 'shipments'
);

CREATE TABLE enriched_orders (
  order_id INT,
  order_date TIMESTAMP(0),
  customer_name STRING,
  price DECIMAL(10,5),
  product_id INT,
  order_status BOOLEAN,
  product_name STRING,
  product_description STRING,
  shipment_id INT,
  origin STRING,
  destination STRING,
  is_arrived BOOLEAN,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'http://localhost:9200',
  'index' = 'enriched_orders'
);

5. Insert joined data into Elasticsearch using an INSERT‑SELECT statement.

INSERT INTO enriched_orders
SELECT o.*, p.name, p.description, s.shipment_id, s.origin, s.destination, s.is_arrived
FROM orders AS o
LEFT JOIN products AS p ON o.product_id = p.id
LEFT JOIN shipments AS s ON o.order_id = s.order_id;

6. Verify results in Kibana; the order view updates in real time as you modify the source tables (INSERT, UPDATE, DELETE examples are provided).

7. Clean up – stop all containers with docker-compose down and shut down the Flink cluster with ./bin/stop-cluster.sh .

Conclusion: The guide provides a hands‑on example of using Flink CDC for real‑time data integration across relational databases and a search engine, illustrating the end‑to‑end workflow from source capture to Elasticsearch visualization.

dockerFlinkSQLElasticsearchMySQLPostgreSQLCDCStreaming ETL
Big Data Technology Architecture
Written by

Big Data Technology Architecture

Exploring Open Source Big Data and AI Technologies

0 followers
Reader feedback

How this landed with the community

login Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.