Real-time UV Counting with Flink, Hologres, and RoaringBitmap

This article explains how to implement both offline (T+1) and real‑time UV counting with Hologres, using RoaringBitmap for high‑cardinality aggregation. It walks through a complete Flink‑Hologres pipeline (table creation, streaming joins, windowed aggregation, and example queries) for fine‑grained user metric analysis.

Sohu Tech Products

Depending on business needs, two scenarios are common: offline (T+1) computation over historical data, and real‑time computation over newly arriving data with deduplication by user label.

For offline scenarios, Hologres uses RoaringBitmap to compute UV over ultra‑high‑cardinality user sets: a single fine‑grained pre‑aggregation is enough, and queries on top of it return in under a second.

For real‑time scenarios, a Flink‑Hologres solution using RoaringBitmap enables fine‑grained UV/PV statistics (e.g., 5‑minute windows) and supports downstream BI dashboards.

Core Idea

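Flink converts the incoming stream into a table, joins it with a Hologres dimension table, and converts the result back into a stream. The dimension table is the UID mapping table: with Hologres' insertIfNotExists feature and an auto‑increment (serial) column, a UID that is not yet mapped is inserted during the lookup and receives a new 32‑bit integer ID.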

The joined records are then aggregated per time window into a RoaringBitmap of these integer IDs, and each bitmap is stored in a Hologres column of type roaringbitmap.

At query time, the stored bitmaps that match the query's filters are combined with a bitwise OR, and the cardinality of the resulting bitmap is the deduplicated user count (UV).
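The OR‑then‑count step can be illustrated without Hologres. The sketch below is a minimal model that assumes java.util.BitSet as a stand‑in for RoaringBitmap: BitSet is uncompressed, but OR and cardinality have the same set semantics. BitmapUnionDemo and its methods are hypothetical names. It also shows why per‑row counts cannot simply be added: a user active in two windows would be counted twice.

```java
import java.util.BitSet;
import java.util.List;

// Hypothetical illustration: java.util.BitSet stands in for RoaringBitmap.
class BitmapUnionDemo {

    // Build a bitmap from dense integer user IDs (uid_int32 values).
    static BitSet bitmapOf(int... uids) {
        BitSet bits = new BitSet();
        for (int uid : uids) {
            bits.set(uid);
        }
        return bits;
    }

    // Same idea as RB_CARDINALITY(RB_OR_AGG(...)): OR all bitmaps, then count set bits.
    static int unionCardinality(List<BitSet> bitmaps) {
        BitSet acc = new BitSet();
        for (BitSet b : bitmaps) {
            acc.or(b);
        }
        return acc.cardinality();
    }

    // Naive alternative: add up each bitmap's own count (users in several rows are counted repeatedly).
    static int summedCardinality(List<BitSet> bitmaps) {
        int total = 0;
        for (BitSet b : bitmaps) {
            total += b.cardinality();
        }
        return total;
    }

    public static void main(String[] args) {
        // Two 5-minute windows; users 2 and 3 appear in both.
        List<BitSet> windows = List.of(bitmapOf(1, 2, 3), bitmapOf(2, 3, 4));
        System.out.println("UV (OR then count): " + unionCardinality(windows)); // 4
        System.out.println("Sum of per-window UV: " + summedCardinality(windows)); // 6
    }
}
```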

Best‑Practice Implementation

1. Create the UID mapping table to convert arbitrary user IDs to dense 32‑bit integers.

BEGIN;
CREATE TABLE public.uid_mapping (
uid text NOT NULL,
uid_int32 serial,
PRIMARY KEY (uid)
);
CALL set_table_property('public.uid_mapping','clustering_key','uid');
CALL set_table_property('public.uid_mapping','distribution_key','uid');
CALL set_table_property('public.uid_mapping','orientation','row');
COMMIT;
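Functionally, uid_mapping works as a dictionary that hands out the next integer the first time a UID is seen, which keeps the IDs small and dense. The following is a minimal in‑memory model of that insert‑if‑not‑exists behavior, not the Hologres implementation; UidDictionary is a hypothetical name, and the model assumes serial values start at 1 and increase by one.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of uid_mapping: uid (primary key) -> uid_int32 (serial).
class UidDictionary {
    private final Map<String, Integer> ids = new HashMap<>();
    // Assumption of this model: serial values start at 1 and increase by one.
    private int nextId = 1;

    // Return the existing ID, or insert the uid with the next serial value (insert-if-not-exists).
    int lookupOrInsert(String uid) {
        Integer existing = ids.get(uid);
        if (existing != null) {
            return existing;
        }
        int assigned = nextId++;
        ids.put(uid, assigned);
        return assigned;
    }

    public static void main(String[] args) {
        UidDictionary dict = new UidDictionary();
        System.out.println(dict.lookupOrInsert("device-a9f3"));      // 1
        System.out.println(dict.lookupOrInsert("user@example.com")); // 2
        System.out.println(dict.lookupOrInsert("device-a9f3"));      // 1 (already mapped)
    }
}
```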

2. Create the base aggregation table dws_app with a roaringbitmap column, distributed by country, prov, and city.

CREATE EXTENSION IF NOT EXISTS roaringbitmap;
BEGIN;
CREATE TABLE dws_app (
country text,
prov text,
city text,
ymd text NOT NULL,
timetz TIMESTAMPTZ,
uid32_bitmap roaringbitmap,
PRIMARY KEY(country, prov, city, ymd, timetz)
);
CALL set_table_property('public.dws_app','orientation','column');
CALL set_table_property('public.dws_app','clustering_key','ymd');
CALL set_table_property('public.dws_app','event_time_column','ymd');
CALL set_table_property('public.dws_app','colocate_with','tg16');
CALL set_table_property('public.dws_app','distribution_key','country,prov,city');
COMMIT;

3. In Flink, read the source data (e.g., CSV), register it as a table, and join with the uid_mapping_dim dimension table.

// $(...) requires: import static org.apache.flink.table.api.Expressions.$;
// env, tableEnv, csvInput (assumed to produce Row records) and typeInfo are defined elsewhere
DataStreamSource<Row> odsStream = env.createInput(csvInput, typeInfo);
// Register the stream as a table and add a processing-time attribute for the lookup join
Table odsTable = tableEnv.fromDataStream(odsStream,
    $("uid"), $("country"), $("prov"), $("city"), $("ymd"), $("proctime").proctime());
tableEnv.createTemporaryView("odsTable", odsTable);
// Declare the Hologres uid_mapping table as a dimension table; with insertifnotexists,
// a uid missing from the table is inserted during lookup and gets the next serial uid_int32
String createUidMappingTable = String.format(
    "create table uid_mapping_dim (uid string, uid_int32 INT) with ("
        + "'connector'='hologres', 'dbname'='%s', 'tablename'='%s', "
        + "'username'='%s', 'password'='%s', 'endpoint'='%s', "
        + "'insertifnotexists'='true')",
    database, dimTableName, username, password, endpoint);
tableEnv.executeSql(createUidMappingTable);
// Processing-time lookup join: attach uid_int32 to every source record
String odsJoinDim = "SELECT ods.country, ods.prov, ods.city, ods.ymd, dim.uid_int32"
    + " FROM odsTable AS ods JOIN uid_mapping_dim FOR SYSTEM_TIME AS OF ods.proctime AS dim"
    + " ON ods.uid = dim.uid";
Table joinRes = tableEnv.sqlQuery(odsJoinDim);

4. Aggregate the joined stream in 5‑minute tumbling windows using RoaringBitmap.

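// Flink has no RoaringBitmap type, so each window's bitmap is emitted as a byte[]
DataStream<Tuple6<String, String, String, String, Timestamp, byte[]>> processedSource =
    source  // joinRes converted back to a stream of (country, prov, city, ymd, uid_int32)
        .keyBy(0, 1, 2, 3)  // key by country, prov, city, ymd
        .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
        // fire every minute so partial results are written before the window closes
        .trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(1)))
        .aggregate(
            // accumulates uid_int32 values into a RoaringBitmap
            new AggregateFunction<...>() { ... },
            // serializes the bitmap and emits it with the key fields and the window time
            new WindowFunction<...>() { ... });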
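The aggregate and window functions are elided above. As a rough illustration of what they compute, here is a Flink‑free model under stated assumptions: 5‑minute tumbling windows aligned to the epoch (Flink's behavior when no window offset is given), one accumulator per (country, prov, city, ymd, window end), and java.util.BitSet standing in for RoaringBitmap. BitSet.toByteArray() plays the role of serializing the bitmap for the BYTES sink column; its byte format is not the RoaringBitmap format Hologres expects, so this only shows the data flow. WindowedBitmapAggregator is a hypothetical name.

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the step-4 window aggregation; BitSet stands in for RoaringBitmap.
class WindowedBitmapAggregator {
    static final long WINDOW_MS = 5 * 60 * 1000L; // 5-minute tumbling windows

    // One accumulator per (country, prov, city, ymd, windowEndMs)
    private final Map<List<Object>, BitSet> accumulators = new HashMap<>();

    // End of the epoch-aligned tumbling window containing timestampMs (non-negative timestamps).
    static long windowEnd(long timestampMs) {
        long start = timestampMs - (timestampMs % WINDOW_MS);
        return start + WINDOW_MS;
    }

    // "add" step: set the user's bit in the accumulator for this key and window.
    void add(String country, String prov, String city, String ymd, int uidInt32, long timestampMs) {
        List<Object> key = List.of(country, prov, city, ymd, windowEnd(timestampMs));
        accumulators.computeIfAbsent(key, k -> new BitSet()).set(uidInt32);
    }

    // "window function" step: serialize each bitmap so it can go into a BYTES column.
    Map<List<Object>, byte[]> emit() {
        Map<List<Object>, byte[]> out = new HashMap<>();
        accumulators.forEach((key, bits) -> out.put(key, bits.toByteArray()));
        return out;
    }

    // Distinct users accumulated so far for one key (0 if the key has no data).
    int cardinality(List<Object> key) {
        BitSet bits = accumulators.get(key);
        return bits == null ? 0 : bits.cardinality();
    }
}
```

For example, adding three events at 1 s, 2 s and 3 s with uid_int32 values 7, 7 and 9 produces one key whose window ends at 300000 ms and whose bitmap has cardinality 2.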

5. Convert the aggregated stream back to a table and write it to the Hologres result table.

// Field order must match the sink columns because the insert uses SELECT *
Table resTable = tableEnv.fromDataStream(processedSource,
    $("country"), $("prov"), $("city"), $("ymd"), $("timetz"), $("uid32_bitmap"));
// The serialized bitmap (BYTES) is written into the roaringbitmap column; insertOrReplace
// lets each later trigger firing overwrite the row for the same window
String createHologresTable = String.format(
    "create table sink (country string, prov string, city string, ymd string, timetz timestamp, uid32_bitmap BYTES) with ("
        + "'connector'='hologres', 'dbname'='%s', 'tablename'='%s', 'username'='%s', 'password'='%s', "
        + "'endpoint'='%s', 'connectionSize'='%s', 'mutatetype'='insertOrReplace')",
    database, dwsTableName, username, password, endpoint, connectionSize);
tableEnv.executeSql(createHologresTable);
tableEnv.executeSql("insert into sink select * from " + resTable);
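The one‑minute ContinuousProcessingTimeTrigger means the same window is written several times, each time with a larger bitmap. Because dws_app's primary key includes the window time (timetz) and the sink uses mutatetype insertOrReplace, each later write replaces the earlier row. The hypothetical model below contrasts that with an ignore‑on‑conflict policy, which would keep only the first, partial bitmap; a plain HashMap stands in for the table and BitSet for RoaringBitmap, and the key value is illustrative.

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model: successive trigger firings for ONE window key written to a table keyed by that key.
class UpsertDemo {
    enum Mode { INSERT_OR_REPLACE, INSERT_OR_IGNORE }

    static BitSet bitmapOf(int... uids) {
        BitSet bits = new BitSet();
        for (int uid : uids) {
            bits.set(uid);
        }
        return bits;
    }

    // Returns the UV stored for the key after all firings have been written (expects at least one firing).
    static int storedUv(Mode mode, BitSet... firings) {
        String primaryKey = "CN|ZJ|HZ|20210329|2021-03-29 10:05:00"; // illustrative key values
        Map<String, BitSet> table = new HashMap<>();
        for (BitSet bitmap : firings) {
            if (mode == Mode.INSERT_OR_REPLACE) {
                table.put(primaryKey, bitmap);         // each later firing overwrites the row
            } else {
                table.putIfAbsent(primaryKey, bitmap); // only the first firing is kept
            }
        }
        return table.get(primaryKey).cardinality();
    }

    public static void main(String[] args) {
        BitSet minute1 = bitmapOf(1, 2);       // partial window after 1 minute
        BitSet minute2 = bitmapOf(1, 2, 3, 5); // same window after 2 minutes
        System.out.println(storedUv(Mode.INSERT_OR_REPLACE, minute1, minute2)); // 4
        System.out.println(storedUv(Mode.INSERT_OR_IGNORE, minute1, minute2));  // 2
    }
}
```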

6. Query the base aggregation table using Hologres' bitmap functions to obtain UV counts for specific dimensions and time ranges.

SET hg_experimental_enable_force_three_stage_agg=off;
SELECT country, prov, city, RB_CARDINALITY(RB_OR_AGG(uid32_bitmap)) AS uv
FROM dws_app
WHERE ymd = '20210329'
GROUP BY country, prov, city;
SELECT country, prov, RB_CARDINALITY(RB_OR_AGG(uid32_bitmap)) AS uv
FROM dws_app
WHERE timetz > '2021-04-19 18:00:00+08' AND timetz < '2021-04-19 19:00:00+08'
GROUP BY country, prov;
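The second query rolls fine‑grained rows up to a coarser grain: every 5‑minute, per‑city bitmap inside the time range is ORed into one bitmap per (country, prov). The model below mimics that in plain Java, with BitSet standing in for roaringbitmap and a millisecond long standing in for timetz; Row, bitmapOf and uvByProvince are hypothetical names. It also shows why the pre‑aggregation only has to happen once at the finest grain: coarser groupings are computed from the same rows.

```java
import java.util.BitSet;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of:
//   SELECT country, prov, RB_CARDINALITY(RB_OR_AGG(uid32_bitmap)) AS uv FROM dws_app
//   WHERE timetz > :from AND timetz < :to GROUP BY country, prov;
class RollupQueryDemo {

    // One dws_app row; windowEndMs stands in for timetz, BitSet for roaringbitmap.
    static final class Row {
        final String country;
        final String prov;
        final String city;
        final long windowEndMs;
        final BitSet bitmap;

        Row(String country, String prov, String city, long windowEndMs, BitSet bitmap) {
            this.country = country;
            this.prov = prov;
            this.city = city;
            this.windowEndMs = windowEndMs;
            this.bitmap = bitmap;
        }
    }

    static BitSet bitmapOf(int... uids) {
        BitSet bits = new BitSet();
        for (int uid : uids) {
            bits.set(uid);
        }
        return bits;
    }

    // Filter by time range, OR bitmaps per (country, prov) across cities and windows, then count.
    static Map<String, Integer> uvByProvince(List<Row> rows, long fromExclusive, long toExclusive) {
        Map<String, BitSet> merged = new TreeMap<>();
        for (Row r : rows) {
            if (r.windowEndMs > fromExclusive && r.windowEndMs < toExclusive) {
                merged.computeIfAbsent(r.country + "/" + r.prov, k -> new BitSet()).or(r.bitmap);
            }
        }
        Map<String, Integer> uv = new TreeMap<>();
        merged.forEach((group, bits) -> uv.put(group, bits.cardinality()));
        return uv;
    }
}
```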

The article concludes with links to the full source code and further reading.

Written by Sohu Tech Products

A knowledge-sharing platform for Sohu's technology products. As a leading Chinese internet brand with media, video, search, and gaming services and over 700 million users, Sohu continuously drives tech innovation and practice. We’ll share practical insights and tech news here.
