From 11f878ca213e567fdd25f930c4cf5026f054fc60 Mon Sep 17 00:00:00 2001 From: Vincent Bernat Date: Tue, 25 Nov 2025 22:42:33 +0100 Subject: [PATCH] inlet/kafka: make Kafka load-balancing algorithm configurable And use random by default. This scales better. And even when not using multiple outlets, there is little drawback to pinning an exporter to a partition. --- Makefile | 3 ++ console/data/docs/02-configuration.md | 7 ++++ console/data/docs/99-changelog.md | 2 + inlet/kafka/config.go | 13 +++++++ inlet/kafka/root.go | 10 ++++- inlet/kafka/root_test.go | 53 +++++++++++++++++++++++++++ 6 files changed, 87 insertions(+), 1 deletion(-) diff --git a/Makefile b/Makefile index d7150d94..e41957f7 100644 --- a/Makefile +++ b/Makefile @@ -30,6 +30,7 @@ GENERATED_GO = \ orchestrator/clickhouse/data/tcp.csv \ orchestrator/clickhouse/data/udp.csv \ console/filter/parser.go \ + inlet/kafka/loadbalancealgorithm_enumer.go \ outlet/core/asnprovider_enumer.go \ outlet/core/netprovider_enumer.go \ outlet/metadata/provider/snmp/authprotocol_enumer.go \ @@ -105,6 +106,8 @@ inlet/flow/input/udp/reuseport_%.o: inlet/flow/input/udp/reuseport_kern.c inlet/ $Q !
$(CLANG) -print-targets 2> /dev/null | grep -qF $* || \ $(CLANG) -O2 -g -Wall -target $* -c $< -o $@ +inlet/kafka/loadbalancealgorithm_enumer.go: go.mod inlet/kafka/config.go ; $(info $(M) generate enums for LoadBalanceAlgorithm…) + $Q $(ENUMER) -type=LoadBalanceAlgorithm -text -transform=kebab -trimprefix=LoadBalance inlet/kafka/config.go outlet/core/asnprovider_enumer.go: go.mod outlet/core/config.go ; $(info $(M) generate enums for ASNProvider…) $Q $(ENUMER) -type=ASNProvider -text -transform=kebab -trimprefix=ASNProvider outlet/core/config.go outlet/core/netprovider_enumer.go: go.mod outlet/core/config.go ; $(info $(M) generate enums for NetProvider…) diff --git a/console/data/docs/02-configuration.md b/console/data/docs/02-configuration.md index 9481be6c..f1c8ece8 100644 --- a/console/data/docs/02-configuration.md +++ b/console/data/docs/02-configuration.md @@ -132,6 +132,13 @@ The following keys are accepted: - `compression-codec` defines the compression codec for messages: `none`, `gzip`, `snappy`, `lz4` (default), or `zstd`. - `queue-size` defines the maximum number of messages to buffer for Kafka. +- `load-balance` defines the load balancing algorithm for flows across Kafka + partitions. The default value is `random`: each flow is assigned a random + partition, ensuring an even distribution. The other possible value is + `by-exporter`: all flows from a given exporter are assigned to a single + partition. This setting can be important if you have several outlets and IPFIX + or NetFlow: each outlet needs to receive the templates before decoding flows + and this is less likely when using `random`. A version number is automatically added to the topic name.
This is to prevent problems if the protobuf schema changes in a way that is not diff --git a/console/data/docs/99-changelog.md b/console/data/docs/99-changelog.md index c05fdd31..95451a65 100644 --- a/console/data/docs/99-changelog.md +++ b/console/data/docs/99-changelog.md @@ -13,6 +13,8 @@ identified with a specific icon: ## Unreleased - 🩹 *docker*: restart geoip container on boot +- 🌱 *inlet*: make load-balancing algorithm for Kafka partitions configurable + (`random` or `by-exporter`) and revert to `random` by default (like before 2.0.3) - 🌱 *orchestrator*: add `kafka`→`manage-topic` flag to enable or disable topic management - 🌱 *cmd*: make `akvorado healthcheck` use an abstract Unix socket to check service liveness diff --git a/inlet/kafka/config.go b/inlet/kafka/config.go index 434d3386..42ff9314 100644 --- a/inlet/kafka/config.go +++ b/inlet/kafka/config.go @@ -19,6 +19,8 @@ type Configuration struct { CompressionCodec CompressionCodec // QueueSize defines the maximum number of messages to buffer. QueueSize int `validate:"min=1"` + // LoadBalance defines the load-balancing algorithm to use for Kafka partitions. + LoadBalance LoadBalanceAlgorithm } // DefaultConfiguration represents the default configuration for the Kafka exporter. @@ -27,9 +29,20 @@ func DefaultConfiguration() Configuration { Configuration: kafka.DefaultConfiguration(), CompressionCodec: CompressionCodec(kgo.Lz4Compression()), QueueSize: 4096, + LoadBalance: LoadBalanceRandom, } } +// LoadBalanceAlgorithm represents the load-balance algorithm for Kafka partitions +type LoadBalanceAlgorithm int + +const ( + // LoadBalanceRandom randomly balances flows across Kafka partitions. + LoadBalanceRandom LoadBalanceAlgorithm = iota + // LoadBalanceByExporter hashes exporter IP addresses for load balancing. + LoadBalanceByExporter +) + // CompressionCodec represents a compression codec.
type CompressionCodec kgo.CompressionCodec diff --git a/inlet/kafka/root.go b/inlet/kafka/root.go index 720427db..0da5d8b4 100644 --- a/inlet/kafka/root.go +++ b/inlet/kafka/root.go @@ -6,7 +6,9 @@ package kafka import ( "context" + "encoding/binary" "fmt" + "math/rand/v2" "strings" "time" @@ -107,9 +109,15 @@ func (c *Component) Stop() error { // Send a message to Kafka. func (c *Component) Send(exporter string, payload []byte, finalizer func()) { + key := []byte(exporter) + switch c.config.LoadBalance { + case LoadBalanceRandom: + key = make([]byte, 4) + binary.BigEndian.PutUint32(key, rand.Uint32()) + } record := &kgo.Record{ Topic: c.kafkaTopic, - Key: []byte(exporter), + Key: key, Value: payload, } c.kafkaClient.Produce(context.Background(), record, func(r *kgo.Record, err error) { diff --git a/inlet/kafka/root_test.go b/inlet/kafka/root_test.go index 63c99671..7ea68022 100644 --- a/inlet/kafka/root_test.go +++ b/inlet/kafka/root_test.go @@ -126,3 +126,56 @@ func TestKafka(t *testing.T) { t.Fatalf("Metrics (-got, +want):\n%s", diff) } } + +func TestLoadBalancingAlgorithm(t *testing.T) { + for _, algo := range []LoadBalanceAlgorithm{LoadBalanceRandom, LoadBalanceByExporter} { + t.Run(algo.String(), func(t *testing.T) { + topic := fmt.Sprintf("balance-%s", algo) + r := reporter.NewMock(t) + config := DefaultConfiguration() + config.QueueSize = 1 + config.Topic = topic + c, mock := NewMock(t, r, config) + defer mock.Close() + + total := 500 + + // Intercept messages + var wg sync.WaitGroup + wg.Add(total) + var mu sync.Mutex + messages := make(map[int32]int) + kafka.InterceptMessages(t, mock, func(r *kgo.Record) { + mu.Lock() + defer mu.Unlock() + messages[r.Partition]++ + wg.Done() + }) + + // Send messages + for i := range total { + c.Send("127.0.0.1", []byte(fmt.Sprintf("hello %d", i)), func() {}) + } + wg.Wait() + + expected := make(map[int32]int, len(messages)) + if algo == LoadBalanceRandom { + for p, count := range messages { + if count > 
total/len(messages)*6/10 && count < total/len(messages)*14/10 { + expected[p] = count + } else { + expected[p] = total / len(messages) + } + } + } else if algo == LoadBalanceByExporter { + for p, count := range messages { + expected[p] = count + } + } + + if diff := helpers.Diff(messages, expected); diff != "" { + t.Fatalf("Messages per partitions (-got, +want):\n%s", diff) + } + }) + } +}