akvorado/outlet/kafka/scaler.go
Vincent Bernat e5a625aecf outlet: make the number of Kafka workers dynamic
Inserting into ClickHouse should be done in large batches to minimize
the number of parts created. This would otherwise require the user to
tune the number of Kafka workers so that each batch reaches a target of
around 50k-100k rows. Instead, we dynamically adjust the number of
workers depending on the load to reach this target.

We keep using asynchronous inserts if the number of flows is too low.

It is still possible to do better by consolidating batches from various
workers, but that's something I wanted to avoid.

Also, increase the maximum wait time to 5 seconds. It should be good
enough for most people.

Fix #1885
2025-08-09 15:58:25 +02:00


// SPDX-FileCopyrightText: 2025 Free Mobile
// SPDX-License-Identifier: AGPL-3.0-only

package kafka

import "golang.org/x/time/rate"

// ScaleRequest is a request to scale the workers.
type ScaleRequest int

const (
	// ScaleIncrease is a request to increase the number of workers.
	ScaleIncrease ScaleRequest = iota + 1
	// ScaleDecrease is a request to decrease the number of workers.
	ScaleDecrease
)

// startScaler starts the automatic scaling loop. It returns a channel
// on which workers can send scale requests.
func (c *realComponent) startScaler() chan<- ScaleRequest {
	ch := make(chan ScaleRequest, c.config.MaxWorkers)
	// rate.Sometimes ensures we do not scale down or up more often than
	// the configured rate limits allow.
	down := rate.Sometimes{Interval: c.config.WorkerDecreaseRateLimit}
	up := rate.Sometimes{Interval: c.config.WorkerIncreaseRateLimit}
	c.t.Go(func() error {
		for {
			select {
			case <-c.t.Dying():
				return nil
			case request := <-ch:
				switch request {
				case ScaleIncrease:
					up.Do(func() {
						if err := c.startOneWorker(); err != nil {
							c.r.Err(err).Msg("cannot spawn a new worker")
						}
					})
				case ScaleDecrease:
					down.Do(func() {
						c.stopOneWorker()
					})
				}
			}
		}
	})
	return ch
}
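
This file only applies scale requests; the worker-side logic that decides
when to send them lives elsewhere in the outlet. A rough sketch of how a
worker could translate the 50k-100k rows target into requests after
flushing a batch (requestScaling and batchRows are hypothetical names,
not akvorado's actual code):

// Hypothetical sketch, not part of the actual file: a worker nudges the
// scaler after flushing a batch to ClickHouse.
func requestScaling(scale chan<- ScaleRequest, batchRows int) {
	switch {
	case batchRows > 100_000:
		// Batches too large: ask for one more worker to split the load.
		select {
		case scale <- ScaleIncrease:
		default: // buffer full, a request is already pending
		}
	case batchRows < 50_000:
		// Batches too small: ask to remove a worker so the remaining
		// ones accumulate larger batches.
		select {
		case scale <- ScaleDecrease:
		default:
		}
	}
}

The non-blocking sends matter: the channel is buffered at MaxWorkers, so
a burst of requests from many workers never stalls a flush.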
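
The scaling loop relies on rate.Sometimes from golang.org/x/time/rate to
throttle how often requests are honored. A self-contained demonstration
of the Interval behavior it depends on:

package main

import (
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	// With Interval set, Do runs the callback at most once per interval;
	// calls arriving within the window are dropped. This is why a burst
	// of ScaleIncrease requests spawns at most one worker per
	// WorkerIncreaseRateLimit.
	s := rate.Sometimes{Interval: time.Second}
	for i := 0; i < 5; i++ {
		s.Do(func() { fmt.Println("ran at iteration", i) })
		time.Sleep(300 * time.Millisecond)
	}
	// Expected: runs at iteration 0, then again around iteration 4,
	// once a full second has elapsed.
}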