Files
akvorado/inlet/kafka/config.go
Vincent Bernat 11f878ca21
Some checks failed
CI / 🤖 Check dependabot status (push) Has been cancelled
CI / 🐧 Test on Linux (${{ github.ref_type == 'tag' }}, misc) (push) Has been cancelled
CI / 🐧 Test on Linux (coverage) (push) Has been cancelled
CI / 🐧 Test on Linux (regular) (push) Has been cancelled
CI / ❄️ Build on Nix (push) Has been cancelled
CI / 🍏 Build and test on macOS (push) Has been cancelled
CI / 🧪 End-to-end testing (push) Has been cancelled
CI / 🔍 Upload code coverage (push) Has been cancelled
CI / 🔬 Test only Go (push) Has been cancelled
CI / 🔬 Test only JS (${{ needs.dependabot.outputs.package-ecosystem }}, 20) (push) Has been cancelled
CI / 🔬 Test only JS (${{ needs.dependabot.outputs.package-ecosystem }}, 22) (push) Has been cancelled
CI / 🔬 Test only JS (${{ needs.dependabot.outputs.package-ecosystem }}, 24) (push) Has been cancelled
CI / ⚖️ Check licenses (push) Has been cancelled
CI / 🐋 Build Docker images (push) Has been cancelled
CI / 🐋 Tag Docker images (push) Has been cancelled
CI / 🚀 Publish release (push) Has been cancelled
Update Nix dependency hashes / Update dependency hashes (push) Has been cancelled
inlet/kafka: make Kafka load-balancing algorithm configurable
And use random by default. This scales better. And even when not using
multiple outlets, there is little drawback to pin an exporter to a
partition.
2025-11-25 22:42:33 +01:00

100 lines
2.7 KiB
Go

// SPDX-FileCopyrightText: 2022 Free Mobile
// SPDX-License-Identifier: AGPL-3.0-only
package kafka
import (
"fmt"
"github.com/twmb/franz-go/pkg/kgo"
"akvorado/common/helpers"
"akvorado/common/kafka"
)
// Configuration describes the configuration for the Kafka exporter. Default
// values are provided by DefaultConfiguration.
type Configuration struct {
// The shared Kafka settings are embedded; the struct tags flatten their
// fields into this structure for both mapstructure and YAML.
kafka.Configuration `mapstructure:",squash" yaml:"-,inline"`
// CompressionCodec defines the compression to use. When decoded from text,
// the accepted names are "none", "gzip", "snappy", "lz4" and "zstd".
CompressionCodec CompressionCodec
// QueueSize defines the maximum number of messages to buffer. Validation
// requires it to be at least 1.
QueueSize int `validate:"min=1"`
// LoadBalance defines the load-balancing algorithm to use for Kafka partitions.
LoadBalance LoadBalanceAlgorithm
}
// DefaultConfiguration returns the configuration used for the Kafka exporter
// when nothing is overridden: the shared Kafka defaults, LZ4 compression, a
// queue of 4096 messages and random load-balancing across partitions.
func DefaultConfiguration() Configuration {
	var config Configuration
	config.Configuration = kafka.DefaultConfiguration()
	config.CompressionCodec = CompressionCodec(kgo.Lz4Compression())
	config.QueueSize = 4096
	config.LoadBalance = LoadBalanceRandom
	return config
}
// LoadBalanceAlgorithm represents the load-balance algorithm for Kafka
// partitions. It is an integer-backed enumeration whose possible values are
// the LoadBalance* constants.
type LoadBalanceAlgorithm int
const (
// LoadBalanceRandom randomly balances flows across Kafka partitions. It is
// the zero value of LoadBalanceAlgorithm.
LoadBalanceRandom LoadBalanceAlgorithm = iota
// LoadBalanceByExporter hashes exporter IP addresses for load balancing.
LoadBalanceByExporter
)
// CompressionCodec represents a compression codec. It is a distinct type
// defined on top of kgo.CompressionCodec, which allows the text
// marshaling/unmarshaling methods below to be attached to it.
type CompressionCodec kgo.CompressionCodec
// UnmarshalText decodes the textual name of a compression codec ("none",
// "gzip", "snappy", "lz4" or "zstd") into the receiver. Any other name yields
// an error and leaves the receiver untouched.
func (cc *CompressionCodec) UnmarshalText(text []byte) error {
	switch name := string(text); name {
	case "none":
		*cc = CompressionCodec(kgo.NoCompression())
	case "gzip":
		*cc = CompressionCodec(kgo.GzipCompression())
	case "snappy":
		*cc = CompressionCodec(kgo.SnappyCompression())
	case "lz4":
		*cc = CompressionCodec(kgo.Lz4Compression())
	case "zstd":
		*cc = CompressionCodec(kgo.ZstdCompression())
	default:
		// Unrecognized name: report it without modifying the receiver.
		return fmt.Errorf("unknown compression codec: %s", text)
	}
	return nil
}
// String returns the textual name of the compression codec: "none", "gzip",
// "snappy", "lz4" or "zstd". A value that equals none of the codecs returned
// by the corresponding kgo constructors is reported as "unknown".
func (cc CompressionCodec) String() string {
	// NOTE(review): equality is checked on the whole kgo value, so a codec
	// differing from the constructor result in any field presumably ends up
	// as "unknown" — confirm against kgo.CompressionCodec.
	codec := kgo.CompressionCodec(cc)
	switch {
	case codec == kgo.NoCompression():
		return "none"
	case codec == kgo.GzipCompression():
		return "gzip"
	case codec == kgo.SnappyCompression():
		return "snappy"
	case codec == kgo.Lz4Compression():
		return "lz4"
	case codec == kgo.ZstdCompression():
		return "zstd"
	}
	return "unknown"
}
// MarshalText encodes the compression codec as the bytes of its String
// representation. The returned error is always nil.
func (cc CompressionCodec) MarshalText() ([]byte, error) {
	name := cc.String()
	return []byte(name), nil
}
// init registers, for the Configuration type, the names of fields that are
// not part of the structure anymore as deprecated fields.
// NOTE(review): presumably this lets the mapstructure decoder tolerate these
// keys in existing configuration files — confirm against
// helpers.RegisterMapstructureDeprecatedFields.
func init() {
helpers.RegisterMapstructureDeprecatedFields[Configuration](
"FlushInterval", // bad for performance
"FlushBytes", // duplicate with QueueSize
"MaxMessageBytes", // just tune QueueSize instead
)
}