inlet/metadata: add a static provider

This commit is contained in:
Vincent Bernat
2023-05-29 19:23:21 +02:00
parent a86a187051
commit 994f7a80ff
13 changed files with 323 additions and 38 deletions

View File

@@ -39,8 +39,8 @@ documentation.
- `clickhouse``asns` to give names to your internal AS numbers
- `clickhouse``networks` to attach attributes to your networks
- `inlet``snmp``communities` to set the communities to use for
SNMP queries
- `inlet``metadata``provider``communities` to set the communities to
use for SNMP queries
- `inlet``core``exporter-classifiers` to define rules to attach
attributes to your exporters
- `inlet``core``interface-classifiers` to define rules to attach

View File

@@ -333,8 +333,8 @@ cache is useful to quickly be able to handle incoming flows. By
default, no persistent cache is configured.
The `provider` key contains the configuration of the provider. The provider type
is defined by the `type` key. Currently, only `snmp` is accepted. It accepts the
following configuration keys:
is defined by the `type` key. The `snmp` provider accepts the following
configuration keys:
- `communities` is a map from subnets to the SNMPv2 community to use
for exporters in the provided subnet. Use `::/0` to set the default
@@ -370,6 +370,39 @@ metadata:
`security-parameters` configuration option. Otherwise, it will use
SNMPv2.
The `static` providers accepts an `exporters` key which maps exporter subnets to
an exporter configuration. An exporter configuration is map:
- `name` is the name of the exporter
- `default` is the default interface when no match is found
- `ifindexes` is a map from interface indexes to interface
An interface is a `name`, a `description` and a `speed`.
For example:
```yaml
metadata
provider:
type: static
exporters:
2001:db8:1::1:
name: exporter1
default:
name: unknown
description: Unknown interface
speed: 100
ifindexes:
10:
name: Gi0/0/10
description: PNI Netflix
speed: 1000
11:
name: Gi0/0/15
description: PNI Google
speed: 1000
```
### HTTP
The builtin HTTP server serves various pages. Its configuration

View File

@@ -13,6 +13,9 @@ identified with a specific icon:
## Unreleased
-*inlet*: metadata retrieval is now pluggable. In addition to SNMP, it is
now possible to set exporter names, interface names and descriptions directly
in the configuration file. See `inlet``metadata`.
-*inlet*: allow extraction of prefix length from BMP. See `inlet``core``net-providers`.
- 🩹 *inlet*: fix Netflow processing when template is received with data
- 🌱 *build*: minimum supported Node version is now 16

View File

