inlet/core: also accept a map from subnet for default-sampling-rate

This should be generalized to other parts of the configuration (for
example, SNMP communities); however, we first need to check what
happens with default values. The network definitions in the
orchestrator are also likely to be able to reuse this.
This commit is contained in:
Vincent Bernat
2022-07-31 20:36:46 +02:00
parent 4ac1d3bdbf
commit 5cdc567828
12 changed files with 373 additions and 77 deletions

View File

@@ -79,22 +79,10 @@ func (c ConfigRelatedOptions) Parse(out io.Writer, component string, config inte
defaultHook, disableDefaultHook := DefaultHook()
zeroSliceHook, disableZeroSliceHook := ZeroSliceHook()
var metadata mapstructure.Metadata
registeredHooks := helpers.GetMapStructureUnmarshallerHooks()
decoder, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
Result: &config,
ErrorUnused: false,
Metadata: &metadata,
WeaklyTypedInput: true,
MatchName: helpers.MapStructureMatchName,
DecodeHook: mapstructure.ComposeDecodeHookFunc(
defaultHook,
zeroSliceHook,
mapstructure.ComposeDecodeHookFunc(registeredHooks...),
mapstructure.TextUnmarshallerHookFunc(),
mapstructure.StringToTimeDurationHookFunc(),
mapstructure.StringToSliceHookFunc(","),
),
})
decoderConfig := helpers.GetMapStructureDecoderConfig(&config, defaultHook, zeroSliceHook)
decoderConfig.ErrorUnused = false
decoderConfig.Metadata = &metadata
decoder, err := mapstructure.NewDecoder(decoderConfig)
if err != nil {
return fmt.Errorf("unable to create configuration decoder: %w", err)
}

View File

