mirror of
https://github.com/akvorado/akvorado.git
synced 2025-12-12 06:24:10 +01:00
Some checks failed
CI / 🤖 Check dependabot status (push) Has been cancelled
CI / 🐧 Test on Linux (${{ github.ref_type == 'tag' }}, misc) (push) Has been cancelled
CI / 🐧 Test on Linux (coverage) (push) Has been cancelled
CI / 🐧 Test on Linux (regular) (push) Has been cancelled
CI / ❄️ Build on Nix (push) Has been cancelled
CI / 🍏 Build and test on macOS (push) Has been cancelled
CI / 🧪 End-to-end testing (push) Has been cancelled
CI / 🔍 Upload code coverage (push) Has been cancelled
CI / 🔬 Test only Go (push) Has been cancelled
CI / 🔬 Test only JS (${{ needs.dependabot.outputs.package-ecosystem }}, 20) (push) Has been cancelled
CI / 🔬 Test only JS (${{ needs.dependabot.outputs.package-ecosystem }}, 22) (push) Has been cancelled
CI / 🔬 Test only JS (${{ needs.dependabot.outputs.package-ecosystem }}, 24) (push) Has been cancelled
CI / ⚖️ Check licenses (push) Has been cancelled
CI / 🐋 Build Docker images (push) Has been cancelled
CI / 🐋 Tag Docker images (push) Has been cancelled
CI / 🚀 Publish release (push) Has been cancelled
Update Nix dependency hashes / Update dependency hashes (push) Has been cancelled
And use random by default. This scales better. And even when not using multiple outlets, there is little drawback to pinning an exporter to a partition.
100 lines
2.7 KiB
Go
100 lines
2.7 KiB
Go
// SPDX-FileCopyrightText: 2022 Free Mobile
|
|
// SPDX-License-Identifier: AGPL-3.0-only
|
|
|
|
package kafka
|
|
|
|
import (
|
|
"fmt"
|
|
|
|
"github.com/twmb/franz-go/pkg/kgo"
|
|
|
|
"akvorado/common/helpers"
|
|
"akvorado/common/kafka"
|
|
)
|
|
|
|
// Configuration describes the configuration for the Kafka exporter.
|
|
type Configuration struct {
|
|
kafka.Configuration `mapstructure:",squash" yaml:"-,inline"`
|
|
// CompressionCodec defines the compression to use.
|
|
CompressionCodec CompressionCodec
|
|
// QueueSize defines the maximum number of messages to buffer.
|
|
QueueSize int `validate:"min=1"`
|
|
// LoadBalance defines the load-balancing algorithm to use for Kafka partitions.
|
|
LoadBalance LoadBalanceAlgorithm
|
|
}
|
|
|
|
// DefaultConfiguration represents the default configuration for the Kafka exporter.
|
|
func DefaultConfiguration() Configuration {
|
|
return Configuration{
|
|
Configuration: kafka.DefaultConfiguration(),
|
|
CompressionCodec: CompressionCodec(kgo.Lz4Compression()),
|
|
QueueSize: 4096,
|
|
LoadBalance: LoadBalanceRandom,
|
|
}
|
|
}
|
|
|
|
// LoadBalanceAlgorithm represents the load-balancing algorithm for Kafka
// partitions.
type LoadBalanceAlgorithm int

const (
	// LoadBalanceRandom randomly balances flows across Kafka partitions. It is
	// the zero value of LoadBalanceAlgorithm.
	LoadBalanceRandom LoadBalanceAlgorithm = iota
	// LoadBalanceByExporter hashes exporter IP addresses for load balancing.
	LoadBalanceByExporter
)
|
|
|
|
// CompressionCodec represents a compression codec.
|
|
type CompressionCodec kgo.CompressionCodec
|
|
|
|
// UnmarshalText produces a compression codec
|
|
func (cc *CompressionCodec) UnmarshalText(text []byte) error {
|
|
var codec kgo.CompressionCodec
|
|
switch string(text) {
|
|
case "none":
|
|
codec = kgo.NoCompression()
|
|
case "gzip":
|
|
codec = kgo.GzipCompression()
|
|
case "snappy":
|
|
codec = kgo.SnappyCompression()
|
|
case "lz4":
|
|
codec = kgo.Lz4Compression()
|
|
case "zstd":
|
|
codec = kgo.ZstdCompression()
|
|
default:
|
|
return fmt.Errorf("unknown compression codec: %s", text)
|
|
}
|
|
*cc = CompressionCodec(codec)
|
|
return nil
|
|
}
|
|
|
|
// String turns a compression codec into a string
|
|
func (cc CompressionCodec) String() string {
|
|
switch kgo.CompressionCodec(cc) {
|
|
case kgo.NoCompression():
|
|
return "none"
|
|
case kgo.GzipCompression():
|
|
return "gzip"
|
|
case kgo.SnappyCompression():
|
|
return "snappy"
|
|
case kgo.Lz4Compression():
|
|
return "lz4"
|
|
case kgo.ZstdCompression():
|
|
return "zstd"
|
|
default:
|
|
return "unknown"
|
|
}
|
|
}
|
|
|
|
// MarshalText turns a compression codec into a string
|
|
func (cc CompressionCodec) MarshalText() ([]byte, error) {
|
|
return []byte(cc.String()), nil
|
|
}
|
|
|
|
func init() {
|
|
helpers.RegisterMapstructureDeprecatedFields[Configuration](
|
|
"FlushInterval", // bad for performance
|
|
"FlushBytes", // duplicate with QueueSize
|
|
"MaxMessageBytes", // just tune QueueSize instead
|
|
)
|
|
}
|