outlet/flow: persist decoders' state

Currently, for NetFlow, persist data/options templates and options. This
should allow the outlet to restart without loosing any message.

Fix #2100
This commit is contained in:
Vincent Bernat
2025-11-17 18:01:17 +01:00
parent 7c0a8db76d
commit cb92fc7199
17 changed files with 533 additions and 31 deletions

View File

@@ -36,6 +36,7 @@ type OutletConfiguration struct {
Kafka kafka.Configuration
ClickHouseDB clickhousedb.Configuration
ClickHouse clickhouse.Configuration
Flow flow.Configuration
Core core.Configuration
Schema schema.Configuration
}
@@ -50,6 +51,7 @@ func (c *OutletConfiguration) Reset() {
Kafka: kafka.DefaultConfiguration(),
ClickHouseDB: clickhousedb.DefaultConfiguration(),
ClickHouse: clickhouse.DefaultConfiguration(),
Flow: flow.DefaultConfiguration(),
Core: core.DefaultConfiguration(),
Schema: schema.DefaultConfiguration(),
}
@@ -111,7 +113,7 @@ func outletStart(r *reporter.Reporter, config OutletConfiguration, checkOnly boo
if err != nil {
return fmt.Errorf("unable to initialize schema component: %w", err)
}
flowComponent, err := flow.New(r, flow.Dependencies{
flowComponent, err := flow.New(r, config.Flow, flow.Dependencies{
Schema: schemaComponent,
})
if err != nil {

View File

@@ -42,7 +42,7 @@ func (c *Cache[K, V]) Load(cacheFile string) error {
// currentVersionNumber should be increased each time we change the way we
// encode the cache.
var currentVersionNumber = 11
const currentVersionNumber = 11
// GobEncode encodes the cache
func (c *Cache[K, V]) GobEncode() ([]byte, error) {
@@ -50,7 +50,8 @@ func (c *Cache[K, V]) GobEncode() ([]byte, error) {
encoder := gob.NewEncoder(&buf)
// Encode version
if err := encoder.Encode(&currentVersionNumber); err != nil {
version := currentVersionNumber
if err := encoder.Encode(&version); err != nil {
return nil, err
}
// Encode a representation of K and V. Gob decoding is pretty forgiving, we

View File

@@ -19,6 +19,7 @@ var Version = 5
var decoderMap = bimap.New(map[RawFlow_Decoder]string{
RawFlow_DECODER_NETFLOW: "netflow",
RawFlow_DECODER_SFLOW: "sflow",
RawFlow_DECODER_GOB: "gob",
})
// MarshalText turns a decoder to text
@@ -27,7 +28,7 @@ func (d RawFlow_Decoder) MarshalText() ([]byte, error) {
if ok {
return []byte(got), nil
}
return nil, errors.New("unknown decoder")
return nil, fmt.Errorf("unknown decoder %d", d)
}
// UnmarshalText provides a decoder from text

View File

@@ -608,7 +608,7 @@ exporter-classifiers:
### ClickHouse
The ClickHouse component pushes data to ClickHouse. There are two settings that
The ClickHouse component pushes data to ClickHouse. There are three settings that
are configurable:
- `maximum-batch-size` defines how many flows to send to ClickHouse in a single batch at most
@@ -621,6 +621,14 @@ send a batch of size at most `maximum-batch-size` at least every
The default value is 100 000 and allows ClickHouse to handle incoming flows
efficiently.
### Flow
The flow component decodes flows received from Kafka. There is only one setting:
- `state-persist-file` defines the location of the file to save the state of the
flow decoders and read it back on startup. It is used to store IPFIX/NetFlow
templates and options.
## Orchestrator service
The three main components of the orchestrator service are `schema`,

View File

@@ -22,6 +22,7 @@ identified with a specific icon:
- 🩹 *outlet*: accept flows where interface names or descriptions are missing
- 🩹 *docker*: update Traefik to 3.6.1 (for compatibility with Docker Engine 29)
- 🌱 *common*: enable block and mutex profiling
- 🌱 *outlet*: save IPFIX decoder state to a file to prevent discarding flows on start
- 🌱 *config*: rename `verify` to `skip-verify` in TLS configurations for
ClickHouse, Kafka and remote data sources (with inverted logic)
- 🌱 *config*: remote data sources accept a specific TLS configuration

View File

@@ -190,6 +190,7 @@ services:
- akvorado-run:/run/akvorado
environment:
AKVORADO_CFG_OUTLET_METADATA_CACHEPERSISTFILE: /run/akvorado/metadata.cache
AKVORADO_CFG_OUTLET_FLOW_STATEPERSISTFILE: /run/akvorado/flow.state
labels:
- traefik.enable=true
# Disable access logging of /api/v0/outlet/metrics

View File

@@ -626,7 +626,7 @@ ClassifyProviderRegex(Interface.Description, "^Transit: ([^ ]+)", "$1")`,
daemonComponent := daemon.NewMock(t)
metadataComponent := metadata.NewMock(t, r, metadata.DefaultConfiguration(),
metadata.Dependencies{Daemon: daemonComponent})
flowComponent, err := flow.New(r, flow.Dependencies{Schema: schema.NewMock(t)})
flowComponent, err := flow.New(r, flow.DefaultConfiguration(), flow.Dependencies{Schema: schema.NewMock(t)})
if err != nil {
t.Fatalf("flow.New() error:\n%+v", err)
}

View File

@@ -40,7 +40,7 @@ func TestCore(t *testing.T) {
daemonComponent := daemon.NewMock(t)
metadataComponent := metadata.NewMock(t, r, metadata.DefaultConfiguration(),
metadata.Dependencies{Daemon: daemonComponent})
flowComponent, err := flow.New(r, flow.Dependencies{Schema: schema.NewMock(t)})
flowComponent, err := flow.New(r, flow.DefaultConfiguration(), flow.Dependencies{Schema: schema.NewMock(t)})
if err != nil {
t.Fatalf("flow.New() error:\n%+v", err)
}

16
outlet/flow/config.go Normal file
View File

@@ -0,0 +1,16 @@
// SPDX-FileCopyrightText: 2025 Free Mobile
// SPDX-License-Identifier: AGPL-3.0-only
package flow
// Configuration describes the configuration for the flow component.
type Configuration struct {
// StatePersistFile defines a file to store decoder state (templates, sampling
// rates) to survive restarts.
StatePersistFile string `validate:"isdefault|filepath"`
}
// DefaultConfiguration returns the default configuration for the flow component.
func DefaultConfiguration() Configuration {
return Configuration{}
}

View File

@@ -0,0 +1,126 @@
// SPDX-FileCopyrightText: 2025 Free Mobile
// SPDX-License-Identifier: AGPL-3.0-only
package netflow
import (
"encoding/json"
"fmt"
"reflect"
"github.com/netsampler/goflow2/v2/decoders/netflow"
)
// MarshalText implements encoding.TextMarshaler for templateKey.
func (tk templateKey) MarshalText() ([]byte, error) {
return fmt.Appendf(nil, "%d-%d-%d", tk.version, tk.obsDomainID, tk.templateID), nil
}
// UnmarshalText implements encoding.TextUnmarshaler for templateKey.
func (tk *templateKey) UnmarshalText(text []byte) error {
_, err := fmt.Sscanf(string(text), "%d-%d-%d", &tk.version, &tk.obsDomainID, &tk.templateID)
if err != nil {
return fmt.Errorf("invalid template key %q: %w", string(text), err)
}
return nil
}
// MarshalText implements encoding.TextMarshaler for samplingRateKey.
func (srk samplingRateKey) MarshalText() ([]byte, error) {
return fmt.Appendf(nil, "%d-%d-%d", srk.version, srk.obsDomainID, srk.samplerID), nil
}
// UnmarshalText implements encoding.TextUnmarshaler for samplingRateKey.
func (srk *samplingRateKey) UnmarshalText(text []byte) error {
_, err := fmt.Sscanf(string(text), "%d-%d-%d", &srk.version, &srk.obsDomainID, &srk.samplerID)
if err != nil {
return fmt.Errorf("invalid sampling rate key %q: %w", string(text), err)
}
return nil
}
// MarshalJSON encodes a set of NetFlow templates.
func (t *templates) MarshalJSON() ([]byte, error) {
type typedTemplate struct {
Type string
Template any
}
data := make(map[templateKey]typedTemplate, len(*t))
for k, v := range *t {
switch v := v.(type) {
case netflow.TemplateRecord:
data[k] = typedTemplate{
Type: "data",
Template: v,
}
case netflow.IPFIXOptionsTemplateRecord:
data[k] = typedTemplate{
Type: "ipfix-option",
Template: v,
}
case netflow.NFv9OptionsTemplateRecord:
data[k] = typedTemplate{
Type: "nfv9-option",
Template: v,
}
default:
return nil, fmt.Errorf("unknown template type %q", reflect.TypeOf(v).String())
}
}
return json.Marshal(&data)
}
// UnmarshalJSON decodes a set of NetFlow templates.
func (t *templates) UnmarshalJSON(data []byte) error {
type typedTemplate struct {
Type string
Template json.RawMessage
}
var templatesWithTypes map[templateKey]typedTemplate
if err := json.Unmarshal(data, &templatesWithTypes); err != nil {
return err
}
targetTemplates := make(templates, len(templatesWithTypes))
for k, v := range templatesWithTypes {
var targetTemplate any
var err error
switch v.Type {
case "data":
var tmpl netflow.TemplateRecord
err = json.Unmarshal(v.Template, &tmpl)
targetTemplate = tmpl
case "ipfix-option":
var tmpl netflow.IPFIXOptionsTemplateRecord
err = json.Unmarshal(v.Template, &tmpl)
targetTemplate = tmpl
case "nfv9-option":
var tmpl netflow.NFv9OptionsTemplateRecord
err = json.Unmarshal(v.Template, &tmpl)
targetTemplate = tmpl
default:
return fmt.Errorf("unknown type %q", v.Type)
}
if err != nil {
return err
}
targetTemplates[k] = targetTemplate
}
*t = targetTemplates
return nil
}
// MarshalJSON encodes the NetFlow decoder's collection.
func (nd *Decoder) MarshalJSON() ([]byte, error) {
return json.Marshal(&nd.collection.Collection)
}
// UnmarshalJSON decodes the NetFlow decoder's collection.
func (nd *Decoder) UnmarshalJSON(data []byte) error {
if err := json.Unmarshal(data, &nd.collection.Collection); err != nil {
return err
}
for _, tao := range nd.collection.Collection {
tao.nd = nd
}
return nil
}

View File

@@ -0,0 +1,83 @@
// SPDX-FileCopyrightText: 2025 Free Mobile
// SPDX-License-Identifier: AGPL-3.0-only
package netflow
import (
"encoding/json"
"testing"
"akvorado/common/helpers"
"akvorado/common/reporter"
"akvorado/common/schema"
"akvorado/outlet/flow/decoder"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/netsampler/goflow2/v2/decoders/netflow"
)
func TestMarshalUnmarshalTemplates(t *testing.T) {
r := reporter.NewMock(t)
sch := schema.NewMock(t)
nfdecoder := New(r, decoder.Dependencies{Schema: sch})
collection := &nfdecoder.(*Decoder).collection
exporter := collection.Get("::ffff:192.168.1.1")
exporter.SetSamplingRate(10, 300, 10, 2048)
exporter.SetSamplingRate(9, 301, 11, 4096)
exporter.AddTemplate(10, 300, 300, netflow.TemplateRecord{
TemplateId: 300,
FieldCount: 2,
Fields: []netflow.Field{
{
Type: netflow.IPFIX_FIELD_applicationName,
Length: 10,
}, {
Type: netflow.IPFIX_FIELD_VRFname,
Length: 25,
},
},
})
exporter.AddTemplate(10, 300, 301, netflow.IPFIXOptionsTemplateRecord{
TemplateId: 301,
FieldCount: 2,
ScopeFieldCount: 0,
Options: []netflow.Field{
{
Type: netflow.IPFIX_FIELD_samplerRandomInterval,
Length: 4,
}, {
Type: netflow.IPFIX_FIELD_samplerMode,
Length: 4,
},
},
})
exporter.AddTemplate(9, 301, 300, netflow.NFv9OptionsTemplateRecord{
TemplateId: 300,
OptionLength: 2,
Options: []netflow.Field{
{
Type: netflow.NFV9_FIELD_FLOW_ACTIVE_TIMEOUT,
Length: 4,
}, {
Type: netflow.NFV9_FIELD_FORWARDING_STATUS,
Length: 2,
},
},
})
jsonBytes, err := json.Marshal(&nfdecoder)
if err != nil {
t.Fatalf("json.Marshal() error:\n%+v", err)
}
nfdecoder2 := New(r, decoder.Dependencies{Schema: sch})
if err := json.Unmarshal(jsonBytes, &nfdecoder2); err != nil {
t.Fatalf("json.Unmarshal() error:\n%+v", err)
}
collection1 := &nfdecoder.(*Decoder).collection.Collection
collection2 := &nfdecoder2.(*Decoder).collection.Collection
if diff := helpers.Diff(collection1, collection2,
cmpopts.IgnoreUnexported(templatesAndOptions{})); diff != "" {
t.Fatalf("json.Marshal()/json.Unmarshal() (-got, +want):\n%s", diff)
}
}

View File

@@ -47,7 +47,7 @@ func New(r *reporter.Reporter, dependencies decoder.Dependencies) decoder.Decode
}
nd.collection = templateAndOptionCollection{
nd: nd,
collection: make(map[string]*templatesAndOptions),
Collection: make(map[string]*templatesAndOptions),
}
nd.metrics.errors = nd.r.CounterVec(

View File

@@ -13,21 +13,27 @@ import (
// templateAndOptionCollection map exporters to the set of templates and options we
// received from them.
type templateAndOptionCollection struct {
nd *Decoder
lock sync.Mutex
collection map[string]*templatesAndOptions
nd *Decoder
lock sync.Mutex
Collection map[string]*templatesAndOptions
}
// templatesAndOptions contains templates and options associated to an exporter.
type templatesAndOptions struct {
nd *Decoder
key string
templateLock sync.RWMutex
templates map[templateKey]any
samplingRateLock sync.RWMutex
samplingRates map[samplingRateKey]uint32
Key string
Templates templates
SamplingRates map[samplingRateKey]uint32
}
// templates is a mapping to one of netflow.TemplateRecord,
// netflow.IPFIXOptionsTemplateRecord, netflow.NFv9OptionsTemplateRecord.
type templates map[templateKey]any
// templateKey is the key structure to access a template.
type templateKey struct {
version uint16
@@ -51,17 +57,17 @@ var (
func (c *templateAndOptionCollection) Get(key string) *templatesAndOptions {
c.lock.Lock()
defer c.lock.Unlock()
t, ok := c.collection[key]
t, ok := c.Collection[key]
if ok {
return t
}
t = &templatesAndOptions{
nd: c.nd,
key: key,
templates: make(map[templateKey]any),
samplingRates: make(map[samplingRateKey]uint32),
Key: key,
Templates: make(map[templateKey]any),
SamplingRates: make(map[samplingRateKey]uint32),
}
c.collection[key] = t
c.Collection[key] = t
return t
}
@@ -75,7 +81,7 @@ func (t *templatesAndOptions) RemoveTemplate(uint16, uint32, uint16) (any, error
func (t *templatesAndOptions) GetTemplate(version uint16, obsDomainID uint32, templateID uint16) (any, error) {
t.templateLock.RLock()
defer t.templateLock.RUnlock()
template, ok := t.templates[templateKey{version: version, obsDomainID: obsDomainID, templateID: templateID}]
template, ok := t.Templates[templateKey{version: version, obsDomainID: obsDomainID, templateID: templateID}]
if !ok {
return nil, netflow.ErrorTemplateNotFound
}
@@ -98,7 +104,7 @@ func (t *templatesAndOptions) AddTemplate(version uint16, obsDomainID uint32, te
}
t.nd.metrics.templates.WithLabelValues(
t.key,
t.Key,
strconv.Itoa(int(version)),
strconv.Itoa(int(obsDomainID)),
strconv.Itoa(int(templateID)),
@@ -107,7 +113,7 @@ func (t *templatesAndOptions) AddTemplate(version uint16, obsDomainID uint32, te
t.templateLock.Lock()
defer t.templateLock.Unlock()
t.templates[templateKey{version: version, obsDomainID: obsDomainID, templateID: templateID}] = template
t.Templates[templateKey{version: version, obsDomainID: obsDomainID, templateID: templateID}] = template
return nil
}
@@ -115,7 +121,7 @@ func (t *templatesAndOptions) AddTemplate(version uint16, obsDomainID uint32, te
func (t *templatesAndOptions) GetSamplingRate(version uint16, obsDomainID uint32, samplerID uint64) uint32 {
t.samplingRateLock.RLock()
defer t.samplingRateLock.RUnlock()
rate := t.samplingRates[samplingRateKey{
rate := t.SamplingRates[samplingRateKey{
version: version,
obsDomainID: obsDomainID,
samplerID: samplerID,
@@ -127,7 +133,7 @@ func (t *templatesAndOptions) GetSamplingRate(version uint16, obsDomainID uint32
func (t *templatesAndOptions) SetSamplingRate(version uint16, obsDomainID uint32, samplerID uint64, samplingRate uint32) {
t.samplingRateLock.Lock()
defer t.samplingRateLock.Unlock()
t.samplingRates[samplingRateKey{
t.SamplingRates[samplingRateKey{
version: version,
obsDomainID: obsDomainID,
samplerID: samplerID,

View File

@@ -24,10 +24,12 @@ import (
func TestFlowDecode(t *testing.T) {
r := reporter.NewMock(t)
sch := schema.NewMock(t)
c, err := New(r, Dependencies{Schema: sch})
c, err := New(r, DefaultConfiguration(), Dependencies{Schema: sch})
if err != nil {
t.Fatalf("New() error:\n%+v", err)
}
helpers.StartStop(t, c)
bf := sch.NewFlowMessage()
got := []*schema.FlowMessage{}
finalize := func() {

80
outlet/flow/persist.go Normal file
View File

@@ -0,0 +1,80 @@
// SPDX-FileCopyrightText: 2025 Free Mobile
// SPDX-License-Identifier: AGPL-3.0-only
package flow
import (
"encoding/json"
"errors"
"fmt"
"os"
"path/filepath"
"akvorado/common/pb"
"github.com/google/renameio/v2"
)
// ErrStateVersion is triggered when loading a collection from an incompatible version
var ErrStateVersion = errors.New("collection version mismatch")
// currentStateVersionNumber should be increased each time we change the way we
// encode the collection.
const currentStateVersionNumber = 1
// SaveState save the decoders' state to a file. This is not goroutine-safe.
func (c *Component) SaveState(target string) error {
state := struct {
Version int
Decoders any
}{
Version: currentStateVersionNumber,
Decoders: c.decoders,
}
data, err := json.Marshal(&state)
if err != nil {
return fmt.Errorf("unable to encode decoders' state: %w", err)
}
if err := renameio.WriteFile(target, data, 0o666, renameio.WithTempDir(filepath.Dir(target))); err != nil {
return fmt.Errorf("unable to write state file %q: %w", target, err)
}
return nil
}
// RestoreState restores the decoders' state from a file. This is not goroutine-safe.
func (c *Component) RestoreState(source string) error {
data, err := os.ReadFile(source)
if err != nil {
return fmt.Errorf("unable to read state file %q: %w", source, err)
}
// Check the version.
var stateVersion struct {
Version int
}
if err := json.Unmarshal(data, &stateVersion); err != nil {
return err
}
if stateVersion.Version != currentStateVersionNumber {
return ErrStateVersion
}
// Decode decoders.
var stateDecoders struct {
Decoders map[pb.RawFlow_Decoder]json.RawMessage
}
if err := json.Unmarshal(data, &stateDecoders); err != nil {
return fmt.Errorf("unable to decode decoders' state: %w", err)
}
for k, v := range c.decoders {
decoderJSON, ok := stateDecoders.Decoders[k]
if !ok {
continue
}
if err := json.Unmarshal(decoderJSON, &v); err != nil {
return fmt.Errorf("unable to decode decoder' state (%s): %w", k, err)
}
}
return nil
}

154
outlet/flow/persist_test.go Normal file
View File

@@ -0,0 +1,154 @@
// SPDX-FileCopyrightText: 2025 Free Mobile
// SPDX-License-Identifier: AGPL-3.0-only
package flow
import (
"errors"
"net"
"os"
"path"
"path/filepath"
"runtime"
"slices"
"testing"
"time"
"akvorado/common/helpers"
"akvorado/common/pb"
"akvorado/common/reporter"
"akvorado/common/schema"
)
func TestSaveAndRestore(t *testing.T) {
r := reporter.NewMock(t)
sch := schema.NewMock(t)
config := DefaultConfiguration()
config.StatePersistFile = filepath.Join(t.TempDir(), "state")
c, err := New(r, config, Dependencies{Schema: sch})
if err != nil {
t.Fatalf("New() error:\n%+v", err)
}
if err := c.Start(); err != nil {
t.Fatalf("Start() error:\n%+v", err)
}
bf := sch.NewFlowMessage()
_, src, _, _ := runtime.Caller(0)
base := path.Join(path.Dir(src), "decoder", "netflow", "testdata")
for _, pcap := range []string{"options-template.pcap", "options-data.pcap", "template.pcap"} {
data := helpers.ReadPcapL4(t, path.Join(base, pcap))
rawFlow := &pb.RawFlow{
TimeReceived: uint64(time.Now().UnixNano()),
Payload: data,
SourceAddress: net.ParseIP("127.0.0.1").To16(),
UseSourceAddress: false,
Decoder: pb.RawFlow_DECODER_NETFLOW,
TimestampSource: pb.RawFlow_TS_INPUT,
}
err := c.Decode(rawFlow, bf, func() {})
if err != nil {
t.Fatalf("Decode() error:\n%+v", err)
}
}
if err := c.Stop(); err != nil {
t.Fatalf("Stop() error:\n%+v", err)
}
// Create a second component that will reuse saved templates.
r2 := reporter.NewMock(t)
c2, err := New(r2, config, Dependencies{Schema: sch})
if err != nil {
t.Fatalf("New() error:\n%+v", err)
}
if err := c2.Start(); err != nil {
t.Fatalf("Start() error:\n%+v", err)
}
got := []*schema.FlowMessage{}
for _, pcap := range []string{"data.pcap"} {
data := helpers.ReadPcapL4(t, path.Join(base, pcap))
rawFlow := &pb.RawFlow{
TimeReceived: uint64(time.Now().UnixNano()),
Payload: data,
SourceAddress: net.ParseIP("127.0.0.1").To16(),
UseSourceAddress: false,
Decoder: pb.RawFlow_DECODER_NETFLOW,
TimestampSource: pb.RawFlow_TS_INPUT,
}
err := c2.Decode(rawFlow, bf, func() {
clone := *bf
got = append(got, &clone)
bf.Finalize()
})
if err != nil {
t.Fatalf("Decode() error:\n%+v", err)
}
}
if len(got) == 0 {
t.Fatalf("Decode() returned no flows")
}
}
func TestRestoreCorruptedFile(t *testing.T) {
// Create a file with invalid data
tmpDir := t.TempDir()
corruptedFile := filepath.Join(tmpDir, "corrupted.json")
err := os.WriteFile(corruptedFile, []byte("not valid JSON data"), 0644)
if err != nil {
t.Fatalf("WriteFile() error:\n%+v", err)
}
r := reporter.NewMock(t)
sch := schema.NewMock(t)
config := DefaultConfiguration()
config.StatePersistFile = corruptedFile
c, err := New(r, config, Dependencies{Schema: sch})
if err != nil {
t.Fatalf("New() error:\n%+v", err)
}
err = c.RestoreState(corruptedFile)
if err == nil {
t.Error("Restore(): no error")
}
}
func TestRestoreVersionMismatch(t *testing.T) {
// Create a file with a different version number
tmpDir := t.TempDir()
versionMismatchFile := filepath.Join(tmpDir, "version_mismatch.json")
// Write a JSON file with version 999 (incompatible version)
incompatibleData := `{"version":999,"collection":{}}`
err := os.WriteFile(versionMismatchFile, []byte(incompatibleData), 0644)
if err != nil {
t.Fatalf("WriteFile() error:\n%+v", err)
}
r := reporter.NewMock(t)
sch := schema.NewMock(t)
config := DefaultConfiguration()
config.StatePersistFile = versionMismatchFile
c, err := New(r, config, Dependencies{Schema: sch})
if err != nil {
t.Fatalf("New() error:\n%+v", err)
}
err = c.RestoreState(versionMismatchFile)
if err == nil {
t.Fatal("Restore(): expected error for version mismatch, got nil")
}
if !errors.Is(err, ErrStateVersion) {
t.Errorf("Restore(): expected ErrVersion, got %v", err)
}
// Also check we have c.decoders OK
names := []string{}
for _, d := range c.decoders {
names = append(names, d.Name())
}
slices.Sort(names)
if diff := helpers.Diff(names, []string{"gob", "netflow", "sflow"}); diff != "" {
t.Fatalf("RestoreState(): invalid decoders:\n%s", diff)
}
}

View File

@@ -9,7 +9,6 @@ import (
"akvorado/common/pb"
"akvorado/common/reporter"
"akvorado/common/schema"
"akvorado/outlet/flow/decoder"
)
@@ -17,6 +16,7 @@ import (
type Component struct {
r *reporter.Reporter
d *Dependencies
config Configuration
errLogger reporter.Logger
metrics struct {
@@ -29,22 +29,21 @@ type Component struct {
}
// Dependencies are the dependencies of the flow component.
type Dependencies struct {
Schema *schema.Component
}
type Dependencies = decoder.Dependencies
// New creates a new flow component.
func New(r *reporter.Reporter, dependencies Dependencies) (*Component, error) {
func New(r *reporter.Reporter, config Configuration, dependencies Dependencies) (*Component, error) {
c := Component{
r: r,
d: &dependencies,
config: config,
errLogger: r.Sample(reporter.BurstSampler(30*time.Second, 3)),
decoders: make(map[pb.RawFlow_Decoder]decoder.Decoder),
}
// Initialize available decoders
for decoderType, decoderFunc := range availableDecoders {
c.decoders[decoderType] = decoderFunc(r, decoder.Dependencies{Schema: c.d.Schema})
c.decoders[decoderType] = decoderFunc(r, dependencies)
}
// Metrics
@@ -65,3 +64,25 @@ func New(r *reporter.Reporter, dependencies Dependencies) (*Component, error) {
return &c, nil
}
// Start starts the flow component.
func (c *Component) Start() error {
if c.config.StatePersistFile != "" {
if err := c.RestoreState(c.config.StatePersistFile); err != nil {
c.r.Warn().Err(err).Msg("cannot load decoders' state, ignoring")
} else {
c.r.Info().Msg("previous decoders' state loaded")
}
}
return nil
}
// Stop stops the flow component
func (c *Component) Stop() error {
if c.config.StatePersistFile != "" {
if err := c.SaveState(c.config.StatePersistFile); err != nil {
c.r.Err(err).Msg("cannot save decorders' state")
}
}
return nil
}