@@ -17,10 +17,23 @@ func AddMapstructureUnmarshallerHook(hook mapstructure.DecodeHookFunc) {
mapstructureUnmarshallerHookFuncs = append(mapstructureUnmarshallerHookFuncs, hook)
}
// GetMapStructureUnmarshallerHooks returns all the registered decode
// hooks for mapstructure.
func GetMapStructureUnmarshallerHooks() []mapstructure.DecodeHookFunc {
return mapstructureUnmarshallerHookFuncs
// GetMapStructureDecoderConfig builds the mapstructure decoder
// configuration shared across the project. The decoded value is
// written to config. Unused keys are reported as errors, input is
// weakly typed and keys are matched with MapStructureMatchName.
//
// Decode hooks run in this order: the hooks provided by the caller,
// the hooks registered with AddMapstructureUnmarshallerHook, then the
// stock text-unmarshaller, duration and comma-separated-slice hooks.
func GetMapStructureDecoderConfig(config interface{}, hooks ...mapstructure.DecodeHookFunc) *mapstructure.DecoderConfig {
	callerHooks := mapstructure.ComposeDecodeHookFunc(hooks...)
	registeredHooks := mapstructure.ComposeDecodeHookFunc(mapstructureUnmarshallerHookFuncs...)
	decodeHook := mapstructure.ComposeDecodeHookFunc(
		callerHooks,
		registeredHooks,
		mapstructure.TextUnmarshallerHookFunc(),
		mapstructure.StringToTimeDurationHookFunc(),
		mapstructure.StringToSliceHookFunc(","),
	)
	decoderConfig := mapstructure.DecoderConfig{
		Result:           config,
		ErrorUnused:      true,
		WeaklyTypedInput: true,
		MatchName:        MapStructureMatchName,
		DecodeHook:       decodeHook,
	}
	return &decoderConfig
}
// MapStructureMatchName tells if map key and field names are equal.

118
common/helpers/subnetmap.go Normal file
View File

@@ -0,0 +1,118 @@
// SPDX-FileCopyrightText: 2022 Free Mobile
// SPDX-License-Identifier: AGPL-3.0-only
package helpers
import (
"fmt"
"net"
"reflect"
"github.com/kentik/patricia"
tree "github.com/kentik/patricia/generics_tree"
"github.com/mitchellh/mapstructure"
)
// SubnetMap maps subnets to values and allows looking up a value by
// IP address. Internally, everything is stored as IPv6 (IPv4
// addresses are handled as v4-mapped IPv6 addresses).
type SubnetMap[V any] struct {
// tree is the IPv6 prefix tree holding the values. It is nil for the
// zero value of SubnetMap, in which case lookups never match.
tree *tree.TreeV6[V]
}
// Lookup searches for the most specific subnet matching the provided
// IP address and returns the value associated with it. The boolean is
// false when nothing matches, in which case the returned value should
// not be used.
//
// Both IPv4 and IPv6 addresses are accepted: the address is converted
// to its 16-byte form before the lookup. An empty map (zero value) or
// an address that cannot be converted (nil or of an unexpected length)
// yields no match.
func (sm *SubnetMap[V]) Lookup(ip net.IP) (V, bool) {
	var zero V
	if sm.tree == nil {
		return zero, false
	}
	// To16 returns nil for a nil or malformed address. Such a value
	// must not reach patricia.NewIPv6Address, which reads 16 bytes from
	// the slice it is given.
	ip16 := ip.To16()
	if ip16 == nil {
		return zero, false
	}
	ok, value := sm.tree.FindDeepestTag(patricia.NewIPv6Address(ip16, 128))
	return value, ok
}
// SubnetMapUnmarshallerHook returns a mapstructure decode hook
// producing a SubnetMap[V]. It accepts two kinds of input:
//
//   - a map whose keys are subnets in CIDR notation, each key being
//     checked with net.ParseCIDR;
//   - a single value of type V (or convertible to V), accepted for
//     backward compatibility and stored under "::/0".
//
// Any other input, or any target type other than SubnetMap[V], is
// returned untouched so that other hooks can handle it.
//
// Keys are normalized to IPv6 prefixes before being stored. IPv4
// subnets and v4-mapped IPv6 subnets are both written as
// "::ffff:a.b.c.d/len", where len is relative to 128 bits. Values are
// decoded to V by a sub-decoder built with
// GetMapStructureDecoderConfig.
func SubnetMapUnmarshallerHook[V any]() mapstructure.DecodeHookFunc {
	return func(from, to reflect.Value) (interface{}, error) {
		if to.Type() != reflect.TypeOf(SubnetMap[V]{}) {
			return from.Interface(), nil
		}
		output := map[string]interface{}{}
		var zero V
		if from.Kind() == reflect.Map {
			// First case, we have a map
			iter := from.MapRange()
			for i := 0; iter.Next(); i++ {
				k := iter.Key()
				v := iter.Value()
				if k.Kind() == reflect.Interface {
					k = k.Elem()
				}
				if k.Kind() != reflect.String {
					return nil, fmt.Errorf("key %d is not a string (%s)", i, k.Kind())
				}
				// Parse key
				_, ipNet, err := net.ParseCIDR(k.String())
				if err != nil {
					return nil, err
				}
				// Convert key to IPv6
				ones, bits := ipNet.Mask.Size()
				if bits != 32 && bits != 128 {
					return nil, fmt.Errorf("key %d has an invalid netmask", i)
				}
				var key string
				if ip4 := ipNet.IP.To4(); ip4 != nil {
					// IPv4 network, either written as IPv4 (32-bit
					// mask) or as a v4-mapped IPv6 prefix (128-bit
					// mask). We cannot rely on ipNet.String() for the
					// latter: it prints the network as IPv4 with an
					// IPv4-relative prefix length, which would later
					// be stored with the wrong length.
					if bits == 32 {
						ones += 96
					}
					key = fmt.Sprintf("::ffff:%s/%d", ip4.String(), ones)
				} else {
					key = ipNet.String()
				}
				output[key] = v.Interface()
			}
		} else if from.Type() == reflect.TypeOf(zero) || from.Type().ConvertibleTo(reflect.TypeOf(zero)) {
			// Second case, we have a single value
			output["::/0"] = from.Interface()
		} else {
			return from.Interface(), nil
		}

		// We have to decode output map, then turn it into a SubnetMap[V]
		var intermediate map[string]V
		intermediateDecoder, err := mapstructure.NewDecoder(
			GetMapStructureDecoderConfig(&intermediate))
		if err != nil {
			return nil, fmt.Errorf("cannot create subdecoder: %w", err)
		}
		if err := intermediateDecoder.Decode(output); err != nil {
			return nil, fmt.Errorf("unable to decode %q: %w", reflect.TypeOf(zero).Name(), err)
		}
		trie := tree.NewTreeV6[V]()
		for k, v := range intermediate {
			// Keys were normalized above to IPv6 notation, so the
			// prefix length is already relative to 128 bits.
			_, ipNet, err := net.ParseCIDR(k)
			if err != nil {
				// Should not happen
				return nil, err
			}
			plen, _ := ipNet.Mask.Size()
			trie.Set(patricia.NewIPv6Address(ipNet.IP.To16(), uint(plen)), v)
		}

		return SubnetMap[V]{trie}, nil
	}
}
// MarshalYAML exposes the content of the SubnetMap as a plain map
// from subnet (as rendered by the tree) to value, so it can be
// serialized to YAML. An empty SubnetMap is marshalled as an empty
// map. The returned error is always nil.
func (sm SubnetMap[V]) MarshalYAML() (interface{}, error) {
	result := make(map[string]V)
	if sm.tree != nil {
		// Only the first value attached to each subnet is exported.
		for it := sm.tree.Iterate(); it.Next(); {
			result[it.Address().String()] = it.Tags()[0]
		}
	}
	return result, nil
}

View File

@@ -0,0 +1,128 @@
// SPDX-FileCopyrightText: 2022 Free Mobile
// SPDX-License-Identifier: AGPL-3.0-only
package helpers_test
import (
"net"
"testing"
"github.com/gin-gonic/gin"
"github.com/mitchellh/mapstructure"
"gopkg.in/yaml.v2"
"akvorado/common/helpers"
)
// TestSubnetMapUnmarshalHook decodes various inputs into a
// SubnetMap[string] using SubnetMapUnmarshallerHook. For each case, it
// checks the values returned by Lookup for a set of addresses, then
// checks the result of marshalling the SubnetMap back to YAML.
func TestSubnetMapUnmarshalHook(t *testing.T) {
// Typed nil map, used to check that a nil map is accepted as input.
var nilMap map[string]string
cases := []struct {
Description string
// Input is the value handed to the decoder.
Input interface{}
// Tests maps an IP address to the value Lookup is expected to
// return for it; "" stands for "no match".
Tests map[string]string
// Error tells whether decoding is expected to fail.
Error bool
// YAML is the expected content after a YAML round trip. When nil, a
// default is computed in the loop below.
YAML interface{}
}{
{
Description: "nil",
Input: nilMap,
Tests: map[string]string{
"203.0.113.1": "",
},
}, {
Description: "empty",
Input: gin.H{},
Tests: map[string]string{
"203.0.113.1": "",
},
}, {
Description: "IPv4",
Input: gin.H{"203.0.113.0/24": "customer1"},
Tests: map[string]string{
"::ffff:203.0.113.18": "customer1",
"203.0.113.16": "customer1",
"203.0.1.1": "",
"::ffff:203.0.1.1": "",
"2001:db8:1::12": "",
},
}, {
Description: "IPv6",
Input: gin.H{"2001:db8:1::/64": "customer2"},
Tests: map[string]string{
"2001:db8:1::1": "customer2",
"2001:db8:1::2": "customer2",
"2001:db8:2::2": "",
},
}, {
Description: "Invalid subnet (1)",
Input: gin.H{"192.0.2.1/38": "customer"},
Error: true,
}, {
Description: "Invalid subnet (2)",
Input: gin.H{"192.0.2.1/255.0.255.0": "customer"},
Error: true,
}, {
Description: "Invalid subnet (3)",
Input: gin.H{"2001:db8::/1000": "customer"},
Error: true,
}, {
Description: "Single value",
Input: "customer",
Tests: map[string]string{
"203.0.113.4": "customer",
"2001:db8::1": "customer",
},
YAML: map[string]string{
"::/0": "customer",
},
},
}
for _, tc := range cases {
// Default expectation for the YAML round trip: an empty map when
// decoding is expected to fail, the input itself otherwise.
if tc.YAML == nil {
if tc.Error {
tc.YAML = map[string]string{}
} else {
tc.YAML = tc.Input
}
}
t.Run(tc.Description, func(t *testing.T) {
var tree helpers.SubnetMap[string]
// The hook is used alone here, not through
// helpers.GetMapStructureDecoderConfig.
decoder, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
Result: &tree,
ErrorUnused: true,
Metadata: nil,
DecodeHook: helpers.SubnetMapUnmarshallerHook[string](),
})
if err != nil {
t.Fatalf("NewDecoder() error:\n%+v", err)
}
err = decoder.Decode(tc.Input)
if err != nil && !tc.Error {
t.Fatalf("Decode() error:\n%+v", err)
} else if err == nil && tc.Error {
t.Fatal("Decode() did not return an error")
}
// Lookup each address; the "found" boolean is ignored, so a miss
// is recorded as the zero value "".
got := map[string]string{}
for k := range tc.Tests {
v, _ := tree.Lookup(net.ParseIP(k))
got[k] = v
}
if diff := helpers.Diff(got, tc.Tests); diff != "" {
t.Fatalf("Decode() (-got, +want):\n%s", diff)
}
// Try to unmarshal with YAML
buf, err := yaml.Marshal(tree)
if err != nil {
t.Fatalf("yaml.Marshal() error:\n%+v", err)
}
got = map[string]string{}
if err := yaml.Unmarshal(buf, &got); err != nil {
t.Fatalf("yaml.Unmarshal() error:\n%+v", err)
}
if diff := helpers.Diff(got, tc.YAML); diff != "" {
t.Fatalf("MarshalYAML() (-got, +want):\n%s", diff)
}
})
}
}

