inlet/snmp: move cache to helpers

We will reuse this implementation for classifiers too.
This commit is contained in:
Vincent Bernat
2023-02-01 00:02:48 +01:00
parent 737d3da843
commit 577e27f7d4
9 changed files with 530 additions and 348 deletions

121
common/helpers/cache/cache.go vendored Normal file
View File

@@ -0,0 +1,121 @@
// SPDX-FileCopyrightText: 2023 Free Mobile
// SPDX-License-Identifier: AGPL-3.0-only
// Package cache implements a cache with an optional TTL. Each operation should
// provide the current time. Items are expired on demand. Expiration can be done
// on last access or last update. Due to an implementation detail, it relies on
// wall time.
package cache
import (
"errors"
"sync"
"sync/atomic"
"time"
)
// ErrVersion is the sentinel error reported when a persisted cache cannot be
// loaded because it was written in an incompatible format.
var ErrVersion = errors.New("cache version mismatch")
// Cache is an in-memory key/value store intended to be safe for concurrent
// use. K is the key type and V the type of the stored objects.
type Cache[K comparable, V any] struct {
	// mu guards items.
	mu sync.RWMutex
	// items maps each key to its stored object and timestamps.
	items map[K]*item[V]
}
// item is a cache item, including last access and last update. Its fields are
// exported; encoding/gob, which is used to persist the items map, only handles
// exported fields.
type item[V any] struct {
	// Object is the cached value.
	Object V
	// LastAccessed is the Unix time, in seconds, of the Put that created the
	// item or of the latest Get called with a non-zero time. Get updates it
	// with an atomic store.
	LastAccessed int64
	// LastUpdated is the Unix time, in seconds, of the Put that created the
	// item.
	LastUpdated int64
}
// New returns an empty, ready-to-use cache. No expiration policy is attached
// to the cache itself: callers expire items explicitly, for instance with
// DeleteLastAccessedBefore.
func New[K comparable, V any]() *Cache[K, V] {
	c := new(Cache[K, V])
	c.items = map[K]*item[V]{}
	return c
}
// zero returns the zero value of V. It is what Get hands back on a miss.
func (c *Cache[K, V]) zero() V {
	return *new(V)
}
// Put stores object under key, replacing any previous item for that key. Both
// the last-access and last-update timestamps of the new item are set to now,
// truncated to the second.
func (c *Cache[K, V]) Put(now time.Time, key K, object V) {
	ts := now.Unix()
	entry := &item[V]{Object: object, LastAccessed: ts, LastUpdated: ts}

	c.mu.Lock()
	defer c.mu.Unlock()
	c.items[key] = entry
}
// Get returns the object stored under key and true, or the zero value of V and
// false when the key is absent. When now is not the zero time, the item's
// last-access timestamp is set to now (truncated to the second); passing
// time.Time{} performs a lookup that leaves the timestamp alone.
func (c *Cache[K, V]) Get(now time.Time, key K) (V, bool) {
	c.mu.RLock()
	entry, found := c.items[key]
	c.mu.RUnlock()

	if !found {
		return c.zero(), false
	}
	// The read lock is already released: the timestamp is updated with an
	// atomic store instead of under the mutex.
	if touch := !now.IsZero(); touch {
		atomic.StoreInt64(&entry.LastAccessed, now.Unix())
	}
	return entry.Object, true
}
// Items returns a new map holding every key of the cache with its object. The
// returned map is a copy: modifying it does not affect the cache.
func (c *Cache[K, V]) Items() map[K]V {
	c.mu.RLock()
	defer c.mu.RUnlock()

	snapshot := make(map[K]V, len(c.items))
	for key, entry := range c.items {
		snapshot[key] = entry.Object
	}
	return snapshot
}
// ItemsLastUpdatedBefore returns a new map with the items whose last update is
// strictly older than before. The comparison is done on Unix seconds. Items
// are left in the cache.
func (c *Cache[K, V]) ItemsLastUpdatedBefore(before time.Time) map[K]V {
	threshold := before.Unix()
	stale := map[K]V{}

	c.mu.RLock()
	defer c.mu.RUnlock()
	for key, entry := range c.items {
		if entry.LastUpdated >= threshold {
			continue
		}
		stale[key] = entry.Object
	}
	return stale
}
// DeleteLastAccessedBefore expires items whose last access is strictly older
// than the provided time. The comparison is done on Unix seconds. It returns
// the number of items removed.
func (c *Cache[K, V]) DeleteLastAccessedBefore(before time.Time) int {
	threshold := before.Unix()
	count := 0

	c.mu.Lock()
	defer c.mu.Unlock()
	for k, v := range c.items {
		// Get updates LastAccessed with an atomic store after it has released
		// the read lock, so holding the write lock does not exclude that
		// store. Read the field atomically too.
		if atomic.LoadInt64(&v.LastAccessed) < threshold {
			delete(c.items, k)
			count++
		}
	}
	return count
}
// Size returns the number of items currently stored in the cache.
func (c *Cache[K, V]) Size() int {
	c.mu.RLock()
	n := len(c.items)
	c.mu.RUnlock()
	return n
}

119
common/helpers/cache/cache_test.go vendored Normal file
View File

