mirror of
https://github.com/akvorado/akvorado.git
synced 2025-12-12 06:24:10 +01:00
Today the timestamp can only be from kernel timetstamp put on the UDP packet by the kernel. I propose to add 2 alternative methods of getting the timestamp for netflow/IPFix packets: - TimestampSourceNetflowPacket: use the timestamp field in the netflow packet itself - TimestampSourceNetflowFirstSwitched: use the FirstSwitched field from each flow (the field is actually in uptime, so we need to shift it according to sysUptime) Using those fields requires the router to have accurate time (probably NTP), but it allows for architectures where a UDP packet is not immediately received by the collector, eg. if there is a kafka in-between. That in turns allows to do maintenance on the collector, without messing up the statistics
167 lines
3.8 KiB
Go
167 lines
3.8 KiB
Go
// SPDX-FileCopyrightText: 2022 Free Mobile
|
|
// SPDX-License-Identifier: AGPL-3.0-only
|
|
|
|
// Package flow handle incoming flows (currently Netflow v9 and IPFIX).
|
|
package flow
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"net/http"
|
|
"net/netip"
|
|
|
|
"gopkg.in/tomb.v2"
|
|
|
|
"akvorado/common/daemon"
|
|
"akvorado/common/httpserver"
|
|
"akvorado/common/reporter"
|
|
"akvorado/common/schema"
|
|
"akvorado/inlet/flow/decoder"
|
|
"akvorado/inlet/flow/input"
|
|
)
|
|
|
|
// Component represents the flow component.
|
|
type Component struct {
|
|
r *reporter.Reporter
|
|
d *Dependencies
|
|
t tomb.Tomb
|
|
config Configuration
|
|
|
|
metrics struct {
|
|
decoderStats *reporter.CounterVec
|
|
decoderErrors *reporter.CounterVec
|
|
}
|
|
|
|
// Channel for sending flows out of the package.
|
|
outgoingFlows chan *schema.FlowMessage
|
|
|
|
// Per-exporter rate-limiters
|
|
limiters map[netip.Addr]*limiter
|
|
|
|
// Inputs
|
|
inputs []input.Input
|
|
}
|
|
|
|
// Dependencies are the dependencies of the flow component.
|
|
type Dependencies struct {
|
|
Daemon daemon.Component
|
|
HTTP *httpserver.Component
|
|
Schema *schema.Component
|
|
}
|
|
|
|
// New creates a new flow component.
|
|
func New(r *reporter.Reporter, configuration Configuration, dependencies Dependencies) (*Component, error) {
|
|
if len(configuration.Inputs) == 0 {
|
|
return nil, errors.New("no input configured")
|
|
}
|
|
|
|
c := Component{
|
|
r: r,
|
|
d: &dependencies,
|
|
config: configuration,
|
|
outgoingFlows: make(chan *schema.FlowMessage),
|
|
limiters: make(map[netip.Addr]*limiter),
|
|
inputs: make([]input.Input, len(configuration.Inputs)),
|
|
}
|
|
|
|
// Initialize decoders (at most once each)
|
|
alreadyInitialized := map[string]decoder.Decoder{}
|
|
decs := make([]decoder.Decoder, len(configuration.Inputs))
|
|
for idx, input := range c.config.Inputs {
|
|
dec, ok := alreadyInitialized[input.Decoder]
|
|
if ok {
|
|
decs[idx] = dec
|
|
continue
|
|
}
|
|
decoderfunc, ok := decoders[input.Decoder]
|
|
if !ok {
|
|
return nil, fmt.Errorf("unknown decoder %q", input.Decoder)
|
|
}
|
|
dec = decoderfunc(r, decoder.Dependencies{Schema: c.d.Schema}, decoder.Option{TimestampSource: input.TimestampSource})
|
|
alreadyInitialized[input.Decoder] = dec
|
|
decs[idx] = c.wrapDecoder(dec, input.UseSrcAddrForExporterAddr)
|
|
}
|
|
|
|
// Initialize inputs
|
|
for idx, input := range c.config.Inputs {
|
|
var err error
|
|
c.inputs[idx], err = input.Config.New(r, c.d.Daemon, decs[idx])
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
// Metrics
|
|
c.metrics.decoderStats = c.r.CounterVec(
|
|
reporter.CounterOpts{
|
|
Name: "decoder_flows_total",
|
|
Help: "Decoder processed count.",
|
|
},
|
|
[]string{"name"},
|
|
)
|
|
c.metrics.decoderErrors = c.r.CounterVec(
|
|
reporter.CounterOpts{
|
|
Name: "decoder_errors_total",
|
|
Help: "Decoder processed error count.",
|
|
},
|
|
[]string{"name"},
|
|
)
|
|
|
|
c.d.Daemon.Track(&c.t, "inlet/flow")
|
|
|
|
c.d.HTTP.AddHandler("/api/v0/inlet/flow/schema.proto",
|
|
http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
|
|
w.Header().Set("Content-Type", "text/plain")
|
|
w.Write([]byte(c.d.Schema.ProtobufDefinition()))
|
|
}))
|
|
|
|
return &c, nil
|
|
}
|
|
|
|
// Flows returns a channel to receive flows.
|
|
func (c *Component) Flows() <-chan *schema.FlowMessage {
|
|
return c.outgoingFlows
|
|
}
|
|
|
|
// Start starts the flow component.
|
|
func (c *Component) Start() error {
|
|
for _, input := range c.inputs {
|
|
ch, err := input.Start()
|
|
stopper := input.Stop
|
|
if err != nil {
|
|
return err
|
|
}
|
|
c.t.Go(func() error {
|
|
defer stopper()
|
|
for {
|
|
select {
|
|
case <-c.t.Dying():
|
|
return nil
|
|
case fmsgs := <-ch:
|
|
if c.allowMessages(fmsgs) {
|
|
for _, fmsg := range fmsgs {
|
|
select {
|
|
case <-c.t.Dying():
|
|
return nil
|
|
case c.outgoingFlows <- fmsg:
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
})
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Stop stops the flow component
|
|
func (c *Component) Stop() error {
|
|
defer func() {
|
|
close(c.outgoingFlows)
|
|
c.r.Info().Msg("flow component stopped")
|
|
}()
|
|
c.r.Info().Msg("stopping flow component")
|
|
c.t.Kill(nil)
|
|
return c.t.Wait()
|
|
}
|