View File

@@ -155,7 +155,8 @@ The following configuration keys are accepted:
- `default-sampling-rate` defines the default sampling rate to use
when the information is missing. If not defined, flows without a
sampling rate will be rejected. Use this option only if your
hardware is unable to advertise a sampling rate.
hardware is unable to advertise a sampling rate. This can either be
a single value or a map from subnets to sampling rates.
- `ignore-asn-from-flow` allows one to ignore the AS numbers from the
received flows. It can be useful for routers with a partial routing
table and a default route learned over BGP.

View File

@@ -18,6 +18,7 @@ tables. This is fixed with this release, but this implies dropping the
existing data (only the country information). See [PR #61][] for more
details.
- ✨ *inlet*: `inlet.core.default-sampling-rate` also accepts a map from subnet to sampling rate
- 🩹 *orchestrator*: fix `SrcCountry`/`DstCountry` columns in aggregated tables [PR #61][]
- 🌱 *inlet*: `inlet.geoip.country-database` has been renamed to `inlet.geoip.geo-database`
- 🌱 *inlet*: add counters for GeoIP database hit/miss

3
go.mod
View File

@@ -16,6 +16,7 @@ require (
github.com/golang/mock v1.6.0
github.com/golang/protobuf v1.5.2
github.com/gosnmp/gosnmp v1.35.0
github.com/kentik/patricia v1.0.0
github.com/kylelemons/godebug v1.1.0
github.com/mattn/go-isatty v0.0.14
github.com/mitchellh/mapstructure v1.5.0
@@ -113,3 +114,5 @@ require (
)
replace github.com/slayercat/gosnmp => github.com/slayercat/gosnmp v1.24.1-0.20220124233957-4b805977d286
replace github.com/kentik/patricia => github.com/vincentbernat/patricia v0.0.0-20220731174804-18c23f58bc16

2
go.sum
View File

@@ -436,6 +436,8 @@ github.com/ugorji/go/codec v1.2.7 h1:YPXUKf7fYbp/y8xloBqZOw2qaVggbfwMlI8WM3wZUJ0
github.com/ugorji/go/codec v1.2.7/go.mod h1:WGN1fab3R1fzQlVQTkfxVtIBhWDRqOviHU95kRgeqEY=
github.com/urfave/cli/v2 v2.1.1/go.mod h1:SE9GqnLQmjVa0iPEY0f1w3ygNIYcIJ0OKPMoW2caLfQ=
github.com/urfave/cli/v2 v2.3.0/go.mod h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/XcUArI=
github.com/vincentbernat/patricia v0.0.0-20220731174804-18c23f58bc16 h1:tSg0MvHT1QIa8kV9fSMPywmmz9gx7yIH8ckylDrTJeE=
github.com/vincentbernat/patricia v0.0.0-20220731174804-18c23f58bc16/go.mod h1:6jY40ESetsbfi04/S12iJlsiS6DYL2B2W+WAcqoDHtw=
github.com/vishvananda/netns v0.0.0-20180720170159-13995c7128cc/go.mod h1:ZjcWmFBXmLKZu9Nxj3WKYEafiSqer2rnvPr0en9UNpI=
github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df h1:OviZH7qLw/7ZovXvuNyL3XQl8UFofeikI1NW1Gypu7k=
github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17pCcGlemwknint6hfoeCVQrEMVwxRLRjXpq+BU=

View File

@@ -3,6 +3,8 @@
package core
import "akvorado/common/helpers"
// Configuration describes the configuration for the core component.
type Configuration struct {
// Number of workers for the core component
@@ -14,7 +16,7 @@ type Configuration struct {
// ClassifierCacheSize defines the size of the classifier (in number of items)
ClassifierCacheSize uint
// DefaultSamplingRate defines the default sampling rate to use when the information is missing
DefaultSamplingRate uint
DefaultSamplingRate helpers.SubnetMap[uint]
// Ignore source/dest AS numbers from received flows
IgnoreASNFromFlow bool
}
@@ -29,3 +31,7 @@ func DefaultConfiguration() Configuration {
IgnoreASNFromFlow: false,
}
}
// init registers the decode hook for SubnetMap[uint], the type of
// DefaultSamplingRate, with the hooks used by mapstructure decoders
// built with helpers.GetMapStructureDecoderConfig.
func init() {
helpers.AddMapstructureUnmarshallerHook(helpers.SubnetMapUnmarshallerHook[uint]())
}

View File

@@ -16,16 +16,16 @@ import (
)
// hydrateFlow adds more data to a flow.
func (c *Component) hydrateFlow(exporter string, flow *flow.Message) (skip bool) {
func (c *Component) hydrateFlow(exporterIP net.IP, exporterStr string, flow *flow.Message) (skip bool) {
errLogger := c.r.Sample(reporter.BurstSampler(time.Minute, 10))
if flow.InIf != 0 {
exporterName, iface, err := c.d.Snmp.Lookup(exporter, uint(flow.InIf))
exporterName, iface, err := c.d.Snmp.Lookup(exporterStr, uint(flow.InIf))
if err != nil {
if err != snmp.ErrCacheMiss {
errLogger.Err(err).Str("exporter", exporter).Msg("unable to query SNMP cache")
errLogger.Err(err).Str("exporter", exporterStr).Msg("unable to query SNMP cache")
}
c.metrics.flowsErrors.WithLabelValues(exporter, err.Error()).Inc()
c.metrics.flowsErrors.WithLabelValues(exporterStr, err.Error()).Inc()
skip = true
} else {
flow.ExporterName = exporterName
@@ -36,15 +36,15 @@ func (c *Component) hydrateFlow(exporter string, flow *flow.Message) (skip bool)
}
if flow.OutIf != 0 {
exporterName, iface, err := c.d.Snmp.Lookup(exporter, uint(flow.OutIf))
exporterName, iface, err := c.d.Snmp.Lookup(exporterStr, uint(flow.OutIf))
if err != nil {
// Only register a cache miss if we don't have one.
// TODO: maybe we could do one SNMP query for both interfaces.
if !skip {
if err != snmp.ErrCacheMiss {
errLogger.Err(err).Str("exporter", exporter).Msg("unable to query SNMP cache")
errLogger.Err(err).Str("exporter", exporterStr).Msg("unable to query SNMP cache")
}
c.metrics.flowsErrors.WithLabelValues(exporter, err.Error()).Inc()
c.metrics.flowsErrors.WithLabelValues(exporterStr, err.Error()).Inc()
skip = true
}
} else {
@@ -57,15 +57,15 @@ func (c *Component) hydrateFlow(exporter string, flow *flow.Message) (skip bool)
// We need at least one of them.
if flow.OutIf == 0 && flow.InIf == 0 {
c.metrics.flowsErrors.WithLabelValues(exporter, "input and output interfaces missing").Inc()
c.metrics.flowsErrors.WithLabelValues(exporterStr, "input and output interfaces missing").Inc()
skip = true
}
if flow.SamplingRate == 0 {
if c.config.DefaultSamplingRate != 0 {
flow.SamplingRate = uint64(c.config.DefaultSamplingRate)
if samplingRate, ok := c.config.DefaultSamplingRate.Lookup(exporterIP); ok && samplingRate > 0 {
flow.SamplingRate = uint64(samplingRate)
} else {
c.metrics.flowsErrors.WithLabelValues(exporter, "sampling rate missing").Inc()
c.metrics.flowsErrors.WithLabelValues(exporterStr, "sampling rate missing").Inc()
skip = true
}
}
@@ -75,11 +75,11 @@ func (c *Component) hydrateFlow(exporter string, flow *flow.Message) (skip bool)
}
// Classification
c.classifyExporter(exporter, flow)
c.classifyInterface(exporter, flow,
c.classifyExporter(exporterStr, flow)
c.classifyInterface(exporterStr, flow,
flow.OutIfName, flow.OutIfDescription, flow.OutIfSpeed,
&flow.OutIfConnectivity, &flow.OutIfProvider, &flow.OutIfBoundary)
c.classifyInterface(exporter, flow,
c.classifyInterface(exporterStr, flow,
flow.InIfName, flow.InIfDescription, flow.InIfSpeed,
&flow.InIfConnectivity, &flow.InIfProvider, &flow.InIfBoundary)

View File

@@ -9,8 +9,9 @@ import (
"time"
"github.com/Shopify/sarama"
"github.com/gin-gonic/gin"
"github.com/golang/protobuf/proto"
"gopkg.in/yaml.v2"
"github.com/mitchellh/mapstructure"
"akvorado/common/daemon"
"akvorado/common/helpers"
@@ -25,13 +26,13 @@ import (
func TestHydrate(t *testing.T) {
cases := []struct {
Name string
Configuration string
Configuration gin.H
InputFlow func() *flow.Message
OutputFlow *flow.Message
}{
{
Name: "no rule",
Configuration: `{}`,
Configuration: gin.H{},
InputFlow: func() *flow.Message {
return &flow.Message{
SamplingRate: 1000,
@@ -55,10 +56,36 @@ func TestHydrate(t *testing.T) {
},
},
{
Name: "no rule, no sampling rate",
Configuration: `
defaultsamplingrate: 500
`,
Name: "no rule, no sampling rate, default is one value",
Configuration: gin.H{"defaultsamplingrate": 500},
InputFlow: func() *flow.Message {
return &flow.Message{
ExporterAddress: net.ParseIP("192.0.2.142"),
InIf: 100,
OutIf: 200,
}
},
OutputFlow: &flow.Message{
SamplingRate: 500,
ExporterAddress: net.ParseIP("192.0.2.142"),
ExporterName: "192_0_2_142",
InIf: 100,
OutIf: 200,
InIfName: "Gi0/0/100",
OutIfName: "Gi0/0/200",
InIfDescription: "Interface 100",
OutIfDescription: "Interface 200",
InIfSpeed: 1000,
OutIfSpeed: 1000,
},
},
{
Name: "no rule, no sampling rate, default is map",
Configuration: gin.H{"defaultsamplingrate": gin.H{
"192.0.2.0/24": 100,
"192.0.2.128/25": 500,
"192.0.2.141/32": 1000,
}},
InputFlow: func() *flow.Message {
return &flow.Message{
ExporterAddress: net.ParseIP("192.0.2.142"),
@@ -82,12 +109,13 @@ defaultsamplingrate: 500
},
{
Name: "exporter rule",
Configuration: `
exporterclassifiers:
- Exporter.Name startsWith "hello" && ClassifyRegion("europe")
- Exporter.Name startsWith "192_" && ClassifyRegion("asia")
- ClassifyRegion("other") && ClassifySite("unknown") && ClassifyTenant("alfred")
`,
Configuration: gin.H{
"exporterclassifiers": []string{
`Exporter.Name startsWith "hello" && ClassifyRegion("europe")`,
`Exporter.Name startsWith "192_" && ClassifyRegion("asia")`,
`ClassifyRegion("other") && ClassifySite("unknown") && ClassifyTenant("alfred")`,
},
},
InputFlow: func() *flow.Message {
return &flow.Message{
SamplingRate: 1000,
@@ -115,15 +143,16 @@ exporterclassifiers:
},
{
Name: "interface rule",
Configuration: `
interfaceclassifiers:
- >-
Interface.Description startsWith "Transit:" &&
ClassifyConnectivity("transit") &&
ClassifyExternal() &&
ClassifyProviderRegex(Interface.Description, "^Transit: ([^ ]+)", "$1")
- ClassifyInternal()
`,
Configuration: gin.H{
"interfaceclassifiers": []string{
`
Interface.Description startsWith "Transit:" &&
ClassifyConnectivity("transit") &&
ClassifyExternal() &&
ClassifyProviderRegex(Interface.Description, "^Transit: ([^ ]+)", "$1")`,
`ClassifyInternal()`,
},
},
InputFlow: func() *flow.Message {
return &flow.Message{
SamplingRate: 1000,
@@ -150,11 +179,12 @@ interfaceclassifiers:
},
{
Name: "configure twice boundary",
Configuration: `
interfaceclassifiers:
- ClassifyInternal()
- ClassifyExternal()
`,
Configuration: gin.H{
"interfaceclassifiers": []string{
`ClassifyInternal()`,
`ClassifyExternal()`,
},
},
InputFlow: func() *flow.Message {
return &flow.Message{
SamplingRate: 1000,
@@ -181,11 +211,12 @@ interfaceclassifiers:
},
{
Name: "configure twice provider",
Configuration: `
interfaceclassifiers:
- ClassifyProvider("telia")
- ClassifyProvider("cogent")
`,
Configuration: gin.H{
"interfaceclassifiers": []string{
`ClassifyProvider("telia")`,
`ClassifyProvider("cogent")`,
},
},
InputFlow: func() *flow.Message {
return &flow.Message{
SamplingRate: 1000,
@@ -212,12 +243,13 @@ interfaceclassifiers:
},
{
Name: "classify depending on description",
Configuration: `
interfaceclassifiers:
- ClassifyProvider("Othello")
- ClassifyConnectivityRegex(Interface.Description, " (1\\d+)$", "P$1") && ClassifyExternal()
- ClassifyInternal() && ClassifyConnectivity("core")
`,
Configuration: gin.H{
"interfaceclassifiers": []string{
`ClassifyProvider("Othello")`,
`ClassifyConnectivityRegex(Interface.Description, " (1\\d+)$", "P$1") && ClassifyExternal()`,
`ClassifyInternal() && ClassifyConnectivity("core")`,
},
},
InputFlow: func() *flow.Message {
return &flow.Message{
SamplingRate: 1000,
@@ -262,8 +294,12 @@ interfaceclassifiers:
// Prepare a configuration
configuration := DefaultConfiguration()
if err := yaml.Unmarshal([]byte(tc.Configuration), &configuration); err != nil {
t.Fatalf("Unmarshal() error:\n%+v", err)
decoder, err := mapstructure.NewDecoder(helpers.GetMapStructureDecoderConfig(&configuration))
if err != nil {
t.Fatalf("NewDecoder() error:\n%+v", err)
}
if err := decoder.Decode(tc.Configuration); err != nil {
t.Fatalf("Decode() error:\n%+v", err)
}
// Instantiate and start core

View File

@@ -119,7 +119,7 @@ func (c *Component) runWorker(workerID int) error {
c.metrics.flowsReceived.WithLabelValues(exporter).Inc()
// Hydratation
if skip := c.hydrateFlow(exporter, flow); skip {
if skip := c.hydrateFlow(flow.ExporterAddress, exporter, flow); skip {
continue
}