Build a High‑Performance Go WebSocket Server with fasthttp, Priority Queues, and Prometheus

Learn how to build a low‑latency, scalable WebSocket server in Go using fasthttp, custom read/write pumps, priority message queues, a worker pool, and Prometheus metrics. The guide covers the core source code, module-by-module explanations, and run instructions for real‑time, high‑concurrency applications.

Code Wrench

Overview

This guide demonstrates how to build a high‑throughput, low‑latency WebSocket server in Go using fasthttp and fasthttp/websocket. The server replaces the standard net/http stack, runs independent read/write pumps per connection, employs a priority message queue, processes business logic via a worker pool, and exposes Prometheus metrics for observability.

Core Design Concepts

High‑Performance I/O

Replace net/http with fasthttp, which reuses request contexts and buffers across requests, to cut allocations and garbage‑collector pressure.

Each connection runs a dedicated read pump and write pump.

The write pump batches messages and respects a priority queue.

Priority Message Queue

Every connection maintains a min‑heap that orders messages by numeric priority.

High‑priority messages (e.g., heartbeats, system notifications) are sent first.

If the queue reaches sendQueueSize, new messages are dropped and counted.
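The listings in this article use PriorityQueue and priorityMsg without showing their definitions. The following is a minimal sketch of how they could look with container/heap. It assumes that a smaller priority value means "send sooner" (consistent with a min‑heap), and it adds a hypothetical seq field as a tie-breaker so equal-priority messages keep arrival order. The helpers pushBounded and popNext are illustrative names, not part of the original code.

```go
package main

import "container/heap"

// priorityMsg mirrors the fields used by Send in this article (data, priority).
// seq is an assumed extra field that keeps FIFO order among equal priorities.
type priorityMsg struct {
	data     []byte
	priority int
	seq      uint64
}

// PriorityQueue implements heap.Interface as a min-heap:
// the smallest priority value is popped first (an assumption).
type PriorityQueue []*priorityMsg

func (pq PriorityQueue) Len() int { return len(pq) }

func (pq PriorityQueue) Less(i, j int) bool {
	if pq[i].priority != pq[j].priority {
		return pq[i].priority < pq[j].priority
	}
	return pq[i].seq < pq[j].seq
}

func (pq PriorityQueue) Swap(i, j int) { pq[i], pq[j] = pq[j], pq[i] }

func (pq *PriorityQueue) Push(x any) { *pq = append(*pq, x.(*priorityMsg)) }

func (pq *PriorityQueue) Pop() any {
	old := *pq
	n := len(old)
	item := old[n-1]
	old[n-1] = nil // drop the reference so the message can be collected
	*pq = old[:n-1]
	return item
}

// pushBounded adds msg unless the queue already holds limit items.
// It reports false when the message is dropped. It is not goroutine-safe:
// the caller is expected to hold the connection's lock.
func pushBounded(pq *PriorityQueue, msg *priorityMsg, limit int) bool {
	if pq.Len() >= limit {
		return false
	}
	heap.Push(pq, msg)
	return true
}

// popNext removes and returns the most urgent message, or nil if the queue is empty.
func popNext(pq *PriorityQueue) *priorityMsg {
	if pq.Len() == 0 {
		return nil
	}
	return heap.Pop(pq).(*priorityMsg)
}
```

With a limit of 2, pushing a chat message (priority 5) and a ping (priority 0) succeeds, a third push is rejected, and popNext returns the ping before the chat message.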

Room Management & Broadcast

A global Hub maps room names to Room objects: map[string]*Room.

Rooms support intra‑room broadcast and global broadcast via the hub.

Worker Pool for Asynchronous Tasks

The read pump parses incoming frames and forwards business‑logic tasks to a fixed‑size worker pool.

The pool limits concurrency, provides back‑pressure, and records submitted/dropped tasks.
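The read pump's parsing step can be sketched as a small pure function. The prefixes join:, msg:, and task: come from this article; the frameKind type, its constants, and the parseFrame name are hypothetical, and the real read pump may structure this differently.

```go
package main

import "strings"

// frameKind is a hypothetical classification of incoming text frames.
type frameKind int

const (
	frameUnknown frameKind = iota
	frameJoin              // "join:<room>"
	frameMsg               // "msg:<text>"
	frameTask              // "task:<payload>"
)

// parseFrame splits a text frame at the first colon and classifies it.
// Unrecognized input is returned unchanged with frameUnknown.
func parseFrame(raw string) (frameKind, string) {
	prefix, payload, found := strings.Cut(raw, ":")
	if !found {
		return frameUnknown, raw
	}
	switch prefix {
	case "join":
		return frameJoin, payload
	case "msg":
		return frameMsg, payload
	case "task":
		return frameTask, payload
	default:
		return frameUnknown, raw
	}
}
```

A read pump could then switch on the returned kind: join the named room for frameJoin, broadcast for frameMsg, and call the worker pool's Submit for frameTask, counting a dropped task when Submit returns false.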

Implementation Details

Prometheus Metrics Registration

var (
    activeConnections = prometheus.NewGauge(prometheus.GaugeOpts{Name: "ws_active_connections", Help: "Current active connections"})
    messagesSent = prometheus.NewCounter(prometheus.CounterOpts{Name: "ws_messages_sent_total", Help: "Total messages sent"})
    messagesDropped = prometheus.NewCounter(prometheus.CounterOpts{Name: "ws_messages_dropped_total", Help: "Messages dropped due to full priority queue"})
    sendQueueLength = prometheus.NewGauge(prometheus.GaugeOpts{Name: "ws_send_queue_length", Help: "Sum of send queue lengths across all connections"})
    tasksSubmitted = prometheus.NewCounter(prometheus.CounterOpts{Name: "ws_tasks_submitted_total", Help: "Total tasks submitted to worker pool"})
    tasksDropped = prometheus.NewCounter(prometheus.CounterOpts{Name: "ws_tasks_dropped_total", Help: "Tasks dropped when worker queue is full"})
    workerQueueLen = prometheus.NewGauge(prometheus.GaugeOpts{Name: "ws_worker_queue_length", Help: "Current length of worker pool task queue"})
)
func init() {
    prometheus.MustRegister(activeConnections, messagesSent, messagesDropped, sendQueueLength, tasksSubmitted, tasksDropped, workerQueueLen)
}

Buffer Pool

var bufPool = sync.Pool{New: func() interface{} { b := make([]byte, 0, initialBufCap); return &b}}
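As a rough illustration of how such a pool is typically used, the sketch below wraps Get and Put in two helpers. The value of initialBufCap, the 64 KiB retention cap, and the helper names copyToPooled and releaseBuf are assumptions; the article does not state them.

```go
package main

import "sync"

// initialBufCap is an assumed value; the article does not state it.
const initialBufCap = 1024

// bufPool stores *[]byte rather than []byte so that Put does not
// allocate a new interface value for the slice header each time.
var bufPool = sync.Pool{New: func() any {
	b := make([]byte, 0, initialBufCap)
	return &b
}}

// copyToPooled copies data into a pooled buffer and returns the pointer
// that must later be passed to releaseBuf.
func copyToPooled(data []byte) *[]byte {
	p := bufPool.Get().(*[]byte)
	*p = append((*p)[:0], data...)
	return p
}

// releaseBuf returns the buffer to the pool. Oversized buffers are dropped
// so that one large message does not pin memory indefinitely
// (the 64 KiB cap is an assumption).
func releaseBuf(p *[]byte) {
	if cap(*p) > 64<<10 {
		return
	}
	*p = (*p)[:0]
	bufPool.Put(p)
}
```

The important discipline is that a buffer is released only after the write pump has finished writing it; releasing earlier would let another Send overwrite bytes that are still queued.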

Hub / Room Management

type Room struct {
    name  string
    conns map[*ConnWrap]bool
    lock  sync.RWMutex
}
func NewRoom(name string) *Room { return &Room{name: name, conns: make(map[*ConnWrap]bool)} }
func (r *Room) AddConn(c *ConnWrap) { r.lock.Lock(); defer r.lock.Unlock(); r.conns[c] = true; c.room = r }
func (r *Room) RemoveConn(c *ConnWrap) { r.lock.Lock(); defer r.lock.Unlock(); delete(r.conns, c) }
func (r *Room) Broadcast(msg []byte, priority int) {
    r.lock.RLock(); defer r.lock.RUnlock()
    for c := range r.conns { _ = c.Send(msg, priority) }
}
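The Hub that maps room names to rooms is described in the design section but not shown in the listing. A minimal sketch, assuming a read/write mutex and lazy room creation, might look like this. Room is reduced to a name here so the example compiles on its own, and the method names GetOrCreate and Each are hypothetical.

```go
package main

import "sync"

// Room is reduced to a name so the sketch is self-contained;
// the article's Room also tracks its connections.
type Room struct {
	name string
}

// Hub maps room names to rooms, guarded by an RWMutex.
type Hub struct {
	mu    sync.RWMutex
	rooms map[string]*Room
}

func NewHub() *Hub { return &Hub{rooms: make(map[string]*Room)} }

// GetOrCreate returns the named room, creating it on first use.
func (h *Hub) GetOrCreate(name string) *Room {
	h.mu.RLock()
	r, ok := h.rooms[name]
	h.mu.RUnlock()
	if ok {
		return r
	}
	h.mu.Lock()
	defer h.mu.Unlock()
	// Re-check under the write lock: another goroutine may have created it.
	if r, ok = h.rooms[name]; ok {
		return r
	}
	r = &Room{name: name}
	h.rooms[name] = r
	return r
}

// Each calls fn for every room while holding the read lock.
// A global broadcast could call the room's Broadcast method from fn.
func (h *Hub) Each(fn func(*Room)) {
	h.mu.RLock()
	defer h.mu.RUnlock()
	for _, r := range h.rooms {
		fn(r)
	}
}
```

Because Each holds the read lock for the whole iteration, fn should not call back into GetOrCreate for a new room, which would need the write lock and deadlock.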

Worker Pool

type Task struct { Conn *ConnWrap; Data []byte }
type WorkerPool struct { tasks chan Task; wg sync.WaitGroup; quit chan struct{} }
var workerPool *WorkerPool
func NewWorkerPool(n, cap int) *WorkerPool {
    wp := &WorkerPool{tasks: make(chan Task, cap), quit: make(chan struct{})}
    wp.wg.Add(n)
    for i := 0; i < n; i++ {
        go func(id int) {
            defer wp.wg.Done()
            for {
                select {
                case t, ok := <-wp.tasks:
                    if !ok { return }
                    handleTask(id, t)
                    workerQueueLen.Set(float64(len(wp.tasks)))
                case <-wp.quit:
                    return
                }
            }
        }(i)
    }
    return wp
}
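func (wp *WorkerPool) Submit(t Task) bool {
    select {
    case <-wp.quit:
        // Pool is shutting down: reject instead of queueing.
        return false
    default:
    }
    select {
    case wp.tasks <- t:
        workerQueueLen.Set(float64(len(wp.tasks)))
        return true
    default:
        // Queue is full: report back-pressure to the caller.
        return false
    }
}

// Shutdown signals the workers to stop and waits for them to exit.
// The tasks channel is deliberately left open: closing it while a read pump
// may still call Submit would panic on send.
func (wp *WorkerPool) Shutdown() { close(wp.quit); wp.wg.Wait() }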

WebSocket Connection Wrapper

type ConnWrap struct {
    conn   *websocket.Conn
    ctx    context.Context
    cancel context.CancelFunc
    room   *Room
    pq     PriorityQueue
    pqCond *sync.Cond
    closed int32
    wg     sync.WaitGroup
}
func NewConnWrap(ws *websocket.Conn) *ConnWrap {
    ctx, cancel := context.WithCancel(context.Background())
    c := &ConnWrap{conn: ws, ctx: ctx, cancel: cancel, pqCond: sync.NewCond(&sync.Mutex{})}
    heap.Init(&c.pq)
    activeConnections.Inc()
    c.wg.Add(2)
    go c.readPump()
    go c.writePump()
    return c
}
func (c *ConnWrap) Close() {
    if !atomic.CompareAndSwapInt32(&c.closed, 0, 1) { return }
    c.cancel(); _ = c.conn.Close(); c.wg.Wait(); activeConnections.Dec()
    if c.room != nil { c.room.RemoveConn(c) }
}
func (c *ConnWrap) Send(data []byte, priority int) bool {
    if atomic.LoadInt32(&c.closed) == 1 { return false }
    p := bufPool.Get().(*[]byte)
    *p = (*p)[:0]
    *p = append(*p, data...)
    item := &priorityMsg{data: *p, priority: priority}
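    c.pqCond.L.Lock()
    if len(c.pq) >= sendQueueSize {
        c.pqCond.L.Unlock()
        bufPool.Put(p)
        messagesDropped.Inc()
        return false
    }
    heap.Push(&c.pq, item)
    // Read the length while still holding the lock; reading c.pq after Unlock races with writePump.
    // Note: Set records only this connection's length, although the gauge's help text describes a sum.
    queued := len(c.pq)
    c.pqCond.Signal()
    c.pqCond.L.Unlock()
    sendQueueLength.Set(float64(queued))
    return true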
}
// readPump parses messages, handles "join:", "msg:", "task:" prefixes, and forwards tasks to the worker pool.
// writePump batches queued messages, sends ping frames, and writes messages to the client.
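The write pump itself is only summarized in the comments above. The batching step it describes could be sketched as follows, using the standard library only. This is not the author's implementation: the sendQueue type, its method names, and the batch limit are hypothetical, and the actual WebSocket write and ping handling are left out.

```go
package main

import (
	"container/heap"
	"sync"
)

// msg is a stand-in for the article's priorityMsg.
type msg struct {
	data     []byte
	priority int
}

// msgHeap is a min-heap on priority (smaller value first, an assumption).
type msgHeap []*msg

func (h msgHeap) Len() int           { return len(h) }
func (h msgHeap) Less(i, j int) bool { return h[i].priority < h[j].priority }
func (h msgHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *msgHeap) Push(x any)        { *h = append(*h, x.(*msg)) }
func (h *msgHeap) Pop() any {
	old := *h
	n := len(old)
	m := old[n-1]
	*h = old[:n-1]
	return m
}

// sendQueue couples the heap with a mutex and a condition variable.
type sendQueue struct {
	mu     sync.Mutex
	cond   *sync.Cond
	items  msgHeap
	closed bool
}

func newSendQueue() *sendQueue {
	q := &sendQueue{}
	q.cond = sync.NewCond(&q.mu)
	return q
}

// push queues a message and wakes one waiting writer.
func (q *sendQueue) push(m *msg) {
	q.mu.Lock()
	heap.Push(&q.items, m)
	q.mu.Unlock()
	q.cond.Signal()
}

// shutdown marks the queue closed and wakes all waiters.
func (q *sendQueue) shutdown() {
	q.mu.Lock()
	q.closed = true
	q.mu.Unlock()
	q.cond.Broadcast()
}

// popBatch blocks until a message is queued or the queue is shut down,
// then returns up to limit messages in priority order.
// ok is false once the queue is shut down and fully drained.
func (q *sendQueue) popBatch(limit int) (batch []*msg, ok bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.items) == 0 && !q.closed {
		q.cond.Wait()
	}
	if len(q.items) == 0 {
		return nil, false
	}
	for len(q.items) > 0 && len(batch) < limit {
		batch = append(batch, heap.Pop(&q.items).(*msg))
	}
	return batch, true
}
```

A write pump built on this would loop on popBatch, write each message in the batch to the connection, and return when ok is false. One design point worth noting: a goroutine blocked in cond.Wait is not woken by context cancellation alone, so whatever closes the connection also needs to broadcast on the condition variable.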

WebSocket Handler

// CheckOrigin returning true accepts upgrade requests from any origin; restrict it in production.
var upgrader = websocket.FastHTTPUpgrader{CheckOrigin: func(ctx *fasthttp.RequestCtx) bool { return true }}
func wsHandler(ctx *fasthttp.RequestCtx) {
    err := upgrader.Upgrade(ctx, func(ws *websocket.Conn) {
        c := NewConnWrap(ws)
        <-c.ctx.Done()
        c.Close()
    })
    if err != nil { log.Printf("upgrade error: %v", err) }
}

Main Function & Server Startup

func main() {
    workerPool = NewWorkerPool(workerPoolSize, workerQueueCapacity)
    log.Printf("worker pool started: size=%d, queue=%d", workerPoolSize, workerQueueCapacity)
    srv := &fasthttp.Server{Handler: func(ctx *fasthttp.RequestCtx) {
        switch string(ctx.Path()) {
        case "/ws":
            wsHandler(ctx)
        case "/metrics":
            promHandler(ctx)
        default:
            ctx.SetStatusCode(fasthttp.StatusNotFound)
        }
    }}
    ln, err := net.Listen("tcp4", addr)
    if err != nil { log.Fatalf("listen: %v", err) }
    go func() { log.Printf("listening on %s", addr); if err := srv.Serve(ln); err != nil { log.Fatalf("server error: %v", err) } }()
    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
    <-sigCh
    log.Println("shutdown: stopping server and worker pool...")
    _ = srv.Shutdown()
    workerPool.Shutdown()
    log.Println("shutdown complete")
}

Usage Instructions

Initialize the module and run the server:

go mod init golang-ws
go mod tidy
go run main.go

Test the WebSocket endpoint with JavaScript:

const socket = new WebSocket('ws://localhost:8080/ws');
socket.onopen = function() {
    socket.send('join:roomA');
    socket.send('task:hello');
};
socket.onmessage = function(e) { console.log('recv:', e.data); };
socket.onclose = function() { console.log('connection closed'); };

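Prometheus metrics are exposed at http://localhost:8080/metrics. The promHandler called in main is not defined in the listing above; one common way to provide it is to wrap promhttp.Handler() with fasthttpadaptor.NewFastHTTPHandler.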

Optimization Suggestions

Adjust the number of workers and the task‑queue capacity to match workload characteristics.

Consider using JSON or Protobuf for the message payload format.

For cross‑instance broadcasting, integrate a message broker such as Redis or NATS.

Extend the task system with retry logic, delayed queues, or a dead‑letter queue.
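To make the JSON suggestion concrete, here is one possible envelope that could replace the join:, msg:, and task: string prefixes. The Envelope type, its field names, and the two helper functions are purely illustrative assumptions; the article does not define a payload format.

```go
package main

import (
	"encoding/json"
	"errors"
)

// Envelope is a hypothetical JSON message format for this server.
type Envelope struct {
	Type     string          `json:"type"`               // "join", "msg", or "task"
	Room     string          `json:"room,omitempty"`     // target room, if any
	Priority int             `json:"priority,omitempty"` // send priority, if any
	Payload  json.RawMessage `json:"payload,omitempty"`  // left undecoded for the handler
}

// decodeEnvelope parses a frame and rejects unknown message types.
func decodeEnvelope(frame []byte) (Envelope, error) {
	var env Envelope
	if err := json.Unmarshal(frame, &env); err != nil {
		return Envelope{}, err
	}
	switch env.Type {
	case "join", "msg", "task":
		return env, nil
	default:
		return Envelope{}, errors.New("unknown envelope type: " + env.Type)
	}
}

// encodeEnvelope serializes an envelope for sending to a client.
func encodeEnvelope(env Envelope) ([]byte, error) {
	return json.Marshal(env)
}
```

Keeping Payload as json.RawMessage lets the read pump route on Type without decoding the body, so the cost of parsing the payload moves to the worker that handles the task.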

Open‑Source Repository

GitHub: https://github.com/louis-xie-programmer/golang-ws
Gitee (mirror): https://gitee.com/louis_xie/golang-ws