diff --git a/outlet/kafka/scaler.go b/outlet/kafka/scaler.go index a3f5630e..26e55988 100644 --- a/outlet/kafka/scaler.go +++ b/outlet/kafka/scaler.go @@ -91,13 +91,19 @@ func scaleWhileDraining(ctx context.Context, ch <-chan ScaleRequest, scaleFn fun wg.Wait() } -// runScaler starts the automatic scaling loop +// requestRecord tracks a scale request with its timestamp. +type requestRecord struct { + request ScaleRequest + time time.Time +} + +// runScaler starts the automatic scaling loop. func runScaler(ctx context.Context, config scalerConfiguration) chan<- ScaleRequest { ch := make(chan ScaleRequest, config.maxWorkers) go func() { state := new(scalerState) var last time.Time - var decreaseCount int + var requestHistory []requestRecord for { select { case <-ctx.Done(): @@ -108,7 +114,8 @@ func runScaler(ctx context.Context, config scalerConfiguration) chan<- ScaleRequ if last.Add(config.increaseRateLimit).After(now) { continue } - // Between increaseRateLimit and decreaseRateLimit, we accept increase requests. + // Between increaseRateLimit and decreaseRateLimit, we accept + // increase requests. if request == ScaleIncrease { current := config.getWorkerCount() target := state.nextWorkerCount(ScaleIncrease, current, config.minWorkers, config.maxWorkers) @@ -118,34 +125,54 @@ func runScaler(ctx context.Context, config scalerConfiguration) chan<- ScaleRequ }) } last = time.Now() - decreaseCount = 0 + requestHistory = requestHistory[:0] continue } - // We also count steady requests. - if request == ScaleSteady { - decreaseCount-- - } - // But we ignore everything else. + // Between increaseRateLimit and decreaseRateLimit, we also + // count steady requests to give them a head start. if last.Add(config.decreaseRateLimit).After(now) { + if request == ScaleSteady { + requestHistory = append(requestHistory, requestRecord{request, now}) + } continue } - // Past decreaseRateLimit, we count decrease requests and - // request 10 of them if not cancelled by steady requests (they - // have a head start). - if request == ScaleDecrease { - decreaseCount++ - if decreaseCount >= 10 { - current := config.getWorkerCount() - target := state.nextWorkerCount(ScaleDecrease, current, config.minWorkers, config.maxWorkers) - if target < current { - scaleWhileDraining(ctx, ch, func() { - config.decreaseWorkers(current, target) - }) - } - last = time.Now() - decreaseCount = 0 + // Past decreaseRateLimit, we track all requests. + requestHistory = append(requestHistory, requestRecord{request, now}) + + // Remove old requests to prevent unbounded growth. We only + // consider requests from the last decreaseRateLimit duration to + // avoid accumulating requests over many hours. + windowStart := now.Add(-config.decreaseRateLimit) + i := 0 + for i < len(requestHistory)-1 && requestHistory[i].time.Before(windowStart) { + i++ + } + requestHistory = requestHistory[i:] + + // Count decrease vs steady requests in the window. + var decreaseCount int + var steadyCount int + for _, r := range requestHistory { + switch r.request { + case ScaleDecrease: + decreaseCount++ + case ScaleSteady: + steadyCount++ } } + + // Scale down if we have a majority of decrease requests. 
+ if decreaseCount > steadyCount { + current := config.getWorkerCount() + target := state.nextWorkerCount(ScaleDecrease, current, config.minWorkers, config.maxWorkers) + if target < current { + scaleWhileDraining(ctx, ch, func() { + config.decreaseWorkers(current, target) + }) + } + last = time.Now() + requestHistory = requestHistory[:0] + } } } }() diff --git a/outlet/kafka/scaler_test.go b/outlet/kafka/scaler_test.go index 8419ab0e..595eea6d 100644 --- a/outlet/kafka/scaler_test.go +++ b/outlet/kafka/scaler_test.go @@ -13,124 +13,6 @@ import ( "akvorado/common/helpers" ) -func TestScalerWithoutRateLimiter(t *testing.T) { - for _, tc := range []struct { - name string - minWorkers int - maxWorkers int - requests []ScaleRequest - expected []int - }{ - { - name: "scale up", - minWorkers: 1, - maxWorkers: 16, - requests: []ScaleRequest{ScaleIncrease}, - expected: []int{9}, - }, { - name: "scale up twice", - minWorkers: 1, - maxWorkers: 16, - requests: []ScaleRequest{ScaleIncrease, ScaleIncrease}, - expected: []int{9, 13}, - }, { - name: "scale up many times", - minWorkers: 1, - maxWorkers: 16, - requests: []ScaleRequest{ - ScaleIncrease, ScaleIncrease, ScaleIncrease, ScaleIncrease, - ScaleIncrease, ScaleIncrease, - }, - expected: []int{9, 13, 15, 16}, - }, { - name: "scale up twice, then down a lot", - minWorkers: 1, - maxWorkers: 16, - requests: []ScaleRequest{ - ScaleIncrease, ScaleIncrease, - // We need 10 decrease to decrease - ScaleDecrease, ScaleDecrease, ScaleDecrease, ScaleDecrease, ScaleDecrease, - ScaleDecrease, ScaleDecrease, ScaleDecrease, ScaleDecrease, ScaleDecrease, - }, - expected: []int{9, 13, 12}, - }, { - name: "scale up twice, then down, steady, and repeat", - minWorkers: 1, - maxWorkers: 16, - requests: []ScaleRequest{ - ScaleIncrease, ScaleIncrease, - ScaleDecrease, ScaleSteady, ScaleDecrease, ScaleSteady, - ScaleDecrease, ScaleSteady, ScaleDecrease, ScaleSteady, - ScaleDecrease, ScaleSteady, ScaleDecrease, ScaleSteady, - ScaleDecrease, ScaleSteady, ScaleDecrease, ScaleSteady, - ScaleDecrease, ScaleSteady, ScaleDecrease, ScaleSteady, - ScaleDecrease, ScaleSteady, ScaleDecrease, ScaleSteady, - }, - expected: []int{9, 13}, - }, { - name: "scale up twice, then down, steady, down, steady, down, down, repeat", - minWorkers: 1, - maxWorkers: 16, - requests: []ScaleRequest{ - ScaleIncrease, ScaleIncrease, - ScaleDecrease, ScaleSteady, ScaleDecrease, ScaleSteady, ScaleDecrease, ScaleDecrease, - ScaleDecrease, ScaleSteady, ScaleDecrease, ScaleSteady, ScaleDecrease, ScaleDecrease, - ScaleDecrease, ScaleSteady, ScaleDecrease, ScaleSteady, ScaleDecrease, ScaleDecrease, - ScaleDecrease, ScaleSteady, ScaleDecrease, ScaleSteady, ScaleDecrease, ScaleDecrease, - ScaleDecrease, ScaleSteady, ScaleDecrease, ScaleSteady, ScaleDecrease, ScaleDecrease, - }, - expected: []int{9, 13, 12}, - }, - // No more tests, the state logic is tested in TestScalerState - } { - t.Run(tc.name, func(t *testing.T) { - synctest.Test(t, func(t *testing.T) { - ctx, cancel := context.WithCancel(t.Context()) - defer cancel() - - var mu sync.Mutex - currentWorkers := tc.minWorkers - got := []int{} - config := scalerConfiguration{ - minWorkers: tc.minWorkers, - maxWorkers: tc.maxWorkers, - increaseRateLimit: time.Second, - decreaseRateLimit: time.Second, - getWorkerCount: func() int { - mu.Lock() - defer mu.Unlock() - return currentWorkers - }, - increaseWorkers: func(from, to int) { - t.Logf("increaseWorkers(from: %d, to: %d)", from, to) - mu.Lock() - defer mu.Unlock() - got = append(got, to) - currentWorkers = to - 
},
-					decreaseWorkers: func(from, to int) {
-						t.Logf("decreaseWorkers(from: %d, to: %d)", from, to)
-						mu.Lock()
-						defer mu.Unlock()
-						got = append(got, to)
-						currentWorkers = to
-					},
-				}
-				ch := runScaler(ctx, config)
-				for _, req := range tc.requests {
-					ch <- req
-					time.Sleep(5 * time.Second)
-				}
-				mu.Lock()
-				defer mu.Unlock()
-				if diff := helpers.Diff(got, tc.expected); diff != "" {
-					t.Fatalf("runScaler() (-got, +want):\n%s", diff)
-				}
-			})
-		})
-	}
-}
-
 func TestScalerRateLimiter(t *testing.T) {
 	synctest.Test(t, func(t *testing.T) {
 		ctx, cancel := context.WithCancel(t.Context())
@@ -201,19 +83,17 @@ func TestScalerRateLimiter(t *testing.T) {
 		check([]int{8, 12})
 
 		// Do not decrease even after 4 minutes
-		for range 40 {
+		for range 39 {
 			time.Sleep(6 * time.Second)
 			ch <- ScaleDecrease
 		}
-		// time = 5 minutes
+		// time = 4m54s
 		check([]int{8, 12})
 
 		// Decrease (5-second timeout done)
-		for range 10 {
-			time.Sleep(6 * time.Second)
-			ch <- ScaleDecrease
-		}
-		// time = 6 minutes
+		time.Sleep(6 * time.Second)
+		ch <- ScaleDecrease
+		// time = 5 minutes
 		check([]int{8, 12, 11})
 
 		// Do not increase
@@ -251,26 +131,56 @@ func TestScalerRateLimiter(t *testing.T) {
 		}
 		check([]int{8, 12, 11, 12, 13, 12})
 
-		// If we have many decrease requests at once, we decrease
-		time.Sleep(300 * time.Second)
+		// If we have one decrease request after 5 minutes, decrease
+		time.Sleep(5 * time.Minute)
 		for range 10 {
 			ch <- ScaleDecrease
 		}
 		check([]int{8, 12, 11, 12, 13, 12, 11})
 
-		// But if they are mixed with steady requests, we shouldn't decrease
-		time.Sleep(300 * time.Second)
-		for range 10 {
-			ch <- ScaleDecrease
+		// But more likely, we have steady requests, then decrease requests
+		time.Sleep(time.Minute)
+		for range 240 {
+			time.Sleep(time.Second)
 			ch <- ScaleSteady
 		}
-		check([]int{8, 12, 11, 12, 13, 12, 11})
-
-		// But if we have less Steady than decrease, we should scale down
-		for range 10 {
+		for range 60 {
+			time.Sleep(time.Second)
 			ch <- ScaleDecrease
 		}
+		// time=6m, no change (240 vs 60)
+		check([]int{8, 12, 11, 12, 13, 12, 11})
+		for range 60 {
+			time.Sleep(time.Second)
+			ch <- ScaleDecrease
+		}
+		// time=7m, no change (180 vs 120)
+		check([]int{8, 12, 11, 12, 13, 12, 11})
+		for range 30 {
+			time.Sleep(time.Second)
+			ch <- ScaleDecrease
+		}
+		// time=7m30s, no change (150 vs 150)
+		check([]int{8, 12, 11, 12, 13, 12, 11})
+		time.Sleep(time.Second)
+		ch <- ScaleDecrease
+		// OK, now more decrease than steady!
		check([]int{8, 12, 11, 12, 13, 12, 11, 10})
+
+		// We should not account for steady requests for too long!
+		time.Sleep(time.Minute)
+		for range 2400 {
+			time.Sleep(time.Second)
+			ch <- ScaleSteady
+		}
+		// 2400 vs 0
+		check([]int{8, 12, 11, 12, 13, 12, 11, 10})
+		time.Sleep(time.Second)
+		for range 300 {
+			ch <- ScaleDecrease
+		}
+		// 0 vs 300
+		check([]int{8, 12, 11, 12, 13, 12, 11, 10, 9})
 	})
 }
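
Review aid (not part of the patch): a minimal standalone sketch of the windowed vote that runScaler now performs. The shouldScaleDown helper, the local ScaleRequest stand-ins, and the sample values in main are hypothetical and only mirror the trimming and counting added above, where a scale-down happens once decrease requests outnumber steady requests within the last decreaseRateLimit window.

```go
package main

import (
	"fmt"
	"time"
)

// Simplified stand-ins for the scaler's request type and the two values the
// vote cares about (hypothetical, for illustration only).
type ScaleRequest int

const (
	ScaleSteady ScaleRequest = iota
	ScaleDecrease
)

// requestRecord tracks a scale request with its timestamp, as in the patch.
type requestRecord struct {
	request ScaleRequest
	time    time.Time
}

// shouldScaleDown is a hypothetical helper mirroring the logic added to
// runScaler: drop records older than the window (keeping at least one), then
// report whether decrease requests outnumber steady requests.
func shouldScaleDown(history []requestRecord, now time.Time, window time.Duration) ([]requestRecord, bool) {
	windowStart := now.Add(-window)
	i := 0
	for i < len(history)-1 && history[i].time.Before(windowStart) {
		i++
	}
	history = history[i:]

	var decreaseCount, steadyCount int
	for _, r := range history {
		switch r.request {
		case ScaleDecrease:
			decreaseCount++
		case ScaleSteady:
			steadyCount++
		}
	}
	return history, decreaseCount > steadyCount
}

func main() {
	now := time.Now()
	var history []requestRecord
	// Steady requests from 10 minutes ago fall outside a 5-minute window and
	// no longer cancel fresh decrease requests.
	for range 5 {
		history = append(history, requestRecord{ScaleSteady, now.Add(-10 * time.Minute)})
	}
	for range 3 {
		history = append(history, requestRecord{ScaleDecrease, now})
	}
	history, scaleDown := shouldScaleDown(history, now, 5*time.Minute)
	fmt.Println(len(history), scaleDown) // 3 true
}
```

Keeping steady requests in the same window is what gives them their head start: a burst of decrease requests only wins once the steady traffic it competes with has aged out of the window or is outnumbered.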