From a7366ba9383ba185704cfbf79d78d999a5a484cd Mon Sep 17 00:00:00 2001 From: Vincent Bernat Date: Wed, 15 Oct 2025 12:23:07 +0200 Subject: [PATCH] outlet/kafka: cap the number of workers to the number of partitions --- console/data/docs/99-changelog.md | 1 + outlet/kafka/functional_test.go | 5 +++++ outlet/kafka/root.go | 19 +++++++++++++++++++ 3 files changed, 25 insertions(+) diff --git a/console/data/docs/99-changelog.md b/console/data/docs/99-changelog.md index f964e7f5..16084b25 100644 --- a/console/data/docs/99-changelog.md +++ b/console/data/docs/99-changelog.md @@ -14,6 +14,7 @@ identified with a specific icon: - 🩹 *inlet*: fix `akvorado_inlet_flow_input_udp_in_dropped_packets_total` metric - 💥 *config*: stop shipping demo exporter configurations from the orchestrator +- 🌱 *outlet*: cap the number of workers to the number of Kafka partitions - 🌱 *docker*: update Vector to 0.50.0. ## 2.0.1 - 2025-10-02 diff --git a/outlet/kafka/functional_test.go b/outlet/kafka/functional_test.go index 3558437a..4fe3f949 100644 --- a/outlet/kafka/functional_test.go +++ b/outlet/kafka/functional_test.go @@ -231,11 +231,16 @@ func TestWorkerScaling(t *testing.T) { configuration.ConsumerGroup = fmt.Sprintf("outlet-%d", rand.Int()) configuration.WorkerIncreaseRateLimit = 20 * time.Millisecond configuration.WorkerDecreaseRateLimit = 20 * time.Millisecond + configuration.MaxWorkers = 24 c, err := New(r, configuration, Dependencies{Daemon: daemon.NewMock(t)}) if err != nil { t.Fatalf("New() error:\n%+v", err) } helpers.StartStop(t, c) + + if maxWorkers := c.(*realComponent).config.MaxWorkers; maxWorkers != 16 { + t.Errorf("Start() max workers should have been capped to 16 instead of %d", maxWorkers) + } msg := atomic.Uint32{} c.StartWorkers(func(_ int, ch chan<- ScaleRequest) (ReceiveFunc, ShutdownFunc) { return func(context.Context, []byte) error { diff --git a/outlet/kafka/root.go b/outlet/kafka/root.go index 389ec50e..46b241a0 100644 --- a/outlet/kafka/root.go +++ b/outlet/kafka/root.go @@ -5,6 +5,7 @@ package kafka import ( + "context" "fmt" "strings" "sync" @@ -105,6 +106,24 @@ func (c *realComponent) Start() error { Msg("unable to create Kafka admin client") return fmt.Errorf("unable to create Kafka admin client: %w", err) } + + // Check the number of partitions + topics, err := kadmClient.ListTopics(context.Background()) + if err != nil { + return fmt.Errorf("unable to get metadata for topics: %w", err) + } + topicName := fmt.Sprintf("%s-v%d", c.config.Topic, pb.Version) + topic, ok := topics[topicName] + if !ok { + return fmt.Errorf("unable find topic %q", topicName) + } + nbPartitions := len(topic.Partitions) + c.r.Info().Msgf("topic %q has %d partitions", topicName, nbPartitions) + if nbPartitions < c.config.MaxWorkers { + c.r.Warn().Msgf("capping max workers from %d to %d", c.config.MaxWorkers, nbPartitions) + c.config.MaxWorkers = nbPartitions + } + c.kadmClientMu.Lock() defer c.kadmClientMu.Unlock() c.kadmClient = kadmClient