From f67d2493b0764e223e33dff41c202e7491d4ed9a Mon Sep 17 00:00:00 2001 From: Vincent Bernat Date: Wed, 29 Jun 2022 15:39:19 +0200 Subject: [PATCH] orchestrator: allow to register several configuration for a given service Use the first one by default and if the index is not known. Remove service registration as this is not used yet. --- cmd/config.go | 3 +++ cmd/orchestrator.go | 24 ++++++++++++++------- console/data/docs/02-configuration.md | 6 +++++- console/data/docs/03-usage.md | 1 + orchestrator/http.go | 30 +++++++++++++++++++-------- orchestrator/http_test.go | 29 ++++++++++++++++++++++++++ orchestrator/root.go | 13 +++++++----- 7 files changed, 83 insertions(+), 23 deletions(-) diff --git a/cmd/config.go b/cmd/config.go index 7db06e34..4f9e13b4 100644 --- a/cmd/config.go +++ b/cmd/config.go @@ -42,6 +42,9 @@ func (c ConfigRelatedOptions) Parse(out io.Writer, component string, config inte if u.Path == "" { u.Path = fmt.Sprintf("/api/v0/orchestrator/configuration/%s", component) } + if u.Fragment != "" { + u.Path = fmt.Sprintf("%s/%s", u.Path, u.Fragment) + } resp, err := http.Get(u.String()) if err != nil { return fmt.Errorf("unable to fetch configuration file: %w", err) diff --git a/cmd/orchestrator.go b/cmd/orchestrator.go index 82a84504..8af9a9bd 100644 --- a/cmd/orchestrator.go +++ b/cmd/orchestrator.go @@ -26,8 +26,8 @@ type OrchestratorConfiguration struct { Kafka kafka.Configuration Orchestrator orchestrator.Configuration `mapstructure:",squash" yaml:",inline"` // Other service configurations - Inlet InletConfiguration - Console ConsoleConfiguration + Inlet []InletConfiguration + Console []ConsoleConfiguration } // DefaultOrchestratorConfiguration is the default configuration for the orchestrator command. @@ -40,8 +40,8 @@ func DefaultOrchestratorConfiguration() OrchestratorConfiguration { Kafka: kafka.DefaultConfiguration(), Orchestrator: orchestrator.DefaultConfiguration(), // Other service configurations - Inlet: DefaultInletConfiguration(), - Console: DefaultConsoleConfiguration(), + Inlet: []InletConfiguration{DefaultInletConfiguration()}, + Console: []ConsoleConfiguration{DefaultConsoleConfiguration()}, } } @@ -67,8 +67,12 @@ components and centralizes configuration of the various other components.`, // Override some parts of the configuration config.ClickHouseDB = config.ClickHouse.Configuration config.ClickHouse.Kafka.Configuration = config.Kafka.Configuration - config.Inlet.Kafka.Configuration = config.Kafka.Configuration - config.Console.ClickHouse = config.ClickHouse.Configuration + for idx := range config.Inlet { + config.Inlet[idx].Kafka.Configuration = config.Kafka.Configuration + } + for idx := range config.Console { + config.Console[idx].ClickHouse = config.ClickHouse.Configuration + } } if err := OrchestratorOptions.Parse(cmd.OutOrStdout(), "orchestrator", &config); err != nil { return err @@ -125,8 +129,12 @@ func orchestratorStart(r *reporter.Reporter, config OrchestratorConfiguration, c if err != nil { return fmt.Errorf("unable to initialize orchestrator component: %w", err) } - orchestratorComponent.RegisterConfiguration(orchestrator.InletService, config.Inlet) - orchestratorComponent.RegisterConfiguration(orchestrator.ConsoleService, config.Console) + for idx := range config.Inlet { + orchestratorComponent.RegisterConfiguration(orchestrator.InletService, config.Inlet[idx]) + } + for idx := range config.Console { + orchestratorComponent.RegisterConfiguration(orchestrator.ConsoleService, config.Console[idx]) + } // Expose some informations and metrics addCommonHTTPHandlers(r, "orchestrator", httpComponent) diff --git a/console/data/docs/02-configuration.md b/console/data/docs/02-configuration.md index 63cd5700..c68f344f 100644 --- a/console/data/docs/02-configuration.md +++ b/console/data/docs/02-configuration.md @@ -36,7 +36,11 @@ AKVORADO_ORCHESTRATOR_KAFKA_BROKERS=192.0.2.1:9092,192.0.2.2:9092 The orchestrator service has its own configuration, as well as the configuration for the other services under the key matching the -service name (`inlet` and `console`). +service name (`inlet` and `console`). For each service, it is possible +to provide a list of configuration. A service can query the +configuration it wants by appending an index to the configuration URL. +If the index does not match a provided configuration, the first +configuration is provided. Each service is split into several functional components. Each of them gets a section of the configuration file matching its name. diff --git a/console/data/docs/03-usage.md b/console/data/docs/03-usage.md index 5c24e1a4..92aea902 100644 --- a/console/data/docs/03-usage.md +++ b/console/data/docs/03-usage.md @@ -27,6 +27,7 @@ file and the other services should point to it. $ akvorado orchestrator /etc/akvorado/config.yaml $ akvorado inlet http://orchestrator:8080 $ akvorado console http://orchestrator:8080 +$ akvorado console http://orchestrator:8080#2 ``` Each service embeds an HTTP server exposing a few endpoints. All diff --git a/orchestrator/http.go b/orchestrator/http.go index 05fae48a..f9b2b552 100644 --- a/orchestrator/http.go +++ b/orchestrator/http.go @@ -5,27 +5,39 @@ package orchestrator import ( "net/http" + "strconv" "github.com/gin-gonic/gin" ) func (c *Component) configurationHandlerFunc(gc *gin.Context) { service := gc.Param("service") + indexStr := gc.Param("index") + index, err := strconv.Atoi(indexStr) + if indexStr != "" && err != nil { + gc.JSON(http.StatusNotFound, gin.H{"message": "Invalid configuration index."}) + return + } c.serviceLock.Lock() - configuration, ok := c.serviceConfigurations[ServiceType(service)] + var configuration interface{} + serviceConfigurations, ok := c.serviceConfigurations[ServiceType(service)] + if ok { + l := len(serviceConfigurations) + switch { + case l == 0: + ok = false + case index < l: + configuration = serviceConfigurations[index] + default: + configuration = serviceConfigurations[0] + } + } c.serviceLock.Unlock() if !ok { - gc.YAML(http.StatusNotFound, gin.H{"message": "Configuration not found."}) + gc.JSON(http.StatusNotFound, gin.H{"message": "Configuration not found."}) return } gc.YAML(http.StatusOK, configuration) - - c.serviceLock.Lock() - if c.registeredServices[ServiceType(service)] == nil { - c.registeredServices[ServiceType(service)] = map[string]bool{} - } - c.registeredServices[ServiceType(service)][gc.ClientIP()] = true - c.serviceLock.Unlock() } diff --git a/orchestrator/http_test.go b/orchestrator/http_test.go index 224861a8..afe470d8 100644 --- a/orchestrator/http_test.go +++ b/orchestrator/http_test.go @@ -24,6 +24,10 @@ func TestConfigurationEndpoint(t *testing.T) { "hello": "Hello world!", "bye": "Goodbye world!", }) + c.RegisterConfiguration(InletService, map[string]string{ + "hello": "Hello pal!", + "bye": "Goodbye pal!", + }) helpers.TestHTTPEndpoints(t, h.Address, helpers.HTTPEndpointCases{ { @@ -33,6 +37,31 @@ func TestConfigurationEndpoint(t *testing.T) { `bye: Goodbye world!`, `hello: Hello world!`, }, + }, { + URL: "/api/v0/orchestrator/configuration/inlet/0", + ContentType: "application/x-yaml; charset=utf-8", + FirstLines: []string{ + `bye: Goodbye world!`, + `hello: Hello world!`, + }, + }, { + URL: "/api/v0/orchestrator/configuration/inlet/1", + ContentType: "application/x-yaml; charset=utf-8", + FirstLines: []string{ + `bye: Goodbye pal!`, + `hello: Hello pal!`, + }, + }, { + URL: "/api/v0/orchestrator/configuration/inlet/2", + ContentType: "application/x-yaml; charset=utf-8", + FirstLines: []string{ + `bye: Goodbye world!`, + `hello: Hello world!`, + }, + }, { + URL: "/api/v0/orchestrator/configuration/console/0", + ContentType: "application/json; charset=utf-8", + StatusCode: 404, }, }) } diff --git a/orchestrator/root.go b/orchestrator/root.go index f0540e07..f7e5e185 100644 --- a/orchestrator/root.go +++ b/orchestrator/root.go @@ -18,8 +18,8 @@ type Component struct { config Configuration serviceLock sync.Mutex - serviceConfigurations map[ServiceType]interface{} - registeredServices map[ServiceType]map[string]bool + serviceConfigurations map[ServiceType][]interface{} + registeredServices map[ServiceType][]map[string]bool } // Dependencies define the dependencies of the broker. @@ -46,11 +46,11 @@ func New(r *reporter.Reporter, configuration Configuration, dependencies Depende d: &dependencies, config: configuration, - serviceConfigurations: map[ServiceType]interface{}{}, - registeredServices: map[ServiceType]map[string]bool{}, + serviceConfigurations: map[ServiceType][]interface{}{}, } c.d.HTTP.GinRouter.GET("/api/v0/orchestrator/configuration/:service", c.configurationHandlerFunc) + c.d.HTTP.GinRouter.GET("/api/v0/orchestrator/configuration/:service/:index", c.configurationHandlerFunc) return &c, nil } @@ -58,6 +58,9 @@ func New(r *reporter.Reporter, configuration Configuration, dependencies Depende // RegisterConfiguration registers the configuration for a service. func (c *Component) RegisterConfiguration(service ServiceType, configuration interface{}) { c.serviceLock.Lock() - c.serviceConfigurations[service] = configuration + if _, ok := c.serviceConfigurations[service]; !ok { + c.serviceConfigurations[service] = []interface{}{} + } + c.serviceConfigurations[service] = append(c.serviceConfigurations[service], configuration) c.serviceLock.Unlock() }