
Design and Implementation of a Real-Time Video Live Streaming Analytics System Using Tencent Cloud Big Data Services

The article details a cloud‑native architecture on Tencent Cloud that uses CKafka, Oceanus (Flink), MySQL, HBase and a BI service to ingest live‑streaming logs, aggregate gift‑reward metrics in real time, store results, and display them on a continuously refreshed dashboard.

Tencent Cloud Developer

This article describes how to design and implement a real‑time analytics system for video live‑streaming gift rewards on Tencent Cloud using cloud‑native big‑data components. By combining CKafka, Oceanus (Flink), MySQL and other services, the solution demonstrates the advantages of a cloud‑based approach.

Solution Overview

The architecture combines Tencent Cloud CKafka, the stream‑computing service Oceanus, a private VPC network, and the Business Intelligence (BI) analysis service to provide real‑time visualisation of metrics such as viewer geographic distribution, member‑level statistics, gift totals and online audience counts.

Key Components

Oceanus (Flink‑compatible stream computing)

CKafka (cloud‑native Kafka)

VPC (private network)

MySQL (TencentDB for MySQL)

EMR HBase component

BI (self‑service data analysis)

Preparation Steps

Purchase and create the required big‑data components.

Create a VPC and subnets to ensure all services share the same network.

Deploy an Oceanus cluster, selecting region, availability zone, VPC, logs and storage.

Create a CKafka cluster and define the topics live_streaming_log and live_gift_total.

Provision a MySQL instance, enable binlog (FULL), and create the necessary databases and tables.

Set up an EMR HBase cluster and create the dimension table dim_hbase.
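Since all of the components above must sit in the same VPC, it is worth verifying basic network reachability before submitting any Flink jobs. A minimal stdlib sketch (the hosts and ports below are the illustrative private addresses used in this article's examples; replace them with your own instance endpoints):

```python
import socket

# Illustrative endpoints taken from the examples in this article;
# replace with your own instance addresses.
ENDPOINTS = {
    "CKafka": ("172.28.28.13", 9092),
    "MySQL": ("172.28.28.227", 3306),
}

def port_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

if __name__ == "__main__":
    for name, (host, port) in ENDPOINTS.items():
        status = "reachable" if port_open(host, port) else "UNREACHABLE"
        print(f"{name} {host}:{port} -> {status}")
```

Run this from a machine inside the VPC (for example, the host that will run the data-generation script); an UNREACHABLE result usually points to a subnet or security-group issue.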

Data Generation Script (Python)

#!/usr/bin/python3
# pip3 install kafka-python
import json, random, time
from kafka import KafkaProducer

TIME_FORMAT = "%Y-%m-%d %H:%M:%S"
PROVINCES = ["北京","广东","山东","江苏","河南","上海","河北","浙江","香港","陕西","湖南","重庆","福建","天津","云南","四川","广西","安徽","海南","江西","湖北","山西","辽宁","台湾","黑龙江","内蒙古","澳门","贵州","甘肃","青海","新疆","西藏","吉林","宁夏"]

broker_lists = ['172.28.28.13:9092']
producer = KafkaProducer(bootstrap_servers=broker_lists,
                         value_serializer=lambda m: json.dumps(m).encode('ascii'))

# Generate and send data in an infinite loop (simplified; the original
# field-generation logic is omitted for brevity -- the record below is an
# illustrative placeholder).
while True:
    log = {
        'user_id': random.randint(1, 100000),
        'room_id': random.randint(1, 500),
        'province': random.choice(PROVINCES),
        'grade': random.randint(1, 6),
        'create_time': time.strftime(TIME_FORMAT),
    }
    producer.send('live_streaming_log', log)
    producer.flush()
    time.sleep(60)
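The gift events sent to the live_gift_total topic can be sketched the same way as a small pure function. Field names follow the Flink source table defined later (room_id is included because the module-level aggregation joins on it); the gift types and value ranges here are illustrative, not from the original article:

```python
import random
import time

TIME_FORMAT = "%Y-%m-%d %H:%M:%S"
GIFT_TYPES = ["rocket", "sports_car", "flower", "like"]  # illustrative names

def make_gift_event(rng=random):
    """Build one gift-reward event matching the live_gift_total schema."""
    return {
        "user_id": str(rng.randint(1, 100000)),
        "gift_type": rng.choice(GIFT_TYPES),
        "gift_total_amount": rng.randint(1, 1000),
        "ip": "10.0.%d.%d" % (rng.randint(0, 255), rng.randint(1, 254)),
        "room_id": rng.randint(1, 500),
        "create_time": time.strftime(TIME_FORMAT),
    }
```

Such a function plugs into the same producer loop as above, with `producer.send('live_gift_total', make_gift_event())`.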

Flink (Oceanus) SQL – Table Definitions

CREATE TABLE `live_streaming_log_source` (
    `user_id` BIGINT,
    `ip` VARCHAR,
    `room_id` BIGINT,
    `arrive_time` TIMESTAMP,
    `leave_time` TIMESTAMP,
    `create_time` TIMESTAMP,
    `region_code` INT,
    `grade` INT,
    `province` VARCHAR
) WITH (
    'connector' = 'kafka',
    'topic' = 'live_streaming_log',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = '172.28.28.13:9092',
    'properties.group.id' = 'joylyu-consumer-2',
    'format' = 'json',
    'json.ignore-parse-errors' = 'false',
    'json.fail-on-missing-field' = 'false'
);

CREATE TABLE `live_streaming_log_sink` (
    `user_id` BIGINT,
    `ip` VARCHAR,
    `room_id` BIGINT,
    `arrive_time` TIMESTAMP,
    `leave_time` TIMESTAMP,
    `create_time` TIMESTAMP,
    `region_code` INT,
    `grade` INT,
    `province` VARCHAR,
    PRIMARY KEY (`user_id`, `ip`, `room_id`, `arrive_time`) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://172.28.28.227:3306/livedb?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai',
    'table-name' = 'live_streaming_log',
    'username' = 'root',
    'password' = 'xxxxx',
    'sink.buffer-flush.max-rows' = '5000',
    'sink.buffer-flush.interval' = '2s',
    'sink.max-retries' = '3'
);

INSERT INTO `live_streaming_log_sink`
SELECT * FROM `live_streaming_log_source`;

Gift Total Aggregation

CREATE TABLE `live_gift_total_source` (
    `user_id` VARCHAR,
    `gift_type` VARCHAR,
    `gift_total_amount` BIGINT,
    `ip` VARCHAR,
    `room_id` BIGINT,
    `create_time` VARCHAR,
    proc_time AS PROCTIME()
) WITH (
    'connector' = 'kafka',
    'topic' = 'live_gift_total',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = '172.28.28.13:9092',
    'properties.group.id' = 'joylyu-consumer-1',
    'format' = 'json',
    'json.ignore-parse-errors' = 'false',
    'json.fail-on-missing-field' = 'false'
);

CREATE TABLE `live_gift_total_sink` (
    `gift_type` VARCHAR,
    `gift_total_amount` BIGINT,
    PRIMARY KEY (`gift_type`) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://172.28.28.227:3306/livedb?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai',
    'table-name' = 'live_gift_total',
    'username' = 'root',
    'password' = 'xxxxx',
    'sink.buffer-flush.max-rows' = '5000',
    'sink.buffer-flush.interval' = '2s',
    'sink.max-retries' = '3'
);

INSERT INTO `live_gift_total_sink`
SELECT `gift_type`, SUM(`gift_total_amount`) AS `gift_total_amount`
FROM `live_gift_total_source`
GROUP BY `gift_type`;
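The GROUP BY above is a continuous query: every incoming event updates the running total for its gift_type, and the JDBC sink upserts the new value on the primary key, rewriting that row in MySQL. The semantics can be sketched in a few lines of plain Python:

```python
from collections import defaultdict

def apply_gift_event(totals, event):
    """Update the running per-gift_type total, mirroring the continuous
    GROUP BY ... SUM(...) query; returns the (key, new_total) upsert pair
    that the JDBC sink would write."""
    totals[event["gift_type"]] += event["gift_total_amount"]
    return event["gift_type"], totals[event["gift_type"]]

totals = defaultdict(int)
apply_gift_event(totals, {"gift_type": "flower", "gift_total_amount": 5})
apply_gift_event(totals, {"gift_type": "flower", "gift_total_amount": 3})
# totals["flower"] is now 8: the MySQL row for "flower" is rewritten each time.
```

This is why the sink table needs a primary key on gift_type: without it, each update would append a new row instead of replacing the running total.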

HBase Dimension Table

CREATE TABLE `dim_hbase` (
    `rowkey` STRING,
    `cf` ROW<`module_id` STRING>
) WITH (
    'connector' = 'hbase-1.4',
    'table-name' = 'dim_hbase',
    'zookeeper.quorum' = 'your_hbase_zookeeper_address'
);

Module Gift Aggregation (Join with HBase)

CREATE TABLE `module_gift_total_sink` (
    `module_id` BIGINT,
    `module_gift_total_amount` BIGINT,
    PRIMARY KEY (`module_id`) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://172.28.28.227:3306/livedb?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai',
    'table-name' = 'module_gift_total',
    'username' = 'root',
    'password' = 'xxxxx',
    'sink.buffer-flush.max-rows' = '5000',
    'sink.buffer-flush.interval' = '2s',
    'sink.max-retries' = '3'
);

INSERT INTO `module_gift_total_sink`
SELECT CAST(b.cf.module_id AS BIGINT) AS `module_id`, SUM(a.gift_total_amount) AS `module_gift_total_amount`
FROM `live_gift_total_source` AS a
LEFT JOIN `dim_hbase` FOR SYSTEM_TIME AS OF a.proc_time AS b
ON CAST(a.room_id AS STRING) = b.rowkey
GROUP BY b.cf.module_id;
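Step by step, the lookup join takes each gift event, fetches the module_id for its room_id from dim_hbase at processing time, then sums gift amounts per module. A stdlib sketch of that enrichment (the room-to-module mapping below is illustrative):

```python
from collections import defaultdict

# Illustrative stand-in for dim_hbase: rowkey (room_id as string) -> module_id.
DIM = {"1": 100, "2": 100, "3": 200}

def module_totals(events):
    """Enrich each event with module_id via the dimension lookup, then sum
    gift amounts per module. Events whose room is missing from DIM are
    skipped here; in the LEFT JOIN they would land in a NULL module_id group."""
    totals = defaultdict(int)
    for ev in events:
        module_id = DIM.get(str(ev["room_id"]))
        if module_id is not None:
            totals[module_id] += ev["gift_total_amount"]
    return dict(totals)
```

Because the join is evaluated `FOR SYSTEM_TIME AS OF a.proc_time`, updating a row in dim_hbase changes the mapping only for events processed afterwards.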

Real‑Time Dashboard Construction

In the BI console, add the MySQL instance as a data source, create SQL datasets based on the tables populated by Oceanus, and build a report containing six charts: viewer geographic distribution, member‑level counts, total gifts per type, gifts in the last six hours, top‑10 gift spenders, and online audience over time. Enable the “real‑time data” option and set the refresh interval to 3 seconds to achieve a live updating dashboard.

Conclusion

The solution demonstrates end‑to‑end data flow: CKafka ingests raw streaming events, Oceanus (Flink) processes and enriches the data, results are stored in MySQL and HBase, and BI visualises the metrics in real time. The example simplifies table design for clarity while showcasing the integration of Tencent Cloud big‑data products.

Tags: Cloud Services, Big Data, Flink, Stream Processing, Real-Time Analytics, Business Intelligence, Kafka, Video Streaming
Written by Tencent Cloud Developer

Official Tencent Cloud community account that brings together developers, shares practical tech insights, and fosters an influential tech exchange community.