diff --git a/console/data/docs/02-configuration.md b/console/data/docs/02-configuration.md index d5fa54d5..b701585a 100644 --- a/console/data/docs/02-configuration.md +++ b/console/data/docs/02-configuration.md @@ -613,6 +613,7 @@ are configurable: - `maximum-batch-size` defines how many flows to send to ClickHouse in a single batch at most - `minimum-wait-time` defines how long to wait before sending an incomplete batch +- `grace-period` defines how long to wait when flushing data to ClickHouse on shutdown These numbers are per-worker (as defined in the Kafka component). A worker will send a batch of size at most `maximum-batch-size` at least every diff --git a/outlet/clickhouse/config.go b/outlet/clickhouse/config.go index 7d39bf43..b3928ba4 100644 --- a/outlet/clickhouse/config.go +++ b/outlet/clickhouse/config.go @@ -9,6 +9,8 @@ import ( // Configuration describes the configuration for the ClickHouse exporter. type Configuration struct { + // GracePeriod defines how long to wait for flushing a batch to ClickHouse on shutdown. + GracePeriod time.Duration `validate:"min=10s"` // MaximumBatchSize is the maximum number of rows to send to ClickHouse in one batch. MaximumBatchSize uint `validate:"min=1"` // MaximumWaitTime is the maximum number of seconds to wait before sending the current batch. @@ -22,6 +24,7 @@ const minimumBatchSizeDivider = 10 // DefaultConfiguration represents the default configuration for the ClickHouse exporter. func DefaultConfiguration() Configuration { return Configuration{ + GracePeriod: time.Minute, MaximumBatchSize: 50_000, MaximumWaitTime: 5 * time.Second, } diff --git a/outlet/clickhouse/worker.go b/outlet/clickhouse/worker.go index c9588223..23144045 100644 --- a/outlet/clickhouse/worker.go +++ b/outlet/clickhouse/worker.go @@ -137,7 +137,7 @@ func (w *realWorker) Flush(ctx context.Context) { } // Don't use the parent context, it may be too short. - chCtx, cancel := context.WithTimeout(context.Background(), time.Minute) + chCtx, cancel := context.WithTimeout(context.Background(), w.c.config.GracePeriod) defer cancel() // Send to ClickHouse in flows_XXXXX_raw.