Quản lý goroutine: errgroup, leak-proof, backpressure

Mở bài
Goroutine là món “shot espresso” của Go: nhẹ, rẻ, bật là chạy. Nhưng nếu rót liên tục mà không kiểm soát, bạn sẽ có một quầy pha chế quá tải, chảy tràn và khách chờ dài cổ. Bài này chia sẻ cách tôi quản lý goroutine ở những dịch vụ có tải cao: dùng errgroup để phối hợp và huỷ đồng loạt; thiết kế leak-proof để không rò rỉ; và áp dụng backpressure để bảo vệ tài nguyên. Ví dụ sẽ gắn với bối cảnh quán cà phê: order → pha → thanh toán → giao.
Tư duy nền tảng: ai sở hữu goroutine?
Mỗi goroutine cần một “chủ sở hữu” chịu trách nhiệm 3 việc: truyền context
, cung cấp cách dừng (huỷ hoặc đóng channel), và thu dọn tài nguyên (defer
, Wait
). Khi bạn không trả lời được “ai sẽ dừng goroutine này khi không cần nữa?”, rò rỉ đang rình rập.
Quy ước an toàn:
- Goroutine phải lắng nghe
ctx.Done()
trong các chỗ chờ I/O, chờ trên channel, hoặc vòng lặp dài. - Nếu goroutine phát dữ liệu qua channel, chủ sở hữu là bên tạo và đóng channel khi xong.
- Mọi nhóm goroutine liên quan nên gắn chung một
ctx
để có thể huỷ đồng loạt.
errgroup: fan-out/fan-in có huỷ đồng loạt
Trong quán cà phê, khi có một order mới, ta tính tồn kho, phí ship và xác thực voucher song song. Nếu một bước lỗi hoặc quá hạn, ta dừng cả nhóm để tiết kiệm tài nguyên.
import (
"context"
"golang.org/x/sync/errgroup"
)
type Totals struct { InvOK bool; ShipFee int; Discount int }
func PrepareCheckout(ctx context.Context, cart Cart) (Totals, error) {
g, ctx := errgroup.WithContext(ctx) // ctx mới sẽ bị huỷ khi có lỗi
var invOK bool
var shipFee int
var discount int
g.Go(func() error { ok, err := checkInventory(ctx, cart.Items); invOK = ok; return err })
g.Go(func() error { fee, err := calcShipping(ctx, cart.Address); shipFee = fee; return err })
g.Go(func() error { d, err := validateCoupon(ctx, cart.Coupon); discount = d; return err })
if err := g.Wait(); err != nil { return Totals{}, err }
return Totals{InvOK: invOK, ShipFee: shipFee, Discount: discount}, nil
}
Ưu điểm: nếu một nhánh DeadlineExceeded
, ctx
chung bị huỷ → các nhánh còn lại dừng sớm. Bạn tránh được việc “pha xong 3 ly” trong khi order đã huỷ.
Thu kết quả an toàn
Khi gán biến từ nhiều goroutine, đảm bảo mỗi biến chỉ được ghi bởi đúng một goroutine hoặc dùng sync.Mutex
. Tránh viết/đọc tranh chấp.
Leak-proof: công thức không rò rỉ
Rò rỉ thường đến từ 3 nhóm lỗi: không nghe ctx.Done()
, quên đóng channel, và spawn không giới hạn.
1) Luôn select
trên ctx.Done()
func consumeOrders(ctx context.Context, ch <-chan Order) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case o, ok := <-ch:
if !ok { return nil } // producer đóng channel
if err := handle(o); err != nil { return err }
}
}
}
2) Đóng channel ở đúng nơi
Bên producer đóng channel khi gửi xong, tránh để consumer chờ mãi.
func produce(ctx context.Context) <-chan Order {
out := make(chan Order)
go func() {
defer close(out)
for _, o := range loadBatch() {
select {
case <-ctx.Done():
return
case out <- o:
}
}
}()
return out
}
3) Không spawn vô hạn: dùng bounded concurrency
Thay vì tạo một goroutine cho mỗi ly cà phê, hãy giới hạn số “barista” hoạt động.
Cách A: semaphore (kênh làm giấy phép)
func processBatch(ctx context.Context, jobs []Job, max int) error {
g, ctx := errgroup.WithContext(ctx)
sem := make(chan struct{}, max)
for i := range jobs {
j := jobs[i]
g.Go(func() error {
select {
case sem <- struct{}{}: // xin slot
case <-ctx.Done():
return ctx.Err()
}
defer func() { <-sem }()
return handleJob(ctx, j)
})
}
return g.Wait()
}
Cách B: worker pool
func runWorkers(ctx context.Context, n int, jobs <-chan Job) error {
g, ctx := errgroup.WithContext(ctx)
for w := 0; w < n; w++ {
g.Go(func() error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case j, ok := <-jobs:
if !ok { return nil }
if err := handleJob(ctx, j); err != nil { return err }
}
}
})
}
return g.Wait()
}
Worker pool phù hợp khi dòng việc liên tục; semaphore tiện cho mẻ công việc có sẵn.
Backpressure: giữ quán không quá tải
Backpressure là cách hệ thống “phản kháng” khi bị ép làm quá sức. Ba lớp bạn nên cân nhắc:
1) Giới hạn đồng thời (concurrency)
Dùng semaphore/worker như trên. Đặt số barista bằng số CPU, IO hoặc quota downstream. Khi một slot bận, yêu cầu mới phải chờ hoặc bị từ chối sớm.
2) Giới hạn tốc độ (rate limiting)
Sử dụng time.Ticker
hoặc mô hình token bucket để giữ nhịp gọi downstream/API.
func throttleCalls(ctx context.Context, in <-chan Task, rpm int) <-chan Result {
out := make(chan Result)
interval := time.Minute / time.Duration(rpm)
ticker := time.NewTicker(interval)
go func() {
defer close(out); defer ticker.Stop()
for t := range in {
select {
case <-ctx.Done():
return
case <-ticker.C:
out <- callDownstream(ctx, t)
}
}
}()
return out
}
3) Bộ đệm và chính sách khi đầy
Channel có buffer giúp cân bằng “nhịp pha”, nhưng khi đầy cần chính sách: chờ, rơi (drop), rơi có ưu tiên, hoặc từ chối sớm.
Ví dụ drop-newest khi hàng đợi quá tải:
func tryEnqueue(q chan<- Job, j Job) bool {
select {
case q <- j:
return true
default:
// hàng đợi đầy: bỏ job mới
return false
}
}
Hoặc dùng hàng đợi có kích thước cố định và trả về lỗi 429/QueueFull
ở lớp HTTP để người gọi retry có kiểm soát.
Pipeline nhiều tầng: từ order đến ly cà phê
Giả sử pipeline: đọc order → pha → thanh toán → ghi lịch giao. Mỗi tầng đều tôn trọng ctx
và đóng channel khi xong.
func brewStage(ctx context.Context, in <-chan Order) <-chan Cup {
out := make(chan Cup)
go func() {
defer close(out)
for o := range in {
select {
case <-ctx.Done():
return
default:
out <- brew(o)
}
}
}()
return out
}
func payStage(ctx context.Context, in <-chan Cup) <-chan Ticket {
out := make(chan Ticket)
g, ctx := errgroup.WithContext(ctx)
// Bounded fan-out: 8 phiên thanh toán song song
sem := make(chan struct{}, 8)
go func() {
// collector
defer close(out)
_ = g.Wait() // khi g xong, đóng out
}()
for c := range in {
c := c
g.Go(func() error {
select {
case sem <- struct{}{}:
case <-ctx.Done():
return ctx.Err()
}
defer func() { <-sem }()
t, err := charge(ctx, c)
if err != nil { return err }
select {
case out <- t:
return nil
case <-ctx.Done():
return ctx.Err()
}
})
}
return out
}
Điểm mấu chốt: mỗi tầng chỉ chịu trách nhiệm đóng channel mình tạo; truyền ctx
xuyên suốt; và không để goroutine treo khi upstream đã dừng.
Panic và cô lập lỗi
Trong worker, bạn có thể recover
để tránh đổ cả nhóm, rồi trả lỗi cho errgroup
.
func safeHandle(ctx context.Context, j Job) (err error) {
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("panic: %v", r)
}
}()
return handleJob(ctx, j)
}
Khi dùng errgroup
, chỉ cần trả lỗi là đủ để phát tín hiệu dừng toàn bộ nhóm.
Quan sát: metric và log
- Đếm in-flight goroutines hoặc jobs trong hàng để phát hiện bão tải.
- Gắn
request_id
/order_id
vào log, đolatency
,timeout
,canceled
. Các con số này chỉ ra nơi cần thắt chặt backpressure. - Dùng pprof khi cần: số lượng goroutine tăng không giảm thường là dấu hiệu rò rỉ.
Anti-pattern thường gặp
- Tạo goroutine trong vòng lặp mà không giới hạn, nghĩ rằng “goroutine rẻ” → OOM.
- Không
defer cancel()
sauWithTimeout/WithCancel
→ rò rỉ timer và goroutine chờ. - Producer không đóng channel; consumer
range
mãi không kết thúc. - Quên
select
trênctx.Done()
khi gửi/nhận trên channel. - Lặp lại timeout nhiều tầng mà không có ngân sách thời gian tổng.
Checklist áp dụng ngay
- Mọi goroutine và I/O đều nhận
ctx
và lắng nghectx.Done()
. - Nhóm tác vụ song song dùng
errgroup.WithContext
để huỷ đồng loạt. - Giới hạn đồng thời bằng semaphore/worker pool; đặt mức theo tài nguyên downstream.
- Đặt hàng đợi có kích thước và chính sách khi đầy (chờ, drop, hoặc trả 429).
- Đảm bảo bên tạo channel chịu trách nhiệm đóng channel.
defer cancel()
ngay sauWithTimeout/WithCancel
.- Thu gom kết quả an toàn; mỗi biến chỉ một goroutine ghi.
- Gắn metric in-flight, queue depth, latency; theo dõi số goroutine với pprof.
Kết luận
Quản lý goroutine không chỉ là viết go func(){...}()
mà là thiết kế vòng đời: sinh, làm việc, dừng, thu dọn. Dùng errgroup
để điều phối và huỷ đồng loạt; thiết kế leak-proof bằng hợp đồng sở hữu rõ ràng và ctx
; áp dụng backpressure để bảo vệ quán. Khi các nguyên tắc này thống nhất, hệ thống giữ nhịp ổn định như một quán cà phê giờ cao điểm: barista vừa đủ, hàng đợi vừa phải, ly nào cũng ra đúng thời điểm.