diff --git a/outlet/clickhouse/worker.go b/outlet/clickhouse/worker.go index 23144045..af810ab1 100644 --- a/outlet/clickhouse/worker.go +++ b/outlet/clickhouse/worker.go @@ -136,9 +136,32 @@ func (w *realWorker) Flush(ctx context.Context) { return err } - // Don't use the parent context, it may be too short. - chCtx, cancel := context.WithTimeout(context.Background(), w.c.config.GracePeriod) - defer cancel() + // Ensure the context lives for at least GracePeriod. + chCtx, cancel := context.WithCancel(context.Background()) + defer cancel() // needed in case the operation completes before grace period and parent context + go func() { + gracePeriodTimer := time.NewTimer(w.c.config.GracePeriod) + defer gracePeriodTimer.Stop() + + select { + case <-gracePeriodTimer.C: + // Grace period elapsed, now wait for parent or end of operation. + select { + case <-ctx.Done(): + case <-chCtx.Done(): + } + case <-ctx.Done(): + // Parent done before grace period, wait for grace period or end of operation. + select { + case <-gracePeriodTimer.C: + w.logger.Info().Msg("grace period to flush batch expired") + case <-chCtx.Done(): + } + case <-chCtx.Done(): + // Operation done! + } + cancel() + }() // Send to ClickHouse in flows_XXXXX_raw. start := time.Now()