diff --git a/outlet/kafka/worker.go b/outlet/kafka/worker.go index ca425048..0be01b2c 100644 --- a/outlet/kafka/worker.go +++ b/outlet/kafka/worker.go @@ -66,7 +66,6 @@ func (c *realComponent) startOneWorker() error { Logger() defer func() { logger.Info().Msg("stopping worker") - shutdown() // Allow a small grace time to commit uncommited work. ctx, cancelCommit := context.WithTimeout(context.Background(), 10*time.Second) @@ -74,8 +73,9 @@ func (c *realComponent) startOneWorker() error { if err := client.CommitMarkedOffsets(ctx); err != nil { logger.Err(err).Msg("cannot commit marked partition offsets") } - client.CloseAllowingRebalance() + + shutdown() }() for {