@@ -0,0 +1,119 @@
// SPDX-FileCopyrightText: 2023 Free Mobile
// SPDX-License-Identifier: AGPL-3.0-only
package cache_test
import (
"net/netip"
"testing"
"time"
"akvorado/common/helpers"
"akvorado/common/helpers/cache"
)
// expectCacheGet looks up key in c and reports a test error when the returned
// value or presence flag differs from the expectation. The key is parsed as an
// IP address and converted to its 16-byte form before the lookup. The lookup
// uses the zero time, so it does not refresh the item's last access.
func expectCacheGet(t *testing.T, c *cache.Cache[netip.Addr, string], key string, expectedResult string, expectedOk bool) {
	t.Helper()
	addr := netip.AddrFrom16(netip.MustParseAddr(key).As16())
	value, found := c.Get(time.Time{}, addr)

	expected := struct {
		Result string
		Ok     bool
	}{Result: expectedResult, Ok: expectedOk}
	got := struct {
		Result string
		Ok     bool
	}{Result: value, Ok: found}

	diff := helpers.Diff(got, expected)
	if diff == "" {
		return
	}
	t.Errorf("Get() (-got, +want):\n%s", diff)
}
// TestGetPut checks that values written with Put can be read back with Get and
// Items, that a second Put on a key replaces the previous value, and that an
// unknown key is reported as missing.
func TestGetPut(t *testing.T) {
	c := cache.New[netip.Addr, string]()
	base := time.Date(2022, time.December, 31, 10, 23, 0, 0, time.UTC)

	writes := []struct {
		offset time.Duration
		addr   string
		value  string
	}{
		{0, "::ffff:127.0.0.1", "entry1"},
		{time.Minute, "::ffff:127.0.0.2", "entry2"},
		{2 * time.Minute, "::ffff:127.0.0.3", "entry3"},
		{2 * time.Minute, "::ffff:127.0.0.3", "entry4"},
	}
	for _, w := range writes {
		c.Put(base.Add(w.offset), netip.MustParseAddr(w.addr), w.value)
	}

	reads := []struct {
		addr  string
		value string
		ok    bool
	}{
		{"127.0.0.1", "entry1", true},
		{"127.0.0.2", "entry2", true},
		{"127.0.0.3", "entry4", true},
		{"127.0.0.4", "", false},
	}
	for _, r := range reads {
		expectCacheGet(t, c, r.addr, r.value, r.ok)
	}

	want := map[netip.Addr]string{
		netip.MustParseAddr("::ffff:127.0.0.1"): "entry1",
		netip.MustParseAddr("::ffff:127.0.0.2"): "entry2",
		netip.MustParseAddr("::ffff:127.0.0.3"): "entry4",
	}
	if diff := helpers.Diff(c.Items(), want); diff != "" {
		t.Errorf("Items() (-got, +want):\n%s", diff)
	}
}
// TestDeleteLastAccessedBefore stores three entries one minute apart,
// refreshes the first one with a later time, reads the second one with the
// zero time (which must not refresh it), and then expires with increasing
// thresholds, checking which entries survive each step.
func TestDeleteLastAccessedBefore(t *testing.T) {
	c := cache.New[netip.Addr, string]()
	base := time.Date(2022, time.December, 31, 10, 23, 0, 0, time.UTC)
	at := func(minutes int) time.Time {
		return base.Add(time.Duration(minutes) * time.Minute)
	}
	addr1 := netip.MustParseAddr("::ffff:127.0.0.1")
	addr2 := netip.MustParseAddr("::ffff:127.0.0.2")
	addr3 := netip.MustParseAddr("::ffff:127.0.0.3")

	c.Put(at(0), addr1, "entry1")
	c.Put(at(1), addr2, "entry2")
	c.Put(at(2), addr3, "entry3")
	c.Get(at(3), addr1)
	c.Get(time.Time{}, addr2)

	keys := [3]string{"127.0.0.1", "127.0.0.2", "127.0.0.3"}
	steps := []struct {
		before time.Time
		// wantCount is compared with the returned count only when it is not
		// negative.
		wantCount int
		// want lists the expected value for each key; "" means absent.
		want [3]string
	}{
		{at(0), -1, [3]string{"entry1", "entry2", "entry3"}},
		{at(1), -1, [3]string{"entry1", "entry2", "entry3"}},
		{at(2), -1, [3]string{"entry1", "", "entry3"}},
		{at(3), 1, [3]string{"entry1", "", ""}},
		{at(4), -1, [3]string{"", "", ""}},
	}
	for _, step := range steps {
		count := c.DeleteLastAccessedBefore(step.before)
		if step.wantCount >= 0 && count != step.wantCount {
			t.Errorf("DeleteLastAccessedBefore(): got %d, expected %d", count, step.wantCount)
		}
		for i, key := range keys {
			expectCacheGet(t, c, key, step.want[i], step.want[i] != "")
		}
	}
}
// TestItemsLastUpdatedBefore stores three entries one minute apart, rewrites
// the first one later, and checks that only the entry last updated strictly
// before the threshold is returned.
func TestItemsLastUpdatedBefore(t *testing.T) {
	c := cache.New[netip.Addr, string]()
	base := time.Date(2022, time.December, 31, 10, 23, 0, 0, time.UTC)

	writes := []struct {
		offset time.Duration
		addr   string
		value  string
	}{
		{0, "::ffff:127.0.0.1", "entry1"},
		{time.Minute, "::ffff:127.0.0.2", "entry2"},
		{2 * time.Minute, "::ffff:127.0.0.3", "entry3"},
		{3 * time.Minute, "::ffff:127.0.0.1", "entry4"},
	}
	for _, w := range writes {
		c.Put(base.Add(w.offset), netip.MustParseAddr(w.addr), w.value)
	}

	threshold := base.Add(2 * time.Minute)
	want := map[netip.Addr]string{
		netip.MustParseAddr("::ffff:127.0.0.2"): "entry2",
	}
	if diff := helpers.Diff(c.ItemsLastUpdatedBefore(threshold), want); diff != "" {
		t.Errorf("ItemsLastUpdatedBefore() (-got, +want):\n%s", diff)
	}
}

118
common/helpers/cache/persist.go vendored Normal file
View File

