outlet/clickhouse: make grace period for flushing configurable

This commit is contained in:
Vincent Bernat
2025-11-10 16:20:43 +01:00
parent be364899cc
commit 04eccbe95f
3 changed files with 5 additions and 1 deletions

View File

@@ -613,6 +613,7 @@ are configurable:
- `maximum-batch-size` defines how many flows to send to ClickHouse in a single batch at most
- `minimum-wait-time` defines how long to wait before sending an incomplete batch
- `grace-period` defines how long to wait when flushing data to ClickHouse on shutdown
These numbers are per-worker (as defined in the Kafka component). A worker will
send a batch of size at most `maximum-batch-size` at least every

View File

@@ -9,6 +9,8 @@ import (
// Configuration describes the configuration for the ClickHouse exporter.
type Configuration struct {
// GracePeriod defines how long to wait for flushing a batch to ClickHouse on shutdown.
GracePeriod time.Duration `validate:"min=10s"`
// MaximumBatchSize is the maximum number of rows to send to ClickHouse in one batch.
MaximumBatchSize uint `validate:"min=1"`
// MaximumWaitTime is the maximum number of seconds to wait before sending the current batch.
@@ -22,6 +24,7 @@ const minimumBatchSizeDivider = 10
// DefaultConfiguration represents the default configuration for the ClickHouse exporter.
func DefaultConfiguration() Configuration {
return Configuration{
GracePeriod: time.Minute,
MaximumBatchSize: 50_000,
MaximumWaitTime: 5 * time.Second,
}

View File

@@ -137,7 +137,7 @@ func (w *realWorker) Flush(ctx context.Context) {
}
// Don't use the parent context, it may be too short.
chCtx, cancel := context.WithTimeout(context.Background(), time.Minute)
chCtx, cancel := context.WithTimeout(context.Background(), w.c.config.GracePeriod)
defer cancel()
// Send to ClickHouse in flows_XXXXX_raw.