From ca0daeaffde3e3b6388bdef4420ad5374ebd3a1f Mon Sep 17 00:00:00 2001 From: Vincent Bernat Date: Sun, 16 Nov 2025 06:32:52 +0100 Subject: [PATCH] outlet/kafka: make scaling test more robust --- outlet/kafka/functional_test.go | 55 +++++++++++++++++++++------------ 1 file changed, 35 insertions(+), 20 deletions(-) diff --git a/outlet/kafka/functional_test.go b/outlet/kafka/functional_test.go index 168ce764..8339b86f 100644 --- a/outlet/kafka/functional_test.go +++ b/outlet/kafka/functional_test.go @@ -323,7 +323,7 @@ func TestWorkerScaling(t *testing.T) { cluster, err := kfake.NewCluster( kfake.NumBrokers(1), - kfake.SeedTopics(16, expectedTopicName), + kfake.SeedTopics(4, expectedTopicName), kfake.WithLogger(kafka.NewLogger(r)), ) if err != nil { @@ -360,8 +360,8 @@ func TestWorkerScaling(t *testing.T) { } helpers.StartStop(t, c) - if maxWorkers := c.(*realComponent).config.MaxWorkers; maxWorkers != 16 { - t.Errorf("Start() max workers should have been capped to 16 instead of %d", maxWorkers) + if maxWorkers := c.(*realComponent).config.MaxWorkers; maxWorkers != 4 { + t.Errorf("Start() max workers should have been capped to 4 instead of %d", maxWorkers) } msg := atomic.Uint32{} c.StartWorkers(func(_ int, ch chan<- ScaleRequest) (ReceiveFunc, ShutdownFunc) { @@ -386,7 +386,7 @@ func TestWorkerScaling(t *testing.T) { "worker_increase_total": "1", "workers": "1", "min_workers": "1", - "max_workers": "16", + "max_workers": "4", } if diff := helpers.Diff(gotMetrics, expected); diff != "" { t.Fatalf("Metrics (-got, +want):\n%s", diff) @@ -400,18 +400,27 @@ func TestWorkerScaling(t *testing.T) { if results := producer.ProduceSync(context.Background(), record); results.FirstErr() != nil { t.Fatalf("ProduceSync() error:\n%+v", results.FirstErr()) } - time.Sleep(100 * time.Millisecond) - t.Log("Check if workers increased to 9") - gotMetrics = r.GetMetrics("akvorado_outlet_kafka_", "worker") - expected = map[string]string{ - "worker_decrease_total": "0", - "worker_increase_total": "9", - "workers": "9", + + var diff string + + t.Log("Check if workers increased to 3") + for range 100 { + time.Sleep(10 * time.Millisecond) + gotMetrics = r.GetMetrics("akvorado_outlet_kafka_", "worker") + expected = map[string]string{ + "worker_decrease_total": "0", + "worker_increase_total": "3", + "workers": "3", + } + if diff = helpers.Diff(gotMetrics, expected); diff == "" { + break + } } - if diff := helpers.Diff(gotMetrics, expected); diff != "" { + if diff != "" { t.Fatalf("Metrics (-got, +want):\n%s", diff) } + time.Sleep(100 * time.Millisecond) t.Log("Send 1 message (decrease)") record = &kgo.Record{ Topic: expectedTopicName, @@ -420,15 +429,21 @@ func TestWorkerScaling(t *testing.T) { if results := producer.ProduceSync(context.Background(), record); results.FirstErr() != nil { t.Fatalf("ProduceSync() error:\n%+v", results.FirstErr()) } - time.Sleep(100 * time.Millisecond) - t.Log("Check if workers decreased to 8") - gotMetrics = r.GetMetrics("akvorado_outlet_kafka_", "worker") - expected = map[string]string{ - "worker_decrease_total": "1", - "worker_increase_total": "9", - "workers": "8", + + t.Log("Check if workers decreased to 2") + for range 200 { + time.Sleep(10 * time.Millisecond) + gotMetrics = r.GetMetrics("akvorado_outlet_kafka_", "worker") + expected = map[string]string{ + "worker_decrease_total": "1", + "worker_increase_total": "3", + "workers": "2", + } + if diff = helpers.Diff(gotMetrics, expected); diff == "" { + break + } } - if diff := helpers.Diff(gotMetrics, expected); diff != "" { + if diff != "" { t.Fatalf("Metrics (-got, +want):\n%s", diff) } }