mirror of
https://github.com/akvorado/akvorado.git
synced 2025-12-11 22:14:02 +01:00
inlet/kafka: use exporter as a key for Kafka
Otherwise, messages are reaching the outlet unordered and missing IPFIX templates may make more flows lost. Moreover, when using several outlets, some of them may not get the templates.
This commit is contained in:
@@ -15,6 +15,7 @@ identified with a specific icon:
|
||||
- 💥 *config*: `skip-verify` is false by default in TLS configurations for
|
||||
ClickHouse, Kafka and remote data sources (previously, `verify` was set to
|
||||
false by default)
|
||||
- 🩹 *inlet*: keep flows from one exporter into a single partition
|
||||
- 🩹 *outlet*: provide additional gracetime for a worker to send to ClickHouse
|
||||
- 🩹 *outlet*: prevent discarding flows on shutdown
|
||||
- 🩹 *outlet*: enhance scaling up and down workers to avoid hysteresis
|
||||
|
||||
@@ -6,9 +6,7 @@ package kafka
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"math/rand/v2"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
@@ -109,11 +107,9 @@ func (c *Component) Stop() error {
|
||||
|
||||
// Send a message to Kafka.
|
||||
func (c *Component) Send(exporter string, payload []byte, finalizer func()) {
|
||||
key := make([]byte, 4)
|
||||
binary.BigEndian.PutUint32(key, rand.Uint32())
|
||||
record := &kgo.Record{
|
||||
Topic: c.kafkaTopic,
|
||||
Key: key,
|
||||
Key: []byte(exporter),
|
||||
Value: payload,
|
||||
}
|
||||
c.kafkaClient.Produce(context.Background(), record, func(r *kgo.Record, err error) {
|
||||
|
||||
Reference in New Issue
Block a user