7 min read

Quản lý goroutine: errgroup, leak-proof, backpressure

Quản lý goroutine: errgroup, leak-proof, backpressure

Mở bài

Goroutine là món “shot espresso” của Go: nhẹ, rẻ, bật là chạy. Nhưng nếu rót liên tục mà không kiểm soát, bạn sẽ có một quầy pha chế quá tải, chảy tràn và khách chờ dài cổ. Bài này chia sẻ cách tôi quản lý goroutine ở những dịch vụ có tải cao: dùng errgroup để phối hợp và huỷ đồng loạt; thiết kế leak-proof để không rò rỉ; và áp dụng backpressure để bảo vệ tài nguyên. Ví dụ sẽ gắn với bối cảnh quán cà phê: order → pha → thanh toán → giao.

Tư duy nền tảng: ai sở hữu goroutine?

Mỗi goroutine cần một “chủ sở hữu” chịu trách nhiệm 3 việc: truyền context, cung cấp cách dừng (huỷ hoặc đóng channel), và thu dọn tài nguyên (defer, Wait). Khi bạn không trả lời được “ai sẽ dừng goroutine này khi không cần nữa?”, rò rỉ đang rình rập.

Quy ước an toàn:

  • Goroutine phải lắng nghe ctx.Done() trong các chỗ chờ I/O, chờ trên channel, hoặc vòng lặp dài.
  • Nếu goroutine phát dữ liệu qua channel, chủ sở hữu là bên tạo và đóng channel khi xong.
  • Mọi nhóm goroutine liên quan nên gắn chung một ctx để có thể huỷ đồng loạt.

errgroup: fan-out/fan-in có huỷ đồng loạt

Trong quán cà phê, khi có một order mới, ta tính tồn kho, phí ship và xác thực voucher song song. Nếu một bước lỗi hoặc quá hạn, ta dừng cả nhóm để tiết kiệm tài nguyên.

import (
    "context"
    "golang.org/x/sync/errgroup"
)

type Totals struct { InvOK bool; ShipFee int; Discount int }

func PrepareCheckout(ctx context.Context, cart Cart) (Totals, error) {
    g, ctx := errgroup.WithContext(ctx) // ctx mới sẽ bị huỷ khi có lỗi

    var invOK bool
    var shipFee int
    var discount int

    g.Go(func() error { ok, err := checkInventory(ctx, cart.Items); invOK = ok; return err })
    g.Go(func() error { fee, err := calcShipping(ctx, cart.Address); shipFee = fee; return err })
    g.Go(func() error { d, err := validateCoupon(ctx, cart.Coupon); discount = d; return err })

    if err := g.Wait(); err != nil { return Totals{}, err }
    return Totals{InvOK: invOK, ShipFee: shipFee, Discount: discount}, nil
}

Ưu điểm: nếu một nhánh DeadlineExceeded, ctx chung bị huỷ → các nhánh còn lại dừng sớm. Bạn tránh được việc “pha xong 3 ly” trong khi order đã huỷ.

Thu kết quả an toàn

Khi gán biến từ nhiều goroutine, đảm bảo mỗi biến chỉ được ghi bởi đúng một goroutine hoặc dùng sync.Mutex. Tránh viết/đọc tranh chấp.

Leak-proof: công thức không rò rỉ

Rò rỉ thường đến từ 3 nhóm lỗi: không nghe ctx.Done(), quên đóng channel, và spawn không giới hạn.

1) Luôn select trên ctx.Done()

func consumeOrders(ctx context.Context, ch <-chan Order) error {
    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case o, ok := <-ch:
            if !ok { return nil } // producer đóng channel
            if err := handle(o); err != nil { return err }
        }
    }
}

2) Đóng channel ở đúng nơi

Bên producer đóng channel khi gửi xong, tránh để consumer chờ mãi.

func produce(ctx context.Context) <-chan Order {
    out := make(chan Order)
    go func() {
        defer close(out)
        for _, o := range loadBatch() {
            select {
            case <-ctx.Done():
                return
            case out <- o:
            }
        }
    }()
    return out
}

3) Không spawn vô hạn: dùng bounded concurrency

Thay vì tạo một goroutine cho mỗi ly cà phê, hãy giới hạn số “barista” hoạt động.

Cách A: semaphore (kênh làm giấy phép)

func processBatch(ctx context.Context, jobs []Job, max int) error {
    g, ctx := errgroup.WithContext(ctx)
    sem := make(chan struct{}, max)

    for i := range jobs {
        j := jobs[i]
        g.Go(func() error {
            select {
            case sem <- struct{}{}: // xin slot
            case <-ctx.Done():
                return ctx.Err()
            }
            defer func() { <-sem }()
            return handleJob(ctx, j)
        })
    }
    return g.Wait()
}

Cách B: worker pool

func runWorkers(ctx context.Context, n int, jobs <-chan Job) error {
    g, ctx := errgroup.WithContext(ctx)
    for w := 0; w < n; w++ {
        g.Go(func() error {
            for {
                select {
                case <-ctx.Done():
                    return ctx.Err()
                case j, ok := <-jobs:
                    if !ok { return nil }
                    if err := handleJob(ctx, j); err != nil { return err }
                }
            }
        })
    }
    return g.Wait()
}

Worker pool phù hợp khi dòng việc liên tục; semaphore tiện cho mẻ công việc có sẵn.

Backpressure: giữ quán không quá tải

Backpressure là cách hệ thống “phản kháng” khi bị ép làm quá sức. Ba lớp bạn nên cân nhắc:

1) Giới hạn đồng thời (concurrency)

