diff --git a/console/data/docs/99-changelog.md b/console/data/docs/99-changelog.md index 2fd82e6b..cbdc64f2 100644 --- a/console/data/docs/99-changelog.md +++ b/console/data/docs/99-changelog.md @@ -14,6 +14,7 @@ identified with a specific icon: ## Unreleased - ✨ *console*: add a bidirectional mode for graphs to also display flows in the opposite direction +- 🌱 *inlet*: Kafka key is now a 4-byte random value making scaling less dependent on the number of exporters - 🌱 *demo-exporter*: add a setting to automatically generate a reverse flow - 🌱 *docker-compose*: loosen required privileges for `conntrack-fixer` diff --git a/inlet/core/root_test.go b/inlet/core/root_test.go index a6fd0f5a..c7a1ad54 100644 --- a/inlet/core/root_test.go +++ b/inlet/core/root_test.go @@ -129,9 +129,6 @@ func TestCore(t *testing.T) { if msg.Topic != expectedTopic { t.Errorf("Kafka message topic (-got, +want):\n-%s\n+%s", msg.Topic, expectedTopic) } - if msg.Key != sarama.StringEncoder("192.0.2.142") { - t.Errorf("Kafka message key (-got, +want):\n-%s\n+%s", msg.Key, "192.0.2.142") - } got := flow.Message{} b, err := msg.Value.Encode() diff --git a/inlet/kafka/root.go b/inlet/kafka/root.go index 5bdc96bc..b857f447 100644 --- a/inlet/kafka/root.go +++ b/inlet/kafka/root.go @@ -5,7 +5,9 @@ package kafka import ( + "encoding/binary" "fmt" + "math/rand" "strings" "time" @@ -124,9 +126,11 @@ func (c *Component) Stop() error { func (c *Component) Send(exporter string, payload []byte) { c.metrics.bytesSent.WithLabelValues(exporter).Add(float64(len(payload))) c.metrics.messagesSent.WithLabelValues(exporter).Inc() + key := make([]byte, 4) + binary.BigEndian.PutUint32(key, rand.Uint32()) c.kafkaProducer.Input() <- &sarama.ProducerMessage{ Topic: c.kafkaTopic, - Key: sarama.StringEncoder(exporter), + Key: sarama.ByteEncoder(key), Value: sarama.ByteEncoder(payload), } } diff --git a/inlet/kafka/root_test.go b/inlet/kafka/root_test.go index 28d10ae0..52aac474 100644 --- a/inlet/kafka/root_test.go +++ b/inlet/kafka/root_test.go @@ -26,9 +26,9 @@ func TestKafka(t *testing.T) { defer close(received) expected := sarama.ProducerMessage{ Topic: "flows-v2", - Key: sarama.StringEncoder("127.0.0.1"), + Key: got.Key, Value: sarama.ByteEncoder("hello world!"), - Partition: 30, + Partition: got.Partition, } if diff := helpers.Diff(got, expected); diff != "" { t.Fatalf("Send() (-got, +want):\n%s", diff)