From f2dfe55d276b2a53c5285aea97a95e1b0d5d844a Mon Sep 17 00:00:00 2001 From: rastsislau-matusevich Date: Thu, 20 Nov 2025 13:58:41 -0800 Subject: [PATCH] orchestrator: add manage-topic flag and document skip-migrations (#2109) --- console/data/docs/02-configuration.md | 4 +++- orchestrator/kafka/config.go | 3 +++ orchestrator/kafka/config_test.go | 21 ++++++++++++++++++++- orchestrator/kafka/root.go | 8 ++++++++ 4 files changed, 34 insertions(+), 2 deletions(-) diff --git a/console/data/docs/02-configuration.md b/console/data/docs/02-configuration.md index 5529ac9f..d7dcbdd0 100644 --- a/console/data/docs/02-configuration.md +++ b/console/data/docs/02-configuration.md @@ -769,7 +769,8 @@ flows. It accepts the following keys: - `tls` defines the TLS configuration to connect to the cluster - `sasl` defines the SASL configuration to connect to the cluster - `topic` defines the base topic name -- `topic-cofniguration` describes how the topic should be configured +- `manage-topic` controls whether the orchestrator should create/update the Kafka topic (default: `true`). Can be set to `false` when Kafka is not needed (e.g., UI-only setup) or managed externally. +- `topic-configuration` describes how the topic should be configured The following keys are accepted for the TLS configuration: @@ -876,6 +877,7 @@ provided inside `clickhouse`: by ClickHouse (autodetection when not specified) - `orchestrator-basic-auth` enables basic authentication to access the orchestrator URL. It takes two attributes: `username` and `password`. +- `skip-migrations` controls whether to skip ClickHouse schema management (default: `false`). Can be set to `true` when the schema is managed externally or by another orchestrator. The outlet requires the schema to match the expected structure; schema mismatches may cause write errors. The `resolutions` setting contains a list of resolutions. Each resolution has two keys: `interval` and `ttl`. The first one is the diff --git a/orchestrator/kafka/config.go b/orchestrator/kafka/config.go index 030ce45e..5d46b924 100644 --- a/orchestrator/kafka/config.go +++ b/orchestrator/kafka/config.go @@ -10,6 +10,8 @@ import ( // Configuration describes the configuration for the Kafka configurator. type Configuration struct { kafka.Configuration `mapstructure:",squash" yaml:",inline"` + // ManageTopic tells if the Kafka topic should be managed (create/update). Default is true. + ManageTopic bool // TopicConfiguration describes the topic configuration. TopicConfiguration TopicConfiguration } @@ -30,6 +32,7 @@ type TopicConfiguration struct { func DefaultConfiguration() Configuration { return Configuration{ Configuration: kafka.DefaultConfiguration(), + ManageTopic: true, TopicConfiguration: TopicConfiguration{ NumPartitions: 1, ReplicationFactor: 1, diff --git a/orchestrator/kafka/config_test.go b/orchestrator/kafka/config_test.go index 11a18d1a..1f5f5d8f 100644 --- a/orchestrator/kafka/config_test.go +++ b/orchestrator/kafka/config_test.go @@ -7,12 +7,31 @@ import ( "testing" "akvorado/common/helpers" + "akvorado/common/reporter" + "akvorado/common/schema" ) func TestDefaultConfiguration(t *testing.T) { - if err := helpers.Validate.Struct(DefaultConfiguration()); err != nil { + config := DefaultConfiguration() + if err := helpers.Validate.Struct(config); err != nil { t.Fatalf("validate.Struct() error:\n%+v", err) } + if !config.ManageTopic { + t.Error("ManageTopic should be true by default") + } +} + +func TestManageTopicDisabled(t *testing.T) { + config := DefaultConfiguration() + config.ManageTopic = false + c, err := New(reporter.NewMock(t), config, Dependencies{Schema: schema.NewMock(t)}) + if err != nil { + t.Fatalf("New() error:\n%+v", err) + } + if c != nil { + t.Error("Component should be nil when ManageTopic is false") + } + helpers.StartStop(t, c) } func TestShouldAlterConfiguration(t *testing.T) { diff --git a/orchestrator/kafka/root.go b/orchestrator/kafka/root.go index aeeb76bf..d3c4d509 100644 --- a/orchestrator/kafka/root.go +++ b/orchestrator/kafka/root.go @@ -36,6 +36,11 @@ type Dependencies struct { // New creates a new Kafka configurator. func New(r *reporter.Reporter, config Configuration, dependencies Dependencies) (*Component, error) { + if !config.ManageTopic { + r.Info().Msg("Kafka topic management disabled, skipping Kafka initialization") + return nil, nil + } + kafkaOpts, err := kafka.NewConfig(r, config.Configuration) if err != nil { return nil, err @@ -54,6 +59,9 @@ func New(r *reporter.Reporter, config Configuration, dependencies Dependencies) // Start starts Kafka configuration. func (c *Component) Start() error { + if c == nil { + return nil + } c.r.Info().Msg("starting Kafka component") defer c.r.Info().Msg("Kafka component stopped")