The Beauty of Go: Parallelism & Concurrency
Mastering Go Concurrency: Goroutines, Channels, and Beyond
Go's concurrency model is one of its most powerful features, built around the proverb "Do not communicate by sharing memory; instead, share memory by communicating." This philosophy is embodied in goroutines and channels, which provide a clean, efficient way to handle concurrent operations.
Understanding Concurrency vs Parallelism
Before diving into the implementation, it's crucial to understand the difference between concurrency and parallelism:
- Concurrency: Dealing with multiple things at once by interleaving execution
- Parallelism: Doing multiple things simultaneously using multiple processors
Go excels at both, but they're not the same thing. A concurrent program can run on a single core by switching between tasks, while a parallel program requires multiple cores to execute tasks simultaneously.
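To make the distinction concrete, here is a minimal sketch that pins the runtime to a single core: the three goroutines still make interleaved progress (concurrency) even though nothing ever runs simultaneously (no parallelism):
package main
import (
"fmt"
"runtime"
"sync"
)
func main() {
runtime.GOMAXPROCS(1) // one OS core: still concurrent, no longer parallel
var wg sync.WaitGroup
for i := 0; i < 3; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for step := 0; step < 2; step++ {
fmt.Printf("goroutine %d, step %d\n", id, step)
runtime.Gosched() // yield so the other goroutines get a turn
}
}(i)
}
wg.Wait()
}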
Goroutines: Lightweight Threads
Goroutines are Go's way of handling concurrent execution. They're lightweight threads managed by the Go runtime, not operating system threads: each starts with a stack of only a few kilobytes that grows on demand, so you can create hundreds of thousands of them with minimal memory overhead.
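To put a rough number on that claim, here is a back-of-the-envelope sketch that parks 100,000 goroutines and measures how much memory the runtime acquires. The exact figure varies by Go version, but it typically lands in the low kilobytes per goroutine:
package main
import (
"fmt"
"runtime"
"sync"
)
func main() {
const n = 100000
var before, after runtime.MemStats
runtime.ReadMemStats(&before)
var wg sync.WaitGroup
release := make(chan struct{}) // holds every goroutine alive until closed
for i := 0; i < n; i++ {
wg.Add(1)
go func() {
defer wg.Done()
<-release
}()
}
runtime.ReadMemStats(&after)
fmt.Printf("approx. bytes per goroutine: %d\n", (after.Sys-before.Sys)/n)
close(release) // let all the parked goroutines exit
wg.Wait()
}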
Basic Goroutine Usage
package main
import (
"fmt"
"time"
)
func worker(id int) {
fmt.Printf("Worker %d starting\n", id)
time.Sleep(time.Second)
fmt.Printf("Worker %d done\n", id)
}
func main() {
// Launch multiple goroutines
for i := 1; i <= 5; i++ {
go worker(i)
}
// Crude wait so main doesn't exit before the workers finish;
// sync.WaitGroup (covered below) is the robust way to do this
time.Sleep(2 * time.Second)
fmt.Println("All workers completed")
}
Anonymous Goroutines
You can also launch anonymous functions as goroutines:
package main
import (
"fmt"
"time"
)
func main() {
// Launch anonymous goroutine
go func() {
fmt.Println("Anonymous goroutine running")
time.Sleep(time.Second)
fmt.Println("Anonymous goroutine completed")
}()
fmt.Println("Main function continuing...")
time.Sleep(2 * time.Second)
}
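One classic pitfall with anonymous goroutines is capturing a loop variable: before Go 1.22, every iteration shared a single variable, so goroutines often saw only its final value. Passing the variable as an argument sidesteps the problem on any version; a minimal sketch:
package main
import (
"fmt"
"sync"
)
func main() {
var wg sync.WaitGroup
for i := 0; i < 3; i++ {
wg.Add(1)
go func(n int) { // pass i in; pre-1.22, all iterations shared one i
defer wg.Done()
fmt.Println("goroutine saw", n)
}(i)
}
wg.Wait()
}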
Channels: Communication Between Goroutines
Channels are Go's primary mechanism for communication between goroutines. They provide a safe way to share data and synchronize execution.
Basic Channel Operations
package main
import (
"fmt"
"time"
)
func sender(ch chan string) {
messages := []string{"Hello", "World", "from", "Go"}
for _, msg := range messages {
ch <- msg // Send message to channel
time.Sleep(time.Millisecond * 100)
}
close(ch) // Close channel when done
}
func receiver(ch chan string) {
for msg := range ch { // Receive messages until channel is closed
fmt.Printf("Received: %s\n", msg)
}
}
func main() {
ch := make(chan string)
go sender(ch)
receiver(ch) // run in main; returns once sender closes the channel
}
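The functions above accept bidirectional chan string parameters. You can tighten them to send-only (chan<-) and receive-only (<-chan) types so the compiler enforces each side's role, a convention the worker pool example later in this article relies on:
package main
import (
"fmt"
)
// ch is send-only here; the compiler rejects any receive from it
func sender(ch chan<- string) {
ch <- "hello"
close(ch)
}
// ch is receive-only here; the compiler rejects any send on it
func receiver(ch <-chan string) {
for msg := range ch {
fmt.Println(msg)
}
}
func main() {
ch := make(chan string)
go sender(ch)
receiver(ch)
}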
Buffered Channels
A buffered channel accepts sends without blocking until its buffer is full; only then does the sender wait for a receiver:
package main
import (
"fmt"
"time"
)
func main() {
// Create buffered channel with capacity 3
ch := make(chan int, 3)
// Send values without blocking (buffer has space)
ch <- 1
ch <- 2
ch <- 3
// A fourth send would block here: the buffer is already full
// ch <- 4
// Receive values
fmt.Println(<-ch) // 1
fmt.Println(<-ch) // 2
fmt.Println(<-ch) // 3
}
Select Statement for Multiplexing Channels
The select statement lets a goroutine wait on several channel operations at once, proceeding with whichever case is ready first. Note that select itself blocks until some case is ready; it only becomes non-blocking when you add a default case, as shown after this example:
package main
import (
"fmt"
"time"
)
func main() {
ch1 := make(chan string)
ch2 := make(chan string)
go func() {
time.Sleep(time.Second)
ch1 <- "Message from channel 1"
}()
go func() {
time.Sleep(time.Millisecond * 500)
ch2 <- "Message from channel 2"
}()
for i := 0; i < 2; i++ {
select {
case msg1 := <-ch1:
fmt.Println("Received from ch1:", msg1)
case msg2 := <-ch2:
fmt.Println("Received from ch2:", msg2)
case <-time.After(time.Second * 2):
fmt.Println("Timeout occurred")
}
}
}
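To make a receive (or send) truly non-blocking, add a default case; a minimal sketch:
package main
import (
"fmt"
)
func main() {
ch := make(chan string)
// With a default case, select never blocks: if no channel
// operation is ready, it takes the default branch immediately
select {
case msg := <-ch:
fmt.Println("received:", msg)
default:
fmt.Println("no message ready, moving on")
}
}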
Advanced Concurrency Patterns
Worker Pool Pattern
A common pattern is to create a pool of worker goroutines to process jobs:
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
defer wg.Done()
for job := range jobs {
fmt.Printf("Worker %d processing job %d\n", id, job)
time.Sleep(time.Millisecond * 500) // Simulate work
results <- job * 2 // send the processed result
}
}
func main() {
const numJobs = 10
const numWorkers = 3
jobs := make(chan int, numJobs)
results := make(chan int, numJobs)
var wg sync.WaitGroup
// Start workers
for i := 1; i <= numWorkers; i++ {
wg.Add(1)
go worker(i, jobs, results, &wg)
}
// Send jobs
for i := 1; i <= numJobs; i++ {
jobs <- i
}
close(jobs)
// Wait for all workers to complete
go func() {
wg.Wait()
close(results)
}()
// Collect results
for result := range results {
fmt.Printf("Result: %d\n", result)
}
}
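If workers can fail, the semi-official golang.org/x/sync/errgroup module wraps this start-workers-then-wait pattern and propagates the first error. A sketch of the same pool, assuming you've run go get golang.org/x/sync/errgroup:
package main
import (
"fmt"
"golang.org/x/sync/errgroup"
)
func main() {
jobs := make(chan int, 10)
for i := 1; i <= 10; i++ {
jobs <- i
}
close(jobs)
var g errgroup.Group
for w := 0; w < 3; w++ {
g.Go(func() error {
for job := range jobs {
fmt.Println("processed:", job*2)
}
return nil // returning a non-nil error here surfaces from g.Wait
})
}
if err := g.Wait(); err != nil {
fmt.Println("a worker failed:", err)
}
}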
Pipeline Pattern
Chains of goroutines can form processing pipelines:
package main
import (
"fmt"
"math"
)
func generate(nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
out <- n
}
}()
return out
}
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
out <- n * n
}
}()
return out
}
// filter passes through only the even values
func filter(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
if n%2 == 0 {
out <- n
}
}
}()
return out
}
func main() {
// Pipeline: generate -> square -> filter
numbers := generate(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
squared := square(numbers)
filtered := filter(squared)
for result := range filtered {
fmt.Printf("Filtered result: %d\n", result)
}
}
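One refinement from the Go blog's "Go Concurrency Patterns: Pipelines" article: give every stage a done channel, so a consumer that stops early doesn't strand upstream goroutines blocked on sends. A sketch of the generate and square stages reworked that way:
package main
import (
"fmt"
)
func generate(done <-chan struct{}, nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
select {
case out <- n:
case <-done: // consumer gave up; exit instead of leaking
return
}
}
}()
return out
}
func square(done <-chan struct{}, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
select {
case out <- n * n:
case <-done:
return
}
}
}()
return out
}
func main() {
done := make(chan struct{})
defer close(done) // closing done tears down every stage
squared := square(done, generate(done, 1, 2, 3, 4, 5))
fmt.Println(<-squared) // take one value and stop; nothing leaks
}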
Synchronization with WaitGroup and Mutex
WaitGroup for Goroutine Synchronization
package main
import (
"fmt"
"sync"
"time"
)
func processTask(id int, wg *sync.WaitGroup) {
defer wg.Done() // Decrement counter when done
fmt.Printf("Starting task %d\n", id)
time.Sleep(time.Millisecond * 500) // Simulate work
fmt.Printf("Completed task %d\n", id)
}
func main() {
var wg sync.WaitGroup
// Launch 5 goroutines
for i := 1; i <= 5; i++ {
wg.Add(1) // Increment counter
go processTask(i, &wg)
}
// Wait for all goroutines to complete
wg.Wait()
fmt.Println("All tasks completed!")
}
Mutex for Shared Resource Protection
package main
import (
"fmt"
"sync"
"time"
)
type SafeCounter struct {
mu sync.Mutex
count int
}
func (c *SafeCounter) Increment() {
c.mu.Lock()
defer c.mu.Unlock()
c.count++
}
func (c *SafeCounter) GetCount() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.count
}
func main() {
counter := SafeCounter{}
var wg sync.WaitGroup
// Launch multiple goroutines that increment the counter
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter.Increment()
}()
}
wg.Wait() // deterministic, unlike sleeping and hoping everything finished
fmt.Printf("Final count: %d\n", counter.GetCount())
}
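For a plain counter, the standard library's sync/atomic package can replace the mutex entirely; a minimal sketch (atomic.Int64 needs Go 1.19+; on older versions, use atomic.AddInt64 on a plain int64):
package main
import (
"fmt"
"sync"
"sync/atomic"
)
func main() {
var count atomic.Int64
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
count.Add(1) // lock-free increment
}()
}
wg.Wait()
fmt.Printf("Final count: %d\n", count.Load())
}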
Context for Cancellation and Timeouts
The context package provides a way to carry deadlines, cancellation signals, and other request-scoped values across API boundaries:
package main
import (
"context"
"fmt"
"time"
)
func longRunningTask(ctx context.Context) {
for {
select {
case <-ctx.Done():
fmt.Println("Task cancelled:", ctx.Err())
return
default:
fmt.Println("Working...")
time.Sleep(time.Millisecond * 200)
}
}
}
func main() {
// Create context with timeout
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
go longRunningTask(ctx)
// Wait for timeout
<-ctx.Done()
fmt.Println("Main function completed")
}
Best Practices and Common Pitfalls
Avoid Goroutine Leaks
Always ensure goroutines can exit:
package main
import (
"fmt"
"time"
)
func goodGoroutine(done chan bool) {
defer func() {
done <- true // Signal completion
}()
// Do work
time.Sleep(time.Millisecond * 100)
fmt.Println("Work completed")
}
func main() {
done := make(chan bool)
go goodGoroutine(done)
// Wait for completion
<-done
fmt.Println("Goroutine finished")
}
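The most common leak is a goroutine blocked on a send that no one will ever receive. Here is a hedged sketch of guarding against that with the context package from earlier (producer is a name made up for this example):
package main
import (
"context"
"fmt"
"time"
)
// producer would block forever on ch once the consumer stops
// reading; ctx gives it a guaranteed way out
func producer(ctx context.Context, ch chan<- int) {
for i := 0; ; i++ {
select {
case ch <- i:
case <-ctx.Done():
fmt.Println("producer exiting:", ctx.Err())
return
}
}
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
ch := make(chan int)
go producer(ctx, ch)
// Consume a few values, then stop reading
for i := 0; i < 3; i++ {
fmt.Println("got:", <-ch)
}
cancel() // without this, producer would leak for the process lifetime
time.Sleep(time.Millisecond * 50) // let the exit message print
}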
Use Buffered Channels Appropriately
package main
import (
"fmt"
"time"
)
func main() {
// Unbuffered channel - sender blocks until receiver is ready
ch1 := make(chan int)
// Buffered channel - sender doesn't block until buffer is full
ch2 := make(chan int, 10)
go func() {
for i := 0; i < 5; i++ {
ch1 <- i // This will block until main reads
ch2 <- i // This won't block (buffer has space)
}
close(ch1)
close(ch2)
}()
// Read from both channels
for i := 0; i < 5; i++ {
fmt.Printf("ch1: %d, ch2: %d\n", <-ch1, <-ch2)
}
}
Performance Considerations
GOMAXPROCS and Parallelism
package main
import (
"fmt"
"runtime"
"sync"
)
func main() {
// Get current GOMAXPROCS setting
fmt.Printf("GOMAXPROCS: %d\n", runtime.GOMAXPROCS(0))
// GOMAXPROCS has defaulted to NumCPU since Go 1.5, so this call
// is usually redundant; shown here for completeness
runtime.GOMAXPROCS(runtime.NumCPU())
fmt.Printf("CPU cores: %d\n", runtime.NumCPU())
// Benchmark parallel vs sequential processing
const size = 1000000
data := make([]int, size)
for i := range data {
data[i] = i
}
// Sequential processing
start := time.Now()
result1 := processSequentially(data)
sequentialTime := time.Since(start)
// Parallel processing
start = time.Now()
result2 := processInParallel(data)
parallelTime := time.Since(start)
fmt.Printf("Sequential time: %v\n", sequentialTime)
fmt.Printf("Parallel time: %v\n", parallelTime)
fmt.Printf("Speedup: %.2fx\n", float64(sequentialTime)/float64(parallelTime))
}
func processSequentially(data []int) int {
sum := 0
for _, v := range data {
sum += v
}
return sum
}
func processInParallel(data []int) int {
numWorkers := runtime.NumCPU() // := rather than const: a const initializer must be a compile-time constant
chunkSize := len(data) / numWorkers
var wg sync.WaitGroup
results := make(chan int, numWorkers)
for i := 0; i < numWorkers; i++ {
wg.Add(1)
start := i * chunkSize
end := start + chunkSize
if i == numWorkers-1 {
end = len(data)
}
go func(start, end int) {
defer wg.Done()
sum := 0
for j := start; j < end; j++ {
sum += data[j]
}
results <- sum
}(start, end)
}
go func() {
wg.Wait()
close(results)
}()
total := 0
for sum := range results {
total += sum
}
return total
}
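Hand-rolled timing with time.Now is noisy and easy to misread; Go's testing package produces much steadier numbers. A sketch of a benchmark file, assuming processSequentially and processInParallel live in the same package (save as a _test.go file and run go test -bench=.):
package main
import (
"testing"
)
func benchData() []int {
data := make([]int, 1000000)
for i := range data {
data[i] = i
}
return data
}
func BenchmarkSequential(b *testing.B) {
data := benchData()
b.ResetTimer() // exclude setup from the measurement
for i := 0; i < b.N; i++ {
processSequentially(data)
}
}
func BenchmarkParallel(b *testing.B) {
data := benchData()
b.ResetTimer()
for i := 0; i < b.N; i++ {
processInParallel(data)
}
}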
Conclusion
Go's concurrency model provides a powerful yet simple way to write concurrent and parallel programs. The combination of goroutines, channels, and the standard library's synchronization primitives makes it easy to build robust concurrent applications.
Key takeaways:
- Use goroutines for lightweight concurrent execution
- Communicate between goroutines using channels
- Use buffered channels to decouple senders from receivers when bursts are expected
- Use WaitGroup for synchronization
- Protect shared resources with mutexes
- Use context for cancellation and timeouts
- Design with the pipeline and worker pool patterns
- Always ensure goroutines can exit to prevent leaks
With these tools and patterns, you can build highly concurrent applications that efficiently utilize system resources while maintaining clean, readable code.