
Go Advanced Concurrency Patterns

Advanced concurrency patterns in Go combine goroutines, channels, select statements, and the sync package to solve problems that no single primitive handles well on its own. They let you build scalable, efficient, and maintainable concurrent systems that can handle complex workloads and recover from errors robustly, which is essential for production-ready applications. This guide works through the most important of these patterns with runnable examples.

Understanding Advanced Concurrency Patterns

What Are Advanced Concurrency Patterns?

Advanced concurrency patterns in Go are sophisticated techniques that combine multiple concurrency primitives to solve complex problems. They include:

  • Worker Pools - Distributing work across multiple goroutines
  • Pipeline Patterns - Processing data through concurrent stages
  • Fan-out/Fan-in Patterns - Distributing and collecting work
  • Context-based Patterns - Cancellation and timeout management
  • Rate Limiting - Controlling the rate of operations

Benefits of Advanced Patterns

Scalability

Advanced patterns let applications scale efficiently with their workload.

Resource Management

They make it easier to manage and clean up resources in concurrent code.

Error Handling

They support robust error handling and recovery in concurrent systems.

Performance

They improve performance through efficient concurrent processing.

Worker Pools

Basic Worker Pool Pattern

Worker Pool Structure

Creating a pool of workers to process jobs concurrently.

Job Distribution

Distributing jobs to available workers.

package main

import (
"fmt"
"sync"
"time"
)

func main() {
// Basic worker pool pattern examples
fmt.Println("Basic worker pool pattern examples:")

// Simple worker pool
// Declared as a function literal so it can live inside main
simpleWorkerPool := func() {
const numWorkers = 3
const numJobs = 10

jobs := make(chan int, numJobs)
results := make(chan int, numJobs)

// Start workers
for w := 1; w <= numWorkers; w++ {
go func(id int) {
for job := range jobs {
fmt.Printf("Worker %d processing job %d\n", id, job)
time.Sleep(100 * time.Millisecond) // Simulate work
results <- job * 2
}
}(w)
}

// Send jobs
for j := 1; j <= numJobs; j++ {
jobs <- j
}
close(jobs)

// Collect results
for a := 1; a <= numJobs; a++ {
result := <-results
fmt.Printf("Result: %d\n", result)
}
}

simpleWorkerPool()
// Output (order may vary between runs):
// Worker 1 processing job 1
// Worker 2 processing job 2
// Worker 3 processing job 3
// Worker 1 processing job 4
// Worker 2 processing job 5
// Worker 3 processing job 6
// Worker 1 processing job 7
// Worker 2 processing job 8
// Worker 3 processing job 9
// Worker 1 processing job 10
// Result: 2
// Result: 4
// Result: 6
// Result: 8
// Result: 10
// Result: 12
// Result: 14
// Result: 16
// Result: 18
// Result: 20

// Worker pool with wait group
workerPoolWithWaitGroup := func() {
const numWorkers = 3
const numJobs = 10

jobs := make(chan int, numJobs)
results := make(chan int, numJobs)
var wg sync.WaitGroup

// Start workers
for w := 1; w <= numWorkers; w++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for job := range jobs {
fmt.Printf("Worker %d processing job %d\n", id, job)
time.Sleep(100 * time.Millisecond)
results <- job * 2
}
}(w)
}

// Send jobs
for j := 1; j <= numJobs; j++ {
jobs <- j
}
close(jobs)

// Wait for all workers to complete
wg.Wait()
close(results)

// Collect results
for result := range results {
fmt.Printf("Result: %d\n", result)
}
}

workerPoolWithWaitGroup()
// Output (order may vary between runs):
// Worker 1 processing job 1
// Worker 2 processing job 2
// Worker 3 processing job 3
// Worker 1 processing job 4
// Worker 2 processing job 5
// Worker 3 processing job 6
// Worker 1 processing job 7
// Worker 2 processing job 8
// Worker 3 processing job 9
// Worker 1 processing job 10
// Result: 2
// Result: 4
// Result: 6
// Result: 8
// Result: 10
// Result: 12
// Result: 14
// Result: 16
// Result: 18
// Result: 20

// Worker pool with error handling
workerPoolWithErrorHandling := func() {
const numWorkers = 3
const numJobs = 10

type Job struct {
ID int
Data string
}

type Result struct {
JobID int
Data string
Error error
}

jobs := make(chan Job, numJobs)
results := make(chan Result, numJobs)
var wg sync.WaitGroup

// Start workers
for w := 1; w <= numWorkers; w++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for job := range jobs {
fmt.Printf("Worker %d processing job %d\n", id, job.ID)
time.Sleep(100 * time.Millisecond)

// Simulate error for job 5
if job.ID == 5 {
results <- Result{
JobID: job.ID,
Data: "",
Error: fmt.Errorf("error processing job %d", job.ID),
}
continue
}

results <- Result{
JobID: job.ID,
Data: fmt.Sprintf("processed: %s", job.Data),
Error: nil,
}
}
}(w)
}

// Send jobs
for j := 1; j <= numJobs; j++ {
jobs <- Job{
ID: j,
Data: fmt.Sprintf("data-%d", j),
}
}
close(jobs)

// Wait for all workers to complete
wg.Wait()
close(results)

// Collect results
for result := range results {
if result.Error != nil {
fmt.Printf("Error processing job %d: %v\n", result.JobID, result.Error)
} else {
fmt.Printf("Job %d result: %s\n", result.JobID, result.Data)
}
}
}

workerPoolWithErrorHandling()
// Output (order may vary between runs):
// Worker 1 processing job 1
// Worker 2 processing job 2
// Worker 3 processing job 3
// Worker 1 processing job 4
// Worker 2 processing job 5
// Worker 3 processing job 6
// Worker 1 processing job 7
// Worker 2 processing job 8
// Worker 3 processing job 9
// Worker 1 processing job 10
// Job 1 result: processed: data-1
// Job 2 result: processed: data-2
// Job 3 result: processed: data-3
// Job 4 result: processed: data-4
// Error processing job 5: error processing job 5
// Job 6 result: processed: data-6
// Job 7 result: processed: data-7
// Job 8 result: processed: data-8
// Job 9 result: processed: data-9
// Job 10 result: processed: data-10
}
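
The error-handling pool above reports failures through the results channel and keeps going. A common alternative, not used in the example above, is golang.org/x/sync/errgroup, which cancels all workers as soon as any one of them fails. A minimal sketch, with the job values and error condition chosen only for illustration:

package main

import (
    "context"
    "fmt"
    "time"

    "golang.org/x/sync/errgroup"
)

func main() {
    jobs := make(chan int)

    // errgroup.WithContext cancels ctx as soon as any goroutine returns an error.
    g, ctx := errgroup.WithContext(context.Background())

    // Producer: stops early if a worker has already failed.
    g.Go(func() error {
        defer close(jobs)
        for j := 1; j <= 10; j++ {
            select {
            case jobs <- j:
            case <-ctx.Done():
                return ctx.Err()
            }
        }
        return nil
    })

    // Three workers; the first error cancels the rest via ctx.
    for w := 1; w <= 3; w++ {
        id := w
        g.Go(func() error {
            for job := range jobs {
                if job == 5 {
                    return fmt.Errorf("worker %d: error processing job %d", id, job)
                }
                time.Sleep(50 * time.Millisecond) // Simulate work
                fmt.Printf("Worker %d processed job %d\n", id, job)
            }
            return nil
        })
    }

    // Wait returns the first non-nil error from any goroutine.
    if err := g.Wait(); err != nil {
        fmt.Println("Pool stopped:", err)
    }
}

Whether to fail fast (errgroup) or collect every result including errors (the Result struct above) depends on whether later jobs are still useful after one has failed.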

Advanced Worker Pool Patterns

Dynamic Worker Pool

Sizing the pool up to a configured maximum of workers; the example below starts a fixed number of workers, and truly load-based scaling is left as an extension.

Worker Pool with Priority

Carrying a priority alongside each job. Note that the example below only labels jobs with a priority; a sketch of actual priority ordering follows the code.

package main

import (
"fmt"
"sync"
"time"
)

func main() {
// Advanced worker pool patterns examples
fmt.Println("Advanced worker pool patterns examples:")

// Dynamic worker pool
dynamicWorkerPool := func() {
const maxWorkers = 5
const numJobs = 20

jobs := make(chan int, numJobs)
results := make(chan int, numJobs)
var wg sync.WaitGroup

// Start workers dynamically
for w := 1; w <= maxWorkers; w++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for job := range jobs {
fmt.Printf("Worker %d processing job %d\n", id, job)
time.Sleep(50 * time.Millisecond)
results <- job * 2
}
}(w)
}

// Send jobs
for j := 1; j <= numJobs; j++ {
jobs <- j
}
close(jobs)

// Wait for all workers to complete
wg.Wait()
close(results)

// Collect results
for result := range results {
fmt.Printf("Result: %d\n", result)
}
}

dynamicWorkerPool()
// Output (order may vary between runs):
// Worker 1 processing job 1
// Worker 2 processing job 2
// Worker 3 processing job 3
// Worker 4 processing job 4
// Worker 5 processing job 5
// Worker 1 processing job 6
// Worker 2 processing job 7
// Worker 3 processing job 8
// Worker 4 processing job 9
// Worker 5 processing job 10
// Worker 1 processing job 11
// Worker 2 processing job 12
// Worker 3 processing job 13
// Worker 4 processing job 14
// Worker 5 processing job 15
// Worker 1 processing job 16
// Worker 2 processing job 17
// Worker 3 processing job 18
// Worker 4 processing job 19
// Worker 5 processing job 20
// Result: 2
// Result: 4
// Result: 6
// Result: 8
// Result: 10
// Result: 12
// Result: 14
// Result: 16
// Result: 18
// Result: 20
// Result: 22
// Result: 24
// Result: 26
// Result: 28
// Result: 30
// Result: 32
// Result: 34
// Result: 36
// Result: 38
// Result: 40

// Worker pool with priority
workerPoolWithPriority := func() {
type PriorityJob struct {
ID int
Priority int
Data string
}

const numWorkers = 3
const numJobs = 10

jobs := make(chan PriorityJob, numJobs)
results := make(chan PriorityJob, numJobs)
var wg sync.WaitGroup

// Start workers
for w := 1; w <= numWorkers; w++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for job := range jobs {
fmt.Printf("Worker %d processing priority %d job %d\n", id, job.Priority, job.ID)
time.Sleep(100 * time.Millisecond)
results <- job
}
}(w)
}

// Send jobs with different priorities
for j := 1; j <= numJobs; j++ {
priority := j % 3 // 0, 1, 2
jobs <- PriorityJob{
ID: j,
Priority: priority,
Data: fmt.Sprintf("data-%d", j),
}
}
close(jobs)

// Wait for all workers to complete
wg.Wait()
close(results)

// Collect results
for result := range results {
fmt.Printf("Completed job %d with priority %d\n", result.ID, result.Priority)
}
}

workerPoolWithPriority()
// Output (order may vary between runs):
// Worker 1 processing priority 1 job 1
// Worker 2 processing priority 2 job 2
// Worker 3 processing priority 0 job 3
// Worker 1 processing priority 1 job 4
// Worker 2 processing priority 2 job 5
// Worker 3 processing priority 0 job 6
// Worker 1 processing priority 1 job 7
// Worker 2 processing priority 2 job 8
// Worker 3 processing priority 0 job 9
// Worker 1 processing priority 1 job 10
// Completed job 1 with priority 1
// Completed job 2 with priority 2
// Completed job 3 with priority 0
// Completed job 4 with priority 1
// Completed job 5 with priority 2
// Completed job 6 with priority 0
// Completed job 7 with priority 1
// Completed job 8 with priority 2
// Completed job 9 with priority 0
// Completed job 10 with priority 1
}
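
As noted above, the priority example tags each job with a priority but still dispatches jobs in send order. A minimal sketch of genuine priority ordering, assuming all jobs are known up front (an assumption not made in the example above), sorts the queue before handing it to the workers:

package main

import (
    "fmt"
    "sort"
    "sync"
)

type PriorityJob struct {
    ID       int
    Priority int // higher value = more urgent
}

func main() {
    // In this sketch the whole queue is known before dispatch.
    pending := []PriorityJob{
        {ID: 1, Priority: 1}, {ID: 2, Priority: 2}, {ID: 3, Priority: 0},
        {ID: 4, Priority: 1}, {ID: 5, Priority: 2}, {ID: 6, Priority: 0},
    }

    // Sort so the highest-priority jobs are dispatched first.
    sort.Slice(pending, func(i, j int) bool {
        return pending[i].Priority > pending[j].Priority
    })

    jobs := make(chan PriorityJob)
    var wg sync.WaitGroup

    for w := 1; w <= 2; w++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            for job := range jobs {
                fmt.Printf("Worker %d: job %d (priority %d)\n", id, job.ID, job.Priority)
            }
        }(w)
    }

    for _, job := range pending {
        jobs <- job
    }
    close(jobs)
    wg.Wait()
}

For a queue that is fed continuously, a container/heap-based priority queue between the producer and the jobs channel plays the same role.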

Pipeline Patterns

Basic Pipeline Pattern

Pipeline Structure

Creating data processing pipelines with multiple stages.

Stage Communication

Using channels to communicate between pipeline stages.

package main

import (
"fmt"
"sync"
"time"
)

func main() {
// Basic pipeline pattern examples
fmt.Println("Basic pipeline pattern examples:")

// Simple pipeline
simplePipeline := func() {
// Stage 1: Generate numbers
numbers := make(chan int)
go func() {
for i := 1; i <= 5; i++ {
numbers <- i
}
close(numbers)
}()

// Stage 2: Square numbers
squares := make(chan int)
go func() {
for n := range numbers {
squares <- n * n
}
close(squares)
}()

// Stage 3: Print results
for square := range squares {
fmt.Printf("Square: %d\n", square)
}
}

simplePipeline()
// Output:
// Square: 1
// Square: 4
// Square: 9
// Square: 16
// Square: 25

// Pipeline with multiple stages
multiStagePipeline := func() {
// Stage 1: Generate numbers
numbers := make(chan int)
go func() {
for i := 1; i <= 5; i++ {
numbers <- i
}
close(numbers)
}()

// Stage 2: Double numbers
doubled := make(chan int)
go func() {
for n := range numbers {
doubled <- n * 2
}
close(doubled)
}()

// Stage 3: Add 10
added := make(chan int)
go func() {
for n := range doubled {
added <- n + 10
}
close(added)
}()

// Stage 4: Print results
for result := range added {
fmt.Printf("Result: %d\n", result)
}
}

multiStagePipeline()
// Output:
// Result: 12
// Result: 14
// Result: 16
// Result: 18
// Result: 20

// Pipeline with error handling
pipelineWithErrorHandling := func() {
type Data struct {
Value int
Error error
}

// Stage 1: Generate numbers
numbers := make(chan int)
go func() {
for i := 1; i <= 5; i++ {
numbers <- i
}
close(numbers)
}()

// Stage 2: Process with error handling
processed := make(chan Data)
go func() {
for n := range numbers {
if n == 3 {
processed <- Data{
Value: 0,
Error: fmt.Errorf("error processing %d", n),
}
continue
}
processed <- Data{
Value: n * 2,
Error: nil,
}
}
close(processed)
}()

// Stage 3: Handle results
for data := range processed {
if data.Error != nil {
fmt.Printf("Error: %v\n", data.Error)
} else {
fmt.Printf("Processed: %d\n", data.Value)
}
}
}

pipelineWithErrorHandling()
// Output:
// Processed: 2
// Processed: 4
// Error: error processing 3
// Processed: 8
// Processed: 10
}
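
The stages above are written inline inside main. A common refactor, sketched here rather than taken from the examples above, wraps each stage in a function that accepts an input channel and returns an output channel, so stages compose like ordinary function calls:

package main

import "fmt"

// generate emits the given numbers on a new channel and closes it when done.
func generate(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            out <- n
        }
    }()
    return out
}

// square reads from in, squares each value, and closes its output when in closes.
func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            out <- n * n
        }
    }()
    return out
}

func main() {
    // Stages compose: generate -> square -> print.
    for v := range square(generate(1, 2, 3, 4, 5)) {
        fmt.Println(v)
    }
}

Because every stage owns and closes its output channel, shutdown propagates naturally from the first stage to the last.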

Advanced Pipeline Patterns

Pipeline with Fan-out/Fan-in

Distributing work across multiple goroutines and collecting results.

Pipeline with Context

Using context for cancellation and timeout management.

package main

import (
"context"
"fmt"
"sync"
"time"
)

func main() {
// Advanced pipeline patterns examples
fmt.Println("Advanced pipeline patterns examples:")

// Pipeline with fan-out/fan-in
pipelineWithFanOutFanIn := func() {
// Stage 1: Generate numbers
numbers := make(chan int)
go func() {
for i := 1; i <= 10; i++ {
numbers <- i
}
close(numbers)
}()

// Stage 2: Fan-out to multiple workers
const numWorkers = 3
workers := make([]chan int, numWorkers)
for i := 0; i < numWorkers; i++ {
workers[i] = make(chan int)
}

// Distribute work to workers
go func() {
defer func() {
for _, worker := range workers {
close(worker)
}
}()

for n := range numbers {
// Round-robin distribution
workerIndex := n % numWorkers
workers[workerIndex] <- n
}
}()

// Stage 3: Workers process data
results := make(chan int)
var wg sync.WaitGroup

for i, worker := range workers {
wg.Add(1)
go func(id int, input <-chan int) {
defer wg.Done()
for n := range input {
fmt.Printf("Worker %d processing %d\n", id, n)
time.Sleep(100 * time.Millisecond)
results <- n * 2
}
}(i, worker)
}

// Wait for all workers to complete
go func() {
wg.Wait()
close(results)
}()

// Stage 4: Collect results
for result := range results {
fmt.Printf("Result: %d\n", result)
}
}

pipelineWithFanOutFanIn()
// Output (order may vary between runs; job n goes to worker n % 3):
// Worker 1 processing 1
// Worker 2 processing 2
// Worker 0 processing 3
// Worker 1 processing 4
// Worker 2 processing 5
// Worker 0 processing 6
// Worker 1 processing 7
// Worker 2 processing 8
// Worker 0 processing 9
// Worker 1 processing 10
// Result: 2
// Result: 4
// Result: 6
// Result: 8
// Result: 10
// Result: 12
// Result: 14
// Result: 16
// Result: 18
// Result: 20

// Pipeline with context
pipelineWithContext := func() {
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()

// Stage 1: Generate numbers
numbers := make(chan int)
go func() {
defer close(numbers)
for i := 1; i <= 10; i++ {
time.Sleep(60 * time.Millisecond) // Simulate slow generation so the 200ms timeout fires mid-stream
select {
case numbers <- i:
fmt.Printf("Generated: %d\n", i)
case <-ctx.Done():
fmt.Println("Context cancelled, stopping generation")
return
}
}
}()

// Stage 2: Process numbers
processed := make(chan int)
go func() {
defer close(processed)
for n := range numbers {
select {
case processed <- n * 2:
fmt.Printf("Processed: %d\n", n*2)
case <-ctx.Done():
fmt.Println("Context cancelled, stopping processing")
return
}
}
}()

// Stage 3: Collect results
for result := range processed {
fmt.Printf("Result: %d\n", result)
}
}

pipelineWithContext()
// Output (timing-dependent; roughly):
// Generated: 1
// Processed: 2
// Result: 2
// Generated: 2
// Processed: 4
// Result: 4
// Generated: 3
// Processed: 6
// Result: 6
// Context cancelled, stopping generation
}
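
The fan-out above routes jobs round-robin through a dispatcher goroutine. The more common arrangement, sketched below rather than taken from the example above, lets every worker read from the same input channel and uses a small merge function to fan the per-worker outputs back into one channel:

package main

import (
    "fmt"
    "sync"
)

// merge fans several input channels into one, closing the output
// only after every input has been drained.
func merge(inputs ...<-chan int) <-chan int {
    out := make(chan int)
    var wg sync.WaitGroup
    for _, in := range inputs {
        wg.Add(1)
        go func(c <-chan int) {
            defer wg.Done()
            for v := range c {
                out <- v
            }
        }(in)
    }
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

// worker doubles each value it receives from the shared input channel.
func worker(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            out <- n * 2
        }
    }()
    return out
}

func main() {
    in := make(chan int)
    go func() {
        defer close(in)
        for i := 1; i <= 10; i++ {
            in <- i
        }
    }()

    // Fan-out: three workers share one input channel.
    // Fan-in: merge collects their results into one channel.
    for result := range merge(worker(in), worker(in), worker(in)) {
        fmt.Println(result)
    }
}

Sharing one input channel gives natural load balancing: a slow worker simply pulls fewer jobs.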

Context-based Patterns

Context for Cancellation

Context with Timeout

Using context for timeout management.

Context with Cancellation

Implementing cancellation patterns with context.

package main

import (
"context"
"fmt"
"sync"
"time"
)

func main() {
// Context-based patterns examples
fmt.Println("Context-based patterns examples:")

// Context with timeout
contextWithTimeout := func() {
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()

ch := make(chan string)

go func() {
time.Sleep(150 * time.Millisecond)
ch <- "Data received"
}()

select {
case msg := <-ch:
fmt.Printf("Received: %s\n", msg)
case <-ctx.Done():
fmt.Println("Context timeout: operation cancelled")
}
}

contextWithTimeout()
// Output: Context timeout: operation cancelled

// Context with cancellation
contextWithCancellation := func() {
ctx, cancel := context.WithCancel(context.Background())

go func() {
time.Sleep(200 * time.Millisecond)
fmt.Println("Cancelling context...")
cancel()
}()

for {
select {
case <-ctx.Done():
fmt.Println("Context cancelled, exiting")
return
default:
fmt.Println("Working...")
time.Sleep(50 * time.Millisecond)
}
}
}

contextWithCancellation()
// Output (timing-dependent; the number of "Working..." lines may vary):
// Working...
// Working...
// Working...
// Working...
// Cancelling context...
// Context cancelled, exiting

// Context with multiple goroutines
contextWithMultipleGoroutines := func() {
ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup

// Start multiple goroutines
for i := 1; i <= 3; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for {
select {
case <-ctx.Done():
fmt.Printf("Goroutine %d: context cancelled\n", id)
return
default:
fmt.Printf("Goroutine %d: working...\n", id)
time.Sleep(100 * time.Millisecond)
}
}
}(i)
}

// Cancel after some time
time.Sleep(250 * time.Millisecond)
fmt.Println("Cancelling context...")
cancel()

// Wait for all goroutines to complete
wg.Wait()
fmt.Println("All goroutines completed")
}

contextWithMultipleGoroutines()
// Output (timing-dependent; interleaving may vary):
// Goroutine 1: working...
// Goroutine 2: working...
// Goroutine 3: working...
// Goroutine 1: working...
// Goroutine 2: working...
// Goroutine 3: working...
// Cancelling context...
// Goroutine 1: context cancelled
// Goroutine 2: context cancelled
// Goroutine 3: context cancelled
// All goroutines completed

// Context with deadline
contextWithDeadline := func() {
deadline := time.Now().Add(100 * time.Millisecond)
ctx, cancel := context.WithDeadline(context.Background(), deadline)
defer cancel()

ch := make(chan string)

go func() {
time.Sleep(150 * time.Millisecond)
ch <- "Data received"
}()

select {
case msg := <-ch:
fmt.Printf("Received: %s\n", msg)
case <-ctx.Done():
fmt.Println("Deadline exceeded: operation cancelled")
}
}

contextWithDeadline()
// Output: Deadline exceeded: operation cancelled
}
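
In reusable code the context is conventionally passed as the first parameter of any function that can block, rather than captured by a closure as in the examples above. A brief sketch; fetchReport is a hypothetical function used only for illustration:

package main

import (
    "context"
    "fmt"
    "time"
)

// fetchReport simulates a slow call that honours cancellation.
// By convention, ctx is the first parameter.
func fetchReport(ctx context.Context, id int) (string, error) {
    select {
    case <-time.After(150 * time.Millisecond): // simulate a slow operation
        return fmt.Sprintf("report-%d", id), nil
    case <-ctx.Done():
        return "", ctx.Err()
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
    defer cancel()

    if report, err := fetchReport(ctx, 1); err != nil {
        fmt.Println("fetch failed:", err) // prints context.DeadlineExceeded here
    } else {
        fmt.Println("got", report)
    }
}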

Context with Value

Context Value Passing

Using context to pass values between goroutines.

Context Value Retrieval

Retrieving values from context.

package main

import (
"context"
"fmt"
"time"
)

func main() {
// Context with value examples
fmt.Println("Context with value examples:")

// Context with value
contextWithValue := func() {
ctx := context.WithValue(context.Background(), "userID", "12345")
ctx = context.WithValue(ctx, "requestID", "req-001")

go func() {
userID := ctx.Value("userID")
requestID := ctx.Value("requestID")
fmt.Printf("Goroutine: userID=%s, requestID=%s\n", userID, requestID)
}()

time.Sleep(50 * time.Millisecond)
fmt.Println("Context with value completed")
}

contextWithValue()
// Output:
// Goroutine: userID=12345, requestID=req-001
// Context with value completed

// Context value with timeout
contextValueWithTimeout := func() {
ctx := context.WithValue(context.Background(), "userID", "67890")
ctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
defer cancel()

go func() {
for {
select {
case <-ctx.Done():
fmt.Println("Context cancelled")
return
default:
userID := ctx.Value("userID")
fmt.Printf("Working with userID: %s\n", userID)
time.Sleep(50 * time.Millisecond)
}
}
}()

time.Sleep(150 * time.Millisecond)
fmt.Println("Context value with timeout completed")
}

contextValueWithTimeout()
// Output (timing-dependent):
// Working with userID: 67890
// Working with userID: 67890
// Context cancelled
// Context value with timeout completed

// Context value chain
contextValueChain := func() {
ctx := context.Background()
ctx = context.WithValue(ctx, "level1", "value1")
ctx = context.WithValue(ctx, "level2", "value2")
ctx = context.WithValue(ctx, "level3", "value3")

go func() {
level1 := ctx.Value("level1")
level2 := ctx.Value("level2")
level3 := ctx.Value("level3")
fmt.Printf("Values: level1=%s, level2=%s, level3=%s\n", level1, level2, level3)
}()

time.Sleep(50 * time.Millisecond)
fmt.Println("Context value chain completed")
}

contextValueChain()
// Output:
// Values: level1=value1, level2=value2, level3=value3
// Context value chain completed
}
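
The examples above use plain string keys with context.WithValue, which go vet warns about because unrelated packages could collide on the same string. The idiomatic approach, sketched here rather than taken from the examples above, defines an unexported key type and small accessor functions:

package main

import (
    "context"
    "fmt"
)

// ctxKey is unexported, so no other package can construct the same key.
type ctxKey int

const (
    userIDKey ctxKey = iota
    requestIDKey
)

func withUser(ctx context.Context, userID string) context.Context {
    return context.WithValue(ctx, userIDKey, userID)
}

func userFrom(ctx context.Context) (string, bool) {
    id, ok := ctx.Value(userIDKey).(string)
    return id, ok
}

func main() {
    ctx := withUser(context.Background(), "12345")
    if id, ok := userFrom(ctx); ok {
        fmt.Println("userID:", id)
    }
}

Accessor functions also give callers a typed result instead of an interface{} that must be asserted at every call site.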

Rate Limiting Patterns

Basic Rate Limiting

Rate Limiting with Channels

Implementing rate limiting using channels.

Rate Limiting with Time

Using time-based rate limiting.

package main

import (
"fmt"
"time"
)

func main() {
// Rate limiting patterns examples
fmt.Println("Rate limiting patterns examples:")

// Rate limiting with channels
rateLimitingWithChannels := func() {
const rate = 2 // 2 operations per second
const burst = 5 // burst capacity

limiter := make(chan time.Time, burst)

// Fill the limiter
for i := 0; i < burst; i++ {
limiter <- time.Now()
}

// Refill the limiter
go func() {
ticker := time.NewTicker(time.Second / rate)
defer ticker.Stop()

for t := range ticker.C {
select {
case limiter <- t:
default:
}
}
}()

// Simulate requests
for i := 1; i <= 10; i++ {
<-limiter
fmt.Printf("Request %d processed\n", i)
}
}

rateLimitingWithChannels()
// Output (requests after the burst of 5 are delayed by the refill rate):
// Request 1 processed
// Request 2 processed
// Request 3 processed
// Request 4 processed
// Request 5 processed
// Request 6 processed
// Request 7 processed
// Request 8 processed
// Request 9 processed
// Request 10 processed

// Rate limiting with time
rateLimitingWithTime := func() {
const rate = time.Second / 2 // 2 operations per second
lastTime := time.Now()

for i := 1; i <= 5; i++ {
now := time.Now()
elapsed := now.Sub(lastTime)

if elapsed < rate {
sleepTime := rate - elapsed
fmt.Printf("Request %d: sleeping for %v\n", i, sleepTime)
time.Sleep(sleepTime)
}

fmt.Printf("Request %d processed\n", i)
lastTime = time.Now()
}
}

rateLimitingWithTime()
// Output (sleep durations are approximate):
// Request 1 processed
// Request 2: sleeping for 500ms
// Request 2 processed
// Request 3: sleeping for 500ms
// Request 3 processed
// Request 4: sleeping for 500ms
// Request 4 processed
// Request 5: sleeping for 500ms
// Request 5 processed
}
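
Both limiters above are built by hand from channels and timers. In practice the golang.org/x/time/rate package provides a token-bucket limiter with the same rate-plus-burst behaviour; a short sketch, with the rate and burst chosen to match the channel example above:

package main

import (
    "context"
    "fmt"
    "time"

    "golang.org/x/time/rate"
)

func main() {
    // 2 tokens per second, with a burst capacity of 5.
    limiter := rate.NewLimiter(rate.Limit(2), 5)

    start := time.Now()
    for i := 1; i <= 10; i++ {
        // Wait blocks until a token is available or the context is cancelled.
        if err := limiter.Wait(context.Background()); err != nil {
            fmt.Println("limiter error:", err)
            return
        }
        fmt.Printf("Request %d processed after %v\n", i, time.Since(start).Round(100*time.Millisecond))
    }
}

The first five requests consume the burst immediately; the remaining five are spaced roughly 500ms apart by the refill rate.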

What You've Learned

Congratulations! You now have a comprehensive understanding of Go's advanced concurrency patterns:

Worker Pools

  • Understanding worker pool patterns for distributing work
  • Implementing worker pools with error handling
  • Creating dynamic and priority-based worker pools
  • Using wait groups for worker coordination

Pipeline Patterns

  • Creating data processing pipelines with multiple stages
  • Implementing pipeline patterns with error handling
  • Using fan-out/fan-in patterns in pipelines
  • Managing pipeline stages with context

Context-based Patterns

  • Using context for cancellation and timeout management
  • Implementing context with value passing
  • Creating context chains and hierarchies
  • Managing goroutine lifecycle with context

Rate Limiting Patterns

  • Implementing rate limiting with channels
  • Using time-based rate limiting
  • Creating burst capacity and sustained rate limits
  • Managing request flow with rate limiting

Key Concepts

  • Worker pools - Distributing work across multiple goroutines
  • Pipeline patterns - Processing data through concurrent stages
  • Fan-out/fan-in - Distributing and collecting work
  • Context patterns - Cancellation and timeout management
  • Rate limiting - Controlling the rate of operations

Next Steps

You now have a solid foundation in Go's advanced concurrency patterns. These patterns enable you to build sophisticated, scalable concurrent applications that can handle complex workloads and provide robust error handling.

Understanding advanced concurrency patterns is crucial for building production-ready concurrent applications. These concepts form the foundation for all the more advanced programming techniques we'll cover in the coming chapters.


Ready to learn about web development? Let's explore Go's web development capabilities and learn how to build robust web applications and APIs!