From a66ce7cc3ebaffd1d3263ed38e5c6efd0de8e9a8 Mon Sep 17 00:00:00 2001 From: Vincent Bernat Date: Sun, 14 Sep 2025 12:01:00 +0200 Subject: [PATCH] outlet/kafka: make lag test more robust The consumer may not have started when testing initial lag. Just try a bit more. --- outlet/kafka/functional_test.go | 34 +++++++++++++++++++++------------ 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/outlet/kafka/functional_test.go b/outlet/kafka/functional_test.go index dc0d8e0f..7bc44baf 100644 --- a/outlet/kafka/functional_test.go +++ b/outlet/kafka/functional_test.go @@ -367,7 +367,7 @@ func TestKafkaLagMetric(t *testing.T) { defer cluster.Close() // Watch for autocommits to avoid relying on time - clusterCommitNotification := make(chan interface{}) + clusterCommitNotification := make(chan any) cluster.Control(func(request kmsg.Request) (kmsg.Response, error, bool) { switch k := kmsg.Key(request.Key()); k { case kmsg.OffsetCommit: @@ -404,7 +404,7 @@ func TestKafkaLagMetric(t *testing.T) { helpers.StartStop(t, c) // Start a worker with a callback that blocks on a channel after receiving a message - workerBlockReceive := make(chan interface{}) + workerBlockReceive := make(chan any) defer close(workerBlockReceive) c.StartWorkers(func(_ int, _ chan<- ScaleRequest) (ReceiveFunc, ShutdownFunc) { return func(context.Context, []byte) error { @@ -414,14 +414,24 @@ func TestKafkaLagMetric(t *testing.T) { }) // No messages yet, no lag - time.Sleep(10 * time.Millisecond) - gotMetrics := r.GetMetrics("akvorado_outlet_kafka_", "consumergroup", "workers") - expected := map[string]string{ - "consumergroup_lag_messages": "0", - "workers": "1", - } - if diff := helpers.Diff(gotMetrics, expected); diff != "" { - t.Fatalf("Metrics (-got, +want):\n%s", diff) + ctx, cancel := context.WithTimeout(t.Context(), time.Second) + defer cancel() + for { + gotMetrics := r.GetMetrics("akvorado_outlet_kafka_", "consumergroup", "workers") + expected := map[string]string{ + "consumergroup_lag_messages": "0", + "workers": "1", + } + if diff := helpers.Diff(gotMetrics, expected); diff != "" { + select { + case <-ctx.Done(): + t.Fatalf("Metrics (-got, +want):\n%s", diff) + default: + } + time.Sleep(10 * time.Millisecond) + } else { + break + } } // Send a single message, allow it to be processed @@ -442,8 +452,8 @@ func TestKafkaLagMetric(t *testing.T) { } // The message was processed, there's no lag - gotMetrics = r.GetMetrics("akvorado_outlet_kafka_", "consumergroup", "received_messages_total") - expected = map[string]string{ + gotMetrics := r.GetMetrics("akvorado_outlet_kafka_", "consumergroup", "received_messages_total") + expected := map[string]string{ "consumergroup_lag_messages": "0", `received_messages_total{worker="0"}`: "1", }