From 9afe57b32c620fc3d6b0b5865f577f9d0fb182d5 Mon Sep 17 00:00:00 2001 From: Vincent Bernat Date: Sat, 15 Nov 2025 06:36:19 +0100 Subject: [PATCH] outlet/kafka: decrease if we have many scale down requests Don't wait for a majority. Just a third of them should be enough. We have a 10-factor between underloaded and overloaded. --- outlet/kafka/scaler.go | 4 ++-- outlet/kafka/scaler_test.go | 10 ++-------- 2 files changed, 4 insertions(+), 10 deletions(-) diff --git a/outlet/kafka/scaler.go b/outlet/kafka/scaler.go index 7c49f059..66d0dc5c 100644 --- a/outlet/kafka/scaler.go +++ b/outlet/kafka/scaler.go @@ -164,8 +164,8 @@ func runScaler(ctx context.Context, config scalerConfiguration) chan<- ScaleRequ } } - // Scale down if we have a majority of decrease requests. - if decreaseCount > steadyCount { + // Scale down if we have many decrease requests + if decreaseCount > steadyCount/2 { current := config.getWorkerCount() target := state.nextWorkerCount(ScaleDecrease, current, config.minWorkers, config.maxWorkers) if target < current { diff --git a/outlet/kafka/scaler_test.go b/outlet/kafka/scaler_test.go index 8fab390d..ed32ca0d 100644 --- a/outlet/kafka/scaler_test.go +++ b/outlet/kafka/scaler_test.go @@ -154,18 +154,12 @@ func TestScalerRateLimiter(t *testing.T) { time.Sleep(time.Second) ch <- ScaleDecrease } - // time=7m, no change (180 vs 120) - check([]int{8, 12, 11, 12, 13, 12, 11}) + // time=7m, decrease (180 vs 120) + check([]int{8, 12, 11, 12, 13, 12, 11, 10}) for range 30 { time.Sleep(time.Second) ch <- ScaleDecrease } - // time=7m30s, no change (150 vs 150) - check([]int{8, 12, 11, 12, 13, 12, 11}) - time.Sleep(time.Second) - ch <- ScaleDecrease - // OK, now more decrease than increase! - check([]int{8, 12, 11, 12, 13, 12, 11, 10}) // We should not account for steady requests for too long! time.Sleep(time.Minute)