cmd: take configuration as a mandatory argument (+ other changes)

The other changes are:
 - rename configure service to orchestrator service
 - turn DefaultConfiguration variables into functions
This commit is contained in:
Vincent Bernat
2022-04-10 15:14:39 +02:00
parent 56e30e478a
commit 93da599adf
50 changed files with 241 additions and 202 deletions

View File

@@ -1,14 +0,0 @@
package cmd
import (
"testing"
"akvorado/common/reporter"
)
func TestConfigureStart(t *testing.T) {
r := reporter.NewMock(t)
if err := configureStart(r, DefaultConfigureConfiguration, true); err != nil {
t.Fatalf("configureStart() error:\n%+v", err)
}
}

View File

@@ -19,10 +19,12 @@ type ConsoleConfiguration struct {
} }
// DefaultConsoleConfiguration is the default configuration for the console command. // DefaultConsoleConfiguration is the default configuration for the console command.
var DefaultConsoleConfiguration = ConsoleConfiguration{ func DefaultConsoleConfiguration() ConsoleConfiguration {
HTTP: http.DefaultConfiguration, return ConsoleConfiguration{
Reporting: reporter.DefaultConfiguration, HTTP: http.DefaultConfiguration(),
Console: console.DefaultConfiguration, Reporting: reporter.DefaultConfiguration(),
Console: console.DefaultConfiguration(),
}
} }
type consoleOptions struct { type consoleOptions struct {
@@ -39,9 +41,10 @@ var consoleCmd = &cobra.Command{
Short: "Start Akvorado's console service", Short: "Start Akvorado's console service",
Long: `Akvorado is a Netflow/IPFIX collector. The console service exposes a web interface to Long: `Akvorado is a Netflow/IPFIX collector. The console service exposes a web interface to
manage collected flows.`, manage collected flows.`,
Args: cobra.ExactArgs(0), Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error { RunE: func(cmd *cobra.Command, args []string) error {
config := DefaultConsoleConfiguration config := DefaultConsoleConfiguration()
ConsoleOptions.Path = args[0]
if err := ConsoleOptions.Parse(cmd.OutOrStdout(), "console", &config); err != nil { if err := ConsoleOptions.Parse(cmd.OutOrStdout(), "console", &config); err != nil {
return err return err
} }
@@ -56,8 +59,6 @@ manage collected flows.`,
func init() { func init() {
RootCmd.AddCommand(consoleCmd) RootCmd.AddCommand(consoleCmd)
consoleCmd.Flags().StringVarP(&ConsoleOptions.ConfigRelatedOptions.Path, "config", "c", "",
"Configuration file")
consoleCmd.Flags().BoolVarP(&ConsoleOptions.ConfigRelatedOptions.Dump, "dump", "D", false, consoleCmd.Flags().BoolVarP(&ConsoleOptions.ConfigRelatedOptions.Dump, "dump", "D", false,
"Dump configuration before starting") "Dump configuration before starting")
consoleCmd.Flags().BoolVarP(&ConsoleOptions.CheckMode, "check", "C", false, consoleCmd.Flags().BoolVarP(&ConsoleOptions.CheckMode, "check", "C", false,

View File

@@ -8,7 +8,7 @@ import (
func TestConsoleStart(t *testing.T) { func TestConsoleStart(t *testing.T) {
r := reporter.NewMock(t) r := reporter.NewMock(t)
if err := consoleStart(r, DefaultConsoleConfiguration, true); err != nil { if err := consoleStart(r, DefaultConsoleConfiguration(), true); err != nil {
t.Fatalf("consoleStart() error:\n%+v", err) t.Fatalf("consoleStart() error:\n%+v", err)
} }
} }

View File

@@ -27,14 +27,16 @@ type InletConfiguration struct {
} }
// DefaultInletConfiguration is the default configuration for the inlet command. // DefaultInletConfiguration is the default configuration for the inlet command.
var DefaultInletConfiguration = InletConfiguration{ func DefaultInletConfiguration() InletConfiguration {
HTTP: http.DefaultConfiguration, return InletConfiguration{
Reporting: reporter.DefaultConfiguration, HTTP: http.DefaultConfiguration(),
Flow: flow.DefaultConfiguration, Reporting: reporter.DefaultConfiguration(),
SNMP: snmp.DefaultConfiguration, Flow: flow.DefaultConfiguration(),
GeoIP: geoip.DefaultConfiguration, SNMP: snmp.DefaultConfiguration(),
Kafka: kafka.DefaultConfiguration, GeoIP: geoip.DefaultConfiguration(),
Core: core.DefaultConfiguration, Kafka: kafka.DefaultConfiguration(),
Core: core.DefaultConfiguration(),
}
} }
type inletOptions struct { type inletOptions struct {
@@ -51,9 +53,10 @@ var inletCmd = &cobra.Command{
Short: "Start Akvorado's inlet service", Short: "Start Akvorado's inlet service",
Long: `Akvorado is a Netflow/IPFIX collector. The inlet service handles flow ingestion, Long: `Akvorado is a Netflow/IPFIX collector. The inlet service handles flow ingestion,
hydration and export to Kafka.`, hydration and export to Kafka.`,
Args: cobra.ExactArgs(0), Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error { RunE: func(cmd *cobra.Command, args []string) error {
config := DefaultInletConfiguration config := DefaultInletConfiguration()
InletOptions.Path = args[0]
if err := InletOptions.Parse(cmd.OutOrStdout(), "inlet", &config); err != nil { if err := InletOptions.Parse(cmd.OutOrStdout(), "inlet", &config); err != nil {
return err return err
} }
@@ -68,8 +71,6 @@ hydration and export to Kafka.`,
func init() { func init() {
RootCmd.AddCommand(inletCmd) RootCmd.AddCommand(inletCmd)
inletCmd.Flags().StringVarP(&InletOptions.ConfigRelatedOptions.Path, "config", "c", "",
"Configuration file")
inletCmd.Flags().BoolVarP(&InletOptions.ConfigRelatedOptions.Dump, "dump", "D", false, inletCmd.Flags().BoolVarP(&InletOptions.ConfigRelatedOptions.Dump, "dump", "D", false,
"Dump configuration before starting") "Dump configuration before starting")
inletCmd.Flags().BoolVarP(&InletOptions.CheckMode, "check", "C", false, inletCmd.Flags().BoolVarP(&InletOptions.CheckMode, "check", "C", false,

View File

@@ -8,7 +8,7 @@ import (
func TestInletStart(t *testing.T) { func TestInletStart(t *testing.T) {
r := reporter.NewMock(t) r := reporter.NewMock(t)
if err := inletStart(r, DefaultInletConfiguration, true); err != nil { if err := inletStart(r, DefaultInletConfiguration(), true); err != nil {
t.Fatalf("inletStart() error:\n%+v", err) t.Fatalf("inletStart() error:\n%+v", err)
} }
} }

View File

@@ -8,50 +8,53 @@ import (
"akvorado/common/daemon" "akvorado/common/daemon"
"akvorado/common/http" "akvorado/common/http"
"akvorado/common/reporter" "akvorado/common/reporter"
"akvorado/configure/clickhouse" "akvorado/orchestrator/clickhouse"
"akvorado/configure/kafka" "akvorado/orchestrator/kafka"
) )
// ConfigureConfiguration represents the configuration file for the configure command. // OrchestratorConfiguration represents the configuration file for the orchestrator command.
type ConfigureConfiguration struct { type OrchestratorConfiguration struct {
Reporting reporter.Configuration Reporting reporter.Configuration
HTTP http.Configuration HTTP http.Configuration
Clickhouse clickhouse.Configuration Clickhouse clickhouse.Configuration
Kafka kafka.Configuration Kafka kafka.Configuration
} }
// DefaultConfigureConfiguration is the default configuration for the configure command. // DefaultOrchestratorConfiguration is the default configuration for the orchestrator command.
var DefaultConfigureConfiguration = ConfigureConfiguration{ func DefaultOrchestratorConfiguration() OrchestratorConfiguration {
HTTP: http.DefaultConfiguration, return OrchestratorConfiguration{
Reporting: reporter.DefaultConfiguration, Reporting: reporter.DefaultConfiguration(),
Clickhouse: clickhouse.DefaultConfiguration, HTTP: http.DefaultConfiguration(),
Kafka: kafka.DefaultConfiguration, Clickhouse: clickhouse.DefaultConfiguration(),
Kafka: kafka.DefaultConfiguration(),
}
} }
type configureOptions struct { type orchestratorOptions struct {
ConfigRelatedOptions ConfigRelatedOptions
CheckMode bool CheckMode bool
} }
// ConfigureOptions stores the command-line option values for the configure // OrchestratorOptions stores the command-line option values for the orchestrator
// command. // command.
var ConfigureOptions configureOptions var OrchestratorOptions orchestratorOptions
var configureCmd = &cobra.Command{ var orchestratorCmd = &cobra.Command{
Use: "configure", Use: "orchestrator",
Short: "Start Akvorado's configure service", Short: "Start Akvorado's orchestrator service",
Long: `Akvorado is a Netflow/IPFIX collector. The configure service configure external Long: `Akvorado is a Netflow/IPFIX collector. The orchestrator service configures external
components: Kafka and Clickhouse.`, components and centralizes configuration of the various other components.`,
Args: cobra.ExactArgs(0), Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error { RunE: func(cmd *cobra.Command, args []string) error {
config := DefaultConfigureConfiguration config := DefaultOrchestratorConfiguration()
ConfigureOptions.BeforeDump = func() { OrchestratorOptions.Path = args[0]
OrchestratorOptions.BeforeDump = func() {
if config.Clickhouse.Kafka.Topic == "" { if config.Clickhouse.Kafka.Topic == "" {
fmt.Println(config.Kafka.Configuration) fmt.Println(config.Kafka.Configuration)
config.Clickhouse.Kafka.Configuration = config.Kafka.Configuration config.Clickhouse.Kafka.Configuration = config.Kafka.Configuration
} }
} }
if err := ConfigureOptions.Parse(cmd.OutOrStdout(), "configure", &config); err != nil { if err := OrchestratorOptions.Parse(cmd.OutOrStdout(), "orchestrator", &config); err != nil {
return err return err
} }
@@ -59,21 +62,19 @@ components: Kafka and Clickhouse.`,
if err != nil { if err != nil {
return fmt.Errorf("unable to initialize reporter: %w", err) return fmt.Errorf("unable to initialize reporter: %w", err)
} }
return configureStart(r, config, ConfigureOptions.CheckMode) return orchestratorStart(r, config, OrchestratorOptions.CheckMode)
}, },
} }
func init() { func init() {
RootCmd.AddCommand(configureCmd) RootCmd.AddCommand(orchestratorCmd)
configureCmd.Flags().StringVarP(&ConfigureOptions.ConfigRelatedOptions.Path, "config", "c", "", orchestratorCmd.Flags().BoolVarP(&OrchestratorOptions.ConfigRelatedOptions.Dump, "dump", "D", false,
"Configuration file")
configureCmd.Flags().BoolVarP(&ConfigureOptions.ConfigRelatedOptions.Dump, "dump", "D", false,
"Dump configuration before starting") "Dump configuration before starting")
configureCmd.Flags().BoolVarP(&ConfigureOptions.CheckMode, "check", "C", false, orchestratorCmd.Flags().BoolVarP(&OrchestratorOptions.CheckMode, "check", "C", false,
"Check configuration, but does not start") "Check configuration, but does not start")
} }
func configureStart(r *reporter.Reporter, config ConfigureConfiguration, checkOnly bool) error { func orchestratorStart(r *reporter.Reporter, config OrchestratorConfiguration, checkOnly bool) error {
daemonComponent, err := daemon.New(r) daemonComponent, err := daemon.New(r)
if err != nil { if err != nil {
return fmt.Errorf("unable to initialize daemon component: %w", err) return fmt.Errorf("unable to initialize daemon component: %w", err)
@@ -97,7 +98,7 @@ func configureStart(r *reporter.Reporter, config ConfigureConfiguration, checkOn
} }
// Expose some informations and metrics // Expose some informations and metrics
addCommonHTTPHandlers(r, "configure", httpComponent) addCommonHTTPHandlers(r, "orchestrator", httpComponent)
versionMetrics(r) versionMetrics(r)
// If we only asked for a check, stop here. // If we only asked for a check, stop here.

14
cmd/orchestrator_test.go Normal file
View File

@@ -0,0 +1,14 @@
package cmd
import (
"testing"
"akvorado/common/reporter"
)
func TestOrchestratorStart(t *testing.T) {
r := reporter.NewMock(t)
if err := orchestratorStart(r, DefaultOrchestratorConfiguration(), true); err != nil {
t.Fatalf("orchestratorStart() error:\n%+v", err)
}
}

View File

@@ -9,6 +9,8 @@ type Configuration struct {
} }
// DefaultConfiguration is the default configuration of the HTTP server. // DefaultConfiguration is the default configuration of the HTTP server.
var DefaultConfiguration = Configuration{ func DefaultConfiguration() Configuration {
Listen: "0.0.0.0:8080", return Configuration{
Listen: "0.0.0.0:8080",
}
} }

View File

@@ -13,10 +13,12 @@ type Configuration struct {
} }
// DefaultConfiguration represents the default configuration for connecting to Kafka. // DefaultConfiguration represents the default configuration for connecting to Kafka.
var DefaultConfiguration = Configuration{ func DefaultConfiguration() Configuration {
Topic: "flows", return Configuration{
Brokers: []string{"127.0.0.1:9092"}, Topic: "flows",
Version: Version(sarama.V2_8_1_0), Brokers: []string{"127.0.0.1:9092"},
Version: Version(sarama.V2_8_1_0),
}
} }
// Version represents a supported version of Kafka // Version represents a supported version of Kafka

View File

@@ -12,7 +12,9 @@ type Configuration struct {
} }
// DefaultConfiguration is the default reporter configuration. // DefaultConfiguration is the default reporter configuration.
var DefaultConfiguration = Configuration{ func DefaultConfiguration() Configuration {
Logging: logger.DefaultConfiguration, return Configuration{
Metrics: metrics.DefaultConfiguration, Logging: logger.DefaultConfiguration(),
Metrics: metrics.DefaultConfiguration(),
}
} }

View File

@@ -4,4 +4,6 @@ package logger
type Configuration struct{} type Configuration struct{}
// DefaultConfiguration is the default logging configuration. // DefaultConfiguration is the default logging configuration.
var DefaultConfiguration = Configuration{} func DefaultConfiguration() Configuration {
return Configuration{}
}

View File

@@ -18,7 +18,7 @@ func ExampleNew() {
} }
// Initialize logger // Initialize logger
logger, err := logger.New(logger.DefaultConfiguration) logger, err := logger.New(logger.DefaultConfiguration())
if err != nil { if err != nil {
panic(err) panic(err)
} }

View File

@@ -5,4 +5,6 @@ package metrics
type Configuration struct{} type Configuration struct{}
// DefaultConfiguration is the default metrics configuration. // DefaultConfiguration is the default metrics configuration.
var DefaultConfiguration = Configuration{} func DefaultConfiguration() Configuration {
return Configuration{}
}

View File

@@ -14,11 +14,11 @@ import (
) )
func TestNew(t *testing.T) { func TestNew(t *testing.T) {
l, err := logger.New(logger.DefaultConfiguration) l, err := logger.New(logger.DefaultConfiguration())
if err != nil { if err != nil {
t.Fatalf("logger.New() err:\n%+v", err) t.Fatalf("logger.New() err:\n%+v", err)
} }
m, err := metrics.New(l, metrics.DefaultConfiguration) m, err := metrics.New(l, metrics.DefaultConfiguration())
if err != nil { if err != nil {
t.Fatalf("metrics.New() err:\n%+v", err) t.Fatalf("metrics.New() err:\n%+v", err)
} }
@@ -78,11 +78,11 @@ func TestNew(t *testing.T) {
} }
func TestFactoryCache(t *testing.T) { func TestFactoryCache(t *testing.T) {
l, err := logger.New(logger.DefaultConfiguration) l, err := logger.New(logger.DefaultConfiguration())
if err != nil { if err != nil {
t.Fatalf("logger.New() err:\n%+v", err) t.Fatalf("logger.New() err:\n%+v", err)
} }
m, err := metrics.New(l, metrics.DefaultConfiguration) m, err := metrics.New(l, metrics.DefaultConfiguration())
if err != nil { if err != nil {
t.Fatalf("metrics.New() err:\n%+v", err) t.Fatalf("metrics.New() err:\n%+v", err)
} }
@@ -95,11 +95,11 @@ func TestFactoryCache(t *testing.T) {
} }
func TestRegisterTwice(t *testing.T) { func TestRegisterTwice(t *testing.T) {
l, err := logger.New(logger.DefaultConfiguration) l, err := logger.New(logger.DefaultConfiguration())
if err != nil { if err != nil {
t.Fatalf("logger.New() err:\n%+v", err) t.Fatalf("logger.New() err:\n%+v", err)
} }
m, err := metrics.New(l, metrics.DefaultConfiguration) m, err := metrics.New(l, metrics.DefaultConfiguration())
if err != nil { if err != nil {
t.Fatalf("metrics.New() err:\n%+v", err) t.Fatalf("metrics.New() err:\n%+v", err)
} }

View File

@@ -7,4 +7,6 @@ type Configuration struct {
} }
// DefaultConfiguration represents the default configuration for the console component. // DefaultConfiguration represents the default configuration for the console component.
var DefaultConfiguration = Configuration{} func DefaultConfiguration() Configuration {
return Configuration{}
}

View File

@@ -51,12 +51,12 @@ service to be running.
The following endpoints are exposed for use by ClickHouse: The following endpoints are exposed for use by ClickHouse:
- `/api/v0/configure/clickhouse/init.sh` contains the schemas in the form of a - `/api/v0/orchestrator/clickhouse/init.sh` contains the schemas in the form of a
script to execute during initialization to get them installed at the script to execute during initialization to get them installed at the
proper location proper location
- `/api/v0/configure/clickhouse/protocols.csv` contains a CSV with the mapping - `/api/v0/orchestrator/clickhouse/protocols.csv` contains a CSV with the mapping
between protocol numbers and names between protocol numbers and names
- `/api/v0/configure/clickhouse/asns.csv` contains a CSV with the mapping - `/api/v0/orchestrator/clickhouse/asns.csv` contains a CSV with the mapping
between AS numbers and organization names between AS numbers and organization names
ClickHouse clusters are currently not supported, despite being able to ClickHouse clusters are currently not supported, despite being able to

View File

@@ -18,7 +18,7 @@ Each component features the following piece of code:
- A `Configuration` structure containing the configuration of the - A `Configuration` structure containing the configuration of the
component. It maps to a section of [Akvorado configuration component. It maps to a section of [Akvorado configuration
file](02-configuration.md). file](02-configuration.md).
- A `DefaultConfiguration` variable with the default values for the - A `DefaultConfiguration` function with the default values for the
configuration. configuration.
- A `New()` function instantiating the component. This method takes - A `New()` function instantiating the component. This method takes
the configuration and the dependencies. It is inert. the configuration and the dependencies. It is inert.

File diff suppressed because one or more lines are too long

Before

Width:  |  Height:  |  Size: 554 KiB

After

Width:  |  Height:  |  Size: 554 KiB

View File

@@ -13,9 +13,11 @@ type Configuration struct {
} }
// DefaultConfiguration represents the default configuration for the core component. // DefaultConfiguration represents the default configuration for the core component.
var DefaultConfiguration = Configuration{ func DefaultConfiguration() Configuration {
Workers: 1, return Configuration{
ExporterClassifiers: []ExporterClassifierRule{}, Workers: 1,
InterfaceClassifiers: []InterfaceClassifierRule{}, ExporterClassifiers: []ExporterClassifierRule{},
ClassifierCacheSize: 1000, InterfaceClassifiers: []InterfaceClassifierRule{},
ClassifierCacheSize: 1000,
}
} }

View File

@@ -222,15 +222,15 @@ interfaceclassifiers:
// Prepare all components. // Prepare all components.
daemonComponent := daemon.NewMock(t) daemonComponent := daemon.NewMock(t)
snmpComponent := snmp.NewMock(t, r, snmp.DefaultConfiguration, snmpComponent := snmp.NewMock(t, r, snmp.DefaultConfiguration(),
snmp.Dependencies{Daemon: daemonComponent}) snmp.Dependencies{Daemon: daemonComponent})
flowComponent := flow.NewMock(t, r, flow.DefaultConfiguration) flowComponent := flow.NewMock(t, r, flow.DefaultConfiguration())
geoipComponent := geoip.NewMock(t, r) geoipComponent := geoip.NewMock(t, r)
kafkaComponent, kafkaProducer := kafka.NewMock(t, r, kafka.DefaultConfiguration) kafkaComponent, kafkaProducer := kafka.NewMock(t, r, kafka.DefaultConfiguration())
httpComponent := http.NewMock(t, r) httpComponent := http.NewMock(t, r)
// Prepare a configuration // Prepare a configuration
configuration := DefaultConfiguration configuration := DefaultConfiguration()
if err := yaml.Unmarshal([]byte(tc.Configuration), &configuration); err != nil { if err := yaml.Unmarshal([]byte(tc.Configuration), &configuration); err != nil {
t.Fatalf("Unmarshal() error:\n%+v", err) t.Fatalf("Unmarshal() error:\n%+v", err)
} }

View File

@@ -29,14 +29,14 @@ func TestCore(t *testing.T) {
// Prepare all components. // Prepare all components.
daemonComponent := daemon.NewMock(t) daemonComponent := daemon.NewMock(t)
snmpComponent := snmp.NewMock(t, r, snmp.DefaultConfiguration, snmp.Dependencies{Daemon: daemonComponent}) snmpComponent := snmp.NewMock(t, r, snmp.DefaultConfiguration(), snmp.Dependencies{Daemon: daemonComponent})
flowComponent := flow.NewMock(t, r, flow.DefaultConfiguration) flowComponent := flow.NewMock(t, r, flow.DefaultConfiguration())
geoipComponent := geoip.NewMock(t, r) geoipComponent := geoip.NewMock(t, r)
kafkaComponent, kafkaProducer := kafka.NewMock(t, r, kafka.DefaultConfiguration) kafkaComponent, kafkaProducer := kafka.NewMock(t, r, kafka.DefaultConfiguration())
httpComponent := http.NewMock(t, r) httpComponent := http.NewMock(t, r)
// Instantiate and start core // Instantiate and start core
c, err := New(r, DefaultConfiguration, Dependencies{ c, err := New(r, DefaultConfiguration(), Dependencies{
Daemon: daemonComponent, Daemon: daemonComponent,
Flow: flowComponent, Flow: flowComponent,
Snmp: snmpComponent, Snmp: snmpComponent,

View File

@@ -1,6 +1,7 @@
package flow package flow
import ( import (
"encoding/json"
"errors" "errors"
"fmt" "fmt"
"reflect" "reflect"
@@ -20,11 +21,13 @@ type Configuration struct {
} }
// DefaultConfiguration represents the default configuration for the flow component // DefaultConfiguration represents the default configuration for the flow component
var DefaultConfiguration = Configuration{ func DefaultConfiguration() Configuration {
Inputs: []InputConfiguration{{ return Configuration{
Decoder: "netflow", Inputs: []InputConfiguration{{
Config: &udp.DefaultConfiguration, Decoder: "netflow",
}}, Config: udp.DefaultConfiguration(),
}},
}
} }
// InputConfiguration represents the configuration for an input. // InputConfiguration represents the configuration for an input.
@@ -90,7 +93,7 @@ func ConfigurationUnmarshalerHook() mapstructure.DecodeHookFunc {
// Get current type. // Get current type.
currentType := configField.Elem().Type().Elem() currentType := configField.Elem().Type().Elem()
for k, v := range inputs { for k, v := range inputs {
if reflect.TypeOf(v).Elem() == currentType { if reflect.TypeOf(v()).Elem() == currentType {
inputType = k inputType = k
break break
} }
@@ -107,8 +110,9 @@ func ConfigurationUnmarshalerHook() mapstructure.DecodeHookFunc {
} }
// Alter config with a copy of the concrete type // Alter config with a copy of the concrete type
original := reflect.Indirect(reflect.ValueOf(input)) defaultV := input()
if !configField.IsNil() && configField.Elem().Type().Elem() == reflect.TypeOf(input).Elem() { original := reflect.Indirect(reflect.ValueOf(defaultV))
if !configField.IsNil() && configField.Elem().Type().Elem() == reflect.TypeOf(defaultV).Elem() {
// Use the value we already have instead of default. // Use the value we already have instead of default.
original = reflect.Indirect(configField.Elem()) original = reflect.Indirect(configField.Elem())
} }
@@ -125,7 +129,7 @@ func ConfigurationUnmarshalerHook() mapstructure.DecodeHookFunc {
func (ic InputConfiguration) MarshalYAML() (interface{}, error) { func (ic InputConfiguration) MarshalYAML() (interface{}, error) {
var typeStr string var typeStr string
for k, v := range inputs { for k, v := range inputs {
if reflect.TypeOf(v).Elem() == reflect.TypeOf(ic.Config).Elem() { if reflect.TypeOf(v()).Elem() == reflect.TypeOf(ic.Config).Elem() {
typeStr = k typeStr = k
break break
} }
@@ -144,7 +148,16 @@ func (ic InputConfiguration) MarshalYAML() (interface{}, error) {
return result, nil return result, nil
} }
var inputs = map[string]input.Configuration{ // MarshalJSON undoes ConfigurationUnmarshalerHook().
"udp": &udp.DefaultConfiguration, func (ic InputConfiguration) MarshalJSON() ([]byte, error) {
"file": &file.DefaultConfiguration, result, err := ic.MarshalYAML()
if err != nil {
return nil, err
}
return json.Marshal(result)
}
var inputs = map[string](func() input.Configuration){
"udp": udp.DefaultConfiguration,
"file": file.DefaultConfiguration,
} }

View File

@@ -46,7 +46,7 @@ func TestDecodeConfiguration(t *testing.T) {
From: Configuration{ From: Configuration{
Inputs: []InputConfiguration{{ Inputs: []InputConfiguration{{
Decoder: "netflow", Decoder: "netflow",
Config: &udp.DefaultConfiguration, Config: udp.DefaultConfiguration(),
}}, }},
}, },
Source: map[string]interface{}{ Source: map[string]interface{}{
@@ -74,7 +74,7 @@ func TestDecodeConfiguration(t *testing.T) {
From: Configuration{ From: Configuration{
Inputs: []InputConfiguration{{ Inputs: []InputConfiguration{{
Decoder: "netflow", Decoder: "netflow",
Config: &udp.DefaultConfiguration, Config: udp.DefaultConfiguration(),
}}, }},
}, },
Source: map[string]interface{}{ Source: map[string]interface{}{
@@ -153,15 +153,6 @@ func TestDecodeConfiguration(t *testing.T) {
} }
}) })
} }
// Check we didn't alter the default value for UDP
if diff := helpers.Diff(udp.DefaultConfiguration, udp.Configuration{
Workers: 1,
QueueSize: 100000,
Listen: "0.0.0.0:0",
}); diff != "" {
t.Fatalf("Default configuration altered (-got, +want):\n%s", diff)
}
} }
func TestMarshalYAML(t *testing.T) { func TestMarshalYAML(t *testing.T) {

View File

@@ -1,5 +1,7 @@
package file package file
import "akvorado/inlet/flow/input"
// Configuration describes file input configuration. // Configuration describes file input configuration.
type Configuration struct { type Configuration struct {
// Paths to use as input // Paths to use as input
@@ -7,4 +9,6 @@ type Configuration struct {
} }
// DefaultConfiguration descrives the default configuration for file input. // DefaultConfiguration descrives the default configuration for file input.
var DefaultConfiguration Configuration func DefaultConfiguration() input.Configuration {
return &Configuration{}
}

View File

@@ -13,7 +13,7 @@ import (
func TestFileInput(t *testing.T) { func TestFileInput(t *testing.T) {
r := reporter.NewMock(t) r := reporter.NewMock(t)
configuration := DefaultConfiguration configuration := DefaultConfiguration().(*Configuration)
configuration.Paths = []string{path.Join("testdata", "file1.txt"), path.Join("testdata", "file2.txt")} configuration.Paths = []string{path.Join("testdata", "file1.txt"), path.Join("testdata", "file2.txt")}
in, err := configuration.New(r, daemon.NewMock(t), &decoder.DummyDecoder{}) in, err := configuration.New(r, daemon.NewMock(t), &decoder.DummyDecoder{})
if err != nil { if err != nil {

View File

@@ -1,5 +1,7 @@
package udp package udp
import "akvorado/inlet/flow/input"
// Configuration describes UDP input configuration. // Configuration describes UDP input configuration.
type Configuration struct { type Configuration struct {
// Listen tells which port to listen to. // Listen tells which port to listen to.
@@ -19,8 +21,10 @@ type Configuration struct {
} }
// DefaultConfiguration is the default configuration for this input // DefaultConfiguration is the default configuration for this input
var DefaultConfiguration = Configuration{ func DefaultConfiguration() input.Configuration {
Listen: "0.0.0.0:0", return &Configuration{
Workers: 1, Listen: "0.0.0.0:0",
QueueSize: 100000, Workers: 1,
QueueSize: 100000,
}
} }

View File

@@ -13,7 +13,7 @@ import (
func TestUDPInput(t *testing.T) { func TestUDPInput(t *testing.T) {
r := reporter.NewMock(t) r := reporter.NewMock(t)
configuration := DefaultConfiguration configuration := DefaultConfiguration().(*Configuration)
configuration.Listen = "127.0.0.1:0" configuration.Listen = "127.0.0.1:0"
in, err := configuration.New(r, daemon.NewMock(t), &decoder.DummyDecoder{}) in, err := configuration.New(r, daemon.NewMock(t), &decoder.DummyDecoder{})
if err != nil { if err != nil {
@@ -87,7 +87,7 @@ func TestUDPInput(t *testing.T) {
func TestOverflow(t *testing.T) { func TestOverflow(t *testing.T) {
r := reporter.NewMock(t) r := reporter.NewMock(t)
configuration := DefaultConfiguration configuration := DefaultConfiguration().(*Configuration)
configuration.Listen = "127.0.0.1:0" configuration.Listen = "127.0.0.1:0"
configuration.QueueSize = 1 configuration.QueueSize = 1
in, err := configuration.New(r, daemon.NewMock(t), &decoder.DummyDecoder{}) in, err := configuration.New(r, daemon.NewMock(t), &decoder.DummyDecoder{})

View File

@@ -14,7 +14,7 @@ func TestFlow(t *testing.T) {
r := reporter.NewMock(t) r := reporter.NewMock(t)
_, src, _, _ := runtime.Caller(0) _, src, _, _ := runtime.Caller(0)
base := path.Join(path.Dir(src), "decoder", "netflow", "testdata") base := path.Join(path.Dir(src), "decoder", "netflow", "testdata")
config := DefaultConfiguration config := DefaultConfiguration()
config.Inputs = []InputConfiguration{ config.Inputs = []InputConfiguration{
{ {
Decoder: "netflow", Decoder: "netflow",

View File

@@ -9,7 +9,7 @@ import (
func TestHTTPEndpoints(t *testing.T) { func TestHTTPEndpoints(t *testing.T) {
r := reporter.NewMock(t) r := reporter.NewMock(t)
c := NewMock(t, r, DefaultConfiguration) c := NewMock(t, r, DefaultConfiguration())
cases := helpers.HTTPEndpointCases{ cases := helpers.HTTPEndpointCases{
{ {

View File

@@ -11,4 +11,6 @@ type Configuration struct {
// DefaultConfiguration represents the default configuration for the // DefaultConfiguration represents the default configuration for the
// GeoIP component. Without databases, the component won't report // GeoIP component. Without databases, the component won't report
// anything. // anything.
var DefaultConfiguration = Configuration{} func DefaultConfiguration() Configuration {
return Configuration{}
}

View File

@@ -32,7 +32,7 @@ func copyFile(src string, dst string) {
func TestDatabaseRefresh(t *testing.T) { func TestDatabaseRefresh(t *testing.T) {
dir := t.TempDir() dir := t.TempDir()
config := DefaultConfiguration config := DefaultConfiguration()
config.CountryDatabase = filepath.Join(dir, "country.mmdb") config.CountryDatabase = filepath.Join(dir, "country.mmdb")
config.ASNDatabase = filepath.Join(dir, "asn.mmdb") config.ASNDatabase = filepath.Join(dir, "asn.mmdb")
@@ -83,7 +83,7 @@ func TestDatabaseRefresh(t *testing.T) {
func TestStartWithoutDatabase(t *testing.T) { func TestStartWithoutDatabase(t *testing.T) {
r := reporter.NewMock(t) r := reporter.NewMock(t)
c, err := New(r, DefaultConfiguration, Dependencies{Daemon: daemon.NewMock(t)}) c, err := New(r, DefaultConfiguration(), Dependencies{Daemon: daemon.NewMock(t)})
if err != nil { if err != nil {
t.Fatalf("New() error:\n%+v", err) t.Fatalf("New() error:\n%+v", err)
} }
@@ -97,9 +97,9 @@ func TestStartWithoutDatabase(t *testing.T) {
} }
func TestStartWithMissingDatabase(t *testing.T) { func TestStartWithMissingDatabase(t *testing.T) {
countryConfiguration := DefaultConfiguration countryConfiguration := DefaultConfiguration()
countryConfiguration.CountryDatabase = "/i/do/not/exist" countryConfiguration.CountryDatabase = "/i/do/not/exist"
asnConfiguration := DefaultConfiguration asnConfiguration := DefaultConfiguration()
asnConfiguration.ASNDatabase = "/i/do/not/exist" asnConfiguration.ASNDatabase = "/i/do/not/exist"
cases := []struct { cases := []struct {
Name string Name string

View File

@@ -19,7 +19,7 @@ import (
// - https://github.com/maxmind/MaxMind-DB/blob/main/source-data/GeoLite2-Country-Test.json // - https://github.com/maxmind/MaxMind-DB/blob/main/source-data/GeoLite2-Country-Test.json
func NewMock(t *testing.T, r *reporter.Reporter) *Component { func NewMock(t *testing.T, r *reporter.Reporter) *Component {
t.Helper() t.Helper()
config := DefaultConfiguration config := DefaultConfiguration()
_, src, _, _ := runtime.Caller(0) _, src, _, _ := runtime.Caller(0)
config.CountryDatabase = filepath.Join(path.Dir(src), "testdata", "GeoLite2-Country-Test.mmdb") config.CountryDatabase = filepath.Join(path.Dir(src), "testdata", "GeoLite2-Country-Test.mmdb")
config.ASNDatabase = filepath.Join(path.Dir(src), "testdata", "GeoLite2-ASN-Test.mmdb") config.ASNDatabase = filepath.Join(path.Dir(src), "testdata", "GeoLite2-ASN-Test.mmdb")

View File

@@ -27,13 +27,15 @@ type Configuration struct {
} }
// DefaultConfiguration represents the default configuration for the Kafka exporter. // DefaultConfiguration represents the default configuration for the Kafka exporter.
var DefaultConfiguration = Configuration{ func DefaultConfiguration() Configuration {
Configuration: kafka.DefaultConfiguration, return Configuration{
FlushInterval: 10 * time.Second, Configuration: kafka.DefaultConfiguration(),
FlushBytes: int(sarama.MaxRequestSize) - 1, FlushInterval: 10 * time.Second,
MaxMessageBytes: 1000000, FlushBytes: int(sarama.MaxRequestSize) - 1,
CompressionCodec: CompressionCodec(sarama.CompressionNone), MaxMessageBytes: 1000000,
QueueSize: 32, CompressionCodec: CompressionCodec(sarama.CompressionNone),
QueueSize: 32,
}
} }
// CompressionCodec represents a compression codec. // CompressionCodec represents a compression codec.

View File

@@ -21,7 +21,7 @@ func TestRealKafka(t *testing.T) {
rand.Seed(time.Now().UnixMicro()) rand.Seed(time.Now().UnixMicro())
topicName := fmt.Sprintf("test-topic-%d", rand.Int()) topicName := fmt.Sprintf("test-topic-%d", rand.Int())
configuration := DefaultConfiguration configuration := DefaultConfiguration()
configuration.Topic = topicName configuration.Topic = topicName
configuration.Brokers = brokers configuration.Brokers = brokers
configuration.Version = kafka.Version(sarama.V2_8_1_0) configuration.Version = kafka.Version(sarama.V2_8_1_0)

View File

@@ -15,7 +15,7 @@ import (
func TestKafka(t *testing.T) { func TestKafka(t *testing.T) {
r := reporter.NewMock(t) r := reporter.NewMock(t)
c, mockProducer := NewMock(t, r, DefaultConfiguration) c, mockProducer := NewMock(t, r, DefaultConfiguration())
// Send one message // Send one message
received := make(chan bool) received := make(chan bool)
@@ -61,7 +61,7 @@ func TestKafka(t *testing.T) {
func TestKafkaMetrics(t *testing.T) { func TestKafkaMetrics(t *testing.T) {
r := reporter.NewMock(t) r := reporter.NewMock(t)
c, err := New(r, DefaultConfiguration, Dependencies{Daemon: daemon.NewMock(t)}) c, err := New(r, DefaultConfiguration(), Dependencies{Daemon: daemon.NewMock(t)})
if err != nil { if err != nil {
t.Fatalf("New() error:\n%+v", err) t.Fatalf("New() error:\n%+v", err)
} }

View File

@@ -29,15 +29,17 @@ type Configuration struct {
} }
// DefaultConfiguration represents the default configuration for the SNMP client. // DefaultConfiguration represents the default configuration for the SNMP client.
var DefaultConfiguration = Configuration{ func DefaultConfiguration() Configuration {
CacheDuration: 30 * time.Minute, return Configuration{
CacheRefresh: time.Hour, CacheDuration: 30 * time.Minute,
CacheCheckInterval: 2 * time.Minute, CacheRefresh: time.Hour,
CachePersistFile: "", CacheCheckInterval: 2 * time.Minute,
DefaultCommunity: "public", CachePersistFile: "",
Communities: map[string]string{}, DefaultCommunity: "public",
PollerRetries: 1, Communities: map[string]string{},
PollerTimeout: time.Second, PollerRetries: 1,
PollerCoalesce: 10, PollerTimeout: time.Second,
Workers: 1, PollerCoalesce: 10,
Workers: 1,
}
} }

View File

@@ -25,7 +25,7 @@ func expectSNMPLookup(t *testing.T, c *Component, exporter string, ifIndex uint,
func TestLookup(t *testing.T) { func TestLookup(t *testing.T) {
r := reporter.NewMock(t) r := reporter.NewMock(t)
c := NewMock(t, r, DefaultConfiguration, Dependencies{Daemon: daemon.NewMock(t)}) c := NewMock(t, r, DefaultConfiguration(), Dependencies{Daemon: daemon.NewMock(t)})
defer func() { defer func() {
if err := c.Stop(); err != nil { if err := c.Stop(); err != nil {
t.Fatalf("Stop() error:\n%+v", err) t.Fatalf("Stop() error:\n%+v", err)
@@ -42,7 +42,7 @@ func TestLookup(t *testing.T) {
func TestSNMPCommunities(t *testing.T) { func TestSNMPCommunities(t *testing.T) {
r := reporter.NewMock(t) r := reporter.NewMock(t)
configuration := DefaultConfiguration configuration := DefaultConfiguration()
configuration.DefaultCommunity = "notpublic" configuration.DefaultCommunity = "notpublic"
configuration.Communities = map[string]string{ configuration.Communities = map[string]string{
"127.0.0.1": "public", "127.0.0.1": "public",
@@ -76,7 +76,7 @@ func TestSNMPCommunities(t *testing.T) {
func TestComponentSaveLoad(t *testing.T) { func TestComponentSaveLoad(t *testing.T) {
r := reporter.NewMock(t) r := reporter.NewMock(t)
configuration := DefaultConfiguration configuration := DefaultConfiguration()
configuration.CachePersistFile = filepath.Join(t.TempDir(), "cache") configuration.CachePersistFile = filepath.Join(t.TempDir(), "cache")
c := NewMock(t, r, configuration, Dependencies{Daemon: daemon.NewMock(t)}) c := NewMock(t, r, configuration, Dependencies{Daemon: daemon.NewMock(t)})
@@ -103,7 +103,7 @@ func TestComponentSaveLoad(t *testing.T) {
func TestAutoRefresh(t *testing.T) { func TestAutoRefresh(t *testing.T) {
r := reporter.NewMock(t) r := reporter.NewMock(t)
configuration := DefaultConfiguration configuration := DefaultConfiguration()
mockClock := clock.NewMock() mockClock := clock.NewMock()
c := NewMock(t, r, configuration, Dependencies{Daemon: daemon.NewMock(t), Clock: mockClock}) c := NewMock(t, r, configuration, Dependencies{Daemon: daemon.NewMock(t), Clock: mockClock})
@@ -153,7 +153,7 @@ func TestAutoRefresh(t *testing.T) {
func TestConfigCheck(t *testing.T) { func TestConfigCheck(t *testing.T) {
t.Run("refresh", func(t *testing.T) { t.Run("refresh", func(t *testing.T) {
configuration := DefaultConfiguration configuration := DefaultConfiguration()
configuration.CacheDuration = 10 * time.Minute configuration.CacheDuration = 10 * time.Minute
configuration.CacheRefresh = 5 * time.Minute configuration.CacheRefresh = 5 * time.Minute
configuration.CacheCheckInterval = time.Minute configuration.CacheCheckInterval = time.Minute
@@ -162,7 +162,7 @@ func TestConfigCheck(t *testing.T) {
} }
}) })
t.Run("interval", func(t *testing.T) { t.Run("interval", func(t *testing.T) {
configuration := DefaultConfiguration configuration := DefaultConfiguration()
configuration.CacheDuration = 10 * time.Minute configuration.CacheDuration = 10 * time.Minute
configuration.CacheRefresh = 15 * time.Minute configuration.CacheRefresh = 15 * time.Minute
configuration.CacheCheckInterval = 12 * time.Minute configuration.CacheCheckInterval = 12 * time.Minute
@@ -171,7 +171,7 @@ func TestConfigCheck(t *testing.T) {
} }
}) })
t.Run("refresh disabled", func(t *testing.T) { t.Run("refresh disabled", func(t *testing.T) {
configuration := DefaultConfiguration configuration := DefaultConfiguration()
configuration.CacheDuration = 10 * time.Minute configuration.CacheDuration = 10 * time.Minute
configuration.CacheRefresh = 0 configuration.CacheRefresh = 0
configuration.CacheCheckInterval = 2 * time.Minute configuration.CacheCheckInterval = 2 * time.Minute
@@ -183,7 +183,7 @@ func TestConfigCheck(t *testing.T) {
func TestStartStopWithMultipleWorkers(t *testing.T) { func TestStartStopWithMultipleWorkers(t *testing.T) {
r := reporter.NewMock(t) r := reporter.NewMock(t)
configuration := DefaultConfiguration configuration := DefaultConfiguration()
configuration.Workers = 5 configuration.Workers = 5
c := NewMock(t, r, configuration, Dependencies{Daemon: daemon.NewMock(t)}) c := NewMock(t, r, configuration, Dependencies{Daemon: daemon.NewMock(t)})
if err := c.Stop(); err != nil { if err := c.Stop(); err != nil {
@@ -202,7 +202,7 @@ func (fcp *logCoalescePoller) Poll(ctx context.Context, exporterIP string, _ uin
func TestCoalescing(t *testing.T) { func TestCoalescing(t *testing.T) {
r := reporter.NewMock(t) r := reporter.NewMock(t)
c := NewMock(t, r, DefaultConfiguration, Dependencies{Daemon: daemon.NewMock(t)}) c := NewMock(t, r, DefaultConfiguration(), Dependencies{Daemon: daemon.NewMock(t)})
lcp := &logCoalescePoller{ lcp := &logCoalescePoller{
received: []lookupRequest{}, received: []lookupRequest{},
} }
@@ -261,7 +261,7 @@ func TestPollerBreaker(t *testing.T) {
for _, tc := range cases { for _, tc := range cases {
t.Run(tc.Name, func(t *testing.T) { t.Run(tc.Name, func(t *testing.T) {
r := reporter.NewMock(t) r := reporter.NewMock(t)
configuration := DefaultConfiguration configuration := DefaultConfiguration()
configuration.PollerCoalesce = 0 configuration.PollerCoalesce = 0
c := NewMock(t, r, configuration, Dependencies{Daemon: daemon.NewMock(t)}) c := NewMock(t, r, configuration, Dependencies{Daemon: daemon.NewMock(t)})
defer func() { defer func() {

View File

@@ -14,8 +14,8 @@ type Configuration struct {
Password string Password string
// Kafka describes Kafka-specific configuration // Kafka describes Kafka-specific configuration
Kafka KafkaConfiguration Kafka KafkaConfiguration
// AkvoradoURL allows one to override URL to reach Akvorado from Clickhouse // OrchestratorURL allows one to override URL to reach orchestrator from Clickhouse
AkvoradoURL string OrchestratorURL string
} }
// KafkaConfiguration describes Kafka-specific configuration // KafkaConfiguration describes Kafka-specific configuration
@@ -26,11 +26,13 @@ type KafkaConfiguration struct {
} }
// DefaultConfiguration represents the default configuration for the ClickHouse configurator. // DefaultConfiguration represents the default configuration for the ClickHouse configurator.
var DefaultConfiguration = Configuration{ func DefaultConfiguration() Configuration {
Servers: []string{}, // No clickhouse by default return Configuration{
Database: "default", Servers: []string{}, // No clickhouse by default
Username: "default", Database: "default",
Kafka: KafkaConfiguration{ Username: "default",
Consumers: 1, Kafka: KafkaConfiguration{
}, Consumers: 1,
},
}
} }

View File

Can't render this file because it is too large.

View File

@@ -16,7 +16,7 @@ import (
func TestRealClickHouse(t *testing.T) { func TestRealClickHouse(t *testing.T) {
chServer := helpers.CheckExternalService(t, "ClickHouse", []string{"clickhouse", "localhost"}, "9000") chServer := helpers.CheckExternalService(t, "ClickHouse", []string{"clickhouse", "localhost"}, "9000")
configuration := DefaultConfiguration configuration := DefaultConfiguration()
configuration.Servers = []string{chServer} configuration.Servers = []string{chServer}
r := reporter.NewMock(t) r := reporter.NewMock(t)
ch, err := New(r, configuration, Dependencies{ ch, err := New(r, configuration, Dependencies{

View File

@@ -39,7 +39,7 @@ func (c *Component) addHandlerEmbedded(url string, path string) {
// registerHTTPHandler register some handlers that will be useful for // registerHTTPHandler register some handlers that will be useful for
// ClickHouse // ClickHouse
func (c *Component) registerHTTPHandlers() error { func (c *Component) registerHTTPHandlers() error {
c.d.HTTP.AddHandler("/api/v0/configure/clickhouse/init.sh", c.d.HTTP.AddHandler("/api/v0/orchestrator/clickhouse/init.sh",
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/x-shellscript") w.Header().Set("Content-Type", "text/x-shellscript")
initShTemplate.Execute(w, flow.VersionedSchemas) initShTemplate.Execute(w, flow.VersionedSchemas)
@@ -53,7 +53,7 @@ func (c *Component) registerHTTPHandlers() error {
if entry.IsDir() { if entry.IsDir() {
continue continue
} }
url := fmt.Sprintf("/api/v0/configure/clickhouse/%s", entry.Name()) url := fmt.Sprintf("/api/v0/orchestrator/clickhouse/%s", entry.Name())
path := fmt.Sprintf("data/%s", entry.Name()) path := fmt.Sprintf("data/%s", entry.Name())
c.addHandlerEmbedded(url, path) c.addHandlerEmbedded(url, path)
} }

View File

@@ -11,7 +11,7 @@ import (
func TestHTTPEndpoints(t *testing.T) { func TestHTTPEndpoints(t *testing.T) {
r := reporter.NewMock(t) r := reporter.NewMock(t)
c, err := New(r, DefaultConfiguration, Dependencies{ c, err := New(r, DefaultConfiguration(), Dependencies{
Daemon: daemon.NewMock(t), Daemon: daemon.NewMock(t),
HTTP: http.NewMock(t, r), HTTP: http.NewMock(t, r),
}) })
@@ -21,7 +21,7 @@ func TestHTTPEndpoints(t *testing.T) {
cases := helpers.HTTPEndpointCases{ cases := helpers.HTTPEndpointCases{
{ {
URL: "/api/v0/configure/clickhouse/protocols.csv", URL: "/api/v0/orchestrator/clickhouse/protocols.csv",
ContentType: "text/csv; charset=utf-8", ContentType: "text/csv; charset=utf-8",
FirstLines: []string{ FirstLines: []string{
`proto,name,description`, `proto,name,description`,
@@ -29,14 +29,14 @@ func TestHTTPEndpoints(t *testing.T) {
`1,ICMP,Internet Control Message`, `1,ICMP,Internet Control Message`,
}, },
}, { }, {
URL: "/api/v0/configure/clickhouse/asns.csv", URL: "/api/v0/orchestrator/clickhouse/asns.csv",
ContentType: "text/csv; charset=utf-8", ContentType: "text/csv; charset=utf-8",
FirstLines: []string{ FirstLines: []string{
"asn,name", "asn,name",
"1,LVLT-1", "1,LVLT-1",
}, },
}, { }, {
URL: "/api/v0/configure/clickhouse/init.sh", URL: "/api/v0/orchestrator/clickhouse/init.sh",
ContentType: "text/x-shellscript", ContentType: "text/x-shellscript",
FirstLines: []string{ FirstLines: []string{
`#!/bin/sh`, `#!/bin/sh`,

View File

@@ -85,7 +85,7 @@ ARRAY JOIN arrayEnumerate([1,2]) AS num
} }
func (c *Component) migrateStepCreateProtocolsDictionary(ctx context.Context, l reporter.Logger, conn clickhouse.Conn) migrationStep { func (c *Component) migrateStepCreateProtocolsDictionary(ctx context.Context, l reporter.Logger, conn clickhouse.Conn) migrationStep {
protocolsURL := fmt.Sprintf("%s/api/v0/configure/clickhouse/protocols.csv", c.config.AkvoradoURL) protocolsURL := fmt.Sprintf("%s/api/v0/orchestrator/clickhouse/protocols.csv", c.config.OrchestratorURL)
return migrationStep{ return migrationStep{
CheckQuery: `SELECT 1 FROM system.dictionaries WHERE name = $1 AND database = $2 AND source = $3`, CheckQuery: `SELECT 1 FROM system.dictionaries WHERE name = $1 AND database = $2 AND source = $3`,
Args: []interface{}{"protocols", c.config.Database, protocolsURL}, Args: []interface{}{"protocols", c.config.Database, protocolsURL},
@@ -106,7 +106,7 @@ LAYOUT(HASHED())
} }
func (c *Component) migrateStepCreateASNsDictionary(ctx context.Context, l reporter.Logger, conn clickhouse.Conn) migrationStep { func (c *Component) migrateStepCreateASNsDictionary(ctx context.Context, l reporter.Logger, conn clickhouse.Conn) migrationStep {
asnsURL := fmt.Sprintf("%s/api/v0/configure/clickhouse/asns.csv", c.config.AkvoradoURL) asnsURL := fmt.Sprintf("%s/api/v0/orchestrator/clickhouse/asns.csv", c.config.OrchestratorURL)
return migrationStep{ return migrationStep{
CheckQuery: `SELECT 1 FROM system.dictionaries WHERE name = $1 AND database = $2 AND source = $3`, CheckQuery: `SELECT 1 FROM system.dictionaries WHERE name = $1 AND database = $2 AND source = $3`,
Args: []interface{}{"asns", c.config.Database, asnsURL}, Args: []interface{}{"asns", c.config.Database, asnsURL},

View File

@@ -22,12 +22,12 @@ type migrationStep struct {
// migrateDatabase execute database migration // migrateDatabase execute database migration
func (c *Component) migrateDatabase() error { func (c *Component) migrateDatabase() error {
if c.config.AkvoradoURL == "" { if c.config.OrchestratorURL == "" {
baseURL, err := c.getHTTPBaseURL(c.config.Servers[0]) baseURL, err := c.getHTTPBaseURL(c.config.Servers[0])
if err != nil { if err != nil {
return err return err
} }
c.config.AkvoradoURL = baseURL c.config.OrchestratorURL = baseURL
} }
l := c.r.With(). l := c.r.With().

View File

@@ -14,7 +14,7 @@ import (
func TestGetHTTPBaseURL(t *testing.T) { func TestGetHTTPBaseURL(t *testing.T) {
r := reporter.NewMock(t) r := reporter.NewMock(t)
http := http.NewMock(t, r) http := http.NewMock(t, r)
c, err := New(r, DefaultConfiguration, Dependencies{ c, err := New(r, DefaultConfiguration(), Dependencies{
Daemon: daemon.NewMock(t), Daemon: daemon.NewMock(t),
HTTP: http, HTTP: http,
}) })

View File

@@ -20,10 +20,12 @@ type TopicConfiguration struct {
} }
// DefaultConfiguration represents the default configuration for the Kafka configurator. // DefaultConfiguration represents the default configuration for the Kafka configurator.
var DefaultConfiguration = Configuration{ func DefaultConfiguration() Configuration {
Configuration: kafka.DefaultConfiguration, return Configuration{
TopicConfiguration: TopicConfiguration{ Configuration: kafka.DefaultConfiguration(),
NumPartitions: 1, TopicConfiguration: TopicConfiguration{
ReplicationFactor: 1, NumPartitions: 1,
}, ReplicationFactor: 1,
},
}
} }

View File

@@ -53,7 +53,7 @@ func TestTopicCreation(t *testing.T) {
for _, tc := range cases { for _, tc := range cases {
t.Run(tc.Name, func(t *testing.T) { t.Run(tc.Name, func(t *testing.T) {
configuration := DefaultConfiguration configuration := DefaultConfiguration()
configuration.Topic = topicName configuration.Topic = topicName
configuration.TopicConfiguration = TopicConfiguration{ configuration.TopicConfiguration = TopicConfiguration{
NumPartitions: 1, NumPartitions: 1,
@@ -97,7 +97,7 @@ func TestTopicMorePartitions(t *testing.T) {
topicName := fmt.Sprintf("test-topic-%d", rand.Int()) topicName := fmt.Sprintf("test-topic-%d", rand.Int())
expectedTopicName := fmt.Sprintf("%s-v%d", topicName, flow.CurrentSchemaVersion) expectedTopicName := fmt.Sprintf("%s-v%d", topicName, flow.CurrentSchemaVersion)
configuration := DefaultConfiguration configuration := DefaultConfiguration()
configuration.Topic = topicName configuration.Topic = topicName
configuration.TopicConfiguration = TopicConfiguration{ configuration.TopicConfiguration = TopicConfiguration{
NumPartitions: 1, NumPartitions: 1,