inlet/outlet: rename a few metrics

For example:

```
 17:35 ❱ curl -s 127.0.0.1:8080/api/v0/outlet/metrics | promtool check metrics
akvorado_outlet_core_classifier_exporter_cache_size_items counter metrics should have "_total" suffix
akvorado_outlet_core_classifier_interface_cache_size_items counter metrics should have "_total" suffix
akvorado_outlet_flow_decoder_netflow_flowset_records_sum counter metrics should have "_total" suffix
akvorado_outlet_flow_decoder_netflow_flowset_records_sum non-histogram and non-summary metrics should not have "_sum" suffix
akvorado_outlet_flow_decoder_netflow_flowset_sum counter metrics should have "_total" suffix
akvorado_outlet_flow_decoder_netflow_flowset_sum non-histogram and non-summary metrics should not have "_sum" suffix
akvorado_outlet_kafka_buffered_fetch_records_total non-counter metrics should not have "_total" suffix
akvorado_outlet_kafka_buffered_produce_records_total non-counter metrics should not have "_total" suffix
akvorado_outlet_metadata_cache_refreshs counter metrics should have "_total" suffix
akvorado_outlet_routing_provider_bmp_peers_total non-counter metrics should not have "_total" suffix
akvorado_outlet_routing_provider_bmp_routes_total non-counter metrics should not have "_total" suffix
```

Also ensure metrics using errors as label don't have a too great
cardinality by using constants for error messages used.
This commit is contained in:
Vincent Bernat
2025-07-20 17:35:46 +02:00
parent dc23ae3c21
commit 4c0b15e1cd
15 changed files with 187 additions and 170 deletions

View File

@@ -8,7 +8,6 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"sync"
"time"
@@ -21,7 +20,9 @@ import (
"akvorado/common/reporter"
)
// ProviderFunc is the callback function to call when a datasource is refreshed implementZ.
// ProviderFunc is the callback function to call when a datasource is refreshed.
// The error returned is used for metrics. One should avoid having too many
// different errors.
type ProviderFunc func(ctx context.Context, name string, source RemoteDataSource) (int, error)
// Component represents a remote data source fetcher.
@@ -49,9 +50,27 @@ func New[T interface{}](r *reporter.Reporter, provider ProviderFunc, dataType st
return &c, nil
}
// Fetch retrieves data from a configured RemoteDataSource, and returns a list of results
// decoded from JSON to generic type.
// Fetch should be used in UpdateRemoteDataSource implementations to update internal data from results.
var (
// ErrBuildRequest is triggered when we cannot build an HTTP request
ErrBuildRequest = errors.New("cannot build HTTP request")
// ErrFetchDataSource is triggered when we cannot fetch the data source
ErrFetchDataSource = errors.New("cannot fetch data source")
// ErrStatusCode is triggered if status code is not 200
ErrStatusCode = errors.New("unexpected HTTP status code")
// ErrJSONDecode is triggered for any decoding issue
ErrJSONDecode = errors.New("cannot decode JSON")
// ErrMapResult is triggered when we cannot map the JSON result to the expected structure
ErrMapResult = errors.New("cannot map JSON")
// ErrJQExecute is triggered when we cannot execute the jq filter
ErrJQExecute = errors.New("cannot execute jq filter")
// ErrEmpty is triggered if the results are empty
ErrEmpty = errors.New("empty result")
)
// Fetch retrieves data from a configured RemoteDataSource, and returns a list
// of results decoded from JSON to generic type. Fetch should be used in
// UpdateRemoteDataSource implementations to update internal data from results.
// It outputs errors without details because they are used for metrics.
func (c *Component[T]) Fetch(ctx context.Context, name string, source RemoteDataSource) ([]T, error) {
var results []T
l := c.r.With().Str("name", name).Str("url", source.URL).Logger()
@@ -61,31 +80,30 @@ func (c *Component[T]) Fetch(ctx context.Context, name string, source RemoteData
Proxy: http.ProxyFromEnvironment,
}}
req, err := http.NewRequestWithContext(ctx, source.Method, source.URL, nil)
if err != nil {
l.Err(err).Msg("unable to build new request")
return results, ErrBuildRequest
}
for headerName, headerValue := range source.Headers {
req.Header.Set(headerName, headerValue)
}
req.Header.Set("accept", "application/json")
if err != nil {
l.Err(err).Msg("unable to build new request")
return results, fmt.Errorf("unable to build new request: %w", err)
}
resp, err := client.Do(req)
if err != nil {
l.Err(err).Msg("unable to fetch data source")
return results, fmt.Errorf("unable to fetch data source: %w", err)
return results, ErrFetchDataSource
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
err := fmt.Errorf("unexpected status code %d: %s", resp.StatusCode, resp.Status)
l.Error().Msg(err.Error())
return results, err
l.Error().Int("status", resp.StatusCode).Msg("unexpected status code")
return results, ErrStatusCode
}
reader := bufio.NewReader(resp.Body)
decoder := json.NewDecoder(reader)
var got interface{}
if err := decoder.Decode(&got); err != nil {
l.Err(err).Msg("cannot decode JSON output")
return results, fmt.Errorf("cannot decode JSON output: %w", err)
return results, ErrJSONDecode
}
iter := source.Transform.Query.RunWithContext(ctx, got)
@@ -96,7 +114,7 @@ func (c *Component[T]) Fetch(ctx context.Context, name string, source RemoteData
}
if err, ok := v.(error); ok {
l.Err(err).Msg("cannot execute jq filter")
return results, fmt.Errorf("cannot execute jq filter: %w", err)
return results, ErrJQExecute
}
var result T
config := &mapstructure.DecoderConfig{
@@ -110,14 +128,13 @@ func (c *Component[T]) Fetch(ctx context.Context, name string, source RemoteData
}
if err := decoder.Decode(v); err != nil {
l.Err(err).Msgf("cannot map returned value for %#v", v)
return results, fmt.Errorf("cannot map returned value: %w", err)
return results, ErrMapResult
}
results = append(results, result)
}
if len(results) == 0 {
err := errors.New("empty results")
l.Error().Msg(err.Error())
return results, err
l.Error().Msg("empty result")
return results, ErrEmpty
}
return results, nil
}