orchestrator: allow to register several configuration for a given service

Use the first one by default and if the index is not known. Remove
service registration as this is not used yet.
This commit is contained in:
Vincent Bernat
2022-06-29 15:39:19 +02:00
parent 2664377db7
commit f67d2493b0
7 changed files with 83 additions and 23 deletions

View File

@@ -42,6 +42,9 @@ func (c ConfigRelatedOptions) Parse(out io.Writer, component string, config inte
if u.Path == "" { if u.Path == "" {
u.Path = fmt.Sprintf("/api/v0/orchestrator/configuration/%s", component) u.Path = fmt.Sprintf("/api/v0/orchestrator/configuration/%s", component)
} }
if u.Fragment != "" {
u.Path = fmt.Sprintf("%s/%s", u.Path, u.Fragment)
}
resp, err := http.Get(u.String()) resp, err := http.Get(u.String())
if err != nil { if err != nil {
return fmt.Errorf("unable to fetch configuration file: %w", err) return fmt.Errorf("unable to fetch configuration file: %w", err)

View File

@@ -26,8 +26,8 @@ type OrchestratorConfiguration struct {
Kafka kafka.Configuration Kafka kafka.Configuration
Orchestrator orchestrator.Configuration `mapstructure:",squash" yaml:",inline"` Orchestrator orchestrator.Configuration `mapstructure:",squash" yaml:",inline"`
// Other service configurations // Other service configurations
Inlet InletConfiguration Inlet []InletConfiguration
Console ConsoleConfiguration Console []ConsoleConfiguration
} }
// DefaultOrchestratorConfiguration is the default configuration for the orchestrator command. // DefaultOrchestratorConfiguration is the default configuration for the orchestrator command.
@@ -40,8 +40,8 @@ func DefaultOrchestratorConfiguration() OrchestratorConfiguration {
Kafka: kafka.DefaultConfiguration(), Kafka: kafka.DefaultConfiguration(),
Orchestrator: orchestrator.DefaultConfiguration(), Orchestrator: orchestrator.DefaultConfiguration(),
// Other service configurations // Other service configurations
Inlet: DefaultInletConfiguration(), Inlet: []InletConfiguration{DefaultInletConfiguration()},
Console: DefaultConsoleConfiguration(), Console: []ConsoleConfiguration{DefaultConsoleConfiguration()},
} }
} }
@@ -67,8 +67,12 @@ components and centralizes configuration of the various other components.`,
// Override some parts of the configuration // Override some parts of the configuration
config.ClickHouseDB = config.ClickHouse.Configuration config.ClickHouseDB = config.ClickHouse.Configuration
config.ClickHouse.Kafka.Configuration = config.Kafka.Configuration config.ClickHouse.Kafka.Configuration = config.Kafka.Configuration
config.Inlet.Kafka.Configuration = config.Kafka.Configuration for idx := range config.Inlet {
config.Console.ClickHouse = config.ClickHouse.Configuration config.Inlet[idx].Kafka.Configuration = config.Kafka.Configuration
}
for idx := range config.Console {
config.Console[idx].ClickHouse = config.ClickHouse.Configuration
}
} }
if err := OrchestratorOptions.Parse(cmd.OutOrStdout(), "orchestrator", &config); err != nil { if err := OrchestratorOptions.Parse(cmd.OutOrStdout(), "orchestrator", &config); err != nil {
return err return err
@@ -125,8 +129,12 @@ func orchestratorStart(r *reporter.Reporter, config OrchestratorConfiguration, c
if err != nil { if err != nil {
return fmt.Errorf("unable to initialize orchestrator component: %w", err) return fmt.Errorf("unable to initialize orchestrator component: %w", err)
} }
orchestratorComponent.RegisterConfiguration(orchestrator.InletService, config.Inlet) for idx := range config.Inlet {
orchestratorComponent.RegisterConfiguration(orchestrator.ConsoleService, config.Console) orchestratorComponent.RegisterConfiguration(orchestrator.InletService, config.Inlet[idx])
}
for idx := range config.Console {
orchestratorComponent.RegisterConfiguration(orchestrator.ConsoleService, config.Console[idx])
}
// Expose some informations and metrics // Expose some informations and metrics
addCommonHTTPHandlers(r, "orchestrator", httpComponent) addCommonHTTPHandlers(r, "orchestrator", httpComponent)

View File

@@ -36,7 +36,11 @@ AKVORADO_ORCHESTRATOR_KAFKA_BROKERS=192.0.2.1:9092,192.0.2.2:9092
The orchestrator service has its own configuration, as well as the The orchestrator service has its own configuration, as well as the
configuration for the other services under the key matching the configuration for the other services under the key matching the
service name (`inlet` and `console`). service name (`inlet` and `console`). For each service, it is possible
to provide a list of configuration. A service can query the
configuration it wants by appending an index to the configuration URL.
If the index does not match a provided configuration, the first
configuration is provided.
Each service is split into several functional components. Each of them Each service is split into several functional components. Each of them
gets a section of the configuration file matching its name. gets a section of the configuration file matching its name.

View File

@@ -27,6 +27,7 @@ file and the other services should point to it.
$ akvorado orchestrator /etc/akvorado/config.yaml $ akvorado orchestrator /etc/akvorado/config.yaml
$ akvorado inlet http://orchestrator:8080 $ akvorado inlet http://orchestrator:8080
$ akvorado console http://orchestrator:8080 $ akvorado console http://orchestrator:8080
$ akvorado console http://orchestrator:8080#2
``` ```
Each service embeds an HTTP server exposing a few endpoints. All Each service embeds an HTTP server exposing a few endpoints. All

View File

@@ -5,27 +5,39 @@ package orchestrator
import ( import (
"net/http" "net/http"
"strconv"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
) )
func (c *Component) configurationHandlerFunc(gc *gin.Context) { func (c *Component) configurationHandlerFunc(gc *gin.Context) {
service := gc.Param("service") service := gc.Param("service")
indexStr := gc.Param("index")
index, err := strconv.Atoi(indexStr)
if indexStr != "" && err != nil {
gc.JSON(http.StatusNotFound, gin.H{"message": "Invalid configuration index."})
return
}
c.serviceLock.Lock() c.serviceLock.Lock()
configuration, ok := c.serviceConfigurations[ServiceType(service)] var configuration interface{}
serviceConfigurations, ok := c.serviceConfigurations[ServiceType(service)]
if ok {
l := len(serviceConfigurations)
switch {
case l == 0:
ok = false
case index < l:
configuration = serviceConfigurations[index]
default:
configuration = serviceConfigurations[0]
}
}
c.serviceLock.Unlock() c.serviceLock.Unlock()
if !ok { if !ok {
gc.YAML(http.StatusNotFound, gin.H{"message": "Configuration not found."}) gc.JSON(http.StatusNotFound, gin.H{"message": "Configuration not found."})
return return
} }
gc.YAML(http.StatusOK, configuration) gc.YAML(http.StatusOK, configuration)
c.serviceLock.Lock()
if c.registeredServices[ServiceType(service)] == nil {
c.registeredServices[ServiceType(service)] = map[string]bool{}
}
c.registeredServices[ServiceType(service)][gc.ClientIP()] = true
c.serviceLock.Unlock()
} }

View File

@@ -24,6 +24,10 @@ func TestConfigurationEndpoint(t *testing.T) {
"hello": "Hello world!", "hello": "Hello world!",
"bye": "Goodbye world!", "bye": "Goodbye world!",
}) })
c.RegisterConfiguration(InletService, map[string]string{
"hello": "Hello pal!",
"bye": "Goodbye pal!",
})
helpers.TestHTTPEndpoints(t, h.Address, helpers.HTTPEndpointCases{ helpers.TestHTTPEndpoints(t, h.Address, helpers.HTTPEndpointCases{
{ {
@@ -33,6 +37,31 @@ func TestConfigurationEndpoint(t *testing.T) {
`bye: Goodbye world!`, `bye: Goodbye world!`,
`hello: Hello world!`, `hello: Hello world!`,
}, },
}, {
URL: "/api/v0/orchestrator/configuration/inlet/0",
ContentType: "application/x-yaml; charset=utf-8",
FirstLines: []string{
`bye: Goodbye world!`,
`hello: Hello world!`,
},
}, {
URL: "/api/v0/orchestrator/configuration/inlet/1",
ContentType: "application/x-yaml; charset=utf-8",
FirstLines: []string{
`bye: Goodbye pal!`,
`hello: Hello pal!`,
},
}, {
URL: "/api/v0/orchestrator/configuration/inlet/2",
ContentType: "application/x-yaml; charset=utf-8",
FirstLines: []string{
`bye: Goodbye world!`,
`hello: Hello world!`,
},
}, {
URL: "/api/v0/orchestrator/configuration/console/0",
ContentType: "application/json; charset=utf-8",
StatusCode: 404,
}, },
}) })
} }

View File

@@ -18,8 +18,8 @@ type Component struct {
config Configuration config Configuration
serviceLock sync.Mutex serviceLock sync.Mutex
serviceConfigurations map[ServiceType]interface{} serviceConfigurations map[ServiceType][]interface{}
registeredServices map[ServiceType]map[string]bool registeredServices map[ServiceType][]map[string]bool
} }
// Dependencies define the dependencies of the broker. // Dependencies define the dependencies of the broker.
@@ -46,11 +46,11 @@ func New(r *reporter.Reporter, configuration Configuration, dependencies Depende
d: &dependencies, d: &dependencies,
config: configuration, config: configuration,
serviceConfigurations: map[ServiceType]interface{}{}, serviceConfigurations: map[ServiceType][]interface{}{},
registeredServices: map[ServiceType]map[string]bool{},
} }
c.d.HTTP.GinRouter.GET("/api/v0/orchestrator/configuration/:service", c.configurationHandlerFunc) c.d.HTTP.GinRouter.GET("/api/v0/orchestrator/configuration/:service", c.configurationHandlerFunc)
c.d.HTTP.GinRouter.GET("/api/v0/orchestrator/configuration/:service/:index", c.configurationHandlerFunc)
return &c, nil return &c, nil
} }
@@ -58,6 +58,9 @@ func New(r *reporter.Reporter, configuration Configuration, dependencies Depende
// RegisterConfiguration registers the configuration for a service. // RegisterConfiguration registers the configuration for a service.
func (c *Component) RegisterConfiguration(service ServiceType, configuration interface{}) { func (c *Component) RegisterConfiguration(service ServiceType, configuration interface{}) {
c.serviceLock.Lock() c.serviceLock.Lock()
c.serviceConfigurations[service] = configuration if _, ok := c.serviceConfigurations[service]; !ok {
c.serviceConfigurations[service] = []interface{}{}
}
c.serviceConfigurations[service] = append(c.serviceConfigurations[service], configuration)
c.serviceLock.Unlock() c.serviceLock.Unlock()
} }