inlet/metadata/static: allow exporter configuration refresh from HTTP data sources using common/remotedatasourcefetcher, factored out from the orchestrator/clickhouse network sources.

This commit is contained in:
Itah
2024-01-17 13:52:26 +04:00
committed by Vincent Bernat
parent 34237d20c6
commit 705e56cac4
23 changed files with 1135 additions and 395 deletions

View File

@@ -126,36 +126,9 @@ func SubnetMapUnmarshallerHook[V any]() mapstructure.DecodeHookFunc {
return nil, fmt.Errorf("key %d is not a string (%s)", i, k.Kind())
}
// Parse key
var key string
if strings.Contains(k.String(), "/") {
// Subnet
_, ipNet, err := net.ParseCIDR(k.String())
if err != nil {
return nil, err
}
// Convert key to IPv6
ones, bits := ipNet.Mask.Size()
if bits != 32 && bits != 128 {
return nil, fmt.Errorf("key %d has an invalid netmask", i)
}
if bits == 32 {
key = fmt.Sprintf("::ffff:%s/%d", ipNet.IP.String(), ones+96)
} else if ipNet.IP.To4() != nil {
key = fmt.Sprintf("::ffff:%s/%d", ipNet.IP.String(), ones)
} else {
key = ipNet.String()
}
} else {
// IP
ip := net.ParseIP(k.String())
if ip == nil {
return nil, fmt.Errorf("key %d is not a valid subnet", i)
}
if ipv4 := ip.To4(); ipv4 != nil {
key = fmt.Sprintf("::ffff:%s/128", ipv4.String())
} else {
key = fmt.Sprintf("%s/128", ip.String())
}
key, err := SubnetMapParseKey(k.String())
if err != nil {
return nil, fmt.Errorf("failed to parse key %s: %w", key, err)
}
output[key] = v.Interface()
}
@@ -184,6 +157,42 @@ func SubnetMapUnmarshallerHook[V any]() mapstructure.DecodeHookFunc {
}
}
// SubnetMapParseKey decodes and validates a key used in SubnetMap from a network string.
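// Keys are normalized to IPv6 prefixes: for example, "10.0.0.0/28" becomes
// "::ffff:10.0.0.0/124" and "2001:db8:2::a" becomes "2001:db8:2::a/128".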
func SubnetMapParseKey(k string) (string, error) {
var key string
if strings.Contains(k, "/") {
// Subnet
_, ipNet, err := net.ParseCIDR(k)
if err != nil {
return "", err
}
// Convert key to IPv6
ones, bits := ipNet.Mask.Size()
if bits != 32 && bits != 128 {
return "", fmt.Errorf("key %s has invalid netmask", k)
}
if bits == 32 {
key = fmt.Sprintf("::ffff:%s/%d", ipNet.IP.String(), ones+96)
} else if ipNet.IP.To4() != nil {
key = fmt.Sprintf("::ffff:%s/%d", ipNet.IP.String(), ones)
} else {
key = ipNet.String()
}
} else {
// IP
ip := net.ParseIP(k)
if ip == nil {
return "", fmt.Errorf("key %s is not a valid subnet", k)
}
if ipv4 := ip.To4(); ipv4 != nil {
key = fmt.Sprintf("::ffff:%s/128", ipv4.String())
} else {
key = fmt.Sprintf("%s/128", ip.String())
}
}
return key, nil
}
// MarshalYAML turns a subnet into a map that can be marshaled.
func (sm SubnetMap[V]) MarshalYAML() (interface{}, error) {
return sm.ToMap(), nil

View File

@@ -231,6 +231,64 @@ func TestSubnetMapUnmarshalHookWithMapValue(t *testing.T) {
}
}
func TestSubnetMapParseKey(t *testing.T) {
cases := []struct {
Description string
Input string
Expected string
Error bool
}{
{
Description: "valid ipv4 address",
Input: "10.0.0.1",
Expected: "::ffff:10.0.0.1/128",
},
{
Description: "valid ipv4 subnet",
Input: "10.0.0.0/28",
Expected: "::ffff:10.0.0.0/124",
},
{
Description: "invalid ipv4 address",
Input: "10.0.0",
Error: true,
},
{
Description: "valid ipv6 address",
Input: "2001:db8:2::a",
Expected: "2001:db8:2::a/128",
},
{
Description: "valid ipv6 subnet",
Input: "2001:db8:2::/48",
Expected: "2001:db8:2::/48",
},
{
Description: "invalid ipv6 address",
Input: "2001:",
Error: true,
},
{
Description: "invalid string",
Input: "foo-bar",
Error: true,
},
}
for _, tc := range cases {
t.Run(tc.Description, func(t *testing.T) {
res, err := helpers.SubnetMapParseKey(tc.Input)
if err != nil && !tc.Error {
t.Fatalf("SubnetMapParseKey() error:\n%+v", err)
} else if err == nil && tc.Error {
t.Fatal("SubnetMapParseKey() did not return an error")
}
if diff := helpers.Diff(res, tc.Expected); diff != "" {
t.Fatalf("Decode() (-got, +want):\n%s", diff)
}
})
}
}
func TestToMap(t *testing.T) {
input := helpers.MustNewSubnetMap(map[string]string{
"2001:db8::/64": "hello",

View File

@@ -0,0 +1,71 @@
// Package remotedatasourcefetcher offers a component to refresh internal data periodically
// from a set of remote HTTP sources in JSON format.
package remotedatasourcefetcher
import (
"time"
"github.com/itchyny/gojq"
"akvorado/common/helpers"
)
// RemoteDataSource defines a remote data source.
type RemoteDataSource struct {
// URL is the URL to fetch to get the remote data definition.
// It should provide a JSON file.
URL string `validate:"url"`
// Method defines which method to use (GET or POST)
Method string `validate:"oneof=GET POST"`
// Headers defines additional headers to send
Headers map[string]string
// Proxy is set to true if a proxy should be used.
Proxy bool
// Timeout tells the maximum time the remote request should take
Timeout time.Duration `validate:"min=1s"`
// Transform is a jq string to transform the received JSON
// data into a list of records.
Transform TransformQuery
// Interval tells how much time to wait before updating the source.
Interval time.Duration `validate:"min=1m"`
}
// TransformQuery represents a jq query to transform data.
type TransformQuery struct {
*gojq.Query
}
// UnmarshalText parses a jq query.
func (jq *TransformQuery) UnmarshalText(text []byte) error {
q, err := gojq.Parse(string(text))
if err != nil {
return err
}
*jq = TransformQuery{q}
return nil
}
// String turns a jq query into a string.
func (jq TransformQuery) String() string {
if jq.Query != nil {
return jq.Query.String()
}
return ".[]"
}
// MarshalText turns a jq query into a byte array.
func (jq TransformQuery) MarshalText() ([]byte, error) {
return []byte(jq.String()), nil
}
// DefaultRemoteDataSourceConfiguration is the default configuration for a remote data source.
func DefaultRemoteDataSourceConfiguration() RemoteDataSource {
return RemoteDataSource{
Method: "GET",
Timeout: time.Minute,
}
}
func init() {
helpers.RegisterMapstructureUnmarshallerHook(helpers.DefaultValuesUnmarshallerHook[RemoteDataSource](DefaultRemoteDataSourceConfiguration()))
}
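For orientation, here is a minimal usage sketch of the new component, mirroring the test added later in this commit; the `exporterRecord` type, package name, endpoint URL and the `"example"` data-type label are illustrative, and `MustParseTransformQuery` is the helper added alongside the package:

```go
package example

import (
	"context"
	"sync"
	"time"

	"akvorado/common/remotedatasourcefetcher"
	"akvorado/common/reporter"
)

// exporterRecord is a hypothetical shape for one JSON item returned by the source.
type exporterRecord struct {
	Name        string
	Description string
}

type handler struct {
	fetcher *remotedatasourcefetcher.Component[exporterRecord]
	lock    sync.RWMutex
	data    []exporterRecord
}

// update is the ProviderFunc: it fetches the source, stores the decoded
// records and reports how many were imported.
func (h *handler) update(ctx context.Context, name string, source remotedatasourcefetcher.RemoteDataSource) (int, error) {
	results, err := h.fetcher.Fetch(ctx, name, source)
	if err != nil {
		return 0, err
	}
	h.lock.Lock()
	h.data = results
	h.lock.Unlock()
	return len(results), nil
}

// setup creates the component for a single source and starts the polling goroutines.
func setup(r *reporter.Reporter) (*handler, error) {
	h := &handler{}
	var err error
	h.fetcher, err = remotedatasourcefetcher.New[exporterRecord](r, h.update, "example",
		map[string]remotedatasourcefetcher.RemoteDataSource{
			"local": {
				URL:       "http://127.0.0.1:8080/data.json", // hypothetical endpoint
				Method:    "GET",
				Timeout:   time.Minute,
				Interval:  10 * time.Minute,
				Transform: remotedatasourcefetcher.MustParseTransformQuery(".results[]"),
			},
		})
	if err != nil {
		return nil, err
	}
	if err := h.fetcher.Start(); err != nil {
		return nil, err
	}
	// Optionally block until every source has been fetched successfully once.
	<-h.fetcher.DataSourcesReady
	return h, nil
}
```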

View File

@@ -0,0 +1,101 @@
package remotedatasourcefetcher
import (
"fmt"
"reflect"
"testing"
"time"
"github.com/gin-gonic/gin"
"akvorado/common/helpers"
)
func TestRemoteDataSourceDecode(t *testing.T) {
helpers.TestConfigurationDecode(t, helpers.ConfigurationDecodeCases{
{
Description: "Empty",
Initial: func() interface{} { return RemoteDataSource{} },
Configuration: func() interface{} {
return gin.H{
"url": "https://example.net",
"interval": "10m",
}
},
Expected: RemoteDataSource{
URL: "https://example.net",
Method: "GET",
Timeout: time.Minute,
Interval: 10 * time.Minute,
},
}, {
Description: "Simple transform",
Initial: func() interface{} { return RemoteDataSource{} },
Configuration: func() interface{} {
return gin.H{
"url": "https://example.net",
"interval": "10m",
"transform": ".[]",
}
},
Expected: RemoteDataSource{
URL: "https://example.net",
Method: "GET",
Timeout: time.Minute,
Interval: 10 * time.Minute,
Transform: MustParseTransformQuery(".[]"),
},
}, {
Description: "Use POST",
Initial: func() interface{} { return RemoteDataSource{} },
Configuration: func() interface{} {
return gin.H{
"url": "https://example.net",
"method": "POST",
"timeout": "2m",
"interval": "10m",
"transform": ".[]",
}
},
Expected: RemoteDataSource{
URL: "https://example.net",
Method: "POST",
Timeout: 2 * time.Minute,
Interval: 10 * time.Minute,
Transform: MustParseTransformQuery(".[]"),
},
}, {
Description: "Complex transform",
Initial: func() interface{} { return RemoteDataSource{} },
Configuration: func() interface{} {
return gin.H{
"url": "https://example.net",
"interval": "10m",
"transform": `
.prefixes[] | {prefix: .ip_prefix, tenant: "amazon", region: .region, role: .service|ascii_downcase}
`,
}
},
Expected: RemoteDataSource{
URL: "https://example.net",
Method: "GET",
Timeout: time.Minute,
Interval: 10 * time.Minute,
Transform: MustParseTransformQuery(`
.prefixes[] | {prefix: .ip_prefix, tenant: "amazon", region: .region, role: .service|ascii_downcase}
`),
},
}, {
Description: "Incorrect transform",
Initial: func() interface{} { return RemoteDataSource{} },
Configuration: func() interface{} {
return gin.H{
"url": "https://example.net",
"interval": "10m",
"transform": "878778&&",
}
},
Error: true,
},
}, helpers.DiffFormatter(reflect.TypeOf(TransformQuery{}), fmt.Sprint), helpers.DiffZero)
}

View File

@@ -0,0 +1,33 @@
package remotedatasourcefetcher
import "akvorado/common/reporter"
type metrics struct {
remoteDataSourceUpdates *reporter.CounterVec
remoteDataSourceErrors *reporter.CounterVec
remoteDataSourceCount *reporter.GaugeVec
}
func (c *Component[T]) initMetrics() {
c.metrics.remoteDataSourceUpdates = c.r.CounterVec(
reporter.CounterOpts{
Name: "updates_total",
Help: "Number of successful updates for a remote data source",
},
[]string{"type", "source"},
)
c.metrics.remoteDataSourceErrors = c.r.CounterVec(
reporter.CounterOpts{
Name: "errors_total",
Help: "Number of failed updates for a remote data source",
},
[]string{"type", "source", "error"},
)
c.metrics.remoteDataSourceCount = c.r.GaugeVec(
reporter.GaugeOpts{
Name: "data_total",
Help: "Number of objects imported from a given source",
},
[]string{"type", "source"},
)
}

View File

@@ -0,0 +1,209 @@
package remotedatasourcefetcher
import (
"bufio"
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"sync"
"time"
"github.com/cenkalti/backoff/v4"
"github.com/itchyny/gojq"
"github.com/mitchellh/mapstructure"
"gopkg.in/tomb.v2"
"akvorado/common/reporter"
)
// ProviderFunc is the callback function called when a data source needs to be refreshed. It returns the number of imported records.
type ProviderFunc func(ctx context.Context, name string, source RemoteDataSource) (int, error)
// Component represents a remote data source fetcher.
type Component[T interface{}] struct {
r *reporter.Reporter
t tomb.Tomb
provider ProviderFunc
dataType string
dataSources map[string]RemoteDataSource
metrics metrics
DataSourcesReady chan bool // closed when all data sources are ready
}
// New creates a new remote data source fetcher component.
func New[T interface{}](r *reporter.Reporter, provider ProviderFunc, dataType string, dataSources map[string]RemoteDataSource) (*Component[T], error) {
c := Component[T]{
r: r,
provider: provider,
dataType: dataType,
dataSources: dataSources,
DataSourcesReady: make(chan bool),
}
c.initMetrics()
return &c, nil
}
// Fetch retrieves data from a configured RemoteDataSource and returns a list of results
// decoded from JSON into the generic type T.
// Fetch should be used in ProviderFunc implementations to update internal data from the results.
func (c *Component[T]) Fetch(ctx context.Context, name string, source RemoteDataSource) ([]T, error) {
var results []T
l := c.r.With().Str("name", name).Str("url", source.URL).Logger()
l.Info().Msg("update data source")
client := &http.Client{Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
}}
req, err := http.NewRequestWithContext(ctx, source.Method, source.URL, nil)
if err != nil {
l.Err(err).Msg("unable to build new request")
return results, fmt.Errorf("unable to build new request: %w", err)
}
for headerName, headerValue := range source.Headers {
req.Header.Set(headerName, headerValue)
}
req.Header.Set("accept", "application/json")
resp, err := client.Do(req)
if err != nil {
l.Err(err).Msg("unable to fetch data source")
return results, fmt.Errorf("unable to fetch data source: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
err := fmt.Errorf("unexpected status code %d: %s", resp.StatusCode, resp.Status)
l.Error().Msg(err.Error())
return results, err
}
reader := bufio.NewReader(resp.Body)
decoder := json.NewDecoder(reader)
var got interface{}
if err := decoder.Decode(&got); err != nil {
l.Err(err).Msg("cannot decode JSON output")
return results, fmt.Errorf("cannot decode JSON output: %w", err)
}
iter := source.Transform.Query.RunWithContext(ctx, got)
for {
v, ok := iter.Next()
if !ok {
break
}
if err, ok := v.(error); ok {
l.Err(err).Msg("cannot execute jq filter")
return results, fmt.Errorf("cannot execute jq filter: %w", err)
}
var result T
config := &mapstructure.DecoderConfig{
Metadata: nil,
Result: &result,
DecodeHook: mapstructure.TextUnmarshallerHookFunc(),
}
decoder, err := mapstructure.NewDecoder(config)
if err != nil {
panic(err)
}
if err := decoder.Decode(v); err != nil {
l.Err(err).Msgf("cannot map returned value for %#v", v)
return results, fmt.Errorf("cannot map returned value: %w", err)
}
results = append(results, result)
}
if len(results) == 0 {
err := errors.New("empty results")
l.Error().Msg(err.Error())
return results, err
}
return results, nil
}
// Start starts the remote data source fetcher component.
func (c *Component[T]) Start() error {
c.r.Info().Msg("starting remote data source fetcher component")
var notReadySources sync.WaitGroup
notReadySources.Add(len(c.dataSources))
go func() {
notReadySources.Wait()
close(c.DataSourcesReady)
}()
for name, source := range c.dataSources {
if source.Transform.Query == nil {
source.Transform.Query, _ = gojq.Parse(".")
}
name := name
source := source
c.t.Go(func() error {
c.metrics.remoteDataSourceCount.WithLabelValues(c.dataType, name).Set(0)
newRetryTicker := func() *backoff.Ticker {
customBackoff := backoff.NewExponentialBackOff()
customBackoff.MaxElapsedTime = 0
customBackoff.MaxInterval = source.Interval
customBackoff.InitialInterval = source.Interval / 10
if customBackoff.InitialInterval > time.Second {
customBackoff.InitialInterval = time.Second
}
return backoff.NewTicker(customBackoff)
}
newRegularTicker := func() *time.Ticker {
return time.NewTicker(source.Interval)
}
retryTicker := newRetryTicker()
regularTicker := newRegularTicker()
regularTicker.Stop()
success := false
ready := false
defer func() {
if !success {
retryTicker.Stop()
} else {
regularTicker.Stop()
}
if !ready {
notReadySources.Done()
}
}()
for {
ctx, cancel := context.WithTimeout(c.t.Context(nil), source.Timeout)
count, err := c.provider(ctx, name, source)
cancel()
if err == nil {
c.metrics.remoteDataSourceUpdates.WithLabelValues(c.dataType, name).Inc()
c.metrics.remoteDataSourceCount.WithLabelValues(c.dataType, name).Set(float64(count))
} else {
c.metrics.remoteDataSourceErrors.WithLabelValues(c.dataType, name, err.Error()).Inc()
}
if err == nil && !ready {
ready = true
notReadySources.Done()
c.r.Debug().Str("name", name).Msg("source ready")
}
if err == nil && !success {
// On success, change the timer to a regular timer interval
retryTicker.Stop()
retryTicker.C = nil
regularTicker = newRegularTicker()
success = true
c.r.Debug().Str("name", name).Msg("switch to regular polling")
} else if err != nil && success {
// On failure, switch to the retry ticker
regularTicker.Stop()
retryTicker = newRetryTicker()
success = false
c.r.Debug().Str("name", name).Msg("switch to retry polling")
}
select {
case <-c.t.Dying():
return nil
case <-retryTicker.C:
case <-regularTicker.C:
}
}
})
}
return nil
}

View File

@@ -0,0 +1,129 @@
package remotedatasourcefetcher
import (
"context"
"fmt"
"net"
"net/http"
"sync"
"testing"
"time"
"akvorado/common/helpers"
"akvorado/common/reporter"
)
type remoteData struct {
name string
description string
}
type remoteDataHandler struct {
data []remoteData
fetcher *Component[remoteData]
dataLock sync.RWMutex
}
func (h *remoteDataHandler) UpdateData(ctx context.Context, name string, source RemoteDataSource) (int, error) {
results, err := h.fetcher.Fetch(ctx, name, source)
if err != nil {
return 0, err
}
h.dataLock.Lock()
h.data = results
h.dataLock.Unlock()
return len(results), nil
}
func TestRemoteDataSourceFetcher(t *testing.T) {
// Mux to answer requests
ready := make(chan bool)
mux := http.NewServeMux()
mux.Handle("/data.json", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
select {
case <-ready:
default:
w.WriteHeader(404)
return
}
w.Header().Add("Content-Type", "application/json")
w.WriteHeader(200)
w.Write([]byte(`
{
"results": [
{"name": "foo", "description": "bar"}
]
}
`))
}))
// Setup an HTTP server to serve the JSON
listener, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatalf("Listen() error:\n%+v", err)
}
server := &http.Server{
Addr: listener.Addr().String(),
Handler: mux,
}
address := listener.Addr()
go server.Serve(listener)
defer server.Shutdown(context.Background())
r := reporter.NewMock(t)
config := map[string]RemoteDataSource{
"local": {
URL: fmt.Sprintf("http://%s/data.json", address),
Method: "GET",
Headers: map[string]string{
"X-Foo": "hello",
},
Timeout: 20 * time.Millisecond,
Interval: 100 * time.Millisecond,
Transform: MustParseTransformQuery(`
.results[]
`),
},
}
handler := remoteDataHandler{
data: []remoteData{},
}
var expected []remoteData
handler.fetcher, _ = New[remoteData](r, handler.UpdateData, "test", config)
handler.fetcher.Start()
handler.dataLock.RLock()
if diff := helpers.Diff(handler.data, expected); diff != "" {
t.Fatalf("static provider (-got, +want):\n%s", diff)
}
handler.dataLock.RUnlock()
// Let the handler start returning data and wait for a refresh
close(ready)
time.Sleep(50 * time.Millisecond)
expected = []remoteData{
{
name: "foo",
description: "bar",
},
}
handler.dataLock.RLock()
if diff := helpers.Diff(handler.data, expected); diff != "" {
t.Fatalf("static provider (-got, +want):\n%s", diff)
}
handler.dataLock.RUnlock()
gotMetrics := r.GetMetrics("akvorado_common_remotedatasourcefetcher_data_")
expectedMetrics := map[string]string{
`total{source="local",type="test"}`: "1",
}
if diff := helpers.Diff(gotMetrics, expectedMetrics); diff != "" {
t.Fatalf("Metrics (-got, +want):\n%s", diff)
}
// We should now be able to resolve our remote data from the remote source
}

View File

@@ -0,0 +1,12 @@
package remotedatasourcefetcher
import "github.com/itchyny/gojq"
// MustParseTransformQuery parses a transform query or panics.
func MustParseTransformQuery(src string) TransformQuery {
q, err := gojq.Parse(src)
if err != nil {
panic(err)
}
return TransformQuery{q}
}

View File

@@ -462,6 +462,33 @@ metadata:
speed: 1000
```
The `static` provider also accepts a key `exporter-sources`, which fetches remote sources mapping subnets to
exporter attributes. This is similar to `exporters`, but the definitions are
fetched over HTTP. It accepts a map from source names to sources.
Each source accepts the following attributes:
- `url` is the URL to fetch
- `method` is the method to use (`GET` or `POST`)
- `headers` is a map from header names to values to add to the request
- `proxy` tells whether to use a proxy (configured through environment variables like `http_proxy`)
- `timeout` defines the timeout for fetching and parsing
- `interval` is the interval at which the source should be refreshed
- `transform` is a [jq](https://stedolan.github.io/jq/manual/)
expression to transform the received JSON into a set of exporter
definitions represented as objects.
For example:
```yaml
metadata:
provider:
type: static
exporter-sources:
gostatic:
url: http://gostatic:8043/my-exporters.json
interval: 10m
transform: .exporters[]
```
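A matching endpoint could serve a document like the one below (the field names follow the exporter schema used by the static provider's test; the subnet, names and speeds are illustrative). The `.exporters[]` transform then yields one exporter definition per entry:

```json
{
  "exporters": [
    {
      "exportersubnet": "203.0.113.0/24",
      "name": "exporter1",
      "default": {"name": "default", "description": "default", "speed": 100},
      "interfaces": [
        {"ifindex": 1, "name": "Gi0/0/1", "description": "core interface", "speed": 1000}
      ]
    }
  ]
}
```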
### HTTP
The builtin HTTP server serves various pages. Its configuration

View File

@@ -4,14 +4,24 @@
package static
import (
"time"
"akvorado/common/helpers"
"akvorado/common/remotedatasourcefetcher"
"akvorado/inlet/metadata/provider"
)
// Configuration describes the configuration for the static provider
type Configuration struct {
// Exporters is a subnet map matching exporters to their configuration
// Exporters is a subnet map matching Exporters to their configuration
Exporters *helpers.SubnetMap[ExporterConfiguration] `validate:"omitempty,dive"`
// ExporterSources defines a set of remote exporter
// definitions mapping IP addresses to their configuration.
// The results are overridden by the content of Exporters.
ExporterSources map[string]remotedatasourcefetcher.RemoteDataSource `validate:"dive"`
// ExporterSourcesTimeout tells how long to wait for exporter
// sources to be ready. 503 is returned when not.
ExporterSourcesTimeout time.Duration `validate:"min=0"`
}
// ExporterConfiguration is the interface configuration for an exporter.

View File

@@ -5,8 +5,10 @@ package static
import (
"testing"
"time"
"akvorado/common/helpers"
"akvorado/common/remotedatasourcefetcher"
"akvorado/inlet/metadata/provider"
)
@@ -40,4 +42,27 @@ func TestValidation(t *testing.T) {
}); err == nil {
t.Fatal("validate.Struct() did not error")
}
if err := helpers.Validate.Struct(Configuration{
Exporters: helpers.MustNewSubnetMap(map[string]ExporterConfiguration{
"::ffff:203.0.113.0/120": {
Name: "something",
Default: provider.Interface{
Name: "iface1",
Description: "description 1",
Speed: 10000,
},
},
}),
ExporterSources: map[string]remotedatasourcefetcher.RemoteDataSource{
"http-endpoint": {
URL: "https://foo.bar",
Method: "GET",
Timeout: time.Second * 10,
Interval: time.Minute,
},
},
}); err != nil {
t.Fatalf("validate.Struct() error:\n%+v", err)
}
}

View File

@@ -6,31 +6,49 @@
package static
import (
"context"
"akvorado/common/helpers"
"akvorado/common/remotedatasourcefetcher"
"akvorado/common/reporter"
"akvorado/inlet/metadata/provider"
"context"
"fmt"
"sync"
"sync/atomic"
)
// Provider represents the static provider.
type Provider struct {
r *reporter.Reporter
config *Configuration
put func(provider.Update)
r *reporter.Reporter
exporterSourcesFetcher *remotedatasourcefetcher.Component[exporterInfo]
exportersMap map[string][]exporterInfo
exporters atomic.Pointer[helpers.SubnetMap[ExporterConfiguration]]
exportersLock sync.RWMutex
put func(provider.Update)
}
// New creates a new static provider from configuration
func (configuration Configuration) New(r *reporter.Reporter, put func(provider.Update)) (provider.Provider, error) {
return &Provider{
r: r,
config: &configuration,
put: put,
}, nil
p := &Provider{
r: r,
exportersMap: map[string][]exporterInfo{},
put: put,
}
p.exporters.Store(configuration.Exporters)
p.initStaticExporters()
var err error
p.exporterSourcesFetcher, err = remotedatasourcefetcher.New[exporterInfo](r, p.UpdateRemoteDataSource, "metadata", configuration.ExporterSources)
if err != nil {
return nil, fmt.Errorf("unable to initialize remote data source fetcher component: %w", err)
}
if err := p.exporterSourcesFetcher.Start(); err != nil {
return nil, fmt.Errorf("unable to start network sources fetcher component: %w", err)
}
return p, nil
}
// Query queries static configuration.
func (p *Provider) Query(_ context.Context, query provider.BatchQuery) error {
exporter, ok := p.config.Exporters.Lookup(query.ExporterIP)
exporter, ok := p.exporters.Load().Lookup(query.ExporterIP)
if !ok {
return nil
}

View File

@@ -49,7 +49,7 @@ func TestStaticProvider(t *testing.T) {
}),
}
got := []provider.Update{}
var got []provider.Update
r := reporter.NewMock(t)
p, _ := config.New(r, func(update provider.Update) {
got = append(got, update)

View File

@@ -0,0 +1,105 @@
package static
import (
"context"
"akvorado/common/helpers"
"akvorado/common/remotedatasourcefetcher"
"akvorado/inlet/metadata/provider"
)
type exporterInfo struct {
ExporterSubnet string
Name string `validate:"required"`
// Default is used, if not empty, for any unknown ifindex
Default provider.Interface `validate:"omitempty"`
// Interfaces is the list of known interfaces for this exporter
Interfaces []exporterInterface `validate:"omitempty"`
}
type exporterInterface struct {
IfIndex uint
provider.Interface `validate:"omitempty,dive" mapstructure:",squash"`
}
func (i exporterInfo) toExporterConfiguration() ExporterConfiguration {
ifindexMap := map[uint]provider.Interface{}
for _, iface := range i.Interfaces {
ifindexMap[iface.IfIndex] = iface.Interface
}
return ExporterConfiguration{
Name: i.Name,
Default: i.Default,
IfIndexes: ifindexMap,
}
}
// initStaticExporters initializes the reconciliation map for exporter configurations
// with the static data from the configuration, which takes priority over remote sources.
func (p *Provider) initStaticExporters() {
staticExportersMap := p.exporters.Load().ToMap()
staticExporters := make([]exporterInfo, 0, len(staticExportersMap))
for subnet, config := range staticExportersMap {
interfaces := make([]exporterInterface, 0, len(config.IfIndexes))
for ifindex, iface := range config.IfIndexes {
interfaces = append(interfaces, exporterInterface{
IfIndex: ifindex,
Interface: iface,
})
}
staticExporters = append(
staticExporters,
exporterInfo{
ExporterSubnet: subnet,
Name: config.Name,
Default: config.Default,
Interfaces: interfaces,
},
)
}
p.exportersMap["static"] = staticExporters
}
// UpdateRemoteDataSource updates a remote metadata exporter source. It returns the
// number of exporters retrieved.
func (p *Provider) UpdateRemoteDataSource(ctx context.Context, name string, source remotedatasourcefetcher.RemoteDataSource) (int, error) {
results, err := p.exporterSourcesFetcher.Fetch(ctx, name, source)
if err != nil {
return 0, err
}
finalMap := map[string]ExporterConfiguration{}
p.exportersLock.Lock()
p.exportersMap[name] = results
for id, results := range p.exportersMap {
if id == "static" {
continue
}
for _, exporterData := range results {
exporterSubnet, err := helpers.SubnetMapParseKey(exporterData.ExporterSubnet)
if err != nil {
p.r.Err(err).Msg("failed to decode subnet")
continue
}
// Conflicts for the same exporter config across multiple remote data sources are not handled
finalMap[exporterSubnet] = exporterData.toExporterConfiguration()
}
}
for _, exporterData := range p.exportersMap["static"] {
exporterSubnet, err := helpers.SubnetMapParseKey(exporterData.ExporterSubnet)
if err != nil {
p.r.Err(err).Msg("failed to decode subnet")
continue
}
// This overrides the duplicated config for an exporter if it is also defined statically
finalMap[exporterSubnet] = exporterData.toExporterConfiguration()
}
p.exportersLock.Unlock()
exporters, err := helpers.NewSubnetMap[ExporterConfiguration](finalMap)
if err != nil {
return 0, err
}
p.exporters.Swap(exporters)
return len(results), nil
}

View File

@@ -0,0 +1,247 @@
package static
import (
"context"
"fmt"
"net"
"net/http"
"net/netip"
"testing"
"time"
"akvorado/common/helpers"
"akvorado/common/remotedatasourcefetcher"
"akvorado/common/reporter"
"akvorado/inlet/metadata/provider"
)
func TestInitStaticExporters(t *testing.T) {
r := reporter.NewMock(t)
conf := Configuration{
Exporters: helpers.MustNewSubnetMap(map[string]ExporterConfiguration{
"::ffff:203.0.113.0/120": {
Name: "something",
Default: provider.Interface{
Name: "iface1",
Description: "description 1",
Speed: 10000,
},
},
}),
}
p := &Provider{
r: r,
exportersMap: map[string][]exporterInfo{},
put: func(update provider.Update) {},
}
p.exporters.Store(conf.Exporters)
expected := map[string][]exporterInfo{}
if diff := helpers.Diff(p.exportersMap, expected); diff != "" {
t.Fatalf("static provider (-got, +want):\n%s", diff)
}
expected["static"] = []exporterInfo{
{
ExporterSubnet: "203.0.113.0/24",
Name: "something",
Default: provider.Interface{
Name: "iface1",
Description: "description 1",
Speed: 10000,
},
Interfaces: []exporterInterface{},
},
}
p.initStaticExporters()
if diff := helpers.Diff(p.exportersMap, expected); diff != "" {
t.Fatalf("static provider (-got, +want):\n%s", diff)
}
}
func TestRemoteExporterSources(t *testing.T) {
// Mux to answer requests
ready := make(chan bool)
mux := http.NewServeMux()
mux.Handle("/exporters.json", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
select {
case <-ready:
default:
w.WriteHeader(404)
return
}
w.Header().Add("Content-Type", "application/json")
w.WriteHeader(200)
w.Write([]byte(`
{
"exporters": [
{
"exportersubnet": "2001:db8:2::/48",
"name": "exporter1",
"default": {
"name": "default",
"description": "default",
"speed": 100
},
"interfaces": [
{
"ifindex": 1,
"name": "iface1",
"description": "foo:desc1",
"speed": 1000
}
]
},
{
"exportersubnet": "10.0.0.1",
"name": "exporter2",
"default": {
"name": "default",
"description": "default",
"speed": 100
},
"interfaces": [
{
"ifindex": 2,
"name": "iface2",
"description": "foo:desc2",
"speed": 1000
}
]
},
{
"exportersubnet": "10.0.0.1/32",
"name": "exporter3",
"default": {
"name": "default",
"description": "default",
"speed": 100
},
"interfaces": [
{
"ifindex": 3,
"name": "iface3",
"description": "foo:desc3",
"speed": 1000
}
]
}
]
}
`))
}))
// Setup an HTTP server to serve the JSON
listener, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatalf("Listen() error:\n%+v", err)
}
server := &http.Server{
Addr: listener.Addr().String(),
Handler: mux,
}
address := listener.Addr()
go server.Serve(listener)
defer server.Shutdown(context.Background())
r := reporter.NewMock(t)
config := Configuration{
Exporters: helpers.MustNewSubnetMap(map[string]ExporterConfiguration{
"2001:db8:1::/48": {
Name: "nodefault",
IfIndexes: map[uint]provider.Interface{
10: {
Name: "Gi10",
Description: "10th interface",
Speed: 1000,
},
},
},
}),
ExporterSourcesTimeout: 10 * time.Millisecond,
ExporterSources: map[string]remotedatasourcefetcher.RemoteDataSource{
"local": {
URL: fmt.Sprintf("http://%s/exporters.json", address),
Method: "GET",
Headers: map[string]string{
"X-Foo": "hello",
},
Timeout: 20 * time.Millisecond,
Interval: 100 * time.Millisecond,
Transform: remotedatasourcefetcher.MustParseTransformQuery(`
.exporters[]
`),
},
},
}
var got []provider.Update
var expected []provider.Update
p, _ := config.New(r, func(update provider.Update) {
got = append(got, update)
})
// Query while the JSON is not ready yet; only statically configured data is available
p.Query(context.Background(), provider.BatchQuery{
ExporterIP: netip.MustParseAddr("2001:db8:1::10"),
IfIndexes: []uint{9},
})
// Unknown exporter at this point
p.Query(context.Background(), provider.BatchQuery{
ExporterIP: netip.MustParseAddr("2001:db8:2::10"),
IfIndexes: []uint{1},
})
expected = append(expected, provider.Update{
Query: provider.Query{
ExporterIP: netip.MustParseAddr("2001:db8:1::10"),
IfIndex: 9,
},
Answer: provider.Answer{
ExporterName: "nodefault",
},
})
if diff := helpers.Diff(got, expected); diff != "" {
t.Fatalf("static provider (-got, +want):\n%s", diff)
}
close(ready)
time.Sleep(50 * time.Millisecond)
gotMetrics := r.GetMetrics("akvorado_common_remotedatasourcefetcher_data_")
expectedMetrics := map[string]string{
`total{source="local",type="metadata"}`: "3",
}
if diff := helpers.Diff(gotMetrics, expectedMetrics); diff != "" {
t.Fatalf("Metrics (-got, +want):\n%s", diff)
}
// We should now be able to resolve our new exporter from the remote source
p.Query(context.Background(), provider.BatchQuery{
ExporterIP: netip.MustParseAddr("2001:db8:2::10"),
IfIndexes: []uint{1},
})
expected = append(expected, provider.Update{
Query: provider.Query{
ExporterIP: netip.MustParseAddr("2001:db8:2::10"),
IfIndex: 1,
},
Answer: provider.Answer{
ExporterName: "exporter1",
Interface: provider.Interface{
Name: "iface1",
Description: "foo:desc1",
Speed: 1000,
},
},
})
if diff := helpers.Diff(got, expected); diff != "" {
t.Fatalf("static provider (-got, +want):\n%s", diff)
}
}

View File

@@ -7,11 +7,12 @@ import (
"reflect"
"time"
"akvorado/common/remotedatasourcefetcher"
"akvorado/common/clickhousedb"
"akvorado/common/helpers"
"akvorado/common/kafka"
"github.com/itchyny/gojq"
"github.com/mitchellh/mapstructure"
)
@@ -43,7 +44,7 @@ type Configuration struct {
// definitions to map IP networks to attributes. It is used to
// instantiate the SrcNet* and DstNet* columns. The results
// are overridden by the content of Networks.
NetworkSources map[string]NetworkSource `validate:"dive"`
NetworkSources map[string]remotedatasourcefetcher.RemoteDataSource `validate:"dive"`
// NetworkSourceTimeout tells how long to wait for network
// sources to be ready. 503 is returned when not.
NetworkSourcesTimeout time.Duration `validate:"min=0"`
@@ -127,65 +128,8 @@ func NetworkAttributesUnmarshallerHook() mapstructure.DecodeHookFunc {
}
}
// NetworkSource defines a remote network definition.
type NetworkSource struct {
// URL is the URL to fetch to get remote network definition.
// It should provide a JSON file.
URL string `validate:"url"`
// Method defines which method to use (GET or POST)
Method string `validate:"oneof=GET POST"`
// Headers defines additional headers to send
Headers map[string]string
// Proxy is set to true if a proxy should be used.
Proxy bool
// Timeout tells the maximum time the remote request should take
Timeout time.Duration `validate:"min=1s"`
// Transform is a jq string to transform the received JSON
// data into a list of network attributes.
Transform TransformQuery
// Interval tells how much time to wait before updating the source.
Interval time.Duration `validate:"min=1m"`
}
// DefaultNetworkSourceConfiguration is the default configuration for a network source.
func DefaultNetworkSourceConfiguration() NetworkSource {
return NetworkSource{
Method: "GET",
Timeout: time.Minute,
}
}
// TransformQuery represents a jq query to transform data.
type TransformQuery struct {
*gojq.Query
}
// UnmarshalText parses a jq query.
func (jq *TransformQuery) UnmarshalText(text []byte) error {
q, err := gojq.Parse(string(text))
if err != nil {
return err
}
*jq = TransformQuery{q}
return nil
}
// String turns a jq query into a string.
func (jq TransformQuery) String() string {
if jq.Query != nil {
return jq.Query.String()
}
return ".[]"
}
// MarshalText turns a jq query into a string.
func (jq TransformQuery) MarshalText() ([]byte, error) {
return []byte(jq.String()), nil
}
func init() {
helpers.RegisterMapstructureUnmarshallerHook(helpers.SubnetMapUnmarshallerHook[NetworkAttributes]())
helpers.RegisterMapstructureUnmarshallerHook(NetworkAttributesUnmarshallerHook())
helpers.RegisterMapstructureUnmarshallerHook(helpers.DefaultValuesUnmarshallerHook[NetworkSource](DefaultNetworkSourceConfiguration()))
helpers.RegisterSubnetMapValidation[NetworkAttributes]()
}

View File

@@ -7,7 +7,6 @@ import (
"fmt"
"reflect"
"testing"
"time"
"github.com/gin-gonic/gin"
@@ -87,95 +86,6 @@ func TestNetworkNamesUnmarshalHook(t *testing.T) {
}, helpers.DiffFormatter(reflect.TypeOf(helpers.SubnetMap[NetworkAttributes]{}), fmt.Sprint))
}
func TestNetworkSourceDecode(t *testing.T) {
helpers.TestConfigurationDecode(t, helpers.ConfigurationDecodeCases{
{
Description: "Empty",
Initial: func() interface{} { return NetworkSource{} },
Configuration: func() interface{} {
return gin.H{
"url": "https://example.net",
"interval": "10m",
}
},
Expected: NetworkSource{
URL: "https://example.net",
Method: "GET",
Timeout: time.Minute,
Interval: 10 * time.Minute,
},
}, {
Description: "Simple transform",
Initial: func() interface{} { return NetworkSource{} },
Configuration: func() interface{} {
return gin.H{
"url": "https://example.net",
"interval": "10m",
"transform": ".[]",
}
},
Expected: NetworkSource{
URL: "https://example.net",
Method: "GET",
Timeout: time.Minute,
Interval: 10 * time.Minute,
Transform: MustParseTransformQuery(".[]"),
},
}, {
Description: "Use POST",
Initial: func() interface{} { return NetworkSource{} },
Configuration: func() interface{} {
return gin.H{
"url": "https://example.net",
"method": "POST",
"timeout": "2m",
"interval": "10m",
"transform": ".[]",
}
},
Expected: NetworkSource{
URL: "https://example.net",
Method: "POST",
Timeout: 2 * time.Minute,
Interval: 10 * time.Minute,
Transform: MustParseTransformQuery(".[]"),
},
}, {
Description: "Complex transform",
Initial: func() interface{} { return NetworkSource{} },
Configuration: func() interface{} {
return gin.H{
"url": "https://example.net",
"interval": "10m",
"transform": `
.prefixes[] | {prefix: .ip_prefix, tenant: "amazon", region: .region, role: .service|ascii_downcase}
`,
}
},
Expected: NetworkSource{
URL: "https://example.net",
Method: "GET",
Timeout: time.Minute,
Interval: 10 * time.Minute,
Transform: MustParseTransformQuery(`
.prefixes[] | {prefix: .ip_prefix, tenant: "amazon", region: .region, role: .service|ascii_downcase}
`),
},
}, {
Description: "Incorrect transform",
Initial: func() interface{} { return NetworkSource{} },
Configuration: func() interface{} {
return gin.H{
"url": "https://example.net",
"interval": "10m",
"transform": "878778&&",
}
},
Error: true,
},
}, helpers.DiffFormatter(reflect.TypeOf(TransformQuery{}), fmt.Sprint), helpers.DiffZero)
}
func TestDefaultConfiguration(t *testing.T) {
config := DefaultConfiguration()
config.Kafka.Topic = "flow"

View File

@@ -112,7 +112,7 @@ func (c *Component) registerHTTPHandlers() error {
v := dict
c.d.HTTP.AddHandler(fmt.Sprintf("/api/v0/orchestrator/clickhouse/custom_dict_%s.csv", k), http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
select {
case <-c.networkSourcesReady:
case <-c.networkSourcesFetcher.DataSourcesReady:
case <-time.After(c.config.NetworkSourcesTimeout):
w.WriteHeader(http.StatusServiceUnavailable)
return
@@ -132,7 +132,7 @@ func (c *Component) registerHTTPHandlers() error {
c.d.HTTP.AddHandler("/api/v0/orchestrator/clickhouse/networks.csv",
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
select {
case <-c.networkSourcesReady:
case <-c.networkSourcesFetcher.DataSourcesReady:
case <-time.After(c.config.NetworkSourcesTimeout):
w.WriteHeader(http.StatusServiceUnavailable)
return

View File

@@ -34,26 +34,4 @@ func (c *Component) initMetrics() {
Help: "Number of migration steps not applied",
},
)
c.metrics.networkSourceUpdates = c.r.CounterVec(
reporter.CounterOpts{
Name: "network_source_updates_total",
Help: "Number of successful updates for a network source",
},
[]string{"source"},
)
c.metrics.networkSourceErrors = c.r.CounterVec(
reporter.CounterOpts{
Name: "network_source_errors_total",
Help: "Number of failed updates for a network source",
},
[]string{"source", "error"},
)
c.metrics.networkSourceCount = c.r.GaugeVec(
reporter.GaugeOpts{
Name: "network_source_networks_total",
Help: "Number of networks imported from a given source",
},
[]string{"source"},
)
}

View File

@@ -5,13 +5,14 @@
package clickhouse
import (
"context"
"fmt"
"sort"
"sync"
"time"
"akvorado/common/remotedatasourcefetcher"
"github.com/cenkalti/backoff/v4"
"github.com/itchyny/gojq"
"gopkg.in/tomb.v2"
"akvorado/common/clickhousedb"
@@ -29,11 +30,11 @@ type Component struct {
config Configuration
metrics metrics
migrationsDone chan bool // closed when migrations are done
migrationsOnce chan bool // closed after first attempt to migrate
networkSourcesReady chan bool // closed when all network sources are ready
networkSourcesLock sync.RWMutex
networkSources map[string][]externalNetworkAttributes
migrationsDone chan bool // closed when migrations are done
migrationsOnce chan bool // closed after first attempt to migrate
networkSourcesFetcher *remotedatasourcefetcher.Component[externalNetworkAttributes]
networkSources map[string][]externalNetworkAttributes
networkSourcesLock sync.RWMutex
}
// Dependencies define the dependencies of the ClickHouse configurator.
@@ -46,14 +47,19 @@ type Dependencies struct {
// New creates a new ClickHouse component.
func New(r *reporter.Reporter, configuration Configuration, dependencies Dependencies) (*Component, error) {
c := Component{
r: r,
d: &dependencies,
config: configuration,
migrationsDone: make(chan bool),
migrationsOnce: make(chan bool),
networkSourcesReady: make(chan bool),
networkSources: make(map[string][]externalNetworkAttributes),
r: r,
d: &dependencies,
config: configuration,
migrationsDone: make(chan bool),
migrationsOnce: make(chan bool),
networkSources: make(map[string][]externalNetworkAttributes),
}
var err error
c.networkSourcesFetcher, err = remotedatasourcefetcher.New[externalNetworkAttributes](r, c.UpdateRemoteDataSource, "network_source", configuration.NetworkSources)
if err != nil {
return nil, fmt.Errorf("unable to initialize remote data source fetcher component: %w", err)
}
c.initMetrics()
if err := c.registerHTTPHandlers(); err != nil {
@@ -81,7 +87,6 @@ func (c *Component) Start() error {
c.t.Go(func() error {
customBackoff := backoff.NewExponentialBackOff()
customBackoff.MaxElapsedTime = 0
customBackoff.MaxInterval = 5 * time.Minute
customBackoff.InitialInterval = time.Second
for {
if !c.config.SkipMigrations {
@@ -107,86 +112,10 @@ func (c *Component) Start() error {
})
// Network sources update
var notReadySources sync.WaitGroup
notReadySources.Add(len(c.config.NetworkSources))
go func() {
notReadySources.Wait()
close(c.networkSourcesReady)
}()
for name, source := range c.config.NetworkSources {
if source.Transform.Query == nil {
source.Transform.Query, _ = gojq.Parse(".")
}
name := name
source := source
c.t.Go(func() error {
c.metrics.networkSourceCount.WithLabelValues(name).Set(0)
newRetryTicker := func() *backoff.Ticker {
customBackoff := backoff.NewExponentialBackOff()
customBackoff.MaxElapsedTime = 0
customBackoff.MaxInterval = source.Interval
customBackoff.InitialInterval = source.Interval / 10
if customBackoff.InitialInterval > time.Second {
customBackoff.InitialInterval = time.Second
}
return backoff.NewTicker(customBackoff)
}
newRegularTicker := func() *time.Ticker {
return time.NewTicker(source.Interval)
}
retryTicker := newRetryTicker()
regularTicker := newRegularTicker()
regularTicker.Stop()
success := false
ready := false
defer func() {
if !success {
retryTicker.Stop()
} else {
regularTicker.Stop()
}
if !ready {
notReadySources.Done()
}
}()
for {
ctx, cancel := context.WithTimeout(c.t.Context(nil), source.Timeout)
count, err := c.updateNetworkSource(ctx, name, source)
cancel()
if err == nil {
c.metrics.networkSourceUpdates.WithLabelValues(name).Inc()
c.metrics.networkSourceCount.WithLabelValues(name).Set(float64(count))
} else {
c.metrics.networkSourceErrors.WithLabelValues(name, err.Error()).Inc()
}
if err == nil && !ready {
ready = true
notReadySources.Done()
c.r.Debug().Str("name", name).Msg("source ready")
}
if err == nil && !success {
// On success, change the timer to a regular timer interval
retryTicker.Stop()
retryTicker.C = nil
regularTicker = newRegularTicker()
success = true
c.r.Debug().Str("name", name).Msg("switch to regular polling")
} else if err != nil && success {
// On failure, switch to the retry ticker
regularTicker.Stop()
retryTicker = newRetryTicker()
success = false
c.r.Debug().Str("name", name).Msg("switch to retry polling")
}
select {
case <-c.t.Dying():
return nil
case <-retryTicker.C:
case <-regularTicker.C:
}
}
})
if err := c.networkSourcesFetcher.Start(); err != nil {
return fmt.Errorf("unable to start network sources fetcher component: %w", err)
}
return nil
}

View File

@@ -4,15 +4,10 @@
package clickhouse
import (
"bufio"
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"net/netip"
"github.com/mitchellh/mapstructure"
"akvorado/common/remotedatasourcefetcher"
)
type externalNetworkAttributes struct {
@@ -20,72 +15,11 @@ type externalNetworkAttributes struct {
NetworkAttributes `mapstructure:",squash"`
}
// updateNetworkSource updates a remote network source. It returns the
// UpdateRemoteDataSource updates a remote network source. It returns the
// number of networks retrieved.
func (c *Component) updateNetworkSource(ctx context.Context, name string, source NetworkSource) (int, error) {
l := c.r.With().Str("name", name).Str("url", source.URL).Logger()
l.Info().Msg("update network source")
client := &http.Client{Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
}}
req, err := http.NewRequestWithContext(ctx, source.Method, source.URL, nil)
for headerName, headerValue := range source.Headers {
req.Header.Set(headerName, headerValue)
}
req.Header.Set("accept", "application/json")
func (c *Component) UpdateRemoteDataSource(ctx context.Context, name string, source remotedatasourcefetcher.RemoteDataSource) (int, error) {
results, err := c.networkSourcesFetcher.Fetch(ctx, name, source)
if err != nil {
l.Err(err).Msg("unable to build new request")
return 0, fmt.Errorf("unable to build new request: %w", err)
}
resp, err := client.Do(req)
if err != nil {
l.Err(err).Msg("unable to fetch network source")
return 0, fmt.Errorf("unable to fetch network source: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
err := fmt.Errorf("unexpected status code %d: %s", resp.StatusCode, resp.Status)
l.Error().Msg(err.Error())
return 0, err
}
reader := bufio.NewReader(resp.Body)
decoder := json.NewDecoder(reader)
var got interface{}
if err := decoder.Decode(&got); err != nil {
l.Err(err).Msg("cannot decode JSON output")
return 0, fmt.Errorf("cannot decode JSON output: %w", err)
}
results := []externalNetworkAttributes{}
iter := source.Transform.Query.RunWithContext(ctx, got)
for {
v, ok := iter.Next()
if !ok {
break
}
if err, ok := v.(error); ok {
l.Err(err).Msg("cannot execute jq filter")
return 0, fmt.Errorf("cannot execute jq filter: %w", err)
}
var result externalNetworkAttributes
config := &mapstructure.DecoderConfig{
Metadata: nil,
Result: &result,
DecodeHook: mapstructure.TextUnmarshallerHookFunc(),
}
decoder, err := mapstructure.NewDecoder(config)
if err != nil {
panic(err)
}
if err := decoder.Decode(v); err != nil {
l.Err(err).Msgf("cannot map returned value for %#v", v)
return 0, fmt.Errorf("cannot map returned value: %w", err)
}
results = append(results, result)
}
if len(results) == 0 {
err := errors.New("empty results")
l.Error().Msg(err.Error())
return 0, err
}
c.networkSourcesLock.Lock()

View File

@@ -11,6 +11,8 @@ import (
"testing"
"time"
"akvorado/common/remotedatasourcefetcher"
"akvorado/common/daemon"
"akvorado/common/helpers"
"akvorado/common/httpserver"
@@ -78,7 +80,7 @@ func TestNetworkSources(t *testing.T) {
config := DefaultConfiguration()
config.SkipMigrations = true
config.NetworkSourcesTimeout = 10 * time.Millisecond
config.NetworkSources = map[string]NetworkSource{
config.NetworkSources = map[string]remotedatasourcefetcher.RemoteDataSource{
"amazon": {
URL: fmt.Sprintf("http://%s/amazon.json", address),
Method: "GET",
@@ -87,7 +89,7 @@ func TestNetworkSources(t *testing.T) {
},
Timeout: 20 * time.Millisecond,
Interval: 100 * time.Millisecond,
Transform: MustParseTransformQuery(`
Transform: remotedatasourcefetcher.MustParseTransformQuery(`
(.prefixes + .ipv6_prefixes)[] |
{ prefix: (.ip_prefix // .ipv6_prefix), tenant: "amazon", region: .region, role: .service|ascii_downcase }
`),
@@ -127,9 +129,9 @@ func TestNetworkSources(t *testing.T) {
},
})
gotMetrics := r.GetMetrics("akvorado_orchestrator_clickhouse_network_source_networks_")
gotMetrics := r.GetMetrics("akvorado_common_remotedatasourcefetcher_data_")
expectedMetrics := map[string]string{
`total{source="amazon"}`: "3",
`total{source="amazon",type="network_source"}`: "3",
}
if diff := helpers.Diff(gotMetrics, expectedMetrics); diff != "" {
t.Fatalf("Metrics (-got, +want):\n%s", diff)

View File

@@ -10,19 +10,8 @@ import (
"reflect"
"akvorado/common/helpers"
"github.com/itchyny/gojq"
)
// MustParseTransformQuery parses a transform query or panic.
func MustParseTransformQuery(src string) TransformQuery {
q, err := gojq.Parse(src)
if err != nil {
panic(err)
}
return TransformQuery{q}
}
func init() {
helpers.AddPrettyFormatter(reflect.TypeOf(helpers.SubnetMap[NetworkAttributes]{}), fmt.Sprint)
}