Files
akvorado/outlet/kafka/worker.go
Vincent Bernat 3291abe680
Some checks failed
CI / 🤖 Check dependabot status (push) Has been cancelled
CI / 🐧 Test on Linux (${{ github.ref_type == 'tag' }}, misc) (push) Has been cancelled
CI / 🐧 Test on Linux (coverage) (push) Has been cancelled
CI / 🐧 Test on Linux (regular) (push) Has been cancelled
CI / ❄️ Build on Nix (push) Has been cancelled
CI / 🍏 Build and test on macOS (push) Has been cancelled
CI / 🧪 End-to-end testing (push) Has been cancelled
CI / 🔍 Upload code coverage (push) Has been cancelled
CI / 🔬 Test only Go (push) Has been cancelled
CI / 🔬 Test only JS (${{ needs.dependabot.outputs.package-ecosystem }}, 20) (push) Has been cancelled
CI / 🔬 Test only JS (${{ needs.dependabot.outputs.package-ecosystem }}, 22) (push) Has been cancelled
CI / 🔬 Test only JS (${{ needs.dependabot.outputs.package-ecosystem }}, 24) (push) Has been cancelled
CI / ⚖️ Check licenses (push) Has been cancelled
CI / 🐋 Build Docker images (push) Has been cancelled
CI / 🐋 Tag Docker images (push) Has been cancelled
CI / 🚀 Publish release (push) Has been cancelled
outlet/kafka: delay shutdown when stopping worker after closing client
Flushing can take some time, and we have a heartbeat to respect in order to
commit offsets.
2025-11-13 22:53:34 +01:00

143 lines
3.6 KiB
Go