Dùng semaphore/worker như trên. Đặt số barista bằng số CPU, IO hoặc quota downstream. Khi một slot bận, yêu cầu mới phải chờ hoặc bị từ chối sớm.

2) Giới hạn tốc độ (rate limiting)

Sử dụng time.Ticker hoặc mô hình token bucket để giữ nhịp gọi downstream/API.

func throttleCalls(ctx context.Context, in <-chan Task, rpm int) <-chan Result {
    out := make(chan Result)
    interval := time.Minute / time.Duration(rpm)
    ticker := time.NewTicker(interval)

    go func() {
        defer close(out); defer ticker.Stop()
        for t := range in {
            select {
            case <-ctx.Done():
                return
            case <-ticker.C:
                out <- callDownstream(ctx, t)
            }
        }
    }()
    return out
}

3) Bộ đệm và chính sách khi đầy

Channel có buffer giúp cân bằng “nhịp pha”, nhưng khi đầy cần chính sách: chờ, rơi (drop), rơi có ưu tiên, hoặc từ chối sớm.

Ví dụ drop-newest khi hàng đợi quá tải:

func tryEnqueue(q chan<- Job, j Job) bool {
    select {
    case q <- j:
        return true
    default:
        // hàng đợi đầy: bỏ job mới
        return false
    }
}

Hoặc dùng hàng đợi có kích thước cố định và trả về lỗi 429/QueueFull ở lớp HTTP để người gọi retry có kiểm soát.

Pipeline nhiều tầng: từ order đến ly cà phê

Giả sử pipeline: đọc order → pha → thanh toán → ghi lịch giao. Mỗi tầng đều tôn trọng ctx và đóng channel khi xong.

func brewStage(ctx context.Context, in <-chan Order) <-chan Cup {
    out := make(chan Cup)
    go func() {
        defer close(out)
        for o := range in {
            select {
            case <-ctx.Done():
                return
            default:
                out <- brew(o)
            }
        }
    }()
    return out
}

func payStage(ctx context.Context, in <-chan Cup) <-chan Ticket {
    out := make(chan Ticket)
    g, ctx := errgroup.WithContext(ctx)
    // Bounded fan-out: 8 phiên thanh toán song song
    sem := make(chan struct{}, 8)

    go func() {
        // collector
        defer close(out)
        _ = g.Wait() // khi g xong, đóng out
    }()

    for c := range in {
        c := c
        g.Go(func() error {
            select {
            case sem <- struct{}{}:
            case <-ctx.Done():
                return ctx.Err()
            }
            defer func() { <-sem }()
            t, err := charge(ctx, c)
            if err != nil { return err }
            select {
            case out <- t:
                return nil
            case <-ctx.Done():
                return ctx.Err()
            }
        })
    }
    return out
}

Điểm mấu chốt: mỗi tầng chỉ chịu trách nhiệm đóng channel mình tạo; truyền ctx xuyên suốt; và không để goroutine treo khi upstream đã dừng.

Panic và cô lập lỗi

Trong worker, bạn có thể recover để tránh đổ cả nhóm, rồi trả lỗi cho errgroup.

func safeHandle(ctx context.Context, j Job) (err error) {
    defer func() {
        if r := recover(); r != nil {
            err = fmt.Errorf("panic: %v", r)
        }
    }()
    return handleJob(ctx, j)
}

Khi dùng errgroup, chỉ cần trả lỗi là đủ để phát tín hiệu dừng toàn bộ nhóm.

Quan sát: metric và log

  • Đếm in-flight goroutines hoặc jobs trong hàng để phát hiện bão tải.
  • Gắn request_id/order_id vào log, đo latency, timeout, canceled. Các con số này chỉ ra nơi cần thắt chặt backpressure.
  • Dùng pprof khi cần: số lượng goroutine tăng không giảm thường là dấu hiệu rò rỉ.

Anti-pattern thường gặp

  • Tạo goroutine trong vòng lặp mà không giới hạn, nghĩ rằng “goroutine rẻ” → OOM.
  • Không defer cancel() sau WithTimeout/WithCancel → rò rỉ timer và goroutine chờ.
  • Producer không đóng channel; consumer range mãi không kết thúc.
  • Quên select trên ctx.Done() khi gửi/nhận trên channel.
  • Lặp lại timeout nhiều tầng mà không có ngân sách thời gian tổng.

Checklist áp dụng ngay

  • Mọi goroutine và I/O đều nhận ctx và lắng nghe ctx.Done().
  • Nhóm tác vụ song song dùng errgroup.WithContext để huỷ đồng loạt.
  • Giới hạn đồng thời bằng semaphore/worker pool; đặt mức theo tài nguyên downstream.
  • Đặt hàng đợi có kích thước và chính sách khi đầy (chờ, drop, hoặc trả 429).
  • Đảm bảo bên tạo channel chịu trách nhiệm đóng channel.
  • defer cancel() ngay sau WithTimeout/WithCancel.
  • Thu gom kết quả an toàn; mỗi biến chỉ một goroutine ghi.
  • Gắn metric in-flight, queue depth, latency; theo dõi số goroutine với pprof.

Kết luận

Quản lý goroutine không chỉ là viết go func(){...}() mà là thiết kế vòng đời: sinh, làm việc, dừng, thu dọn. Dùng errgroup để điều phối và huỷ đồng loạt; thiết kế leak-proof bằng hợp đồng sở hữu rõ ràng và ctx; áp dụng backpressure để bảo vệ quán. Khi các nguyên tắc này thống nhất, hệ thống giữ nhịp ổn định như một quán cà phê giờ cao điểm: barista vừa đủ, hàng đợi vừa phải, ly nào cũng ra đúng thời điểm.