From e5e63be586379aa358cf98fc9b0da4be94ca56aa Mon Sep 17 00:00:00 2001 From: Vincent Bernat Date: Tue, 9 Aug 2022 15:20:52 +0200 Subject: [PATCH] inlet/kafka: use a 4-byte random value for key Using the exporter IP address is not helpful at all as we may not have many of them and it would make big exporters difficult to scale if one thread is not enough to ingest on the ClickHouse side. Fix #75 --- console/data/docs/99-changelog.md | 1 + inlet/core/root_test.go | 3 --- inlet/kafka/root.go | 6 +++++- inlet/kafka/root_test.go | 4 ++-- 4 files changed, 8 insertions(+), 6 deletions(-) diff --git a/console/data/docs/99-changelog.md b/console/data/docs/99-changelog.md index 2fd82e6b..cbdc64f2 100644 --- a/console/data/docs/99-changelog.md +++ b/console/data/docs/99-changelog.md @@ -14,6 +14,7 @@ identified with a specific icon: ## Unreleased - ✨ *console*: add a bidirectional mode for graphs to also display flows in the opposite direction +- 🌱 *inlet*: Kafka key is now a 4-byte random value making scaling less dependent on the number of exporters - 🌱 *demo-exporter*: add a setting to automatically generate a reverse flow - 🌱 *docker-compose*: loosen required privileges for `conntrack-fixer` diff --git a/inlet/core/root_test.go b/inlet/core/root_test.go index a6fd0f5a..c7a1ad54 100644 --- a/inlet/core/root_test.go +++ b/inlet/core/root_test.go @@ -129,9 +129,6 @@ func TestCore(t *testing.T) { if msg.Topic != expectedTopic { t.Errorf("Kafka message topic (-got, +want):\n-%s\n+%s", msg.Topic, expectedTopic) } - if msg.Key != sarama.StringEncoder("192.0.2.142") { - t.Errorf("Kafka message key (-got, +want):\n-%s\n+%s", msg.Key, "192.0.2.142") - } got := flow.Message{} b, err := msg.Value.Encode() diff --git a/inlet/kafka/root.go b/inlet/kafka/root.go index 5bdc96bc..b857f447 100644 --- a/inlet/kafka/root.go +++ b/inlet/kafka/root.go @@ -5,7 +5,9 @@ package kafka import ( + "encoding/binary" "fmt" + "math/rand" "strings" "time" @@ -124,9 +126,11 @@ func (c *Component) Stop() error { func (c *Component) Send(exporter string, payload []byte) { c.metrics.bytesSent.WithLabelValues(exporter).Add(float64(len(payload))) c.metrics.messagesSent.WithLabelValues(exporter).Inc() + key := make([]byte, 4) + binary.BigEndian.PutUint32(key, rand.Uint32()) c.kafkaProducer.Input() <- &sarama.ProducerMessage{ Topic: c.kafkaTopic, - Key: sarama.StringEncoder(exporter), + Key: sarama.ByteEncoder(key), Value: sarama.ByteEncoder(payload), } } diff --git a/inlet/kafka/root_test.go b/inlet/kafka/root_test.go index 28d10ae0..52aac474 100644 --- a/inlet/kafka/root_test.go +++ b/inlet/kafka/root_test.go @@ -26,9 +26,9 @@ func TestKafka(t *testing.T) { defer close(received) expected := sarama.ProducerMessage{ Topic: "flows-v2", - Key: sarama.StringEncoder("127.0.0.1"), + Key: got.Key, Value: sarama.ByteEncoder("hello world!"), - Partition: 30, + Partition: got.Partition, } if diff := helpers.Diff(got, expected); diff != "" { t.Fatalf("Send() (-got, +want):\n%s", diff)