Comprehensive Guide to FlinkSQL and Table API: Background, Dependencies, Planners, and Usage
This article provides a detailed introduction to FlinkSQL, covering its background, the Table API, required dependencies, the differences between the old and Blink planners, common API usage patterns, connector configurations for CSV, Kafka, Elasticsearch, and MySQL, and how to convert between DataStream and Table representations in Flink's unified batch‑stream processing model.
1. Introduction
After a brief apology for the recent inactivity, the author announces a series of tutorials on FlinkSQL and real‑time data warehousing, inviting readers to follow for technical content.
2. Background of FlinkSQL
FlinkSQL is a SQL‑compatible language designed to simplify Flink's real‑time computation model, lowering the entry barrier for users. It originated from Alibaba's research on stream processing engines, leading to the open‑source Blink project, which contributed the implementation of FlinkSQL.
Compared with traditional stream APIs such as Storm or Spark Streaming, FlinkSQL offers a declarative, optimizable, easy‑to‑learn, stable, and unified batch‑stream interface.
Declarative language: users express requirements without implementation details.
Built‑in optimizers generate optimal execution plans.
Low learning cost across industries.
Stable syntax with decades of SQL history.
Unified batch and stream processing at the runtime level.
3. Overall Introduction
3.1 What are Table API and FlinkSQL?
Flink provides a unified batch‑stream framework; Table API and FlinkSQL are higher‑level APIs that operate on this unified engine. Table API is an embedded query API in Java/Scala, while FlinkSQL allows writing standard SQL statements directly.
3.2 Required Dependencies
Two Maven dependencies are needed for Table API and SQL:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.11</artifactId>
<version>1.10.0</version>
</dependency>
The planner provides the core planning capabilities, while the bridge connects the Table API with the DataStream/DataSet APIs.
3.3 Differences Between Old Planner and Blink Planner
Blink treats batch jobs as a special case of stream jobs.
Blink does not support BatchTableSource; it uses bounded StreamTableSource instead.
Blink does not support the deprecated ExternalCatalog.
FilterableTableSource implementations differ between the two planners.
String‑based configuration options are only available in Blink.
PlannerConfig implementations differ.
Blink can optimize multiple sinks into a single DAG (only for TableEnvironment).
Blink supports catalog statistics, while the old planner does not.
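The batch-as-a-special-case-of-stream point above can be made concrete. A minimal sketch, assuming Flink 1.10 dependencies are on the classpath, of creating a Blink planner environment in batch mode:

```scala
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

// Blink plans a batch job as a bounded stream job; inBatchMode() is only
// valid together with the Blink planner.
val batchSettings = EnvironmentSettings.newInstance()
  .useBlinkPlanner()
  .inBatchMode()
  .build()

// Note: this is the unified TableEnvironment (not StreamTableEnvironment),
// which is also the variant that can merge multiple sinks into one DAG.
val batchTableEnv = TableEnvironment.create(batchSettings)
```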
4. API Calls
4.1 Basic Program Structure
The typical steps are: create an execution environment, define source, transformation, and sink.
val tableEnv = ... // create table execution environment
// create a temporary source table
tableEnv.connect(...).createTemporaryTable("inputTable")
// create a temporary sink table
tableEnv.connect(...).createTemporaryTable("outputTable")
val result = tableEnv.from("inputTable").select(...)
val sqlResult = tableEnv.sqlQuery("SELECT ... FROM inputTable ...")
result.insertInto("outputTable")
4.2 Creating a Table Environment
Several ways exist to create a TableEnvironment:
val tableEnv = StreamTableEnvironment.create(env) // default
val settings = EnvironmentSettings.newInstance()
.useOldPlanner()
.inStreamingMode()
.build()
val oldPlannerEnv = StreamTableEnvironment.create(env, settings)
val blinkSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build()
val blinkEnv = StreamTableEnvironment.create(env, blinkSettings)
The TableEnvironment manages catalogs, registers tables, executes SQL, registers UDFs, and bridges DataStream/DataSet programs with tables.
4.3 Registering Tables in a Catalog
4.3.1 Table Concept
A table is identified by catalog, database, and object name. It can be a regular table (backed by files, databases, or streams) or a view.
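To illustrate the three-part identifier, a small sketch, assuming the default catalog and database names and the inputTable registered in 4.1:

```scala
// Unqualified table names resolve against the current catalog and database.
tableEnv.useCatalog("default_catalog")
tableEnv.useDatabase("default_database")

// The same table addressed by its fully qualified catalog.database.object path.
val qualified: Table = tableEnv.sqlQuery(
  "SELECT * FROM default_catalog.default_database.inputTable")
```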
4.3.2 CSV Connector
Register a CSV file as a temporary table:
tableEnv
.connect(new FileSystem().path("sensor.txt"))
.withFormat(new OldCsv())
.withSchema(new Schema()
.field("id", DataTypes.STRING())
.field("timestamp", DataTypes.BIGINT())
.field("temperature", DataTypes.DOUBLE()))
.createTemporaryTable("inputTable")
The old OldCsv descriptor will be deprecated; the new Csv() descriptor requires the flink-csv dependency.
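For reference, the flink-csv format can be pulled in with a Maven coordinate like the following (version assumed to match the 1.10.0 used in this article):

```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-csv</artifactId>
  <version>1.10.0</version>
</dependency>
```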
4.3.3 Kafka Connector
Register a Kafka source table:
tableEnv.connect(
new Kafka()
.version("0.11")
.topic("sensor")
.property("zookeeper.connect", "localhost:2181")
.property("bootstrap.servers", "localhost:9092"))
.withFormat(new Csv())
.withSchema(new Schema()
.field("id", DataTypes.STRING())
.field("timestamp", DataTypes.BIGINT())
.field("temperature", DataTypes.DOUBLE()))
.createTemporaryTable("kafkaInputTable")
Similar connector descriptors exist for Elasticsearch, MySQL, HBase, Hive, etc.
4.4 Table Queries
4.4.1 Table API
Table API uses method chaining:
val sensorTable: Table = tableEnv.from("inputTable")
val resultTable: Table = sensorTable
.select("id, temperature")
.filter("id = 'sensor_1'")
4.4.2 SQL
SQL queries are expressed as strings:
val sqlResult: Table = tableEnv.sqlQuery("SELECT id, temperature FROM inputTable WHERE id = 'sensor_1'")
val aggResult: Table = tableEnv.sqlQuery("SELECT id, COUNT(id) AS cnt FROM inputTable GROUP BY id")
4.5 Converting DataStream to Table
Use tableEnv.fromDataStream() to create a Table from a DataStream, optionally renaming fields:
val inputStream: DataStream[String] = env.readTextFile("sensor.txt")
val dataStream: DataStream[SensorReading] = inputStream.map(...)
val sensorTable: Table = tableEnv.fromDataStream(dataStream, 'id, 'timestamp as 'ts, 'temperature)
4.6 Creating Temporary Views
tableEnv.createTemporaryView("sensorView", dataStream)
tableEnv.createTemporaryView("sensorView", dataStream, 'id, 'temperature, 'timestamp as 'ts)
tableEnv.createTemporaryView("sensorView", sensorTable)
4.7 Output Tables
4.7.1 File Sink
tableEnv.connect(new FileSystem().path(".../out.txt"))
.withFormat(new Csv())
.withSchema(new Schema()
.field("id", DataTypes.STRING())
.field("temp", DataTypes.DOUBLE()))
.createTemporaryTable("outputTable")
resultSqlTable.insertInto("outputTable")
4.7.2 Update Modes
Flink supports three update modes for dynamic tables: Append, Retract, and Upsert. Each mode defines how insert, delete, and update messages are exchanged with the external connector.
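These modes are selected on the connector descriptor. A sketch, assuming a tableEnv as above; the Elasticsearch settings are illustrative and mirror the sink example in 4.7.4:

```scala
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.descriptors.{Elasticsearch, Json, Schema}

// inAppendMode():  only INSERT messages are exchanged.
// inRetractMode(): an update is encoded as a retract (delete) plus an add.
// inUpsertMode():  updates are encoded as upserts keyed by a unique key.
tableEnv.connect(new Elasticsearch()
    .version("6")
    .host("localhost", 9200, "http")
    .index("sensor")
    .documentType("temp"))
  .inUpsertMode() // alternatives: .inAppendMode() / .inRetractMode()
  .withFormat(new Json())
  .withSchema(new Schema()
    .field("id", DataTypes.STRING())
    .field("count", DataTypes.BIGINT()))
  .createTemporaryTable("upsertOutputTable")
```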
4.7.3 Kafka Sink
tableEnv.connect(new Kafka()
.version("0.11")
.topic("sinkTest")
.property("zookeeper.connect", "localhost:2181")
.property("bootstrap.servers", "localhost:9092"))
.withFormat(new Csv())
.withSchema(new Schema()
.field("id", DataTypes.STRING())
.field("temp", DataTypes.DOUBLE()))
.createTemporaryTable("kafkaOutputTable")
resultTable.insertInto("kafkaOutputTable")
4.7.4 Elasticsearch Sink
tableEnv.connect(new Elasticsearch()
.version("6")
.host("localhost", 9200, "http")
.index("sensor")
.documentType("temp"))
.inUpsertMode()
.withFormat(new Json())
.withSchema(new Schema()
.field("id", DataTypes.STRING())
.field("count", DataTypes.BIGINT()))
.createTemporaryTable("esOutputTable")
aggResultTable.insertInto("esOutputTable")
4.7.5 MySQL Sink (DDL)
val sinkDDL = """
|create table jdbcOutputTable (
| id varchar(20) not null,
| cnt bigint not null
|) with (
| 'connector.type' = 'jdbc',
| 'connector.url' = 'jdbc:mysql://localhost:3306/test',
| 'connector.table' = 'sensor_count',
| 'connector.driver' = 'com.mysql.jdbc.Driver',
| 'connector.username' = 'root',
| 'connector.password' = '123456'
|)
""".stripMargin
tableEnv.sqlUpdate(sinkDDL)
aggResultSqlTable.insertInto("jdbcOutputTable")
4.7.6 Converting Table to DataStream
val resultStream: DataStream[Row] = tableEnv.toAppendStream[Row](resultTable)
val aggResultStream: DataStream[(Boolean, (String, Long))] =
tableEnv.toRetractStream[(String, Long)](aggResultTable)
resultStream.print("result")
aggResultStream.print("aggResult")
4.7.7 Explain and Execute Query
val explanation: String = tableEnv.explain(resultTable)
println(explanation)
The explanation shows the unoptimized logical plan, the optimized logical plan, and the physical execution plan. The Blink planner always generates a DataStream program, while the old planner may produce DataSet programs for batch queries.
5. References
http://www.atguigu.com/
https://www.bilibili.com/video/BV12k4y1z7LM
https://blog.csdn.net/u013411339/article/details/93267838
6. Summary
The article delivers a thorough, over‑five‑thousand‑word tutorial on FlinkSQL, covering its origin, the Table API, planner differences, API usage, connector configurations, update modes, and how to switch between Table and DataStream representations, aiming to help beginners quickly master real‑time stream processing with Flink.