common/remotedatasource: rename from remotedatasourcefetcher

Also rename RemoteDataSource to Source.
This commit is contained in:
Vincent Bernat
2025-07-28 15:26:30 +02:00
parent 6a62d641d6
commit cce61cb0d6
15 changed files with 62 additions and 62 deletions

View File

@@ -1,9 +1,9 @@
// SPDX-FileCopyrightText: 2024 Free Mobile // SPDX-FileCopyrightText: 2024 Free Mobile
// SPDX-License-Identifier: AGPL-3.0-only // SPDX-License-Identifier: AGPL-3.0-only
// Package remotedatasourcefetcher offers a component to refresh internal data periodically // Package remotedatasource offers a component to refresh internal data
// from a set of remote HTTP sources in JSON format. // periodically from a set of remote HTTP sources in JSON format.
package remotedatasourcefetcher package remotedatasource
import ( import (
"time" "time"
@@ -13,8 +13,8 @@ import (
"akvorado/common/helpers" "akvorado/common/helpers"
) )
// RemoteDataSource defines a remote network definition. // Source defines a remote data source.
type RemoteDataSource struct { type Source struct {
// URL is the URL to fetch to get remote network definition. // URL is the URL to fetch to get remote network definition.
// It should provide a JSON file. // It should provide a JSON file.
URL string `validate:"url"` URL string `validate:"url"`
@@ -61,9 +61,9 @@ func (jq TransformQuery) MarshalText() ([]byte, error) {
return []byte(jq.String()), nil return []byte(jq.String()), nil
} }
// DefaultRemoteDataSourceConfiguration is the default configuration for a network source. // DefaultSourceConfiguration is the default configuration for a network source.
func DefaultRemoteDataSourceConfiguration() RemoteDataSource { func DefaultSourceConfiguration() Source {
return RemoteDataSource{ return Source{
Method: "GET", Method: "GET",
Timeout: time.Minute, Timeout: time.Minute,
} }
@@ -71,5 +71,5 @@ func DefaultRemoteDataSourceConfiguration() RemoteDataSource {
func init() { func init() {
helpers.RegisterMapstructureUnmarshallerHook( helpers.RegisterMapstructureUnmarshallerHook(
helpers.DefaultValuesUnmarshallerHook(DefaultRemoteDataSourceConfiguration())) helpers.DefaultValuesUnmarshallerHook(DefaultSourceConfiguration()))
} }

View File

@@ -1,7 +1,7 @@
// SPDX-FileCopyrightText: 2024 Free Mobile // SPDX-FileCopyrightText: 2024 Free Mobile
// SPDX-License-Identifier: AGPL-3.0-only // SPDX-License-Identifier: AGPL-3.0-only
package remotedatasourcefetcher package remotedatasource
import ( import (
"fmt" "fmt"
@@ -14,18 +14,18 @@ import (
"akvorado/common/helpers" "akvorado/common/helpers"
) )
func TestRemoteDataSourceDecode(t *testing.T) { func TestSourceDecode(t *testing.T) {
helpers.TestConfigurationDecode(t, helpers.ConfigurationDecodeCases{ helpers.TestConfigurationDecode(t, helpers.ConfigurationDecodeCases{
{ {
Description: "Empty", Description: "Empty",
Initial: func() interface{} { return RemoteDataSource{} }, Initial: func() interface{} { return Source{} },
Configuration: func() interface{} { Configuration: func() interface{} {
return gin.H{ return gin.H{
"url": "https://example.net", "url": "https://example.net",
"interval": "10m", "interval": "10m",
} }
}, },
Expected: RemoteDataSource{ Expected: Source{
URL: "https://example.net", URL: "https://example.net",
Method: "GET", Method: "GET",
Timeout: time.Minute, Timeout: time.Minute,
@@ -33,7 +33,7 @@ func TestRemoteDataSourceDecode(t *testing.T) {
}, },
}, { }, {
Description: "Simple transform", Description: "Simple transform",
Initial: func() interface{} { return RemoteDataSource{} }, Initial: func() interface{} { return Source{} },
Configuration: func() interface{} { Configuration: func() interface{} {
return gin.H{ return gin.H{
"url": "https://example.net", "url": "https://example.net",
@@ -41,7 +41,7 @@ func TestRemoteDataSourceDecode(t *testing.T) {
"transform": ".[]", "transform": ".[]",
} }
}, },
Expected: RemoteDataSource{ Expected: Source{
URL: "https://example.net", URL: "https://example.net",
Method: "GET", Method: "GET",
Timeout: time.Minute, Timeout: time.Minute,
@@ -50,7 +50,7 @@ func TestRemoteDataSourceDecode(t *testing.T) {
}, },
}, { }, {
Description: "Use POST", Description: "Use POST",
Initial: func() interface{} { return RemoteDataSource{} }, Initial: func() interface{} { return Source{} },
Configuration: func() interface{} { Configuration: func() interface{} {
return gin.H{ return gin.H{
"url": "https://example.net", "url": "https://example.net",
@@ -60,7 +60,7 @@ func TestRemoteDataSourceDecode(t *testing.T) {
"transform": ".[]", "transform": ".[]",
} }
}, },
Expected: RemoteDataSource{ Expected: Source{
URL: "https://example.net", URL: "https://example.net",
Method: "POST", Method: "POST",
Timeout: 2 * time.Minute, Timeout: 2 * time.Minute,
@@ -69,7 +69,7 @@ func TestRemoteDataSourceDecode(t *testing.T) {
}, },
}, { }, {
Description: "Complex transform", Description: "Complex transform",
Initial: func() interface{} { return RemoteDataSource{} }, Initial: func() interface{} { return Source{} },
Configuration: func() interface{} { Configuration: func() interface{} {
return gin.H{ return gin.H{
"url": "https://example.net", "url": "https://example.net",
@@ -79,7 +79,7 @@ func TestRemoteDataSourceDecode(t *testing.T) {
`, `,
} }
}, },
Expected: RemoteDataSource{ Expected: Source{
URL: "https://example.net", URL: "https://example.net",
Method: "GET", Method: "GET",
Timeout: time.Minute, Timeout: time.Minute,
@@ -90,7 +90,7 @@ func TestRemoteDataSourceDecode(t *testing.T) {
}, },
}, { }, {
Description: "Incorrect transform", Description: "Incorrect transform",
Initial: func() interface{} { return RemoteDataSource{} }, Initial: func() interface{} { return Source{} },
Configuration: func() interface{} { Configuration: func() interface{} {
return gin.H{ return gin.H{
"url": "https://example.net", "url": "https://example.net",

View File

@@ -1,7 +1,7 @@
// SPDX-FileCopyrightText: 2024 Free Mobile // SPDX-FileCopyrightText: 2024 Free Mobile
// SPDX-License-Identifier: AGPL-3.0-only // SPDX-License-Identifier: AGPL-3.0-only
package remotedatasourcefetcher package remotedatasource
import "akvorado/common/reporter" import "akvorado/common/reporter"

View File

@@ -1,7 +1,7 @@
// SPDX-FileCopyrightText: 2024 Free Mobile // SPDX-FileCopyrightText: 2024 Free Mobile
// SPDX-License-Identifier: AGPL-3.0-only // SPDX-License-Identifier: AGPL-3.0-only
package remotedatasourcefetcher package remotedatasource
import ( import (
"bufio" "bufio"
@@ -23,7 +23,7 @@ import (
// ProviderFunc is the callback function to call when a datasource is refreshed. // ProviderFunc is the callback function to call when a datasource is refreshed.
// The error returned is used for metrics. One should avoid having too many // The error returned is used for metrics. One should avoid having too many
// different errors. // different errors.
type ProviderFunc func(ctx context.Context, name string, source RemoteDataSource) (int, error) type ProviderFunc func(ctx context.Context, name string, source Source) (int, error)
// Component represents a remote data source fetcher. // Component represents a remote data source fetcher.
type Component[T interface{}] struct { type Component[T interface{}] struct {
@@ -31,14 +31,14 @@ type Component[T interface{}] struct {
t tomb.Tomb t tomb.Tomb
provider ProviderFunc provider ProviderFunc
dataType string dataType string
dataSources map[string]RemoteDataSource dataSources map[string]Source
metrics metrics metrics metrics
DataSourcesReady chan bool // closed when all data sources are ready DataSourcesReady chan bool // closed when all data sources are ready
} }
// New creates a new remote data source fetcher component. // New creates a new remote data source fetcher component.
func New[T interface{}](r *reporter.Reporter, provider ProviderFunc, dataType string, dataSources map[string]RemoteDataSource) (*Component[T], error) { func New[T interface{}](r *reporter.Reporter, provider ProviderFunc, dataType string, dataSources map[string]Source) (*Component[T], error) {
c := Component[T]{ c := Component[T]{
r: r, r: r,
provider: provider, provider: provider,
@@ -67,11 +67,11 @@ var (
ErrEmpty = errors.New("empty result") ErrEmpty = errors.New("empty result")
) )
// Fetch retrieves data from a configured RemoteDataSource, and returns a list // Fetch retrieves data from a configured Source, and returns a list
// of results decoded from JSON to generic type. Fetch should be used in // of results decoded from JSON to generic type. Fetch should be used in
// UpdateRemoteDataSource implementations to update internal data from results. // UpdateSource implementations to update internal data from results.
// It outputs errors without details because they are used for metrics. // It outputs errors without details because they are used for metrics.
func (c *Component[T]) Fetch(ctx context.Context, name string, source RemoteDataSource) ([]T, error) { func (c *Component[T]) Fetch(ctx context.Context, name string, source Source) ([]T, error) {
var results []T var results []T
l := c.r.With().Str("name", name).Str("url", source.URL).Logger() l := c.r.With().Str("name", name).Str("url", source.URL).Logger()
l.Info().Msg("update data source") l.Info().Msg("update data source")

View File

@@ -1,7 +1,7 @@
// SPDX-FileCopyrightText: 2024 Free Mobile // SPDX-FileCopyrightText: 2024 Free Mobile
// SPDX-License-Identifier: AGPL-3.0-only // SPDX-License-Identifier: AGPL-3.0-only
package remotedatasourcefetcher package remotedatasource
import ( import (
"context" "context"
@@ -27,7 +27,7 @@ type remoteDataHandler struct {
dataLock sync.RWMutex dataLock sync.RWMutex
} }
func (h *remoteDataHandler) UpdateData(ctx context.Context, name string, source RemoteDataSource) (int, error) { func (h *remoteDataHandler) UpdateData(ctx context.Context, name string, source Source) (int, error) {
results, err := h.fetcher.Fetch(ctx, name, source) results, err := h.fetcher.Fetch(ctx, name, source)
if err != nil { if err != nil {
return 0, err return 0, err
@@ -38,7 +38,7 @@ func (h *remoteDataHandler) UpdateData(ctx context.Context, name string, source
return len(results), nil return len(results), nil
} }
func TestRemoteDataSourceFetcher(t *testing.T) { func TestSource(t *testing.T) {
// Mux to answer requests // Mux to answer requests
ready := make(chan bool) ready := make(chan bool)
mux := http.NewServeMux() mux := http.NewServeMux()
@@ -74,7 +74,7 @@ func TestRemoteDataSourceFetcher(t *testing.T) {
defer server.Shutdown(context.Background()) defer server.Shutdown(context.Background())
r := reporter.NewMock(t) r := reporter.NewMock(t)
config := map[string]RemoteDataSource{ config := map[string]Source{
"local": { "local": {
URL: fmt.Sprintf("http://%s/data.json", address), URL: fmt.Sprintf("http://%s/data.json", address),
Method: "GET", Method: "GET",
@@ -120,7 +120,7 @@ func TestRemoteDataSourceFetcher(t *testing.T) {
} }
handler.dataLock.RUnlock() handler.dataLock.RUnlock()
gotMetrics := r.GetMetrics("akvorado_common_remotedatasourcefetcher_data_") gotMetrics := r.GetMetrics("akvorado_common_remotedatasource_data_")
expectedMetrics := map[string]string{ expectedMetrics := map[string]string{
`total{source="local",type="test"}`: "1", `total{source="local",type="test"}`: "1",
} }

View File

@@ -3,7 +3,7 @@
//go:build !release //go:build !release
package remotedatasourcefetcher package remotedatasource
import "github.com/itchyny/gojq" import "github.com/itchyny/gojq"

View File

@@ -7,7 +7,7 @@ import (
"reflect" "reflect"
"time" "time"
"akvorado/common/remotedatasourcefetcher" "akvorado/common/remotedatasource"
"akvorado/common/helpers" "akvorado/common/helpers"
@@ -34,7 +34,7 @@ type Configuration struct {
// definitions to map IP networks to attributes. It is used to // definitions to map IP networks to attributes. It is used to
// instantiate the SrcNet* and DstNet* columns. The results // instantiate the SrcNet* and DstNet* columns. The results
// are overridden by the content of Networks. // are overridden by the content of Networks.
NetworkSources map[string]remotedatasourcefetcher.RemoteDataSource `validate:"dive"` NetworkSources map[string]remotedatasource.Source `validate:"dive"`
// NetworkSourceTimeout tells how long to wait for network // NetworkSourceTimeout tells how long to wait for network
// sources to be ready. 503 is returned when not. // sources to be ready. 503 is returned when not.
NetworkSourcesTimeout time.Duration `validate:"min=0"` NetworkSourcesTimeout time.Duration `validate:"min=0"`

View File

@@ -11,7 +11,7 @@ import (
"sync" "sync"
"time" "time"
"akvorado/common/remotedatasourcefetcher" "akvorado/common/remotedatasource"
"github.com/cenkalti/backoff/v4" "github.com/cenkalti/backoff/v4"
"gopkg.in/tomb.v2" "gopkg.in/tomb.v2"
@@ -36,7 +36,7 @@ type Component struct {
migrationsDone chan bool // closed when migrations are done migrationsDone chan bool // closed when migrations are done
migrationsOnce chan bool // closed after first attempt to migrate migrationsOnce chan bool // closed after first attempt to migrate
networkSourcesFetcher *remotedatasourcefetcher.Component[externalNetworkAttributes] networkSourcesFetcher *remotedatasource.Component[externalNetworkAttributes]
networkSources map[string][]externalNetworkAttributes networkSources map[string][]externalNetworkAttributes
networkSourcesLock sync.RWMutex networkSourcesLock sync.RWMutex
@@ -68,8 +68,8 @@ func New(r *reporter.Reporter, configuration Configuration, dependencies Depende
networksCSVUpdateChan: make(chan bool, 1), networksCSVUpdateChan: make(chan bool, 1),
} }
var err error var err error
c.networkSourcesFetcher, err = remotedatasourcefetcher.New[externalNetworkAttributes]( c.networkSourcesFetcher, err = remotedatasource.New[externalNetworkAttributes](
r, c.UpdateRemoteDataSource, "network_source", configuration.NetworkSources) r, c.UpdateSource, "network_source", configuration.NetworkSources)
if err != nil { if err != nil {
return nil, fmt.Errorf("unable to initialize remote data source fetcher component: %w", err) return nil, fmt.Errorf("unable to initialize remote data source fetcher component: %w", err)
} }

View File

@@ -7,7 +7,7 @@ import (
"context" "context"
"net/netip" "net/netip"
"akvorado/common/remotedatasourcefetcher" "akvorado/common/remotedatasource"
) )
type externalNetworkAttributes struct { type externalNetworkAttributes struct {
@@ -15,9 +15,9 @@ type externalNetworkAttributes struct {
NetworkAttributes `mapstructure:",squash"` NetworkAttributes `mapstructure:",squash"`
} }
// UpdateRemoteDataSource updates a remote network source. It returns the // UpdateSource updates a remote network source. It returns the
// number of networks retrieved. // number of networks retrieved.
func (c *Component) UpdateRemoteDataSource(ctx context.Context, name string, source remotedatasourcefetcher.RemoteDataSource) (int, error) { func (c *Component) UpdateSource(ctx context.Context, name string, source remotedatasource.Source) (int, error) {
results, err := c.networkSourcesFetcher.Fetch(ctx, name, source) results, err := c.networkSourcesFetcher.Fetch(ctx, name, source)
if err != nil { if err != nil {
return 0, err return 0, err

View File

@@ -12,7 +12,7 @@ import (
"time" "time"
"akvorado/common/clickhousedb" "akvorado/common/clickhousedb"
"akvorado/common/remotedatasourcefetcher" "akvorado/common/remotedatasource"
"akvorado/orchestrator/geoip" "akvorado/orchestrator/geoip"
"akvorado/common/daemon" "akvorado/common/daemon"
@@ -84,7 +84,7 @@ func TestNetworkSources(t *testing.T) {
config := DefaultConfiguration() config := DefaultConfiguration()
config.SkipMigrations = true config.SkipMigrations = true
config.NetworkSourcesTimeout = 10 * time.Millisecond config.NetworkSourcesTimeout = 10 * time.Millisecond
config.NetworkSources = map[string]remotedatasourcefetcher.RemoteDataSource{ config.NetworkSources = map[string]remotedatasource.Source{
"amazon": { "amazon": {
URL: fmt.Sprintf("http://%s/amazon.json", address), URL: fmt.Sprintf("http://%s/amazon.json", address),
Method: "GET", Method: "GET",
@@ -93,7 +93,7 @@ func TestNetworkSources(t *testing.T) {
}, },
Timeout: 20 * time.Millisecond, Timeout: 20 * time.Millisecond,
Interval: 100 * time.Millisecond, Interval: 100 * time.Millisecond,
Transform: remotedatasourcefetcher.MustParseTransformQuery(` Transform: remotedatasource.MustParseTransformQuery(`
(.prefixes + .ipv6_prefixes)[] | (.prefixes + .ipv6_prefixes)[] |
{ prefix: (.ip_prefix // .ipv6_prefix), tenant: "amazon", region: .region, role: .service|ascii_downcase } { prefix: (.ip_prefix // .ipv6_prefix), tenant: "amazon", region: .region, role: .service|ascii_downcase }
`), `),
@@ -135,7 +135,7 @@ func TestNetworkSources(t *testing.T) {
}, },
}) })
gotMetrics := r.GetMetrics("akvorado_common_remotedatasourcefetcher_data_") gotMetrics := r.GetMetrics("akvorado_common_remotedatasource_data_")
expectedMetrics := map[string]string{ expectedMetrics := map[string]string{
`total{source="amazon",type="network_source"}`: "3", `total{source="amazon",type="network_source"}`: "3",
} }

View File

@@ -7,7 +7,7 @@ import (
"time" "time"
"akvorado/common/helpers" "akvorado/common/helpers"
"akvorado/common/remotedatasourcefetcher" "akvorado/common/remotedatasource"
"akvorado/outlet/metadata/provider" "akvorado/outlet/metadata/provider"
) )
@@ -18,7 +18,7 @@ type Configuration struct {
// ExporterSources defines a set of remote Exporters // ExporterSources defines a set of remote Exporters
// definitions to map IP address to their configuration. // definitions to map IP address to their configuration.
// The results are overridden by the content of Exporters. // The results are overridden by the content of Exporters.
ExporterSources map[string]remotedatasourcefetcher.RemoteDataSource `validate:"dive"` ExporterSources map[string]remotedatasource.Source `validate:"dive"`
// ExporterSourcesTimeout tells how long to wait for exporter // ExporterSourcesTimeout tells how long to wait for exporter
// sources to be ready. 503 is returned when not. // sources to be ready. 503 is returned when not.
ExporterSourcesTimeout time.Duration `validate:"min=0"` ExporterSourcesTimeout time.Duration `validate:"min=0"`

View File

@@ -8,7 +8,7 @@ import (
"time" "time"
"akvorado/common/helpers" "akvorado/common/helpers"
"akvorado/common/remotedatasourcefetcher" "akvorado/common/remotedatasource"
"akvorado/outlet/metadata/provider" "akvorado/outlet/metadata/provider"
) )
@@ -60,7 +60,7 @@ func TestValidation(t *testing.T) {
}, },
}, },
}), }),
ExporterSources: map[string]remotedatasourcefetcher.RemoteDataSource{ ExporterSources: map[string]remotedatasource.Source{
"http-endpoint": { "http-endpoint": {
URL: "https://foo.bar", URL: "https://foo.bar",
Method: "GET", Method: "GET",

View File

@@ -9,7 +9,7 @@ import (
"time" "time"
"akvorado/common/helpers" "akvorado/common/helpers"
"akvorado/common/remotedatasourcefetcher" "akvorado/common/remotedatasource"
"akvorado/common/reporter" "akvorado/common/reporter"
"akvorado/outlet/metadata/provider" "akvorado/outlet/metadata/provider"
@@ -23,7 +23,7 @@ import (
type Provider struct { type Provider struct {
r *reporter.Reporter r *reporter.Reporter
exporterSourcesFetcher *remotedatasourcefetcher.Component[exporterInfo] exporterSourcesFetcher *remotedatasource.Component[exporterInfo]
exportersMap map[string][]exporterInfo exportersMap map[string][]exporterInfo
exporters atomic.Pointer[helpers.SubnetMap[ExporterConfiguration]] exporters atomic.Pointer[helpers.SubnetMap[ExporterConfiguration]]
exportersLock sync.Mutex exportersLock sync.Mutex
@@ -46,8 +46,8 @@ func (configuration Configuration) New(r *reporter.Reporter) (provider.Provider,
p.initStaticExporters() p.initStaticExporters()
var err error var err error
p.exporterSourcesFetcher, err = remotedatasourcefetcher.New[exporterInfo](r, p.exporterSourcesFetcher, err = remotedatasource.New[exporterInfo](r,
p.UpdateRemoteDataSource, "metadata", configuration.ExporterSources) p.UpdateSource, "metadata", configuration.ExporterSources)
if err != nil { if err != nil {
return nil, fmt.Errorf("unable to initialize remote data source fetcher component: %w", err) return nil, fmt.Errorf("unable to initialize remote data source fetcher component: %w", err)
} }

View File

@@ -8,7 +8,7 @@ import (
"errors" "errors"
"akvorado/common/helpers" "akvorado/common/helpers"
"akvorado/common/remotedatasourcefetcher" "akvorado/common/remotedatasource"
"akvorado/outlet/metadata/provider" "akvorado/outlet/metadata/provider"
) )
@@ -67,9 +67,9 @@ func (p *Provider) initStaticExporters() {
p.exportersMap["static"] = staticExporters p.exportersMap["static"] = staticExporters
} }
// UpdateRemoteDataSource updates a remote metadata exporters source. It returns the // UpdateSource updates a remote metadata exporters source. It returns the
// number of exporters retrieved. // number of exporters retrieved.
func (p *Provider) UpdateRemoteDataSource(ctx context.Context, name string, source remotedatasourcefetcher.RemoteDataSource) (int, error) { func (p *Provider) UpdateSource(ctx context.Context, name string, source remotedatasource.Source) (int, error) {
results, err := p.exporterSourcesFetcher.Fetch(ctx, name, source) results, err := p.exporterSourcesFetcher.Fetch(ctx, name, source)
if err != nil { if err != nil {
return 0, err return 0, err

View File

@@ -14,7 +14,7 @@ import (
"time" "time"
"akvorado/common/helpers" "akvorado/common/helpers"
"akvorado/common/remotedatasourcefetcher" "akvorado/common/remotedatasource"
"akvorado/common/reporter" "akvorado/common/reporter"
"akvorado/outlet/metadata/provider" "akvorado/outlet/metadata/provider"
) )
@@ -172,7 +172,7 @@ func TestRemoteExporterSources(t *testing.T) {
}, },
}), }),
ExporterSourcesTimeout: 10 * time.Millisecond, ExporterSourcesTimeout: 10 * time.Millisecond,
ExporterSources: map[string]remotedatasourcefetcher.RemoteDataSource{ ExporterSources: map[string]remotedatasource.Source{
"local": { "local": {
URL: fmt.Sprintf("http://%s/exporters.json", address), URL: fmt.Sprintf("http://%s/exporters.json", address),
Method: "GET", Method: "GET",
@@ -181,7 +181,7 @@ func TestRemoteExporterSources(t *testing.T) {
}, },
Timeout: 20 * time.Millisecond, Timeout: 20 * time.Millisecond,
Interval: 100 * time.Millisecond, Interval: 100 * time.Millisecond,
Transform: remotedatasourcefetcher.MustParseTransformQuery(` Transform: remotedatasource.MustParseTransformQuery(`
.exporters[] .exporters[]
`), `),
}, },
@@ -206,7 +206,7 @@ func TestRemoteExporterSources(t *testing.T) {
close(ready) close(ready)
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
gotMetrics := r.GetMetrics("akvorado_common_remotedatasourcefetcher_data_") gotMetrics := r.GetMetrics("akvorado_common_remotedatasource_data_")
expectedMetrics := map[string]string{ expectedMetrics := map[string]string{
`total{source="local",type="metadata"}`: "3", `total{source="local",type="metadata"}`: "3",
} }