Files
akvorado/inlet/flow/config_test.go
netixx c2b3cae237 Allow using fields of the netflow packet to set the flow TimeReceived
Today the timestamp can only come from the kernel timestamp put on the UDP packet
by the kernel.

I propose to add 2 alternative methods of getting the timestamp for netflow/IPFix packets:
- TimestampSourceNetflowPacket: use the timestamp field in the netflow packet itself
- TimestampSourceNetflowFirstSwitched: use the FirstSwitched field from each flow
(the field is actually in uptime, so we need to shift it according to sysUptime)

Using those fields requires the router to have accurate time (probably NTP),
but it allows for architectures where a UDP packet is not immediately
received by the collector, e.g. if there is a Kafka in between.
That in turn allows doing maintenance on the collector
without messing up the statistics.
2024-03-30 22:01:40 +01:00

318 lines
7.0 KiB
Go

// SPDX-FileCopyrightText: 2022 Free Mobile
// SPDX-License-Identifier: AGPL-3.0-only
package flow
import (
"strings"
"testing"
"github.com/gin-gonic/gin"
"akvorado/common/helpers/yaml"
"akvorado/common/helpers"
"akvorado/inlet/flow/decoder"
"akvorado/inlet/flow/input/file"
"akvorado/inlet/flow/input/udp"
)
// TestDecodeConfiguration runs a table of decoding scenarios through
// helpers.TestConfigurationDecode. Each case supplies an initial
// Configuration, a generic map describing the configuration to decode, and
// either the Configuration expected afterwards or the expectation that
// decoding fails.
func TestDecodeConfiguration(t *testing.T) {
	helpers.TestConfigurationDecode(t, helpers.ConfigurationDecodeCases{
		{
			// The input does not mention a queue size, yet the expected
			// result carries QueueSize 100000 for both inputs.
			Description: "from empty configuration",
			Initial:     func() interface{} { return Configuration{} },
			Configuration: func() interface{} {
				return gin.H{
					"inputs": []gin.H{
						{
							"type":                           "udp",
							"decoder":                        "netflow",
							"listen":                         "192.0.2.1:2055",
							"workers":                        3,
							"use-src-addr-for-exporter-addr": true,
						}, {
							"type":    "udp",
							"decoder": "sflow",
							"listen":  "192.0.2.1:6343",
							"workers": 3,
						},
					},
				}
			},
			Expected: Configuration{
				Inputs: []InputConfiguration{{
					Decoder: "netflow",
					Config: &udp.Configuration{
						Workers:   3,
						QueueSize: 100000,
						Listen:    "192.0.2.1:2055",
					},
					UseSrcAddrForExporterAddr: true,
				}, {
					Decoder: "sflow",
					Config: &udp.Configuration{
						Workers:   3,
						QueueSize: 100000,
						Listen:    "192.0.2.1:6343",
					},
					UseSrcAddrForExporterAddr: false,
				}},
			},
		}, {
			// Both inputs start from udp.DefaultConfiguration(); the decoded
			// map overrides the listen address and the worker count.
			Description: "from existing configuration",
			Initial: func() interface{} {
				return Configuration{
					Inputs: []InputConfiguration{{
						Decoder: "netflow",
						Config:  udp.DefaultConfiguration(),
					}, {
						Decoder: "sflow",
						Config:  udp.DefaultConfiguration(),
					}},
				}
			},
			Configuration: func() interface{} {
				return gin.H{
					"inputs": []gin.H{
						{
							"type":    "udp",
							"decoder": "netflow",
							"listen":  "192.0.2.1:2055",
							"workers": 3,
						}, {
							"type":    "udp",
							"decoder": "sflow",
							"listen":  "192.0.2.1:6343",
							"workers": 3,
						},
					},
				}
			},
			Expected: Configuration{
				Inputs: []InputConfiguration{{
					Decoder: "netflow",
					Config: &udp.Configuration{
						Workers:   3,
						QueueSize: 100000,
						Listen:    "192.0.2.1:2055",
					},
				}, {
					Decoder: "sflow",
					Config: &udp.Configuration{
						Workers:   3,
						QueueSize: 100000,
						Listen:    "192.0.2.1:6343",
					},
				}},
			},
		}, {
			// The existing input is UDP; the decoded map switches it to the
			// "file" type. The decoder, absent from the map, is expected to
			// stay "netflow".
			Description: "change type",
			Initial: func() interface{} {
				return Configuration{
					Inputs: []InputConfiguration{{
						Decoder: "netflow",
						Config:  udp.DefaultConfiguration(),
					}},
				}
			},
			Configuration: func() interface{} {
				return gin.H{
					"inputs": []gin.H{
						{
							"type":  "file",
							"paths": []string{"file1", "file2"},
						},
					},
				}
			},
			Expected: Configuration{
				Inputs: []InputConfiguration{{
					Decoder: "netflow",
					Config: &file.Configuration{
						Paths: []string{"file1", "file2"},
					},
				}},
			},
		}, {
			// Only "listen" is provided: Workers and QueueSize are expected
			// to keep their initial values.
			Description: "only set one item",
			Initial: func() interface{} {
				return Configuration{
					Inputs: []InputConfiguration{{
						Decoder: "netflow",
						Config: &udp.Configuration{
							Workers:   2,
							QueueSize: 100,
							Listen:    "127.0.0.1:2055",
						},
					}},
				}
			},
			Configuration: func() interface{} {
				return gin.H{
					"inputs": []gin.H{
						{
							"listen": "192.0.2.1:2055",
						},
					},
				}
			},
			Expected: Configuration{
				Inputs: []InputConfiguration{{
					Decoder: "netflow",
					Config: &udp.Configuration{
						Workers:   2,
						QueueSize: 100,
						Listen:    "192.0.2.1:2055",
					},
				}},
			},
		}, {
			// The first input uses the type "avians" while its decoder is
			// the same "netflow" used by the passing cases, so it is the
			// input type that is expected to make decoding fail.
			Description: "incorrect type",
			Initial: func() interface{} {
				return Configuration{}
			},
			Configuration: func() interface{} {
				return gin.H{
					"inputs": []gin.H{
						{
							"type":    "avians",
							"decoder": "netflow",
							"listen":  "192.0.2.1:2055",
							"workers": 3,
						}, {
							"type":    "udp",
							"decoder": "sflow",
							"listen":  "192.0.2.1:6343",
							"workers": 3,
						},
					},
				}
			},
			Error: true,
		}, {
			// "netflow-packet" is expected to decode to
			// decoder.TimestampSourceNetflowPacket.
			Description: "netflow timestamp source netflow-packet",
			Initial: func() interface{} {
				return Configuration{
					Inputs: []InputConfiguration{{
						Decoder: "netflow",
						Config: &udp.Configuration{
							Workers:   2,
							QueueSize: 100,
							Listen:    "127.0.0.1:2055",
						},
					}},
				}
			},
			Configuration: func() interface{} {
				return gin.H{
					"inputs": []gin.H{
						{
							"timestamp-source": "netflow-packet",
							"listen":           "192.0.2.1:2055",
						},
					},
				}
			},
			Expected: Configuration{
				Inputs: []InputConfiguration{{
					Decoder:         "netflow",
					TimestampSource: decoder.TimestampSourceNetflowPacket,
					Config: &udp.Configuration{
						Workers:   2,
						QueueSize: 100,
						Listen:    "192.0.2.1:2055",
					},
				}},
			},
		}, {
			// "netflow-first-switched" is expected to decode to
			// decoder.TimestampSourceNetflowFirstSwitched.
			Description: "netflow timestamp source netflow-first-switched",
			Initial: func() interface{} {
				return Configuration{
					Inputs: []InputConfiguration{{
						Decoder: "netflow",
						Config: &udp.Configuration{
							Workers:   2,
							QueueSize: 100,
							Listen:    "127.0.0.1:2055",
						},
					}},
				}
			},
			Configuration: func() interface{} {
				return gin.H{
					"inputs": []gin.H{
						{
							"timestamp-source": "netflow-first-switched",
							"listen":           "192.0.2.1:2055",
						},
					},
				}
			},
			Expected: Configuration{
				Inputs: []InputConfiguration{{
					Decoder:         "netflow",
					TimestampSource: decoder.TimestampSourceNetflowFirstSwitched,
					Config: &udp.Configuration{
						Workers:   2,
						QueueSize: 100,
						Listen:    "192.0.2.1:2055",
					},
				}},
			},
		},
	})
}
func TestMarshalYAML(t *testing.T) {
cfg := Configuration{
Inputs: []InputConfiguration{
{
Decoder: "netflow",
TimestampSource: decoder.TimestampSourceNetflowFirstSwitched,
Config: &udp.Configuration{
Listen: "192.0.2.11:2055",
QueueSize: 1000,
Workers: 3,
},
}, {
Decoder: "sflow",
Config: &udp.Configuration{
Listen: "192.0.2.11:6343",
QueueSize: 1000,
Workers: 3,
},
UseSrcAddrForExporterAddr: true,
},
},
}
got, err := yaml.Marshal(cfg)
if err != nil {
t.Fatalf("Marshal() error:\n%+v", err)
}
expected := `inputs:
- decoder: netflow
listen: 192.0.2.11:2055
queuesize: 1000
receivebuffer: 0
timestampsource: netflow-first-switched
type: udp
usesrcaddrforexporteraddr: false
workers: 3
- decoder: sflow
listen: 192.0.2.11:6343
queuesize: 1000
receivebuffer: 0
timestampsource: udp
type: udp
usesrcaddrforexporteraddr: true
workers: 3
ratelimit: 0
`
if diff := helpers.Diff(strings.Split(string(got), "\n"), strings.Split(expected, "\n")); diff != "" {
t.Fatalf("Marshal() (-got, +want):\n%s", diff)
}
}