outlet/kafka: expose consumer lag as a prometheus metric

Monitoring consumer lag is useful to troubleshoot performance/scaling
issues. It can currently be seen through kafka-ui, but a proper metric
is more practical.

Unfortunately, JMX metrics on the broker don't expose this. It seems
that people usually resort to monitoring from the consumer side, or
through other external exporters like Burrow or kafka_exporter.

franz-go/kadm provides a function to compute the consumer lag, so let's
do it from the consumer side (the outlet).
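
For reference, a minimal sketch of the kadm call this relies on (the
broker address, group, and topic names below are placeholders, not
values from this commit):

	package main

	import (
		"context"
		"fmt"
		"time"

		"github.com/twmb/franz-go/pkg/kadm"
		"github.com/twmb/franz-go/pkg/kgo"
	)

	func main() {
		// Admin client on top of a regular kgo client; the broker
		// address is a placeholder.
		client, err := kadm.NewOptClient(kgo.SeedBrokers("localhost:9092"))
		if err != nil {
			panic(err)
		}
		defer client.Close()

		ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
		defer cancel()

		// Lag() describes the group and fetches its committed offsets
		// in one call.
		lags, err := client.Lag(ctx, "my-group")
		if err != nil {
			panic(err)
		}
		var total int64
		for _, l := range lags["my-group"].Lag["my-topic"] {
			total += l.Lag // end offset minus committed offset, per partition
		}
		fmt.Println("consumer lag:", total)
	}

The actual implementation below also checks the per-group and
per-partition errors before trusting the numbers.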
Author:    François HORTA
Date:      2025-08-28 12:02:39 +02:00
Committer: Vincent Bernat
Parent:    77306ddcee
Commit:    e3a778552d

3 changed files with 242 additions and 0 deletions


@@ -15,6 +15,7 @@ import (
 	"github.com/twmb/franz-go/pkg/kfake"
 	"github.com/twmb/franz-go/pkg/kgo"
+	"github.com/twmb/franz-go/pkg/kmsg"

 	"akvorado/common/daemon"
 	"akvorado/common/helpers"
@@ -351,3 +352,141 @@ func TestWorkerScaling(t *testing.T) {
 		t.Fatalf("Metrics (-got, +want):\n%s", diff)
 	}
 }
+
+func TestKafkaLagMetric(t *testing.T) {
+	topicName := fmt.Sprintf("test-topic2-%d", rand.Int())
+	expectedTopicName := fmt.Sprintf("%s-v%d", topicName, pb.Version)
+	cluster, err := kfake.NewCluster(
+		kfake.NumBrokers(1),
+		kfake.SeedTopics(16, expectedTopicName),
+	)
+	if err != nil {
+		t.Fatalf("NewCluster() error: %v", err)
+	}
+	defer cluster.Close()
+
+	// Watch for autocommits to avoid relying on time
+	clusterCommitNotification := make(chan interface{})
+	cluster.Control(func(request kmsg.Request) (kmsg.Response, error, bool) {
+		switch k := kmsg.Key(request.Key()); k {
+		case kmsg.OffsetCommit:
+			clusterCommitNotification <- nil
+		}
+		cluster.KeepControl()
+		return nil, nil, false
+	})
+
+	// Create a producer client
+	producerConfiguration := kafka.DefaultConfiguration()
+	producerConfiguration.Brokers = cluster.ListenAddrs()
+	producerOpts, err := kafka.NewConfig(reporter.NewMock(t), producerConfiguration)
+	if err != nil {
+		t.Fatalf("NewConfig() error:\n%+v", err)
+	}
+	producer, err := kgo.NewClient(producerOpts...)
+	if err != nil {
+		t.Fatalf("NewClient() error:\n%+v", err)
+	}
+	defer producer.Close()
+
+	// Start the component
+	configuration := DefaultConfiguration()
+	configuration.Topic = topicName
+	configuration.Brokers = cluster.ListenAddrs()
+	configuration.FetchMaxWaitTime = 10 * time.Millisecond
+	configuration.ConsumerGroup = fmt.Sprintf("outlet-%d", rand.Int())
+	r := reporter.NewMock(t)
+	c, err := New(r, configuration, Dependencies{Daemon: daemon.NewMock(t)})
+	if err != nil {
+		t.Fatalf("New() error:\n%+v", err)
+	}
+	helpers.StartStop(t, c)
+
+	// Start a worker with a callback that blocks on a channel after receiving a message
+	workerBlockReceive := make(chan interface{})
+	defer close(workerBlockReceive)
+	c.StartWorkers(func(_ int, _ chan<- ScaleRequest) (ReceiveFunc, ShutdownFunc) {
+		return func(context.Context, []byte) error {
+			<-workerBlockReceive
+			return nil
+		}, func() {}
+	})
+
+	// No messages yet, no lag
+	time.Sleep(10 * time.Millisecond)
+	gotMetrics := r.GetMetrics("akvorado_outlet_kafka_", "consumergroup", "workers")
+	expected := map[string]string{
+		"consumergroup_lag_messages": "0",
+		"workers":                    "1",
+	}
+	if diff := helpers.Diff(gotMetrics, expected); diff != "" {
+		t.Fatalf("Metrics (-got, +want):\n%s", diff)
+	}
+
+	// Send a single message, allow it to be processed
+	record := &kgo.Record{
+		Topic: expectedTopicName,
+		Value: []byte("hello"),
+	}
+	if results := producer.ProduceSync(context.Background(), record); results.FirstErr() != nil {
+		t.Fatalf("ProduceSync() error:\n%+v", results.FirstErr())
+	}
+	workerBlockReceive <- nil
+
+	// Wait for autocommit
+	select {
+	case <-time.After(2 * time.Second):
+		t.Fatal("Timed out waiting for autocommit")
+	case <-clusterCommitNotification:
+	}
+
+	// The message was processed, there's no lag
+	gotMetrics = r.GetMetrics("akvorado_outlet_kafka_", "consumergroup", "received_messages_total")
+	expected = map[string]string{
+		"consumergroup_lag_messages":          "0",
+		`received_messages_total{worker="0"}`: "1",
+	}
+	if diff := helpers.Diff(gotMetrics, expected); diff != "" {
+		t.Fatalf("Metrics (-got, +want):\n%s", diff)
+	}
+
+	// Send a few more messages without allowing the worker to process them, expect the consumer lag to rise
+	for range 5 {
+		record := &kgo.Record{
+			Topic: expectedTopicName,
+			Value: []byte("hello"),
+		}
+		if results := producer.ProduceSync(context.Background(), record); results.FirstErr() != nil {
+			t.Fatalf("ProduceSync() error:\n%+v", results.FirstErr())
+		}
+	}
+	time.Sleep(20 * time.Millisecond)
+	gotMetrics = r.GetMetrics("akvorado_outlet_kafka_", "consumergroup", "received_messages_total")
+	expected = map[string]string{
+		"consumergroup_lag_messages":          "5",
+		`received_messages_total{worker="0"}`: "2", // The consumer only blocks after incrementing the message counter
+	}
+	if diff := helpers.Diff(gotMetrics, expected); diff != "" {
+		t.Fatalf("Metrics (-got, +want):\n%s", diff)
+	}
+
+	// Let the worker process all 5 messages (and wait for autocommit), expect the lag to drop back to zero
+	for range 5 {
+		workerBlockReceive <- nil
+	}
+	select {
+	case <-time.After(2 * time.Second):
+		t.Fatal("Timed out waiting for autocommit")
+	case <-clusterCommitNotification:
+	}
+	gotMetrics = r.GetMetrics("akvorado_outlet_kafka_", "consumergroup", "received_messages_total")
+	expected = map[string]string{
+		"consumergroup_lag_messages":          "0",
+		`received_messages_total{worker="0"}`: "6",
+	}
+	if diff := helpers.Diff(gotMetrics, expected); diff != "" {
+		t.Fatalf("Metrics (-got, +want):\n%s", diff)
+	}
+}


@@ -4,6 +4,11 @@
 package kafka

 import (
+	"context"
+	"fmt"
+	"time"
+
+	"akvorado/common/pb"
 	"akvorado/common/reporter"
 )
@@ -15,6 +20,7 @@ type metrics struct {
 	workers        reporter.GaugeFunc
 	workerIncrease reporter.Counter
 	workerDecrease reporter.Counter
+	consumerLag    reporter.GaugeFunc
 }

 func (c *realComponent) initMetrics() {
@@ -69,4 +75,73 @@ func (c *realComponent) initMetrics() {
 			Help: "Number of times a new worker was stopped.",
 		},
 	)
+	c.metrics.consumerLag = c.r.GaugeFunc(
+		reporter.GaugeOpts{
+			Name: "consumergroup_lag_messages",
+			Help: "Current consumer lag across all partitions. A value of -1 indicates an issue with Kafka and/or consumers.",
+		},
+		func() float64 {
+			ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
+			defer cancel()
+			c.kadmClientMu.Lock()
+			defer c.kadmClientMu.Unlock()
+			if c.kadmClient == nil {
+				return -1
+			}
+			lag, err := c.computeLagMetric(ctx)
+			if err != nil {
+				c.r.Err(err).Msg("lag metric refresh failed, setting to -1")
+				return -1
+			}
+			return lag
+		},
+	)
+}
+
+func (c *realComponent) computeLagMetric(ctx context.Context) (float64, error) {
+	lag, err := c.kadmClient.Lag(ctx, c.config.ConsumerGroup)
+	if err != nil {
+		return -1, fmt.Errorf("unable to compute Kafka group lag: %w", err)
+	}
+	// The map entry should exist, but let's check anyway to be safe
+	perGroupLag, ok := lag[c.config.ConsumerGroup]
+	if !ok {
+		return -1, fmt.Errorf("unable to find Kafka consumer group %q", c.config.ConsumerGroup)
+	}
+	if perGroupLag.FetchErr != nil {
+		return -1, fmt.Errorf("unable to fetch Kafka consumer group offsets %q: %w", c.config.ConsumerGroup, perGroupLag.FetchErr)
+	}
+	if perGroupLag.DescribeErr != nil {
+		return -1, fmt.Errorf("unable to describe Kafka consumer group %q: %w", c.config.ConsumerGroup, perGroupLag.DescribeErr)
+	}
+	// Retrieve only the current topic as there may be several
+	topic := fmt.Sprintf("%s-v%d", c.config.Topic, pb.Version)
+	perPartitionGroupLag, ok := perGroupLag.Lag[topic]
+	if !ok {
+		return -1, fmt.Errorf("unable to find Kafka consumer group lag for topic %q", topic)
+	}
+	// Finally, sum the lag across all partitions
+	var lagTotal int64
+	for _, partitionLag := range perPartitionGroupLag {
+		// Skip possibly unassigned partitions in case of rebalancing
+		if partitionLag.IsEmpty() {
+			continue
+		}
+		if partitionLag.Err != nil {
+			memberOrInstanceID := partitionLag.Member.MemberID
+			if partitionLag.Member.InstanceID != nil {
+				memberOrInstanceID = *partitionLag.Member.InstanceID
+			}
+			return -1, fmt.Errorf("unable to compute Kafka consumer lag because of a commit error on group %q, member %q, partition %d: %w", c.config.ConsumerGroup, memberOrInstanceID, partitionLag.Partition, partitionLag.Err)
+		}
+		lagTotal += partitionLag.Lag
+	}
+	return float64(lagTotal), nil
 }


@@ -6,9 +6,11 @@ package kafka
 import (
 	"fmt"
+	"strings"
 	"sync"
 	"time"

+	"github.com/twmb/franz-go/pkg/kadm"
 	"github.com/twmb/franz-go/pkg/kgo"
 	"github.com/twmb/franz-go/plugin/kprom"
 	"gopkg.in/tomb.v2"
@@ -33,6 +35,8 @@ type realComponent struct {
 	config       Configuration
 	kafkaOpts    []kgo.Opt
+	kadmClient   *kadm.Client
+	kadmClientMu sync.Mutex
 	kafkaMetrics []*kprom.Metrics

 	workerMu sync.Mutex
@@ -87,6 +91,24 @@ func New(r *reporter.Reporter, configuration Configuration, dependencies Depende
 // Start starts the Kafka component.
 func (c *realComponent) Start() error {
 	c.r.Info().Msg("starting Kafka component")
+
+	// Create an admin Kafka client
+	kafkaOpts, err := kafka.NewConfig(c.r, c.config.Configuration)
+	if err != nil {
+		return err
+	}
+	kadmClient, err := kadm.NewOptClient(kafkaOpts...)
+	if err != nil {
+		c.r.Err(err).
+			Str("brokers", strings.Join(c.config.Brokers, ",")).
+			Msg("unable to create Kafka admin client")
+		return fmt.Errorf("unable to create Kafka admin client: %w", err)
+	}
+	c.kadmClientMu.Lock()
+	defer c.kadmClientMu.Unlock()
+	c.kadmClient = kadmClient
+
 	return nil
 }
@@ -106,6 +128,12 @@ func (c *realComponent) StartWorkers(workerBuilder WorkerBuilderFunc) error {
 func (c *realComponent) Stop() error {
 	defer func() {
 		c.stopAllWorkers()
+		c.kadmClientMu.Lock()
+		defer c.kadmClientMu.Unlock()
+		if c.kadmClient != nil {
+			c.kadmClient.Close()
+			c.kadmClient = nil
+		}
 		c.r.Info().Msg("Kafka component stopped")
 	}()
 	c.r.Info().Msg("stopping Kafka component")