diff --git a/cmd/serve.go b/cmd/serve.go index 66c29713..d784f91c 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -7,6 +7,7 @@ import ( netHTTP "net/http" "os" "runtime" + "strconv" "strings" "github.com/mitchellh/mapstructure" @@ -117,12 +118,20 @@ and exports them to Kafka.`, continue } // From AKVORADO_SQUID_PURPLE_QUIRK=47, we - // build a map "squid -> purple -> quirk -> 47" + // build a map "squid -> purple -> quirk -> + // 47". From AKVORADO_SQUID_3_PURPLE=47, we + // build "squid[3] -> purple -> 47" var rawConfig interface{} rawConfig = kv[1] for i := len(kk) - 1; i > 0; i-- { - rawConfig = map[string]interface{}{ - kk[i]: rawConfig, + if index, err := strconv.Atoi(kk[i]); err == nil { + newRawConfig := make([]interface{}, index+1) + newRawConfig[index] = rawConfig + rawConfig = newRawConfig + } else { + rawConfig = map[string]interface{}{ + kk[i]: rawConfig, + } } } if err := decoder.Decode(rawConfig); err != nil { diff --git a/cmd/serve_test.go b/cmd/serve_test.go index 40a48c2e..12bffc4a 100644 --- a/cmd/serve_test.go +++ b/cmd/serve_test.go @@ -106,10 +106,15 @@ core: ioutil.WriteFile(configFile, []byte(config), 0644) // Environment - os.Setenv("AKVORADO_FLOW_WORKERS", "3") os.Setenv("AKVORADO_SNMP_CACHEDURATION", "22m") os.Setenv("AKVORADO_SNMP_DEFAULTCOMMUNITY", "privateer") os.Setenv("AKVORADO_KAFKA_BROKERS", "127.0.0.1:9092,127.0.0.2:9092") + os.Setenv("AKVORADO_FLOW_WORKERS", "3") + os.Setenv("AKVORADO_FLOW_INPUTS_0_LISTEN", "0.0.0.0:2056") + // We may be lucky or the environment is keeping order + os.Setenv("AKVORADO_FLOW_INPUTS_1_TYPE", "file") + os.Setenv("AKVORADO_FLOW_INPUTS_1_DECODER", "netflow") + os.Setenv("AKVORADO_FLOW_INPUTS_1_PATHS", "f1,f2") // Start serves with it root := cmd.RootCmd @@ -127,8 +132,23 @@ core: if err := yaml.Unmarshal(buf.Bytes(), &got); err != nil { t.Fatalf("Unmarshal() error:\n%+v", err) } - want(t, got["flow"]["workers"], 3) want(t, got["snmp"]["cacheduration"], "22m0s") want(t, got["snmp"]["defaultcommunity"], "privateer") want(t, got["kafka"]["brokers"], []string{"127.0.0.1:9092", "127.0.0.2:9092"}) + want(t, got["flow"], map[string]interface{}{ + "inputs": []map[string]interface{}{ + { + "type": "udp", + "decoder": "netflow", + "listen": "0.0.0.0:2056", + "queuesize": 100000, + "workers": 5, + }, { + "type": "file", + "decoder": "netflow", + "paths": []string{"f1", "f2"}, + }, + }, + "workers": 3, + }) } diff --git a/flow/config.go b/flow/config.go index f38045a5..c59549f7 100644 --- a/flow/config.go +++ b/flow/config.go @@ -112,6 +112,10 @@ func ConfigurationUnmarshalerHook() mapstructure.DecodeHookFunc { // Alter config with a copy of the concrete type original := reflect.Indirect(reflect.ValueOf(input)) + if !configField.IsNil() && configField.Elem().Type().Elem() == reflect.TypeOf(input).Elem() { + // Use the value we already have instead of default. + original = reflect.Indirect(configField.Elem()) + } copy := reflect.New(original.Type()) copy.Elem().Set(reflect.ValueOf(original.Interface())) configField.Set(copy) diff --git a/flow/config_test.go b/flow/config_test.go index e0d19d85..dadf37fb 100644 --- a/flow/config_test.go +++ b/flow/config_test.go @@ -107,7 +107,11 @@ func TestDecodeConfiguration(t *testing.T) { Workers: 10, Inputs: []InputConfiguration{{ Decoder: "netflow", - Config: &udp.DefaultConfiguration, + Config: &udp.Configuration{ + Workers: 2, + QueueSize: 100, + Listen: "127.0.0.1:2055", + }, }}, }, Source: map[string]interface{}{ @@ -122,8 +126,8 @@ func TestDecodeConfiguration(t *testing.T) { Inputs: []InputConfiguration{{ Decoder: "netflow", Config: &udp.Configuration{ - Workers: 1, - QueueSize: 100000, + Workers: 2, + QueueSize: 100, Listen: "192.0.2.1:2055", }, }}, @@ -160,7 +164,7 @@ func TestDecodeConfiguration(t *testing.T) { }) } - // Check we didn't alter the default value + // Check we didn't alter the default value for UDP if diff := helpers.Diff(udp.DefaultConfiguration, udp.Configuration{ Workers: 1, QueueSize: 100000,