Files
akvorado/common/kafka/tests.go
Vincent Bernat e68b2de72c
Some checks failed
CI / 🤖 Check dependabot status (push) Has been cancelled
CI / 🐧 Test on Linux (${{ github.ref_type == 'tag' }}, misc) (push) Has been cancelled
CI / 🐧 Test on Linux (coverage) (push) Has been cancelled
CI / 🐧 Test on Linux (regular) (push) Has been cancelled
CI / ❄️ Build on Nix (push) Has been cancelled
CI / 🍏 Build and test on macOS (push) Has been cancelled
CI / 🧪 End-to-end testing (push) Has been cancelled
CI / 🔍 Upload code coverage (push) Has been cancelled
CI / 🔬 Test only Go (push) Has been cancelled
CI / 🔬 Test only JS (${{ needs.dependabot.outputs.package-ecosystem }}, 20) (push) Has been cancelled
CI / 🔬 Test only JS (${{ needs.dependabot.outputs.package-ecosystem }}, 22) (push) Has been cancelled
CI / 🔬 Test only JS (${{ needs.dependabot.outputs.package-ecosystem }}, 24) (push) Has been cancelled
CI / ⚖️ Check licenses (push) Has been cancelled
CI / 🐋 Build Docker images (push) Has been cancelled
CI / 🐋 Tag Docker images (push) Has been cancelled
CI / 🚀 Publish release (push) Has been cancelled
common/helpers: migrate from verify to skip-verify in TLS config
Otherwise, the default is "false" for verify. This is a breaking change.

Fix #2055.
2025-10-30 08:31:27 +01:00

150 lines
3.9 KiB
Go

// SPDX-FileCopyrightText: 2022 Free Mobile
// SPDX-FileCopyrightText: 2020 Travis Bischel
// SPDX-License-Identifier: AGPL-3.0-only AND BSD-3-Clause
//go:build !release
package kafka
import (
"context"
"encoding/binary"
"fmt"
"testing"
"time"
"akvorado/common/helpers"
"akvorado/common/reporter"
"github.com/twmb/franz-go/pkg/kfake"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/kmsg"
)
// SetupKafkaBroker configures a client to use for testing.
func SetupKafkaBroker(t *testing.T) (*kgo.Client, []string) {
broker := helpers.CheckExternalService(t, "Kafka",
[]string{"kafka:9092", "127.0.0.1:9092"})
// Wait for broker to be ready
r := reporter.NewMock(t)
opts, err := NewConfig(r, Configuration{
Brokers: []string{broker},
})
if err != nil {
t.Fatalf("NewConfig() error: %v", err)
}
// Add additional options for testing
opts = append(opts,
kgo.RequestTimeoutOverhead(1*time.Second),
kgo.ProduceRequestTimeout(1*time.Second),
kgo.ConnIdleTimeout(1*time.Second),
)
ready := false
var client *kgo.Client
for i := 0; i < 90 && !ready; i++ {
if client != nil {
client.Close()
}
if client, err = kgo.NewClient(opts...); err != nil {
continue
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
if err := client.Ping(ctx); err != nil {
cancel()
continue
}
cancel()
ready = true
}
if !ready {
t.Fatalf("broker is not ready")
}
return client, []string{broker}
}
// forEachBatchRecord iterates through all records in a record batch. This
// function is stolen from franz-go/pkg/kfake/data.go.
func forEachBatchRecord(batch kmsg.RecordBatch, cb func(kmsg.Record) error) error {
records, err := kgo.DefaultDecompressor().Decompress(
batch.Records,
kgo.CompressionCodecType(batch.Attributes&0x0007),
)
if err != nil {
return err
}
for range batch.NumRecords {
rec := kmsg.NewRecord()
err := rec.ReadFrom(records)
if err != nil {
return fmt.Errorf("corrupt batch: %w", err)
}
if err := cb(rec); err != nil {
return err
}
length, amt := binary.Varint(records)
records = records[length+int64(amt):]
}
if len(records) > 0 {
return fmt.Errorf("corrupt batch, extra left over bytes after parsing batch: %v", len(records))
}
return nil
}
// InterceptMessages sets up a ControlKey to intercept all messages produced to a fake cluster
// and calls the callback function for each record received.
func InterceptMessages(t *testing.T, cluster *kfake.Cluster, callback func(*kgo.Record)) {
t.Helper()
// Use ControlKey to intercept ProduceRequest messages
cluster.ControlKey(0, func(kreq kmsg.Request) (kmsg.Response, error, bool) {
cluster.KeepControl()
if req, ok := kreq.(*kmsg.ProduceRequest); ok {
for _, topicData := range req.Topics {
for _, partitionData := range topicData.Partitions {
if partitionData.Records != nil {
var batch kmsg.RecordBatch
if err := batch.ReadFrom(partitionData.Records); err != nil {
t.Fatalf("batch.ReadFrom() error:\n%+v", err)
}
if err := forEachBatchRecord(batch, func(rec kmsg.Record) error {
kgoRecord := &kgo.Record{
Topic: topicData.Topic,
Partition: partitionData.Partition,
Key: rec.Key,
Value: rec.Value,
}
callback(kgoRecord)
return nil
}); err != nil {
t.Fatalf("forEachBatchRecord() error:\n%+v", err)
}
}
}
}
}
// Don't modify the response, just let it pass through
return nil, nil, false
})
}
var _ kfake.Logger = &Logger{}
// Logf logs a message at the specified level for kfake.
func (l *Logger) Logf(level kfake.LogLevel, msg string, keyvals ...any) {
switch level {
case kfake.LogLevelError:
l.r.Error().Fields(keyvals).Msg(msg)
case kfake.LogLevelWarn:
l.r.Warn().Fields(keyvals).Msg(msg)
case kfake.LogLevelInfo:
l.r.Info().Fields(keyvals).Msg(msg)
case kfake.LogLevelDebug:
l.r.Debug().Fields(keyvals).Msg(msg)
}
}