// SPDX-FileCopyrightText: 2025 Free Mobile
// SPDX-License-Identifier: AGPL-3.0-only
package kafka
import (
"context"
"errors"
"fmt"
"strconv"
"strings"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/plugin/kprom"
)
// worker is the handle kept in the component's worker list for each started
// worker.
type worker struct {
// stop asks the worker to terminate. It is invoked by stopOneWorker and
// stopAllWorkers.
stop func()
}
// newClient creates the Kafka client for worker i.
//
// The client is built from the component-wide options plus a per-worker kprom
// metrics hook carrying a static "worker" label. On success, the hook is
// recorded in c.kafkaMetrics and registered with the reporter. On failure,
// nothing is recorded or registered and a wrapped error is returned.
func (c *realComponent) newClient(i int) (*kgo.Client, error) {
	logger := c.r.With().Int("worker", i).Logger()
	logger.Info().Msg("starting new client")

	kmetrics := kprom.NewMetrics("", kprom.WithStaticLabel(prometheus.Labels{"worker": strconv.Itoa(i)}))

	// Cap the capacity of the shared option slice so that append always
	// allocates a fresh backing array. Otherwise, spare capacity in
	// c.kafkaOpts would let successive calls write their hook into the same
	// slot of the shared array.
	baseOpts := c.kafkaOpts[:len(c.kafkaOpts):len(c.kafkaOpts)]
	kafkaOpts := append(baseOpts, kgo.WithHooks(kmetrics))

	client, err := kgo.NewClient(kafkaOpts...)
	if err != nil {
		// The logger already carries the "worker" field: adding it again
		// here would emit the field twice in the log entry.
		logger.Err(err).
			Str("brokers", strings.Join(c.config.Brokers, ",")).
			Msg("unable to create new client")
		return nil, fmt.Errorf("unable to create Kafka client: %w", err)
	}

	c.kafkaMetrics = append(c.kafkaMetrics, kmetrics)
	c.r.RegisterMetricCollector(kmetrics)
	return client, nil
}
// startOneWorker creates a Kafka client and a consumer for the next worker
// index, launches the goroutine polling and processing fetches, and records
// the worker so it can be stopped later. It returns an error only when the
// Kafka client cannot be created.
func (c *realComponent) startOneWorker() error {
	c.workerMu.Lock()
	defer c.workerMu.Unlock()

	// The new worker takes the next free index.
	idx := len(c.workers)
	kafkaClient, err := c.newClient(idx)
	if err != nil {
		return err
	}
	handler, stopWorker := c.workerBuilder(idx, c.workerRequestChan)
	consumer := c.NewConsumer(idx, handler)

	// The worker context is cancelled either through the returned cancel
	// function or through the component's tomb.
	baseCtx, cancel := context.WithCancelCause(context.Background())
	ctx := c.t.Context(baseCtx)

	c.t.Go(func() error {
		logger := c.r.With().Int("worker", idx).Logger()

		// cleanup runs when the goroutine exits, whatever the reason.
		cleanup := func() {
			logger.Info().Msg("stopping worker")
			// Allow a small grace time to commit uncommitted work. The worker
			// context may already be cancelled, so a fresh one is used.
			commitCtx, cancelCommit := context.WithTimeout(context.Background(), 10*time.Second)
			defer cancelCommit()
			if commitErr := kafkaClient.CommitMarkedOffsets(commitCtx); commitErr != nil {
				logger.Err(commitErr).Msg("cannot commit marked partition offsets")
			}
			// The worker is shut down only after the client has been closed.
			kafkaClient.CloseAllowingRebalance()
			stopWorker()
		}
		defer cleanup()

		for {
			// Leave cleanly as soon as cancellation is requested.
			select {
			case <-ctx.Done():
				return nil
			default:
			}

			fetches := kafkaClient.PollFetches(ctx)
			if fetches.IsClientClosed() {
				logger.Error().Msg("client is closed")
				return errors.New("client is closed")
			}

			procErr := consumer.ProcessFetches(ctx, kafkaClient, fetches)
			switch {
			case procErr == nil:
				kafkaClient.AllowRebalance()
			case errors.Is(procErr, context.Canceled), errors.Is(procErr, ErrStopProcessing):
				// Cancellation and an explicit stop request are not failures.
				return nil
			default:
				logger.Err(procErr).Msg("cannot process fetched messages")
				return fmt.Errorf("cannot process fetched messages: %w", procErr)
			}
		}
	})

	stop := func() {
		cancel(ErrStopProcessing)
	}
	c.workers = append(c.workers, worker{stop: stop})
	c.metrics.workerIncrease.Inc()
	return nil
}
// stopOneWorker stops the most recently started worker, unregisters its Kafka
// metrics collector and increments the worker-decrease counter. It is a no-op
// when no worker is registered.
func (c *realComponent) stopOneWorker() {
	c.workerMu.Lock()
	defer c.workerMu.Unlock()

	// Without this guard, an empty worker list would index c.workers[-1] and
	// panic.
	if len(c.workers) == 0 {
		return
	}

	last := len(c.workers) - 1
	c.workers[last].stop()
	c.workers = c.workers[:last]

	// newClient appends one metrics collector per client, so the collector of
	// the last worker is expected at the same index.
	kmetrics := c.kafkaMetrics[last]
	c.r.UnregisterMetricCollector(kmetrics)
	c.kafkaMetrics = c.kafkaMetrics[:last]

	c.metrics.workerDecrease.Inc()
}
// stopAllWorkers invokes the stop function of every registered worker while
// holding the worker lock. The worker list itself is left untouched.
func (c *realComponent) stopAllWorkers() {
	c.workerMu.Lock()
	defer c.workerMu.Unlock()

	for idx := range c.workers {
		c.workers[idx].stop()
	}
}
// onPartitionsRevoked commits the marked offsets of the given client when
// partitions are revoked. A commit failure is logged and otherwise ignored;
// the set of revoked partitions is not inspected.
func (c *realComponent) onPartitionsRevoked(ctx context.Context, client *kgo.Client, _ map[string][]int32) {
	err := client.CommitMarkedOffsets(ctx)
	if err == nil {
		return
	}
	c.r.Err(err).Msg("cannot commit marked offsets on partition revoked")
}