The Beauty of Go: Parallelism & Concurrency

Mastering Go Concurrency: Goroutines, Channels, and Beyond

Go's concurrency model is one of its most powerful features, built around the principle of "Don't communicate by sharing memory; share memory by communicating." This philosophy is embodied in goroutines and channels, which provide a clean, efficient way to handle concurrent operations.

Understanding Concurrency vs Parallelism

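Before diving into the implementation, it's crucial to understand the difference between concurrency and parallelism. Concurrency is about structuring a program as multiple independently executing tasks whose execution may interleave; parallelism is about executing multiple tasks at literally the same instant.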

Go excels at both, but they're not the same thing. A concurrent program can run on a single core by switching between tasks, while a parallel program requires multiple cores to execute tasks simultaneously.

Goroutines: Lightweight Threads

Goroutines are Go's way of handling concurrent execution. They're lightweight threads managed by the Go runtime, not operating system threads. You can create thousands of goroutines with minimal memory overhead.

Basic Goroutine Usage

package main

import (
    "fmt"
    "time"
)

// worker prints a start message tagged with id, blocks for one second to
// simulate work, and then prints a matching completion message.
func worker(id int) {
    const simulatedWork = time.Second
    fmt.Printf("Worker %d starting\n", id)
    time.Sleep(simulatedWork)
    fmt.Printf("Worker %d done\n", id)
}

// main starts five workers concurrently and waits until every one of them
// has returned before reporting completion.
func main() {
    const numWorkers = 5
    done := make(chan struct{})

    // Launch multiple goroutines; each wrapper signals on done once its
    // worker call returns. Passing i as an argument gives each goroutine its
    // own copy of the id.
    for i := 1; i <= numWorkers; i++ {
        go func(id int) {
            worker(id)
            done <- struct{}{}
        }(i)
    }

    // Wait for exactly one completion signal per worker. A fixed time.Sleep
    // only makes completion likely; receiving the signals guarantees it.
    for i := 0; i < numWorkers; i++ {
        <-done
    }
    fmt.Println("All workers completed")
}

Anonymous Goroutines

You can also launch anonymous functions as goroutines:

package main

import (
    "fmt"
    "time"
)

// main runs an anonymous function as a goroutine and waits for it to finish
// before exiting.
func main() {
    // done is closed by the goroutine when it has finished its work.
    done := make(chan struct{})

    // Launch anonymous goroutine
    go func() {
        defer close(done)
        fmt.Println("Anonymous goroutine running")
        time.Sleep(time.Second)
        fmt.Println("Anonymous goroutine completed")
    }()

    fmt.Println("Main function continuing...")

    // Block until the goroutine signals completion. A fixed sleep only makes
    // completion likely; the program exits when main returns, killing any
    // goroutine that is still running.
    <-done
}

Channels: Communication Between Goroutines

Channels are Go's primary mechanism for communication between goroutines. They provide a safe way to share data and synchronize execution.

Basic Channel Operations

package main

import (
    "fmt"
    "time"
)

// sender sends four fixed words on ch, pausing 100ms after each one, and
// closes ch when it is done so receivers ranging over it terminate.
func sender(ch chan string) {
    words := [...]string{"Hello", "World", "from", "Go"}
    defer close(ch) // tell the receiver no more messages are coming
    for i := 0; i < len(words); i++ {
        ch <- words[i] // blocks until a receiver takes the value
        time.Sleep(100 * time.Millisecond)
    }
}

// receiver prints every message received on ch and returns once ch has been
// closed and fully drained.
func receiver(ch chan string) {
    for {
        msg, open := <-ch
        if !open {
            return
        }
        fmt.Printf("Received: %s\n", msg)
    }
}

// main connects a sender and a receiver through an unbuffered channel.
func main() {
    ch := make(chan string)

    // The sender runs in its own goroutine. Running the receiver on the main
    // goroutine means main returns only after the sender has closed the
    // channel and every message has been printed, with no guessed sleep.
    go sender(ch)
    receiver(ch)
}

Buffered Channels

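Buffered channels hold an internal queue. A send completes immediately while the buffer has free space and blocks only once the buffer is full. Likewise, a receive blocks only when the buffer is empty: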

package main

import (
    "fmt"
    "time"
)

// main shows that sends on a buffered channel succeed without a receiver
// until the buffer is full, and that values come back out in FIFO order.
func main() {
    // Create buffered channel with capacity 3
    ch := make(chan int, 3)

    // Send values without blocking (buffer has space)
    ch <- 1
    ch <- 2
    ch <- 3

    // The buffer is now full, so a plain `ch <- 4` would block forever here.
    // No other goroutine ever receives, so the runtime would abort with a
    // deadlock. A select with a timeout demonstrates the blocked send without
    // hanging the program.
    select {
    case ch <- 4:
        fmt.Println("Sent 4")
    case <-time.After(100 * time.Millisecond):
        fmt.Println("Send of 4 timed out: buffer is full")
    }

    // Receive values (first in, first out)
    fmt.Println(<-ch) // 1
    fmt.Println(<-ch) // 2
    fmt.Println(<-ch) // 3
}

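Select Statement for Multiplexing Channel Operations

The select statement lets a goroutine wait on several channel operations at once and proceed with whichever becomes ready first. On its own, select blocks until some case can proceed. A time.After case bounds that wait with a timeout, and a default case makes the select fully non-blocking: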

package main

import (
    "fmt"
    "time"
)

// main waits for messages from two producers with different delays, handling
// whichever arrives first and giving up on any single wait after two seconds.
func main() {
    first := make(chan string)
    second := make(chan string)

    // Producer 1 replies after one second.
    go func() {
        time.Sleep(time.Second)
        first <- "Message from channel 1"
    }()

    // Producer 2 replies after half a second, so it normally arrives first.
    go func() {
        time.Sleep(time.Millisecond * 500)
        second <- "Message from channel 2"
    }()

    // Handle two events; each wait gets a fresh two-second timeout.
    for received := 0; received < 2; received++ {
        select {
        case m := <-first:
            fmt.Println("Received from ch1:", m)
        case m := <-second:
            fmt.Println("Received from ch2:", m)
        case <-time.After(2 * time.Second):
            fmt.Println("Timeout occurred")
        }
    }
}

Advanced Concurrency Patterns

Worker Pool Pattern

A common pattern is to create a pool of worker goroutines to process jobs:

package main

import (
    "fmt"
    "sync"
    "time"
)

// worker takes jobs until the jobs channel is closed and drained, sends each
// job doubled on results after a simulated 500ms of work, and marks wg done
// when it exits.
func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
    defer wg.Done()

    for {
        job, ok := <-jobs
        if !ok {
            return // jobs closed and empty: no more work
        }
        fmt.Printf("Worker %d processing job %d\n", id, job)
        time.Sleep(time.Millisecond * 500) // Simulate work
        results <- 2 * job
    }
}

// main runs a fixed pool of workers over a batch of jobs and prints every
// result as it arrives.
func main() {
    const (
        numJobs    = 10
        numWorkers = 3
    )

    // Buffers are sized to the job count, so neither enqueuing jobs nor
    // publishing results ever waits for buffer space.
    jobs := make(chan int, numJobs)
    results := make(chan int, numJobs)

    // Start the pool; each worker calls wg.Done once jobs is drained.
    var wg sync.WaitGroup
    wg.Add(numWorkers)
    for id := 1; id <= numWorkers; id++ {
        go worker(id, jobs, results, &wg)
    }

    // Enqueue every job, then close so the workers' receive loops end.
    for job := 1; job <= numJobs; job++ {
        jobs <- job
    }
    close(jobs)

    // Close results only after every worker has stopped sending on it.
    go func() {
        wg.Wait()
        close(results)
    }()

    // Drain results until the closer goroutine closes the channel.
    for r := range results {
        fmt.Printf("Result: %d\n", r)
    }
}

Pipeline Pattern

Chains of goroutines can form processing pipelines:

package main

import (
    "fmt"
    "math"
)

// generate returns a channel that yields nums in order and is closed after
// the last value has been received.
func generate(nums ...int) <-chan int {
    ch := make(chan int)
    go func() {
        defer close(ch)
        for i := 0; i < len(nums); i++ {
            ch <- nums[i]
        }
    }()
    return ch
}

// square returns a channel that yields the square of each value read from in,
// in order, and is closed once in has been closed and drained.
func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for {
            n, ok := <-in
            if !ok {
                return
            }
            out <- n * n
        }
    }()
    return out
}

// filter returns a channel that forwards only the even values read from in,
// preserving their order, and is closed once in has been closed and drained.
func filter(in <-chan int) <-chan int {
    evens := make(chan int)
    go func() {
        defer close(evens)
        for n := range in {
            if n%2 != 0 {
                continue // odd: drop it
            }
            evens <- n
        }
    }()
    return evens
}

func main() {
    // Pipeline: generate -> square -> filter
    numbers := generate(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    squared := square(numbers)
    filtered := filter(squared)
    
    for result := range filtered {
        fmt.Printf("Filtered result: %d\n", result)
    }
}

Synchronization with WaitGroup and Mutex

WaitGroup for Goroutine Synchronization

package main

import (
    "fmt"
    "sync"
    "time"
)

// processTask simulates 500ms of work for task id, printing start and
// completion messages, and calls wg.Done after the completion message.
func processTask(id int, wg *sync.WaitGroup) {
    // Deferred calls run last-in-first-out: the completion message prints
    // first, then the WaitGroup counter is decremented.
    defer wg.Done()
    fmt.Printf("Starting task %d\n", id)
    defer fmt.Printf("Completed task %d\n", id)
    time.Sleep(500 * time.Millisecond) // Simulate work
}

// main runs five tasks concurrently and waits for all of them to finish.
func main() {
    const taskCount = 5
    var wg sync.WaitGroup

    for id := 1; id <= taskCount; id++ {
        wg.Add(1) // register the task before its goroutine starts
        go processTask(id, &wg)
    }

    wg.Wait() // block until every processTask has called Done
    fmt.Println("All tasks completed!")
}

Mutex for Shared Resource Protection

package main

import (
    "fmt"
    "sync"
    "time"
)

// SafeCounter is an int counter whose reads and writes are serialized by a
// mutex, so it can be shared between goroutines. It must not be copied after
// first use, because copying it would copy the mutex.
type SafeCounter struct {
    mu    sync.Mutex // guards count
    count int
}

// Increment adds one to the counter while holding the lock.
func (c *SafeCounter) Increment() {
    c.mu.Lock()
    c.count += 1
    c.mu.Unlock()
}

// GetCount returns the current value, reading it under the lock.
func (c *SafeCounter) GetCount() int {
    c.mu.Lock()
    n := c.count
    c.mu.Unlock()
    return n
}

// main increments a shared SafeCounter from 1000 goroutines and prints the
// final count once every goroutine has finished.
func main() {
    counter := SafeCounter{}
    var wg sync.WaitGroup
    start := time.Now()

    // Launch multiple goroutines that increment the counter. Each one is
    // registered with the WaitGroup before it starts.
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            counter.Increment()
        }()
    }

    // Block until every increment has happened. A fixed sleep would only make
    // this likely; wg.Wait guarantees the count below is 1000.
    wg.Wait()
    fmt.Printf("Final count: %d\n", counter.GetCount())
    fmt.Printf("Elapsed: %v\n", time.Since(start))
}

Context for Cancellation and Timeouts

The context package provides a way to carry deadlines, cancellation signals, and other request-scoped values across API boundaries:

package main

import (
    "context"
    "fmt"
    "time"
)

// longRunningTask repeatedly prints "Working..." and sleeps 200ms. Before each
// round it checks ctx; once ctx is done it prints the reason and returns.
func longRunningTask(ctx context.Context) {
    done := ctx.Done()
    for {
        // Non-blocking check for cancellation before starting the next round.
        select {
        case <-done:
            fmt.Println("Task cancelled:", ctx.Err())
            return
        default:
        }
        fmt.Println("Working...")
        time.Sleep(time.Millisecond * 200)
    }
}

// main runs longRunningTask under a one-second timeout and waits for the task
// to observe the cancellation and return before exiting.
func main() {
    // Create context with timeout
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()

    // finished is closed once longRunningTask has returned.
    finished := make(chan struct{})
    go func() {
        defer close(finished)
        longRunningTask(ctx)
    }()

    // Waiting on ctx.Done() alone would let main exit before the task prints
    // its cancellation message; waiting on finished guarantees it has stopped.
    <-finished
    fmt.Println("Main function completed")
}

Best Practices and Common Pitfalls

Avoid Goroutine Leaks

Always ensure goroutines can exit:

package main

import (
    "fmt"
    "time"
)

// goodGoroutine performs 100ms of simulated work, prints a message, and always
// sends true on done as it exits (the send is deferred).
func goodGoroutine(done chan bool) {
    notify := func() { done <- true } // Signal completion
    defer notify()

    // Do work
    time.Sleep(100 * time.Millisecond)
    fmt.Println("Work completed")
}

// main starts goodGoroutine and blocks until it reports completion.
func main() {
    // Unbuffered: goodGoroutine's final send pairs with the receive below.
    finished := make(chan bool)
    go goodGoroutine(finished)

    <-finished // Wait for completion
    fmt.Println("Goroutine finished")
}

Use Buffered Channels Appropriately

package main

import (
    "fmt"
    "time"
)

// main contrasts an unbuffered channel, where every send waits for a receive,
// with a buffered one, where sends proceed until the buffer fills.
func main() {
    unbuffered := make(chan int)   // sender blocks until receiver is ready
    buffered := make(chan int, 10) // sender doesn't block until buffer is full

    go func() {
        for v := 0; v < 5; v++ {
            unbuffered <- v // waits here until main reads
            buffered <- v   // returns immediately: the buffer has space
        }
        close(unbuffered)
        close(buffered)
    }()

    // Give the producer time to start and park on its first unbuffered send.
    time.Sleep(time.Millisecond * 100)

    // Read one value from each channel per round: unbuffered first, then buffered.
    for round := 0; round < 5; round++ {
        a := <-unbuffered
        b := <-buffered
        fmt.Printf("ch1: %d, ch2: %d\n", a, b)
    }
}

Performance Considerations

GOMAXPROCS and Parallelism

package main

import (
    "fmt"
    "runtime"
    "sync"
    "time"
)

// main reports the GOMAXPROCS/CPU configuration, then sums one million ints
// sequentially and in parallel. It checks both sums agree and prints the
// timings and speedup.
func main() {
    // Report the current setting; an argument below 1 queries without changing it.
    fmt.Printf("GOMAXPROCS: %d\n", runtime.GOMAXPROCS(0))

    // Set GOMAXPROCS to the number of logical CPUs. Recent Go versions already
    // choose a suitable default, so this call is shown for illustration.
    runtime.GOMAXPROCS(runtime.NumCPU())
    fmt.Printf("CPU cores: %d\n", runtime.NumCPU())

    // Benchmark parallel vs sequential processing
    const size = 1000000
    data := make([]int, size)
    for i := range data {
        data[i] = i
    }

    // Sequential processing
    start := time.Now()
    result1 := processSequentially(data)
    sequentialTime := time.Since(start)

    // Parallel processing
    start = time.Now()
    result2 := processInParallel(data)
    parallelTime := time.Since(start)

    // Both strategies must produce the same sum. Checking this also uses
    // result1/result2, which Go would otherwise reject as unused variables.
    if result1 != result2 {
        fmt.Printf("Result mismatch: sequential=%d parallel=%d\n", result1, result2)
        return
    }
    fmt.Printf("Sum: %d\n", result1)

    fmt.Printf("Sequential time: %v\n", sequentialTime)
    fmt.Printf("Parallel time: %v\n", parallelTime)
    fmt.Printf("Speedup: %.2fx\n", float64(sequentialTime)/float64(parallelTime))
}

// processSequentially returns the sum of data computed in a single loop on
// the calling goroutine; it returns 0 for an empty or nil slice.
func processSequentially(data []int) int {
    total := 0
    for i := range data {
        total += data[i]
    }
    return total
}

// processInParallel returns the sum of data. It splits data into contiguous
// chunks, one per worker goroutine, where the worker count is the number of
// logical CPUs capped to len(data), with a minimum of one. The last chunk
// absorbs any remainder.
func processInParallel(data []int) int {
    // runtime.NumCPU is a function call, so the worker count cannot be a const.
    numWorkers := runtime.NumCPU()
    // Don't start more workers than elements, but always start at least one,
    // so an empty slice still yields 0 through the same code path.
    if numWorkers > len(data) {
        numWorkers = len(data)
    }
    if numWorkers < 1 {
        numWorkers = 1
    }
    chunkSize := len(data) / numWorkers

    var wg sync.WaitGroup
    // Buffered to numWorkers so no worker ever blocks on sending its partial sum.
    results := make(chan int, numWorkers)

    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        start := i * chunkSize
        end := start + chunkSize
        if i == numWorkers-1 {
            end = len(data) // last worker also takes the remainder
        }

        go func(start, end int) {
            defer wg.Done()
            sum := 0
            for j := start; j < end; j++ {
                sum += data[j]
            }
            results <- sum
        }(start, end)
    }

    // Close results once every worker has sent, so the range below terminates.
    go func() {
        wg.Wait()
        close(results)
    }()

    total := 0
    for sum := range results {
        total += sum
    }

    return total
}

Conclusion

Go's concurrency model provides a powerful yet simple way to write concurrent and parallel programs. The combination of goroutines, channels, and the standard library's synchronization primitives makes it easy to build robust concurrent applications.

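Key takeaways:

- Goroutines are cheap, runtime-managed units of concurrent execution; start one with the go keyword.
- Channels let goroutines communicate and synchronize; the sender closes a channel to signal that no more values are coming.
- select waits on several channel operations at once; a time.After case bounds the wait, and a default case removes it.
- Use sync.WaitGroup to wait for goroutines and sync.Mutex to protect shared state. Prefer them over time.Sleep, which only makes completion likely, not guaranteed.
- Use context to propagate cancellation and timeouts, and make sure every goroutine has a way to exit.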

With these tools and patterns, you can build highly concurrent applications that efficiently utilize system resources while maintaining clean, readable code.