Contents

Quản lý goroutine: errgroup, leak-proof, backpressure

Mở đầu

Goroutine là món “shot espresso” của Go: nhẹ, rẻ, bật là chạy. Mỗi goroutine chỉ tốn ~2KB stack ban đầu — so với ~1MB của OS thread, bạn có thể chạy hàng trăm ngàn goroutine trong một tiến trình. Nhưng nếu rót liên tục mà không kiểm soát, bạn sẽ có một quầy pha chế quá tải, chảy tràn và khách chờ dài cổ.

Bài này chia sẻ cách tôi quản lý goroutine ở những dịch vụ có tải cao: dùng errgroup để phối hợp và huỷ đồng loạt; thiết kế leak-proof để không rò rỉ; và áp dụng backpressure để bảo vệ tài nguyên. Không lý thuyết suông — mỗi pattern đều có nguyên nhân từ kinh nghiệm thực tế.

https://blog-bucket.luandnh.com/images/covers/manage-routine.jpg

Vấn đề: goroutine leak

Goroutine leak xảy ra khi goroutine không bao giờ kết thúc. Nguyên nhân phổ biến nhất là channel không được đọc hoặc context không được cancel. Tưởng tượng: bạn tuyển thêm barista nhưng không bao giờ cho nghỉ — cuối ngày, quầy bar đầy người làm việc không ai tiếp khách.

// ❌ Sai: goroutine leak
go func() {
    for item := range ch {
        process(item)
    }
}()
// Nếu ch không bao giờ close, goroutine chạy mãi mãi
// ✅ Đúng: dùng context để dừng
go func() {
    for {
        select {
        case <-ctx.Done():
            return
        case item, ok := <-ch:
            if !ok {
                return
            }
            process(item)
        }
    }
}()

Cách phát hiện leak: Dùng pprof — nếu số goroutine tăng đều đặn mà không giảm, bạn có leak.

go tool pprof http://localhost:6060/debug/pprof/goroutine

errgroup: phối hợp và huỷ đồng loạt

Khi cần chạy nhiều tác vụ song song và muốn huỷ tất cả nếu một tác vụ thất bại — errgroup là công cụ đúng.

import "golang.org/x/sync/errgroup"

func fetchDashboard(ctx context.Context, userID string) (*Dashboard, error) {
    g, ctx := errgroup.WithContext(ctx)

    var users *UserList
    var orders *OrderList
    var stats *Stats

    g.Go(func() error {
        var err error
        users, err = fetchUsers(ctx, userID)
        return err
    })

    g.Go(func() error {
        var err error
        orders, err = fetchOrders(ctx, userID)
        return err
    })

    g.Go(func() error {
        var err error
        stats, err = fetchStats(ctx, userID)
        return err
    })

    if err := g.Wait(); err != nil {
        return nil, err
    }

    return &Dashboard{users, orders, stats}, nil
}

Khi một goroutine trả về lỗi, context của tất cả goroutine còn lại bị cancel. Không cần tự viết sync.WaitGroup + context.Cancel như trước nữa.

Worker pool: giới hạn số barista

Không phải lúc nào cũng nên tạo goroutine mới cho mọi tác vụ. Khi tải cao, worker pool giúp giới hạn số goroutine đang chạy đồng thời — giống như giới hạn số quầy bar trong quán.

func processBatch(ctx context.Context, jobs []Job, numWorkers int) []Result {
    ch := make(chan Job, len(jobs))
    results := make(chan Result, len(jobs))
    var wg sync.WaitGroup

    // Start workers
    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go func(workerID int) {
            defer wg.Done()
            for {
                select {
                case <-ctx.Done():
                    return
                case job, ok := <-ch:
                    if !ok {
                        return
                    }
                    results <- process(job)
                }
            }
        }(i)
    }

    // Send jobs
    go func() {
        for _, job := range jobs {
            select {
            case <-ctx.Done():
                close(ch)
                return
            case ch <- job:
            }
        }
        close(ch)
    }()

    wg.Wait()
    close(results)

    var output []Result
    for r := range results {
        output = append(output, r)
    }
    return output
}

Backpressure: khi quán quá đông

Backpressure là kỹ thuật giúp hệ thống tự giảm tốc khi quá tải — thay vì nhận hết tất cả request rồi crash.

Buffered channel tự động backpressure

// Channel buffer = 100 → tối đa 100 job chờ
ch := make(chan Job, 100)

// Nếu channel đầy, sender sẽ block
// → tự động giảm tốc khi worker chậm
select {
case ch <- job:
    // sent successfully
case <-time.After(100 * time.Millisecond):
    // quá tải — drop hoặc retry
    log.Println("Queue full, dropping job")
}

Rate limiting

Dùng token bucket để giới hạn tốc độ xử lý:

import "golang.org/x/time/rate"

limiter := rate.NewLimiter(100, 200) // 100 req/s, burst 200

func handleRequest(w http.ResponseWriter, r *http.Request) {
    if err := limiter.Wait(r.Context()); err != nil {
        http.Error(w, "Too Many Requests", 429)
        return
    }
    // Process request...
}

Ảnh hưởng tới observability

Khi chạy nhiều goroutine, việc trace và debug trở nên khó khăn hơn. Với context, bạn nên luôn truyền trace ID xuyên suốt:

// Extract trace từ incoming request
span, ctx := opentracing.StartSpanFromContext(ctx, "HandleRequest")
defer span.Finish()

// Truyền ctx xuống — trace ID tự động theo
err := s.service.Process(ctx, data)

Checklist quản lý goroutine

  • Luôn có cách dừng goroutine (context cancel, close channel)
  • Dùng errgroup khi cần cancel đồng loạt
  • Giới hạn số goroutine chạy đồng thời (worker pool)
  • Áp dụng backpressure khi tải cao
  • Truyền trace ID qua context
  • Monitor số goroutine với pprof

“Goroutine rẻ không có nghĩa là không tốn chi phí. Mỗi goroutine leak là một barista làm việc không ai tiếp khách.”


Bạn đã từng gặp goroutine leak trong dự án thực tế chưa? Chia sẻ kinh nghiệm bên dưới nhé! 🚀