diff --git a/cmd/serve_test.go b/cmd/serve_test.go index 12bffc4a..2365df3b 100644 --- a/cmd/serve_test.go +++ b/cmd/serve_test.go @@ -31,7 +31,6 @@ flow: decoder: netflow listen: 0.0.0.0:2055 workers: 5 - workers: 2 snmp: workers: 2 cache-duration: 20m @@ -70,7 +69,6 @@ core: "queuesize": 100000, "workers": 5, }}, - "workers": 2, }) want(t, got["snmp"]["workers"], 2) want(t, got["snmp"]["cacheduration"], "20m0s") @@ -91,7 +89,6 @@ flow: decoder: netflow listen: 0.0.0.0:2055 workers: 5 - workers: 2 snmp: workers: 2 cache-duration: 10m @@ -108,8 +105,8 @@ core: // Environment os.Setenv("AKVORADO_SNMP_CACHEDURATION", "22m") os.Setenv("AKVORADO_SNMP_DEFAULTCOMMUNITY", "privateer") + os.Setenv("AKVORADO_SNMP_WORKERS", "3") os.Setenv("AKVORADO_KAFKA_BROKERS", "127.0.0.1:9092,127.0.0.2:9092") - os.Setenv("AKVORADO_FLOW_WORKERS", "3") os.Setenv("AKVORADO_FLOW_INPUTS_0_LISTEN", "0.0.0.0:2056") // We may be lucky or the environment is keeping order os.Setenv("AKVORADO_FLOW_INPUTS_1_TYPE", "file") @@ -134,6 +131,7 @@ core: } want(t, got["snmp"]["cacheduration"], "22m0s") want(t, got["snmp"]["defaultcommunity"], "privateer") + want(t, got["snmp"]["workers"], 3) want(t, got["kafka"]["brokers"], []string{"127.0.0.1:9092", "127.0.0.2:9092"}) want(t, got["flow"], map[string]interface{}{ "inputs": []map[string]interface{}{ @@ -149,6 +147,5 @@ core: "paths": []string{"f1", "f2"}, }, }, - "workers": 3, }) } diff --git a/docs/configuration.md b/docs/configuration.md index d531cb98..be24238b 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -73,10 +73,6 @@ The flow component handles flow ingestion. It supports the following configuration keys: - `inputs` to specify the list of inputs -- `workers` to specify the number of workers to spawn to handle - decoding -- `queue-size` to specify the number of flows to queue when pushing - them to the core component Each input should define a `type` and `decoder`. For `decoder`, only `netflow` is currently supported. As for the `type`, both `udp` and diff --git a/flow/config.go b/flow/config.go index dd55f4fb..5c7f2597 100644 --- a/flow/config.go +++ b/flow/config.go @@ -17,8 +17,6 @@ import ( type Configuration struct { // Inputs define a list of input modules to enable Inputs []InputConfiguration - // Workers define the number of workers to use for decoding. - Workers int } // DefaultConfiguration represents the default configuration for the flow component @@ -27,7 +25,6 @@ var DefaultConfiguration = Configuration{ Decoder: "netflow", Config: &udp.DefaultConfiguration, }}, - Workers: 1, } // InputConfiguration represents the configuration for an input. diff --git a/flow/config_test.go b/flow/config_test.go index dadf37fb..b25e9d2a 100644 --- a/flow/config_test.go +++ b/flow/config_test.go @@ -22,7 +22,6 @@ func TestDecodeConfiguration(t *testing.T) { Name: "from empty configuration", From: Configuration{}, Source: map[string]interface{}{ - "workers": 10, "inputs": []map[string]interface{}{ map[string]interface{}{ "type": "udp", @@ -33,7 +32,6 @@ func TestDecodeConfiguration(t *testing.T) { }, }, Expected: Configuration{ - Workers: 10, Inputs: []InputConfiguration{{ Decoder: "netflow", Config: &udp.Configuration{ @@ -46,14 +44,12 @@ func TestDecodeConfiguration(t *testing.T) { }, { Name: "from existing configuration", From: Configuration{ - Workers: 10, Inputs: []InputConfiguration{{ Decoder: "netflow", Config: &udp.DefaultConfiguration, }}, }, Source: map[string]interface{}{ - "workers": 10, "inputs": []map[string]interface{}{ map[string]interface{}{ "type": "udp", @@ -64,7 +60,6 @@ func TestDecodeConfiguration(t *testing.T) { }, }, Expected: Configuration{ - Workers: 10, Inputs: []InputConfiguration{{ Decoder: "netflow", Config: &udp.Configuration{ @@ -77,14 +72,12 @@ func TestDecodeConfiguration(t *testing.T) { }, { Name: "change type", From: Configuration{ - Workers: 10, Inputs: []InputConfiguration{{ Decoder: "netflow", Config: &udp.DefaultConfiguration, }}, }, Source: map[string]interface{}{ - "workers": 10, "inputs": []map[string]interface{}{ map[string]interface{}{ "type": "file", @@ -93,7 +86,6 @@ func TestDecodeConfiguration(t *testing.T) { }, }, Expected: Configuration{ - Workers: 10, Inputs: []InputConfiguration{{ Decoder: "netflow", Config: &file.Configuration{ @@ -104,7 +96,6 @@ func TestDecodeConfiguration(t *testing.T) { }, { Name: "only set one item", From: Configuration{ - Workers: 10, Inputs: []InputConfiguration{{ Decoder: "netflow", Config: &udp.Configuration{ @@ -122,7 +113,6 @@ func TestDecodeConfiguration(t *testing.T) { }, }, Expected: Configuration{ - Workers: 10, Inputs: []InputConfiguration{{ Decoder: "netflow", Config: &udp.Configuration{ @@ -175,7 +165,6 @@ func TestDecodeConfiguration(t *testing.T) { func TestMarshalYAML(t *testing.T) { cfg := Configuration{ - Workers: 10, Inputs: []InputConfiguration{ { Decoder: "netflow", @@ -197,7 +186,6 @@ func TestMarshalYAML(t *testing.T) { queuesize: 1000 type: udp workers: 3 -workers: 10 ` if diff := helpers.Diff(strings.Split(string(got), "\n"), strings.Split(expected, "\n")); diff != "" { t.Fatalf("Marshal() (-got, +want):\n%s", diff) diff --git a/flow/decoder.go b/flow/decoder.go index 18261741..66444ed2 100644 --- a/flow/decoder.go +++ b/flow/decoder.go @@ -5,30 +5,44 @@ import ( "akvorado/flow/decoder" "akvorado/flow/decoder/netflow" - "akvorado/flow/input" ) // Message describes a decoded flow message. type Message = decoder.FlowMessage -// decodeWith decode a flow with the provided decoder -func (c *Component) decodeWith(d decoder.Decoder, in input.Flow) { +type wrappedDecoder struct { + c *Component + orig decoder.Decoder +} + +// Decode decodes a flow while keeping some stats. +func (wd *wrappedDecoder) Decode(in decoder.RawFlow) []*Message { timeTrackStart := time.Now() - decoded := d.Decode(in) + decoded := wd.orig.Decode(in) timeTrackStop := time.Now() if decoded == nil { - c.metrics.decoderErrors.WithLabelValues(d.Name()). + wd.c.metrics.decoderErrors.WithLabelValues(wd.orig.Name()). Inc() - return + return nil } - c.metrics.decoderTime.WithLabelValues(d.Name()). + wd.c.metrics.decoderTime.WithLabelValues(wd.orig.Name()). Observe(float64((timeTrackStop.Sub(timeTrackStart)).Nanoseconds()) / 1000 / 1000 / 1000) - c.metrics.decoderStats.WithLabelValues(d.Name()). + wd.c.metrics.decoderStats.WithLabelValues(wd.orig.Name()). Inc() + return decoded +} - for _, f := range decoded { - c.sendFlow(f) +// Name returns the name of the original decoder. +func (wd *wrappedDecoder) Name() string { + return wd.orig.Name() +} + +// wrapDecoder wraps the provided decoders to get statistics from it. +func (c *Component) wrapDecoder(d decoder.Decoder) decoder.Decoder { + return &wrappedDecoder{ + c: c, + orig: d, } } diff --git a/flow/decoder/netflow/root.go b/flow/decoder/netflow/root.go index b18ecc20..3fb40ea8 100644 --- a/flow/decoder/netflow/root.go +++ b/flow/decoder/netflow/root.go @@ -11,7 +11,6 @@ import ( "github.com/netsampler/goflow2/producer" "akvorado/flow/decoder" - "akvorado/flow/input" "akvorado/reporter" ) @@ -126,7 +125,7 @@ func (s *templateSystem) GetTemplate(version uint16, obsDomainID uint32, templat } // Decode decodes a Netflow payload. -func (nd *Decoder) Decode(in input.Flow) []*decoder.FlowMessage { +func (nd *Decoder) Decode(in decoder.RawFlow) []*decoder.FlowMessage { key := in.Source.String() nd.templatesLock.RLock() templates, ok := nd.templates[key] diff --git a/flow/decoder/netflow/root_test.go b/flow/decoder/netflow/root_test.go index 740717bd..75a30bd5 100644 --- a/flow/decoder/netflow/root_test.go +++ b/flow/decoder/netflow/root_test.go @@ -7,7 +7,6 @@ import ( "testing" "akvorado/flow/decoder" - "akvorado/flow/input" "akvorado/helpers" "akvorado/reporter" ) @@ -21,7 +20,7 @@ func TestDecode(t *testing.T) { if err != nil { panic(err) } - got := nfdecoder.Decode(input.Flow{Payload: template, Source: net.ParseIP("127.0.0.1")}) + got := nfdecoder.Decode(decoder.RawFlow{Payload: template, Source: net.ParseIP("127.0.0.1")}) if got == nil { t.Fatalf("Decode() error on options template") } @@ -46,7 +45,7 @@ func TestDecode(t *testing.T) { if err != nil { panic(err) } - got = nfdecoder.Decode(input.Flow{Payload: data, Source: net.ParseIP("127.0.0.1")}) + got = nfdecoder.Decode(decoder.RawFlow{Payload: data, Source: net.ParseIP("127.0.0.1")}) if got == nil { t.Fatalf("Decode() error on options data") } @@ -73,7 +72,7 @@ func TestDecode(t *testing.T) { if err != nil { panic(err) } - got = nfdecoder.Decode(input.Flow{Payload: template, Source: net.ParseIP("127.0.0.1")}) + got = nfdecoder.Decode(decoder.RawFlow{Payload: template, Source: net.ParseIP("127.0.0.1")}) if got == nil { t.Fatalf("Decode() error on template") } @@ -103,7 +102,7 @@ func TestDecode(t *testing.T) { if err != nil { panic(err) } - got = nfdecoder.Decode(input.Flow{Payload: data, Source: net.ParseIP("127.0.0.1")}) + got = nfdecoder.Decode(decoder.RawFlow{Payload: data, Source: net.ParseIP("127.0.0.1")}) if got == nil { t.Fatalf("Decode() error on data") } diff --git a/flow/decoder/root.go b/flow/decoder/root.go index 72ffc5bb..6f920e45 100644 --- a/flow/decoder/root.go +++ b/flow/decoder/root.go @@ -1,8 +1,9 @@ package decoder import ( - "akvorado/flow/input" "akvorado/reporter" + "net" + "time" ) // Decoder is the interface each decoder should implement. @@ -10,11 +11,18 @@ type Decoder interface { // Decoder takes a raw flow and returns a // slice of flow messages. Returning nil means there was an // error during decoding. - Decode(in input.Flow) []*FlowMessage + Decode(in RawFlow) []*FlowMessage // Name returns the decoder name Name() string } +// RawFlow is an undecoded flow. +type RawFlow struct { + TimeReceived time.Time + Payload []byte + Source net.IP +} + // NewDecoderFunc is the signature of a function to instantiate a decoder. type NewDecoderFunc func(*reporter.Reporter) Decoder diff --git a/flow/decoder/tests.go b/flow/decoder/tests.go new file mode 100644 index 00000000..d6e64b53 --- /dev/null +++ b/flow/decoder/tests.go @@ -0,0 +1,25 @@ +//go:build !release + +package decoder + +// DummyDecoder is a simple decoder producing flows from random data. +// The payload is copied in IfDescription +type DummyDecoder struct{} + +// Decode returns uninteresting flow messages. +func (dc *DummyDecoder) Decode(in RawFlow) []*FlowMessage { + return []*FlowMessage{ + { + TimeReceived: uint64(in.TimeReceived.UTC().Unix()), + SamplerAddress: in.Source.To16(), + Bytes: uint64(len(in.Payload)), + Packets: 1, + InIfDescription: string(in.Payload), + }, + } +} + +// Name returns the original name. +func (dc *DummyDecoder) Name() string { + return "dummy" +} diff --git a/flow/input/file/root.go b/flow/input/file/root.go index 1adc97ff..d44f042c 100644 --- a/flow/input/file/root.go +++ b/flow/input/file/root.go @@ -10,6 +10,7 @@ import ( "gopkg.in/tomb.v2" "akvorado/daemon" + "akvorado/flow/decoder" "akvorado/flow/input" "akvorado/reporter" ) @@ -19,25 +20,28 @@ type Input struct { r *reporter.Reporter t tomb.Tomb config *Configuration - ch chan input.Flow // channel to send flows to + + ch chan []*decoder.FlowMessage // channel to send flows to + decoder decoder.Decoder } // New instantiate a new UDP listener from the provided configuration. -func (configuration *Configuration) New(r *reporter.Reporter, daemon daemon.Component) (input.Input, error) { +func (configuration *Configuration) New(r *reporter.Reporter, daemon daemon.Component, dec decoder.Decoder) (input.Input, error) { if len(configuration.Paths) == 0 { return nil, errors.New("no paths provided for file input") } input := &Input{ - r: r, - config: configuration, - ch: make(chan input.Flow), + r: r, + config: configuration, + ch: make(chan []*decoder.FlowMessage), + decoder: dec, } daemon.Track(&input.t, "flow/input/file") return input, nil } // Start starts listening to the provided UDP socket and producing flows. -func (in *Input) Start() (<-chan input.Flow, error) { +func (in *Input) Start() (<-chan []*decoder.FlowMessage, error) { in.r.Info().Msg("file input starting") in.t.Go(func() error { for idx := 0; true; idx++ { @@ -47,15 +51,18 @@ func (in *Input) Start() (<-chan input.Flow, error) { in.r.Err(err).Str("path", path).Msg("unable to read path") return err } - flow := input.Flow{ + flows := in.decoder.Decode(decoder.RawFlow{ TimeReceived: time.Now(), Payload: data, Source: net.ParseIP("127.0.0.1"), + }) + if len(flows) == 0 { + continue } select { case <-in.t.Dying(): return nil - case in.ch <- flow: + case in.ch <- flows: } } return nil diff --git a/flow/input/file/root_test.go b/flow/input/file/root_test.go index 2ddc923a..7a8b91fb 100644 --- a/flow/input/file/root_test.go +++ b/flow/input/file/root_test.go @@ -6,6 +6,7 @@ import ( "time" "akvorado/daemon" + "akvorado/flow/decoder" "akvorado/helpers" "akvorado/reporter" ) @@ -14,7 +15,7 @@ func TestFileInput(t *testing.T) { r := reporter.NewMock(t) configuration := DefaultConfiguration configuration.Paths = []string{path.Join("testdata", "file1.txt"), path.Join("testdata", "file2.txt")} - in, err := configuration.New(r, daemon.NewMock(t)) + in, err := configuration.New(r, daemon.NewMock(t), &decoder.DummyDecoder{}) if err != nil { t.Fatalf("New() error:\n%+v", err) } @@ -35,7 +36,9 @@ out: for i := 0; i < len(expected); i++ { select { case got1 := <-ch: - got = append(got, string(got1.Payload)) + for _, fl := range got1 { + got = append(got, string(fl.InIfDescription)) + } case <-time.After(50 * time.Millisecond): break out } diff --git a/flow/input/root.go b/flow/input/root.go index 99463c45..7726892b 100644 --- a/flow/input/root.go +++ b/flow/input/root.go @@ -1,30 +1,21 @@ package input import ( - "net" - "time" - "akvorado/daemon" + "akvorado/flow/decoder" "akvorado/reporter" ) // Input is the interface any input should meet type Input interface { // Start instructs an input to start producing flows on the returned channel. - Start() (<-chan Flow, error) + Start() (<-chan []*decoder.FlowMessage, error) // Stop instructs the input to stop producing flows. Stop() error } -// Flow is an incoming flow from an input. -type Flow struct { - TimeReceived time.Time - Payload []byte - Source net.IP -} - // Configuration the interface for the configuration for an input module. type Configuration interface { // New instantiantes a new input from its configuration. - New(r *reporter.Reporter, daemon daemon.Component) (Input, error) + New(r *reporter.Reporter, daemon daemon.Component, dec decoder.Decoder) (Input, error) } diff --git a/flow/input/udp/root.go b/flow/input/udp/root.go index 13abfc64..11497d99 100644 --- a/flow/input/udp/root.go +++ b/flow/input/udp/root.go @@ -12,6 +12,7 @@ import ( "gopkg.in/tomb.v2" "akvorado/daemon" + "akvorado/flow/decoder" "akvorado/flow/input" "akvorado/reporter" ) @@ -30,16 +31,18 @@ type Input struct { drops *reporter.CounterVec } - address net.Addr // listening address, for testing purpoese - ch chan input.Flow // channel to send flows to + address net.Addr // listening address, for testing purpoese + ch chan []*decoder.FlowMessage // channel to send flows to + decoder decoder.Decoder // decoder to use } // New instantiate a new UDP listener from the provided configuration. -func (configuration *Configuration) New(r *reporter.Reporter, daemon daemon.Component) (input.Input, error) { +func (configuration *Configuration) New(r *reporter.Reporter, daemon daemon.Component, dec decoder.Decoder) (input.Input, error) { input := &Input{ - r: r, - config: configuration, - ch: make(chan input.Flow, configuration.QueueSize), + r: r, + config: configuration, + ch: make(chan []*decoder.FlowMessage, configuration.QueueSize), + decoder: dec, } input.metrics.bytes = r.CounterVec( @@ -84,7 +87,7 @@ func (configuration *Configuration) New(r *reporter.Reporter, daemon daemon.Comp } // Start starts listening to the provided UDP socket and producing flows. -func (in *Input) Start() (<-chan input.Flow, error) { +func (in *Input) Start() (<-chan []*decoder.FlowMessage, error) { in.r.Info().Str("listen", in.config.Listen).Msg("starting UDP input") // Listen to UDP port @@ -135,15 +138,18 @@ func (in *Input) Start() (<-chan input.Flow, error) { } srcIP := source.IP.String() - flow := input.Flow{ + flows := in.decoder.Decode(decoder.RawFlow{ TimeReceived: time.Now(), Payload: payload[:size], Source: source.IP, + }) + if len(flows) == 0 { + continue } select { case <-in.t.Dying(): return nil - case in.ch <- flow: + case in.ch <- flows: in.metrics.bytes.WithLabelValues(listen, worker, srcIP). Add(float64(size)) in.metrics.packets.WithLabelValues(listen, worker, srcIP). diff --git a/flow/input/udp/root_test.go b/flow/input/udp/root_test.go index b39e1eb7..d31c38b1 100644 --- a/flow/input/udp/root_test.go +++ b/flow/input/udp/root_test.go @@ -6,7 +6,7 @@ import ( "time" "akvorado/daemon" - "akvorado/flow/input" + "akvorado/flow/decoder" "akvorado/helpers" "akvorado/reporter" ) @@ -15,7 +15,7 @@ func TestUDPInput(t *testing.T) { r := reporter.NewMock(t) configuration := DefaultConfiguration configuration.Listen = "127.0.0.1:0" - in, err := configuration.New(r, daemon.NewMock(t)) + in, err := configuration.New(r, daemon.NewMock(t), &decoder.DummyDecoder{}) if err != nil { t.Fatalf("New() error:\n%+v", err) } @@ -41,21 +41,28 @@ func TestUDPInput(t *testing.T) { } // Get it back - var got input.Flow + var got []*decoder.FlowMessage select { case got = <-ch: + if len(got) == 0 { + t.Fatalf("empty decoded flows received") + } case <-time.After(20 * time.Millisecond): - t.Fatal("Input data missing") + t.Fatal("no decoded flows received") } - delta := got.TimeReceived.Sub(time.Now()) - if delta > time.Second || delta < -time.Second { - t.Errorf("TimeReceived out of range: %s (now: %s)", got.TimeReceived, time.Now()) + delta := uint64(time.Now().UTC().Unix()) - got[0].TimeReceived + if delta > 1 { + t.Errorf("TimeReceived out of range: %d (now: %d)", got[0].TimeReceived, time.Now().UTC().Unix()) } - expected := input.Flow{ - TimeReceived: got.TimeReceived, - Payload: []byte("hello world!"), - Source: net.ParseIP("127.0.0.1"), + expected := []*decoder.FlowMessage{ + { + TimeReceived: got[0].TimeReceived, + SamplerAddress: net.ParseIP("127.0.0.1"), + Bytes: 12, + Packets: 1, + InIfDescription: "hello world!", + }, } if diff := helpers.Diff(got, expected); diff != "" { t.Fatalf("Input data (-got, +want):\n%s", diff) @@ -82,7 +89,7 @@ func TestOverflow(t *testing.T) { configuration := DefaultConfiguration configuration.Listen = "127.0.0.1:0" configuration.QueueSize = 1 - in, err := configuration.New(r, daemon.NewMock(t)) + in, err := configuration.New(r, daemon.NewMock(t), &decoder.DummyDecoder{}) if err != nil { t.Fatalf("New() error:\n%+v", err) } diff --git a/flow/root.go b/flow/root.go index a0d26831..9958a77e 100644 --- a/flow/root.go +++ b/flow/root.go @@ -31,9 +31,8 @@ type Component struct { // Channel for sending flows out of the package. outgoingFlows chan *Message - // Inputs and decoders - inputs []input.Input - decoders []decoder.Decoder + // Inputs + inputs []input.Input } // Dependencies are the dependencies of the flow component. @@ -54,24 +53,15 @@ func New(r *reporter.Reporter, configuration Configuration, dependencies Depende config: configuration, outgoingFlows: make(chan *Message), inputs: make([]input.Input, len(configuration.Inputs)), - decoders: make([]decoder.Decoder, len(configuration.Inputs)), - } - - // Initialize inputs - for idx, input := range c.config.Inputs { - var err error - c.inputs[idx], err = input.Config.New(r, c.d.Daemon) - if err != nil { - return nil, err - } } // Initialize decoders (at most once each) var alreadyInitialized = map[string]decoder.Decoder{} + decs := make([]decoder.Decoder, len(configuration.Inputs)) for idx, input := range c.config.Inputs { dec, ok := alreadyInitialized[input.Decoder] if ok { - c.decoders[idx] = dec + decs[idx] = dec continue } decoderfunc, ok := decoders[input.Decoder] @@ -80,7 +70,16 @@ func New(r *reporter.Reporter, configuration Configuration, dependencies Depende } dec = decoderfunc(r) alreadyInitialized[input.Decoder] = dec - c.decoders[idx] = dec + decs[idx] = c.wrapDecoder(dec) + } + + // Initialize inputs + for idx, input := range c.config.Inputs { + var err error + c.inputs[idx], err = input.Config.New(r, c.d.Daemon, decs[idx]) + if err != nil { + return nil, err + } } // Metrics @@ -119,8 +118,7 @@ func (c *Component) Flows() <-chan *Message { // Start starts the flow component. func (c *Component) Start() error { - for idx, input := range c.inputs { - decoder := c.decoders[idx] + for _, input := range c.inputs { ch, err := input.Start() stopper := input.Stop if err != nil { @@ -132,8 +130,14 @@ func (c *Component) Start() error { select { case <-c.t.Dying(): return nil - case infl := <-ch: - c.decodeWith(decoder, infl) + case fmsgs := <-ch: + for _, fmsg := range fmsgs { + select { + case <-c.t.Dying(): + return nil + case c.outgoingFlows <- fmsg: + } + } } } }) @@ -141,15 +145,6 @@ func (c *Component) Start() error { return nil } -// sendFlow transmits received flows to the next component -func (c *Component) sendFlow(fmsg *Message) { - select { - case <-c.t.Dying(): - return - case c.outgoingFlows <- fmsg: - } -} - // Stop stops the flow component func (c *Component) Stop() error { defer func() {