diff --git a/console/data/docs/02-configuration.md b/console/data/docs/02-configuration.md index 9f46eb95..99723715 100644 --- a/console/data/docs/02-configuration.md +++ b/console/data/docs/02-configuration.md @@ -658,6 +658,7 @@ The following keys are accepted for the topic configuration: - `num-partitions` for the number of partitions - `replication-factor` for the replication factor - `config-entries` is a mapping from configuration names to their values +- `config-entries-strict-sync` for the configuration in-sync policy For example: @@ -671,6 +672,7 @@ kafka: segment.bytes: 1073741824 retention.ms: 86400000 cleanup.policy: delete + config-entries-strict-sync: true ``` Another useful setting is `retention.bytes` to limit the size of a @@ -678,8 +680,10 @@ partition in bytes too (divide it by the number of partitions to have a limit for the topic). Currently, the orchestrator service won't update the replication -factor. The configuration entries are kept in sync with the content of -the configuration file. +factor. +By default, the configuration entries are kept in sync with the content of +the configuration file, except if you disable the `config-entries-strict-sync`, +the existing non-listed overrides won't be removed from topic configuration entries. ### ClickHouse diff --git a/orchestrator/kafka/config.go b/orchestrator/kafka/config.go index 2ad44468..030ce45e 100644 --- a/orchestrator/kafka/config.go +++ b/orchestrator/kafka/config.go @@ -3,7 +3,9 @@ package kafka -import "akvorado/common/kafka" +import ( + "akvorado/common/kafka" +) // Configuration describes the configuration for the Kafka configurator. type Configuration struct { @@ -18,8 +20,10 @@ type TopicConfiguration struct { NumPartitions int32 `validate:"min=1"` // ReplicationFactor tells the replication factor for the topic. ReplicationFactor int16 `validate:"min=1"` - // ConfigEntries is a map to specify the topic overrides. Non-listed overrides will be removed + // ConfigEntries is a map to specify the topic overrides. Non-listed overrides will be removed by default. ConfigEntries map[string]*string + // ConfigEntriesStrictSync says if non-listed overrides should be removed (strict sync) or not. Default is True. + ConfigEntriesStrictSync bool } // DefaultConfiguration represents the default configuration for the Kafka configurator. @@ -27,8 +31,22 @@ func DefaultConfiguration() Configuration { return Configuration{ Configuration: kafka.DefaultConfiguration(), TopicConfiguration: TopicConfiguration{ - NumPartitions: 1, - ReplicationFactor: 1, + NumPartitions: 1, + ReplicationFactor: 1, + ConfigEntriesStrictSync: true, }, } } + +// ShouldAlterConfiguration validates if topic configuration update is needed regarding in-sync policy. +func ShouldAlterConfiguration(target, source map[string]*string, strict bool) bool { + for k, v := range target { + if ov, ok := source[k]; !ok || *ov != *v { + return true + } + } + if !strict { + return false + } + return len(target) != len(source) +} diff --git a/orchestrator/kafka/config_test.go b/orchestrator/kafka/config_test.go index df7bef30..51f0e33d 100644 --- a/orchestrator/kafka/config_test.go +++ b/orchestrator/kafka/config_test.go @@ -14,3 +14,34 @@ func TestDefaultConfiguration(t *testing.T) { t.Fatalf("validate.Struct() error:\n%+v", err) } } + +func TestShouldAlterConfiguration(t *testing.T) { + referenceTestFoo := "foo" + referenceTestBar := "bar" + referenceTestOtherFoo := "foo" + cases := []struct { + name string + target map[string]*string + source map[string]*string + strictPolicy bool + expected bool + }{ + {"subset in strict policy", map[string]*string{"a": &referenceTestFoo}, map[string]*string{"a": &referenceTestFoo, "otherconfigentry": &referenceTestBar}, true, true}, + {"subset in non-strict policy", map[string]*string{"a": &referenceTestFoo}, map[string]*string{"a": &referenceTestFoo, "otherconfigentry": &referenceTestBar}, false, false}, + {"subset with different references in non strict policy", map[string]*string{"a": &referenceTestFoo}, map[string]*string{"a": &referenceTestOtherFoo, "otherconfigentry": &referenceTestBar}, false, false}, + {"missing config entry in strict policy", map[string]*string{"a": &referenceTestFoo}, map[string]*string{"otherconfigentry": &referenceTestBar}, true, true}, + {"missing config entry in non-strict policy", map[string]*string{"a": &referenceTestFoo}, map[string]*string{"otherconfigentry": &referenceTestBar}, false, true}, + {"same config in strict policy", map[string]*string{"a": &referenceTestFoo}, map[string]*string{"a": &referenceTestOtherFoo}, true, false}, + {"same config in non-strict policy", map[string]*string{"a": &referenceTestFoo}, map[string]*string{"a": &referenceTestOtherFoo}, false, false}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + got := ShouldAlterConfiguration(tc.target, tc.source, tc.strictPolicy) + if got && !tc.expected { + t.Errorf("Configuration should not update inplace config.") + } else if !got && tc.expected { + t.Errorf("Configuration should update inplace config.") + } + }) + } +} diff --git a/orchestrator/kafka/functional_test.go b/orchestrator/kafka/functional_test.go index 657e0fe5..1d44f074 100644 --- a/orchestrator/kafka/functional_test.go +++ b/orchestrator/kafka/functional_test.go @@ -45,6 +45,13 @@ func TestTopicCreation(t *testing.T) { "segment.bytes": &segmentBytes2, "cleanup.policy": &cleanupPolicy, }, + }, { + Name: "Do not alter equivalent config", + ConfigEntries: map[string]*string{ + "retention.ms": &retentionMs, + "segment.bytes": &segmentBytes2, + "cleanup.policy": &cleanupPolicy, + }, }, { Name: "Remove item", ConfigEntries: map[string]*string{ @@ -59,9 +66,10 @@ func TestTopicCreation(t *testing.T) { configuration := DefaultConfiguration() configuration.Topic = topicName configuration.TopicConfiguration = TopicConfiguration{ - NumPartitions: 1, - ReplicationFactor: 1, - ConfigEntries: tc.ConfigEntries, + NumPartitions: 1, + ReplicationFactor: 1, + ConfigEntries: tc.ConfigEntries, + ConfigEntriesStrictSync: true, } configuration.Brokers = brokers configuration.Version = kafka.Version(sarama.V2_8_1_0) diff --git a/orchestrator/kafka/root.go b/orchestrator/kafka/root.go index 98129e25..d6264e34 100644 --- a/orchestrator/kafka/root.go +++ b/orchestrator/kafka/root.go @@ -105,12 +105,14 @@ func (c *Component) Start() error { l.Warn().Msgf("mismatch for replication factor: got %d, want %d", topic.ReplicationFactor, c.config.TopicConfiguration.ReplicationFactor) } - if err := admin.AlterConfig(sarama.TopicResource, c.kafkaTopic, c.config.TopicConfiguration.ConfigEntries, false); err != nil { - l.Err(err).Msg("unable to set topic configuration") - return fmt.Errorf("unable to set topic configuration for %q: %w", - c.kafkaTopic, err) + if ShouldAlterConfiguration(c.config.TopicConfiguration.ConfigEntries, topic.ConfigEntries, c.config.TopicConfiguration.ConfigEntriesStrictSync) { + if err := admin.AlterConfig(sarama.TopicResource, c.kafkaTopic, c.config.TopicConfiguration.ConfigEntries, false); err != nil { + l.Err(err).Msg("unable to set topic configuration") + return fmt.Errorf("unable to set topic configuration for %q: %w", + c.kafkaTopic, err) + } + l.Info().Msg("topic updated") } - l.Info().Msg("topic updated") } return nil }