mirror of
https://github.com/akvorado/akvorado.git
synced 2025-12-11 22:14:02 +01:00
outlet/kafka: execute shutdown before committing work
Some checks failed
CI / 🤖 Check dependabot status (push) Has been cancelled
CI / 🐧 Test on Linux (${{ github.ref_type == 'tag' }}, misc) (push) Has been cancelled
CI / 🐧 Test on Linux (coverage) (push) Has been cancelled
CI / 🐧 Test on Linux (regular) (push) Has been cancelled
CI / ❄️ Build on Nix (push) Has been cancelled
CI / 🍏 Build and test on macOS (push) Has been cancelled
CI / 🧪 End-to-end testing (push) Has been cancelled
CI / 🔍 Upload code coverage (push) Has been cancelled
CI / 🔬 Test only Go (push) Has been cancelled
CI / 🔬 Test only JS (${{ needs.dependabot.outputs.package-ecosystem }}, 20) (push) Has been cancelled
CI / 🔬 Test only JS (${{ needs.dependabot.outputs.package-ecosystem }}, 22) (push) Has been cancelled
CI / 🔬 Test only JS (${{ needs.dependabot.outputs.package-ecosystem }}, 24) (push) Has been cancelled
CI / ⚖️ Check licenses (push) Has been cancelled
CI / 🐋 Build Docker images (push) Has been cancelled
CI / 🐋 Tag Docker images (push) Has been cancelled
CI / 🚀 Publish release (push) Has been cancelled
Update Nix dependency hashes / Update dependency hashes (push) Has been cancelled
Some checks failed
CI / 🤖 Check dependabot status (push) Has been cancelled
CI / 🐧 Test on Linux (${{ github.ref_type == 'tag' }}, misc) (push) Has been cancelled
CI / 🐧 Test on Linux (coverage) (push) Has been cancelled
CI / 🐧 Test on Linux (regular) (push) Has been cancelled
CI / ❄️ Build on Nix (push) Has been cancelled
CI / 🍏 Build and test on macOS (push) Has been cancelled
CI / 🧪 End-to-end testing (push) Has been cancelled
CI / 🔍 Upload code coverage (push) Has been cancelled
CI / 🔬 Test only Go (push) Has been cancelled
CI / 🔬 Test only JS (${{ needs.dependabot.outputs.package-ecosystem }}, 20) (push) Has been cancelled
CI / 🔬 Test only JS (${{ needs.dependabot.outputs.package-ecosystem }}, 22) (push) Has been cancelled
CI / 🔬 Test only JS (${{ needs.dependabot.outputs.package-ecosystem }}, 24) (push) Has been cancelled
CI / ⚖️ Check licenses (push) Has been cancelled
CI / 🐋 Build Docker images (push) Has been cancelled
CI / 🐋 Tag Docker images (push) Has been cancelled
CI / 🚀 Publish release (push) Has been cancelled
Update Nix dependency hashes / Update dependency hashes (push) Has been cancelled
And add a bit more logging to understand what happens on shutdown.
This commit is contained in:
@@ -44,9 +44,11 @@ func (c *Component) newWorker(i int, scaleRequestChan chan<- kafka.ScaleRequest)
|
||||
|
||||
// shutdown shutdowns the worker, flushing any remaining data.
|
||||
func (w *worker) shutdown() {
|
||||
w.l.Info().Msg("flush final batch to ClickHouse")
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||
defer cancel()
|
||||
w.cw.Flush(ctx)
|
||||
w.l.Info().Msg("worker stopped")
|
||||
}
|
||||
|
||||
// processIncomingFlow processes one incoming flow from Kafka.
|
||||
|
||||
@@ -66,6 +66,7 @@ func (c *realComponent) startOneWorker() error {
|
||||
Logger()
|
||||
defer func() {
|
||||
logger.Info().Msg("stopping worker")
|
||||
shutdown()
|
||||
|
||||
// Allow a small grace time to commit uncommited work.
|
||||
ctx, cancelCommit := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
@@ -74,7 +75,6 @@ func (c *realComponent) startOneWorker() error {
|
||||
logger.Err(err).Msg("cannot commit marked partition offsets")
|
||||
}
|
||||
|
||||
shutdown()
|
||||
client.CloseAllowingRebalance()
|
||||
}()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user