*/kafka: switch to franz-go

The concurrency model of this library is easier to handle than Sarama's.
Notably, it is a better fit for the "almost share nothing" model we use for
the inlet and the outlet: the lock for workers in the outlet is removed, and
we can now use a sync.Pool to allocate byte slices in the inlet.

It may also be more performant.

In the future, we may want to commit offsets only when pushing data to
ClickHouse. However, this does not seem easy when a rebalance happens: when a
partition is revoked, we need to do something to avoid duplicating data, for
example flush the current batch to ClickHouse. Have a look at the
`example/mark_offsets/main.go` file in the franz-go repository for a possible
approach, sketched below. In the meantime, we rely on autocommit.
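
A rough sketch of that approach (not part of this commit; flushToClickHouse,
the topic, the group, and the batch size are placeholder assumptions):

package main

import (
	"context"

	"github.com/twmb/franz-go/pkg/kgo"
)

// consume batches records and only lets autocommit commit offsets that were
// marked after a successful flush. On revoke, it flushes and commits
// synchronously to avoid duplicating data after the rebalance.
func consume(ctx context.Context, flushToClickHouse func([]*kgo.Record) error) error {
	var client *kgo.Client
	var batch []*kgo.Record
	flush := func() {
		if len(batch) > 0 && flushToClickHouse(batch) == nil {
			client.MarkCommitRecords(batch...)
		}
		batch = batch[:0]
	}
	var err error
	client, err = kgo.NewClient(
		kgo.SeedBrokers("127.0.0.1:9092"),
		kgo.ConsumeTopics("flows-v1"),
		kgo.ConsumerGroup("clickhouse"),
		kgo.AutoCommitMarks(), // autocommit only commits marked offsets
		kgo.OnPartitionsRevoked(func(ctx context.Context, c *kgo.Client, _ map[string][]int32) {
			flush()
			c.CommitMarkedOffsets(ctx) // commit before losing the partitions
		}),
	)
	if err != nil {
		return err
	}
	defer client.Close()
	for ctx.Err() == nil {
		fetches := client.PollFetches(ctx)
		if fetches.IsClientClosed() {
			return nil
		}
		fetches.EachRecord(func(r *kgo.Record) { batch = append(batch, r) })
		if len(batch) >= 1000 {
			flush()
		}
	}
	return ctx.Err()
}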

Another contender could be https://github.com/segmentio/kafka-go. Also
see https://github.com/twmb/franz-go/pull/1064.
Vincent Bernat
2025-07-20 07:33:06 +02:00
parent 872bda0501
commit 756e4a8fbd
33 changed files with 871 additions and 1107 deletions

common/helpers/release.go (new file)

@@ -0,0 +1,13 @@
// SPDX-FileCopyrightText: 2022 Free Mobile
// SPDX-License-Identifier: AGPL-3.0-only
//go:build release
package helpers
// Testing reports whether the current code is being run in a test. It always
// returns false in release mode, so checking it has no performance impact.
func Testing() bool {
return false
}


@@ -121,3 +121,8 @@ func (p Pos) String() string {
}
return ""
}
// Testing reports whether the current code is being run in a test.
func Testing() bool {
return testing.Testing()
}
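
For illustration (not part of the diff), this is the kind of use the helper
enables; the release stub above compiles the branch away when building with
-tags release. The numbers mirror the inlet's payload pool further below:

package main

import (
	"fmt"

	"akvorado/common/helpers"
)

// bufferSize picks tiny buffers in tests to exercise the resize path, and
// larger ones otherwise.
func bufferSize() int {
	if helpers.Testing() { // constant false with -tags release
		return 5
	}
	return 2000
}

func main() {
	fmt.Println(bufferSize())
}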


@@ -6,17 +6,19 @@
package kafka
import (
"context"
"crypto/sha256"
"crypto/sha512"
"fmt"
"reflect"
"akvorado/common/helpers"
"akvorado/common/reporter"
"github.com/IBM/sarama"
"github.com/gin-gonic/gin"
"github.com/go-viper/mapstructure/v2"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/sasl"
"github.com/twmb/franz-go/pkg/sasl/oauth"
"github.com/twmb/franz-go/pkg/sasl/plain"
"github.com/twmb/franz-go/pkg/sasl/scram"
"golang.org/x/oauth2/clientcredentials"
)
@@ -26,8 +28,6 @@ type Configuration struct {
Topic string `validate:"required"`
// Brokers is the list of brokers to connect to.
Brokers []string `validate:"min=1,dive,listen"`
// Version is the version of Kafka we assume to work
Version Version
// TLS defines TLS configuration
TLS helpers.TLSConfiguration
// SASL defines SASL configuration
@@ -53,7 +53,6 @@ func DefaultConfiguration() Configuration {
return Configuration{
Topic: "flows",
Brokers: []string{"127.0.0.1:9092"},
Version: Version(sarama.V2_8_1_0),
TLS: helpers.TLSConfiguration{
Enable: false,
Verify: true,
@@ -61,29 +60,6 @@ func DefaultConfiguration() Configuration {
}
}
// Version represents a supported version of Kafka
type Version sarama.KafkaVersion
// UnmarshalText parses a version of Kafka
func (v *Version) UnmarshalText(text []byte) error {
version, err := sarama.ParseKafkaVersion(string(text))
if err != nil {
return err
}
*v = Version(version)
return nil
}
// String turns a Kafka version into a string
func (v Version) String() string {
return sarama.KafkaVersion(v).String()
}
// MarshalText turns a Kafka version into a string
func (v Version) MarshalText() ([]byte, error) {
return []byte(v.String()), nil
}
// SASLMechanism defines an SASL algorithm
type SASLMechanism int
@@ -100,55 +76,60 @@ const (
SASLOauth
)
// NewConfig returns a Sarama Kafka configuration ready to use.
func NewConfig(config Configuration) (*sarama.Config, error) {
kafkaConfig := sarama.NewConfig()
kafkaConfig.Version = sarama.KafkaVersion(config.Version)
kafkaConfig.ClientID = fmt.Sprintf("akvorado-%s", helpers.AkvoradoVersion)
// NewConfig returns a slice of kgo.Opt configurations ready to use.
func NewConfig(r *reporter.Reporter, config Configuration) ([]kgo.Opt, error) {
opts := []kgo.Opt{
kgo.SeedBrokers(config.Brokers...),
kgo.ClientID(fmt.Sprintf("akvorado-%s", helpers.AkvoradoVersion)),
kgo.WithLogger(NewLogger(r)),
}
// TLS configuration
tlsConfig, err := config.TLS.MakeTLSConfig()
if err != nil {
return nil, err
}
if tlsConfig != nil {
kafkaConfig.Net.TLS.Enable = true
kafkaConfig.Net.TLS.Config = tlsConfig
opts = append(opts, kgo.DialTLSConfig(tlsConfig))
}
// SASL
// SASL configuration
if config.SASL.Mechanism != SASLNone {
kafkaConfig.Net.SASL.Enable = true
kafkaConfig.Net.SASL.User = config.SASL.Username
kafkaConfig.Net.SASL.Password = config.SASL.Password
var mechanism sasl.Mechanism
switch config.SASL.Mechanism {
case SASLPlain:
kafkaConfig.Net.SASL.Mechanism = sarama.SASLTypePlaintext
mechanism = plain.Auth{
User: config.SASL.Username,
Pass: config.SASL.Password,
}.AsMechanism()
case SASLScramSHA256:
kafkaConfig.Net.SASL.Handshake = true
kafkaConfig.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA256
kafkaConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &xdgSCRAMClient{HashGeneratorFcn: sha256.New}
}
mechanism = scram.Auth{
User: config.SASL.Username,
Pass: config.SASL.Password,
}.AsSha256Mechanism()
case SASLScramSHA512:
kafkaConfig.Net.SASL.Handshake = true
kafkaConfig.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
kafkaConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &xdgSCRAMClient{HashGeneratorFcn: sha512.New}
}
mechanism = scram.Auth{
User: config.SASL.Username,
Pass: config.SASL.Password,
}.AsSha512Mechanism()
case SASLOauth:
kafkaConfig.Net.SASL.Mechanism = sarama.SASLTypeOAuth
kafkaConfig.Net.SASL.TokenProvider = newOAuthTokenProvider(
context.Background(), // TODO should be bound to the component lifecycle, but no component here
mechanism = oauth.Oauth(
newOAuthTokenProvider(
tlsConfig,
clientcredentials.Config{
ClientID: config.SASL.Username,
ClientSecret: config.SASL.Password,
TokenURL: config.SASL.OAuthTokenURL,
Scopes: config.SASL.OAuthScopes,
})
}),
)
default:
return nil, fmt.Errorf("unknown SASL mechanism: %s", config.SASL.Mechanism)
}
opts = append(opts, kgo.SASL(mechanism))
}
return kafkaConfig, nil
return opts, nil
}
// ConfigurationUnmarshallerHook normalizes the Kafka configuration:
@@ -204,6 +185,6 @@ func ConfigurationUnmarshallerHook() mapstructure.DecodeHookFunc {
}
func init() {
helpers.RegisterMapstructureDeprecatedFields[Configuration]("Version")
helpers.RegisterMapstructureUnmarshallerHook(ConfigurationUnmarshallerHook())
}
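
A hedged usage sketch of the new NewConfig (it mirrors what the tests below
and SetupKafkaBroker do; the producer option and the Ping timeout are
assumptions for illustration):

package main

import (
	"context"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"

	"akvorado/common/kafka"
	"akvorado/common/reporter"
)

func newClient(r *reporter.Reporter, config kafka.Configuration) (*kgo.Client, error) {
	opts, err := kafka.NewConfig(r, config)
	if err != nil {
		return nil, err
	}
	// Callers append their own options on top of the shared ones.
	opts = append(opts, kgo.DefaultProduceTopic(config.Topic))
	client, err := kgo.NewClient(opts...)
	if err != nil {
		return nil, err
	}
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	if err := client.Ping(ctx); err != nil { // check broker connectivity
		client.Close()
		return nil, err
	}
	return client, nil
}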


@@ -7,8 +7,8 @@ import (
"testing"
"akvorado/common/helpers"
"akvorado/common/reporter"
"github.com/IBM/sarama"
"github.com/gin-gonic/gin"
)
@@ -80,13 +80,11 @@ func TestKafkaNewConfig(t *testing.T) {
}
for _, tc := range cases {
t.Run(tc.description, func(t *testing.T) {
kafkaConfig, err := NewConfig(tc.config)
r := reporter.NewMock(t)
_, err := NewConfig(r, tc.config)
if err != nil {
t.Fatalf("NewConfig() error:\n%+v", err)
}
if err := kafkaConfig.Validate(); err != nil {
t.Fatalf("Validate() error:\n%+v", err)
}
})
}
}
@@ -111,7 +109,6 @@ func TestTLSConfiguration(t *testing.T) {
Expected: Configuration{
Topic: "flows",
Brokers: []string{"127.0.0.1:9092"},
Version: Version(sarama.V2_8_1_0),
TLS: helpers.TLSConfiguration{
Enable: true,
Verify: true,
@@ -134,7 +131,6 @@ func TestTLSConfiguration(t *testing.T) {
Expected: Configuration{
Topic: "flows",
Brokers: []string{"127.0.0.1:9092"},
Version: Version(sarama.V2_8_1_0),
TLS: helpers.TLSConfiguration{
Enable: true,
Verify: false,
@@ -160,7 +156,6 @@ func TestTLSConfiguration(t *testing.T) {
Expected: Configuration{
Topic: "flows",
Brokers: []string{"127.0.0.1:9092"},
Version: Version(sarama.V2_8_1_0),
TLS: helpers.TLSConfiguration{
Enable: false,
Verify: true,
@@ -189,7 +184,6 @@ func TestTLSConfiguration(t *testing.T) {
Expected: Configuration{
Topic: "flows",
Brokers: []string{"127.0.0.1:9092"},
Version: Version(sarama.V2_8_1_0),
TLS: helpers.TLSConfiguration{
Enable: true,
// Value from DefaultConfig is true
@@ -221,7 +215,6 @@ func TestTLSConfiguration(t *testing.T) {
Expected: Configuration{
Topic: "flows",
Brokers: []string{"127.0.0.1:9092"},
Version: Version(sarama.V2_8_1_0),
TLS: helpers.TLSConfiguration{
Enable: true,
// Value from DefaultConfig is true


@@ -4,60 +4,40 @@
package kafka
import (
"fmt"
"sync/atomic"
"github.com/IBM/sarama"
"github.com/twmb/franz-go/pkg/kgo"
"akvorado/common/helpers"
"akvorado/common/reporter"
)
func init() {
// The logger in Sarama is global. Do the same.
sarama.Logger = &GlobalKafkaLogger
}
// GlobalKafkaLogger is the logger instance registered to sarama.
var GlobalKafkaLogger kafkaLogger
// kafkaLogger implements kgo.Logger interface.
type kafkaLogger struct {
r atomic.Pointer[reporter.Reporter]
r *reporter.Reporter
}
// Register registers the provided reporter to be used for logging with Sarama.
func (l *kafkaLogger) Register(r *reporter.Reporter) {
l.r.Store(r)
// NewLogger creates a new kafka logger using the provided reporter.
func NewLogger(r *reporter.Reporter) kgo.Logger {
return &kafkaLogger{r: r}
}
// Unregister removes the currently registered reporter.
func (l *kafkaLogger) Unregister() {
var noreporter *reporter.Reporter
l.r.Store(noreporter)
// Level returns the current log level.
func (l *kafkaLogger) Level() kgo.LogLevel {
if !helpers.Testing() {
return kgo.LogLevelInfo
}
return kgo.LogLevelDebug
}
func (l *kafkaLogger) Print(v ...interface{}) {
r := l.r.Load()
if r != nil {
if e := r.Debug(); e.Enabled() {
e.Msg(fmt.Sprint(v...))
}
}
}
func (l *kafkaLogger) Println(v ...interface{}) {
r := l.r.Load()
if r != nil {
if e := r.Debug(); e.Enabled() {
e.Msg(fmt.Sprint(v...))
}
}
}
func (l *kafkaLogger) Printf(format string, v ...interface{}) {
r := l.r.Load()
if r != nil {
if e := r.Debug(); e.Enabled() {
e.Msg(fmt.Sprintf(format, v...))
}
// Log logs a message at the specified level.
func (l *kafkaLogger) Log(level kgo.LogLevel, msg string, keyvals ...any) {
switch level {
case kgo.LogLevelError:
l.r.Error().Fields(keyvals).Msg(msg)
case kgo.LogLevelWarn:
l.r.Warn().Fields(keyvals).Msg(msg)
case kgo.LogLevelInfo:
l.r.Info().Fields(keyvals).Msg(msg)
case kgo.LogLevelDebug:
l.r.Debug().Fields(keyvals).Msg(msg)
}
}


@@ -1,274 +0,0 @@
// SPDX-FileCopyrightText: 2024 Free Mobile
// SPDX-License-Identifier: AGPL-3.0-only
package kafka
import (
"strings"
"github.com/prometheus/client_golang/prometheus"
gometrics "github.com/rcrowley/go-metrics"
"akvorado/common/reporter"
)
// Metrics represents generic Kafka metrics. It includes broker, producer and consumer metrics.
type Metrics struct {
registry gometrics.Registry
// Broker
kafkaIncomingByteRate *reporter.MetricDesc
kafkaOutgoingByteRate *reporter.MetricDesc
kafkaRequestRate *reporter.MetricDesc
kafkaRequestSize *reporter.MetricDesc
kafkaRequestLatency *reporter.MetricDesc
kafkaResponseRate *reporter.MetricDesc
kafkaResponseSize *reporter.MetricDesc
kafkaRequestsInFlight *reporter.MetricDesc
// Producer
kafkaProducerBatchSize *reporter.MetricDesc
kafkaProducerRecordSendRate *reporter.MetricDesc
kafkaProducerRecordsPerRequest *reporter.MetricDesc
kafkaProducerCompressionRatio *reporter.MetricDesc
// Consumer
kafkaConsumerBatchSize *reporter.MetricDesc
kafkaConsumerFetchRate *reporter.MetricDesc
kafkaConsumerFetchResponseSize *reporter.MetricDesc
kafkaConsumerGroupJoin *reporter.MetricDesc
kafkaConsumerGroupJoinFailed *reporter.MetricDesc
kafkaConsumerGroupSync *reporter.MetricDesc
kafkaConsumerGroupSyncFailed *reporter.MetricDesc
}
// Init initializes the Kafka-related metrics.
func (m Metrics) Init(r *reporter.Reporter, registry gometrics.Registry) {
m.registry = registry
m.kafkaIncomingByteRate = r.MetricDesc2(
"brokers_incoming_byte_rate",
"Bytes/second read off a given broker.",
[]string{"broker"})
m.kafkaOutgoingByteRate = r.MetricDesc2(
"brokers_outgoing_byte_rate",
"Bytes/second written off a given broker.",
[]string{"broker"})
m.kafkaRequestRate = r.MetricDesc2(
"brokers_request_rate",
"Requests/second sent to a given broker.",
[]string{"broker"})
m.kafkaRequestSize = r.MetricDesc2(
"brokers_request_size",
"Distribution of the request size in bytes for a given broker.",
[]string{"broker"})
m.kafkaRequestLatency = r.MetricDesc2(
"brokers_request_latency_ms",
"Distribution of the request latency in ms for a given broker.",
[]string{"broker"})
m.kafkaResponseRate = r.MetricDesc2(
"brokers_response_rate",
"Responses/second received from a given broker.",
[]string{"broker"})
m.kafkaResponseSize = r.MetricDesc2(
"brokers_response_bytes",
"Distribution of the response size in bytes for a given broker.",
[]string{"broker"})
m.kafkaRequestsInFlight = r.MetricDesc2(
"brokers_inflight_requests",
"The current number of in-flight requests awaiting a response for a given broker.",
[]string{"broker"})
m.kafkaProducerBatchSize = r.MetricDesc2(
"producer_batch_bytes",
"Distribution of the number of bytes sent per partition per request.",
nil)
m.kafkaProducerRecordSendRate = r.MetricDesc2(
"producer_record_send_rate",
"Records/second sent.",
nil)
m.kafkaProducerRecordsPerRequest = r.MetricDesc2(
"producer_records_per_request",
"Distribution of the number of records sent per request.",
nil)
m.kafkaProducerCompressionRatio = r.MetricDesc2(
"producer_compression_ratio",
"Distribution of the compression ratio times 100 of record batches.",
nil)
m.kafkaConsumerBatchSize = r.MetricDesc2(
"consumer_batch_messages",
"Distribution of the number of messages per batch.",
nil,
)
m.kafkaConsumerFetchRate = r.MetricDesc2(
"consumer_fetch_rate",
"Fetch requests/second sent to all brokers.",
nil,
)
m.kafkaConsumerFetchResponseSize = r.MetricDesc2(
"consumer_fetch_bytes",
"Distribution of the fetch response size in bytes.",
nil,
)
m.kafkaConsumerGroupJoin = r.MetricDesc2(
"consumer_group_join_total",
"Total count of consumer group join attempts",
[]string{"group"})
m.kafkaConsumerGroupJoinFailed = r.MetricDesc2(
"consumer_group_join_failed_total",
"Total count of consumer group join failures.",
[]string{"group"})
m.kafkaConsumerGroupSync = r.MetricDesc2(
"consumer_group_sync_total",
"Total count of consumer group sync attempts",
[]string{"group"})
m.kafkaConsumerGroupSyncFailed = r.MetricDesc2(
"consumer_group_sync_failed_total",
"Total count of consumer group sync failures.",
[]string{"group"})
r.MetricCollector(m)
}
// Describe collected metrics
func (m Metrics) Describe(ch chan<- *prometheus.Desc) {
ch <- m.kafkaIncomingByteRate
ch <- m.kafkaOutgoingByteRate
ch <- m.kafkaRequestRate
ch <- m.kafkaRequestSize
ch <- m.kafkaRequestLatency
ch <- m.kafkaResponseRate
ch <- m.kafkaResponseSize
ch <- m.kafkaRequestsInFlight
ch <- m.kafkaProducerBatchSize
ch <- m.kafkaProducerRecordSendRate
ch <- m.kafkaProducerRecordsPerRequest
ch <- m.kafkaProducerCompressionRatio
ch <- m.kafkaConsumerBatchSize
ch <- m.kafkaConsumerFetchRate
ch <- m.kafkaConsumerFetchResponseSize
ch <- m.kafkaConsumerGroupJoin
ch <- m.kafkaConsumerGroupJoinFailed
ch <- m.kafkaConsumerGroupSync
ch <- m.kafkaConsumerGroupSyncFailed
}
// Collect metrics
func (m Metrics) Collect(ch chan<- prometheus.Metric) {
m.registry.Each(func(name string, gom interface{}) {
// Broker-related
if broker := metricBroker(name, "incoming-byte-rate"); broker != "" {
gomMeter(ch, m.kafkaIncomingByteRate, gom, broker)
return
}
if broker := metricBroker(name, "outgoing-byte-rate"); broker != "" {
gomMeter(ch, m.kafkaOutgoingByteRate, gom, broker)
return
}
if broker := metricBroker(name, "request-rate"); broker != "" {
gomMeter(ch, m.kafkaRequestRate, gom, broker)
return
}
if broker := metricBroker(name, "request-size"); broker != "" {
gomHistogram(ch, m.kafkaRequestSize, gom, broker)
return
}
if broker := metricBroker(name, "request-latency-in-ms"); broker != "" {
gomHistogram(ch, m.kafkaRequestLatency, gom, broker)
return
}
if broker := metricBroker(name, "response-rate"); broker != "" {
gomMeter(ch, m.kafkaResponseRate, gom, broker)
return
}
if broker := metricBroker(name, "response-size"); broker != "" {
gomHistogram(ch, m.kafkaResponseSize, gom, broker)
return
}
if broker := metricBroker(name, "requests-in-flight"); broker != "" {
gomCounter(ch, m.kafkaRequestsInFlight, gom, broker)
return
}
// Producer-related
if name == "batch-size" {
gomHistogram(ch, m.kafkaProducerBatchSize, gom)
return
}
if name == "record-send-rate" {
gomMeter(ch, m.kafkaProducerRecordSendRate, gom)
return
}
if name == "records-per-request" {
gomHistogram(ch, m.kafkaProducerRecordsPerRequest, gom)
return
}
if name == "compression-ratio" {
gomHistogram(ch, m.kafkaProducerCompressionRatio, gom)
return
}
// Consumer-related
if name == "consumer-batch-size" {
gomHistogram(ch, m.kafkaConsumerBatchSize, gom)
return
}
if name == "consumer-fetch-rate" {
gomMeter(ch, m.kafkaConsumerFetchRate, gom)
return
}
if name == "consumer-fetch-response-size" {
gomHistogram(ch, m.kafkaConsumerFetchResponseSize, gom)
return
}
if groupID := metricGroupID(name, "consumer-group-join-total"); groupID != "" {
gomCounter(ch, m.kafkaConsumerGroupJoin, gom, groupID)
return
}
if groupID := metricGroupID(name, "consumer-group-join-failed"); groupID != "" {
gomCounter(ch, m.kafkaConsumerGroupJoinFailed, gom, groupID)
return
}
if groupID := metricGroupID(name, "consumer-group-sync-total"); groupID != "" {
gomCounter(ch, m.kafkaConsumerGroupSync, gom, groupID)
return
}
if groupID := metricGroupID(name, "consumer-group-sync-failed"); groupID != "" {
gomCounter(ch, m.kafkaConsumerGroupSyncFailed, gom, groupID)
return
}
})
}
func metricBroker(name, prefix string) string {
prefix = prefix + "-for-broker-"
if strings.HasPrefix(name, prefix) {
return strings.TrimPrefix(name, prefix)
}
return ""
}
func metricGroupID(name, prefix string) string {
prefix = prefix + "-"
if strings.HasPrefix(name, prefix) {
return strings.TrimPrefix(name, prefix)
}
return ""
}
func gomMeter(ch chan<- prometheus.Metric, desc *reporter.MetricDesc, m interface{}, labels ...string) {
snap := m.(gometrics.Meter).Snapshot()
ch <- prometheus.MustNewConstMetric(desc, prometheus.GaugeValue, snap.Rate1(), labels...)
}
func gomCounter(ch chan<- prometheus.Metric, desc *reporter.MetricDesc, m interface{}, labels ...string) {
snap := m.(gometrics.Counter).Snapshot()
ch <- prometheus.MustNewConstMetric(desc, prometheus.GaugeValue, float64(snap.Count()), labels...)
}
func gomHistogram(ch chan<- prometheus.Metric, desc *reporter.MetricDesc, m interface{}, labels ...string) {
snap := m.(gometrics.Histogram).Snapshot()
buckets := map[float64]uint64{
0.5: uint64(snap.Percentile(0.5)),
0.9: uint64(snap.Percentile(0.9)),
0.99: uint64(snap.Percentile(0.99)),
}
ch <- prometheus.MustNewConstHistogram(desc, uint64(snap.Count()), float64(snap.Sum()), buckets, labels...)
}


@@ -8,34 +8,29 @@ import (
"crypto/tls"
"net/http"
"github.com/IBM/sarama"
"github.com/twmb/franz-go/pkg/sasl/oauth"
"golang.org/x/oauth2"
"golang.org/x/oauth2/clientcredentials"
)
// tokenProvider implements sarama.AccessTokenProvider.
// tokenProvider implements OAuth token provider for franz-go.
type tokenProvider struct {
tokenSource oauth2.TokenSource
}
// newOAuthTokenProvider returns a sarama.AccessTokenProvider using OAuth credentials.
func newOAuthTokenProvider(ctx context.Context, tlsConfig *tls.Config, oauthConfig clientcredentials.Config) sarama.AccessTokenProvider {
// newOAuthTokenProvider returns a token provider function using OAuth credentials.
func newOAuthTokenProvider(tlsConfig *tls.Config, oauthConfig clientcredentials.Config) func(context.Context) (oauth.Auth, error) {
return func(ctx context.Context) (oauth.Auth, error) {
httpClient := &http.Client{Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
TLSClientConfig: tlsConfig,
}}
ctx = context.WithValue(ctx, oauth2.HTTPClient, httpClient)
return &tokenProvider{
tokenSource: oauthConfig.TokenSource(context.Background()),
}
}
// Token returns a new *sarama.AccessToken or an error as appropriate.
func (t *tokenProvider) Token() (*sarama.AccessToken, error) {
token, err := t.tokenSource.Token()
tokenSource := oauthConfig.TokenSource(ctx)
token, err := tokenSource.Token()
if err != nil {
return nil, err
return oauth.Auth{}, err
}
return oauth.Auth{Token: token.AccessToken}, nil
}
return &sarama.AccessToken{Token: token.AccessToken}, nil
}


@@ -12,7 +12,7 @@ import (
"testing"
"time"
"github.com/IBM/sarama"
"github.com/twmb/franz-go/pkg/kgo"
"golang.org/x/oauth2"
"golang.org/x/oauth2/clientcredentials"
)
@@ -77,8 +77,6 @@ func TestOAuth2ServerClientCredentials(t *testing.T) {
func TestOAuth2Broker(t *testing.T) {
r := reporter.NewMock(t)
GlobalKafkaLogger.Register(r)
defer GlobalKafkaLogger.Unregister()
// Ensure broker is ready.
SetupKafkaBroker(t)
@@ -90,25 +88,27 @@ func TestOAuth2Broker(t *testing.T) {
[]string{"kafka:9093", "127.0.0.1:9093"})
config := DefaultConfiguration()
config.Brokers = []string{broker}
config.SASL = SASLConfiguration{
Username: "kafka-client",
Password: "kafka-client-secret",
Mechanism: SASLOauth,
OAuthTokenURL: fmt.Sprintf("http://%s/default/token", oauthServer),
}
kafkaConfig, err := NewConfig(config)
opts, err := NewConfig(r, config)
if err != nil {
t.Fatalf("NewConfig() error:\n%+v", err)
}
if err := kafkaConfig.Validate(); err != nil {
t.Fatalf("Validate() error:\n%+v", err)
}
client, err := sarama.NewClient([]string{broker}, kafkaConfig)
client, err := kgo.NewClient(opts...)
if err != nil {
t.Fatalf("sarama.NewClient() error:\n%+v", err)
t.Fatalf("kgo.NewClient() error:\n%+v", err)
}
if err := client.RefreshMetadata(); err != nil {
t.Fatalf("client.RefreshMetadata() error:\n%+v", err)
defer client.Close()
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := client.Ping(ctx); err != nil {
t.Fatalf("client.Ping() error:\n%+v", err)
}
}


@@ -1,32 +0,0 @@
// SPDX-FileCopyrightText: 2013 Shopify
// SPDX-License-Identifier: MIT
// From https://github.com/IBM/sarama/blob/main/examples/sasl_scram_client/scram_client.go
package kafka
import "github.com/xdg-go/scram"
type xdgSCRAMClient struct {
*scram.Client
*scram.ClientConversation
scram.HashGeneratorFcn
}
func (x *xdgSCRAMClient) Begin(userName, password, authzID string) (err error) {
x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID)
if err != nil {
return err
}
x.ClientConversation = x.Client.NewConversation()
return nil
}
func (x *xdgSCRAMClient) Step(challenge string) (response string, err error) {
response, err = x.ClientConversation.Step(challenge)
return
}
func (x *xdgSCRAMClient) Done() bool {
return x.ClientConversation.Done()
}


@@ -1,50 +1,66 @@
// SPDX-FileCopyrightText: 2022 Free Mobile
// SPDX-License-Identifier: AGPL-3.0-only
// SPDX-FileCopyrightText: 2020 Travis Bischel
// SPDX-License-Identifier: AGPL-3.0-only AND BSD-3-Clause
//go:build !release
package kafka
import (
"context"
"encoding/binary"
"fmt"
"testing"
"time"
"github.com/IBM/sarama"
"akvorado/common/helpers"
"akvorado/common/reporter"
"github.com/twmb/franz-go/pkg/kfake"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/kmsg"
)
// SetupKafkaBroker configures a client to use for testing.
func SetupKafkaBroker(t *testing.T) (sarama.Client, []string) {
func SetupKafkaBroker(t *testing.T) (*kgo.Client, []string) {
broker := helpers.CheckExternalService(t, "Kafka",
[]string{"kafka:9092", "127.0.0.1:9092"})
// Wait for broker to be ready
saramaConfig := sarama.NewConfig()
saramaConfig.Version = sarama.V2_8_1_0
saramaConfig.Net.DialTimeout = 1 * time.Second
saramaConfig.Net.ReadTimeout = 1 * time.Second
saramaConfig.Net.WriteTimeout = 1 * time.Second
saramaConfig.Producer.Return.Successes = true
ready := false
var (
client sarama.Client
err error
r := reporter.NewMock(t)
opts, err := NewConfig(r, Configuration{
Brokers: []string{broker},
TLS: helpers.TLSConfiguration{
Enable: false,
Verify: true,
},
})
if err != nil {
t.Fatalf("NewConfig() error: %v", err)
}
// Add additional options for testing
opts = append(opts,
kgo.RequestTimeoutOverhead(1*time.Second),
kgo.ProduceRequestTimeout(1*time.Second),
kgo.ConnIdleTimeout(1*time.Second),
)
ready := false
var client *kgo.Client
for i := 0; i < 90 && !ready; i++ {
if client != nil {
client.Close()
}
if client, err = sarama.NewClient([]string{broker}, saramaConfig); err != nil {
if client, err = kgo.NewClient(opts...); err != nil {
continue
}
if err := client.RefreshMetadata(); err != nil {
continue
}
brokers := client.Brokers()
if len(brokers) == 0 {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
if err := client.Ping(ctx); err != nil {
cancel()
continue
}
cancel()
ready = true
}
if !ready {
@@ -53,3 +69,69 @@ func SetupKafkaBroker(t *testing.T) (sarama.Client, []string) {
return client, []string{broker}
}
// forEachBatchRecord iterates through all records in a record batch. This
// function is stolen from franz-go/pkg/kfake/data.go.
func forEachBatchRecord(batch kmsg.RecordBatch, cb func(kmsg.Record) error) error {
records, err := kgo.DefaultDecompressor().Decompress(
batch.Records,
kgo.CompressionCodecType(batch.Attributes&0x0007),
)
if err != nil {
return err
}
for range batch.NumRecords {
rec := kmsg.NewRecord()
err := rec.ReadFrom(records)
if err != nil {
return fmt.Errorf("corrupt batch: %w", err)
}
if err := cb(rec); err != nil {
return err
}
length, amt := binary.Varint(records)
records = records[length+int64(amt):]
}
if len(records) > 0 {
return fmt.Errorf("corrupt batch, extra left over bytes after parsing batch: %v", len(records))
}
return nil
}
// InterceptMessages sets up a ControlKey to intercept all messages produced to a fake cluster
// and calls the callback function for each record received.
func InterceptMessages(t *testing.T, cluster *kfake.Cluster, callback func(*kgo.Record)) {
t.Helper()
// Use ControlKey to intercept ProduceRequest messages
cluster.ControlKey(0, func(kreq kmsg.Request) (kmsg.Response, error, bool) {
cluster.KeepControl()
if req, ok := kreq.(*kmsg.ProduceRequest); ok {
for _, topicData := range req.Topics {
for _, partitionData := range topicData.Partitions {
if partitionData.Records != nil {
var batch kmsg.RecordBatch
if err := batch.ReadFrom(partitionData.Records); err != nil {
t.Fatalf("batch.ReadFrom() error:\n%+v", err)
}
if err := forEachBatchRecord(batch, func(rec kmsg.Record) error {
kgoRecord := &kgo.Record{
Topic: topicData.Topic,
Partition: partitionData.Partition,
Key: rec.Key,
Value: rec.Value,
}
callback(kgoRecord)
return nil
}); err != nil {
t.Fatalf("forEachBatchRecord() error:\n%+v", err)
}
}
}
}
}
// Don't modify the response, just let it pass through
return nil, nil, false
})
}
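
A small usage sketch, in the same package as the helper (the topic name is an
assumption; the flow test further below uses the same helper through its
kafka.NewMock wrapper):

package kafka

import (
	"context"
	"testing"

	"github.com/twmb/franz-go/pkg/kfake"
	"github.com/twmb/franz-go/pkg/kgo"
)

func TestInterceptMessagesSketch(t *testing.T) {
	cluster, err := kfake.NewCluster(kfake.SeedTopics(1, "flows"))
	if err != nil {
		t.Fatalf("kfake.NewCluster() error:\n%+v", err)
	}
	defer cluster.Close()
	InterceptMessages(t, cluster, func(record *kgo.Record) {
		t.Logf("intercepted %s: %s", record.Topic, record.Value)
	})
	client, err := kgo.NewClient(
		kgo.SeedBrokers(cluster.ListenAddrs()...),
		kgo.DefaultProduceTopic("flows"),
	)
	if err != nil {
		t.Fatalf("kgo.NewClient() error:\n%+v", err)
	}
	defer client.Close()
	if err := client.ProduceSync(context.Background(),
		kgo.StringRecord("hello")).FirstErr(); err != nil {
		t.Fatalf("ProduceSync() error:\n%+v", err)
	}
}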


@@ -6,7 +6,6 @@
kafka:
topic: flows
version: 3.3.1
brokers:
- kafka:9092
topic-configuration:


@@ -1,6 +1,4 @@
---
kafka:
compression-codec: zstd
flow:
inputs:
- type: udp


@@ -126,17 +126,9 @@ The following keys are accepted:
configuration for the [orchestrator service](#kafka-2) (the values of these
keys are copied from the orchestrator configuration, unless `brokers` is
explicitly set)
- `flush-interval` defines the maximum flush interval to send received
flows to Kafka
- `flush-bytes` defines the maximum number of bytes to store before
flushing flows to Kafka
- `max-message-bytes` defines the maximum size of a message (it should
be equal or smaller to the same setting in the broker configuration)
- `compression-codec` defines the compression codec to use to compress
messages (`none`, `gzip`, `snappy`, `lz4` and `zstd`)
- `queue-size` defines the size of the internal queues to send
messages to Kafka. Increasing this value will improve performance,
at the cost of losing messages in case of problems.
messages: `none`, `gzip`, `snappy`, `lz4` (the default), or `zstd`.
- `queue-size` defines the maximum number of messages to buffer to Kafka.
The topic name is automatically suffixed by a version number, in case the
protobuf schema changes in a backward-incompatible way.
@@ -159,13 +151,9 @@ keys are accepted:
explicitly set)
- `workers` defines the number of Kafka workers to use
- `consumer-group` defines the consumer group ID for Kafka consumption
- `max-message-bytes` defines the maximum size of a message (it should
be equal or smaller to the same setting in the broker configuration)
- `fetch-min-bytes` defines the minimum number of bytes to fetch from Kafka
- `fetch-max-wait-time` defines how much maximum time to wait for the minimum
number of bytes to become available
- `queue-size` defines the size of the internal queues to receive messages to
Kafka.
### Routing


@@ -105,14 +105,15 @@ happen before writing to ClickHouse.
## Kafka
The Kafka component relies on [Sarama](https://github.com/IBM/sarama). It is
tested using the mock interface provided by this package. *Sarama* uses
`go-metrics` to store metrics. We convert them to Prometheus to keep them. The
logger is global and there is a hack to plug it into the reporter design we
have.
The Kafka component relies on [franz-go](https://github.com/twmb/franz-go). If a
real broker is available under the DNS name `kafka` or at `localhost` on port
9092, it will be used for a quick functional test.
If a real broker is available under the DNS name `kafka` or at
`localhost` on port 9092, it will be used for a quick functional test.
This library has not been benchmarked yet. Previously, we were using
[Sarama](https://github.com/IBM/sarama). However, its documentation is quite
poor, it relies heavily on pointers (putting pressure on the garbage
collector), and its concurrency model is difficult to understand. Another
contender could be [kafka-go](https://github.com/segmentio/kafka-go).
## ClickHouse

go.mod

@@ -5,7 +5,6 @@ go 1.24
require (
github.com/ClickHouse/ch-go v0.66.1
github.com/ClickHouse/clickhouse-go/v2 v2.37.2
github.com/IBM/sarama v1.45.2
github.com/alecthomas/chroma v0.10.0
github.com/benbjohnson/clock v1.3.5
github.com/bio-routing/bio-rd v0.1.10
@@ -38,13 +37,16 @@ require (
github.com/osrg/gobgp/v3 v3.37.0
github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10
github.com/prometheus/client_golang v1.22.0
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475
github.com/rs/zerolog v1.34.0
github.com/scrapli/scrapligo v1.3.3
github.com/slayercat/GoSNMPServer v0.5.2
github.com/spf13/cobra v1.9.1
github.com/ti-mo/conntrack v0.5.2
github.com/xdg-go/scram v1.1.2
github.com/twmb/franz-go v1.19.5
github.com/twmb/franz-go/pkg/kadm v1.16.0
github.com/twmb/franz-go/pkg/kfake v0.0.0-20250711145744-a849b8be17b7
github.com/twmb/franz-go/pkg/kmsg v1.11.2
github.com/twmb/franz-go/plugin/kprom v1.2.1
github.com/yuin/goldmark v1.7.12
github.com/yuin/goldmark-highlighting v0.0.0-20220208100518-594be1970594
go.uber.org/mock v0.5.2
@@ -90,8 +92,6 @@ require (
github.com/dnephin/pflag v1.0.7 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/emicklei/dot v0.15.0 // indirect
github.com/fatih/color v1.18.0 // indirect
github.com/fatih/structtag v1.2.0 // indirect
@@ -111,26 +111,17 @@ require (
github.com/goccy/go-json v0.10.2 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/go-cmp v0.7.0 // indirect
github.com/google/licensecheck v0.3.1 // indirect
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-uuid v1.0.3 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/itchyny/timefmt-go v0.1.6 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
github.com/jackc/pgx/v5 v5.6.0 // indirect
github.com/jackc/puddle/v2 v2.2.2 // indirect
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
github.com/jcmturner/gofork v1.7.6 // indirect
github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/jellydator/ttlcache/v2 v2.11.1 // indirect
github.com/jessevdk/go-flags v1.5.0 // indirect
github.com/jhump/protoreflect v1.17.0 // indirect
@@ -182,8 +173,6 @@ require (
github.com/tklauser/numcpus v0.6.1 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/ugorji/go/codec v1.2.12 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/stringprep v1.0.4 // indirect
github.com/yusufpapurcu/wmi v1.2.4 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0 // indirect

go.sum

@@ -15,8 +15,6 @@ github.com/ClickHouse/ch-go v0.66.1 h1:LQHFslfVYZsISOY0dnOYOXGkOUvpv376CCm8g7W74
github.com/ClickHouse/ch-go v0.66.1/go.mod h1:NEYcg3aOFv2EmTJfo4m2WF7sHB/YFbLUuIWv9iq76xY=
github.com/ClickHouse/clickhouse-go/v2 v2.37.2 h1:wRLNKoynvHQEN4znnVHNLaYnrqVc9sGJmGYg+GGCfto=
github.com/ClickHouse/clickhouse-go/v2 v2.37.2/go.mod h1:pH2zrBGp5Y438DMwAxXMm1neSXPPjSI7tD4MURVULw8=
github.com/IBM/sarama v1.45.2 h1:8m8LcMCu3REcwpa7fCP6v2fuPuzVwXDAM2DOv3CBrKw=
github.com/IBM/sarama v1.45.2/go.mod h1:ppaoTcVdGv186/z6MEKsMm70A5fwJfRTpstI37kVn3Y=
github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY=
github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU=
github.com/alecthomas/chroma v0.10.0 h1:7XDcGkCQopCNKjZHfYrNLraA+M7e0fMiJ/Mfikbfjek=
@@ -96,10 +94,6 @@ github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkp
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
github.com/eapache/go-resiliency v1.7.0 h1:n3NRTnBn5N0Cbi/IeOHuQn9s2UwVUH7Ga0ZWcP+9JTA=
github.com/eapache/go-resiliency v1.7.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho=
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4ALJ04o5Qqpdz8XLIpNA3WM/iSIXqxtqo7UGVws=
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0=
github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/emicklei/dot v0.15.0 h1:XDBW0Xco1QNyRb33cqLe10cT04yMWL1XpCZfa98Q6Og=
github.com/emicklei/dot v0.15.0/go.mod h1:DeV7GvQtIw4h2u73RKBkkFdvVAz0D9fzeJrgPW6gy/s=
github.com/expr-lang/expr v1.17.5 h1:i1WrMvcdLF249nSNlpQZN1S6NXuW9WaOfF5tPi3aw3k=
@@ -110,8 +104,6 @@ github.com/fatih/structtag v1.2.0 h1:/OdNE99OxoI/PqaW/SuSK9uxxT3f/tcSZgon/ssNSx4
github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94=
github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
github.com/frapposelli/wwhrd v0.4.0 h1:Vn4hjT/tHNeOnTxFBO0ys1NBH8/Inxqqi1Q0eJmCImo=
github.com/frapposelli/wwhrd v0.4.0/go.mod h1:Bzwvr3hY1yoBsBbIMkckeHUI6jf1cLRueaaMxZ3N9FY=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
@@ -180,8 +172,6 @@ github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiu
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
@@ -204,8 +194,6 @@ github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 h1:El6M4kTTCOh6aBiKaU
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510/go.mod h1:pupxD2MaaD3pAXIBCelhxNneeOaAeabZDe5s4K6zSpQ=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4=
github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM=
github.com/gosnmp/gosnmp v1.36.2-0.20231009064202-d306ed5aa998/go.mod h1:O938QjIS4vpSag1UTcnnBq9MfNmimuOGtvQsT1NbErc=
github.com/gosnmp/gosnmp v1.41.0 h1:6RI78g2ZsbLvpvJegcV98LapszRQnbvYNKSa5WbCll4=
github.com/gosnmp/gosnmp v1.41.0/go.mod h1:CxVS6bXqmWZlafUj9pZUnQX5e4fAltqPcijxWpCitDo=
@@ -215,14 +203,6 @@ github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0 h1:pRhl55Yx1eC7BZ1N+BBWwn
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0/go.mod h1:XKMd7iuf/RGPSMJ/U4HP0zS2Z9Fh8Ps9a+6X26m/tmI=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 h1:YBftPWNWd4WwGqtY2yeZL2ef8rHAxPBD8KFhJpmcqms=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0/go.mod h1:YN5jB8ie0yfIUg6VvR9Kz84aCaG7AsGZnLjhHbUqwPg=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/go-uuid v1.0.3 h1:2gKiV6YVmrJ1i2CKKa9obLvRieoRGviZFL26PcT/Co8=
github.com/hashicorp/go-uuid v1.0.3/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/go-version v1.7.0 h1:5tqGy27NaOTB8yJKUZELlFAS/LTKJkrmONwQKeRZfjY=
github.com/hashicorp/go-version v1.7.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
@@ -241,18 +221,6 @@ github.com/jackc/pgx/v5 v5.6.0 h1:SWJzexBzPL5jb0GEsrPMLIsi/3jOo7RHlzTjcAeDrPY=
github.com/jackc/pgx/v5 v5.6.0/go.mod h1:DNZ/vlrUnhWCoFGxHAG8U2ljioxukquj7utPDgtQdTw=
github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo=
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8=
github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs=
github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo=
github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM=
github.com/jcmturner/gofork v1.7.6 h1:QH0l3hzAU1tfT3rZCnW5zXl+orbkNMMRGJfdJjHVETg=
github.com/jcmturner/gofork v1.7.6/go.mod h1:1622LH6i/EZqLloHfE7IeZ0uEJwMSUyQ/nDd82IeqRo=
github.com/jcmturner/goidentity/v6 v6.0.1 h1:VKnZd2oEIMorCTsFBnJWbExfNN7yZr3EhJAxwOkZg6o=
github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg=
github.com/jcmturner/gokrb5/v8 v8.4.4 h1:x1Sv4HaTpepFkXbt2IkL29DXRf8sOfZXo8eRKh687T8=
github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP+F6aCACiMrs=
github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY=
github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc=
github.com/jellydator/ttlcache/v2 v2.11.1 h1:AZGME43Eh2Vv3giG6GeqeLeFXxwxn1/qHItqWZl6U64=
github.com/jellydator/ttlcache/v2 v2.11.1/go.mod h1:RtE5Snf0/57e+2cLWFYWCCsLas2Hy3c5Z4n14XmSvTI=
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
@@ -395,8 +363,6 @@ github.com/prometheus/common v0.62.0/go.mod h1:vyBcEuLSvWos9B1+CyL7JZ2up+uFzXhkq
github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A=
github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc=
github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
@@ -468,6 +434,16 @@ github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+F
github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY=
github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI=
github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08=
github.com/twmb/franz-go v1.19.5 h1:W7+o8D0RsQsedqib71OVlLeZ0zI6CbFra7yTYhZTs5Y=
github.com/twmb/franz-go v1.19.5/go.mod h1:4kFJ5tmbbl7asgwAGVuyG1ZMx0NNpYk7EqflvWfPCpM=
github.com/twmb/franz-go/pkg/kadm v1.16.0 h1:STMs1t5lYR5mR974PSiwNzE5TvsosByTp+rKXLOhAjE=
github.com/twmb/franz-go/pkg/kadm v1.16.0/go.mod h1:MUdcUtnf9ph4SFBLLA/XxE29rvLhWYLM9Ygb8dfSCvw=
github.com/twmb/franz-go/pkg/kfake v0.0.0-20250711145744-a849b8be17b7 h1:SmVArSUtiB+bsqMjHtqemjL1YCj4L74NSiOxjtwAJ/o=
github.com/twmb/franz-go/pkg/kfake v0.0.0-20250711145744-a849b8be17b7/go.mod h1:udxwmMC3r4xqjwrSrMi8p9jpqMDNpC2YwexpDSUmQtw=
github.com/twmb/franz-go/pkg/kmsg v1.11.2 h1:hIw75FpwcAjgeyfIGFqivAvwC5uNIOWRGvQgZhH4mhg=
github.com/twmb/franz-go/pkg/kmsg v1.11.2/go.mod h1:CFfkkLysDNmukPYhGzuUcDtf46gQSqCZHMW1T4Z+wDE=
github.com/twmb/franz-go/plugin/kprom v1.2.1 h1:FGWdneW9htySYmvJ5tEuAIZepjFOuTFhHLy5TrVR+QI=
github.com/twmb/franz-go/plugin/kprom v1.2.1/go.mod h1:+dzpKnVE6By8BDRFj240dTDJS9bP2dngmuhv7egJ3Go=
github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw=
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
github.com/ugorji/go/codec v1.2.12 h1:9LC83zGrHhuUA9l16C9AHXAqEV/2wBQ4nkvumAE65EE=
@@ -475,14 +451,9 @@ github.com/ugorji/go/codec v1.2.12/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZ
github.com/urfave/cli/v2 v2.1.1/go.mod h1:SE9GqnLQmjVa0iPEY0f1w3ygNIYcIJ0OKPMoW2caLfQ=
github.com/vishvananda/netns v0.0.4 h1:Oeaw1EM2JMxD51g9uhtC0D7erkIjgmj8+JZc26m1YX8=
github.com/vishvananda/netns v0.0.4/go.mod h1:SpkAiCQRtJ6TvvxPnOSyH3BMl6unz3xZlaprSwhNNJM=
github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
github.com/xdg-go/scram v1.1.1/go.mod h1:RaEWvsqvNKKvBPvcKeFjrG2cJqOkHTiyTpzz23ni57g=
github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY=
github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4=
github.com/xdg-go/stringprep v1.0.3/go.mod h1:W3f5j4i+9rC0kuIEJL0ky1VpHXQU3ocBgklLGvcBnW8=
github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8=
github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM=
github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU=
github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E=
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA=
@@ -536,7 +507,6 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58=
golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc=
golang.org/x/crypto v0.39.0 h1:SHs+kF4LP+f+p14esP5jAoDpHU8Gu/v9lFRK6IT5imM=
golang.org/x/crypto v0.39.0/go.mod h1:L+Xg3Wf6HoL4Bn4238Z6ft6KfEpN0tJGo53AAPC632U=
@@ -555,7 +525,6 @@ golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73r
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
@@ -565,7 +534,6 @@ golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk=
golang.org/x/net v0.41.0 h1:vBTly1HeNPEn3wtREYfy4GZ/NECgw2Cnl+nK6Nz3uvw=
@@ -628,7 +596,6 @@ golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=


@@ -6,10 +6,12 @@ package flow
import (
"errors"
"sync"
"gopkg.in/tomb.v2"
"akvorado/common/daemon"
"akvorado/common/helpers"
"akvorado/common/httpserver"
"akvorado/common/pb"
"akvorado/common/reporter"
@@ -24,8 +26,8 @@ type Component struct {
t tomb.Tomb
config Configuration
// Inputs
inputs []input.Input
payloadPool sync.Pool
}
// Dependencies are the dependencies of the flow component.
@@ -46,6 +48,17 @@ func New(r *reporter.Reporter, configuration Configuration, dependencies Depende
d: &dependencies,
config: configuration,
inputs: make([]input.Input, len(configuration.Inputs)),
payloadPool: sync.Pool{
New: func() any {
minPayload := 2000
if helpers.Testing() {
// Ensure we test the extension case.
minPayload = 5
}
s := make([]byte, minPayload)
return &s
},
},
}
// Initialize inputs
@@ -68,8 +81,26 @@ func (c *Component) Send(config InputConfiguration) input.SendFunc {
flow.TimestampSource = config.TimestampSource
flow.Decoder = config.Decoder
flow.UseSourceAddress = config.UseSrcAddrForExporterAddr
if bytes, err := flow.MarshalVT(); err == nil {
c.d.Kafka.Send(exporter, bytes)
// Get a payload from the pool and extend it if needed. We use a pool of
// pointers to slices since we may have to grow a slice's capacity. We keep
// the original pointer to avoid an extra heap allocation when returning the
// slice to the pool.
ptr := c.payloadPool.Get().(*[]byte)
bytes := *ptr
n := flow.SizeVT()
if cap(bytes) < n {
bytes = make([]byte, n+100)
*ptr = bytes
}
// Marshal to it, send it to Kafka and return it when done
if n, err := flow.MarshalToSizedBufferVT(bytes[:n]); err == nil {
c.d.Kafka.Send(exporter, bytes[:n], func() {
c.payloadPool.Put(ptr)
})
} else {
c.payloadPool.Put(ptr)
}
}
}
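
A standalone sketch of the pool-of-pointers pattern used above (the buffer
sizes are illustrative):

package main

import (
	"fmt"
	"sync"
)

// The pool stores *[]byte rather than []byte: storing the pointer avoids an
// allocation when the slice header is boxed into an interface on Put, and
// lets us remember a grown backing array across uses.
var payloadPool = sync.Pool{
	New: func() any {
		s := make([]byte, 2000)
		return &s
	},
}

func withBuffer(n int, use func([]byte)) {
	ptr := payloadPool.Get().(*[]byte)
	buf := *ptr
	if cap(buf) < n {
		buf = make([]byte, n) // grow once, keep it for next time
		*ptr = buf
	}
	use(buf[:n])
	payloadPool.Put(ptr)
}

func main() {
	withBuffer(5000, func(b []byte) { fmt.Println(len(b)) })
}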


@@ -8,18 +8,20 @@ import (
"fmt"
"path"
"runtime"
"sync"
"testing"
"time"
"akvorado/common/daemon"
"akvorado/common/helpers"
"akvorado/common/httpserver"
kafkaCommon "akvorado/common/kafka"
"akvorado/common/pb"
"akvorado/common/reporter"
"akvorado/inlet/flow/input/file"
"akvorado/inlet/kafka"
"github.com/IBM/sarama"
"github.com/twmb/franz-go/pkg/kgo"
)
func TestFlow(t *testing.T) {
@@ -43,35 +45,39 @@ func TestFlow(t *testing.T) {
config := DefaultConfiguration()
config.Inputs = inputs
producer, mockProducer := kafka.NewMock(t, r, kafka.DefaultConfiguration())
producer, cluster := kafka.NewMock(t, r, kafka.DefaultConfiguration())
defer cluster.Close()
// Use the new helper to intercept messages
var mu sync.Mutex
helloCount := 0
byeCount := 0
totalCount := 0
done := make(chan bool)
for i := range 100 {
mockProducer.ExpectInputWithMessageCheckerFunctionAndSucceed(func(got *sarama.ProducerMessage) error {
if i == 99 {
defer close(done)
kafkaCommon.InterceptMessages(t, cluster, func(record *kgo.Record) {
mu.Lock()
defer mu.Unlock()
// Check topic
expectedTopic := fmt.Sprintf("flows-v%d", pb.Version)
if record.Topic != expectedTopic {
t.Errorf("Expected topic %s, got %s", expectedTopic, record.Topic)
return
}
expected := sarama.ProducerMessage{
Topic: fmt.Sprintf("flows-v%d", pb.Version),
Key: got.Key,
Value: got.Value,
Partition: got.Partition,
// Count messages based on content
if bytes.Contains(record.Value, []byte("hello world!")) {
helloCount++
} else if bytes.Contains(record.Value, []byte("bye bye")) {
byeCount++
}
if diff := helpers.Diff(got, expected); diff != "" {
t.Fatalf("Send() (-got, +want):\n%s", diff)
totalCount++
if totalCount >= 100 {
close(done)
}
val, _ := got.Value.Encode()
if i%2 == 0 {
if !bytes.Contains(val, []byte("hello world!")) {
t.Fatalf("Send() did not return %q", "hello world!")
}
} else {
if !bytes.Contains(val, []byte("bye bye")) {
t.Fatalf("Send() did not return %q", "bye bye")
}
}
return nil
})
}
c, err := New(r, config, Dependencies{
Daemon: daemon.NewMock(t),
@@ -86,6 +92,15 @@ func TestFlow(t *testing.T) {
// Wait for flows
select {
case <-done:
// Check that we got the expected number of each message type
mu.Lock()
if helloCount != 50 {
t.Errorf("Expected 50 'hello world!' messages, got %d", helloCount)
}
if byeCount != 50 {
t.Errorf("Expected 50 'bye bye' messages, got %d", byeCount)
}
mu.Unlock()
case <-time.After(time.Second):
t.Fatalf("flows not received")
}


@@ -4,27 +4,20 @@
package kafka
import (
"time"
"fmt"
"github.com/IBM/sarama"
"github.com/twmb/franz-go/pkg/kgo"
"akvorado/common/helpers"
"akvorado/common/kafka"
)
// Configuration describes the configuration for the Kafka exporter.
type Configuration struct {
kafka.Configuration `mapstructure:",squash" yaml:"-,inline"`
// FlushInterval tells how often to flush pending data to Kafka.
FlushInterval time.Duration `validate:"min=100ms"`
// FlushBytes tells to flush when there are many bytes to write
FlushBytes int `validate:"min=1000"`
// MaxMessageBytes is the maximum permitted size of a message.
// Should be set equal or smaller than broker's
// `message.max.bytes`.
MaxMessageBytes int `validate:"min=1"`
// CompressionCodec defines the compression to use.
CompressionCodec CompressionCodec
// QueueSize defines the size of the channel used to send to Kafka.
// QueueSize defines the maximum number of messages to buffer.
QueueSize int `validate:"min=1"`
}
@@ -32,28 +25,62 @@ type Configuration struct {
func DefaultConfiguration() Configuration {
return Configuration{
Configuration: kafka.DefaultConfiguration(),
FlushInterval: time.Second,
FlushBytes: int(sarama.MaxRequestSize) - 1,
MaxMessageBytes: 1_000_000,
CompressionCodec: CompressionCodec(sarama.CompressionNone),
CompressionCodec: CompressionCodec(kgo.Lz4Compression()),
QueueSize: 32,
}
}
// CompressionCodec represents a compression codec.
type CompressionCodec sarama.CompressionCodec
type CompressionCodec kgo.CompressionCodec
// UnmarshalText produces a compression codec
func (cc *CompressionCodec) UnmarshalText(text []byte) error {
return (*sarama.CompressionCodec)(cc).UnmarshalText(text)
codec := kgo.CompressionCodec{}
switch string(text) {
case "none":
codec = kgo.NoCompression()
case "gzip":
codec = kgo.GzipCompression()
case "snappy":
codec = kgo.SnappyCompression()
case "lz4":
codec = kgo.Lz4Compression()
case "zstd":
codec = kgo.ZstdCompression()
default:
return fmt.Errorf("unknown compression codec: %s", text)
}
*cc = CompressionCodec(codec)
return nil
}
// String turns a compression codec into a string
func (cc CompressionCodec) String() string {
return sarama.CompressionCodec(cc).String()
switch kgo.CompressionCodec(cc) {
case kgo.NoCompression():
return "none"
case kgo.GzipCompression():
return "gzip"
case kgo.SnappyCompression():
return "snappy"
case kgo.Lz4Compression():
return "lz4"
case kgo.ZstdCompression():
return "zstd"
default:
return "unknown"
}
}
// MarshalText turns a compression codec into a string
func (cc CompressionCodec) MarshalText() ([]byte, error) {
return []byte(cc.String()), nil
}
func init() {
helpers.RegisterMapstructureDeprecatedFields[Configuration](
"FlushInterval", // bad for performance
"FlushBytes", // duplicate with QueueSize
"MaxMessageBytes", // just tune QueueSize instead
)
}
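
A quick round-trip sketch of the new codec type, in the same package (this is
essentially what the test below verifies):

package kafka

import "fmt"

// ExampleCompressionCodec shows the text round-trip used by the
// configuration machinery.
func ExampleCompressionCodec() {
	var cc CompressionCodec
	if err := cc.UnmarshalText([]byte("zstd")); err != nil {
		panic(err)
	}
	fmt.Println(cc.String())
	// Output: zstd
}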


@@ -8,19 +8,21 @@ import (
"akvorado/common/helpers"
"github.com/IBM/sarama"
"github.com/twmb/franz-go/pkg/kgo"
)
func TestCompressionCodecUnmarshal(t *testing.T) {
cases := []struct {
Input string
Expected sarama.CompressionCodec
Expected kgo.CompressionCodec
ExpectedError bool
}{
{"none", sarama.CompressionNone, false},
{"zstd", sarama.CompressionZSTD, false},
{"gzip", sarama.CompressionGZIP, false},
{"unknown", sarama.CompressionNone, true},
{"none", kgo.NoCompression(), false},
{"zstd", kgo.ZstdCompression(), false},
{"gzip", kgo.GzipCompression(), false},
{"snappy", kgo.SnappyCompression(), false},
{"lz4", kgo.Lz4Compression(), false},
{"unknown", kgo.NoCompression(), true},
}
for _, tc := range cases {
var cmp CompressionCodec
@@ -33,10 +35,13 @@ func TestCompressionCodecUnmarshal(t *testing.T) {
t.Errorf("UnmarshalText(%q) got %v but expected error", tc.Input, cmp)
continue
}
if cmp != CompressionCodec(tc.Expected) {
if !tc.ExpectedError && cmp != CompressionCodec(tc.Expected) {
t.Errorf("UnmarshalText(%q) got %v but expected %v", tc.Input, cmp, tc.Expected)
continue
}
if !tc.ExpectedError && cmp.String() != tc.Input {
t.Errorf("String() got %q but expected %q", cmp.String(), tc.Input)
}
}
}


@@ -4,13 +4,14 @@
package kafka
import (
"errors"
"context"
"fmt"
"math/rand"
"sync"
"testing"
"time"
"github.com/IBM/sarama"
"github.com/twmb/franz-go/pkg/kgo"
"akvorado/common/daemon"
"akvorado/common/helpers"
@@ -21,14 +22,13 @@ import (
func TestRealKafka(t *testing.T) {
client, brokers := kafka.SetupKafkaBroker(t)
defer client.Close()
topicName := fmt.Sprintf("test-topic-%d", rand.Int())
expectedTopicName := fmt.Sprintf("%s-v%d", topicName, pb.Version)
configuration := DefaultConfiguration()
configuration.Topic = topicName
configuration.Brokers = brokers
configuration.Version = kafka.Version(sarama.V2_8_1_0)
configuration.FlushInterval = 100 * time.Millisecond
r := reporter.NewMock(t)
c, err := New(r, configuration, Dependencies{Daemon: daemon.NewMock(t)})
if err != nil {
@@ -43,54 +43,70 @@ func TestRealKafka(t *testing.T) {
msg1[i] = letters[rand.Intn(len(letters))]
}
for i := range msg2 {
msg1[i] = letters[rand.Intn(len(letters))]
msg2[i] = letters[rand.Intn(len(letters))]
}
var wg sync.WaitGroup
wg.Add(2)
c.Send("127.0.0.1", msg1, func() { wg.Done() })
c.Send("127.0.0.1", msg2, func() { wg.Done() })
c.Flush(t)
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
select {
case <-done:
case <-time.After(time.Second):
t.Fatal("Send() timeout")
}
c.Send("127.0.0.1", msg1)
c.Send("127.0.0.1", msg2)
time.Sleep(10 * time.Millisecond)
gotMetrics := r.GetMetrics("akvorado_inlet_kafka_", "sent_")
gotMetrics := r.GetMetrics("akvorado_inlet_kafka_", "sent_", "connects_")
expectedMetrics := map[string]string{
// Our own metrics
`sent_bytes_total{exporter="127.0.0.1"}`: "100",
`sent_messages_total{exporter="127.0.0.1"}`: "2",
// From franz-go
`connects_total{node_id="1"}`: "2",
`connects_total{node_id="seed_0"}`: "1",
}
if diff := helpers.Diff(gotMetrics, expectedMetrics); diff != "" {
t.Fatalf("Metrics (-got, +want):\n%s", diff)
}
// Try to consume the two messages
consumer, err := sarama.NewConsumerFromClient(client)
// Try to consume the two messages using franz-go
consumer, err := kgo.NewClient(
kgo.SeedBrokers(brokers...),
kgo.ConsumeTopics(expectedTopicName),
kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()),
kgo.FetchMaxWait(10*time.Millisecond),
)
if err != nil {
t.Fatalf("NewConsumerGroup() error:\n%+v", err)
t.Fatalf("NewClient() error:\n%+v", err)
}
defer consumer.Close()
var partitions []int32
for {
partitions, err = consumer.Partitions(expectedTopicName)
if err != nil {
if errors.Is(err, sarama.ErrUnknownTopicOrPartition) {
// Wait for topic to be available
continue
}
t.Fatalf("Partitions() error:\n%+v", err)
}
break
}
partitionConsumer, err := consumer.ConsumePartition(expectedTopicName, partitions[0], sarama.OffsetOldest)
if err != nil {
t.Fatalf("ConsumePartitions() error:\n%+v", err)
}
got := []string{}
expected := []string{string(msg1), string(msg2)}
timeout := time.After(15 * time.Second)
for range len(expected) {
for len(got) < len(expected) {
select {
case msg := <-partitionConsumer.Messages():
got = append(got, string(msg.Value))
case err := <-partitionConsumer.Errors():
t.Fatalf("consumer.Errors():\n%+v", err)
case <-timeout:
t.Fatalf("Timeout waiting for messages. Got %d of %d messages", len(got), len(expected))
default:
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
fetches := consumer.PollFetches(ctx)
cancel()
if errs := fetches.Errors(); len(errs) > 0 {
t.Logf("PollFetches() error: %+v", err)
time.Sleep(100 * time.Millisecond)
continue
}
fetches.EachPartition(func(p kgo.FetchTopicPartition) {
for _, record := range p.Records {
got = append(got, string(record.Value))
}
})
}
}
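franz-go also offers Fetches.EachRecord, which flattens the per-partition iteration above. A condensed sketch of the same collection step (the helper name is hypothetical; assumes the "context" and kgo imports already used in this file):

// collectValues drains one poll into a slice of record values.
func collectValues(ctx context.Context, consumer *kgo.Client, got []string) []string {
	fetches := consumer.PollFetches(ctx)
	if errs := fetches.Errors(); len(errs) > 0 {
		return got // transient fetch errors: retry on the next poll
	}
	fetches.EachRecord(func(r *kgo.Record) {
		got = append(got, string(r.Value))
	})
	return got
}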

View File

@@ -4,7 +4,6 @@
package kafka
import (
"akvorado/common/kafka"
"akvorado/common/reporter"
)
@@ -12,8 +11,6 @@ type metrics struct {
messagesSent *reporter.CounterVec
bytesSent *reporter.CounterVec
errors *reporter.CounterVec
kafkaMetrics kafka.Metrics
}
func (c *Component) initMetrics() {
@@ -38,6 +35,4 @@ func (c *Component) initMetrics() {
},
[]string{"error"},
)
c.metrics.kafkaMetrics.Init(c.r, c.kafkaConfig.MetricRegistry)
}

View File

@@ -5,13 +5,16 @@
package kafka
import (
"context"
"encoding/binary"
"fmt"
"math/rand"
"strings"
"time"
"github.com/IBM/sarama"
"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/plugin/kprom"
"gopkg.in/tomb.v2"
"akvorado/common/daemon"
@@ -27,10 +30,10 @@ type Component struct {
t tomb.Tomb
config Configuration
kafkaConfig *sarama.Config
kafkaOpts []kgo.Opt
kafkaTopic string
kafkaProducer sarama.AsyncProducer
createKafkaProducer func() (sarama.AsyncProducer, error)
kafkaClient *kgo.Client
errLogger reporter.Logger
metrics metrics
}
@@ -40,37 +43,37 @@ type Dependencies struct {
}
// New creates a new Kafka exporter component.
func New(reporter *reporter.Reporter, configuration Configuration, dependencies Dependencies) (*Component, error) {
func New(r *reporter.Reporter, configuration Configuration, dependencies Dependencies) (*Component, error) {
// Build Kafka configuration
kafkaConfig, err := kafka.NewConfig(configuration.Configuration)
kafkaOpts, err := kafka.NewConfig(r, configuration.Configuration)
if err != nil {
return nil, err
}
kafkaConfig.Metadata.AllowAutoTopicCreation = true
kafkaConfig.Producer.MaxMessageBytes = configuration.MaxMessageBytes
kafkaConfig.Producer.Compression = sarama.CompressionCodec(configuration.CompressionCodec)
kafkaConfig.Producer.Return.Successes = false
kafkaConfig.Producer.Return.Errors = true
kafkaConfig.Producer.Flush.Bytes = configuration.FlushBytes
kafkaConfig.Producer.Flush.Frequency = configuration.FlushInterval
kafkaConfig.Producer.Partitioner = sarama.NewHashPartitioner
kafkaConfig.ChannelBufferSize = configuration.QueueSize
if err := kafkaConfig.Validate(); err != nil {
return nil, fmt.Errorf("cannot validate Kafka configuration: %w", err)
}
c := Component{
r: reporter,
r: r,
d: &dependencies,
config: configuration,
kafkaConfig: kafkaConfig,
kafkaTopic: fmt.Sprintf("%s-v%d", configuration.Topic, pb.Version),
errLogger: r.Sample(reporter.BurstSampler(10*time.Second, 3)),
}
c.initMetrics()
c.createKafkaProducer = func() (sarama.AsyncProducer, error) {
return sarama.NewAsyncProducer(c.config.Brokers, c.kafkaConfig)
kafkaMetrics := kprom.NewMetrics("")
r.MetricCollectorForCurrentModule(kafkaMetrics)
// Initialize options early to be able to validate them.
kafkaOpts = append(kafkaOpts,
kgo.AllowAutoTopicCreation(),
kgo.MaxBufferedRecords(configuration.QueueSize),
kgo.ProducerBatchCompression(kgo.CompressionCodec(configuration.CompressionCodec)),
kgo.RecordPartitioner(kgo.UniformBytesPartitioner(64<<20, true, true, nil)),
kgo.WithHooks(kafkaMetrics),
)
if err := kgo.ValidateOpts(kafkaOpts...); err != nil {
return nil, fmt.Errorf("invalid Kafka configuration: %w", err)
}
c.kafkaOpts = kafkaOpts
c.d.Daemon.Track(&c.t, "inlet/kafka")
return &c, nil
}
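For reference, the same producer option stack in isolation; ValidateOpts catches misconfiguration before any connection is attempted. A sketch under the commit's assumptions (the function name, broker list, and buffer size are illustrative; needs "fmt" and kgo imports):

func newProducerClient(brokers []string) (*kgo.Client, error) {
	opts := []kgo.Opt{
		kgo.SeedBrokers(brokers...),
		kgo.AllowAutoTopicCreation(),
		kgo.MaxBufferedRecords(32),
		kgo.ProducerBatchCompression(kgo.ZstdCompression()),
		// Fill roughly 64 MiB per partition before moving to the next one;
		// keyed records are still hashed to a stable partition.
		kgo.RecordPartitioner(kgo.UniformBytesPartitioner(64<<20, true, true, nil)),
	}
	if err := kgo.ValidateOpts(opts...); err != nil {
		return nil, fmt.Errorf("invalid Kafka configuration: %w", err)
	}
	return kgo.NewClient(opts...)
}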
@@ -78,64 +81,58 @@ func New(reporter *reporter.Reporter, configuration Configuration, dependencies
// Start starts the Kafka component.
func (c *Component) Start() error {
c.r.Info().Msg("starting Kafka component")
kafka.GlobalKafkaLogger.Register(c.r)
// Create producer
kafkaProducer, err := c.createKafkaProducer()
kafkaClient, err := kgo.NewClient(c.kafkaOpts...)
if err != nil {
c.r.Err(err).
Str("brokers", strings.Join(c.config.Brokers, ",")).
Msg("unable to create async producer")
return fmt.Errorf("unable to create Kafka async producer: %w", err)
Msg("unable to create Kafka client")
return fmt.Errorf("unable to create Kafka client: %w", err)
}
c.kafkaProducer = kafkaProducer
c.kafkaClient = kafkaClient
// Main loop
// When dying, close the client
c.t.Go(func() error {
defer kafkaProducer.Close()
errLogger := c.r.Sample(reporter.BurstSampler(10*time.Second, 3))
dying := c.t.Dying()
for {
select {
case <-dying:
c.r.Debug().Msg("stop error logger")
<-c.t.Dying()
kafkaClient.Close()
return nil
case msg := <-kafkaProducer.Errors():
if msg != nil {
c.metrics.errors.WithLabelValues(msg.Error()).Inc()
errLogger.Err(msg.Err).
Str("topic", msg.Msg.Topic).
Int64("offset", msg.Msg.Offset).
Int32("partition", msg.Msg.Partition).
Msg("Kafka producer error")
}
}
}
})
return nil
}
// Stop stops the Kafka component
func (c *Component) Stop() error {
defer func() {
c.kafkaConfig.MetricRegistry.UnregisterAll()
kafka.GlobalKafkaLogger.Unregister()
c.r.Info().Msg("Kafka component stopped")
}()
defer c.r.Info().Msg("Kafka component stopped")
c.r.Info().Msg("stopping Kafka component")
c.t.Kill(nil)
return c.t.Wait()
}
// Send a message to Kafka.
func (c *Component) Send(exporter string, payload []byte) {
c.metrics.bytesSent.WithLabelValues(exporter).Add(float64(len(payload)))
c.metrics.messagesSent.WithLabelValues(exporter).Inc()
func (c *Component) Send(exporter string, payload []byte, finalizer func()) {
key := make([]byte, 4)
binary.BigEndian.PutUint32(key, rand.Uint32())
c.kafkaProducer.Input() <- &sarama.ProducerMessage{
record := &kgo.Record{
Topic: c.kafkaTopic,
Key: sarama.ByteEncoder(key),
Value: sarama.ByteEncoder(payload),
Key: key,
Value: payload,
}
c.kafkaClient.Produce(context.Background(), record, func(r *kgo.Record, err error) {
if err == nil {
c.metrics.bytesSent.WithLabelValues(exporter).Add(float64(len(payload)))
c.metrics.messagesSent.WithLabelValues(exporter).Inc()
} else {
if ke, ok := err.(*kerr.Error); ok {
c.metrics.errors.WithLabelValues(ke.Message).Inc()
} else {
c.metrics.errors.WithLabelValues("unknown").Inc()
}
c.errLogger.Err(err).
Str("topic", c.kafkaTopic).
Int64("offset", r.Offset).
Int32("partition", r.Partition).
Msg("Kafka producer error")
}
finalizer()
})
}
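Produce is asynchronous, so callers that need delivery confirmation can count finalizers, as the tests in this commit do. A sketch (the function name is hypothetical; needs a "sync" import):

// sendAndWait blocks until every payload's finalizer has run, whether the
// record was delivered or failed.
func sendAndWait(c *Component, exporter string, payloads [][]byte) {
	var wg sync.WaitGroup
	wg.Add(len(payloads))
	for _, p := range payloads {
		c.Send(exporter, p, wg.Done)
	}
	wg.Wait()
}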

View File

@@ -4,15 +4,18 @@
package kafka
import (
"errors"
"context"
"fmt"
"slices"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/IBM/sarama"
gometrics "github.com/rcrowley/go-metrics"
"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/kmsg"
"akvorado/common/daemon"
"akvorado/common/helpers"
"akvorado/common/pb"
"akvorado/common/reporter"
@@ -20,85 +23,100 @@ import (
func TestKafka(t *testing.T) {
r := reporter.NewMock(t)
c, mockProducer := NewMock(t, r, DefaultConfiguration())
// Send one message
received := make(chan bool)
mockProducer.ExpectInputWithMessageCheckerFunctionAndSucceed(func(got *sarama.ProducerMessage) error {
defer close(received)
topic := fmt.Sprintf("flows-v%d", pb.Version)
expected := sarama.ProducerMessage{
Topic: topic,
Key: got.Key,
Value: sarama.ByteEncoder("hello world!"),
Partition: got.Partition,
config := DefaultConfiguration()
config.QueueSize = 1
c, mock := NewMock(t, r, config)
defer mock.Close()
// Inject an error on third message.
var count atomic.Uint32
mock.ControlKey(0, func(kreq kmsg.Request) (kmsg.Response, error, bool) {
mock.KeepControl()
current := count.Add(1)
if current != 3 {
t.Logf("message %d: ok", current)
return nil, nil, false
}
if diff := helpers.Diff(got, expected); diff != "" {
t.Logf("mesage %d: error", current)
req := kreq.(*kmsg.ProduceRequest)
resp := kreq.ResponseKind().(*kmsg.ProduceResponse)
for _, rt := range req.Topics {
st := kmsg.NewProduceResponseTopic()
st.Topic = rt.Topic
for _, rp := range rt.Partitions {
sp := kmsg.NewProduceResponseTopicPartition()
sp.Partition = rp.Partition
sp.ErrorCode = kerr.CorruptMessage.Code
st.Partitions = append(st.Partitions, sp)
}
resp.Topics = append(resp.Topics, st)
}
return resp, nil, true
})
// Send messages
var wg sync.WaitGroup
wg.Add(4)
c.Send("127.0.0.1", []byte("hello world!"), func() { wg.Done() })
c.Send("127.0.0.1", []byte("goodbye world!"), func() { wg.Done() })
c.Send("127.0.0.1", []byte("nooooo!"), func() { wg.Done() })
c.Send("127.0.0.1", []byte("all good"), func() { wg.Done() })
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
select {
case <-done:
case <-time.After(time.Second):
t.Fatal("Send() timeout")
}
expectedMessages := []string{"hello world!", "goodbye world!", "all good"}
// Create consumer to check messages
consumer, err := kgo.NewClient(
kgo.SeedBrokers(mock.ListenAddrs()...),
kgo.ConsumeTopics(topic),
kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()),
)
if err != nil {
t.Fatalf("NewClient() error:\n%+v", err)
}
defer consumer.Close()
// Consume messages
messages := make([]string, 0)
ctx, cancel := context.WithTimeout(t.Context(), 5*time.Second)
defer cancel()
for {
if len(messages) >= len(expectedMessages) {
break
}
fetches := consumer.PollFetches(ctx)
if errs := fetches.Errors(); len(errs) > 0 {
t.Fatalf("PollFetches() error:\n%+v", errs)
}
fetches.EachPartition(func(p kgo.FetchTopicPartition) {
for _, record := range p.Records {
messages = append(messages, string(record.Value))
}
})
}
slices.Sort(expectedMessages)
slices.Sort(messages)
if diff := helpers.Diff(messages, expectedMessages); diff != "" {
t.Fatalf("Send() (-got, +want):\n%s", diff)
}
return nil
})
c.Send("127.0.0.1", []byte("hello world!"))
select {
case <-received:
case <-time.After(1 * time.Second):
t.Fatal("Kafka message not received")
}
// Another but with a fail
mockProducer.ExpectInputAndFail(errors.New("noooo"))
c.Send("127.0.0.1", []byte("goodbye world!"))
time.Sleep(10 * time.Millisecond)
gotMetrics := r.GetMetrics("akvorado_inlet_kafka_")
gotMetrics := r.GetMetrics("akvorado_inlet_kafka_", "sent_", "errors")
expectedMetrics := map[string]string{
`sent_bytes_total{exporter="127.0.0.1"}`: "26",
`errors_total{error="kafka: Failed to produce message to topic flows-v5: noooo"}`: "1",
`sent_messages_total{exporter="127.0.0.1"}`: "2",
}
if diff := helpers.Diff(gotMetrics, expectedMetrics); diff != "" {
t.Fatalf("Metrics (-got, +want):\n%s", diff)
}
}
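The error-injection body above could be factored into a helper that fails every partition of a produce request with an arbitrary error code; a sketch reusing the same kmsg calls (the helper name is hypothetical):

// failProduce answers a produce request with the given Kafka error code on
// every partition of every topic in the request.
func failProduce(kreq kmsg.Request, code int16) kmsg.Response {
	req := kreq.(*kmsg.ProduceRequest)
	resp := kreq.ResponseKind().(*kmsg.ProduceResponse)
	for _, rt := range req.Topics {
		st := kmsg.NewProduceResponseTopic()
		st.Topic = rt.Topic
		for _, rp := range rt.Partitions {
			sp := kmsg.NewProduceResponseTopicPartition()
			sp.Partition = rp.Partition
			sp.ErrorCode = code
			st.Partitions = append(st.Partitions, sp)
		}
		resp.Topics = append(resp.Topics, st)
	}
	return resp
}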
func TestKafkaMetrics(t *testing.T) {
r := reporter.NewMock(t)
c, err := New(r, DefaultConfiguration(), Dependencies{Daemon: daemon.NewMock(t)})
if err != nil {
t.Fatalf("New() error:\n%+v", err)
}
// Manually put some metrics
gometrics.GetOrRegisterMeter("incoming-byte-rate-for-broker-1111", c.kafkaConfig.MetricRegistry).
Mark(100)
gometrics.GetOrRegisterMeter("incoming-byte-rate-for-broker-1112", c.kafkaConfig.MetricRegistry).
Mark(200)
gometrics.GetOrRegisterMeter("outgoing-byte-rate-for-broker-1111", c.kafkaConfig.MetricRegistry).
Mark(199)
gometrics.GetOrRegisterMeter("outgoing-byte-rate-for-broker-1112", c.kafkaConfig.MetricRegistry).
Mark(20)
gometrics.GetOrRegisterHistogram("request-size-for-broker-1111", c.kafkaConfig.MetricRegistry,
gometrics.NewExpDecaySample(10, 1)).
Update(100)
gometrics.GetOrRegisterCounter("requests-in-flight-for-broker-1111", c.kafkaConfig.MetricRegistry).
Inc(20)
gometrics.GetOrRegisterCounter("requests-in-flight-for-broker-1112", c.kafkaConfig.MetricRegistry).
Inc(20)
gotMetrics := r.GetMetrics("akvorado_inlet_kafka_")
expectedMetrics := map[string]string{
`brokers_incoming_byte_rate{broker="1111"}`: "0",
`brokers_incoming_byte_rate{broker="1112"}`: "0",
`brokers_outgoing_byte_rate{broker="1111"}`: "0",
`brokers_outgoing_byte_rate{broker="1112"}`: "0",
`brokers_request_size_bucket{broker="1111",le="+Inf"}`: "1",
`brokers_request_size_bucket{broker="1111",le="0.5"}`: "100",
`brokers_request_size_bucket{broker="1111",le="0.9"}`: "100",
`brokers_request_size_bucket{broker="1111",le="0.99"}`: "100",
`brokers_request_size_count{broker="1111"}`: "1",
`brokers_request_size_sum{broker="1111"}`: "100",
`brokers_inflight_requests{broker="1111"}`: "20",
`brokers_inflight_requests{broker="1112"}`: "20",
`sent_bytes_total{exporter="127.0.0.1"}`: "34",
`sent_messages_total{exporter="127.0.0.1"}`: "3",
`errors_total{error="CORRUPT_MESSAGE"}`: "1",
}
if diff := helpers.Diff(gotMetrics, expectedMetrics); diff != "" {
t.Fatalf("Metrics (-got, +want):\n%s", diff)

View File

@@ -8,8 +8,7 @@ package kafka
import (
"testing"
"github.com/IBM/sarama"
"github.com/IBM/sarama/mocks"
"github.com/twmb/franz-go/pkg/kfake"
"akvorado/common/daemon"
"akvorado/common/helpers"
@@ -18,22 +17,34 @@ import (
// NewMock creates a new Kafka component with a mocked Kafka. It will
// panic if it cannot be started.
func NewMock(t *testing.T, reporter *reporter.Reporter, configuration Configuration) (*Component, *mocks.AsyncProducer) {
func NewMock(t *testing.T, reporter *reporter.Reporter, configuration Configuration) (*Component, *kfake.Cluster) {
t.Helper()
// Use a fake Kafka cluster for testing
cluster, err := kfake.NewCluster(
kfake.NumBrokers(1),
kfake.AllowAutoTopicCreation(),
)
if err != nil {
t.Fatalf("NewCluster() error: %v", err)
}
t.Cleanup(func() {
cluster.Close()
})
configuration.Brokers = cluster.ListenAddrs()
c, err := New(reporter, configuration, Dependencies{
Daemon: daemon.NewMock(t),
})
if err != nil {
t.Fatalf("New() error:\n%+v", err)
}
// Use a mocked Kafka producer
var mockProducer *mocks.AsyncProducer
c.createKafkaProducer = func() (sarama.AsyncProducer, error) {
mockProducer = mocks.NewAsyncProducer(t, c.kafkaConfig)
return mockProducer, nil
}
helpers.StartStop(t, c)
return c, mockProducer
return c, cluster
}
// Flush force flushing the currently buffered records.
func (c *Component) Flush(t *testing.T) {
if err := c.kafkaClient.Flush(t.Context()); err != nil {
t.Fatalf("Flush() error:\n%+v", err)
}
}
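A sketch of a test combining NewMock and Flush (NewMock already closes the cluster through t.Cleanup; the test name, payload, and exporter address are illustrative):

func TestSendOne(t *testing.T) {
	r := reporter.NewMock(t)
	c, _ := NewMock(t, r, DefaultConfiguration())
	c.Send("127.0.0.1", []byte("payload"), func() {})
	c.Flush(t) // force delivery of the buffered record before assertions
}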

View File

@@ -4,12 +4,14 @@
package kafka
import (
"context"
"fmt"
"math/rand"
"testing"
"time"
"github.com/IBM/sarama"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kmsg"
"akvorado/common/helpers"
"akvorado/common/kafka"
@@ -20,10 +22,7 @@ import (
func TestTopicCreation(t *testing.T) {
client, brokers := kafka.SetupKafkaBroker(t)
adminClient, err := sarama.NewClusterAdminFromClient(client)
if err != nil {
t.Fatalf("NewClusterAdmin() error:\n%+v", err)
}
adminClient := kadm.NewClient(client)
topicName := fmt.Sprintf("test-topic-%d", rand.Int())
retentionMs := "76548"
@@ -76,7 +75,7 @@ func TestTopicCreation(t *testing.T) {
ConfigEntriesStrictSync: true,
}
configuration.Brokers = brokers
configuration.Version = kafka.Version(sarama.V2_8_1_0)
// No version configuration needed for franz-go
c, err := New(reporter.NewMock(t), configuration, Dependencies{Schema: schema.NewMock(t)})
if err != nil {
t.Fatalf("New() error:\n%+v", err)
@@ -85,14 +84,11 @@ func TestTopicCreation(t *testing.T) {
deadline := time.Now().Add(1 * time.Second)
for {
if err := client.RefreshMetadata(); err != nil {
t.Fatalf("RefreshMetadata() error:\n%+v", err)
}
topics, err := adminClient.ListTopics()
topics, err := adminClient.ListTopics(t.Context())
if err != nil {
t.Fatalf("ListTopics() error:\n%+v", err)
}
topic, ok := topics[expectedTopicName]
_, ok := topics[expectedTopicName]
if !ok {
if time.Now().Before(deadline) {
time.Sleep(100 * time.Millisecond)
@@ -100,7 +96,17 @@ func TestTopicCreation(t *testing.T) {
}
t.Fatal("ListTopics() did not find the topic")
}
if diff := helpers.Diff(topic.ConfigEntries, tc.ConfigEntries); diff != "" {
configs, err := adminClient.DescribeTopicConfigs(context.Background(), c.kafkaTopic)
if err != nil {
t.Fatalf("DescribeTopicConfigs() error:\n%+v", err)
}
got := map[string]*string{}
for _, config := range configs[0].Configs {
if config.Source != kmsg.ConfigSourceDefaultConfig {
got[config.Key] = config.Value
}
}
if diff := helpers.Diff(got, tc.ConfigEntries); diff != "" {
if time.Now().Before(deadline) {
time.Sleep(100 * time.Millisecond)
continue
@@ -115,10 +121,7 @@ func TestTopicCreation(t *testing.T) {
func TestTopicMorePartitions(t *testing.T) {
client, brokers := kafka.SetupKafkaBroker(t)
adminClient, err := sarama.NewClusterAdminFromClient(client)
if err != nil {
t.Fatalf("NewClusterAdmin() error:\n%+v", err)
}
adminClient := kadm.NewClient(client)
topicName := fmt.Sprintf("test-topic-%d", rand.Int())
expectedTopicName := fmt.Sprintf("%s-v%d", topicName, pb.Version)
@@ -132,7 +135,7 @@ func TestTopicMorePartitions(t *testing.T) {
}
configuration.Brokers = brokers
configuration.Version = kafka.Version(sarama.V2_8_1_0)
// No version configuration needed for franz-go
c, err := New(reporter.NewMock(t), configuration, Dependencies{Schema: schema.NewMock(t)})
if err != nil {
t.Fatalf("New() error:\n%+v", err)
@@ -141,10 +144,7 @@ func TestTopicMorePartitions(t *testing.T) {
deadline := time.Now().Add(1 * time.Second)
for {
if err := client.RefreshMetadata(); err != nil {
t.Fatalf("RefreshMetadata() error:\n%+v", err)
}
topics, err := adminClient.ListTopics()
topics, err := adminClient.ListTopics(t.Context())
if err != nil {
t.Fatalf("ListTopics() error:\n%+v", err)
}
@@ -156,13 +156,13 @@ func TestTopicMorePartitions(t *testing.T) {
}
t.Fatal("ListTopics() did not find the topic")
}
if topic.NumPartitions != 1 || topic.ReplicationFactor != 1 {
if len(topic.Partitions) != 1 || topic.Partitions.NumReplicas() != 1 {
if time.Now().Before(deadline) {
time.Sleep(100 * time.Millisecond)
continue
}
t.Fatalf("Topic does not have 1/1 for partitions/replication but %d/%d",
topic.NumPartitions, topic.ReplicationFactor)
len(topic.Partitions), topic.Partitions.NumReplicas())
}
break
}
@@ -177,22 +177,19 @@ func TestTopicMorePartitions(t *testing.T) {
deadline = time.Now().Add(1 * time.Second)
for {
if err := client.RefreshMetadata(); err != nil {
t.Fatalf("RefreshMetadata() error:\n%+v", err)
}
topics, err := adminClient.ListTopics()
topics, err := adminClient.ListTopics(t.Context())
if err != nil {
t.Fatalf("ListTopics() error:\n%+v", err)
}
topic := topics[expectedTopicName]
t.Logf("Topic configuration:\n%+v", topic)
if topic.NumPartitions != 4 || topic.ReplicationFactor != 1 {
if len(topic.Partitions) != 4 || topic.Partitions.NumReplicas() != 1 {
if time.Now().Before(deadline) {
time.Sleep(100 * time.Millisecond)
continue
}
t.Fatalf("Topic does not have 4/1 for partitions/replication but %d/%d",
topic.NumPartitions, topic.ReplicationFactor)
len(topic.Partitions), topic.Partitions.NumReplicas())
}
break
}

View File

@@ -5,10 +5,13 @@
package kafka
import (
"context"
"fmt"
"strings"
"github.com/IBM/sarama"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/kmsg"
"akvorado/common/kafka"
"akvorado/common/pb"
@@ -22,7 +25,7 @@ type Component struct {
d Dependencies
config Configuration
kafkaConfig *sarama.Config
kafkaOpts []kgo.Opt
kafkaTopic string
}
@@ -33,20 +36,17 @@ type Dependencies struct {
// New creates a new Kafka configurator.
func New(r *reporter.Reporter, config Configuration, dependencies Dependencies) (*Component, error) {
kafkaConfig, err := kafka.NewConfig(config.Configuration)
kafkaOpts, err := kafka.NewConfig(r, config.Configuration)
if err != nil {
return nil, err
}
if err := kafkaConfig.Validate(); err != nil {
return nil, fmt.Errorf("cannot validate Kafka configuration: %w", err)
}
c := Component{
r: r,
d: dependencies,
config: config,
kafkaConfig: kafkaConfig,
kafkaOpts: kafkaOpts,
kafkaTopic: fmt.Sprintf("%s-v%d", config.Topic, pb.Version),
}
return &c, nil
@@ -55,61 +55,82 @@ func New(r *reporter.Reporter, config Configuration, dependencies Dependencies)
// Start starts Kafka configuration.
func (c *Component) Start() error {
c.r.Info().Msg("starting Kafka component")
kafka.GlobalKafkaLogger.Register(c.r)
defer func() {
kafka.GlobalKafkaLogger.Unregister()
c.r.Info().Msg("Kafka component stopped")
}()
defer c.r.Info().Msg("Kafka component stopped")
// Create topic
admin, err := sarama.NewClusterAdmin(c.config.Brokers, c.kafkaConfig)
// Create kafka client and admin
client, err := kgo.NewClient(c.kafkaOpts...)
if err != nil {
c.r.Err(err).
Str("brokers", strings.Join(c.config.Brokers, ",")).
Msg("unable to get admin client for topic creation")
return fmt.Errorf("unable to get admin client for topic creation: %w", err)
Msg("unable to create Kafka client for topic creation")
return fmt.Errorf("unable to create Kafka client for topic creation: %w", err)
}
defer admin.Close()
defer client.Close()
admin := kadm.NewClient(client)
l := c.r.With().
Str("brokers", strings.Join(c.config.Brokers, ",")).
Str("topic", c.kafkaTopic).
Logger()
topics, err := admin.ListTopics()
topics, err := admin.ListTopics(context.Background())
if err != nil {
l.Err(err).Msg("unable to get metadata for topics")
return fmt.Errorf("unable to get metadata for topics: %w", err)
}
if topic, ok := topics[c.kafkaTopic]; !ok {
if err := admin.CreateTopic(c.kafkaTopic,
&sarama.TopicDetail{
NumPartitions: c.config.TopicConfiguration.NumPartitions,
ReplicationFactor: c.config.TopicConfiguration.ReplicationFactor,
ConfigEntries: c.config.TopicConfiguration.ConfigEntries,
}, false); err != nil {
if _, err := admin.CreateTopics(context.Background(), c.config.TopicConfiguration.NumPartitions, c.config.TopicConfiguration.ReplicationFactor, c.config.TopicConfiguration.ConfigEntries, c.kafkaTopic); err != nil {
l.Err(err).Msg("unable to create topic")
return fmt.Errorf("unable to create topic %q: %w", c.kafkaTopic, err)
}
l.Info().Msg("topic created")
} else {
if topic.NumPartitions > c.config.TopicConfiguration.NumPartitions {
nbPartitions := len(topic.Partitions)
if nbPartitions > int(c.config.TopicConfiguration.NumPartitions) {
l.Warn().Msgf("cannot decrease the number of partitions (from %d to %d)",
topic.NumPartitions, c.config.TopicConfiguration.NumPartitions)
} else if topic.NumPartitions < c.config.TopicConfiguration.NumPartitions {
nb := c.config.TopicConfiguration.NumPartitions
if err := admin.CreatePartitions(c.kafkaTopic, nb, nil, false); err != nil {
nbPartitions, c.config.TopicConfiguration.NumPartitions)
} else if nbPartitions < int(c.config.TopicConfiguration.NumPartitions) {
add := int(c.config.TopicConfiguration.NumPartitions) - nbPartitions
if _, err := admin.CreatePartitions(context.Background(), add, c.kafkaTopic); err != nil {
l.Err(err).Msg("unable to add more partitions")
return fmt.Errorf("unable to add more partitions to topic %q: %w",
c.kafkaTopic, err)
}
l.Info().Msg("number of partitions increased")
}
if c.config.TopicConfiguration.ReplicationFactor != topic.ReplicationFactor {
if int(c.config.TopicConfiguration.ReplicationFactor) != topic.Partitions.NumReplicas() {
// TODO: https://github.com/deviceinsight/kafkactl/blob/main/internal/topic/topic-operation.go
l.Warn().Msgf("mismatch for replication factor: got %d, want %d",
topic.ReplicationFactor, c.config.TopicConfiguration.ReplicationFactor)
topic.Partitions.NumReplicas(), c.config.TopicConfiguration.ReplicationFactor)
}
if ShouldAlterConfiguration(c.config.TopicConfiguration.ConfigEntries, topic.ConfigEntries, c.config.TopicConfiguration.ConfigEntriesStrictSync) {
if err := admin.AlterConfig(sarama.TopicResource, c.kafkaTopic, c.config.TopicConfiguration.ConfigEntries, false); err != nil {
configs, err := admin.DescribeTopicConfigs(context.Background(), c.kafkaTopic)
if err != nil || len(configs) != 1 {
l.Err(err).Msg("unable to get topic configuration")
return fmt.Errorf("unable to get topic %q configuration: %w", c.kafkaTopic, err)
}
got := map[string]*string{}
for _, config := range configs[0].Configs {
if config.Source == kmsg.ConfigSourceDynamicTopicConfig {
got[config.Key] = config.Value
}
}
if ShouldAlterConfiguration(c.config.TopicConfiguration.ConfigEntries, got, c.config.TopicConfiguration.ConfigEntriesStrictSync) {
alterConfigs := []kadm.AlterConfig{}
for k, v := range c.config.TopicConfiguration.ConfigEntries {
alterConfigs = append(alterConfigs, kadm.AlterConfig{
Op: kadm.SetConfig,
Name: k,
Value: v,
})
}
for k, v := range got {
if _, ok := c.config.TopicConfiguration.ConfigEntries[k]; !ok {
alterConfigs = append(alterConfigs, kadm.AlterConfig{
Op: kadm.DeleteConfig,
Name: k,
Value: v,
})
}
}
if _, err := admin.AlterTopicConfigs(context.Background(), alterConfigs, c.kafkaTopic); err != nil {
l.Err(err).Msg("unable to set topic configuration")
return fmt.Errorf("unable to set topic configuration for %q: %w",
c.kafkaTopic, err)
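The describe-then-filter step used here and in the topic tests could be factored into one helper; a sketch (the helper name is hypothetical; assumes this file's "context", "fmt", kadm, and kmsg imports):

// dynamicTopicConfigs returns only the entries set dynamically on the topic,
// skipping broker and static defaults.
func dynamicTopicConfigs(ctx context.Context, admin *kadm.Client, topic string) (map[string]*string, error) {
	configs, err := admin.DescribeTopicConfigs(ctx, topic)
	if err != nil {
		return nil, err
	}
	if len(configs) != 1 {
		return nil, fmt.Errorf("unexpected number of config resources: %d", len(configs))
	}
	got := map[string]*string{}
	for _, config := range configs[0].Configs {
		if config.Source == kmsg.ConfigSourceDynamicTopicConfig {
			got[config.Key] = config.Value
		}
	}
	return got, nil
}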

View File

@@ -6,6 +6,7 @@ package kafka
import (
"time"
"akvorado/common/helpers"
"akvorado/common/kafka"
)
@@ -16,16 +17,11 @@ type Configuration struct {
Workers int `validate:"min=1"`
// ConsumerGroup is the name of the consumer group to use
ConsumerGroup string `validate:"min=1,ascii"`
// MaxMessageBytes is the maximum permitted size of a message. Should be set
// equal or smaller than broker's `message.max.bytes`.
MaxMessageBytes int32 `validate:"min=1"`
// FetchMinBytes is the minimum number of bytes to wait before fetching a message.
FetchMinBytes int32 `validate:"min=1"`
// FetchMaxWaitTime is the maximum duration to wait for at least the
// minimum number of bytes to accumulate.
FetchMaxWaitTime time.Duration `validate:"min=100ms"`
// QueueSize defines the size of the channel used to receive from Kafka.
QueueSize int `validate:"min=1"`
}
// DefaultConfiguration represents the default configuration for the Kafka exporter.
@@ -34,9 +30,11 @@ func DefaultConfiguration() Configuration {
Configuration: kafka.DefaultConfiguration(),
Workers: 1,
ConsumerGroup: "akvorado-outlet",
MaxMessageBytes: 1_000_000,
FetchMinBytes: 1_000_000,
FetchMaxWaitTime: time.Second,
QueueSize: 32,
}
}
func init() {
helpers.RegisterMapstructureDeprecatedFields[Configuration]("MaxMessageBytes", "QueueSize")
}
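Both knobs map one-to-one onto franz-go consumer options, as the component below shows; condensed (the helper name is hypothetical; needs a kgo import):

func consumerOpts(cfg Configuration, base []kgo.Opt) []kgo.Opt {
	return append(base,
		kgo.FetchMinBytes(cfg.FetchMinBytes),   // wait for at least this many bytes...
		kgo.FetchMaxWait(cfg.FetchMaxWaitTime), // ...but never longer than this
		kgo.ConsumerGroup(cfg.ConsumerGroup),
	)
}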

View File

@@ -6,12 +6,11 @@ package kafka
import (
"context"
"errors"
"fmt"
"strconv"
"sync"
"github.com/IBM/sarama"
"github.com/rs/zerolog"
"github.com/twmb/franz-go/pkg/kgo"
"akvorado/common/reporter"
)
@@ -19,13 +18,11 @@ import (
// ErrStopProcessing should be returned as an error when we need to stop processing more flows.
var ErrStopProcessing = errors.New("stop processing further flows")
// Consumer is a Sarama consumer group consumer and should process flow
// messages.
// Consumer is a franz-go consumer and should process flow messages.
type Consumer struct {
r *reporter.Reporter
l zerolog.Logger
healthy chan reporter.ChannelHealthcheckFunc
metrics metrics
worker int
callback ReceiveFunc
@@ -48,68 +45,42 @@ func (c *realComponent) NewConsumer(worker int, callback ReceiveFunc) *Consumer
r: c.r,
l: c.r.With().Int("worker", worker).Logger(),
healthy: c.healthy,
worker: worker,
metrics: c.metrics,
callback: callback,
}
}
// Setup is called at the beginning of a new consumer session, before
// ConsumeClaim.
func (c *Consumer) Setup(sarama.ConsumerGroupSession) error {
c.l.Debug().Msg("start consumer group")
return nil
}
// Cleanup is called once all ConsumeClaim goroutines have exited
func (c *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
c.l.Debug().Msg("stop consumer group")
return nil
}
// ConsumeClaim should process the incoming claims.
func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
l := c.l.With().
Str("topic", claim.Topic()).
Int32("partition", claim.Partition()).
Int64("offset", claim.InitialOffset()).Logger()
l.Debug().Msg("process new consumer group claim")
// ProcessFetches processes the fetched records.
func (c *Consumer) ProcessFetches(ctx context.Context, fetches kgo.Fetches) error {
worker := strconv.Itoa(c.worker)
c.metrics.claimsReceived.WithLabelValues(worker).Inc()
c.metrics.fetchesReceived.WithLabelValues(worker).Inc()
if errs := fetches.Errors(); len(errs) > 0 {
for _, err := range errs {
if errors.Is(err.Err, context.Canceled) {
return nil
}
c.metrics.errorsReceived.WithLabelValues(worker).Inc()
c.l.Err(err.Err).
Str("topic", err.Topic).
Int32("partition", err.Partition).
Msg("fetch error")
}
// Assume the error is fatal.
return ErrStopProcessing
}
messagesReceived := c.metrics.messagesReceived.WithLabelValues(worker)
bytesReceived := c.metrics.bytesReceived.WithLabelValues(worker)
ctx := session.Context()
for {
select {
case cb, ok := <-c.healthy:
if ok {
cb(reporter.HealthcheckOK, fmt.Sprintf("worker %d ok", c.worker))
}
case message, ok := <-claim.Messages():
if !ok {
return nil
}
for iter := fetches.RecordIter(); !iter.Done(); {
record := iter.Next()
messagesReceived.Inc()
bytesReceived.Add(float64(len(message.Value)))
bytesReceived.Add(float64(len(record.Value)))
if err := c.callback(ctx, record.Value); err != nil {
return err
}
}
// ConsumeClaim can be called from multiple goroutines. We want each
// worker/consumer to not invoke callbacks concurrently.
c.mu.Lock()
if err := c.callback(ctx, message.Value); err == ErrStopProcessing {
c.mu.Unlock()
return nil
} else if err != nil {
c.mu.Unlock()
c.metrics.errorsReceived.WithLabelValues(worker).Inc()
l.Err(err).Msg("unable to handle incoming message")
return fmt.Errorf("unable to handle incoming message: %w", err)
}
c.mu.Unlock()
session.MarkMessage(message, "")
case <-ctx.Done():
return nil
}
}
}
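ProcessFetches is driven by a per-worker loop in StartWorkers further down; reduced to its core, the loop looks like this sketch (the function name is hypothetical; assumes "context", "errors", and kgo imports):

// runWorker polls and processes until the context is cancelled or the
// consumer asks to stop.
func runWorker(ctx context.Context, client *kgo.Client, consumer *Consumer) error {
	for {
		select {
		case <-ctx.Done():
			return nil
		default:
			fetches := client.PollFetches(ctx)
			if err := consumer.ProcessFetches(ctx, fetches); err != nil {
				if errors.Is(err, context.Canceled) || errors.Is(err, ErrStopProcessing) {
					return nil
				}
				return err
			}
		}
	}
}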

View File

@@ -7,10 +7,13 @@ import (
"context"
"fmt"
"math/rand"
"strconv"
"strings"
"testing"
"time"
"github.com/IBM/sarama"
"github.com/twmb/franz-go/pkg/kfake"
"github.com/twmb/franz-go/pkg/kgo"
"akvorado/common/daemon"
"akvorado/common/helpers"
@@ -19,36 +22,35 @@ import (
"akvorado/common/reporter"
)
func TestRealKafka(t *testing.T) {
client, brokers := kafka.SetupKafkaBroker(t)
// Create the topic
func TestFakeKafka(t *testing.T) {
topicName := fmt.Sprintf("test-topic2-%d", rand.Int())
expectedTopicName := fmt.Sprintf("%s-v%d", topicName, pb.Version)
admin, err := sarama.NewClusterAdminFromClient(client)
if err != nil {
t.Fatalf("NewClusterAdminFromClient() error:\n%+v", err)
}
defer admin.Close()
topicDetail := &sarama.TopicDetail{
NumPartitions: 1,
ReplicationFactor: 1,
}
err = admin.CreateTopic(expectedTopicName, topicDetail, false)
if err != nil {
t.Fatalf("CreateTopic() error:\n%+v", err)
}
// Create a producer
producer, err := sarama.NewSyncProducerFromClient(client)
cluster, err := kfake.NewCluster(
kfake.NumBrokers(1),
kfake.SeedTopics(16, expectedTopicName),
)
if err != nil {
t.Fatalf("NewSyncProducerFromClient() error:\n%+v", err)
t.Fatalf("NewCluster() error: %v", err)
}
defer cluster.Close()
// Create a producer client
producerConfiguration := kafka.DefaultConfiguration()
producerConfiguration.Brokers = cluster.ListenAddrs()
producerOpts, err := kafka.NewConfig(reporter.NewMock(t), producerConfiguration)
if err != nil {
t.Fatalf("NewConfig() error:\n%+v", err)
}
producer, err := kgo.NewClient(producerOpts...)
if err != nil {
t.Fatalf("NewClient() error:\n%+v", err)
}
defer producer.Close()
// Callback
got := []string{}
expected := []string{"hello", "hello 2", "hello 3"}
expected := []string{"hello 1", "hello 2", "hello 3"}
gotAll := make(chan bool)
callback := func(_ context.Context, message []byte) error {
got = append(got, string(message))
@@ -61,9 +63,9 @@ func TestRealKafka(t *testing.T) {
// Start the component
configuration := DefaultConfiguration()
configuration.Topic = topicName
configuration.Brokers = brokers
configuration.Version = kafka.Version(sarama.V2_8_1_0)
configuration.Brokers = cluster.ListenAddrs()
configuration.FetchMaxWaitTime = 100 * time.Millisecond
configuration.ConsumerGroup = fmt.Sprintf("outlet-%d", rand.Int())
r := reporter.NewMock(t)
c, err := New(r, configuration, Dependencies{Daemon: daemon.NewMock(t)})
if err != nil {
@@ -75,34 +77,24 @@ func TestRealKafka(t *testing.T) {
shutdownCalled := false
c.StartWorkers(func(_ int) (ReceiveFunc, ShutdownFunc) { return callback, func() { shutdownCalled = true } })
// Wait for a claim to be processed. Due to rebalance, it could take more than 3 seconds.
timeout := time.After(10 * time.Second)
for {
gotMetrics := r.GetMetrics("akvorado_outlet_kafka_")
if gotMetrics[`received_claims_total{worker="0"}`] == "1" {
break
}
select {
case <-timeout:
t.Fatal("No claim received")
case <-time.After(20 * time.Millisecond):
}
}
// Send messages
time.Sleep(100 * time.Millisecond)
t.Log("producing values")
for _, value := range expected {
msg := &sarama.ProducerMessage{
record := &kgo.Record{
Topic: expectedTopicName,
Value: sarama.StringEncoder(value),
Value: []byte(value),
}
if _, _, err := producer.SendMessage(msg); err != nil {
t.Fatalf("SendMessage() error:\n%+v", err)
results := producer.ProduceSync(context.Background(), record)
if err := results.FirstErr(); err != nil {
t.Fatalf("ProduceSync() error:\n%+v", err)
}
}
t.Log("values produced")
// Wait for them
select {
case <-time.After(5 * time.Second):
case <-time.After(time.Second):
t.Fatal("Too long to get messages")
case <-gotAll:
}
@@ -112,26 +104,16 @@ func TestRealKafka(t *testing.T) {
}
gotMetrics := r.GetMetrics("akvorado_outlet_kafka_", "received_")
fetches, _ := strconv.Atoi(gotMetrics[`received_fetches_total{worker="0"}`])
expectedMetrics := map[string]string{
`received_bytes_total{worker="0"}`: "19",
`received_claims_total{worker="0"}`: "1",
`received_bytes_total{worker="0"}`: "21",
`received_fetches_total{worker="0"}`: strconv.Itoa(max(fetches, 1)),
`received_messages_total{worker="0"}`: "3",
}
if diff := helpers.Diff(gotMetrics, expectedMetrics); diff != "" {
t.Errorf("Metrics (-got, +want):\n%s", diff)
}
{
// Test the healthcheck function
got := r.RunHealthchecks(context.Background())
if diff := helpers.Diff(got.Details["kafka"], reporter.HealthcheckResult{
Status: reporter.HealthcheckOK,
Reason: "worker 0 ok",
}); diff != "" {
t.Fatalf("runHealthcheck() (-got, +want):\n%s", diff)
}
}
if err := c.Stop(); err != nil {
t.Fatalf("Stop() error:\n%+v", err)
}
@@ -139,3 +121,70 @@ func TestRealKafka(t *testing.T) {
t.Fatal("Stop() didn't call shutdown function")
}
}
func TestStartSeveralWorkers(t *testing.T) {
topicName := fmt.Sprintf("test-topic2-%d", rand.Int())
expectedTopicName := fmt.Sprintf("%s-v%d", topicName, pb.Version)
cluster, err := kfake.NewCluster(
kfake.NumBrokers(1),
kfake.SeedTopics(16, expectedTopicName),
)
if err != nil {
t.Fatalf("NewCluster() error: %v", err)
}
defer cluster.Close()
// Start the component
configuration := DefaultConfiguration()
configuration.Topic = topicName
configuration.Brokers = cluster.ListenAddrs()
configuration.FetchMaxWaitTime = 100 * time.Millisecond
configuration.ConsumerGroup = fmt.Sprintf("outlet-%d", rand.Int())
configuration.Workers = 5
r := reporter.NewMock(t)
c, err := New(r, configuration, Dependencies{Daemon: daemon.NewMock(t)})
if err != nil {
t.Fatalf("New() error:\n%+v", err)
}
if err := c.(*realComponent).Start(); err != nil {
t.Fatalf("Start() error:\n%+v", err)
}
c.StartWorkers(func(int) (ReceiveFunc, ShutdownFunc) {
return func(context.Context, []byte) error { return nil }, func() {}
})
time.Sleep(20 * time.Millisecond)
if err := c.Stop(); err != nil {
t.Fatalf("Stop() error:\n%+v", err)
}
gotMetrics := r.GetMetrics("akvorado_outlet_kafka_")
connectsTotal := 0
writeBytesTotal := 0
readBytesTotal := 0
for k := range gotMetrics {
if strings.HasPrefix(k, "write_bytes_total") {
writeBytesTotal++
}
if strings.HasPrefix(k, "read_bytes_total") {
readBytesTotal++
}
if strings.HasPrefix(k, "connects_total") {
connectsTotal++
}
}
got := map[string]int{
"write_bytes_total": writeBytesTotal,
"read_bytes_total": readBytesTotal,
"connects_total": connectsTotal,
}
expected := map[string]int{
// For some reason, each metric appears twice, one of them for seed_0.
"write_bytes_total": 10,
"read_bytes_total": 10,
"connects_total": 10,
}
if diff := helpers.Diff(got, expected); diff != "" {
t.Errorf("Metrics (-got, +want):\n%s", diff)
}
}

View File

@@ -4,17 +4,14 @@
package kafka
import (
"akvorado/common/kafka"
"akvorado/common/reporter"
)
type metrics struct {
messagesReceived *reporter.CounterVec
claimsReceived *reporter.CounterVec
fetchesReceived *reporter.CounterVec
bytesReceived *reporter.CounterVec
errorsReceived *reporter.CounterVec
kafkaMetrics kafka.Metrics
}
func (c *realComponent) initMetrics() {
@@ -25,10 +22,10 @@ func (c *realComponent) initMetrics() {
},
[]string{"worker"},
)
c.metrics.claimsReceived = c.r.CounterVec(
c.metrics.fetchesReceived = c.r.CounterVec(
reporter.CounterOpts{
Name: "received_claims_total",
Help: "Number of claims received for a given worker.",
Name: "received_fetches_total",
Help: "Number of fetches received for a given worker.",
},
[]string{"worker"},
)
@@ -46,6 +43,4 @@ func (c *realComponent) initMetrics() {
},
[]string{"worker"},
)
c.metrics.kafkaMetrics.Init(c.r, c.kafkaConfig.MetricRegistry)
}

View File

@@ -8,10 +8,12 @@ import (
"context"
"errors"
"fmt"
"strconv"
"strings"
"time"
"github.com/IBM/sarama"
"github.com/prometheus/client_golang/prometheus"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/plugin/kprom"
"gopkg.in/tomb.v2"
"akvorado/common/daemon"
@@ -33,11 +35,9 @@ type realComponent struct {
t tomb.Tomb
config Configuration
kafkaConfig *sarama.Config
kafkaTopic string
kafkaOpts []kgo.Opt
healthy chan reporter.ChannelHealthcheckFunc
clients []sarama.Client
clients []*kgo.Client
metrics metrics
}
@@ -49,36 +49,32 @@ type Dependencies struct {
// New creates a new Kafka exporter component.
func New(r *reporter.Reporter, configuration Configuration, dependencies Dependencies) (Component, error) {
// Build Kafka configuration
kafkaConfig, err := kafka.NewConfig(configuration.Configuration)
kafkaOpts, err := kafka.NewConfig(r, configuration.Configuration)
if err != nil {
return nil, err
}
kafkaConfig.Consumer.Fetch.Max = configuration.MaxMessageBytes
kafkaConfig.Consumer.Fetch.Min = configuration.FetchMinBytes
kafkaConfig.Consumer.MaxWaitTime = configuration.FetchMaxWaitTime
kafkaConfig.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{
sarama.NewBalanceStrategyRoundRobin(),
}
// kafkaConfig.Consumer.Offsets.AutoCommit.Enable = false
kafkaConfig.Consumer.Offsets.Initial = sarama.OffsetNewest
kafkaConfig.Metadata.RefreshFrequency = time.Minute
kafkaConfig.Metadata.AllowAutoTopicCreation = false
kafkaConfig.ChannelBufferSize = configuration.QueueSize
if err := kafkaConfig.Validate(); err != nil {
return nil, fmt.Errorf("cannot validate Kafka configuration: %w", err)
}
c := realComponent{
r: r,
d: &dependencies,
config: configuration,
healthy: make(chan reporter.ChannelHealthcheckFunc),
kafkaConfig: kafkaConfig,
kafkaTopic: fmt.Sprintf("%s-v%d", configuration.Topic, pb.Version),
}
c.initMetrics()
c.r.RegisterHealthcheck("kafka", c.channelHealthcheck())
kafkaOpts = append(kafkaOpts,
kgo.FetchMinBytes(configuration.FetchMinBytes),
kgo.FetchMaxWait(configuration.FetchMaxWaitTime),
kgo.ConsumerGroup(configuration.ConsumerGroup),
kgo.ConsumeStartOffset(kgo.NewOffset().AtEnd()),
kgo.ConsumeTopics(fmt.Sprintf("%s-v%d", configuration.Topic, pb.Version)),
// Do not use kgo.BlockRebalanceOnPoll(): it would need more code to
// ensure rebalances are not blocked for too long while we process a poll.
)
if err := kgo.ValidateOpts(kafkaOpts...); err != nil {
return nil, fmt.Errorf("invalid Kafka configuration: %w", err)
}
c.kafkaOpts = kafkaOpts
c.d.Daemon.Track(&c.t, "outlet/kafka")
return &c, nil
}
@@ -86,12 +82,15 @@ func New(r *reporter.Reporter, configuration Configuration, dependencies Depende
// Start starts the Kafka component.
func (c *realComponent) Start() error {
c.r.Info().Msg("starting Kafka component")
kafka.GlobalKafkaLogger.Register(c.r)
// Start the clients
for i := range c.config.Workers {
logger := c.r.With().Int("worker", i).Logger()
logger.Debug().Msg("starting")
client, err := sarama.NewClient(c.config.Brokers, c.kafkaConfig)
kafkaMetrics := kprom.NewMetrics("", kprom.WithStaticLabel(prometheus.Labels{"worker": strconv.Itoa(i)}))
kafkaOpts := append(c.kafkaOpts, kgo.WithHooks(kafkaMetrics))
c.r.MetricCollectorForCurrentModule(kafkaMetrics)
client, err := kgo.NewClient(kafkaOpts...)
if err != nil {
logger.Err(err).
Int("worker", i).
@@ -107,36 +106,31 @@ func (c *realComponent) Start() error {
// StartWorkers will start the workers. This should only be called once.
func (c *realComponent) StartWorkers(workerBuilder WorkerBuilderFunc) error {
ctx := c.t.Context(context.Background())
topics := []string{c.kafkaTopic}
for i := range c.config.Workers {
callback, shutdown := workerBuilder(i)
consumer := c.NewConsumer(i, callback)
client := c.clients[i]
c.t.Go(func() error {
logger := c.r.With().
Int("worker", i).
Logger()
client, err := sarama.NewConsumerGroupFromClient(c.config.ConsumerGroup, c.clients[i])
if err != nil {
logger.Err(err).
Int("worker", i).
Str("brokers", strings.Join(c.config.Brokers, ",")).
Msg("unable to create group consumer")
return fmt.Errorf("unable to create Kafka group consumer: %w", err)
}
defer client.Close()
consumer := c.NewConsumer(i, callback)
defer shutdown()
for {
if err := client.Consume(ctx, topics, consumer); err != nil {
if errors.Is(err, sarama.ErrClosedConsumerGroup) {
select {
case <-ctx.Done():
return nil
}
if errors.Is(err, context.Canceled) {
default:
fetches := client.PollFetches(ctx)
if err := consumer.ProcessFetches(ctx, fetches); err != nil {
if errors.Is(err, context.Canceled) || errors.Is(err, ErrStopProcessing) {
return nil
}
logger.Err(err).
Int("worker", i).
Msg("cannot get message from consumer")
return fmt.Errorf("cannot get message from consumer: %w", err)
Msg("cannot process fetched messages")
return fmt.Errorf("cannot process fetched messages: %w", err)
}
}
}
})
@@ -147,11 +141,8 @@ func (c *realComponent) StartWorkers(workerBuilder WorkerBuilderFunc) error {
// Stop stops the Kafka component
func (c *realComponent) Stop() error {
defer func() {
c.kafkaConfig.MetricRegistry.UnregisterAll()
kafka.GlobalKafkaLogger.Unregister()
close(c.healthy)
for _, client := range c.clients {
client.Close()
client.CloseAllowingRebalance()
}
c.r.Info().Msg("Kafka component stopped")
}()
@@ -159,7 +150,3 @@ func (c *realComponent) Stop() error {
c.t.Kill(nil)
return c.t.Wait()
}
func (c *realComponent) channelHealthcheck() reporter.HealthcheckFunc {
return reporter.ChannelHealthcheck(c.t.Context(nil), c.healthy)
}

View File

@@ -8,11 +8,7 @@ import (
"testing"
"time"
gometrics "github.com/rcrowley/go-metrics"
"akvorado/common/daemon"
"akvorado/common/helpers"
"akvorado/common/reporter"
)
func TestMock(t *testing.T) {
@@ -54,53 +50,3 @@ func TestMock(t *testing.T) {
t.Error("Stop() should have triggered shutdown function")
}
}
func TestKafkaMetrics(t *testing.T) {
r := reporter.NewMock(t)
c, err := New(r, DefaultConfiguration(), Dependencies{Daemon: daemon.NewMock(t)})
if err != nil {
t.Fatalf("New() error:\n%+v", err)
}
kafkaConfig := c.(*realComponent).kafkaConfig
// Manually put some metrics
gometrics.GetOrRegisterMeter("consumer-fetch-rate", kafkaConfig.MetricRegistry).
Mark(30)
gometrics.GetOrRegisterHistogram("consumer-batch-size", kafkaConfig.MetricRegistry,
gometrics.NewExpDecaySample(10, 1)).
Update(100)
gometrics.GetOrRegisterHistogram("consumer-fetch-response-size", kafkaConfig.MetricRegistry,
gometrics.NewExpDecaySample(10, 1)).
Update(200)
gometrics.GetOrRegisterCounter("consumer-group-join-total-akvorado", kafkaConfig.MetricRegistry).
Inc(20)
gometrics.GetOrRegisterCounter("consumer-group-join-failed-akvorado", kafkaConfig.MetricRegistry).
Inc(1)
gometrics.GetOrRegisterCounter("consumer-group-sync-total-akvorado", kafkaConfig.MetricRegistry).
Inc(4)
gometrics.GetOrRegisterCounter("consumer-group-sync-failed-akvorado", kafkaConfig.MetricRegistry).
Inc(1)
gotMetrics := r.GetMetrics("akvorado_outlet_kafka_", "-consumer_fetch_rate")
expectedMetrics := map[string]string{
`consumer_batch_messages_bucket{le="+Inf"}`: "1",
`consumer_batch_messages_bucket{le="0.5"}`: "100",
`consumer_batch_messages_bucket{le="0.9"}`: "100",
`consumer_batch_messages_bucket{le="0.99"}`: "100",
`consumer_batch_messages_count`: "1",
`consumer_batch_messages_sum`: "100",
`consumer_fetch_bytes_bucket{le="+Inf"}`: "1",
`consumer_fetch_bytes_bucket{le="0.5"}`: "200",
`consumer_fetch_bytes_bucket{le="0.9"}`: "200",
`consumer_fetch_bytes_bucket{le="0.99"}`: "200",
`consumer_fetch_bytes_count`: "1",
`consumer_fetch_bytes_sum`: "200",
`consumer_group_join_total{group="akvorado"}`: "20",
`consumer_group_join_failed_total{group="akvorado"}`: "1",
`consumer_group_sync_total{group="akvorado"}`: "4",
`consumer_group_sync_failed_total{group="akvorado"}`: "1",
}
if diff := helpers.Diff(gotMetrics, expectedMetrics); diff != "" {
t.Fatalf("Metrics (-got, +want):\n%s", diff)
}
}