From b6bb6e4af1e1363cf28fdba00098ed899ebe1d17 Mon Sep 17 00:00:00 2001 From: Vincent Bernat Date: Mon, 10 Nov 2025 17:03:32 +0100 Subject: [PATCH] outlet/clickhouse: ensure we don't cancel flushing when we don't need We really want to use max(current deadline, grace period timeout), but this is not easy with contexts. So, we spawn a goroutine for that. We need some extra care to ensure it does not leak. Maybe it would be easier to just use `context.Background()`? --- outlet/clickhouse/worker.go | 29 ++++++++++++++++++++++++++--- 1 file changed, 26 insertions(+), 3 deletions(-) diff --git a/outlet/clickhouse/worker.go b/outlet/clickhouse/worker.go index 23144045..af810ab1 100644 --- a/outlet/clickhouse/worker.go +++ b/outlet/clickhouse/worker.go @@ -136,9 +136,32 @@ func (w *realWorker) Flush(ctx context.Context) { return err } - // Don't use the parent context, it may be too short. - chCtx, cancel := context.WithTimeout(context.Background(), w.c.config.GracePeriod) - defer cancel() + // Ensure the context lives for at least GracePeriod. + chCtx, cancel := context.WithCancel(context.Background()) + defer cancel() // needed in case the operation completes before grace period and parent context + go func() { + gracePeriodTimer := time.NewTimer(w.c.config.GracePeriod) + defer gracePeriodTimer.Stop() + + select { + case <-gracePeriodTimer.C: + // Grace period elapsed, now wait for parent or end of operation. + select { + case <-ctx.Done(): + case <-chCtx.Done(): + } + case <-ctx.Done(): + // Parent done before grace period, wait for grace period or end of operation. + select { + case <-gracePeriodTimer.C: + w.logger.Info().Msg("grace period to flush batch expired") + case <-chCtx.Done(): + } + case <-chCtx.Done(): + // Operation done! + } + cancel() + }() // Send to ClickHouse in flows_XXXXX_raw. start := time.Now()