@@ -9,6 +9,7 @@ import (
"akvorado/common/helpers"
"akvorado/inlet/metadata/provider"
"akvorado/inlet/metadata/provider/snmp"
"akvorado/inlet/metadata/provider/static"
)
// Configuration describes the configuration for the metadata client
@@ -51,6 +52,7 @@ type ProviderConfiguration struct {
var providers = map[string](func() provider.Configuration){
"snmp": snmp.DefaultConfiguration,
"static": static.DefaultConfiguration,
}
func init() {

View File

@@ -13,9 +13,9 @@ import (
// Interface contains the information about an interface.
type Interface struct {
Name string
Description string
Speed uint
Name string `validate:"required"`
Description string `validate:"required"`
Speed uint `validate:"required"`
}
// Query is the query sent to a provider.
@@ -44,13 +44,12 @@ type Update struct {
// Provider is the interface a provider should implement.
type Provider interface {
// Query asks the provider to query metadata for several requests. The
// updates will be returned by calling the provided callback for each one.
Query(ctx context.Context, query BatchQuery, put func(Update)) error
// Query asks the provider to query metadata for several requests.
Query(ctx context.Context, query BatchQuery) error
}
// Configuration defines an interface to configure a provider.
type Configuration interface {
// New instantiates a new provider from its configuration.
New(r *reporter.Reporter) (Provider, error)
New(r *reporter.Reporter, put func(Update)) (Provider, error)
}

View File

@@ -207,20 +207,20 @@ func TestPoller(t *testing.T) {
config.Ports = helpers.MustNewSubnetMap(map[string]uint16{
"::/0": uint16(port),
})
p, err := config.New(r)
if err != nil {
t.Fatalf("New() error:\n%+v", err)
}
put := func(update provider.Update) {
got = append(got, fmt.Sprintf("%s %s %d %s %s %d",
update.ExporterIP.Unmap().String(), update.ExporterName,
update.IfIndex, update.Name, update.Description, update.Speed))
}
p, err := config.New(r, put)
if err != nil {
t.Fatalf("New() error:\n%+v", err)
}
p.Query(context.Background(), provider.BatchQuery{ExporterIP: tc.ExporterIP, IfIndexes: []uint{641}}, put)
p.Query(context.Background(), provider.BatchQuery{ExporterIP: tc.ExporterIP, IfIndexes: []uint{642}}, put)
p.Query(context.Background(), provider.BatchQuery{ExporterIP: tc.ExporterIP, IfIndexes: []uint{643, 644}}, put)
p.Query(context.Background(), provider.BatchQuery{ExporterIP: tc.ExporterIP, IfIndexes: []uint{0}}, put)
p.Query(context.Background(), provider.BatchQuery{ExporterIP: tc.ExporterIP, IfIndexes: []uint{641}})
p.Query(context.Background(), provider.BatchQuery{ExporterIP: tc.ExporterIP, IfIndexes: []uint{642}})
p.Query(context.Background(), provider.BatchQuery{ExporterIP: tc.ExporterIP, IfIndexes: []uint{643, 644}})
p.Query(context.Background(), provider.BatchQuery{ExporterIP: tc.ExporterIP, IfIndexes: []uint{0}})
exporterStr := tc.ExporterIP.Unmap().String()
time.Sleep(50 * time.Millisecond)
if diff := helpers.Diff(got, []string{

View File

@@ -2,8 +2,7 @@
// SPDX-License-Identifier: AGPL-3.0-only
// Package snmp handles SNMP polling to get interface names and
// descriptions. It keeps a cache of retrieved entries and refresh
// them.
// descriptions.
package snmp
import (
@@ -25,6 +24,8 @@ type Provider struct {
pendingRequestsLock sync.Mutex
errLogger reporter.Logger
put func(provider.Update)
metrics struct {
pendingRequests reporter.GaugeFunc
successes *reporter.CounterVec
@@ -35,7 +36,7 @@ type Provider struct {
}
// New creates a new SNMP provider from configuration
func (configuration Configuration) New(r *reporter.Reporter) (provider.Provider, error) {
func (configuration Configuration) New(r *reporter.Reporter, put func(provider.Update)) (provider.Provider, error) {
for exporterIP, agentIP := range configuration.Agents {
if exporterIP.Is4() || agentIP.Is4() {
delete(configuration.Agents, exporterIP)
@@ -51,6 +52,8 @@ func (configuration Configuration) New(r *reporter.Reporter) (provider.Provider,
pendingRequests: make(map[string]struct{}),
errLogger: r.Sample(reporter.BurstSampler(10*time.Second, 3)),
put: put,
}
p.metrics.pendingRequests = r.GaugeFunc(
@@ -88,12 +91,12 @@ func (configuration Configuration) New(r *reporter.Reporter) (provider.Provider,
}
// Query queries exporter to get information through SNMP.
func (p *Provider) Query(ctx context.Context, query provider.BatchQuery, put func(provider.Update)) error {
func (p *Provider) Query(ctx context.Context, query provider.BatchQuery) error {
// Avoid querying too much exporters with errors
agentIP, ok := p.config.Agents[query.ExporterIP]
if !ok {
agentIP = query.ExporterIP
}
agentPort := p.config.Ports.LookupOrDefault(agentIP, 161)
return p.Poll(ctx, query.ExporterIP, agentIP, agentPort, query.IfIndexes, put)
return p.Poll(ctx, query.ExporterIP, agentIP, agentPort, query.IfIndexes, p.put)
}

View File

@@ -0,0 +1,37 @@
// SPDX-FileCopyrightText: 2022 Free Mobile
// SPDX-License-Identifier: AGPL-3.0-only
package static
import (
"akvorado/common/helpers"
"akvorado/inlet/metadata/provider"
)
// Configuration describes the configuration for the static provider
type Configuration struct {
// Exporters is a subnet map matching exporters to their configuration
Exporters *helpers.SubnetMap[ExporterConfiguration] `validate:"omitempty,dive"`
}
// ExporterConfiguration is the interface configuration for an exporter.
type ExporterConfiguration struct {
// Name is the name of the exporter
Name string `validate:"required"`
// Default is used if not empty for any unknown ifindexes
Default provider.Interface `validate:"omitempty,dive"`
// IfIndexes is a map from interface indexes to interfaces
IfIndexes map[uint]provider.Interface `validate:"omitempty,dive"`
}
// DefaultConfiguration represents the default configuration for the static provider
func DefaultConfiguration() provider.Configuration {
return Configuration{
Exporters: helpers.MustNewSubnetMap(map[string]ExporterConfiguration{}),
}
}
func init() {
helpers.RegisterMapstructureUnmarshallerHook(helpers.SubnetMapUnmarshallerHook[ExporterConfiguration]())
helpers.RegisterSubnetMapValidation[ExporterConfiguration]()
}

View File

@@ -0,0 +1,54 @@
// SPDX-FileCopyrightText: 2023 Free Mobile
// SPDX-License-Identifier: AGPL-3.0-only
// Package static is a metadata provider using static configuration to answer to
// requests.
package static
import (
"context"
"akvorado/common/reporter"
"akvorado/inlet/metadata/provider"
)
// Provider represents the static provider.
type Provider struct {
r *reporter.Reporter
config *Configuration
put func(provider.Update)
}
// New creates a new static provider from configuration
func (configuration Configuration) New(r *reporter.Reporter, put func(provider.Update)) (provider.Provider, error) {
return &Provider{
r: r,
config: &configuration,
put: put,
}, nil
}
// Query queries static configuration.
func (p *Provider) Query(_ context.Context, query provider.BatchQuery) error {
exporter, ok := p.config.Exporters.Lookup(query.ExporterIP)
if !ok {
return nil
}
for _, ifIndex := range query.IfIndexes {
iface, ok := exporter.IfIndexes[ifIndex]
if !ok {
iface = exporter.Default
}
p.put(provider.Update{
Query: provider.Query{
ExporterIP: query.ExporterIP,
IfIndex: ifIndex,
},
Answer: provider.Answer{
ExporterName: exporter.Name,
Interface: iface,
},
})
}
return nil
}

View File

@@ -0,0 +1,152 @@
// SPDX-FileCopyrightText: 2023 Free Mobile
// SPDX-License-Identifier: AGPL-3.0-only
package static
import (
"context"
"net/netip"
"testing"
"akvorado/common/helpers"
"akvorado/common/reporter"
"akvorado/inlet/metadata/provider"
)
func TestStaticProvider(t *testing.T) {
config := Configuration{
Exporters: helpers.MustNewSubnetMap(map[string]ExporterConfiguration{
"2001:db8:1::/48": {
Name: "nodefault",
IfIndexes: map[uint]provider.Interface{
10: {
Name: "Gi10",
Description: "10th interface",
Speed: 1000,
},
11: {
Name: "Gi11",
Description: "11th interface",
Speed: 1000,
},
},
},
"2001:db8:2::/48": {
Name: "default",
Default: provider.Interface{
Name: "Default0",
Description: "Default interface",
Speed: 1000,
},
IfIndexes: map[uint]provider.Interface{
10: {
Name: "Gi10",
Description: "10th interface",
Speed: 1000,
},
},
},
}),
}
got := []provider.Update{}
r := reporter.NewMock(t)
p, _ := config.New(r, func(update provider.Update) {
got = append(got, update)
})
p.Query(context.Background(), provider.BatchQuery{
ExporterIP: netip.MustParseAddr("2001:db8:1::10"),
IfIndexes: []uint{9, 10, 11},
})
p.Query(context.Background(), provider.BatchQuery{
ExporterIP: netip.MustParseAddr("2001:db8:2::10"),
IfIndexes: []uint{9, 10, 11},
})
expected := []provider.Update{
{
Query: provider.Query{
ExporterIP: netip.MustParseAddr("2001:db8:1::10"),
IfIndex: 9,
},
Answer: provider.Answer{
ExporterName: "nodefault",
},
},
{
Query: provider.Query{
ExporterIP: netip.MustParseAddr("2001:db8:1::10"),
IfIndex: 10,
},
Answer: provider.Answer{
ExporterName: "nodefault",
Interface: provider.Interface{
Name: "Gi10",
Description: "10th interface",
Speed: 1000,
},
},
},
{
Query: provider.Query{
ExporterIP: netip.MustParseAddr("2001:db8:1::10"),
IfIndex: 11,
},
Answer: provider.Answer{
ExporterName: "nodefault",
Interface: provider.Interface{
Name: "Gi11",
Description: "11th interface",
Speed: 1000,
},
},
},
{
Query: provider.Query{
ExporterIP: netip.MustParseAddr("2001:db8:2::10"),
IfIndex: 9,
},
Answer: provider.Answer{
ExporterName: "default",
Interface: provider.Interface{
Name: "Default0",
Description: "Default interface",
Speed: 1000,
},
},
},
{
Query: provider.Query{
ExporterIP: netip.MustParseAddr("2001:db8:2::10"),
IfIndex: 10,
},
Answer: provider.Answer{
ExporterName: "default",
Interface: provider.Interface{
Name: "Gi10",
Description: "10th interface",
Speed: 1000,
},
},
},
{
Query: provider.Query{
ExporterIP: netip.MustParseAddr("2001:db8:2::10"),
IfIndex: 11,
},
Answer: provider.Answer{
ExporterName: "default",
Interface: provider.Interface{
Name: "Default0",
Description: "Default interface",
Speed: 1000,
},
},
},
}
if diff := helpers.Diff(got, expected); diff != "" {
t.Fatalf("static provider (-got, +want):\n%s", diff)
}
}

View File

@@ -84,7 +84,9 @@ func New(r *reporter.Reporter, configuration Configuration, dependencies Depende
c.d.Daemon.Track(&c.t, "inlet/metadata")
// Initialize the provider
selectedProvider, err := c.config.Provider.Config.New(r)
selectedProvider, err := c.config.Provider.Config.New(r, func(update provider.Update) {
c.sc.Put(c.d.Clock.Now(), update.Query, update.Answer)
})
if err != nil {
return nil, err
}
@@ -289,9 +291,7 @@ func (c *Component) providerIncomingRequest(request provider.BatchQuery) {
c.providerBreakersLock.Unlock()
if err := providerBreaker.Run(func() error {
return c.provider.Query(c.t.Context(nil), request, func(update provider.Update) {
c.sc.Put(c.d.Clock.Now(), update.Query, update.Answer)
})
return c.provider.Query(c.t.Context(nil), request)
}); err == breaker.ErrBreakerOpen {
c.metrics.providerBreakerOpenCount.WithLabelValues(request.ExporterIP.Unmap().String()).Inc()
c.providerBreakersLock.Lock()

View File

@@ -156,13 +156,13 @@ func TestStartStopWithMultipleWorkers(t *testing.T) {
type errorProvider struct{}
func (ep errorProvider) Query(_ context.Context, _ provider.BatchQuery, _ func(provider.Update)) error {
func (ep errorProvider) Query(_ context.Context, _ provider.BatchQuery) error {
return errors.New("noooo")
}
type errorProviderConfiguration struct{}
func (epc errorProviderConfiguration) New(_ *reporter.Reporter) (provider.Provider, error) {
func (epc errorProviderConfiguration) New(_ *reporter.Reporter, _ func(provider.Update)) (provider.Provider, error) {
return errorProvider{}, nil
}
@@ -207,7 +207,7 @@ type batchProvider struct {
config *batchProviderConfiguration
}
func (bp *batchProvider) Query(_ context.Context, query provider.BatchQuery, _ func(provider.Update)) error {
func (bp *batchProvider) Query(_ context.Context, query provider.BatchQuery) error {
bp.config.received = append(bp.config.received, query)
return nil
}
@@ -216,7 +216,7 @@ type batchProviderConfiguration struct {
received []provider.BatchQuery
}
func (bpc *batchProviderConfiguration) New(_ *reporter.Reporter) (provider.Provider, error) {
func (bpc *batchProviderConfiguration) New(_ *reporter.Reporter, _ func(provider.Update)) (provider.Provider, error) {
return &batchProvider{config: bpc}, nil
}

View File

@@ -17,10 +17,12 @@ import (
)
// mockProvider represents a mock provider.
type mockProvider struct{}
type mockProvider struct {
put func(provider.Update)
}
// Query query the mock provider for a value.
func (mp mockProvider) Query(_ context.Context, query provider.BatchQuery, put func(provider.Update)) error {
func (mp mockProvider) Query(_ context.Context, query provider.BatchQuery) error {
for _, ifIndex := range query.IfIndexes {
answer := provider.Answer{
ExporterName: strings.ReplaceAll(query.ExporterIP.Unmap().String(), ".", "_"),
@@ -32,7 +34,7 @@ func (mp mockProvider) Query(_ context.Context, query provider.BatchQuery, put f
Speed: 1000,
}
}
put(provider.Update{Query: provider.Query{ExporterIP: query.ExporterIP, IfIndex: ifIndex}, Answer: answer})
mp.put(provider.Update{Query: provider.Query{ExporterIP: query.ExporterIP, IfIndex: ifIndex}, Answer: answer})
}
return nil
}
@@ -41,8 +43,8 @@ func (mp mockProvider) Query(_ context.Context, query provider.BatchQuery, put f
type mockProviderConfiguration struct{}
// New returns a new mock provider.
func (mpc mockProviderConfiguration) New(_ *reporter.Reporter) (provider.Provider, error) {
return mockProvider{}, nil
func (mpc mockProviderConfiguration) New(_ *reporter.Reporter, put func(provider.Update)) (provider.Provider, error) {
return mockProvider{put: put}, nil
}
// NewMock creates a new metadata component building synthetic values. It is already started.