@@ -0,0 +1,118 @@
// SPDX-FileCopyrightText: 2023 Free Mobile
// SPDX-License-Identifier: AGPL-3.0-only
package cache
import (
	"bytes"
	"encoding/gob"
	"fmt"
	"io/ioutil"
	"os"
	"path/filepath"
	"sync/atomic"
)
// Save persists the cache to the specified file. The cache is first written to
// a temporary file created in the same directory, which is then renamed to
// cacheFile. On error, the temporary file is removed and a wrapped error is
// returned.
func (c *Cache[K, V]) Save(cacheFile string) error {
	tmpFile, err := ioutil.TempFile(
		filepath.Dir(cacheFile),
		fmt.Sprintf("%s-*", filepath.Base(cacheFile)))
	if err != nil {
		return fmt.Errorf("unable to create cache file %q: %w", cacheFile, err)
	}
	defer func() {
		// Best-effort cleanup for the error paths. On success the file is
		// already closed and renamed, so both calls fail harmlessly.
		tmpFile.Close()           // ignore errors
		os.Remove(tmpFile.Name()) // ignore errors
	}()

	// Write cache
	encoder := gob.NewEncoder(tmpFile)
	if err := encoder.Encode(c); err != nil {
		return fmt.Errorf("unable to encode cache: %w", err)
	}

	// Close before renaming: a write error may only be reported on close, and
	// the final file must not be put in place if that happens.
	if err := tmpFile.Close(); err != nil {
		return fmt.Errorf("unable to write cache file %q: %w", cacheFile, err)
	}

	// Move cache to new location
	if err := os.Rename(tmpFile.Name(), cacheFile); err != nil {
		return fmt.Errorf("unable to write cache file %q: %w", cacheFile, err)
	}
	return nil
}
// Load loads the cache from the provided location, replacing the current
// content on success. The returned error wraps the underlying cause: the error
// from os.Open when the file cannot be opened, or the decoding error
// (ErrVersion for an incompatible file) otherwise.
func (c *Cache[K, V]) Load(cacheFile string) error {
	f, err := os.Open(cacheFile)
	if err != nil {
		return fmt.Errorf("unable to load cache %q: %w", cacheFile, err)
	}
	// The file is only read, so an error on close carries no useful
	// information. Closing it avoids leaking the file descriptor.
	defer f.Close()

	decoder := gob.NewDecoder(f)
	if err := decoder.Decode(c); err != nil {
		return fmt.Errorf("unable to decode cache: %w", err)
	}
	return nil
}
// currentVersionNumber should be increased each time we change the way we
// encode the cache. It is written first in the encoded stream by GobEncode and
// compared by GobDecode, which returns ErrVersion on mismatch. It is a
// variable because GobEncode passes its address to the gob encoder.
var currentVersionNumber = 10
// GobEncode encodes the cache. The output is a gob stream containing, in
// order: the format version, a zero value of K, a zero value of V and the
// items with their timestamps. GobDecode reads the zero values back to check
// that the stream matches the type parameters of the receiving cache.
func (c *Cache[K, V]) GobEncode() ([]byte, error) {
	var buf bytes.Buffer
	encoder := gob.NewEncoder(&buf)

	// Encode version
	if err := encoder.Encode(&currentVersionNumber); err != nil {
		return nil, err
	}

	// Encode a representation of K and V
	var zeroK K
	var zeroV V
	if err := encoder.Encode(&zeroK); err != nil {
		return nil, err
	}
	if err := encoder.Encode(&zeroV); err != nil {
		return nil, err
	}

	// Get updates LastAccessed with an atomic store without holding the lock,
	// while gob would read it with a plain load. Take a snapshot using atomic
	// loads and encode the snapshot instead. This also keeps the time spent
	// under the lock independent of the encoding.
	c.mu.RLock()
	snapshot := make(map[K]*item[V], len(c.items))
	for k, v := range c.items {
		snapshot[k] = &item[V]{
			Object:       v.Object,
			LastAccessed: atomic.LoadInt64(&v.LastAccessed),
			LastUpdated:  v.LastUpdated,
		}
	}
	c.mu.RUnlock()

	if err := encoder.Encode(snapshot); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}
