Using the Sarama Go Library for Kafka: Configuration, Producer & Consumer Samples, and Simple Performance Testing
This article demonstrates how to use the Go Sarama library to configure Kafka producers and consumers, provides sample code for sending and receiving messages, and includes a lightweight performance test that repeatedly publishes messages to evaluate throughput and latency.
In a previous post I shared a simple Java Kafka client demo that many readers found too basic; this article revisits the topic using Go, reviewing basic Go syntax and applying the same learning habit of implementing each framework in both Java and Go.
The Go implementation relies on Shopify's github.com/Shopify/sarama v1.38.1 library, which offers extensive inline documentation and supports multiple Kafka protocol versions.
Kafka Configuration
Sarama uses a single configuration struct for both producers and consumers, though the settings differ; the example shows only a subset of useful parameters.
Producer configuration:
config := sarama.NewConfig()
config.Producer.Return.Successes = true
config.Producer.Return.Errors = true
config.Producer.RequiredAcks = sarama.NoResponse
config.Producer.Compression = sarama.CompressionLZ4
config.Producer.Timeout = time.Duration(50) * time.Millisecond
config.Producer.Retry.Max = 3Consumer configuration:
config := sarama.NewConfig()
config.Consumer.Offsets.AutoCommit.Enable = true
config.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second
config.Consumer.Offsets.Initial = sarama.OffsetOldest
config.Consumer.Offsets.Retry.Max = 3Producer
The producer example adds a header (which Java also supports) and sends messages in a loop, printing the partition and offset for each send.
producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
if err != nil {
log.Fatal(err)
return
}
defer func() {
if err = producer.Close(); err != nil {
log.Fatal(err)
}
}()
headers := []sarama.RecordHeader{{Key: []byte("funtest"), Value: []byte("have fun ~")}}
msg := &sarama.ProducerMessage{
Topic: "topic_test",
Key: sarama.StringEncoder("test"),
Value: sarama.StringEncoder("ddddddddddddddddd"),
Headers: headers,
}
for i := 0; i < 100; i++ {
ftool.Sleep(1000)
partition, offset, err := producer.SendMessage(msg)
if err != nil {
log.Fatal(err)
}
fmt.Printf("partition:%d offset:%d\n", partition, offset)
}Consumer
The consumer uses Go channels, continuously receiving messages without needing to specify batch sizes or timeouts, which simplifies the design compared to Java.
consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
if err != nil {
fmt.Printf("fail to start consumer, err:%v\n", err)
return
}
topic := "topic_test"
partitionList, err := consumer.Partitions(topic)
if err != nil {
fmt.Printf("fail to get list of partition: err%v\n", err)
return
}
fmt.Println(partitionList)
defer consumer.Close()
for partition := range partitionList {
pc, err := consumer.ConsumePartition(topic, int32(partition), sarama.OffsetOldest)
if err != nil {
fmt.Printf("failed to start consumer for partition %d, err:%v\n", partition, err)
}
for msg := range pc.Messages() {
log.Println(string(msg.Value))
}
for {
msg := <-pc.Messages()
log.Println(string(msg.Value))
}
}Two blocking consumption patterns are shown; for performance testing you can run them inside goroutines using the go keyword.
Performance Test
A simple benchmark repeatedly sends messages using the producer code; the execution helper ExecuteRoutineTimes runs the send operation a specified number of times across multiple goroutines and reports total time, request count, and QPS.
execute.ExecuteRoutineTimes(func() {
_, _, _ := producer.SendMessage(msg)
}, 100, 10) // ExecuteRoutineTimes
// @Description: FunTester performance test framework
func ExecuteRoutineTimes(fun func(), times, routine int) {
c := make(chan int)
key := false
start := ftool.Milli()
for i := 0; i < routine; i++ {
go func() {
sum := 0
for i := 0; i < times; i++ {
if key { break }
fun()
sum++
}
key = true
c <- sum
}()
}
total := 0
for i := 0; i < routine; i++ {
total += <-c
}
end := ftool.Milli()
diff := end - start
log.Printf("Total time: %f", float64(diff)/1000)
log.Printf("Total requests: %d", total)
log.Printf("QPS: %f", float64(total)/float64(diff)*1000.0)
}In summary, the Go implementation feels simpler than Java; developers familiar with Go can get a Kafka test up and running faster, and future work may compare the performance of the two languages.
FunTester Original Recommendation Sarama is a Go library for Apache Kafka, a distributed streaming platform that handles large-scale data streams. It supports multiple Kafka protocol versions and provides producer and consumer APIs, as well as utilities like partition selectors and load balancers to help developers manage Kafka consumers. -- By FunTester
FunTester Original Recommendation 900 Original Collection 2021 Original Collection 2022 Original Collection Interface Function Test Series Performance Test Series Groovy Series Java, Groovy, Go, Python Unit Test & White Box FunTester Community Highlights Testing Theory Soup FunTester Video Series Case Share: Solutions, Bugs, Crawlers UI Automation Series Testing Tools Series -- By FunTester
FunTester
10k followers, 1k articles | completely useless
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.