From 3291abe680eafd204c3949b80121cbb69ccf2f7c Mon Sep 17 00:00:00 2001 From: Vincent Bernat Date: Thu, 13 Nov 2025 22:53:34 +0100 Subject: [PATCH] outlet/kafka: delay shutdown when stopping worker after closing client Flushing can take some time and we have an heartbeat to respect to commit offsets. --- outlet/kafka/worker.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/outlet/kafka/worker.go b/outlet/kafka/worker.go index ca425048..0be01b2c 100644 --- a/outlet/kafka/worker.go +++ b/outlet/kafka/worker.go @@ -66,7 +66,6 @@ func (c *realComponent) startOneWorker() error { Logger() defer func() { logger.Info().Msg("stopping worker") - shutdown() // Allow a small grace time to commit uncommited work. ctx, cancelCommit := context.WithTimeout(context.Background(), 10*time.Second) @@ -74,8 +73,9 @@ func (c *realComponent) startOneWorker() error { if err := client.CommitMarkedOffsets(ctx); err != nil { logger.Err(err).Msg("cannot commit marked partition offsets") } - client.CloseAllowingRebalance() + + shutdown() }() for {