Lancer: Evolution of Bilibili's Real-Time Streaming Architecture
Lancer, Bilibili's real-time streaming backbone, has evolved from a monolithic Flume pipeline into a logid-isolated, Kubernetes-native architecture: Go edge agents report to gateways that act as synchronous Kafka proxies, each logid flows through its own internal topic, and a dedicated Flink SQL job handles its distribution. The result is exactly-once, back-pressured, highly scalable ingestion for billions of requests per day.
Background – Lancer is Bilibili's real-time streaming platform that handles the entire site's data ingestion, transport and integration. It processes up to 5 × 10⁷ requests per second, 3 PB of data per day, and more than 4,000 streams, serving as the lifeline of Bilibili's data warehouse.
Key Terminology
logid – identifier for each business data stream.
Data source – entry points such as log‑agent, bfe‑agent, Flink CDC.
lancer‑gateway – data gateway that receives reports.
Data buffer – internal Kafka used for decoupling ingestion and distribution.
lancer‑collector – distribution layer that syncs data to ODS.
Technical Evolution
V1.0 (Flume-based, pre-2019) – a single large pipeline built on a customized Flume stack. Architecture: data source → gateway (HTTP/gRPC) → internal Kafka buffer → collector (Flume) → ODS. Pain points included poor fault tolerance on the source side, lack of resource isolation, Flume's operational complexity, and no exactly-once guarantee when landing data in HDFS.
V2.0 (BU‑granular pipeline, 2020‑2021) – introduced BU‑level pipelines, edge agents (log‑agent, bfe‑agent) with built‑in buffering, flow‑control and retry, a self‑developed lancer‑gateway2.0 deployed on Kubernetes, and Flink‑based HDFS jobs providing exactly‑once semantics. New issues: logid‑level isolation still weak, gateway async model could lose data, ODS hotspots, and small‑file explosion.
V3.0 (Flink‑SQL single‑stream single‑job) – refactored the whole chain to isolate each logid. Core changes:
Edge agents isolate pipelines per logid.
lancer‑gateway3.0 uses synchronous Kafka send with back‑pressure and per‑logid topic mapping.
Each logid gets its own internal Kafka topic.
Data distribution runs a dedicated Flink‑SQL job per logid.
Benefits: higher reliability (no data loss, exactly‑once), better maintainability (logid isolation, fine‑grained resource control) and scalability (logid‑level elastic scaling).
V3.0 Detailed Implementation
4.1 Edge Reporting Layer
log‑agent – Go‑based, plugin architecture, deployed on physical machines. Features: file and unix‑socket collection, gRPC ACK + retry + flow‑control, per‑logid pipelines, CGroup limits, local disk buffering, aggregation, support for both VM and container logs.
bfe‑agent – Go‑based, deployed on CDN edge nodes, handles public web/app traffic. Adds local buffering for burst traffic, pre‑emptive downgrade/flow‑control, logid‑level sharding, compression to reduce upstream QPS.
4.2 Data Gateway Layer
lancer‑gateway3.0 acts as a generic Kafka proxy supporting both HTTP and gRPC. It sends data synchronously (Kafka send callback) and maps each logid to a dedicated Kafka topic, enabling isolation and back‑pressure. It also dynamically excludes problematic partitions.
Request handling flow (illustrated in the original diagram):
Requests are placed into per‑logid queues; overflow is rejected.
A pool of Kafka producers iterates over queues, sending records.
Producer count per logid and time‑slice per producer are limited to ensure fairness.
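The queueing and fairness rules above can be sketched as a minimal Go model (type and method names are hypothetical): each logid gets a bounded queue, enqueue on a full queue is rejected so one hot logid cannot consume the whole gateway, and a drain pass visits every queue with a per-logid quota standing in for the producer pool's bounded time slice.

```go
package main

import "fmt"

// Gateway holds one bounded queue per logid.
type Gateway struct {
	queues map[string]chan []byte
	cap    int
}

func NewGateway(capacity int) *Gateway {
	return &Gateway{queues: make(map[string]chan []byte), cap: capacity}
}

// Enqueue returns false (overflow rejected) when the logid's queue is full.
func (g *Gateway) Enqueue(logid string, rec []byte) bool {
	q, ok := g.queues[logid]
	if !ok {
		q = make(chan []byte, g.cap)
		g.queues[logid] = q
	}
	select {
	case q <- rec:
		return true
	default:
		return false // caller should signal back-pressure upstream
	}
}

// Drain sends at most quota records per logid per pass for fairness.
func (g *Gateway) Drain(quota int, send func(logid string, rec []byte)) {
	for logid, q := range g.queues {
	perLogid:
		for i := 0; i < quota; i++ {
			select {
			case rec := <-q:
				send(logid, rec)
			default:
				break perLogid // queue empty
			}
		}
	}
}

func main() {
	g := NewGateway(2)
	fmt.Println(g.Enqueue("app-play", []byte("a")),
		g.Enqueue("app-play", []byte("b")),
		g.Enqueue("app-play", []byte("c"))) // prints "true true false"
}
```

Rejecting at enqueue time, rather than buffering without bound, is what converts a slow Kafka partition into visible back-pressure at the edge.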
4.3 Data Distribution Layer
Flink‑SQL is used for data export, supporting three main scenarios:
Kafka → Hive (streaming import with exactly‑once, event‑time partitioning, text+LZO or ORC+ZSTD storage).
Kafka → Kafka (real‑time sync with header passthrough).
CDC → Kafka → Hudi/Hive (full and incremental MySQL binlog sync).
5 Flink Connector Enhancements
5.1 Hive Sink Optimizations
Partition commit on stream‑empty checkpoints (break‑flow handling).
Support for downstream incremental sync without waiting for ODS partition readiness.
ORC+ZSTD storage reduces space by >40% compared to text+LZO.
Asynchronous HDFS close to avoid tail‑latency from slow files.
Small‑file merging after checkpoint to reduce NameNode pressure.
5.2 Kafka Sink Optimizations
Protobuf format support via custom Deserialization/Serialization schemas.
Custom routing UDFs allowing dynamic selection of broker list and topic per record.
CREATE TABLE sink_test (
broker_name_arg varchar,
topic_name_arg varchar,
message string,
t1 string
) WITH (
'bootstrapServers' = 'BrokerUdf(broker_name_arg)',
'bootstrapServers.udf.class' = 'com.bilibili.lancer.udf.BrokerUdf',
'topic' = 'TopicUdf(broker_name_arg, topic_name_arg)',
'topic.udf.class' = 'com.bilibili.lancer.udf.TopicUdf',
'udf.cache.min' = '1',
'exclude.udf.field' = 'false',
'connector' = 'kafka-diversion'
);
5.3 CDC Connector Optimizations
Multi‑database/table support via a new “bytes” format that streams raw changelog JSON, avoiding per‑column conversion and improving throughput.
Strictly monotonic sequence numbers (gtid) added to record headers for downstream ordering.
HeartbeatRecord type to keep watermarks advancing during silent periods.
CREATE TABLE mysql_binlog (
host_name STRING METADATA FROM 'host_name' VIRTUAL,
db_name STRING METADATA FROM 'database_name' VIRTUAL,
table_name STRING METADATA FROM 'table_name' VIRTUAL,
operation_ts TIMESTAMP(3) METADATA FROM 'op_ts' VIRTUAL,
sequence BIGINT METADATA FROM 'sequence' VIRTUAL, -- strictly monotonic
heartbeat BOOLEAN METADATA FROM 'heartbeat' VIRTUAL, -- true for heartbeat
mtime TIMESTAMP(3) METADATA FROM 'mtime' VIRTUAL,
id BIGINT NOT NULL,
filed_list BYTES NOT NULL, -- raw changelog bytes
PRIMARY KEY(id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'xxxx',
'port' = '3552',
'username' = 'datacenter_cdc',
'password' = 'xxx',
'database-name' = 'xxx',
'debezium.format' = 'bytes',
'table-name' = 'xxx',
'server-time-zone' = 'Asia/Shanghai',
'heartbeat.enable' = 'true',
'scan.incremental.snapshot.chunk.size' = '80960'
);
6 Stability Optimizations
6.1 Pipeline Hotspot Mitigation
Local flow scheduling: tasks choose downstream channels with smaller backlog instead of round‑robin.
Kafka producer auto‑excludes failing partitions and periodically retries them.
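Both rules reduce to the same selection step, sketched below in Go (backlog values and the exclusion set are illustrative): pick the downstream channel with the smallest backlog while skipping partitions the producer has temporarily excluded.

```go
package main

import "fmt"

// pickChannel returns the index of the eligible channel with the
// smallest backlog, or -1 if every channel is excluded.
func pickChannel(backlogs []int, excluded map[int]bool) int {
	best := -1
	for i, b := range backlogs {
		if excluded[i] {
			continue // partition temporarily excluded, retried later
		}
		if best == -1 || b < backlogs[best] {
			best = i
		}
	}
	return best
}

func main() {
	fmt.Println(pickChannel([]int{50, 3, 88}, nil))                   // prints "1"
	fmt.Println(pickChannel([]int{50, 3, 88}, map[int]bool{1: true})) // prints "0"
}
```

Compared with round-robin, least-backlog selection steers traffic away from a hot or slow channel automatically, without any central coordinator.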
6.2 End‑to‑End Trace Monitoring
Custom trace system records receive, send and error metrics per logid at every processing stage.
Metrics are windowed by creation time and aligned to compute latency, completeness and error rates.
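The completeness computation can be sketched as follows (window keys and counts are illustrative): per creation-time window, compare the records a stage sent against the records it received; a ratio below 1.0 means loss or in-flight data in that window.

```go
package main

import "fmt"

// completeness returns sent/received per creation-time window.
func completeness(received, sent map[int64]int) map[int64]float64 {
	out := make(map[int64]float64, len(received))
	for window, recv := range received {
		if recv > 0 {
			out[window] = float64(sent[window]) / float64(recv)
		}
	}
	return out
}

func main() {
	got := completeness(map[int64]int{1700000000: 1000}, map[int64]int{1700000000: 990})
	fmt.Println(got[1700000000]) // prints "0.99"
}
```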
6.3 Kafka Sync Duplicate Reduction
Checkpoint interval reduced to 10 s.
Session‑mode job submission to avoid full resource re‑allocation on restart.
Region‑based failover strategy and regional checkpoints to limit rollback scope.
6.4 Dynamic Kafka Failover
Sliding‑window circuit breaker (10‑second windows) calculates failure rate and switches traffic between normal and failover Kafka clusters.
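A minimal Go sketch of such a breaker (window count and threshold are illustrative, not Lancer's actual values): send outcomes are counted into fixed windows, one per 10-second tick, and once the failure rate over the retained windows reaches the threshold, traffic is routed to the failover cluster.

```go
package main

import "fmt"

// Breaker keeps per-window success/failure counters in a ring.
type Breaker struct {
	failures, totals []int
	idx              int
	threshold        float64
}

func NewBreaker(windows int, threshold float64) *Breaker {
	return &Breaker{failures: make([]int, windows), totals: make([]int, windows), threshold: threshold}
}

// Advance rotates to the next window (called on every 10 s tick).
func (b *Breaker) Advance() {
	b.idx = (b.idx + 1) % len(b.totals)
	b.failures[b.idx], b.totals[b.idx] = 0, 0
}

// Record counts one send outcome in the current window.
func (b *Breaker) Record(ok bool) {
	b.totals[b.idx]++
	if !ok {
		b.failures[b.idx]++
	}
}

// UseFailover reports whether the failure rate across all retained
// windows has reached the threshold.
func (b *Breaker) UseFailover() bool {
	var f, t int
	for i := range b.totals {
		f += b.failures[i]
		t += b.totals[i]
	}
	return t > 0 && float64(f)/float64(t) >= b.threshold
}

func main() {
	b := NewBreaker(3, 0.5)
	for i := 0; i < 4; i++ {
		b.Record(i%2 == 0) // alternate success/failure: 50% failure rate
	}
	fmt.Println(b.UseFailover()) // prints "true"
}
```

Because old windows rotate out, a transient Kafka incident clears the breaker on its own and traffic returns to the normal cluster.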
6.5 Full‑Chain Flow Control, Back‑Pressure and Degradation
Back‑pressure propagates from downstream lag to upstream Kafka buffers.
Gateway returns HTTP 429 on Kafka write delay, prompting client retry/back‑off.
Local disk buffering and adaptive sampling at the edge.
6.6 Development‑Stage Quality Verification
Dual‑run validation: new and old pipelines write to separate Hive tables; hourly/daily MD5 checks ensure consistency.
Chaos testing injects failures (JM/TM crashes, hot‑spots, dirty data) to verify robustness.
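The dual-run consistency check can be sketched as an order-independent digest (hashing whole rows as strings is a simplification of whatever row encoding the real job uses): rows exported by the old and new pipelines are sorted, hashed, and the hourly/daily digests compared; matching digests mean the refactored pipeline produced identical data.

```go
package main

import (
	"crypto/md5"
	"encoding/hex"
	"fmt"
	"sort"
	"strings"
)

// partitionMD5 returns an MD5 digest of the rows that does not depend
// on the order the pipeline emitted them in.
func partitionMD5(rows []string) string {
	sorted := append([]string(nil), rows...)
	sort.Strings(sorted)
	sum := md5.Sum([]byte(strings.Join(sorted, "\n")))
	return hex.EncodeToString(sum[:])
}

func main() {
	oldRun := []string{"uid=1,act=play", "uid=2,act=like"}
	newRun := []string{"uid=2,act=like", "uid=1,act=play"} // same rows, different order
	fmt.Println(partitionMD5(oldRun) == partitionMD5(newRun)) // prints "true"
}
```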
7 Future Outlook
Integrate with company‑wide data bus for unified ingestion.
Full cloud‑native migration on Kubernetes with quota management and auto‑scaling.
Strengthen batch‑stream convergence, extend incremental integration to offline scenarios.
Bilibili Tech