akvorado/outlet/kafka/functional_test.go
Vincent Bernat 1322d42549 outlet/kafka: fix scaler hysteresis
Previously, the scaler was scaling up and down independently. When
scaling up or down, Kafka rebalances the topic and we temporarily get
scale-down requests; the rate limiter would not stop them as it is
independent from the scale-up rate limiter. Instead, the rate limit for
increase now acts as a grace time during which everything is ignored;
between that and the rate limit for decrease, we only consider
increasing the number of workers; past that, we scale down as long as
we have a majority of scale-down requests (compared to steady ones).

Fix #2080 (hopefully)
2025-11-11 21:26:05 +01:00
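
To illustrate the hysteresis described above, here is a minimal sketch of such a decision function, assuming a state that carries the time of the last increase and the two configured rate limits. The package, type, and field names are illustrative, not the actual scaler implementation:

package scaler // illustrative sketch, not the real akvorado scaler

import "time"

// hysteresis holds the state for the decision sketch below. Names and
// structure are assumptions made for illustration.
type hysteresis struct {
	lastIncrease      time.Time
	increaseRateLimit time.Duration // the grace time (cf. WorkerIncreaseRateLimit)
	decreaseRateLimit time.Duration // window before decreases count (cf. WorkerDecreaseRateLimit)
}

// decide returns +1 to add a worker, -1 to remove one, or 0 to do nothing,
// given the number of increase, decrease, and steady requests seen recently.
func (h *hysteresis) decide(now time.Time, up, down, steady int) int {
	since := now.Sub(h.lastIncrease)
	switch {
	case since < h.increaseRateLimit:
		// Grace time after a scale up: ignore everything, Kafka may
		// still be rebalancing the topic.
		return 0
	case since < h.increaseRateLimit+h.decreaseRateLimit:
		// Between the two rate limits: only consider scaling up.
		if up > 0 {
			h.lastIncrease = now
			return +1
		}
		return 0
	default:
		// Past both limits: scale up on request, scale down only when
		// decrease requests outnumber steady ones.
		if up > 0 {
			h.lastIncrease = now
			return +1
		}
		if down > steady {
			return -1
		}
		return 0
	}
}

With a scheme like this, a single time window governs both directions: the rebalance-induced decrease requests that follow a scale up fall into the grace time and are simply discarded.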


// SPDX-FileCopyrightText: 2022 Free Mobile
// SPDX-License-Identifier: AGPL-3.0-only

package kafka

import (
	"context"
	"fmt"
	"math/rand/v2"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"github.com/twmb/franz-go/pkg/kfake"
	"github.com/twmb/franz-go/pkg/kgo"
	"github.com/twmb/franz-go/pkg/kmsg"

	"akvorado/common/daemon"
	"akvorado/common/helpers"
	"akvorado/common/kafka"
	"akvorado/common/pb"
	"akvorado/common/reporter"
)
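
// TestFakeKafka checks the nominal path against a kfake cluster: produce a few
// records, check that the worker callback receives them, and verify the
// received_* metrics and that Stop() calls the shutdown function.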
func TestFakeKafka(t *testing.T) {
	r := reporter.NewMock(t)
	topicName := fmt.Sprintf("test-topic2-%d", rand.Int())
	expectedTopicName := fmt.Sprintf("%s-v%d", topicName, pb.Version)
	cluster, err := kfake.NewCluster(
		kfake.NumBrokers(1),
		kfake.SeedTopics(16, expectedTopicName),
		kfake.WithLogger(kafka.NewLogger(r)),
	)
	if err != nil {
		t.Fatalf("NewCluster() error: %v", err)
	}
	defer cluster.Close()
	// Create a producer client
	producerConfiguration := kafka.DefaultConfiguration()
	producerConfiguration.Brokers = cluster.ListenAddrs()
	producerOpts, err := kafka.NewConfig(reporter.NewMock(t), producerConfiguration)
	if err != nil {
		t.Fatalf("NewConfig() error:\n%+v", err)
	}
	producerOpts = append(producerOpts, kgo.ProducerLinger(0))
	producer, err := kgo.NewClient(producerOpts...)
	if err != nil {
		t.Fatalf("NewClient() error:\n%+v", err)
	}
	defer producer.Close()
	// Callback
	got := []string{}
	expected := []string{"hello 1", "hello 2", "hello 3"}
	gotAll := make(chan bool)
	callback := func(_ context.Context, message []byte) error {
		got = append(got, string(message))
		if len(got) == len(expected) {
			close(gotAll)
		}
		return nil
	}
	// Start the component
	configuration := DefaultConfiguration()
	configuration.Topic = topicName
	configuration.Brokers = cluster.ListenAddrs()
	configuration.FetchMaxWaitTime = 100 * time.Millisecond
	configuration.ConsumerGroup = fmt.Sprintf("outlet-%d", rand.Int())
	c, err := New(r, configuration, Dependencies{Daemon: daemon.NewMock(t)})
	if err != nil {
		t.Fatalf("New() error:\n%+v", err)
	}
	if err := c.(*realComponent).Start(); err != nil {
		t.Fatalf("Start() error:\n%+v", err)
	}
	shutdownCalled := false
	c.StartWorkers(func(int, chan<- ScaleRequest) (ReceiveFunc, ShutdownFunc) {
		return callback, func() { shutdownCalled = true }
	})
	// Send messages
	time.Sleep(100 * time.Millisecond)
	t.Log("producing values")
	for _, value := range expected {
		record := &kgo.Record{
			Topic: expectedTopicName,
			Value: []byte(value),
		}
		results := producer.ProduceSync(context.Background(), record)
		if err := results.FirstErr(); err != nil {
			t.Fatalf("ProduceSync() error:\n%+v", err)
		}
	}
	t.Log("values produced")
	// Wait for them
	select {
	case <-time.After(time.Second):
		t.Fatal("Too long to get messages")
	case <-gotAll:
	}
	if diff := helpers.Diff(got, expected); diff != "" {
		t.Errorf("Didn't receive the expected messages (-got, +want):\n%s", diff)
	}
	gotMetrics := r.GetMetrics("akvorado_outlet_kafka_", "received_")
	fetches, _ := strconv.Atoi(gotMetrics[`received_fetches_total{worker="0"}`])
	expectedMetrics := map[string]string{
		`received_bytes_total{worker="0"}`: "21",
		`received_fetches_total{worker="0"}`: strconv.Itoa(max(fetches, 1)),
		`received_messages_total{worker="0"}`: "3",
	}
	if diff := helpers.Diff(gotMetrics, expectedMetrics); diff != "" {
		t.Errorf("Metrics (-got, +want):\n%s", diff)
	}
	if err := c.Stop(); err != nil {
		t.Fatalf("Stop() error:\n%+v", err)
	}
	if !shutdownCalled {
		t.Fatal("Stop() didn't call shutdown function")
	}
}
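
// TestStartSeveralWorkers checks that MinWorkers starts the requested number of
// workers, by counting how many connects_total, read_bytes_total, and
// write_bytes_total metric series are exposed.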
func TestStartSeveralWorkers(t *testing.T) {
	r := reporter.NewMock(t)
	topicName := fmt.Sprintf("test-topic2-%d", rand.Int())
	expectedTopicName := fmt.Sprintf("%s-v%d", topicName, pb.Version)
	cluster, err := kfake.NewCluster(
		kfake.NumBrokers(1),
		kfake.SeedTopics(16, expectedTopicName),
		kfake.WithLogger(kafka.NewLogger(r)),
	)
	if err != nil {
		t.Fatalf("NewCluster() error: %v", err)
	}
	defer cluster.Close()
	// Start the component
	configuration := DefaultConfiguration()
	configuration.Topic = topicName
	configuration.Brokers = cluster.ListenAddrs()
	configuration.FetchMaxWaitTime = 100 * time.Millisecond
	configuration.ConsumerGroup = fmt.Sprintf("outlet-%d", rand.Int())
	configuration.MinWorkers = 5
	c, err := New(r, configuration, Dependencies{Daemon: daemon.NewMock(t)})
	if err != nil {
		t.Fatalf("New() error:\n%+v", err)
	}
	if err := c.(*realComponent).Start(); err != nil {
		t.Fatalf("Start() error:\n%+v", err)
	}
	c.StartWorkers(func(int, chan<- ScaleRequest) (ReceiveFunc, ShutdownFunc) {
		return func(context.Context, []byte) error { return nil }, func() {}
	})
	time.Sleep(20 * time.Millisecond)
	if err := c.Stop(); err != nil {
		t.Fatalf("Stop() error:\n%+v", err)
	}
	gotMetrics := r.GetMetrics("akvorado_outlet_kafka_")
	connectsTotal := 0
	writeBytesTotal := 0
	readBytesTotal := 0
	for k := range gotMetrics {
		if strings.HasPrefix(k, "write_bytes_total") {
			writeBytesTotal++
		}
		if strings.HasPrefix(k, "read_bytes_total") {
			readBytesTotal++
		}
		if strings.HasPrefix(k, "connects_total") {
			connectsTotal++
		}
	}
	got := map[string]int{
		"write_bytes_total": writeBytesTotal,
		"read_bytes_total": readBytesTotal,
		"connects_total": connectsTotal,
	}
	expected := map[string]int{
		// For some reason, each metric appears twice, one of them with a seed_0 label.
		"write_bytes_total": 10,
		"read_bytes_total": 10,
		"connects_total": 10,
	}
	if diff := helpers.Diff(got, expected); diff != "" {
		t.Errorf("Metrics (-got, +want):\n%s", diff)
	}
}
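
// TestWorkerScaling checks the scaler: MaxWorkers is capped to the partition
// count, scale-increase requests add workers, and repeated scale-decrease
// requests eventually remove one.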
func TestWorkerScaling(t *testing.T) {
	r := reporter.NewMock(t)
	topicName := fmt.Sprintf("test-topic2-%d", rand.Int())
	expectedTopicName := fmt.Sprintf("%s-v%d", topicName, pb.Version)
	cluster, err := kfake.NewCluster(
		kfake.NumBrokers(1),
		kfake.SeedTopics(16, expectedTopicName),
		kfake.WithLogger(kafka.NewLogger(r)),
	)
	if err != nil {
		t.Fatalf("NewCluster() error: %v", err)
	}
	defer cluster.Close()
	// Create a producer client
	producerConfiguration := kafka.DefaultConfiguration()
	producerConfiguration.Brokers = cluster.ListenAddrs()
	producerOpts, err := kafka.NewConfig(reporter.NewMock(t), producerConfiguration)
	if err != nil {
		t.Fatalf("NewConfig() error:\n%+v", err)
	}
	producerOpts = append(producerOpts, kgo.ProducerLinger(0))
	producer, err := kgo.NewClient(producerOpts...)
	if err != nil {
		t.Fatalf("NewClient() error:\n%+v", err)
	}
	defer producer.Close()
	// Start the component
	configuration := DefaultConfiguration()
	configuration.Topic = topicName
	configuration.Brokers = cluster.ListenAddrs()
	configuration.FetchMaxWaitTime = 10 * time.Millisecond
	configuration.ConsumerGroup = fmt.Sprintf("outlet-%d", rand.Int())
	configuration.WorkerIncreaseRateLimit = 10 * time.Millisecond
	configuration.WorkerDecreaseRateLimit = 10 * time.Millisecond
	configuration.MaxWorkers = 24
	c, err := New(r, configuration, Dependencies{Daemon: daemon.NewMock(t)})
	if err != nil {
		t.Fatalf("New() error:\n%+v", err)
	}
	helpers.StartStop(t, c)
	if maxWorkers := c.(*realComponent).config.MaxWorkers; maxWorkers != 16 {
		t.Errorf("Start() max workers should have been capped to 16 instead of %d", maxWorkers)
	}
	msg := atomic.Uint32{}
	c.StartWorkers(func(_ int, ch chan<- ScaleRequest) (ReceiveFunc, ShutdownFunc) {
		return func(context.Context, []byte) error {
			c := msg.Add(1)
			if c <= 1 {
				t.Logf("received message %d, request a scale increase", c)
				ch <- ScaleIncrease
			} else {
				t.Logf("received message %d, request a scale decrease", c)
				ch <- ScaleDecrease
			}
			return nil
		}, func() {}
	})
	// 1 worker
	time.Sleep(10 * time.Millisecond)
	gotMetrics := r.GetMetrics("akvorado_outlet_kafka_", "worker", "max", "min")
	expected := map[string]string{
		"worker_decrease_total": "0",
		"worker_increase_total": "1",
		"workers": "1",
		"min_workers": "1",
		"max_workers": "16",
	}
	if diff := helpers.Diff(gotMetrics, expected); diff != "" {
		t.Fatalf("Metrics (-got, +want):\n%s", diff)
	}
	t.Log("Send 1 message (increase)")
	record := &kgo.Record{
		Topic: expectedTopicName,
		Value: []byte("hello"),
	}
	if results := producer.ProduceSync(context.Background(), record); results.FirstErr() != nil {
		t.Fatalf("ProduceSync() error:\n%+v", results.FirstErr())
	}
	time.Sleep(100 * time.Millisecond)
	t.Log("Check if workers increased to 9")
	gotMetrics = r.GetMetrics("akvorado_outlet_kafka_", "worker")
	expected = map[string]string{
		"worker_decrease_total": "0",
		"worker_increase_total": "9",
		"workers": "9",
	}
	if diff := helpers.Diff(gotMetrics, expected); diff != "" {
		t.Fatalf("Metrics (-got, +want):\n%s", diff)
	}
	t.Log("Send 10 messages (decrease)")
	for range 10 {
		record := &kgo.Record{
			Topic: expectedTopicName,
			Value: []byte("hello"),
		}
		if results := producer.ProduceSync(context.Background(), record); results.FirstErr() != nil {
			t.Fatalf("ProduceSync() error:\n%+v", results.FirstErr())
		}
	}
	time.Sleep(100 * time.Millisecond)
	t.Log("Check if workers decreased to 8")
	gotMetrics = r.GetMetrics("akvorado_outlet_kafka_", "worker")
	expected = map[string]string{
		"worker_decrease_total": "1",
		"worker_increase_total": "9",
		"workers": "8",
	}
	if diff := helpers.Diff(gotMetrics, expected); diff != "" {
		t.Fatalf("Metrics (-got, +want):\n%s", diff)
	}
}
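
// TestKafkaLagMetric checks the consumergroup_lag_messages metric: it stays at
// zero while the worker keeps up, rises while the worker is blocked with
// unprocessed messages, and drops back to zero once they are processed and
// committed.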
func TestKafkaLagMetric(t *testing.T) {
	r := reporter.NewMock(t)
	topicName := fmt.Sprintf("test-topic2-%d", rand.Int())
	expectedTopicName := fmt.Sprintf("%s-v%d", topicName, pb.Version)
	cluster, err := kfake.NewCluster(
		kfake.NumBrokers(1),
		kfake.SeedTopics(16, expectedTopicName),
		kfake.WithLogger(kafka.NewLogger(r)),
	)
	if err != nil {
		t.Fatalf("NewCluster() error: %v", err)
	}
	defer cluster.Close()
	// Watch for autocommits to avoid relying on time
	clusterCommitNotification := make(chan any)
	firstFetch := make(chan any)
	var firstFetchOnce sync.Once
	cluster.Control(func(request kmsg.Request) (kmsg.Response, error, bool) {
		switch k := kmsg.Key(request.Key()); k {
		case kmsg.OffsetCommit:
			t.Log("offset commit message")
			clusterCommitNotification <- nil
		case kmsg.Fetch:
			firstFetchOnce.Do(func() {
				close(firstFetch)
				t.Log("fetch request")
			})
		}
		cluster.KeepControl()
		return nil, nil, false
	})
	// Create a producer client
	producerConfiguration := kafka.DefaultConfiguration()
	producerConfiguration.Brokers = cluster.ListenAddrs()
	producerOpts, err := kafka.NewConfig(reporter.NewMock(t), producerConfiguration)
	if err != nil {
		t.Fatalf("NewConfig() error:\n%+v", err)
	}
	producerOpts = append(producerOpts, kgo.ProducerLinger(0))
	producer, err := kgo.NewClient(producerOpts...)
	if err != nil {
		t.Fatalf("NewClient() error:\n%+v", err)
	}
	defer producer.Close()
	// Start the component
	configuration := DefaultConfiguration()
	configuration.Topic = topicName
	configuration.Brokers = cluster.ListenAddrs()
	configuration.FetchMaxWaitTime = 10 * time.Millisecond
	configuration.ConsumerGroup = fmt.Sprintf("outlet-%d", rand.Int())
	c, err := New(r, configuration, Dependencies{Daemon: daemon.NewMock(t)})
	if err != nil {
		t.Fatalf("New() error:\n%+v", err)
	}
	helpers.StartStop(t, c)
	// Start a worker with a callback that blocks on a channel after receiving a message
	workerBlockReceive := make(chan any)
	defer close(workerBlockReceive)
	c.StartWorkers(func(_ int, _ chan<- ScaleRequest) (ReceiveFunc, ShutdownFunc) {
		return func(context.Context, []byte) error {
			t.Log("worker received a message")
			<-workerBlockReceive
			return nil
		}, func() {}
	})
	t.Log("wait first fetch")
	select {
	case <-firstFetch:
	case <-time.After(time.Second):
		t.Fatal("no initial fetch")
	}
	// No messages yet, no lag
	ctx, cancel := context.WithTimeout(t.Context(), time.Second)
	defer cancel()
	for {
		gotMetrics := r.GetMetrics("akvorado_outlet_kafka_", "consumergroup", "workers")
		expected := map[string]string{
			"consumergroup_lag_messages": "0",
			"workers": "1",
		}
		if diff := helpers.Diff(gotMetrics, expected); diff != "" {
			select {
			case <-ctx.Done():
				t.Fatalf("Metrics (-got, +want):\n%s", diff)
			default:
			}
			time.Sleep(10 * time.Millisecond)
		} else {
			break
		}
	}
	t.Log("send a single message")
	record := &kgo.Record{
		Topic: expectedTopicName,
		Value: []byte("hello"),
	}
	if results := producer.ProduceSync(context.Background(), record); results.FirstErr() != nil {
		t.Fatalf("ProduceSync() error:\n%+v", results.FirstErr())
	}
	t.Log("allow message processing")
	select {
	case workerBlockReceive <- nil:
	case <-time.After(time.Second):
		t.Fatal("no initial receive")
	}
	t.Log("wait for autocommit")
	select {
	case <-time.After(2 * time.Second):
		t.Fatal("Timed out waiting for autocommit")
	case <-clusterCommitNotification:
	}
	// The message was processed, there's no lag
	gotMetrics := r.GetMetrics("akvorado_outlet_kafka_", "consumergroup", "received_messages_total")
	expected := map[string]string{
		"consumergroup_lag_messages": "0",
		`received_messages_total{worker="0"}`: "1",
	}
	if diff := helpers.Diff(gotMetrics, expected); diff != "" {
		t.Fatalf("Metrics (-got, +want):\n%s", diff)
	}
	// Send a few more messages without allowing the worker to process them,
	// expect the consumer lag to rise
	t.Log("send 5 messages")
	for range 5 {
		record := &kgo.Record{
			Topic: expectedTopicName,
			Value: []byte("hello"),
		}
		if results := producer.ProduceSync(context.Background(), record); results.FirstErr() != nil {
			t.Fatalf("ProduceSync() error:\n%+v", results.FirstErr())
		}
	}
	time.Sleep(20 * time.Millisecond)
	gotMetrics = r.GetMetrics("akvorado_outlet_kafka_", "consumergroup", "received_messages_total")
	expected = map[string]string{
		"consumergroup_lag_messages": "5",
		`received_messages_total{worker="0"}`: "2", // The consumer only blocks after incrementing the message counter
	}
	if diff := helpers.Diff(gotMetrics, expected); diff != "" {
		t.Fatalf("Metrics (-got, +want):\n%s", diff)
	}
	// Let the worker process all 5 messages (and wait for autocommit), expect
	// the lag to drop back to zero
	t.Log("allow processing of 5 messages")
	for range 5 {
		select {
		case workerBlockReceive <- nil:
		case <-time.After(time.Second):
			t.Fatal("no subsequent receive")
		}
	}
	select {
	case <-time.After(2 * time.Second):
		t.Fatal("Timed out waiting for autocommit")
	case <-clusterCommitNotification:
	}
	gotMetrics = r.GetMetrics("akvorado_outlet_kafka_", "consumergroup", "received_messages_total")
	expected = map[string]string{
		"consumergroup_lag_messages": "0",
		`received_messages_total{worker="0"}`: "6",
	}
	if diff := helpers.Diff(gotMetrics, expected); diff != "" {
		t.Fatalf("Metrics (-got, +want):\n%s", diff)
	}
}