Real-Time Data Warehouse Development with Flink: Architecture, Implementation, and Lessons Learned
This article covers the motivation, technology selection, implementation details, and challenges encountered while building a real-time data warehouse on Flink, including streaming computation, code examples, dimension-table caching, state-backend selection, and best practices for production deployment.
Background: Traditional offline data warehouse models are carefully crafted, but streaming processing follows a different paradigm. As the business demanded lower-latency data, a real-time warehouse was introduced, consisting of five layers (real-time ingestion, computation, storage, service, and application); this article focuses on the computation layer.
Technology selection: Among Storm, Spark‑Streaming, and Flink, Flink was chosen for its high throughput, millisecond‑level latency, exactly‑once semantics, and flexible windowing capabilities.
Architecture: The overall architecture integrates multiple data sources (e.g., pcm/app topics) into a unified format, following the flow source → map → flatMap → sink. Architecture diagrams are omitted for brevity.
Development practice: The real‑time model mirrors the offline model, starting from the gdm layer to combine various sources, clean and enrich data, and output a unified stream. The core Flink job is shown below.
private static void proc(String[] args) throws Exception {
    // Obtain the runtime environment
    final StreamExecutionEnvironment env = CommonUtil.getOnlineStreamEnv(TimeCharacteristic.ProcessingTime);
    // Create the Kafka consumer
    FlinkKafkaConsumer010<String> consumer = CommonUtil.genKafkaConsumer(Constants.TOPIC_WEB_APPLOG, args);
    // Processing flow:
    // source -> map (filter and clean the data) -> flatMap (join dimension tables) -> sink
    DataStream<String> stream = env.addSource(consumer)
            .map(new AppLogEntity.Mapper())
            .flatMap(new AppLogEntity.Flater())
            .returns(String.class);
    stream.process(new ProcessFunction<String, String>() {
        @Override
        public void processElement(String value, Context ctx, Collector<String> out) {
            out.collect(value);
        }
    }).addSink(CommonUtil.genUasKafkaProducer());
    env.execute(JOB_NAME);
}

Map function: filters special characters, splits each line into fields, discards records with fewer than 40 fields, and parses logs based on their prefix (app usage, event, or client logs).
public static class Mapper implements MapFunction<String, AppLog> {
    @Override
    public AppLog map(String line) {
        // Strip special characters
        line = line.trim().replace("\007", "").replace("\t", "").replaceAll("\\n", "");
        // Split into fields by the original log separator
        List<String> tokens = StringUtil.fastSplit(line, Constants.APP_LOG_ORIGINAL_SPERATOR, 40);
        // Fewer than 40 fields means the record is invalid
        if (tokens.size() < 40) {
            return null;
        }
        // Decide how to parse based on the log prefix
        switch (tokens.get(0)) {
            case Constants.APP_ACT_LOG_PREFIX:
                return new AppUsingLogEntity().parse(tokens);
            case Constants.APP_EVENT_LOG_PREFIX:
                return new AppEventLogEntity().parse(tokens);
            case Constants.APP_CLT_LOG_PREFIX:
                return new AppCltEntity().parse(tokens);
            default:
                return null;
        }
    }
}

FlatMap function: loads dimension caches, joins event logs with dimension tables, filters out blank results, and emits cleaned strings.
public static class Flater extends RichFlatMapFunction<AppLog, String> {
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Warm up the dimension-table caches once per task
        DcDimAppEventParameterCache.getInstance();
        DcDimAppEventSiteMappingCache.getInstance();
        DcDimEventActionTypeCache.getInstance();
        DcDimEventOperTypeRowCache.getInstance();
        DcDimEventBizTypeCache.getInstance();
    }

    @Override
    public void flatMap(AppLog model, Collector<String> collector) {
        if (null == model) {
            return;
        }
        switch (model.getLogType()) {
            case Constants.APP_EVENT_LOG_PREFIX:
                // Event logs are joined against the dimension tables
                AppEventLogEntity entity = (AppEventLogEntity) model;
                AppEventLogEntity.joinAndFix(entity);
                List<String> list = entity.getResult(entity);
                for (String outLine : list) {
                    if (StringUtils.isBlank(outLine)) {
                        continue;
                    }
                    collector.collect(outLine);
                }
                return;
            // App-usage and client logs share the same handling
            case Constants.APP_ACT_LOG_PREFIX:
            case Constants.APP_CLT_LOG_PREFIX:
                collector.collect(model.getResult());
                return;
        }
    }
}

Pitfalls encountered: (1) Dimension-table joins produced duplicate data because the dimension state never expired, eventually degenerating into Cartesian products; the fix was to cache small dimension tables locally in the task and keep larger ones in Redis behind an LRU eviction policy. (2) Large state caused out-of-memory crashes; after evaluating MemoryStateBackend, FsStateBackend, and RocksDBStateBackend, we adopted RocksDBStateBackend with incremental checkpoints, which keeps state off-heap and uploads only the state changed since the previous checkpoint.
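To illustrate the fix for pitfall (1), here is a minimal sketch of a local dimension cache with LRU eviction plus TTL-based expiry, built only on the JDK's LinkedHashMap. The class and method names (DimCache, etc.) are hypothetical and not from the original project; the real setup used Redis for the larger tables.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Minimal local dimension cache: LRU-evicted and TTL-expired, so stale
// dimension rows cannot accumulate and inflate the join (hypothetical sketch).
public class DimCache<K, V> {
    private static final class Entry<V> {
        final V value;
        final long loadedAtMillis;
        Entry(V value, long loadedAtMillis) {
            this.value = value;
            this.loadedAtMillis = loadedAtMillis;
        }
    }

    private final long ttlMillis;
    private final LinkedHashMap<K, Entry<V>> map;

    public DimCache(final int maxEntries, long ttlMillis) {
        this.ttlMillis = ttlMillis;
        // accessOrder=true makes iteration order least-recently-used first
        this.map = new LinkedHashMap<K, Entry<V>>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, Entry<V>> eldest) {
                return size() > maxEntries; // evict the least-recently-used row
            }
        };
    }

    /** Returns the cached value, or null if absent or expired. */
    public synchronized V get(K key) {
        Entry<V> e = map.get(key);
        if (e == null) {
            return null;
        }
        if (System.currentTimeMillis() - e.loadedAtMillis > ttlMillis) {
            map.remove(key); // expired: force a reload from the dimension source
            return null;
        }
        return e.value;
    }

    public synchronized void put(K key, V value) {
        map.put(key, new Entry<>(value, System.currentTimeMillis()));
    }

    public synchronized int size() {
        return map.size();
    }
}
```

On a cache miss the caller would reload the row from Redis or the dimension store before calling put; keeping maxEntries small bounds per-task memory.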
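For pitfall (2), a hedged sketch of what enabling RocksDB with incremental checkpoints looks like in the Flink 1.x API of the FlinkKafkaConsumer010 era; the checkpoint URI and interval are illustrative placeholders, not values from the original project.

```java
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigSketch {
    public static void configure(StreamExecutionEnvironment env) throws Exception {
        // Second constructor argument enables incremental checkpoints:
        // only state changed since the last checkpoint is uploaded.
        RocksDBStateBackend backend =
                new RocksDBStateBackend("hdfs:///flink/checkpoints", true);
        env.setStateBackend(backend);
        // Illustrative interval; tune to your latency/recovery trade-off.
        env.enableCheckpointing(60_000);
    }
}
```

Because RocksDB keeps working state on local disk rather than on the JVM heap, state size is no longer bounded by task-manager memory, which is what resolved the out-of-memory crashes.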
Conclusion: A real-time warehouse complements the offline warehouse by addressing timeliness without diminishing the overall value of the data warehouse. The solution presented here is a practical way to feed streaming logs into user-behavior analytics, and it continues to improve as we track industry-leading approaches.