
Streaming SQL with Apache Flink: Theory, Platform Optimizations, and Real‑Time Use Cases

This article introduces Apache Flink's Streaming SQL, explains its theoretical foundations such as the table‑stream relationship and watermark semantics, describes the platform's practical enhancements—including source/sink wrappers, built‑in functions, and native Retract Stream support—and showcases several real‑time computation examples.


1. Streaming SQL Fundamentals

Streaming SQL uses SQL syntax to express stream processing, essentially applying relational algebra to continuous data streams.

1.1 Table and Stream Relationship

Traditional relational databases append every change to a write‑ahead log (WAL) before persisting it. Viewing the WAL as a stream makes the duality clear: replaying the log rebuilds the table, and a table's change log is itself a stream.

Table: a static snapshot of data at a point in time.

Stream: dynamic data that evolves over time.

Stream ⇒ Table: integrate ("sum") data changes over time to produce a table.

Table ⇒ Stream: differentiate ("diff") a table snapshot to obtain the underlying change stream.

These operations are inverses in many cases.
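This duality can be sketched in a few lines of Python. The function names and the `(op, key, value)` change format below are illustrative, not Flink APIs:

```python
# Conceptual sketch of the stream/table duality: "sum" a changelog
# stream into a table snapshot, and "diff" two snapshots back into
# the change stream that transforms one into the other.

def stream_to_table(changelog):
    """Integrate a stream of (op, key, value) changes into a table snapshot."""
    table = {}
    for op, key, value in changelog:
        if op == "upsert":
            table[key] = value
        elif op == "delete":
            table.pop(key, None)
    return table

def table_to_stream(old, new):
    """Diff two snapshots into the change stream leading from old to new."""
    changes = []
    for key, value in new.items():
        if old.get(key) != value:
            changes.append(("upsert", key, value))
    for key in old:
        if key not in new:
            changes.append(("delete", key, None))
    return changes
```

Applying `stream_to_table` to the output of `table_to_stream` recovers the target snapshot, which is exactly the "inverse operations" intuition above.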

1.2 Stream‑Batch Computing and Watermark

Time is a core dimension for stream processing. A watermark provides a completeness guarantee: when the watermark reaches time t, all events with timestamps ≤ t have been observed.

In practice, watermarks are approximated, balancing accuracy and latency.
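A common approximation is bounded out-of-orderness, similar in spirit to Flink's built-in watermark strategies. The class below is an illustrative sketch, not Flink code; the delay parameter is the accuracy/latency knob mentioned above:

```python
# Sketch of a bounded-out-of-orderness watermark generator: the
# watermark trails the largest observed event timestamp by a fixed
# delay, trading completeness accuracy for latency.

class BoundedOutOfOrdernessWatermark:
    def __init__(self, max_out_of_orderness_ms):
        self.delay = max_out_of_orderness_ms
        self.max_ts = float("-inf")

    def on_event(self, event_ts_ms):
        # Track the largest event timestamp seen so far.
        self.max_ts = max(self.max_ts, event_ts_ms)

    def current_watermark(self):
        # Claim: no event with timestamp <= this value is still expected.
        return self.max_ts - self.delay
```

A larger delay tolerates more disorder but holds windows open longer before results are emitted.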

1.3 Streaming Relational Algebra

SQL relies on relational algebra, which is closed over tables. Extending it to streams adds explicit operators for converting between tables and streams, slightly sacrificing closure.

1.3.1 Time‑Varying Property

Each table snapshot corresponds to a specific timestamp; a sequence of snapshots forms a time‑varying series, and streaming relational algebra applies relational operators to each snapshot.

All traditional relational algebra operators remain valid in streaming relational algebra: applied to a time‑varying relation, each operator again yields a time‑varying relation, so closure is preserved for the relational operators themselves.

1.3.2 Limitations and Execution Boundary

Streaming SQL cannot express every computation because:

SQL's relational model imposes constraints on expressiveness.

Stream‑to‑table conversion requires that stream records conform to the table schema (i.e., the stream must carry a compatible Record type).

Consequently, a valid Streaming SQL job must satisfy three conditions:

The Source can emit a stream that satisfies relational constraints.

The Sink can consume data with relational constraints.

The Transformation can be fully expressed by relational algebra.

1.4 Stream‑Table Conversion and Stream Operation Semantics

Traditional DML operations (INSERT, DELETE, UPDATE) map to stream operation types:

INSERT / Accumulate – a new record arrives and its contribution is added to the result.

DELETE / Retract – a previously emitted record is withdrawn and its contribution is retracted.

For example, MySQL binlog events map as follows:

1. WRITE_ROWS → INSERT
2. UPDATE_ROWS → UPDATE (implemented as DELETE + INSERT)
3. DELETE_ROWS → DELETE
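The mapping can be sketched as a small translation function. The event dictionary shape below is illustrative, not the actual binlog wire format:

```python
# Sketch: translate MySQL binlog event types into stream operations,
# expanding UPDATE_ROWS into DELETE of the old row + INSERT of the new.

def binlog_to_ops(event):
    kind = event["type"]
    if kind == "WRITE_ROWS":
        return [("INSERT", event["row"])]
    if kind == "DELETE_ROWS":
        return [("DELETE", event["row"])]
    if kind == "UPDATE_ROWS":
        # An update is modeled as retracting the old row, then
        # inserting the new one.
        return [("DELETE", event["before"]), ("INSERT", event["after"])]
    raise ValueError(f"unknown binlog event type: {kind}")
```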

1.5 Streaming SQL @ Flink

Flink implements Streaming SQL using a three‑stage pipeline: Source → Transformation → Sink. The SQL is parsed, optimized, and translated into a series of Flink operators.

1.5.1 Window Semantics

TUMBLE – fixed windows.

HOP – sliding windows.

SESSION – session windows.

1.5.2 Stream‑Batch Conversion Operators

fromDataStream – convert a DataStream to a Table.

toAppendStream, toRetractStream – convert a Table back to a DataStream.

1.5.3 Input and Output

Flink provides TableSource and TableSink abstractions for external systems.

1.5.4 Aggregation

Aggregations are expressed with GROUP BY and functions such as SUM, COUNT, or user‑defined aggregate functions (UDAFs).

1.5.5 Stream Types

Append Stream – carries INSERT messages only; no unique key required.

Retract Stream – carries both INSERT and DELETE (retract) messages; no unique key required.

Upsert Stream – carries INSERT (upsert) and DELETE messages; a unique key is required so a new row can overwrite the previous row for that key.
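The three message semantics can be sketched by how each stream type is materialized into state. Message shapes here are assumptions for illustration:

```python
# Sketch of the three stream types' message semantics.

def apply_append(rows, row):
    # Append streams only ever add rows.
    rows.append(row)

def apply_retract(rows, msg):
    # Retract messages are (is_add, row); no key is needed because
    # retractions match rows by value.
    is_add, row = msg
    if is_add:
        rows.append(row)
    else:
        rows.remove(row)

def apply_upsert(table, msg):
    # Upsert messages are (key, row); a key is required so a new row
    # overwrites the previous one. None signals a delete.
    key, row = msg
    if row is None:
        table.pop(key, None)
    else:
        table[key] = row
```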

1.5.6 Stream Join

Flink supports various stream joins, but users must consider state size and accuracy trade‑offs.
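The state-size concern can be made concrete with a symmetric hash join sketch: every row from both sides must be buffered, so state grows without bound unless windows or a state TTL limit it. This is an illustration of the trade-off, not Flink's internal join operator:

```python
from collections import defaultdict

# Sketch of a streaming symmetric hash join: each side buffers its
# rows per key, and each arriving row joins against the other side's
# buffered state. State grows with every row unless bounded.

class StreamingInnerJoin:
    def __init__(self):
        self.left_state = defaultdict(list)   # key -> buffered left rows
        self.right_state = defaultdict(list)  # key -> buffered right rows

    def on_left(self, key, row):
        self.left_state[key].append(row)
        return [(row, r) for r in self.right_state[key]]

    def on_right(self, key, row):
        self.right_state[key].append(row)
        return [(l, row) for l in self.left_state[key]]
```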

2. Platform Practice and Optimizations

2.1 Source & Sink Encapsulation and Optimization

The platform wraps common sources/sinks (Kafka, MySQL/TiDB, Redis, Elasticsearch, HTTP, etc.) and provides configuration parameters for performance tuning.

2.1.1 Kafka Source

Built on the official KafkaConsumerBase , the source adds monitoring metrics and supports multiple data formats (JSON, SPLIT, Protobuf, RAW, BINLOG).

Example JSON format:

{
  "action": "click",
  "dt": "2020-01-20",
  "device_id": "edbdf_ce424452_2"
}

Corresponding DDL:

CREATE TABLE `kafka_source_demo` (
  `action` STRING,
  `dt` STRING,
  `device_id` STRING
) WITH (
  `type` = 'kafka',
  `servers` = '***',
  `deserializationType` = 'JSON',
  `topicName` = 'test'
);

2.1.2 MySQL (TiDB) Sink

The custom MysqlTableSink offers many tunable options:

batchSize – number of rows per batch write.

flushInterval – time interval for flushing.

isAsync – enable asynchronous writes.

isFlushOnCheckpoint – flush on checkpoint.

isKeyByBeforeSink – key by before the sink to reduce write conflicts.

oneSink – force sink parallelism = 1.

isUpdateOnDuplicateKey – use INSERT … ON DUPLICATE KEY UPDATE instead of REPLACE.

mergeByKey – merge rows by key before writing.

ignoreDelete – skip DELETE events.

ignoreException – ignore write exceptions.
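How options like batchSize and mergeByKey could interact can be sketched as a buffering sink: rows are merged per key and flushed once the buffer reaches the batch size. The class and method names are illustrative assumptions, not the platform's actual MysqlTableSink implementation:

```python
# Hedged sketch of a buffering sink with batchSize + mergeByKey
# semantics: later rows for the same key replace earlier ones before
# the batch is written out.

class BufferedSink:
    def __init__(self, batch_size, merge_by_key=True):
        self.batch_size = batch_size
        self.merge_by_key = merge_by_key
        self.buffer = {}           # key -> latest merged row
        self.flushed_batches = []  # stands in for actual DB writes

    def write(self, key, row):
        if self.merge_by_key:
            self.buffer[key] = row  # later rows replace earlier ones
        else:
            self.buffer[(key, len(self.buffer))] = row
        if len(self.buffer) >= self.batch_size:
            self.flush()

    def flush(self):
        if self.buffer:
            self.flushed_batches.append(list(self.buffer.values()))
            self.buffer.clear()
```

In a real sink, flush would also be triggered by flushInterval and by checkpoints (isFlushOnCheckpoint).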

2.2 Built‑in Function Support

More than 100 built‑in functions (UDF/UDTF/UDAF) are provided, covering JSON parsing, time conversion, type casting, statistical metrics, window simulation, and string manipulation.

2.2.1 Statistical Functions

count_with_external_redis() – initializes a counter from Redis.

hll_distinct_count(key) – HyperLogLog based approximate distinct count.

udf_tp99(v) – 99th percentile estimation.

Monotonic sequence counters that reset when a supplied compare value increases.
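A percentile UDAF such as udf_tp99 can be sketched naively by accumulating values and reading the nearest-rank 99th percentile. The platform's real implementation may well use sampling or a digest structure; this accumulator is an assumption for illustration only:

```python
# Naive sketch of a tp99 accumulator: collect values, sort on read,
# return the nearest-rank 99th percentile.

class Tp99Accumulator:
    def __init__(self):
        self.values = []

    def accumulate(self, v):
        self.values.append(v)

    def get_value(self):
        # Nearest-rank p99: the value at the 99% position of the
        # sorted sample.
        data = sorted(self.values)
        idx = max(0, int(len(data) * 0.99) - 1)
        return data[idx]
```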

2.2.2 Window Simulation Functions

Scalar function tumble_window_group(ts, windowSize) returns the window start timestamp for a fixed window. Table function slide_window_group(ts, windowSize, slideSize) returns a list of overlapping window timestamps for a sliding window.
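The semantics of both functions can be sketched directly (timestamps and sizes in milliseconds). The signatures mirror the text; the implementations are assumed:

```python
# Sketch of the window-simulation functions' semantics.

def tumble_window_group(ts, window_size):
    # A fixed window: each event maps to exactly one window start.
    return ts - ts % window_size

def slide_window_group(ts, window_size, slide_size):
    # A sliding window: each event maps to every window [start,
    # start + window_size) that contains it, stepped by slide_size.
    start = ts - ts % slide_size
    windows = []
    while start > ts - window_size:
        windows.append(start)
        start -= slide_size
    return sorted(windows)
```

Grouping by the returned window timestamp (plus the business key) then turns an ordinary GROUP BY aggregation into a tumbling- or sliding-window aggregation, as in the online-user example later in this article.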

2.3 Dimension Table Support

Dimension tables are identified by a special flag in the DDL; during job submission the platform rewrites dimension‑table joins into FlatMap operators.

2.4 Optimization Tools and Strategies

2.4.1 Practical Tools

Interactive SQL console with instant result preview.

Full‑lifecycle job management (deployment, checkpointing, monitoring, alerting).

Integration with the AutoBI visualization system.

2.4.2 Optimization Strategies

SQL Pre‑parsing: the platform parses and validates SQL before Flink's native parser, allowing custom rule injection.

Submit Client Refactoring: a wrapper around Flink's client provides REST submission, YARN/K8s support, JAR management, built‑in function registration, and load balancing.

Source Reuse: identical sources are materialized once to avoid repeated reads.

Temporary View Syntax: supports CREATE VIEW within a session.

Replace Window Aggregation with Regular Aggregation: use time‑handling functions and window‑simulation functions to achieve lower latency and memory usage.

Field Charset Extension: modifies the expression parser to allow special characters (e.g., @timestamp) in field names.

Micro‑batch: optionally batch writes before the sink to improve throughput when latency requirements allow.

2.5 Embracing Flink 1.9+

After extensive testing, the platform migrated to Flink 1.9+, leveraging its enhanced SQL capabilities and performance improvements.

2.6 Native Loading of Retract Stream

Standard Flink loads Kafka data as an Append Stream, requiring explicit CASE logic for INSERT/UPDATE/DELETE. The platform introduced a native BINLOG_RET deserialization that directly produces a Retract Stream, where:

INSERT → INSERT.

DELETE → RETRACT.

UPDATE → RETRACT of the old row + INSERT of the new row.
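Why this keeps aggregates correct can be sketched with a SUM over retract messages: INSERT adds a value, RETRACT subtracts it, so an update (retract old + insert new) lands on the right total. The `(op, value)` message shape is illustrative:

```python
# Sketch: SUM over a retract stream stays correct under updates,
# because the old value's contribution is subtracted before the new
# value's is added.

def retract_sum(messages):
    total = 0
    for op, value in messages:
        if op == "INSERT":
            total += value
        elif op == "RETRACT":
            total -= value
    return total
```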

DDL example:

CREATE TABLE `kafka_source_demo_ret_1` (
  `value` STRING,
  `op` STRING
) WITH (
  `type` = 'kafka',
  `servers` = '***',
  `deserializationType` = 'BINLOG_RET',
  `topicName` = 'test'
);

SQL becomes a simple aggregation:

SELECT SUM(text_to_long(`value`)) FROM `kafka_source_demo_ret_1`;

3. Real‑Time Use Cases

3.1 Classic Cases (Simplified)

3.1.1 Real‑Time Metric Calculation

Goal: compute hourly PV and UV per action from Kafka logs and write results to Redis.

CREATE TABLE `kafka_source_demo` (
  `action` STRING,
  `dt` STRING,
  `device_id` STRING
) WITH (
  `type` = 'kafka',
  `servers` = '***',
  `deserializationType` = 'JSON',
  `topicName` = 'test'
);

CREATE TABLE `redis_sink_demo` (
  `action` STRING,
  `dt` STRING,
  `pv` BIGINT,
  `uv` BIGINT
) WITH (
  `type` = 'redis',
  `server` = '***',
  `valueNames` = 'pv,uv',
  `keyType` = 'string',
  `keyTemplate` = 'demo_key_${action}_${dt}_'
);

INSERT INTO `redis_sink_demo`
SELECT `action`, `dt`,
       COUNT(1) AS `pv`,
       hll_distinct_count(`device_id`) AS `uv`
FROM `kafka_source_demo`
GROUP BY `action`, `dt`;

3.1.2 Real‑Time ETL

Goal: sync selected fields from a Kafka binlog to a TiDB table with < 5 s latency.

CREATE TABLE `kafka_source_demo_2` (
  `biz_type` STRING,
  `biz_id` STRING,
  `property` STRING
) WITH (
  `type` = 'kafka',
  `servers` = '***',
  `deserializationType` = 'JSON',
  `topicName` = 'test2'
);

CREATE TABLE `ti_sink_demo` (
  `biz_type` STRING,
  `biz_id` STRING,
  `property` STRING
) WITH (
  `type` = 'mysql',
  `url` = 'xxxxx',
  `mysqlTableName` = 'test',
  `username` = 'xx',
  `password` = 'xxxxx',
  `mysqlDatabaseName` = 'xx',
  `sinkKeyFieldNames` = 'biz_id,biz_type',
  `batchSize` = 200,
  `flushInterval` = 3000,
  `needMerge` = false,
  `ignoreDelete` = true,
  `specificMysqlSinkExecutionLogicClassName` = 'duplicateKeyUpdate',
  `isKeyByBeforeSink` = true
);

INSERT INTO `ti_sink_demo`
SELECT * FROM `kafka_source_demo_2`;

3.1.3 Real‑Time Online User Count with Sliding Window

Goal: every 30 s, compute the number of distinct online users per room and write to Redis.

CREATE TABLE `kafka_source_demo_3` (
  `room_id` STRING,
  `ts` BIGINT,
  `device_id` STRING
) WITH (
  `type` = 'kafka',
  `servers` = '***',
  `deserializationType` = 'JSON',
  `topicName` = 'test3'
);

CREATE TABLE `redis_sink_demo_3` (
  `room_id` STRING,
  `window_ts` BIGINT,
  `uv` BIGINT
) WITH (
  `type` = 'redis',
  `server` = '***',
  `valueNames` = 'uv',
  `noValueName` = true,
  `keyType` = 'string',
  `keyTemplate` = 'demo_key2_${room_id}_${window_ts}'
);

INSERT INTO `redis_sink_demo_3`
SELECT `room_id`, `window_ts`, COUNT(DISTINCT `device_id`) AS `uv`
FROM `kafka_source_demo_3`,
     LATERAL TABLE(`slide_window_group`(`ts`, 60000, 30000)) AS T(`window_ts`)
GROUP BY `room_id`, `window_ts`;

Conclusion

The article first clarifies the theoretical basis of Streaming SQL, then details the platform's practical enhancements—including source/sink encapsulation, extensive built‑in functions, native Retract Stream support, and various optimization techniques—and finally demonstrates several concise real‑time scenarios, illustrating how SQL can be used to rapidly solve streaming computation problems and empower users with a Flink‑based real‑time data warehouse.

Written by HomeTech