// SPDX-FileCopyrightText: 2022 Free Mobile
// SPDX-License-Identifier: AGPL-3.0-only

package kafka

import (
	"context"
	"fmt"
	"math/rand/v2"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"github.com/twmb/franz-go/pkg/kfake"
	"github.com/twmb/franz-go/pkg/kgo"
	"github.com/twmb/franz-go/pkg/kmsg"

	"akvorado/common/daemon"
	"akvorado/common/helpers"
	"akvorado/common/kafka"
	"akvorado/common/pb"
	"akvorado/common/reporter"
)
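
// TestFakeKafka exercises the happy path end to end: it spins up an in-memory
// kfake cluster, produces a few records and checks that the worker callback
// receives them and that the received_* metrics match. The component is
// configured with the bare topic name while the cluster seeds (and the
// producer targets) the versioned "<topic>-v<pb.Version>" name the component
// is expected to consume from.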
func TestFakeKafka(t *testing.T) {
	r := reporter.NewMock(t)
	topicName := fmt.Sprintf("test-topic2-%d", rand.Int())
	expectedTopicName := fmt.Sprintf("%s-v%d", topicName, pb.Version)
	cluster, err := kfake.NewCluster(
		kfake.NumBrokers(1),
		kfake.SeedTopics(16, expectedTopicName),
		kfake.WithLogger(kafka.NewLogger(r)),
	)
	if err != nil {
		t.Fatalf("NewCluster() error: %v", err)
	}
	defer cluster.Close()

	// Create a producer client
	producerConfiguration := kafka.DefaultConfiguration()
	producerConfiguration.Brokers = cluster.ListenAddrs()
	producerOpts, err := kafka.NewConfig(reporter.NewMock(t), producerConfiguration)
	if err != nil {
		t.Fatalf("NewConfig() error:\n%+v", err)
	}
	producerOpts = append(producerOpts, kgo.ProducerLinger(0))
	producer, err := kgo.NewClient(producerOpts...)
	if err != nil {
		t.Fatalf("NewClient() error:\n%+v", err)
	}
	defer producer.Close()
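
	// The producer above reuses the shared client options pointed at the fake
	// cluster; kgo.ProducerLinger(0) is presumably there so that each
	// ProduceSync call below is flushed immediately instead of being batched.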

	// Callback
	got := []string{}
	expected := []string{"hello 1", "hello 2", "hello 3"}
	gotAll := make(chan bool)
	callback := func(_ context.Context, message []byte) error {
		got = append(got, string(message))
		if len(got) == len(expected) {
			close(gotAll)
		}
		return nil
	}
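
	// The callback appends to got without locking and closes gotAll once all
	// expected payloads have been seen; this should be safe here because the
	// default configuration starts a single worker, so the callback is not
	// expected to run concurrently.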

	// Start the component
	configuration := DefaultConfiguration()
	configuration.Topic = topicName
	configuration.Brokers = cluster.ListenAddrs()
	configuration.FetchMaxWaitTime = 100 * time.Millisecond
	configuration.ConsumerGroup = fmt.Sprintf("outlet-%d", rand.Int())
	c, err := New(r, configuration, Dependencies{Daemon: daemon.NewMock(t)})
	if err != nil {
		t.Fatalf("New() error:\n%+v", err)
	}
	if err := c.(*realComponent).Start(); err != nil {
		t.Fatalf("Start() error:\n%+v", err)
	}
	shutdownCalled := false
	c.StartWorkers(func(int, chan<- ScaleRequest) (ReceiveFunc, ShutdownFunc) {
		return callback, func() { shutdownCalled = true }
	})

	// Send messages
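	// A short pause, presumably to give the consumer time to join the group
	// and get its partition assignments before records are produced.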
	time.Sleep(100 * time.Millisecond)
	t.Log("producing values")
	for _, value := range expected {
		record := &kgo.Record{
			Topic: expectedTopicName,
			Value: []byte(value),
		}
		results := producer.ProduceSync(context.Background(), record)
		if err := results.FirstErr(); err != nil {
			t.Fatalf("ProduceSync() error:\n%+v", err)
		}
	}
	t.Log("values produced")

	// Wait for them
	select {
	case <-time.After(time.Second):
		t.Fatal("Too long to get messages")
	case <-gotAll:
	}
	if diff := helpers.Diff(got, expected); diff != "" {
t.Errorf("Didn't received the expected messages (-got, +want):\n%s", diff)
	}

	gotMetrics := r.GetMetrics("akvorado_outlet_kafka_", "received_")
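	// The number of fetch requests depends on timing, so the expectation
	// mirrors the observed value and only requires at least one fetch. The 21
	// bytes correspond to the three 7-byte payloads produced above.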
	fetches, _ := strconv.Atoi(gotMetrics[`received_fetches_total{worker="0"}`])
	expectedMetrics := map[string]string{
		`received_bytes_total{worker="0"}`:    "21",
		`received_fetches_total{worker="0"}`:  strconv.Itoa(max(fetches, 1)),
		`received_messages_total{worker="0"}`: "3",
	}
	if diff := helpers.Diff(gotMetrics, expectedMetrics); diff != "" {
		t.Errorf("Metrics (-got, +want):\n%s", diff)
	}
	if err := c.Stop(); err != nil {
		t.Fatalf("Stop() error:\n%+v", err)
	}
	if !shutdownCalled {
		t.Fatal("Stop() didn't call shutdown function")
	}
}
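
// TestStartSeveralWorkers checks that setting MinWorkers to 5 actually starts
// five Kafka consumers: it counts the per-client connection and traffic
// metrics instead of inspecting the workers directly.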
func TestStartSeveralWorkers(t *testing.T) {
	r := reporter.NewMock(t)
	topicName := fmt.Sprintf("test-topic2-%d", rand.Int())
	expectedTopicName := fmt.Sprintf("%s-v%d", topicName, pb.Version)
	cluster, err := kfake.NewCluster(
		kfake.NumBrokers(1),
		kfake.SeedTopics(16, expectedTopicName),
		kfake.WithLogger(kafka.NewLogger(r)),
	)
	if err != nil {
		t.Fatalf("NewCluster() error: %v", err)
	}
	defer cluster.Close()

	// Start the component
	configuration := DefaultConfiguration()
	configuration.Topic = topicName
	configuration.Brokers = cluster.ListenAddrs()
	configuration.FetchMaxWaitTime = 100 * time.Millisecond
	configuration.ConsumerGroup = fmt.Sprintf("outlet-%d", rand.Int())
	configuration.MinWorkers = 5
	c, err := New(r, configuration, Dependencies{Daemon: daemon.NewMock(t)})
	if err != nil {
		t.Fatalf("New() error:\n%+v", err)
	}
	if err := c.(*realComponent).Start(); err != nil {
		t.Fatalf("Start() error:\n%+v", err)
	}
	c.StartWorkers(func(int, chan<- ScaleRequest) (ReceiveFunc, ShutdownFunc) {
		return func(context.Context, []byte) error { return nil }, func() {}
	})
	time.Sleep(20 * time.Millisecond)
	if err := c.Stop(); err != nil {
		t.Fatalf("Stop() error:\n%+v", err)
	}

	gotMetrics := r.GetMetrics("akvorado_outlet_kafka_")
	connectsTotal := 0
	writeBytesTotal := 0
	readBytesTotal := 0
	for k := range gotMetrics {
		if strings.HasPrefix(k, "write_bytes_total") {
			writeBytesTotal++
		}
		if strings.HasPrefix(k, "read_bytes_total") {
			readBytesTotal++
		}
		if strings.HasPrefix(k, "connects_total") {
			connectsTotal++
		}
	}
	got := map[string]int{
		"write_bytes_total": writeBytesTotal,
		"read_bytes_total":  readBytesTotal,
		"connects_total":    connectsTotal,
	}
	expected := map[string]int{
		// For some reason, each metric appears twice, one of the two mentioning seed_0.
"write_bytes_total": 10,
"read_bytes_total": 10,
"connects_total": 10,
}
if diff := helpers.Diff(got, expected); diff != "" {
t.Errorf("Metrics (-got, +want):\n%s", diff)
}
}
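
// TestWorkerScaling checks the dynamic worker scaling: the per-worker receive
// function asks for more or fewer workers through its ScaleRequest channel and
// the test verifies that the worker-related metrics move accordingly, and that
// MaxWorkers gets capped at 16, matching the number of partitions the topic
// was seeded with.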
func TestWorkerScaling(t *testing.T) {
	r := reporter.NewMock(t)
	topicName := fmt.Sprintf("test-topic2-%d", rand.Int())
	expectedTopicName := fmt.Sprintf("%s-v%d", topicName, pb.Version)
	cluster, err := kfake.NewCluster(
		kfake.NumBrokers(1),
		kfake.SeedTopics(16, expectedTopicName),
		kfake.WithLogger(kafka.NewLogger(r)),
	)
	if err != nil {
		t.Fatalf("NewCluster() error: %v", err)
	}
	defer cluster.Close()

	// Create a producer client
	producerConfiguration := kafka.DefaultConfiguration()
	producerConfiguration.Brokers = cluster.ListenAddrs()
	producerOpts, err := kafka.NewConfig(reporter.NewMock(t), producerConfiguration)
	if err != nil {
		t.Fatalf("NewConfig() error:\n%+v", err)
	}
	producerOpts = append(producerOpts, kgo.ProducerLinger(0))
	producer, err := kgo.NewClient(producerOpts...)
	if err != nil {
		t.Fatalf("NewClient() error:\n%+v", err)
	}
	defer producer.Close()

	// Start the component
	configuration := DefaultConfiguration()
	configuration.Topic = topicName
	configuration.Brokers = cluster.ListenAddrs()
	configuration.FetchMaxWaitTime = 10 * time.Millisecond
	configuration.ConsumerGroup = fmt.Sprintf("outlet-%d", rand.Int())
	configuration.WorkerIncreaseRateLimit = 10 * time.Millisecond
	configuration.WorkerDecreaseRateLimit = 10 * time.Millisecond
	configuration.MaxWorkers = 24
	c, err := New(r, configuration, Dependencies{Daemon: daemon.NewMock(t)})
	if err != nil {
		t.Fatalf("New() error:\n%+v", err)
	}
	helpers.StartStop(t, c)
	if maxWorkers := c.(*realComponent).config.MaxWorkers; maxWorkers != 16 {
		t.Errorf("Start() max workers should have been capped to 16 instead of %d", maxWorkers)
	}
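
	// MaxWorkers was configured to 24 but the topic only has 16 partitions;
	// the cap to 16 presumably reflects that running more consumers than
	// partitions in the same group would leave the extra ones idle.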

	msg := atomic.Uint32{}
	c.StartWorkers(func(_ int, ch chan<- ScaleRequest) (ReceiveFunc, ShutdownFunc) {
		return func(context.Context, []byte) error {
			c := msg.Add(1)
			if c <= 5 || c > 15 {
				t.Logf("received message %d, request a scale increase", c)
				ch <- ScaleIncrease
			} else {
				t.Logf("received message %d, request a scale decrease", c)
				ch <- ScaleDecrease
			}
			return nil
		}, func() {}
	})
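
	// Each received message requests a scale change: the first 5 messages
	// (and anything past the 15th) ask for an increase, messages 6 to 15 ask
	// for a decrease. The rate limits were lowered to 10ms above, presumably
	// so the scaler can react within the short sleeps used below.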

	// 1 worker
	time.Sleep(10 * time.Millisecond)
	gotMetrics := r.GetMetrics("akvorado_outlet_kafka_", "worker", "max", "min")
	expected := map[string]string{
		"worker_decrease_total": "0",
		"worker_increase_total": "1",
		"workers":               "1",
		"min_workers":           "1",
		"max_workers":           "16",
	}
	if diff := helpers.Diff(gotMetrics, expected); diff != "" {
		t.Fatalf("Metrics (-got, +want):\n%s", diff)
	}

	// Send 5 messages in a row
	t.Log("Send 5 messages")
	for range 5 {
		record := &kgo.Record{
			Topic: expectedTopicName,
			Value: []byte("hello"),
		}
		if results := producer.ProduceSync(context.Background(), record); results.FirstErr() != nil {
			t.Fatalf("ProduceSync() error:\n%+v", results.FirstErr())
		}
	}
	time.Sleep(100 * time.Millisecond)
	t.Log("Check if workers increased to 9")
	gotMetrics = r.GetMetrics("akvorado_outlet_kafka_", "worker")
	expected = map[string]string{
		"worker_decrease_total": "0",
		"worker_increase_total": "9",
		"workers":               "9",
	}
	if diff := helpers.Diff(gotMetrics, expected); diff != "" {
		t.Fatalf("Metrics (-got, +want):\n%s", diff)
	}

	// Send 1 message
	t.Log("Send 1 message")
	for range 1 {
		record := &kgo.Record{
			Topic: expectedTopicName,
			Value: []byte("hello"),
		}
		if results := producer.ProduceSync(context.Background(), record); results.FirstErr() != nil {
			t.Fatalf("ProduceSync() error:\n%+v", results.FirstErr())
		}
	}
	time.Sleep(100 * time.Millisecond)
	t.Log("Check if workers decreased to 5")
	gotMetrics = r.GetMetrics("akvorado_outlet_kafka_", "worker")
	expected = map[string]string{
		"worker_decrease_total": "4",
		"worker_increase_total": "9",
		"workers":               "5",
	}
	if diff := helpers.Diff(gotMetrics, expected); diff != "" {
		t.Fatalf("Metrics (-got, +want):\n%s", diff)
	}
}
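
// TestKafkaLagMetric checks the consumergroup_lag_messages metric. The worker
// blocks on a channel after receiving each message, so the test can create lag
// on demand, and kfake's request interception is used to wait for offset
// commits instead of guessing with sleeps.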
func TestKafkaLagMetric(t *testing.T) {
	r := reporter.NewMock(t)
	topicName := fmt.Sprintf("test-topic2-%d", rand.Int())
	expectedTopicName := fmt.Sprintf("%s-v%d", topicName, pb.Version)
	cluster, err := kfake.NewCluster(
		kfake.NumBrokers(1),
		kfake.SeedTopics(16, expectedTopicName),
		kfake.WithLogger(kafka.NewLogger(r)),
	)
	if err != nil {
		t.Fatalf("NewCluster() error: %v", err)
	}
	defer cluster.Close()

	// Watch for autocommits to avoid relying on time
	clusterCommitNotification := make(chan any)
	firstFetch := make(chan any)
	var firstFetchOnce sync.Once
	cluster.Control(func(request kmsg.Request) (kmsg.Response, error, bool) {
		switch k := kmsg.Key(request.Key()); k {
		case kmsg.OffsetCommit:
			t.Log("offset commit message")
			clusterCommitNotification <- nil
		case kmsg.Fetch:
			firstFetchOnce.Do(func() {
				close(firstFetch)
				t.Log("fetch request")
			})
		}
		cluster.KeepControl()
		return nil, nil, false
	})
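
	// The control function above intercepts every broker request: it signals
	// the test on OffsetCommit requests and on the first Fetch request.
	// KeepControl() should keep the hook installed for subsequent requests,
	// and returning false hands the request back to kfake for normal handling.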

	// Create a producer client
	producerConfiguration := kafka.DefaultConfiguration()
	producerConfiguration.Brokers = cluster.ListenAddrs()
	producerOpts, err := kafka.NewConfig(reporter.NewMock(t), producerConfiguration)
	if err != nil {
		t.Fatalf("NewConfig() error:\n%+v", err)
	}
	producerOpts = append(producerOpts, kgo.ProducerLinger(0))
	producer, err := kgo.NewClient(producerOpts...)
	if err != nil {
		t.Fatalf("NewClient() error:\n%+v", err)
	}
	defer producer.Close()

	// Start the component
	configuration := DefaultConfiguration()
	configuration.Topic = topicName
	configuration.Brokers = cluster.ListenAddrs()
	configuration.FetchMaxWaitTime = 10 * time.Millisecond
	configuration.ConsumerGroup = fmt.Sprintf("outlet-%d", rand.Int())
	c, err := New(r, configuration, Dependencies{Daemon: daemon.NewMock(t)})
	if err != nil {
		t.Fatalf("New() error:\n%+v", err)
	}
	helpers.StartStop(t, c)

	// Start a worker with a callback that blocks on a channel after receiving a message
	workerBlockReceive := make(chan any)
	defer close(workerBlockReceive)
	c.StartWorkers(func(_ int, _ chan<- ScaleRequest) (ReceiveFunc, ShutdownFunc) {
		return func(context.Context, []byte) error {
			t.Log("worker received a message")
			<-workerBlockReceive
			return nil
		}, func() {}
	})
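
	// Each send on workerBlockReceive releases exactly one blocked message, so
	// the test controls precisely how many records the worker is allowed to
	// finish processing at each step.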
t.Log("wait first fetch")
select {
case <-firstFetch:
case <-time.After(time.Second):
t.Fatal("no initial fetch")
}
// No messages yet, no lag
ctx, cancel := context.WithTimeout(t.Context(), time.Second)
defer cancel()
for {
gotMetrics := r.GetMetrics("akvorado_outlet_kafka_", "consumergroup", "workers")
expected := map[string]string{
"consumergroup_lag_messages": "0",
"workers": "1",
}
if diff := helpers.Diff(gotMetrics, expected); diff != "" {
select {
case <-ctx.Done():
t.Fatalf("Metrics (-got, +want):\n%s", diff)
default:
}
time.Sleep(10 * time.Millisecond)
} else {
break
}
}
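
	// The loop above polls the metrics until the expected values show up or
	// the one-second deadline expires, since the lag gauge is presumably
	// updated asynchronously and a single immediate check would be racy.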
t.Log("send a single message")
record := &kgo.Record{
Topic: expectedTopicName,
Value: []byte("hello"),
}
if results := producer.ProduceSync(context.Background(), record); results.FirstErr() != nil {
t.Fatalf("ProduceSync() error:\n%+v", results.FirstErr())
}
t.Log("allow message processing")
select {
case workerBlockReceive <- nil:
case <-time.After(time.Second):
t.Fatal("no initial receive")
}
t.Log("wait for autocommit")
select {
case <-time.After(2 * time.Second):
t.Fatal("Timed out waiting for autocommit")
case <-clusterCommitNotification:
}
// The message was processed, there's no lag
gotMetrics := r.GetMetrics("akvorado_outlet_kafka_", "consumergroup", "received_messages_total")
expected := map[string]string{
"consumergroup_lag_messages": "0",
`received_messages_total{worker="0"}`: "1",
}
if diff := helpers.Diff(gotMetrics, expected); diff != "" {
t.Fatalf("Metrics (-got, +want):\n%s", diff)
}
// Send a few more messages without allowing the worker to process them,
// expect the consumer lag to rise
t.Log("send 5 messages")
for range 5 {
record := &kgo.Record{
Topic: expectedTopicName,
Value: []byte("hello"),
}
if results := producer.ProduceSync(context.Background(), record); results.FirstErr() != nil {
t.Fatalf("ProduceSync() error:\n%+v", results.FirstErr())
}
}
time.Sleep(20 * time.Millisecond)
gotMetrics = r.GetMetrics("akvorado_outlet_kafka_", "consumergroup", "received_messages_total")
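	// The committed offset still only covers the first message, so all five
	// new records count as lag, even though the worker has already picked up
	// (and is currently blocked in) one of them.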
	expected = map[string]string{
		"consumergroup_lag_messages":          "5",
		`received_messages_total{worker="0"}`: "2", // The consumer only blocks after incrementing the message counter
	}
	if diff := helpers.Diff(gotMetrics, expected); diff != "" {
		t.Fatalf("Metrics (-got, +want):\n%s", diff)
	}

	// Let the worker process all 5 messages (and wait for autocommit), expect
	// the lag to drop back to zero
t.Log("accept processing of 5 messages")
	for range 5 {
		select {
		case workerBlockReceive <- nil:
		case <-time.After(time.Second):
			t.Fatal("no subsequent receive")
		}
	}
	select {
	case <-time.After(2 * time.Second):
		t.Fatal("Timed out waiting for autocommit")
	case <-clusterCommitNotification:
	}
	gotMetrics = r.GetMetrics("akvorado_outlet_kafka_", "consumergroup", "received_messages_total")
	expected = map[string]string{
		"consumergroup_lag_messages":          "0",
		`received_messages_total{worker="0"}`: "6",
	}
	if diff := helpers.Diff(gotMetrics, expected); diff != "" {
		t.Fatalf("Metrics (-got, +want):\n%s", diff)
	}
}