From f4875ed7b3c82fb114f7ceda49a8a62b6b18034a Mon Sep 17 00:00:00 2001 From: Vincent Bernat Date: Thu, 13 Nov 2025 20:39:59 +0100 Subject: [PATCH] outlet/kafka: execute shutdown before committing work And add a bit more logging to understand what happens on shutdown. --- outlet/core/worker.go | 2 ++ outlet/kafka/worker.go | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/outlet/core/worker.go b/outlet/core/worker.go index 22a3b382..1c2dc9ff 100644 --- a/outlet/core/worker.go +++ b/outlet/core/worker.go @@ -44,9 +44,11 @@ func (c *Component) newWorker(i int, scaleRequestChan chan<- kafka.ScaleRequest) // shutdown shutdowns the worker, flushing any remaining data. func (w *worker) shutdown() { + w.l.Info().Msg("flush final batch to ClickHouse") ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() w.cw.Flush(ctx) + w.l.Info().Msg("worker stopped") } // processIncomingFlow processes one incoming flow from Kafka. diff --git a/outlet/kafka/worker.go b/outlet/kafka/worker.go index 479c3de9..ca425048 100644 --- a/outlet/kafka/worker.go +++ b/outlet/kafka/worker.go @@ -66,6 +66,7 @@ func (c *realComponent) startOneWorker() error { Logger() defer func() { logger.Info().Msg("stopping worker") + shutdown() // Allow a small grace time to commit uncommited work. ctx, cancelCommit := context.WithTimeout(context.Background(), 10*time.Second) @@ -74,7 +75,6 @@ func (c *realComponent) startOneWorker() error { logger.Err(err).Msg("cannot commit marked partition offsets") } - shutdown() client.CloseAllowingRebalance() }()