Design and Implementation of a Real-Time Video Live Streaming Analytics System Using Tencent Cloud Big Data Services
The article details a cloud‑native architecture on Tencent Cloud that uses CKafka, Oceanus (Flink), MySQL, HBase and a BI service to ingest live‑streaming logs, aggregate gift‑reward metrics in real time, store results, and display them on a continuously refreshed dashboard.
This article describes how to design and implement a real‑time analytics system for video live‑streaming gift rewards on Tencent Cloud using cloud‑native big‑data components. Built from CKafka, Oceanus (Flink), MySQL and other managed services, the solution demonstrates the advantages of a cloud‑based approach.
Solution Overview
The architecture combines Tencent Cloud CKafka, the stream‑computing service Oceanus, a private VPC network, and the Business Intelligence (BI) analysis service to provide real‑time visualisation of metrics such as viewer geographic distribution, member‑level statistics, gift totals and online audience counts.
Key Components
Oceanus (Flink‑compatible stream computing)
CKafka (cloud‑native Kafka)
VPC (private network)
MySQL (TencentDB for MySQL)
EMR HBase component
BI (self‑service data analysis)
Preparation Steps
Purchase and create the required big‑data components.
Create a VPC and subnets to ensure all services share the same network.
Deploy an Oceanus cluster, selecting region, availability zone, VPC, logs and storage.
Create a CKafka cluster and define the topics live_streaming_log and live_gift_total.
Provision a MySQL instance, enable binlog (FULL), and create the necessary databases and tables.
Set up an EMR HBase cluster and create the dimension table dim_hbase .
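The console steps above do not spell out the MySQL table definitions; a plausible minimal DDL, inferred from the Flink sink schemas later in the article (column types and lengths are assumptions), might look like:

```sql
-- Illustrative DDL inferred from the Flink sink definitions; adjust types as needed.
CREATE TABLE live_streaming_log (
  user_id BIGINT,
  ip VARCHAR(64),
  room_id BIGINT,
  arrive_time TIMESTAMP,
  leave_time TIMESTAMP,
  create_time TIMESTAMP,
  region_code INT,
  grade INT,
  province VARCHAR(32),
  PRIMARY KEY (user_id, ip, room_id, arrive_time)
);

CREATE TABLE live_gift_total (
  gift_type VARCHAR(32) PRIMARY KEY,
  gift_total_amount BIGINT
);
```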
Data Generation Script (Python)
#!/usr/bin/python3
# pip3 install kafka
import json, random, time
from kafka import KafkaProducer
TIME_FORMAT = "%Y-%m-%d %H:%M:%S"
PROVINCES = ["北京","广东","山东","江苏","河南","上海","河北","浙江","香港","陕西","湖南","重庆","福建","天津","云南","四川","广西","安徽","海南","江西","湖北","山西","辽宁","台湾","黑龙江","内蒙古","澳门","贵州","甘肃","青海","新疆","西藏","吉林","宁夏"]
broker_lists = ['172.28.28.13:9092']
producer = KafkaProducer(bootstrap_servers=broker_lists, value_serializer=lambda m: json.dumps(m).encode('ascii'))
# generate and send a sample live streaming log record every minute
# (field values are illustrative; events for the live_gift_total topic
# can be produced the same way)
while True:
    now = time.strftime(TIME_FORMAT)
    msg = {
        "user_id": random.randint(1, 100000),
        "ip": "123.0.0.%d" % random.randint(1, 254),
        "room_id": random.randint(1, 500),
        "arrive_time": now,
        "leave_time": now,
        "create_time": now,
        "region_code": random.randint(110000, 659000),
        "grade": random.randint(1, 5),
        "province": random.choice(PROVINCES),
    }
    producer.send("live_streaming_log", msg)
    time.sleep(60)

Flink (Oceanus) SQL – Table Definitions
CREATE TABLE `live_streaming_log_source` (
`user_id` BIGINT,
`ip` VARCHAR,
`room_id` BIGINT,
`arrive_time` TIMESTAMP,
`leave_time` TIMESTAMP,
`create_time` TIMESTAMP,
`region_code` INT,
`grade` INT,
`province` VARCHAR
) WITH (
'connector' = 'kafka',
'topic' = 'live_streaming_log',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = '172.28.28.13:9092',
'properties.group.id' = 'joylyu-consumer-2',
'format' = 'json',
'json.ignore-parse-errors' = 'false',
'json.fail-on-missing-field' = 'false'
);
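With 'json.fail-on-missing-field' = 'false', fields absent from a message are simply read as NULL rather than failing the job. A quick Python sanity check of the expected wire format (field names taken from the DDL above; the helper name is ours):

```python
import json

# Fields declared in live_streaming_log_source.
EXPECTED_FIELDS = {
    "user_id", "ip", "room_id", "arrive_time",
    "leave_time", "create_time", "region_code", "grade", "province",
}

def missing_fields(raw: str) -> set:
    """Return the schema fields absent from one Kafka JSON message."""
    return EXPECTED_FIELDS - set(json.loads(raw))

# Any field listed by missing_fields() would surface as NULL in the Flink source.
sample = '{"user_id": 1, "ip": "1.2.3.4", "room_id": 7}'
```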
CREATE TABLE `live_streaming_log_sink` (
`user_id` BIGINT,
`ip` VARCHAR,
`room_id` BIGINT,
`arrive_time` TIMESTAMP,
`leave_time` TIMESTAMP,
`create_time` TIMESTAMP,
`region_code` INT,
`grade` INT,
`province` VARCHAR,
PRIMARY KEY (`user_id`, `ip`, `room_id`, `arrive_time`) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://172.28.28.227:3306/livedb?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai',
'table-name' = 'live_streaming_log',
'username' = 'root',
'password' = 'xxxxx',
'sink.buffer-flush.max-rows' = '5000',
'sink.buffer-flush.interval' = '2s',
'sink.max-retries' = '3'
);
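The sink.buffer-flush.* options make the JDBC sink batch rows and flush when either 5000 rows accumulate or 2 seconds elapse, whichever comes first. A simplified Python model of that policy (class and method names are ours, not Flink's):

```python
import time

class BufferedSink:
    """Toy model of the JDBC buffer-flush policy; not the real Flink implementation."""
    def __init__(self, max_rows=5000, interval_s=2.0):
        self.max_rows = max_rows
        self.interval_s = interval_s
        self.buffer = []
        self.last_flush = time.monotonic()
        self.flushed_batches = []

    def write(self, row):
        self.buffer.append(row)
        # Flush on whichever threshold is hit first: row count or elapsed time.
        if len(self.buffer) >= self.max_rows or \
           time.monotonic() - self.last_flush >= self.interval_s:
            self.flush()

    def flush(self):
        if self.buffer:
            # Stands in for one batched INSERT (rewriteBatchedStatements=true
            # lets MySQL execute it as a single multi-row statement).
            self.flushed_batches.append(list(self.buffer))
            self.buffer.clear()
        self.last_flush = time.monotonic()

sink = BufferedSink(max_rows=3, interval_s=60.0)
for i in range(7):
    sink.write({"user_id": i})
sink.flush()  # final flush, as on checkpoint/close
```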
INSERT INTO `live_streaming_log_sink`
SELECT * FROM `live_streaming_log_source`;

Gift Total Aggregation
CREATE TABLE `live_gift_total_source` (
`user_id` VARCHAR,
`gift_type` VARCHAR,
`gift_total_amount` BIGINT,
`ip` VARCHAR,
`room_id` VARCHAR, -- referenced by the module-level join below
`create_time` VARCHAR,
proc_time AS PROCTIME()
) WITH (
'connector' = 'kafka',
'topic' = 'live_gift_total',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = '172.28.28.13:9092',
'properties.group.id' = 'joylyu-consumer-1',
'format' = 'json',
'json.ignore-parse-errors' = 'false',
'json.fail-on-missing-field' = 'false'
);
CREATE TABLE `live_gift_total_sink` (
`gift_type` VARCHAR,
`gift_total_amount` BIGINT,
PRIMARY KEY (`gift_type`) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://172.28.28.227:3306/livedb?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai',
'table-name' = 'live_gift_total',
'username' = 'root',
'password' = 'xxxxx',
'sink.buffer-flush.max-rows' = '5000',
'sink.buffer-flush.interval' = '2s',
'sink.max-retries' = '3'
);
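The INSERT … GROUP BY statement below is a continuously updating aggregation: every incoming event updates the running sum for its gift_type, and the JDBC sink upserts that row. The same logic in a Python sketch (event values are illustrative):

```python
from collections import defaultdict

def aggregate(events):
    """Running SUM(gift_total_amount) GROUP BY gift_type, as the Flink job maintains it."""
    totals = defaultdict(int)
    for e in events:
        totals[e["gift_type"]] += e["gift_total_amount"]
        # In Flink, each update here is emitted as an upsert keyed on gift_type.
    return dict(totals)

events = [
    {"gift_type": "rocket", "gift_total_amount": 500},
    {"gift_type": "rose", "gift_total_amount": 10},
    {"gift_type": "rocket", "gift_total_amount": 200},
]
```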
INSERT INTO `live_gift_total_sink`
SELECT `gift_type`, SUM(`gift_total_amount`) AS `gift_total_amount_all`
FROM `live_gift_total_source`
GROUP BY `gift_type`;

HBase Dimension Table
CREATE TABLE `dim_hbase` (
`rowkey` STRING,
`cf` ROW<`module_id` STRING>
) WITH (
'connector' = 'hbase-1.4',
'table-name' = 'dim_hbase',
'zookeeper.quorum' = 'your_hbase_zookeeper_address'
);

Module Gift Aggregation (Join with HBase)
CREATE TABLE `module_gift_total_sink` (
`module_id` BIGINT,
`module_gift_total_amount` BIGINT,
PRIMARY KEY (`module_id`) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://172.28.28.227:3306/livedb?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai',
'table-name' = 'module_gift_total',
'username' = 'root',
'password' = 'xxxxx',
'sink.buffer-flush.max-rows' = '5000',
'sink.buffer-flush.interval' = '2s',
'sink.max-retries' = '3'
);
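The temporal lookup join below enriches each gift event with its module_id by querying the HBase dimension table at processing time, then sums per module. Conceptually (the dimension data and field values here are illustrative):

```python
from collections import defaultdict

# Stand-in for dim_hbase: rowkey (room id as string) -> module_id.
DIM = {"101": 1, "102": 1, "103": 2}

def module_totals(events):
    """LEFT lookup join on room_id, then SUM(gift_total_amount) GROUP BY module_id."""
    totals = defaultdict(int)
    for e in events:
        # None when the rowkey is missing, mirroring the LEFT JOIN.
        module_id = DIM.get(e["room_id"])
        totals[module_id] += e["gift_total_amount"]
    return dict(totals)

events = [
    {"room_id": "101", "gift_total_amount": 500},
    {"room_id": "103", "gift_total_amount": 10},
    {"room_id": "102", "gift_total_amount": 200},
]
```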
INSERT INTO `module_gift_total_sink`
SELECT b.cf.module_id, SUM(a.gift_total_amount) AS module_gift_total_amount
FROM `live_gift_total_source` AS a
LEFT JOIN `dim_hbase` AS b FOR SYSTEM_TIME AS OF a.proc_time
ON a.room_id = b.rowkey
GROUP BY b.cf.module_id;

Real‑Time Dashboard Construction
In the BI console, add the MySQL instance as a data source, create SQL datasets based on the tables populated by Oceanus, and build a report containing six charts: viewer geographic distribution, member‑level counts, total gifts per type, gifts in the last six hours, top‑10 gift spenders, and online audience over time. Enable the “real‑time data” option and set the refresh interval to 3 seconds to achieve a live updating dashboard.
Conclusion
The solution demonstrates end‑to‑end data flow: CKafka ingests raw streaming events, Oceanus (Flink) processes and enriches the data, results are stored in MySQL and HBase, and BI visualises the metrics in real time. The example simplifies table design for clarity while showcasing the integration of Tencent Cloud big‑data products.
Tencent Cloud Developer
Official Tencent Cloud community account that brings together developers, shares practical tech insights, and fosters an influential tech exchange community.