Understanding and Using sync.Cond in Go: Source Code Analysis and Practical Examples
This article explains the purpose, internal implementation, and correct usage patterns of Go's sync.Cond concurrency primitive, walks through its source code, demonstrates simple and advanced examples—including a custom concurrent waiting queue—and provides test cases to illustrate its behavior in real-world scenarios.
In Go, the sync.Cond primitive is rarely used directly but appears in projects like Kubernetes for implementing concurrent waiting queues; this article teaches its proper usage through source code analysis and examples.
Source Code Overview
The exported API of sync.Cond can be seen in the documentation https://pkg.go.dev/[email protected]#Cond and includes the constructor NewCond and three methods: Broadcast , Signal , and Wait :
type Cond struct {
noCopy noCopy
L Locker
notify notifyList
checker copyChecker
}
func NewCond(l Locker) *Cond { return &Cond{L: l} }
func (c *Cond) Wait() { /* implementation */ }
func (c *Cond) Signal() { /* implementation */ }
func (c *Cond) Broadcast() { /* implementation */ }sync.Cond is a struct whose L field holds a sync.Locker (implemented by sync.Mutex or sync.RWMutex ) that protects the condition while it is observed or changed. The notify field is a notifyList that records the waiting goroutine queue, and checker (a copyChecker ) prevents the struct from being copied.
The notifyList is a linked‑list implementation used by the runtime to manage waiting and waking goroutines:
type notifyList struct {
wait uint32 // number of goroutines currently waiting
notify uint32 // number of goroutines that have been notified
lock uintptr // lock protecting wait/notify
head unsafe.Pointer // head of the wait queue
tail unsafe.Pointer // tail of the wait queue
}The copyChecker panics if a Cond is copied after use:
type copyChecker uintptr
func (c *copyChecker) check() {
if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&
!atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
uintptr(*c) != uintptr(unsafe.Pointer(c)) {
panic("sync.Cond is copied")
}
}The three methods share a common step: they call c.checker.check() to ensure the object hasn't been copied, then delegate the core logic to runtime helpers:
func (c *Cond) Wait() {
c.checker.check()
t := runtime_notifyListAdd(&c.notify)
c.L.Unlock()
runtime_notifyListWait(&c.notify, t)
c.L.Lock()
}
func (c *Cond) Signal() {
c.checker.check()
runtime_notifyListNotifyOne(&c.notify)
}
func (c *Cond) Broadcast() {
c.checker.check()
runtime_notifyListNotifyAll(&c.notify)
}Correct usage follows the pattern shown in the documentation:
// c.L.Lock()
// for !condition() {
// c.Wait()
// }
// ... use condition ...
// c.L.Unlock()Simple Example
A minimal program creates a sync.Cond with a sync.Mutex , starts a goroutine that waits, sleeps for a second, then signals the waiting goroutine:
package main
import (
"fmt"
"sync"
"time"
)
func main() {
c := sync.NewCond(&sync.Mutex{})
go func() {
fmt.Println("wait before")
c.L.Lock()
c.Wait()
c.L.Unlock()
fmt.Println("wait after")
}()
time.Sleep(time.Second)
fmt.Println("signal before")
c.Signal()
fmt.Println("signal after")
time.Sleep(time.Second)
}The output demonstrates that the waiting goroutine is blocked until Signal wakes it.
Realistic Usage with a Condition Variable
To illustrate the typical pattern, the example adds a boolean condition that the waiting goroutine checks in a loop; the main goroutine sets the condition to true before calling Signal :
func main() {
c := sync.NewCond(&sync.Mutex{})
condition := false
go func() {
fmt.Println("wait before")
c.L.Lock()
for !condition {
c.Wait()
}
fmt.Println("condition met, continue execution")
c.L.Unlock()
fmt.Println("wait after")
}()
time.Sleep(time.Second)
fmt.Println("signal before")
c.L.Lock()
condition = true
c.L.Unlock()
c.Signal()
fmt.Println("signal after")
time.Sleep(time.Second)
}This pattern shows how Wait releases the lock, blocks, and reacquires it after being notified, allowing safe modification of the condition.
Building a Concurrent Waiting Queue with sync.Cond
The article then demonstrates a practical component—a generic Queue —that uses sync.Cond to coordinate producers and consumers. The interface defines Add , Get , Len , ShutDown , and ShuttingDown methods.
type Interface interface {
Add(item any)
Get() (item any, shutdown bool)
Len() int
ShutDown()
ShuttingDown() bool
}The implementation stores items in a slice []any , protects access with a sync.Cond , and tracks a shutdown flag:
type Queue struct {
cond *sync.Cond
queue []any
shuttingDown bool
}
func New() *Queue {
return &Queue{cond: sync.NewCond(&sync.Mutex{})}
}
func (q *Queue) Add(item any) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
if q.shuttingDown { return }
q.queue = append(q.queue, item)
q.cond.Signal()
}
func (q *Queue) Get() (item any, shutdown bool) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
for len(q.queue) == 0 && !q.shuttingDown {
q.cond.Wait()
}
if len(q.queue) == 0 { return nil, true }
item = q.queue[0]
q.queue[0] = nil
q.queue = q.queue[1:]
return item, false
}
func (q *Queue) ShutDown() {
q.cond.L.Lock()
defer q.cond.L.Unlock()
q.shuttingDown = true
q.cond.Broadcast()
}
func (q *Queue) ShuttingDown() bool {
q.cond.L.Lock()
defer q.cond.L.Unlock()
return q.shuttingDown
}
func (q *Queue) Len() int {
q.cond.L.Lock()
defer q.cond.L.Unlock()
return len(q.queue)
}A test case launches 50 producer goroutines (each adding 50 items) and 10 consumer goroutines that retrieve items until the queue is shut down, verifying that the queue ends empty and that no items are added after shutdown:
func TestBasic(t *testing.T) {
tests := []struct{ queue *queue.Queue }{{queue: queue.New()}, {queue: queue.New()}}
for _, test := range tests {
const producers = 50
var producerWG sync.WaitGroup
producerWG.Add(producers)
for i := 0; i < producers; i++ {
go func(i int) {
defer producerWG.Done()
for j := 0; j < 50; j++ {
test.queue.Add(i)
time.Sleep(time.Millisecond)
}
}(i)
}
const consumers = 10
var consumerWG sync.WaitGroup
consumerWG.Add(consumers)
for i := 0; i < consumers; i++ {
go func(i int) {
defer consumerWG.Done()
for {
item, quit := test.queue.Get()
if item == "added after shutdown!" {
t.Errorf("Got an item added after shutdown.")
}
if quit { return }
t.Logf("Worker %v: processing %v", i, item)
}
}(i)
}
producerWG.Wait()
test.queue.ShutDown()
test.queue.Add("added after shutdown!")
consumerWG.Wait()
if test.queue.Len() != 0 {
t.Errorf("Expected the queue to be empty, had: %v items", test.queue.Len())
}
}
}The output confirms that all items are processed, the shutdown flag works, and the queue behaves correctly under high concurrency. This implementation mirrors the core ideas of Kubernetes' client‑go workqueue component.
Conclusion
The article has dissected sync.Cond 's design, demonstrated its correct pattern of locking, condition checking, and signaling, and built a reusable concurrent waiting queue that can serve as a foundation for more complex systems such as Kubernetes workqueues.
Go Programming World
Mobile version of tech blog https://jianghushinian.cn/, covering Golang, Docker, Kubernetes and beyond.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.