Big Data 10 min read

Real-time Data Synchronization from OceanBase to Kafka Using ActionOMS and Flink

This article demonstrates how to use ActionOMS to capture incremental changes from OceanBase, stream them to Kafka in various formats, and employ Flink to deduplicate and aggregate transaction data into a daily summary, illustrating a complete real-time data pipeline for financial use cases.

Aikesheng Open Source Community
Aikesheng Open Source Community
Aikesheng Open Source Community
Real-time Data Synchronization from OceanBase to Kafka Using ActionOMS and Flink

Background

In the digital era, real‑time data warehouse technology is widely used in finance, e‑commerce, and manufacturing, where timely capture of database incremental records is critical. OceanBase is a high‑performance distributed relational database that provides strong consistency and high throughput.

ActionOMS leverages OceanBase CDC to pull redo logs via RPC, assemble distributed transactions, parse data, format statements, and output change data per transaction. The tool can forward data to Kafka, RocketMQ, DataHub, etc., enabling rapid construction of real‑time warehouses.

Example

Business Scenario

A bank’s transaction table may contain duplicate records due to system delays. After deduplication, the data is aggregated to analyze customer consumption habits.

Data Source – Data Channel

ActionOMS can sync full and incremental data (DML/DDL) from OceanBase to Kafka in multiple formats such as Default, Canal, Debezium, Avro, etc. Note that when a job resumes, Kafka may contain recent duplicate records, so downstream systems must support deduplication.

Below is an example of the JSON message format produced by ActionOMS:

{
    "prevStruct": null,
    "postStruct": {
        "order_id": "RTDW202411210006",
        "user": "u001",
        "product": "p008",
        "num": 800,
        "proctime": "1732181459",
        "__pk_increment": 8
    },
    "allMetaData": {
        "checkpoint": "1732168058",
        "dbType": "OB_MYSQL",
        "storeDataSequence": 173216805935500000,
        "db": "oms_mysql.rt_dw_test",
        "timestamp": "1732168059",
        "uniqueId": "1002_1001_7681208\u0000\u0000_5572734820_0",
        "transId": "1002_7681208",
        "clusterId": "33",
        "ddlType": null,
        "record_primary_key": "__pk_increment",
        "source_identity": "OB_MYSQL_ten_1_698lmn9kj7cw-1-0",
        "record_primary_value": "8",
        "table_name": "orders"
    },
    "recordType": "INSERT"
}

Flink – Data Warehouse

Flink subscribes to Kafka, deduplicates records using the primary key, and aggregates daily transaction volume and total amount, then writes the result back to OceanBase.

Input table definition in Flink:

CREATE TABLE kafka_input (
  prevStruct ROW<>,
  postStruct ROW<
    order_id STRING,
    `user` STRING,
    product STRING,
    num INT,
    proctime STRING
  >,
  allMetaData ROW<>,
  recordType STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'rt_dw_test',
  'properties.bootstrap.servers' = 'ip:port',
  'properties.group.id' = 'oms_test_1',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json',
  'json.fail-on-missing-field' = 'false',
  'json.ignore-parse-errors' = 'true',
  'json.timestamp-format.standard' = 'ISO-8601'
);

Output table definition:

CREATE TABLE daily_order_summary (
  order_date DATE,
  total_orders BIGINT,
  total_amount DECIMAL(10, 2),
  PRIMARY KEY (order_date) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'table-name' = 'daily_order_summary',
  'url' = 'jdbc:mysql://ip:port/rt_dw_test',
  'username' = 'test',
  'password' = 'test'
);

Flink SQL that performs deduplication and aggregation:

INSERT INTO daily_order_summary
SELECT
  CAST(FROM_UNIXTIME(CAST(postStruct.proctime AS BIGINT)) AS DATE) AS order_date,
  COUNT(DISTINCT postStruct.order_id) AS total_orders,
  SUM(postStruct.num) AS total_amount
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY postStruct.order_id ORDER BY TO_TIMESTAMP(FROM_UNIXTIME(CAST(postStruct.proctime AS BIGINT))) DESC) AS row_num
  FROM kafka_input
) WHERE row_num = 1
GROUP BY CAST(FROM_UNIXTIME(CAST(postStruct.proctime AS BIGINT)) AS DATE);

Validation

After running the Flink job, the aggregated results for historical data contain no duplicates. When new and duplicate orders are inserted into OceanBase, the pipeline synchronizes them to Kafka, Flink removes duplicates, and the final summary reflects only the new orders.

Summary

Using ActionOMS to sync OceanBase data to Kafka, combined with Flink for real‑time deduplication and aggregation, provides an efficient end‑to‑end solution for building real‑time data warehouses, especially in financial scenarios where data accuracy and latency are critical.

FlinkKafkareal-time datadata synchronizationOceanBaseActionOMSSQL CDC
Aikesheng Open Source Community
Written by

Aikesheng Open Source Community

The Aikesheng Open Source Community provides stable, enterprise‑grade MySQL open‑source tools and services, releases a premium open‑source component each year (1024), and continuously operates and maintains them.

0 followers
Reader feedback

How this landed with the community

login Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.