Use dichotomy to quickly reach the optimum. This avoids too many Kafka rebalances on big setups.
// SPDX-FileCopyrightText: 2022 Free Mobile
// SPDX-License-Identifier: AGPL-3.0-only

package kafka

import (
	"context"
	"fmt"
	"math/rand"
	"strconv"
	"strings"
	"sync/atomic"
	"testing"
	"time"

	"github.com/twmb/franz-go/pkg/kfake"
	"github.com/twmb/franz-go/pkg/kgo"
	"github.com/twmb/franz-go/pkg/kmsg"

	"akvorado/common/daemon"
	"akvorado/common/helpers"
	"akvorado/common/kafka"
	"akvorado/common/pb"
	"akvorado/common/reporter"
)

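// TestFakeKafka runs the consumer component against an in-memory kfake
// cluster: three messages are produced with a real franz-go client and the
// test checks that the worker callback receives them and that the received_*
// metrics add up.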
func TestFakeKafka(t *testing.T) {
	r := reporter.NewMock(t)
	topicName := fmt.Sprintf("test-topic2-%d", rand.Int())
	expectedTopicName := fmt.Sprintf("%s-v%d", topicName, pb.Version)

	cluster, err := kfake.NewCluster(
		kfake.NumBrokers(1),
		kfake.SeedTopics(16, expectedTopicName),
		kfake.WithLogger(kafka.NewLogger(r)),
	)
	if err != nil {
		t.Fatalf("NewCluster() error: %v", err)
	}
	defer cluster.Close()

	// Create a producer client
	producerConfiguration := kafka.DefaultConfiguration()
	producerConfiguration.Brokers = cluster.ListenAddrs()
	producerOpts, err := kafka.NewConfig(reporter.NewMock(t), producerConfiguration)
	if err != nil {
		t.Fatalf("NewConfig() error:\n%+v", err)
	}
	producer, err := kgo.NewClient(producerOpts...)
	if err != nil {
		t.Fatalf("NewClient() error:\n%+v", err)
	}
	defer producer.Close()

	// Callback
	got := []string{}
	expected := []string{"hello 1", "hello 2", "hello 3"}
	gotAll := make(chan bool)
	callback := func(_ context.Context, message []byte) error {
		got = append(got, string(message))
		if len(got) == len(expected) {
			close(gotAll)
		}
		return nil
	}

	// Start the component
	configuration := DefaultConfiguration()
	configuration.Topic = topicName
	configuration.Brokers = cluster.ListenAddrs()
	configuration.FetchMaxWaitTime = 100 * time.Millisecond
	configuration.ConsumerGroup = fmt.Sprintf("outlet-%d", rand.Int())
	c, err := New(r, configuration, Dependencies{Daemon: daemon.NewMock(t)})
	if err != nil {
		t.Fatalf("New() error:\n%+v", err)
	}
	if err := c.(*realComponent).Start(); err != nil {
		t.Fatalf("Start() error:\n%+v", err)
	}
	shutdownCalled := false
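	// StartWorkers takes a factory called for each worker with its index and
	// a channel for scale requests; it returns the receive callback and a
	// shutdown hook (used below to check that Stop() invokes it).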
	c.StartWorkers(func(int, chan<- ScaleRequest) (ReceiveFunc, ShutdownFunc) {
		return callback, func() { shutdownCalled = true }
	})

	// Send messages
	time.Sleep(100 * time.Millisecond)
	t.Log("producing values")
	for _, value := range expected {
		record := &kgo.Record{
			Topic: expectedTopicName,
			Value: []byte(value),
		}
		results := producer.ProduceSync(context.Background(), record)
		if err := results.FirstErr(); err != nil {
			t.Fatalf("ProduceSync() error:\n%+v", err)
		}
	}
	t.Log("values produced")

	// Wait for them
	select {
	case <-time.After(time.Second):
		t.Fatal("Too long to get messages")
	case <-gotAll:
	}

	if diff := helpers.Diff(got, expected); diff != "" {
		t.Errorf("Didn't receive the expected messages (-got, +want):\n%s", diff)
	}

	gotMetrics := r.GetMetrics("akvorado_outlet_kafka_", "received_")
	fetches, _ := strconv.Atoi(gotMetrics[`received_fetches_total{worker="0"}`])
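	// Each payload ("hello 1", …) is 7 bytes, so 3 messages account for the
	// 21 bytes below. The number of fetches depends on timing, so the check
	// only requires it to be at least 1 by echoing the observed value back.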
	expectedMetrics := map[string]string{
		`received_bytes_total{worker="0"}`:    "21",
		`received_fetches_total{worker="0"}`:  strconv.Itoa(max(fetches, 1)),
		`received_messages_total{worker="0"}`: "3",
	}
	if diff := helpers.Diff(gotMetrics, expectedMetrics); diff != "" {
		t.Errorf("Metrics (-got, +want):\n%s", diff)
	}

	if err := c.Stop(); err != nil {
		t.Fatalf("Stop() error:\n%+v", err)
	}
	if !shutdownCalled {
		t.Fatal("Stop() didn't call shutdown function")
	}
}

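// TestStartSeveralWorkers checks that MinWorkers immediately spawns the
// requested number of workers: with 5 workers, each worker apparently gets
// its own Kafka client, so the connection metrics are reported per worker.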
func TestStartSeveralWorkers(t *testing.T) {
	r := reporter.NewMock(t)
	topicName := fmt.Sprintf("test-topic2-%d", rand.Int())
	expectedTopicName := fmt.Sprintf("%s-v%d", topicName, pb.Version)

	cluster, err := kfake.NewCluster(
		kfake.NumBrokers(1),
		kfake.SeedTopics(16, expectedTopicName),
		kfake.WithLogger(kafka.NewLogger(r)),
	)
	if err != nil {
		t.Fatalf("NewCluster() error: %v", err)
	}
	defer cluster.Close()

	// Start the component
	configuration := DefaultConfiguration()
	configuration.Topic = topicName
	configuration.Brokers = cluster.ListenAddrs()
	configuration.FetchMaxWaitTime = 100 * time.Millisecond
	configuration.ConsumerGroup = fmt.Sprintf("outlet-%d", rand.Int())
	configuration.MinWorkers = 5
	c, err := New(r, configuration, Dependencies{Daemon: daemon.NewMock(t)})
	if err != nil {
		t.Fatalf("New() error:\n%+v", err)
	}
	if err := c.(*realComponent).Start(); err != nil {
		t.Fatalf("Start() error:\n%+v", err)
	}
	c.StartWorkers(func(int, chan<- ScaleRequest) (ReceiveFunc, ShutdownFunc) {
		return func(context.Context, []byte) error { return nil }, func() {}
	})
	time.Sleep(20 * time.Millisecond)
	if err := c.Stop(); err != nil {
		t.Fatalf("Stop() error:\n%+v", err)
	}

	gotMetrics := r.GetMetrics("akvorado_outlet_kafka_")
	connectsTotal := 0
	writeBytesTotal := 0
	readBytesTotal := 0
	for k := range gotMetrics {
		if strings.HasPrefix(k, "write_bytes_total") {
			writeBytesTotal++
		}
		if strings.HasPrefix(k, "read_bytes_total") {
			readBytesTotal++
		}
		if strings.HasPrefix(k, "connects_total") {
			connectsTotal++
		}
	}
	got := map[string]int{
		"write_bytes_total": writeBytesTotal,
		"read_bytes_total":  readBytesTotal,
		"connects_total":    connectsTotal,
	}
	expected := map[string]int{
		// For some reason, each metric appears twice per worker, one of the
		// two series being labelled seed_0: 5 workers × 2 = 10.
		"write_bytes_total": 10,
		"read_bytes_total":  10,
		"connects_total":    10,
	}
	if diff := helpers.Diff(got, expected); diff != "" {
		t.Errorf("Metrics (-got, +want):\n%s", diff)
	}
}

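// TestWorkerScaling exercises dynamic scaling: the receive callback pushes
// ScaleIncrease/ScaleDecrease requests and the test checks that the worker
// count follows. Judging from the expected numbers (1 → 9 → 5) and the commit
// message, each accepted request appears to move the worker count halfway
// towards the relevant bound (dichotomy), with the increase/decrease counters
// tracking individual workers added or removed.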
func TestWorkerScaling(t *testing.T) {
	r := reporter.NewMock(t)
	topicName := fmt.Sprintf("test-topic2-%d", rand.Int())
	expectedTopicName := fmt.Sprintf("%s-v%d", topicName, pb.Version)

	cluster, err := kfake.NewCluster(
		kfake.NumBrokers(1),
		kfake.SeedTopics(16, expectedTopicName),
		kfake.WithLogger(kafka.NewLogger(r)),
	)
	if err != nil {
		t.Fatalf("NewCluster() error: %v", err)
	}
	defer cluster.Close()

	// Create a producer client
	producerConfiguration := kafka.DefaultConfiguration()
	producerConfiguration.Brokers = cluster.ListenAddrs()
	producerOpts, err := kafka.NewConfig(reporter.NewMock(t), producerConfiguration)
	if err != nil {
		t.Fatalf("NewConfig() error:\n%+v", err)
	}
	producer, err := kgo.NewClient(producerOpts...)
	if err != nil {
		t.Fatalf("NewClient() error:\n%+v", err)
	}
	defer producer.Close()

	// Start the component
	configuration := DefaultConfiguration()
	configuration.Topic = topicName
	configuration.Brokers = cluster.ListenAddrs()
	configuration.FetchMaxWaitTime = 10 * time.Millisecond
	configuration.ConsumerGroup = fmt.Sprintf("outlet-%d", rand.Int())
	configuration.WorkerIncreaseRateLimit = 10 * time.Millisecond
	configuration.WorkerDecreaseRateLimit = 10 * time.Millisecond
	configuration.MaxWorkers = 24
	c, err := New(r, configuration, Dependencies{Daemon: daemon.NewMock(t)})
	if err != nil {
		t.Fatalf("New() error:\n%+v", err)
	}
	helpers.StartStop(t, c)

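	// MaxWorkers was set to 24 above, but the topic is seeded with only 16
	// partitions (kfake.SeedTopics(16, …)), so the component presumably caps
	// it: workers beyond the partition count would never get an assignment.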
	if maxWorkers := c.(*realComponent).config.MaxWorkers; maxWorkers != 16 {
		t.Errorf("Start() max workers should have been capped to 16 instead of %d", maxWorkers)
	}
	msg := atomic.Uint32{}
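	// The callback drives the scaling: messages 1-5 and anything past the
	// 15th request an increase, messages 6-15 request a decrease.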
	c.StartWorkers(func(_ int, ch chan<- ScaleRequest) (ReceiveFunc, ShutdownFunc) {
		return func(context.Context, []byte) error {
			c := msg.Add(1)
			if c <= 5 || c > 15 {
				t.Logf("received message %d, request a scale increase", c)
				ch <- ScaleIncrease
			} else {
				t.Logf("received message %d, request a scale decrease", c)
				ch <- ScaleDecrease
			}
			return nil
		}, func() {}
	})

	// 1 worker
	time.Sleep(10 * time.Millisecond)
	gotMetrics := r.GetMetrics("akvorado_outlet_kafka_", "worker", "max", "min")
	expected := map[string]string{
		"worker_decrease_total": "0",
		"worker_increase_total": "1",
		"workers":               "1",
		"min_workers":           "1",
		"max_workers":           "16",
	}
	if diff := helpers.Diff(gotMetrics, expected); diff != "" {
		t.Fatalf("Metrics (-got, +want):\n%s", diff)
	}

	// Send 5 messages in a row
	t.Log("Send 5 messages")
	for range 5 {
		record := &kgo.Record{
			Topic: expectedTopicName,
			Value: []byte("hello"),
		}
		if results := producer.ProduceSync(context.Background(), record); results.FirstErr() != nil {
			t.Fatalf("ProduceSync() error:\n%+v", results.FirstErr())
		}
	}
	time.Sleep(100 * time.Millisecond)
	t.Log("Check if workers increased to 9")
	gotMetrics = r.GetMetrics("akvorado_outlet_kafka_", "worker")
	expected = map[string]string{
		"worker_decrease_total": "0",
		"worker_increase_total": "9",
		"workers":               "9",
	}
	if diff := helpers.Diff(gotMetrics, expected); diff != "" {
		t.Fatalf("Metrics (-got, +want):\n%s", diff)
	}

	// Send 5 messages
	t.Log("Send 5 messages")
	for range 5 {
		record := &kgo.Record{
			Topic: expectedTopicName,
			Value: []byte("hello"),
		}
		if results := producer.ProduceSync(context.Background(), record); results.FirstErr() != nil {
			t.Fatalf("ProduceSync() error:\n%+v", results.FirstErr())
		}
	}
	time.Sleep(100 * time.Millisecond)
	t.Log("Check if workers decreased to 5")
	gotMetrics = r.GetMetrics("akvorado_outlet_kafka_", "worker")
	expected = map[string]string{
		"worker_decrease_total": "4",
		"worker_increase_total": "9",
		"workers":               "5",
	}
	if diff := helpers.Diff(gotMetrics, expected); diff != "" {
		t.Fatalf("Metrics (-got, +want):\n%s", diff)
	}

	// Send more messages until reaching max workers
	t.Log("Send more messages requesting scale up to reach max")
	var diff string
	var matches int
	for range 100 {
		record := &kgo.Record{
			Topic: expectedTopicName,
			Value: []byte("hello"),
		}
		if results := producer.ProduceSync(context.Background(), record); results.FirstErr() != nil {
			t.Fatalf("ProduceSync() error:\n%+v", results.FirstErr())
		}
		time.Sleep(5 * time.Millisecond)
		gotMetrics = r.GetMetrics("akvorado_outlet_kafka_", "workers")
		expected = map[string]string{
			"workers": "16",
		}
		diff = helpers.Diff(gotMetrics, expected)
		if diff == "" {
			matches++
		} else {
			matches = 0
		}
		if matches == 5 {
			return
		}
	}
	t.Fatalf("Metrics (-got, +want):\n%s", diff)
}

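// TestKafkaLagMetric checks the consumergroup_lag_messages gauge. The worker
// callback blocks on a channel so consumption can be stalled on demand, and
// OffsetCommit requests are intercepted so the test can wait for an actual
// autocommit instead of sleeping.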
func TestKafkaLagMetric(t *testing.T) {
	r := reporter.NewMock(t)
	topicName := fmt.Sprintf("test-topic2-%d", rand.Int())
	expectedTopicName := fmt.Sprintf("%s-v%d", topicName, pb.Version)

	cluster, err := kfake.NewCluster(
		kfake.NumBrokers(1),
		kfake.SeedTopics(16, expectedTopicName),
		kfake.WithLogger(kafka.NewLogger(r)),
	)
	if err != nil {
		t.Fatalf("NewCluster() error: %v", err)
	}
	defer cluster.Close()

	// Watch for autocommits to avoid relying on time
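	// The control function below intercepts broker requests: OffsetCommit
	// requests are signalled on the channel, cluster.KeepControl() keeps the
	// interceptor installed for subsequent requests, and returning false lets
	// kfake still handle the request itself.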
	clusterCommitNotification := make(chan any)
	cluster.Control(func(request kmsg.Request) (kmsg.Response, error, bool) {
		switch k := kmsg.Key(request.Key()); k {
		case kmsg.OffsetCommit:
			clusterCommitNotification <- nil
		}
		cluster.KeepControl()
		return nil, nil, false
	})

	// Create a producer client
	producerConfiguration := kafka.DefaultConfiguration()
	producerConfiguration.Brokers = cluster.ListenAddrs()
	producerOpts, err := kafka.NewConfig(reporter.NewMock(t), producerConfiguration)
	if err != nil {
		t.Fatalf("NewConfig() error:\n%+v", err)
	}
	producer, err := kgo.NewClient(producerOpts...)
	if err != nil {
		t.Fatalf("NewClient() error:\n%+v", err)
	}
	defer producer.Close()

	// Start the component
	configuration := DefaultConfiguration()
	configuration.Topic = topicName
	configuration.Brokers = cluster.ListenAddrs()
	configuration.FetchMaxWaitTime = 10 * time.Millisecond
	configuration.ConsumerGroup = fmt.Sprintf("outlet-%d", rand.Int())
	c, err := New(r, configuration, Dependencies{Daemon: daemon.NewMock(t)})
	if err != nil {
		t.Fatalf("New() error:\n%+v", err)
	}
	helpers.StartStop(t, c)

	// Start a worker with a callback that blocks on a channel after receiving a message
	workerBlockReceive := make(chan any)
	defer close(workerBlockReceive)
	c.StartWorkers(func(_ int, _ chan<- ScaleRequest) (ReceiveFunc, ShutdownFunc) {
		return func(context.Context, []byte) error {
			<-workerBlockReceive
			return nil
		}, func() {}
	})

	// No messages yet, no lag
	ctx, cancel := context.WithTimeout(t.Context(), time.Second)
	defer cancel()
	for {
		gotMetrics := r.GetMetrics("akvorado_outlet_kafka_", "consumergroup", "workers")
		expected := map[string]string{
			"consumergroup_lag_messages": "0",
			"workers":                    "1",
		}
		if diff := helpers.Diff(gotMetrics, expected); diff != "" {
			select {
			case <-ctx.Done():
				t.Fatalf("Metrics (-got, +want):\n%s", diff)
			default:
			}
			time.Sleep(10 * time.Millisecond)
		} else {
			break
		}
	}

	// Send a single message, allow it to be processed
	record := &kgo.Record{
		Topic: expectedTopicName,
		Value: []byte("hello"),
	}
	if results := producer.ProduceSync(context.Background(), record); results.FirstErr() != nil {
		t.Fatalf("ProduceSync() error:\n%+v", results.FirstErr())
	}
	workerBlockReceive <- nil

	// Wait for autocommit
	select {
	case <-time.After(2 * time.Second):
		t.Fatal("Timed out waiting for autocommit")
	case <-clusterCommitNotification:
	}

	// The message was processed, there's no lag
	gotMetrics := r.GetMetrics("akvorado_outlet_kafka_", "consumergroup", "received_messages_total")
	expected := map[string]string{
		"consumergroup_lag_messages":          "0",
		`received_messages_total{worker="0"}`: "1",
	}
	if diff := helpers.Diff(gotMetrics, expected); diff != "" {
		t.Fatalf("Metrics (-got, +want):\n%s", diff)
	}

	// Send a few more messages without allowing the worker to process them, expect the consumer lag to rise
	for range 5 {
		record := &kgo.Record{
			Topic: expectedTopicName,
			Value: []byte("hello"),
		}
		if results := producer.ProduceSync(context.Background(), record); results.FirstErr() != nil {
			t.Fatalf("ProduceSync() error:\n%+v", results.FirstErr())
		}
	}

	time.Sleep(20 * time.Millisecond)
	gotMetrics = r.GetMetrics("akvorado_outlet_kafka_", "consumergroup", "received_messages_total")
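	// Lag is apparently computed against committed offsets: only the first
	// message has been committed, so the 5 pending messages count as lag even
	// though the worker has already fetched (and is blocked on) the second one.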
	expected = map[string]string{
		"consumergroup_lag_messages":          "5",
		`received_messages_total{worker="0"}`: "2", // The consumer only blocks after incrementing the message counter
	}
	if diff := helpers.Diff(gotMetrics, expected); diff != "" {
		t.Fatalf("Metrics (-got, +want):\n%s", diff)
	}

	// Let the worker process all 5 messages (and wait for autocommit), expect the lag to drop back to zero
	for range 5 {
		workerBlockReceive <- nil
	}
	select {
	case <-time.After(2 * time.Second):
		t.Fatal("Timed out waiting for autocommit")
	case <-clusterCommitNotification:
	}
	gotMetrics = r.GetMetrics("akvorado_outlet_kafka_", "consumergroup", "received_messages_total")
	expected = map[string]string{
		"consumergroup_lag_messages":          "0",
		`received_messages_total{worker="0"}`: "6",
	}
	if diff := helpers.Diff(gotMetrics, expected); diff != "" {
		t.Fatalf("Metrics (-got, +want):\n%s", diff)
	}
}