outlet/kafka: make scaling test more robust

This commit is contained in:
Vincent Bernat
2025-11-16 06:32:52 +01:00
parent 61eb082db5
commit ca0daeaffd

View File

@@ -323,7 +323,7 @@ func TestWorkerScaling(t *testing.T) {
cluster, err := kfake.NewCluster(
kfake.NumBrokers(1),
kfake.SeedTopics(16, expectedTopicName),
kfake.SeedTopics(4, expectedTopicName),
kfake.WithLogger(kafka.NewLogger(r)),
)
if err != nil {
@@ -360,8 +360,8 @@ func TestWorkerScaling(t *testing.T) {
}
helpers.StartStop(t, c)
if maxWorkers := c.(*realComponent).config.MaxWorkers; maxWorkers != 16 {
t.Errorf("Start() max workers should have been capped to 16 instead of %d", maxWorkers)
if maxWorkers := c.(*realComponent).config.MaxWorkers; maxWorkers != 4 {
t.Errorf("Start() max workers should have been capped to 4 instead of %d", maxWorkers)
}
msg := atomic.Uint32{}
c.StartWorkers(func(_ int, ch chan<- ScaleRequest) (ReceiveFunc, ShutdownFunc) {
@@ -386,7 +386,7 @@ func TestWorkerScaling(t *testing.T) {
"worker_increase_total": "1",
"workers": "1",
"min_workers": "1",
"max_workers": "16",
"max_workers": "4",
}
if diff := helpers.Diff(gotMetrics, expected); diff != "" {
t.Fatalf("Metrics (-got, +want):\n%s", diff)
@@ -400,18 +400,27 @@ func TestWorkerScaling(t *testing.T) {
if results := producer.ProduceSync(context.Background(), record); results.FirstErr() != nil {
t.Fatalf("ProduceSync() error:\n%+v", results.FirstErr())
}
time.Sleep(100 * time.Millisecond)
t.Log("Check if workers increased to 9")
var diff string
t.Log("Check if workers increased to 3")
for range 100 {
time.Sleep(10 * time.Millisecond)
gotMetrics = r.GetMetrics("akvorado_outlet_kafka_", "worker")
expected = map[string]string{
"worker_decrease_total": "0",
"worker_increase_total": "9",
"workers": "9",
"worker_increase_total": "3",
"workers": "3",
}
if diff := helpers.Diff(gotMetrics, expected); diff != "" {
if diff = helpers.Diff(gotMetrics, expected); diff == "" {
break
}
}
if diff != "" {
t.Fatalf("Metrics (-got, +want):\n%s", diff)
}
time.Sleep(100 * time.Millisecond)
t.Log("Send 1 message (decrease)")
record = &kgo.Record{
Topic: expectedTopicName,
@@ -420,15 +429,21 @@ func TestWorkerScaling(t *testing.T) {
if results := producer.ProduceSync(context.Background(), record); results.FirstErr() != nil {
t.Fatalf("ProduceSync() error:\n%+v", results.FirstErr())
}
time.Sleep(100 * time.Millisecond)
t.Log("Check if workers decreased to 8")
t.Log("Check if workers decreased to 2")
for range 200 {
time.Sleep(10 * time.Millisecond)
gotMetrics = r.GetMetrics("akvorado_outlet_kafka_", "worker")
expected = map[string]string{
"worker_decrease_total": "1",
"worker_increase_total": "9",
"workers": "8",
"worker_increase_total": "3",
"workers": "2",
}
if diff := helpers.Diff(gotMetrics, expected); diff != "" {
if diff = helpers.Diff(gotMetrics, expected); diff == "" {
break
}
}
if diff != "" {
t.Fatalf("Metrics (-got, +want):\n%s", diff)
}
}