Build a High‑Performance Go WebSocket Server with fasthttp, Priority Queues, and Prometheus
Learn how to construct a low‑latency, scalable WebSocket server in Go using fasthttp, custom read/write pumps, priority message queues, a worker‑pool, and Prometheus metrics, with full source code, detailed module explanations, and deployment instructions for real‑time high‑concurrency applications.
Overview
This guide demonstrates how to build a high‑throughput, low‑latency WebSocket server in Go using fasthttp and fasthttp/websocket. The server replaces the standard net/http stack, runs independent read/write pumps per connection, employs a priority message queue, processes business logic via a worker pool, and exposes Prometheus metrics for observability.
Core Design Concepts
High‑Performance I/O
Replace net/http with fasthttp to reduce garbage‑collector pressure.
Each connection runs a dedicated read pump and write pump.
The write pump drains the priority queue in batches, so higher‑priority messages are written first.
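The write pump's code is not reproduced in this article. The following stdlib‑only sketch shows the batching idea under a few assumptions:

- The queue is guarded by a sync.Cond, like the one ConnWrap uses.
- Lower priority values are sent first.
- A sorted slice stands in for the heap to keep the example short.

The names outbox, nextBatch, maxBatch, and shutdown are hypothetical. nextBatch blocks until at least one message is queued or the outbox is shut down. It then returns up to maxBatch messages in priority order so the writer can flush them together.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// msg is a queued payload; a lower priority value is sent first (assumption).
type msg struct {
	data     []byte
	priority int
}

// outbox is a hypothetical stand-in for ConnWrap's queue: a slice kept sorted
// by priority and guarded by cond.L.
type outbox struct {
	cond   *sync.Cond
	queue  []msg
	closed bool
}

func newOutbox() *outbox { return &outbox{cond: sync.NewCond(&sync.Mutex{})} }

// push inserts m after every queued message with the same or a smaller priority
// value (so equal priorities stay FIFO), then wakes one waiting writer.
func (o *outbox) push(m msg) {
	o.cond.L.Lock()
	i := sort.Search(len(o.queue), func(i int) bool { return o.queue[i].priority > m.priority })
	o.queue = append(o.queue, msg{})
	copy(o.queue[i+1:], o.queue[i:])
	o.queue[i] = m
	o.cond.L.Unlock()
	o.cond.Signal()
}

// shutdown marks the outbox closed and wakes every writer blocked in nextBatch.
func (o *outbox) shutdown() {
	o.cond.L.Lock()
	o.closed = true
	o.cond.L.Unlock()
	o.cond.Broadcast()
}

// nextBatch blocks until messages are queued or the outbox is closed, then removes
// and returns up to maxBatch of them in priority order. ok is false once the
// outbox is closed and empty.
func (o *outbox) nextBatch(maxBatch int) (batch []msg, ok bool) {
	if maxBatch < 1 {
		maxBatch = 1
	}
	o.cond.L.Lock()
	defer o.cond.L.Unlock()
	for len(o.queue) == 0 && !o.closed {
		o.cond.Wait()
	}
	if len(o.queue) == 0 {
		return nil, false
	}
	n := maxBatch
	if n > len(o.queue) {
		n = len(o.queue)
	}
	batch = append(batch, o.queue[:n]...) // copy out before shrinking the queue
	o.queue = o.queue[n:]
	return batch, true
}

func main() {
	o := newOutbox()
	o.push(msg{[]byte("chat"), 5})
	o.push(msg{[]byte("ping"), 0})
	o.push(msg{[]byte("notice"), 1})
	o.shutdown()
	for {
		batch, ok := o.nextBatch(2)
		if !ok {
			break
		}
		for _, m := range batch {
			fmt.Print(string(m.data), " ")
		}
		fmt.Println()
	}
}
```

Because shutdown wakes waiters with Broadcast, a writer parked in nextBatch exits instead of blocking forever. A connection's Close method has to handle the same concern.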
Priority Message Queue
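Every connection maintains a min‑heap that orders messages by numeric priority, so the message with the smallest priority value is dequeued first.
High‑priority messages (e.g., heartbeats, system notifications) therefore carry small values and are sent first.
If the queue already holds sendQueueSize messages, new messages are dropped and counted in ws_messages_dropped_total.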
Room Management & Broadcast
A global Hub maps room names to Room objects: map[string]*Room.
Rooms support intra‑room broadcast and global broadcast via the hub.
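The article never shows the Hub type itself. What follows is a minimal sketch of the map[string]*Room registry with global broadcast, built on several assumptions:

- A hypothetical Sender interface stands in for *ConnWrap so the example runs on its own.
- The names GetOrCreate and BroadcastAll are illustrative.
- Room is simplified to what the hub needs.

```go
package main

import (
	"fmt"
	"sync"
)

// Sender is a hypothetical stand-in for *ConnWrap's Send method.
type Sender interface {
	Send(data []byte, priority int) bool
}

type Room struct {
	name  string
	mu    sync.RWMutex
	conns map[Sender]bool
}

func (r *Room) Broadcast(msg []byte, priority int) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	for c := range r.conns {
		_ = c.Send(msg, priority)
	}
}

// Hub maps room names to rooms; its lock protects only the map, not room membership.
type Hub struct {
	mu    sync.RWMutex
	rooms map[string]*Room
}

func NewHub() *Hub { return &Hub{rooms: make(map[string]*Room)} }

// GetOrCreate returns the named room, creating it on first use.
func (h *Hub) GetOrCreate(name string) *Room {
	h.mu.RLock()
	r, ok := h.rooms[name]
	h.mu.RUnlock()
	if ok {
		return r
	}
	h.mu.Lock()
	defer h.mu.Unlock()
	if r, ok = h.rooms[name]; !ok { // re-check: another goroutine may have created it
		r = &Room{name: name, conns: make(map[Sender]bool)}
		h.rooms[name] = r
	}
	return r
}

// BroadcastAll copies the room list first, so the hub lock is not held while
// each room takes its own lock and enqueues messages.
func (h *Hub) BroadcastAll(msg []byte, priority int) {
	h.mu.RLock()
	rooms := make([]*Room, 0, len(h.rooms))
	for _, r := range h.rooms {
		rooms = append(rooms, r)
	}
	h.mu.RUnlock()
	for _, r := range rooms {
		r.Broadcast(msg, priority)
	}
}

// countingSender records how many messages it received (demo only, single goroutine).
type countingSender struct{ n int }

func (c *countingSender) Send([]byte, int) bool { c.n++; return true }

func main() {
	h := NewHub()
	a, b := &countingSender{}, &countingSender{}
	// Single-goroutine demo, so membership maps are written directly.
	h.GetOrCreate("roomA").conns[a] = true
	h.GetOrCreate("roomB").conns[b] = true
	h.GetOrCreate("roomA").Broadcast([]byte("hi A"), 5)
	h.BroadcastAll([]byte("maintenance"), 1)
	fmt.Println(a.n, b.n) // 2 1
}
```

A global broadcast routed through rooms like this reaches a connection once per room it belongs to. Rooms created by GetOrCreate are never removed, so a real hub would also need to delete empty rooms.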
Worker Pool for Asynchronous Tasks
The read pump parses incoming frames and forwards business‑logic tasks to a fixed‑size worker pool.
The pool limits concurrency, applies back‑pressure by rejecting tasks when its queue is full, and counts submitted and dropped tasks.
Implementation Details
Prometheus Metrics Registration
```go
var (
	activeConnections = prometheus.NewGauge(prometheus.GaugeOpts{Name: "ws_active_connections", Help: "Current active connections"})
	messagesSent      = prometheus.NewCounter(prometheus.CounterOpts{Name: "ws_messages_sent_total", Help: "Total messages sent"})
	messagesDropped   = prometheus.NewCounter(prometheus.CounterOpts{Name: "ws_messages_dropped_total", Help: "Messages dropped due to full priority queue"})
	sendQueueLength   = prometheus.NewGauge(prometheus.GaugeOpts{Name: "ws_send_queue_length", Help: "Sum of send queue lengths across all connections"})
	tasksSubmitted    = prometheus.NewCounter(prometheus.CounterOpts{Name: "ws_tasks_submitted_total", Help: "Total tasks submitted to worker pool"})
	tasksDropped      = prometheus.NewCounter(prometheus.CounterOpts{Name: "ws_tasks_dropped_total", Help: "Tasks dropped when worker queue is full"})
	workerQueueLen    = prometheus.NewGauge(prometheus.GaugeOpts{Name: "ws_worker_queue_length", Help: "Current length of worker pool task queue"})
)

func init() {
	prometheus.MustRegister(activeConnections, messagesSent, messagesDropped, sendQueueLength, tasksSubmitted, tasksDropped, workerQueueLen)
}
```
Buffer Pool
```go
var bufPool = sync.Pool{New: func() interface{} { b := make([]byte, 0, initialBufCap); return &b }}
```
Hub / Room Management
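```go
type Room struct {
	name  string
	conns map[*ConnWrap]bool
	lock  sync.RWMutex
}

func NewRoom(name string) *Room { return &Room{name: name, conns: make(map[*ConnWrap]bool)} }

// AddConn does not remove c from a previous room; a connection switching rooms
// should be removed from the old one first, or it keeps receiving its broadcasts.
func (r *Room) AddConn(c *ConnWrap)    { r.lock.Lock(); defer r.lock.Unlock(); r.conns[c] = true; c.room = r }
func (r *Room) RemoveConn(c *ConnWrap) { r.lock.Lock(); defer r.lock.Unlock(); delete(r.conns, c) }

// Broadcast enqueues msg on every member. Send never blocks, so holding the read
// lock is cheap; a full queue drops the message for that client only.
func (r *Room) Broadcast(msg []byte, priority int) {
	r.lock.RLock()
	defer r.lock.RUnlock()
	for c := range r.conns {
		_ = c.Send(msg, priority)
	}
}
```
Worker Pool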
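```go
type Task struct {
	Conn *ConnWrap
	Data []byte
}
type WorkerPool struct {
	tasks  chan Task
	wg     sync.WaitGroup
	mu     sync.RWMutex // guards closed, so Submit never sends on a closed channel
	closed bool
}
var workerPool *WorkerPool

// NewWorkerPool starts n workers sharing a task queue that holds up to queueCap tasks.
func NewWorkerPool(n, queueCap int) *WorkerPool {
	wp := &WorkerPool{tasks: make(chan Task, queueCap)}
	wp.wg.Add(n)
	for i := 0; i < n; i++ {
		go func(id int) {
			defer wp.wg.Done()
			for t := range wp.tasks { // exits once Shutdown closes the queue and it is drained
				workerQueueLen.Set(float64(len(wp.tasks)))
				handleTask(id, t)
			}
		}(i)
	}
	return wp
}

// Submit never blocks: it returns false and counts a drop when the queue is full or the pool is shut down.
func (wp *WorkerPool) Submit(t Task) bool {
	wp.mu.RLock()
	defer wp.mu.RUnlock()
	if !wp.closed {
		select {
		case wp.tasks <- t:
			tasksSubmitted.Inc()
			workerQueueLen.Set(float64(len(wp.tasks)))
			return true
		default:
		}
	}
	tasksDropped.Inc()
	return false
}

// Shutdown stops accepting tasks, lets the workers drain the queue, and waits for them.
func (wp *WorkerPool) Shutdown() {
	wp.mu.Lock()
	if !wp.closed {
		wp.closed = true
		close(wp.tasks)
	}
	wp.mu.Unlock()
	wp.wg.Wait()
}
```
WebSocket Connection Wrapper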
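```go
type ConnWrap struct {
	conn   *websocket.Conn
	ctx    context.Context
	cancel context.CancelFunc
	room   *Room
	pq     PriorityQueue // guarded by pqCond.L
	pqCond *sync.Cond
	closed int32
	wg     sync.WaitGroup
}

func NewConnWrap(ws *websocket.Conn) *ConnWrap {
	ctx, cancel := context.WithCancel(context.Background())
	c := &ConnWrap{conn: ws, ctx: ctx, cancel: cancel, pqCond: sync.NewCond(&sync.Mutex{})}
	heap.Init(&c.pq)
	activeConnections.Inc()
	c.wg.Add(2)
	// Both pumps are expected to call c.wg.Done() and c.cancel() (not c.Close()) on exit.
	go c.readPump()
	go c.writePump()
	return c
}

// Close is idempotent. It must not be called from readPump or writePump,
// because c.wg.Wait() would then wait for the calling goroutine itself.
func (c *ConnWrap) Close() {
	if !atomic.CompareAndSwapInt32(&c.closed, 0, 1) {
		return
	}
	c.cancel()
	_ = c.conn.Close() // unblocks a pending read in readPump
	c.pqCond.L.Lock()
	c.pqCond.Broadcast() // wakes writePump if it is waiting on an empty queue
	c.pqCond.L.Unlock()
	c.wg.Wait()
	if c.room != nil {
		c.room.RemoveConn(c)
	}
	c.pqCond.L.Lock()
	sendQueueLength.Sub(float64(len(c.pq))) // messages that were queued but never written
	c.pqCond.L.Unlock()
	activeConnections.Dec()
}

// Send copies data into a pooled buffer and queues it. It returns false when the
// connection is closed or already holds sendQueueSize messages.
func (c *ConnWrap) Send(data []byte, priority int) bool {
	c.pqCond.L.Lock()
	defer c.pqCond.L.Unlock()
	if atomic.LoadInt32(&c.closed) == 1 { // checked under the lock so Close's final count stays exact
		return false
	}
	if len(c.pq) >= sendQueueSize { // check before copying, so a dropped message costs no copy
		messagesDropped.Inc()
		return false
	}
	p := bufPool.Get().(*[]byte)
	*p = append((*p)[:0], data...) // writePump should return the buffer to bufPool after writing it
	heap.Push(&c.pq, &priorityMsg{data: *p, priority: priority})
	sendQueueLength.Inc() // writePump is expected to call sendQueueLength.Dec() per popped message
	c.pqCond.Signal()
	return true
}
```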
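Send takes a buffer from bufPool, but in the code shown only the drop path returns one. The writer has to return the rest after writing them, which suggests keeping the *[]byte alongside each queued message. The sketch below shows the full get, copy, and put cycle. getBuf and putBuf are hypothetical helpers. The 4 KiB initialBufCap is an assumed value, since the article does not give one. maxPooledCap is an assumed limit that stops an unusually large message from pinning a big array in the pool.

```go
package main

import (
	"fmt"
	"sync"
)

const (
	initialBufCap = 4 << 10  // assumed value; the article does not state one
	maxPooledCap  = 64 << 10 // assumed limit: larger buffers are left to the garbage collector
)

// The pool stores *[]byte rather than []byte so Put does not allocate.
var bufPool = sync.Pool{New: func() any {
	b := make([]byte, 0, initialBufCap)
	return &b
}}

// getBuf returns a pooled buffer holding a copy of data.
func getBuf(data []byte) *[]byte {
	p := bufPool.Get().(*[]byte)
	*p = append((*p)[:0], data...)
	return p
}

// putBuf returns p to the pool once its contents have been written to the socket.
// Oversized buffers are discarded instead of being retained by the pool.
func putBuf(p *[]byte) {
	if cap(*p) > maxPooledCap {
		return
	}
	*p = (*p)[:0]
	bufPool.Put(p)
}

func main() {
	p := getBuf([]byte("hello"))
	fmt.Println(string(*p), len(*p), cap(*p) >= initialBufCap) // hello 5 true
	putBuf(p) // in the server, this happens after the write pump has sent the message
}
```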
readPump parses incoming messages, handles the "join:", "msg:", and "task:" prefixes, and forwards tasks to the worker pool; its body is not reproduced here.
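The prefix protocol that readPump handles can be isolated into a small pure function, which is easier to test than the pump itself. The sketch below assumes each text frame carries exactly one command of the form prefix:payload. parseCommand and Command are hypothetical names, and the real readPump may treat malformed frames differently.

```go
package main

import (
	"bytes"
	"fmt"
)

// Command is a hypothetical parsed form of one text frame.
type Command struct {
	Kind    string // "join", "msg", or "task"
	Payload []byte // room name for "join"; message or task data otherwise
}

var knownKinds = []string{"join", "msg", "task"}

// parseCommand splits "kind:payload" at the first colon. It rejects frames
// without a colon, unknown kinds, and "join" without a room name.
// Payload aliases frame, so copy it if the frame buffer will be reused.
func parseCommand(frame []byte) (Command, bool) {
	kind, payload, found := bytes.Cut(frame, []byte(":"))
	if !found {
		return Command{}, false
	}
	for _, k := range knownKinds {
		if string(kind) == k {
			if k == "join" && len(payload) == 0 {
				return Command{}, false
			}
			return Command{Kind: k, Payload: payload}, true
		}
	}
	return Command{}, false
}

func main() {
	for _, f := range []string{"join:roomA", "msg:hello all", "task:hello", "ping", "join:"} {
		cmd, ok := parseCommand([]byte(f))
		fmt.Printf("%q -> kind=%q payload=%q ok=%v\n", f, cmd.Kind, cmd.Payload, ok)
	}
}
```

With parsing separated out, the pump's loop reduces to a switch on cmd.Kind. A join changes the connection's room, a msg calls the room's Broadcast, and a task is handed to workerPool.Submit.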
writePump batches queued messages, sends ping frames, and writes the messages to the client.
WebSocket Handler
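```go
// CheckOrigin accepts every origin, which suits local testing; production
// deployments usually compare the Origin header against an allowlist.
var upgrader = websocket.FastHTTPUpgrader{CheckOrigin: func(ctx *fasthttp.RequestCtx) bool { return true }}

func wsHandler(ctx *fasthttp.RequestCtx) {
	err := upgrader.Upgrade(ctx, func(ws *websocket.Conn) {
		c := NewConnWrap(ws)
		<-c.ctx.Done() // returns once a pump cancels the connection's context
		c.Close()
	})
	if err != nil {
		log.Printf("upgrade error: %v", err)
	}
}
```
Main Function & Server Startup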
```go
// promHandler and the constants addr, workerPoolSize, and workerQueueCapacity are not shown here.
// One way to build promHandler is fasthttpadaptor.NewFastHTTPHandler(promhttp.Handler()).
func main() {
	workerPool = NewWorkerPool(workerPoolSize, workerQueueCapacity)
	log.Printf("worker pool started: size=%d, queue=%d", workerPoolSize, workerQueueCapacity)

	srv := &fasthttp.Server{Handler: func(ctx *fasthttp.RequestCtx) {
		switch string(ctx.Path()) {
		case "/ws":
			wsHandler(ctx)
		case "/metrics":
			promHandler(ctx)
		default:
			ctx.SetStatusCode(fasthttp.StatusNotFound)
		}
	}}

	ln, err := net.Listen("tcp4", addr)
	if err != nil {
		log.Fatalf("listen: %v", err)
	}
	go func() {
		log.Printf("listening on %s", addr)
		if err := srv.Serve(ln); err != nil {
			log.Fatalf("server error: %v", err)
		}
	}()

	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
	<-sigCh

	log.Println("shutdown: stopping server and worker pool...")
	_ = srv.Shutdown()
	workerPool.Shutdown()
	log.Println("shutdown complete")
}
```
Usage Instructions
Initialize the module and run the server:
```bash
go mod init golang-ws
go mod tidy
go run main.go
```
Test the WebSocket endpoint with JavaScript:
```javascript
const socket = new WebSocket('ws://localhost:8080/ws');
socket.onopen = function () {
  socket.send('join:roomA');
  socket.send('task:hello');
};
socket.onmessage = function (e) { console.log('recv:', e.data); };
socket.onclose = function () { console.log('connection closed'); };
```
Prometheus metrics are exposed at http://localhost:8080/metrics.
Optimization Suggestions
Adjust the number of workers and the task‑queue capacity to match workload characteristics.
Consider using JSON or Protobuf for the message payload format.
For cross‑instance broadcasting, integrate a message broker such as Redis or NATS.
Extend the task system with retry logic, delayed queues, or a dead‑letter queue.
Open‑Source Repository
GitHub: https://github.com/louis-xie-programmer/golang-ws
Gitee (mirror): https://gitee.com/louis_xie/golang-ws
Code Wrench