// GobDecode replaces the content of the cache with the one encoded in data,
// which is expected to have been produced by GobEncode. It returns ErrVersion
// when the stored version differs from currentVersionNumber or when the stored
// representations of K and V cannot be decoded into this cache's types. Other
// decoding errors are returned as is. The cache is left untouched on error.
func (c *Cache[K, V]) GobDecode(data []byte) error {
	decoder := gob.NewDecoder(bytes.NewBuffer(data))

	// The stream starts with the format version.
	version := currentVersionNumber
	if err := decoder.Decode(&version); err != nil {
		return err
	}
	if version != currentVersionNumber {
		return ErrVersion
	}

	// Then come a zero K and a zero V. Failing to decode either one means the
	// file was written for other types.
	var (
		keyProbe   K
		valueProbe V
	)
	if decoder.Decode(&keyProbe) != nil || decoder.Decode(&valueProbe) != nil {
		return ErrVersion
	}

	// Finally, the items. They are swapped in only once fully decoded.
	loaded := make(map[K]*item[V])
	if err := decoder.Decode(&loaded); err != nil {
		return err
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	c.items = loaded
	return nil
}

63
common/helpers/cache/persist_test.go vendored Normal file
View File

@@ -0,0 +1,63 @@
// SPDX-FileCopyrightText: 2023 Free Mobile
// SPDX-License-Identifier: AGPL-3.0-only
package cache_test
import (
"errors"
"io/fs"
"net/netip"
"path/filepath"
"testing"
"time"
"akvorado/common/helpers/cache"
)
// TestLoadNotExist checks that loading from a path that does not exist yields
// an error matching fs.ErrNotExist.
func TestLoadNotExist(t *testing.T) {
	c := cache.New[netip.Addr, string]()
	if err := c.Load("/i/do/not/exist"); !errors.Is(err, fs.ErrNotExist) {
		t.Fatalf("c.Load() error:\n%s", err)
	}
}
// TestSaveLoad saves a populated cache to a file, loads that file into a fresh
// cache of the same type and checks that every entry is present.
func TestSaveLoad(t *testing.T) {
	base := time.Date(2022, time.December, 31, 10, 23, 0, 0, time.UTC)
	entries := []struct {
		offset time.Duration
		addr   string
		short  string
		value  string
	}{
		{0, "::ffff:127.0.0.1", "127.0.0.1", "entry1"},
		{time.Minute, "::ffff:127.0.0.2", "127.0.0.2", "entry2"},
		{2 * time.Minute, "::ffff:127.0.0.3", "127.0.0.3", "entry3"},
	}

	saved := cache.New[netip.Addr, string]()
	for _, e := range entries {
		saved.Put(base.Add(e.offset), netip.MustParseAddr(e.addr), e.value)
	}
	target := filepath.Join(t.TempDir(), "cache")
	if err := saved.Save(target); err != nil {
		t.Fatalf("c.Save() error:\n%s", err)
	}

	loaded := cache.New[netip.Addr, string]()
	if err := loaded.Load(target); err != nil {
		t.Fatalf("c.Load() error:\n%s", err)
	}
	for _, e := range entries {
		expectCacheGet(t, loaded, e.short, e.value, true)
	}
}
// TestLoadMismatchVersion saves a cache holding string values and checks that
// loading the file into a cache holding int values fails with an error
// matching cache.ErrVersion.
func TestLoadMismatchVersion(t *testing.T) {
	target := filepath.Join(t.TempDir(), "cache")

	source := cache.New[netip.Addr, string]()
	source.Put(time.Now(), netip.MustParseAddr("::ffff:127.0.0.1"), "entry1")
	if err := source.Save(target); err != nil {
		t.Fatalf("c.Save() error:\n%s", err)
	}

	// Try to load it
	other := cache.New[netip.Addr, int]()
	err := other.Load(target)
	if !errors.Is(err, cache.ErrVersion) {
		t.Fatalf("c.Load() error:\n%s", err)
	}
}

View File

@@ -6,11 +6,8 @@ package core
import (
"net/netip"
"strconv"
"time"
"akvorado/common/reporter"
"akvorado/common/schema"
"akvorado/inlet/snmp"
)
// exporterAndInterfaceInfo aggregates both exporter info and interface info
@@ -25,15 +22,10 @@ func (c *Component) enrichFlow(exporterIP netip.Addr, exporterStr string, flow *
var flowInIfName, flowInIfDescription, flowOutIfName, flowOutIfDescription string
var flowInIfSpeed, flowOutIfSpeed uint32
errLogger := c.r.Sample(reporter.BurstSampler(time.Minute, 10))
if flow.InIf != 0 {
exporterName, iface, err := c.d.SNMP.Lookup(exporterIP, uint(flow.InIf))
if err != nil {
if err != snmp.ErrCacheMiss {
errLogger.Err(err).Str("exporter", exporterStr).Msg("unable to query SNMP cache")
}
c.metrics.flowsErrors.WithLabelValues(exporterStr, err.Error()).Inc()
exporterName, iface, ok := c.d.SNMP.Lookup(exporterIP, uint(flow.InIf))
if !ok {
c.metrics.flowsErrors.WithLabelValues(exporterStr, "SNMP cache miss").Inc()
skip = true
} else {
flowExporterName = exporterName
@@ -44,15 +36,12 @@ func (c *Component) enrichFlow(exporterIP netip.Addr, exporterStr string, flow *
}
if flow.OutIf != 0 {
exporterName, iface, err := c.d.SNMP.Lookup(exporterIP, uint(flow.OutIf))
if err != nil {
exporterName, iface, ok := c.d.SNMP.Lookup(exporterIP, uint(flow.OutIf))
if !ok {
// Only register a cache miss if we don't have one.
// TODO: maybe we could do one SNMP query for both interfaces.
if !skip {
if err != snmp.ErrCacheMiss {
errLogger.Err(err).Str("exporter", exporterStr).Msg("unable to query SNMP cache")
}
c.metrics.flowsErrors.WithLabelValues(exporterStr, err.Error()).Inc()
c.metrics.flowsErrors.WithLabelValues(exporterStr, "SNMP cache miss").Inc()
skip = true
}
} else {

View File

@@ -4,55 +4,29 @@
package snmp
import (
"bytes"
"encoding/gob"
"errors"
"fmt"
"io/ioutil"
"net/netip"
"os"
"path/filepath"
"sync"
"sync/atomic"
"time"
"github.com/benbjohnson/clock"
"akvorado/common/helpers/cache"
"akvorado/common/reporter"
)
var (
// ErrCacheMiss is triggered on lookup cache miss
ErrCacheMiss = errors.New("SNMP cache miss")
// ErrCacheVersion is triggered when loading a cache from an incompatible version
ErrCacheVersion = errors.New("SNMP cache version mismatch")
// cacheCurrentVersionNumber is the current version of the on-disk cache format
cacheCurrentVersionNumber = 9
)
// snmpCache represents the SNMP cache.
type snmpCache struct {
r *reporter.Reporter
cache map[netip.Addr]*cachedExporter
cacheLock sync.RWMutex
clock clock.Clock
r *reporter.Reporter
cache *cache.Cache[key, value]
clock clock.Clock
metrics struct {
cacheHit reporter.Counter
cacheMiss reporter.Counter
cacheExpired reporter.Counter
cacheSize reporter.GaugeFunc
cacheExporters reporter.GaugeFunc
cacheHit reporter.Counter
cacheMiss reporter.Counter
cacheExpired reporter.Counter
cacheSize reporter.GaugeFunc
}
}
// cachedExporter represents information about a exporter. It includes
// the mapping from ifIndex to interfaces.
type cachedExporter struct {
Name string
Interfaces map[uint]*cachedInterface
}
// Interface contains the information about an interface.
type Interface struct {
Name string
@@ -60,17 +34,19 @@ type Interface struct {
Speed uint
}
// cachedInterface contains the information about a cached interface.
type cachedInterface struct {
LastUpdated int64
LastAccessed int64
type key struct {
IP netip.Addr
Index uint
}
type value struct {
ExporterName string
Interface
}
func newSNMPCache(r *reporter.Reporter, clock clock.Clock) *snmpCache {
sc := &snmpCache{
r: r,
cache: make(map[netip.Addr]*cachedExporter),
cache: cache.New[key, value](),
clock: clock,
}
sc.metrics.cacheHit = r.Counter(
@@ -92,205 +68,71 @@ func newSNMPCache(r *reporter.Reporter, clock clock.Clock) *snmpCache {
reporter.GaugeOpts{
Name: "cache_size",
Help: "Number of entries in cache.",
}, func() (result float64) {
sc.cacheLock.RLock()
defer sc.cacheLock.RUnlock()
for _, exporter := range sc.cache {
result += float64(len(exporter.Interfaces))
}
return
})
sc.metrics.cacheExporters = r.GaugeFunc(
reporter.GaugeOpts{
Name: "cache_exporters",
Help: "Number of exporters in cache.",
}, func() float64 {
sc.cacheLock.RLock()
defer sc.cacheLock.RUnlock()
return float64(len(sc.cache))
return float64(sc.cache.Size())
})
return sc
}
// Lookup will perform a lookup of the cache. It returns the exporter
// name as well as the requested interface.
func (sc *snmpCache) Lookup(ip netip.Addr, ifIndex uint) (string, Interface, error) {
return sc.lookup(ip, ifIndex, true)
func (sc *snmpCache) Lookup(ip netip.Addr, index uint) (string, Interface, bool) {
return sc.lookup(ip, index, true)
}
func (sc *snmpCache) lookup(ip netip.Addr, ifIndex uint, touchAccess bool) (string, Interface, error) {
sc.cacheLock.RLock()
defer sc.cacheLock.RUnlock()
exporter, ok := sc.cache[ip]
if !ok {
sc.metrics.cacheMiss.Inc()
return "", Interface{}, ErrCacheMiss
func (sc *snmpCache) lookup(ip netip.Addr, index uint, touchAccess bool) (string, Interface, bool) {
t := time.Time{}
if touchAccess {
t = sc.clock.Now()
}
iface, ok := exporter.Interfaces[ifIndex]
result, ok := sc.cache.Get(t, key{ip, index})
if !ok {
sc.metrics.cacheMiss.Inc()
return "", Interface{}, ErrCacheMiss
return "", Interface{}, false
}
sc.metrics.cacheHit.Inc()
if touchAccess {
atomic.StoreInt64(&iface.LastAccessed, sc.clock.Now().Unix())
}
return exporter.Name, iface.Interface, nil
return result.ExporterName, result.Interface, true
}
// Put a new entry in the cache.
func (sc *snmpCache) Put(ip netip.Addr, exporterName string, ifIndex uint, iface Interface) {
sc.cacheLock.Lock()
defer sc.cacheLock.Unlock()
now := sc.clock.Now().Unix()
ciface := cachedInterface{
LastUpdated: now,
LastAccessed: now,
func (sc *snmpCache) Put(ip netip.Addr, exporterName string, index uint, iface Interface) {
t := sc.clock.Now()
sc.cache.Put(t, key{ip, index}, value{
ExporterName: exporterName,
Interface: iface,
}
exporter, ok := sc.cache[ip]
if !ok {
exporter = &cachedExporter{Interfaces: make(map[uint]*cachedInterface)}
sc.cache[ip] = exporter
}
exporter.Name = exporterName
exporter.Interfaces[ifIndex] = &ciface
})
}
// Expire expire entries older than the provided duration (rely on last access).
func (sc *snmpCache) Expire(older time.Duration) (count uint) {
threshold := sc.clock.Now().Add(-older).Unix()
sc.cacheLock.Lock()
defer sc.cacheLock.Unlock()
for ip, exporter := range sc.cache {
for ifindex, iface := range exporter.Interfaces {
if iface.LastAccessed < threshold {
delete(exporter.Interfaces, ifindex)
sc.metrics.cacheExpired.Inc()
count++
}
}
if len(exporter.Interfaces) == 0 {
delete(sc.cache, ip)
}
}
return
func (sc *snmpCache) Expire(older time.Duration) int {
threshold := sc.clock.Now().Add(-older)
expired := sc.cache.DeleteLastAccessedBefore(threshold)
sc.metrics.cacheExpired.Add(float64(expired))
return expired
}
// Return entries older than the provided duration. If LastAccessed is
// true, rely on last access, otherwise on last update.
func (sc *snmpCache) entriesOlderThan(older time.Duration, lastAccessed bool) map[netip.Addr]map[uint]Interface {
threshold := sc.clock.Now().Add(-older).Unix()
result := make(map[netip.Addr]map[uint]Interface)
sc.cacheLock.RLock()
defer sc.cacheLock.RUnlock()
for ip, exporter := range sc.cache {
for ifindex, iface := range exporter.Interfaces {
what := &iface.LastAccessed
if !lastAccessed {
what = &iface.LastUpdated
}
if atomic.LoadInt64(what) < threshold {
_, ok := result[ip]
if !ok {
rifaces := make(map[uint]Interface)
result[ip] = rifaces
}
result[ip][ifindex] = iface.Interface
}
// NeedUpdates returns a map of interface entries that would need to
// be updated. It relies on last update.
func (sc *snmpCache) NeedUpdates(older time.Duration) map[netip.Addr]map[uint]Interface {
t := sc.clock.Now().Add(-older)
result := map[netip.Addr]map[uint]Interface{}
for k, v := range sc.cache.ItemsLastUpdatedBefore(t) {
interfaces, ok := result[k.IP]
if !ok {
interfaces = map[uint]Interface{}
result[k.IP] = interfaces
}
interfaces[k.Index] = v.Interface
}
return result
}
// Need updates returns a map of interface entries that would need to
// be updated. It relies on last update.
func (sc *snmpCache) NeedUpdates(older time.Duration) map[netip.Addr]map[uint]Interface {
return sc.entriesOlderThan(older, false)
}
// Need updates returns a map of interface entries that would have
// expired. It relies on last access.
func (sc *snmpCache) WouldExpire(older time.Duration) map[netip.Addr]map[uint]Interface {
return sc.entriesOlderThan(older, true)
}
// Save stores the cache to the provided location.
func (sc *snmpCache) Save(cacheFile string) error {
tmpFile, err := ioutil.TempFile(
filepath.Dir(cacheFile),
fmt.Sprintf("%s-*", filepath.Base(cacheFile)))
if err != nil {
return fmt.Errorf("unable to create cache file %q: %w", cacheFile, err)
}
defer func() {
tmpFile.Close() // ignore errors
os.Remove(tmpFile.Name()) // ignore errors
}()
// Write cache
encoder := gob.NewEncoder(tmpFile)
if err := encoder.Encode(sc); err != nil {
return fmt.Errorf("unable to encode cache: %w", err)
}
// Move cache to new location
if err := os.Rename(tmpFile.Name(), cacheFile); err != nil {
return fmt.Errorf("unable to write cache file %q: %w", cacheFile, err)
}
return nil
return sc.cache.Save(cacheFile)
}
// Load loads the cache from the provided location.
func (sc *snmpCache) Load(cacheFile string) error {
f, err := os.Open(cacheFile)
if err != nil {
return fmt.Errorf("unable to load cache %q: %w", cacheFile, err)
}
decoder := gob.NewDecoder(f)
if err := decoder.Decode(sc); err != nil {
return fmt.Errorf("unable to decode cache: %w", err)
}
return nil
}
// GobEncode encodes the SNMP cache.
func (sc *snmpCache) GobEncode() ([]byte, error) {
var buf bytes.Buffer
encoder := gob.NewEncoder(&buf)
if err := encoder.Encode(&cacheCurrentVersionNumber); err != nil {
return nil, err
}
sc.cacheLock.Lock() // needed because we won't do atomic access
defer sc.cacheLock.Unlock()
if err := encoder.Encode(sc.cache); err != nil {
return nil, err
}
return buf.Bytes(), nil
}
// GobDecode decodes the SNMP cache.
func (sc *snmpCache) GobDecode(data []byte) error {
buf := bytes.NewBuffer(data)
decoder := gob.NewDecoder(buf)
version := cacheCurrentVersionNumber
if err := decoder.Decode(&version); err != nil {
return err
}
if version != cacheCurrentVersionNumber {
return ErrCacheVersion
}
cache := map[netip.Addr]*cachedExporter{}
if err := decoder.Decode(&cache); err != nil {
return err
}
sc.cacheLock.Lock()
sc.cache = cache
sc.cacheLock.Unlock()
return nil
return sc.cache.Load(cacheFile)
}

View File

@@ -33,15 +33,15 @@ func setupTestCache(t *testing.T) (*reporter.Reporter, *clock.Mock, *snmpCache)
type answer struct {
ExporterName string
Interface Interface
Err error
NOk bool
}
func expectCacheLookup(t *testing.T, sc *snmpCache, exporterIP string, ifIndex uint, expected answer) {
t.Helper()
ip := netip.MustParseAddr(exporterIP)
ip = netip.AddrFrom16(ip.As16())
gotExporterName, gotInterface, err := sc.lookup(ip, ifIndex, false)
got := answer{gotExporterName, gotInterface, err}
gotExporterName, gotInterface, ok := sc.lookup(ip, ifIndex, false)
got := answer{gotExporterName, gotInterface, !ok}
if diff := helpers.Diff(got, expected); diff != "" {
t.Errorf("Lookup() (-got, +want):\n%s", diff)
}
@@ -49,15 +49,14 @@ func expectCacheLookup(t *testing.T, sc *snmpCache, exporterIP string, ifIndex u
func TestGetEmpty(t *testing.T) {
r, _, sc := setupTestCache(t)
expectCacheLookup(t, sc, "127.0.0.1", 676, answer{Err: ErrCacheMiss})
expectCacheLookup(t, sc, "127.0.0.1", 676, answer{NOk: true})
gotMetrics := r.GetMetrics("akvorado_inlet_snmp_cache_")
expectedMetrics := map[string]string{
`expired`: "0",
`hit`: "0",
`miss`: "1",
`size`: "0",
`exporters`: "0",
`expired`: "0",
`hit`: "0",
`miss`: "1",
`size`: "0",
}
if diff := helpers.Diff(gotMetrics, expectedMetrics); diff != "" {
t.Fatalf("Metrics (-got, +want):\n%s", diff)
@@ -70,16 +69,15 @@ func TestSimpleLookup(t *testing.T) {
expectCacheLookup(t, sc, "127.0.0.1", 676, answer{
ExporterName: "localhost",
Interface: Interface{Name: "Gi0/0/0/1", Description: "Transit", Speed: 1000}})
expectCacheLookup(t, sc, "127.0.0.1", 787, answer{Err: ErrCacheMiss})
expectCacheLookup(t, sc, "127.0.0.2", 676, answer{Err: ErrCacheMiss})
expectCacheLookup(t, sc, "127.0.0.1", 787, answer{NOk: true})
expectCacheLookup(t, sc, "127.0.0.2", 676, answer{NOk: true})
gotMetrics := r.GetMetrics("akvorado_inlet_snmp_cache_")
expectedMetrics := map[string]string{
`expired`: "0",
`hit`: "1",
`miss`: "2",
`size`: "1",
`exporters`: "1",
`expired`: "0",
`hit`: "1",
`miss`: "2",
`size`: "1",
}
if diff := helpers.Diff(gotMetrics, expectedMetrics); diff != "" {
t.Fatalf("Metrics (-got, +want):\n%s", diff)
@@ -96,7 +94,7 @@ func TestExpire(t *testing.T) {
clock.Add(10 * time.Minute)
sc.Expire(time.Hour)
expectCacheLookup(t, sc, "127.0.0.1", 676, answer{
ExporterName: "localhost2",
ExporterName: "localhost",
Interface: Interface{Name: "Gi0/0/0/1", Description: "Transit"}})
expectCacheLookup(t, sc, "127.0.0.1", 678, answer{
ExporterName: "localhost2",
@@ -105,7 +103,7 @@ func TestExpire(t *testing.T) {
ExporterName: "localhost3",
Interface: Interface{Name: "Gi0/0/0/1", Description: "IX"}})
sc.Expire(29 * time.Minute)
expectCacheLookup(t, sc, "127.0.0.1", 676, answer{Err: ErrCacheMiss})
expectCacheLookup(t, sc, "127.0.0.1", 676, answer{NOk: true})
expectCacheLookup(t, sc, "127.0.0.1", 678, answer{
ExporterName: "localhost2",
Interface: Interface{Name: "Gi0/0/0/2", Description: "Peering"}})
@@ -113,15 +111,15 @@ func TestExpire(t *testing.T) {
ExporterName: "localhost3",
Interface: Interface{Name: "Gi0/0/0/1", Description: "IX"}})
sc.Expire(19 * time.Minute)
expectCacheLookup(t, sc, "127.0.0.1", 676, answer{Err: ErrCacheMiss})
expectCacheLookup(t, sc, "127.0.0.1", 678, answer{Err: ErrCacheMiss})
expectCacheLookup(t, sc, "127.0.0.1", 676, answer{NOk: true})
expectCacheLookup(t, sc, "127.0.0.1", 678, answer{NOk: true})
expectCacheLookup(t, sc, "127.0.0.2", 678, answer{
ExporterName: "localhost3",
Interface: Interface{Name: "Gi0/0/0/1", Description: "IX"}})
sc.Expire(9 * time.Minute)
expectCacheLookup(t, sc, "127.0.0.1", 676, answer{Err: ErrCacheMiss})
expectCacheLookup(t, sc, "127.0.0.1", 678, answer{Err: ErrCacheMiss})
expectCacheLookup(t, sc, "127.0.0.2", 678, answer{Err: ErrCacheMiss})
expectCacheLookup(t, sc, "127.0.0.1", 676, answer{NOk: true})
expectCacheLookup(t, sc, "127.0.0.1", 678, answer{NOk: true})
expectCacheLookup(t, sc, "127.0.0.2", 678, answer{NOk: true})
sc.Put(netip.MustParseAddr("::ffff:127.0.0.1"), "localhost", 676, Interface{Name: "Gi0/0/0/1", Description: "Transit"})
clock.Add(10 * time.Minute)
sc.Expire(19 * time.Minute)
@@ -131,11 +129,10 @@ func TestExpire(t *testing.T) {
gotMetrics := r.GetMetrics("akvorado_inlet_snmp_cache_")
expectedMetrics := map[string]string{
`expired`: "3",
`hit`: "7",
`miss`: "6",
`size`: "1",
`exporters`: "1",
`expired`: "3",
`hit`: "7",
`miss`: "6",
`size`: "1",
}
if diff := helpers.Diff(gotMetrics, expectedMetrics); diff != "" {
t.Fatalf("Metrics (-got, +want):\n%s", diff)
@@ -159,62 +156,12 @@ func TestExpireRefresh(t *testing.T) {
expectCacheLookup(t, sc, "127.0.0.1", 676, answer{
ExporterName: "localhost",
Interface: Interface{Name: "Gi0/0/0/1", Description: "Transit"}})
expectCacheLookup(t, sc, "127.0.0.1", 678, answer{Err: ErrCacheMiss})
expectCacheLookup(t, sc, "127.0.0.1", 678, answer{NOk: true})
expectCacheLookup(t, sc, "127.0.0.2", 678, answer{
ExporterName: "localhost2",
Interface: Interface{Name: "Gi0/0/0/1", Description: "IX"}})
}
func TestWouldExpire(t *testing.T) {
_, clock, sc := setupTestCache(t)
sc.Put(netip.MustParseAddr("::ffff:127.0.0.1"), "localhost", 676, Interface{Name: "Gi0/0/0/1", Description: "Transit"})
clock.Add(10 * time.Minute)
sc.Put(netip.MustParseAddr("::ffff:127.0.0.1"), "localhost", 678, Interface{Name: "Gi0/0/0/2", Description: "Peering"})
clock.Add(10 * time.Minute)
sc.Put(netip.MustParseAddr("::ffff:127.0.0.2"), "localhost2", 678, Interface{Name: "Gi0/0/0/1", Description: "IX"})
clock.Add(10 * time.Minute)
// Refresh
sc.Lookup(netip.MustParseAddr("::ffff:127.0.0.1"), 676)
clock.Add(10 * time.Minute)
cases := []struct {
Minutes time.Duration
Expected map[string]map[uint]Interface
}{
{9, map[string]map[uint]Interface{
"::ffff:127.0.0.1": {
676: Interface{Name: "Gi0/0/0/1", Description: "Transit"},
678: Interface{Name: "Gi0/0/0/2", Description: "Peering"},
},
"::ffff:127.0.0.2": {
678: Interface{Name: "Gi0/0/0/1", Description: "IX"},
},
}},
{19, map[string]map[uint]Interface{
"::ffff:127.0.0.1": {
678: Interface{Name: "Gi0/0/0/2", Description: "Peering"},
},
"::ffff:127.0.0.2": {
678: Interface{Name: "Gi0/0/0/1", Description: "IX"},
},
}},
{29, map[string]map[uint]Interface{
"::ffff:127.0.0.1": {
678: Interface{Name: "Gi0/0/0/2", Description: "Peering"},
},
}},
{39, map[string]map[uint]Interface{}},
}
for _, tc := range cases {
t.Run(fmt.Sprintf("%d minutes", tc.Minutes), func(t *testing.T) {
got := sc.WouldExpire(tc.Minutes * time.Minute)
if diff := helpers.Diff(got, tc.Expected); diff != "" {
t.Fatalf("WouldExpire(%d minutes) (-got, +want):\n%s", tc.Minutes, diff)
}
})
}
}
func TestNeedUpdates(t *testing.T) {
_, clock, sc := setupTestCache(t)
sc.Put(netip.MustParseAddr("::ffff:127.0.0.1"), "localhost", 676, Interface{Name: "Gi0/0/0/1", Description: "Transit"})
@@ -293,7 +240,7 @@ func TestSaveLoad(t *testing.T) {
}
sc.Expire(29 * time.Minute)
expectCacheLookup(t, sc, "127.0.0.1", 676, answer{Err: ErrCacheMiss})
expectCacheLookup(t, sc, "127.0.0.1", 676, answer{NOk: true})
expectCacheLookup(t, sc, "127.0.0.1", 678, answer{
ExporterName: "localhost",
Interface: Interface{Name: "Gi0/0/0/2", Description: "Peering"}})
@@ -302,25 +249,6 @@ func TestSaveLoad(t *testing.T) {
Interface: Interface{Name: "Gi0/0/0/1", Description: "IX", Speed: 1000}})
}
func TestLoadMismatchVersion(t *testing.T) {
_, _, sc := setupTestCache(t)
sc.Put(netip.MustParseAddr("::ffff:127.0.0.1"), "localhost", 676, Interface{Name: "Gi0/0/0/1", Description: "Transit"})
target := filepath.Join(t.TempDir(), "cache")
cacheCurrentVersionNumber++
if err := sc.Save(target); err != nil {
cacheCurrentVersionNumber--
t.Fatalf("sc.Save() error:\n%s", err)
}
cacheCurrentVersionNumber--
// Try to load it
_, _, sc = setupTestCache(t)
if err := sc.Load(target); !errors.Is(err, ErrCacheVersion) {
t.Fatalf("sc.Load() error:\n%s", err)
}
}
func TestConcurrentOperations(t *testing.T) {
r, clock, sc := setupTestCache(t)
done := make(chan bool)

View File

@@ -234,9 +234,9 @@ type lookupRequest struct {
// Lookup for interface information for the provided exporter and ifIndex.
// If the information is not in the cache, it will be polled, but
// won't be returned immediately.
func (c *Component) Lookup(exporterIP netip.Addr, ifIndex uint) (string, Interface, error) {
exporterName, iface, err := c.sc.Lookup(exporterIP, ifIndex)
if errors.Is(err, ErrCacheMiss) {
func (c *Component) Lookup(exporterIP netip.Addr, ifIndex uint) (string, Interface, bool) {
exporterName, iface, ok := c.sc.Lookup(exporterIP, ifIndex)
if !ok {
req := lookupRequest{
ExporterIP: exporterIP,
IfIndexes: []uint{ifIndex},
@@ -247,7 +247,7 @@ func (c *Component) Lookup(exporterIP netip.Addr, ifIndex uint) (string, Interfa
c.metrics.pollerBusyCount.WithLabelValues(exporterIP.Unmap().String()).Inc()
}
}
return exporterName, iface, err
return exporterName, iface, ok
}
// Dispatch an incoming request to workers. May handle more than the

View File

@@ -22,8 +22,8 @@ import (
func expectSNMPLookup(t *testing.T, c *Component, exporter string, ifIndex uint, expected answer) {
t.Helper()
ip := netip.AddrFrom16(netip.MustParseAddr(exporter).As16())
gotExporterName, gotInterface, err := c.Lookup(ip, ifIndex)
got := answer{gotExporterName, gotInterface, err}
gotExporterName, gotInterface, ok := c.Lookup(ip, ifIndex)
got := answer{gotExporterName, gotInterface, !ok}
if diff := helpers.Diff(got, expected); diff != "" {
t.Fatalf("Lookup() (-got, +want):\n%s", diff)
}
@@ -33,7 +33,7 @@ func TestLookup(t *testing.T) {
r := reporter.NewMock(t)
c := NewMock(t, r, DefaultConfiguration(), Dependencies{Daemon: daemon.NewMock(t)})
expectSNMPLookup(t, c, "127.0.0.1", 765, answer{Err: ErrCacheMiss})
expectSNMPLookup(t, c, "127.0.0.1", 765, answer{NOk: true})
time.Sleep(30 * time.Millisecond)
expectSNMPLookup(t, c, "127.0.0.1", 765, answer{
ExporterName: "127_0_0_1",
@@ -52,7 +52,7 @@ func TestSNMPCommunities(t *testing.T) {
c := NewMock(t, r, configuration, Dependencies{Daemon: daemon.NewMock(t)})
// Use "public" as a community. Should work.
expectSNMPLookup(t, c, "127.0.0.1", 765, answer{Err: ErrCacheMiss})
expectSNMPLookup(t, c, "127.0.0.1", 765, answer{NOk: true})
time.Sleep(30 * time.Millisecond)
expectSNMPLookup(t, c, "127.0.0.1", 765, answer{
ExporterName: "127_0_0_1",
@@ -60,14 +60,14 @@ func TestSNMPCommunities(t *testing.T) {
})
// Use "private", should not work
expectSNMPLookup(t, c, "127.0.0.2", 765, answer{Err: ErrCacheMiss})
expectSNMPLookup(t, c, "127.0.0.2", 765, answer{NOk: true})
time.Sleep(30 * time.Millisecond)
expectSNMPLookup(t, c, "127.0.0.2", 765, answer{Err: ErrCacheMiss})
expectSNMPLookup(t, c, "127.0.0.2", 765, answer{NOk: true})
// Use default community, should not work
expectSNMPLookup(t, c, "127.0.0.3", 765, answer{Err: ErrCacheMiss})
expectSNMPLookup(t, c, "127.0.0.3", 765, answer{NOk: true})
time.Sleep(30 * time.Millisecond)
expectSNMPLookup(t, c, "127.0.0.3", 765, answer{Err: ErrCacheMiss})
expectSNMPLookup(t, c, "127.0.0.3", 765, answer{NOk: true})
}
func TestComponentSaveLoad(t *testing.T) {
@@ -78,7 +78,7 @@ func TestComponentSaveLoad(t *testing.T) {
r := reporter.NewMock(t)
c := NewMock(t, r, configuration, Dependencies{Daemon: daemon.NewMock(t)})
expectSNMPLookup(t, c, "127.0.0.1", 765, answer{Err: ErrCacheMiss})
expectSNMPLookup(t, c, "127.0.0.1", 765, answer{NOk: true})
time.Sleep(30 * time.Millisecond)
expectSNMPLookup(t, c, "127.0.0.1", 765, answer{
ExporterName: "127_0_0_1",
@@ -103,7 +103,7 @@ func TestAutoRefresh(t *testing.T) {
c := NewMock(t, r, configuration, Dependencies{Daemon: daemon.NewMock(t), Clock: mockClock})
// Fetch a value
expectSNMPLookup(t, c, "127.0.0.1", 765, answer{Err: ErrCacheMiss})
expectSNMPLookup(t, c, "127.0.0.1", 765, answer{NOk: true})
time.Sleep(30 * time.Millisecond)
expectSNMPLookup(t, c, "127.0.0.1", 765, answer{
ExporterName: "127_0_0_1",
@@ -132,7 +132,6 @@ func TestAutoRefresh(t *testing.T) {
`hit`: "4",
`miss`: "1",
`size`: "1",
`exporters`: "1",
`refresh_runs`: "31", // 63/2
`refresh`: "1",
}
@@ -200,16 +199,19 @@ func TestCoalescing(t *testing.T) {
blocker := make(chan bool)
c.dispatcherBChannel <- blocker
// Queue requests
expectSNMPLookup(t, c, "127.0.0.1", 766, answer{Err: ErrCacheMiss})
expectSNMPLookup(t, c, "127.0.0.1", 767, answer{Err: ErrCacheMiss})
expectSNMPLookup(t, c, "127.0.0.1", 768, answer{Err: ErrCacheMiss})
expectSNMPLookup(t, c, "127.0.0.1", 769, answer{Err: ErrCacheMiss})
defer func() {
// Unblock
time.Sleep(20 * time.Millisecond)
close(blocker)
time.Sleep(20 * time.Millisecond)
}()
// Queue requests
expectSNMPLookup(t, c, "127.0.0.1", 766, answer{NOk: true})
expectSNMPLookup(t, c, "127.0.0.1", 767, answer{NOk: true})
expectSNMPLookup(t, c, "127.0.0.1", 768, answer{NOk: true})
expectSNMPLookup(t, c, "127.0.0.1", 769, answer{NOk: true})
// Unblock
time.Sleep(20 * time.Millisecond)
close(blocker)
time.Sleep(20 * time.Millisecond)
})
gotMetrics := r.GetMetrics("akvorado_inlet_snmp_poller_", "coalesced_count")
@@ -299,7 +301,7 @@ func TestAgentMapping(t *testing.T) {
c := NewMock(t, r, config, Dependencies{Daemon: daemon.NewMock(t)})
c.poller = alp
expectSNMPLookup(t, c, "192.0.2.1", 766, answer{Err: ErrCacheMiss})
expectSNMPLookup(t, c, "192.0.2.1", 766, answer{NOk: true})
time.Sleep(20 * time.Millisecond)
alp.mu.Lock()
if alp.lastAgent != "192.0.2.10" {
@@ -307,7 +309,7 @@ func TestAgentMapping(t *testing.T) {
t.Fatalf("last agent should have been 192.0.2.10, not %s", alp.lastAgent)
}
alp.mu.Unlock()
expectSNMPLookup(t, c, "192.0.2.2", 766, answer{Err: ErrCacheMiss})
expectSNMPLookup(t, c, "192.0.2.2", 766, answer{NOk: true})
time.Sleep(20 * time.Millisecond)
alp.mu.Lock()
if alp.lastAgent != "192.0.2.2" {