From 0028f2b524abbffb1bd040167c1bfd355a999db0 Mon Sep 17 00:00:00 2001 From: Vincent Bernat Date: Thu, 13 Nov 2025 22:12:07 +0100 Subject: [PATCH] outlet/kafka: continue draining signals as long as scaling in progress This does not seem quite important, but this matches the expected behavior. Finishing the draining goroutine early would not return earlier. --- outlet/kafka/scaler.go | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/outlet/kafka/scaler.go b/outlet/kafka/scaler.go index 26e55988..b9773074 100644 --- a/outlet/kafka/scaler.go +++ b/outlet/kafka/scaler.go @@ -69,18 +69,16 @@ func (s *scalerState) nextWorkerCount(request ScaleRequest, currentWorkers, minW // scaleWhileDraining runs a scaling function while draining incoming signals // from the channel. It spawns two goroutines: one to discard signals and one to // run the scaling function. -func scaleWhileDraining(ctx context.Context, ch <-chan ScaleRequest, scaleFn func()) { +func scaleWhileDraining(ch <-chan ScaleRequest, scaleFn func()) { var wg sync.WaitGroup done := make(chan struct{}) wg.Go(func() { for { select { - case <-ctx.Done(): - return case <-done: return case <-ch: - // Discard signal + // Discard requests } } }) @@ -120,7 +118,7 @@ func runScaler(ctx context.Context, config scalerConfiguration) chan<- ScaleRequ current := config.getWorkerCount() target := state.nextWorkerCount(ScaleIncrease, current, config.minWorkers, config.maxWorkers) if target > current { - scaleWhileDraining(ctx, ch, func() { + scaleWhileDraining(ch, func() { config.increaseWorkers(current, target) }) } @@ -166,7 +164,7 @@ func runScaler(ctx context.Context, config scalerConfiguration) chan<- ScaleRequ current := config.getWorkerCount() target := state.nextWorkerCount(ScaleDecrease, current, config.minWorkers, config.maxWorkers) if target < current { - scaleWhileDraining(ctx, ch, func() { + scaleWhileDraining(ch, func() { config.decreaseWorkers(current, target) }) }