diff --git a/common/helpers/intern.go b/common/helpers/intern.go index 429fd6af..ed5df770 100644 --- a/common/helpers/intern.go +++ b/common/helpers/intern.go @@ -146,18 +146,3 @@ func (p *InternPool[T]) Put(value T) InternReference[T] { func (p *InternPool[T]) Len() int { return len(p.values) - len(p.availableIndexes) - 1 } - -// Clone returns a copy of the intern pool. -func (p *InternPool[T]) Clone() *InternPool[T] { - result := &InternPool[T]{ - values: make([]internValue[T], len(p.values)), - availableIndexes: make([]InternReference[T], len(p.availableIndexes)), - valueIndexes: make(map[uint64]InternReference[T], len(p.valueIndexes)), - } - copy(result.values, p.values) - copy(result.availableIndexes, p.availableIndexes) - for k, v := range p.valueIndexes { - result.valueIndexes[k] = v - } - return result -} diff --git a/common/helpers/intern_test.go b/common/helpers/intern_test.go index 2adb3c1f..8ff556e3 100644 --- a/common/helpers/intern_test.go +++ b/common/helpers/intern_test.go @@ -166,30 +166,3 @@ func TestTake(t *testing.T) { t.Fatalf("Take() didn't free everything (%d remaining)", diff) } } - -func TestClone(t *testing.T) { - p := NewInternPool[likeInt]() - a := p.Put(likeInt(10)) - b := p.Put(likeInt(11)) - c := p.Put(likeInt(12)) - - q := p.Clone() - q.Take(a) - q.Take(b) - d := q.Put(likeInt(12)) - e := q.Put(likeInt(13)) - f := p.Put(likeInt(13)) - - if p.Len() != 4 { - t.Errorf("p.Len() should be 4, not %d", q.Len()) - } - if q.Len() != 2 { - t.Errorf("q.Len() should be 2, not %d", q.Len()) - } - if d != c { - t.Error("12 should have the same ref in both") - } - if e == f { - t.Error("13 should not have the same ref in both") - } -} diff --git a/console/data/docs/02-configuration.md b/console/data/docs/02-configuration.md index 6ee77bca..a944d05d 100644 --- a/console/data/docs/02-configuration.md +++ b/console/data/docs/02-configuration.md @@ -129,15 +129,6 @@ The following keys are accepted: not supported) - `keep` tells how much time the routes sent from a terminated BMP connection should be kept -- `rib-mode` gives a choice on how to handle the internal RIB. `memory` (the - default) keeps only one copy of the RIB and lookups may be slightly since they - are processed sequentially by a single routine. `performance` copy the live - RIB to a read-only version from time to time (after `rib-idle-update-delay` - when the RIB is not updated, no more than `rib-minimum-update-delay` but at - least every `rib-maximum-update-delay`). With this mode, lookups can be - processed in parallel but there are three copies of the RIB in memory (in the - future, we may reduce this to two copies). With 1 million routes, a RIB is - about 50 MB. If you are not interested in AS paths and communities, disabling them will decrease the memory usage of *Akvorado*, as well as the disk diff --git a/console/data/docs/05-troubleshooting.md b/console/data/docs/05-troubleshooting.md index 9fc5f5f0..7b2af503 100644 --- a/console/data/docs/05-troubleshooting.md +++ b/console/data/docs/05-troubleshooting.md @@ -206,13 +206,6 @@ for other exporters. However, ensuring the exporters accept to answer requests is the first fix. If not enough, you can increase the number of workers. Workers handle SNMP requests synchronously. -#### BMP collector - -The BMP collector may handle million of routes. This can stress the system -during large updates. If you send many routes to the BMP collector, you can -switch the RIB mode of the collector to `performance`. 
Check the [documentation -about BMP](02-configuration.md#bmp) for more details. - ### Reported traffic levels are incorrect Use `curl -s http://akvorado/api/v0/inlet/flows\?limit=1 | grep diff --git a/console/data/docs/99-changelog.md b/console/data/docs/99-changelog.md index 02a05086..0366a369 100644 --- a/console/data/docs/99-changelog.md +++ b/console/data/docs/99-changelog.md @@ -13,18 +13,12 @@ identified with a specific icon: ## Unreleased -The BMP collector has been optimized in memory (more than 50%) and locks were -removed to reduce latency during lookups. There are two modes available. They -can be selected with `inlet.bmp.rib-mode`. If you have several million routes, -be sure to check the [BMP documentation](02-configuration.md#bmp) for more -details. - - ✨ *console*: add *100% stacked* graph type - 🩹 *inlet*: handle non-fatal BMP decoding errors more gracefully - 🩹 *inlet*: fix a small memory leak in BMP collector - 🩹 *console*: fix selection of the aggregate table to not get empty graphs - 🩹 *console*: use configured dimensions limit for “Visualize” tab -- 🌱 *inlet*: optimize BMP collector (see above) +- 🌱 *inlet*: optimize BMP CPU usage, memory usage, and lock times - 🌱 *inlet*: replace LRU cache for classifiers by a time-based cache - 🌱 *inlet*: add TLS support for Kafka transport - 🌱 *console*: Ctrl-Enter or Cmd-Enter when editing a filter now applies the changes diff --git a/inlet/bmp/config.go b/inlet/bmp/config.go index d023ab74..66b30f6c 100644 --- a/inlet/bmp/config.go +++ b/inlet/bmp/config.go @@ -3,11 +3,7 @@ package bmp -import ( - "errors" - "strings" - "time" -) +import "time" // Configuration describes the configuration for the BMP server. type Configuration struct { @@ -24,74 +20,27 @@ type Configuration struct { CollectCommunities bool // Keep tells how long to keep routes from a BMP client when it goes down Keep time.Duration `validate:"min=1s"` - // RIBMode tells which mode to use for the RIB - RIBMode RIBMode - // RIBIdleUpdateDelay tells to update the read-only RIB after being idle for - // that duration. This is only when RIB is in performance mode. - RIBIdleUpdateDelay time.Duration `validate:"min=1s"` - // RIBMinimumUpdateDelay tells to not update the read-only RIB less than - // that. This is only when RIB is in performance mode. - RIBMinimumUpdateDelay time.Duration `validate:"min=1s,gtfield=RIBIdleUpdateDelay"` - // RIBMaximumUpdateDelay tells to update the read-only RIB at least once - // every the specified delay (if there are updates). This is only if RIB is - // in performance mode. - RIBMaximumUpdateDelay time.Duration `validate:"min=1s,gtfield=RIBMinimumUpdateDelay"` - // RIBPeerRemovalBatchRoutes tells how many routes to remove before checking - // if we have a higher priority request. This is only if RIB is in memory - // mode. 
- RIBPeerRemovalBatchRoutes int `validate:"min=1"` + // PeerRemovalMaxTime tells the maximum time the removal worker should run to remove a peer + PeerRemovalMaxTime time.Duration `validate:"min=10ms"` + // PeerRemovalSleepInterval tells how much time to sleep between two runs of the removal worker + PeerRemovalSleepInterval time.Duration `validate:"min=10ms"` + // PeerRemovalMaxQueue tells how many pending removal requests to keep + PeerRemovalMaxQueue int `validate:"min=1"` + // PeerRemovalMinRoutes tells how many routes we have to remove in one run before yielding + PeerRemovalMinRoutes int `validate:"min=1"` } // DefaultConfiguration represents the default configuration for the BMP server func DefaultConfiguration() Configuration { return Configuration{ - Listen: "0.0.0.0:10179", - CollectASNs: true, - CollectASPaths: true, - CollectCommunities: true, - Keep: 5 * time.Minute, - RIBMode: RIBModeMemory, - RIBIdleUpdateDelay: 5 * time.Second, - RIBMinimumUpdateDelay: 20 * time.Second, - RIBMaximumUpdateDelay: 2 * time.Minute, - RIBPeerRemovalBatchRoutes: 1000, + Listen: "0.0.0.0:10179", + CollectASNs: true, + CollectASPaths: true, + CollectCommunities: true, + Keep: 5 * time.Minute, + PeerRemovalMaxTime: 200 * time.Millisecond, + PeerRemovalSleepInterval: 500 * time.Millisecond, + PeerRemovalMaxQueue: 10000, + PeerRemovalMinRoutes: 5000, } } - -// RIBMode is the mode used for the RIB -type RIBMode int - -const ( - // RIBModeMemory tries to minimize used memory - RIBModeMemory RIBMode = iota - // RIBModePerformance keep a read-only copy of the RIB for lookups - RIBModePerformance -) - -// UnmarshalText parses a RIBMode -func (m *RIBMode) UnmarshalText(text []byte) error { - modes := map[string]RIBMode{ - "memory": RIBModeMemory, - "performance": RIBModePerformance, - } - mode, ok := modes[strings.ToLower(string(text))] - if !ok { - return errors.New("unknown RIB mode") - } - *m = mode - return nil -} - -// String turns a RIB mode to a string -func (m RIBMode) String() string { - modes := map[RIBMode]string{ - RIBModeMemory: "memory", - RIBModePerformance: "performance", - } - return modes[m] -} - -// MarshalText turns a RIB mode to a string -func (m RIBMode) MarshalText() ([]byte, error) { - return []byte(m.String()), nil -} diff --git a/inlet/bmp/events.go b/inlet/bmp/events.go index c5b88df1..bcf634f3 100644 --- a/inlet/bmp/events.go +++ b/inlet/bmp/events.go @@ -4,14 +4,11 @@ package bmp import ( - "context" "encoding/binary" - "errors" + "fmt" "net/netip" "time" - "akvorado/common/reporter" - "github.com/osrg/gobgp/v3/pkg/packet/bgp" "github.com/osrg/gobgp/v3/pkg/packet/bmp" ) @@ -28,8 +25,9 @@ type peerKey struct { // peerInfo contains some information attached to a peer. type peerInfo struct { - reference uint32 // used as a reference in the RIB - staleUntil time.Time // when to remove because it is stale + reference uint32 // used as a reference in the RIB + staleUntil time.Time // when to remove because it is stale + marshallingOptions []*bgp.MarshallingOption // decoding option (add-path mostly) } // peerKeyFromBMPPeerHeader computes the peer key from the BMP peer header. @@ -45,223 +43,11 @@ func peerKeyFromBMPPeerHeader(exporter netip.AddrPort, header *bmp.BMPPeerHeader } } -// ribWorkerState is the state of the rib worker (accessible through the worker -// only). -type ribWorkerState struct { - ctx context.Context - rib *rib - peers map[peerKey]*peerInfo - peerLastReference uint32 -} - -// ribWorkerPayload is what we provide the RIB worker with. 
The channel will be -// closed when done. -type ribWorkerPayload struct { - fn func(*ribWorkerState) error - interruptible bool - done chan<- struct{} -} - -var errRIBWorkerCanceled = errors.New("RIB worker timeout") - -// ribWorker handle RIB-related operations (everything involving updating RIB -// and related structures). Tasks are functions queued inside the worker. -func (c *Component) ribWorker() error { - state := &ribWorkerState{ - rib: newRIB(), - peers: make(map[peerKey]*peerInfo), - } - // Assume the last copy was done before minimum update delay - lastCopy := time.Now().Add(-c.config.RIBMinimumUpdateDelay) - nextTimer := time.NewTimer(c.config.RIBMaximumUpdateDelay) - timer := "maximum" - uptodate := true - priorityPayloads := make(chan ribWorkerPayload, 1) - pausedPayloads := make(chan ribWorkerPayload, 1) - - if c.config.RIBMode == RIBModePerformance { - c.r.GaugeFunc( - reporter.GaugeOpts{ - Name: "rib_lag_seconds", - Help: "How outdated is the readonly RIB.", - }, - func() float64 { - if uptodate { - return 0 - } - return time.Now().Sub(lastCopy).Seconds() - }, - ) - } - - handleLowPriorityPayload := func(payload ribWorkerPayload) error { - // These low priority operations can be canceled when a high priority request happens. - if c.config.RIBMode == RIBModeMemory && payload.interruptible { - ctx, cancel := context.WithCancel(c.t.Context(context.Background())) - state.ctx = ctx - defer cancel() - go func() { - select { - case <-c.t.Dying(): - case <-ctx.Done(): - case payload := <-c.ribWorkerPrioChan: - priorityPayloads <- payload - cancel() - } - }() - } - - uptodate = false - err := payload.fn(state) - if err == errRIBWorkerCanceled { - pausedPayloads <- payload - return nil - } - close(payload.done) - if err != nil { - return err - } - - if c.config.RIBMode == RIBModePerformance { - if !nextTimer.Stop() { - select { - case <-nextTimer.C: - default: - } - } - now := time.Now() - delta := now.Sub(lastCopy) - if delta < c.config.RIBMinimumUpdateDelay { - nextTimer.Reset(c.config.RIBMinimumUpdateDelay - delta) - timer = "minimum" - } else if delta < c.config.RIBMaximumUpdateDelay-c.config.RIBIdleUpdateDelay { - nextTimer.Reset(c.config.RIBIdleUpdateDelay) - timer = "idle" - } else if delta >= c.config.RIBMaximumUpdateDelay { - c.updateRIBReadonly(state, "maximum") - lastCopy = now - uptodate = true - } else { - nextTimer.Reset(c.config.RIBMaximumUpdateDelay - delta) - timer = "maximum" - } - } - return nil - } - - for { - state.ctx = nil - select { - // Two "high priority" chans: one for internal, one for external - case payload := <-priorityPayloads: - err := payload.fn(state) - close(payload.done) - if err != nil { - return err - } - case payload := <-c.ribWorkerPrioChan: - err := payload.fn(state) - close(payload.done) - if err != nil { - return err - } - default: - select { - case <-c.t.Dying(): - return nil - // No need to watch for internal high priority, it should have been - // handled before. We can still get high priority requests from the - // external one. 
- case payload := <-c.ribWorkerPrioChan: - err := payload.fn(state) - close(payload.done) - if err != nil { - return err - } - case payload := <-pausedPayloads: - if err := handleLowPriorityPayload(payload); err != nil { - return err - } - case payload := <-c.ribWorkerChan: - if err := handleLowPriorityPayload(payload); err != nil { - return err - } - case <-nextTimer.C: - c.updateRIBReadonly(state, timer) - lastCopy = time.Now() - uptodate = true - } - } - } -} - -type ribWorkerQueueOptions int - -const ( - ribWorkerHighPriority ribWorkerQueueOptions = iota - ribWorkerInterruptible -) - -// ribWorkerQueue queues a new task for the RIB worker thread. We wait for the -// task to be handled. We don't want to queue up a lot of tasks asynchronously. -func (c *Component) ribWorkerQueue(fn func(*ribWorkerState) error, options ...ribWorkerQueueOptions) { - ch := c.ribWorkerChan - done := make(chan struct{}) - payload := ribWorkerPayload{ - fn: fn, - done: done, - } - for _, option := range options { - switch option { - case ribWorkerHighPriority: - ch = c.ribWorkerPrioChan - case ribWorkerInterruptible: - payload.interruptible = true - } - } - select { - case <-c.t.Dying(): - case ch <- payload: - select { - case <-c.t.Dying(): - case <-done: - } - } -} - -// updateRIBReadonly updates the read-only copy of the RIB -func (c *Component) updateRIBReadonly(s *ribWorkerState, timer string) { - if c.config.RIBMode == RIBModePerformance { - c.r.Debug().Msg("copy live RIB to read-only version") - start := time.Now() - defer c.metrics.ribCopies.WithLabelValues(timer).Observe( - float64(time.Now().Sub(start).Nanoseconds()) / 1000 / 1000 / 1000) - new := s.rib.clone() - c.ribReadonly.Store(new) - } -} - -// addPeer provides a reference to a new peer. -func (c *Component) addPeer(s *ribWorkerState, pkey peerKey) *peerInfo { - s.peerLastReference++ - if s.peerLastReference == 0 { - // This is a very unlikely event, but we don't - // have anything better. Let's crash (and - // hopefully be restarted). - c.r.Fatal().Msg("too many peer up events") - go c.Stop() - } - pinfo := &peerInfo{ - reference: s.peerLastReference, - } - s.peers[pkey] = pinfo - return pinfo -} - -// scheduleStalePeersRemoval schedule the next time a peer should be removed. -func (c *Component) scheduleStalePeersRemoval(s *ribWorkerState) { +// scheduleStalePeersRemoval schedule the next time a peer should be +// removed. This should be called with the lock held. +func (c *Component) scheduleStalePeersRemoval() { var next time.Time - for _, pinfo := range s.peers { + for _, pinfo := range c.peers { if pinfo.staleUntil.IsZero() { continue } @@ -271,85 +57,102 @@ func (c *Component) scheduleStalePeersRemoval(s *ribWorkerState) { } if next.IsZero() { c.r.Debug().Msg("no stale peer") - c.peerStaleTimer.Stop() + c.staleTimer.Stop() } else { c.r.Debug().Msgf("next removal for stale peer scheduled on %s", next) - c.peerStaleTimer.Reset(c.d.Clock.Until(next)) + c.staleTimer.Reset(c.d.Clock.Until(next)) } } -// removePeer remove a peer. -func (c *Component) removePeer(s *ribWorkerState, pkey peerKey, reason string) error { +// removeStalePeers remove the stale peers. 
+func (c *Component) removeStalePeers() { + start := c.d.Clock.Now() + c.r.Debug().Msg("remove stale peers") + c.mu.Lock() + defer c.mu.Unlock() + for pkey, pinfo := range c.peers { + if pinfo.staleUntil.IsZero() || pinfo.staleUntil.After(start) { + continue + } + c.removePeer(pkey, "stale") + } + c.scheduleStalePeersRemoval() +} + +func (c *Component) addPeer(pkey peerKey) *peerInfo { + c.lastPeerReference++ + if c.lastPeerReference == 0 { + // This is a very unlikely event, but we don't + // have anything better. Let's crash (and + // hopefully be restarted). + c.r.Fatal().Msg("too many peer up events") + go c.Stop() + } + pinfo := &peerInfo{ + reference: c.lastPeerReference, + } + c.peers[pkey] = pinfo + return pinfo +} + +// removePeer remove a peer (with lock held) +func (c *Component) removePeer(pkey peerKey, reason string) { exporterStr := pkey.exporter.Addr().Unmap().String() peerStr := pkey.ip.Unmap().String() - pinfo := s.peers[pkey] c.r.Info().Msgf("remove peer %s for exporter %s (reason: %s)", peerStr, exporterStr, reason) - removed, done := s.rib.flushPeerContext(s.ctx, pinfo.reference, c.config.RIBPeerRemovalBatchRoutes) - c.metrics.routes.WithLabelValues(exporterStr).Sub(float64(removed)) - if !done { - c.metrics.peerRemovalPartial.WithLabelValues(exporterStr).Inc() - return errRIBWorkerCanceled + select { + case c.peerRemovalChan <- pkey: + return + default: } - c.metrics.peerRemovalDone.WithLabelValues(exporterStr).Inc() - c.metrics.peers.WithLabelValues(exporterStr).Dec() - delete(s.peers, pkey) - return nil + c.metrics.peerRemovalQueueFull.WithLabelValues(exporterStr).Inc() + c.mu.Unlock() + select { + case c.peerRemovalChan <- pkey: + case <-c.t.Dying(): + } + c.mu.Lock() } -// handleStalePeers remove the stale peers. -func (c *Component) handleStalePeers() { - c.ribWorkerQueue(func(s *ribWorkerState) error { - start := c.d.Clock.Now() - c.r.Debug().Msg("remove stale peers") - for pkey, pinfo := range s.peers { - if pinfo.staleUntil.IsZero() || pinfo.staleUntil.After(start) { - continue - } - if err := c.removePeer(s, pkey, "stale"); err != nil { - return err - } +// markExporterAsStale marks all peers from an exporter as stale. +func (c *Component) markExporterAsStale(exporter netip.AddrPort, until time.Time) { + c.mu.Lock() + defer c.mu.Unlock() + for pkey, pinfo := range c.peers { + if pkey.exporter != exporter { + continue } - c.scheduleStalePeersRemoval(s) - return nil - }, ribWorkerInterruptible) + pinfo.staleUntil = until + } + c.scheduleStalePeersRemoval() } // handlePeerDownNotification handles a peer-down notification by // removing the peer. func (c *Component) handlePeerDownNotification(pkey peerKey) { - c.ribWorkerQueue(func(s *ribWorkerState) error { - _, ok := s.peers[pkey] - if !ok { - c.r.Info().Msgf("received peer down from exporter %s for peer %s, but no peer up", - pkey.exporter.Addr().Unmap().String(), - pkey.ip.Unmap().String()) - return nil - } - return c.removePeer(s, pkey, "down") - }, ribWorkerInterruptible) + c.mu.Lock() + defer c.mu.Unlock() + _, ok := c.peers[pkey] + if !ok { + c.r.Info().Msgf("received peer down from exporter %s for peer %s, but no peer up", + pkey.exporter.Addr().Unmap().String(), + pkey.ip.Unmap().String()) + return + } + c.removePeer(pkey, "down") } // handleConnectionDown handles a disconnect or a session termination // by marking all associated peers as stale. 
func (c *Component) handleConnectionDown(exporter netip.AddrPort) { until := c.d.Clock.Now().Add(c.config.Keep) - c.ribWorkerQueue(func(s *ribWorkerState) error { - for pkey, pinfo := range s.peers { - if pkey.exporter != exporter { - continue - } - pinfo.staleUntil = until - } - c.scheduleStalePeersRemoval(s) - return nil - }) + c.markExporterAsStale(exporter, until) } // handleConnectionUp handles the connection from a new exporter. func (c *Component) handleConnectionUp(exporter netip.AddrPort) { - // Do it without RIB worker, we just update counters. - // Do not set to 0, exporterStr may cover several exporters. exporterStr := exporter.Addr().Unmap().String() + // Do not set to 0, exporterStr may cover several exporters. c.metrics.peers.WithLabelValues(exporterStr).Add(0) c.metrics.routes.WithLabelValues(exporterStr).Add(0) } @@ -359,23 +162,63 @@ func (c *Component) handlePeerUpNotification(pkey peerKey, body *bmp.BMPPeerUpNo if body.ReceivedOpenMsg == nil || body.SentOpenMsg == nil { return } + c.mu.Lock() + defer c.mu.Unlock() - c.ribWorkerQueue(func(s *ribWorkerState) error { - exporterStr := pkey.exporter.Addr().Unmap().String() - peerStr := pkey.ip.Unmap().String() - _, ok := s.peers[pkey] - if ok { - c.r.Info().Msgf("received extra peer up from exporter %s for peer %s", - exporterStr, peerStr) - } else { - // Peer does not exist at all - c.metrics.peers.WithLabelValues(exporterStr).Inc() - c.addPeer(s, pkey) + exporterStr := pkey.exporter.Addr().Unmap().String() + peerStr := pkey.ip.Unmap().String() + pinfo, ok := c.peers[pkey] + if ok { + c.r.Info().Msgf("received extra peer up from exporter %s for peer %s", + exporterStr, peerStr) + } else { + // Peer does not exist at all + c.metrics.peers.WithLabelValues(exporterStr).Inc() + pinfo = c.addPeer(pkey) + } + + // Check for ADD-PATH support. + receivedAddPath := map[bgp.RouteFamily]bgp.BGPAddPathMode{} + received, _ := body.ReceivedOpenMsg.Body.(*bgp.BGPOpen) + for _, param := range received.OptParams { + switch param := param.(type) { + case *bgp.OptionParameterCapability: + for _, cap := range param.Capability { + switch cap := cap.(type) { + case *bgp.CapAddPath: + for _, tuple := range cap.Tuples { + receivedAddPath[tuple.RouteFamily] = tuple.Mode + } + } + } } + } + sent, _ := body.SentOpenMsg.Body.(*bgp.BGPOpen) + addPathOption := map[bgp.RouteFamily]bgp.BGPAddPathMode{} + for _, param := range sent.OptParams { + switch param := param.(type) { + case *bgp.OptionParameterCapability: + for _, cap := range param.Capability { + switch cap := cap.(type) { + case *bgp.CapAddPath: + for _, sent := range cap.Tuples { + receivedMode := receivedAddPath[sent.RouteFamily] + if receivedMode == bgp.BGP_ADD_PATH_BOTH || receivedMode == bgp.BGP_ADD_PATH_SEND { + if sent.Mode == bgp.BGP_ADD_PATH_BOTH || sent.Mode == bgp.BGP_ADD_PATH_RECEIVE { + // We have at least the receive mode. We only do decoding. + addPathOption[sent.RouteFamily] = bgp.BGP_ADD_PATH_RECEIVE + } + } + } + } + } + } + } + pinfo.marshallingOptions = []*bgp.MarshallingOption{{AddPath: addPathOption}} - c.r.Debug().Msgf("new peer %s from exporter %s", peerStr, exporterStr) - return nil - }) + c.r.Debug(). + Str("addpath", fmt.Sprintf("%s", addPathOption)). 
+ Msgf("new peer %s from exporter %s", peerStr, exporterStr) } func (c *Component) handleRouteMonitoring(pkey peerKey, body *bmp.BMPRouteMonitoring) { @@ -388,195 +231,194 @@ func (c *Component) handleRouteMonitoring(pkey peerKey, body *bmp.BMPRouteMonito return } - c.ribWorkerQueue(func(s *ribWorkerState) error { + c.mu.Lock() + defer c.mu.Unlock() - // Ignore this peer if this is a L3VPN and it does not have - // the right RD. - if pkey.ptype == bmp.BMP_PEER_TYPE_L3VPN && !c.isAcceptedRD(pkey.distinguisher) { - return nil - } + // Ignore this peer if this is a L3VPN and it does not have + // the right RD. + if pkey.ptype == bmp.BMP_PEER_TYPE_L3VPN && !c.isAcceptedRD(pkey.distinguisher) { + return + } - exporterStr := pkey.exporter.Addr().Unmap().String() - peerStr := pkey.ip.Unmap().String() - pinfo, ok := s.peers[pkey] - if !ok { - // We may have missed the peer down notification? - c.r.Info().Msgf("received route monitoring from exporter %s for peer %s, but no peer up", - exporterStr, peerStr) - c.metrics.peers.WithLabelValues(exporterStr).Inc() - pinfo = c.addPeer(s, pkey) - } + exporterStr := pkey.exporter.Addr().Unmap().String() + peerStr := pkey.ip.Unmap().String() + pinfo, ok := c.peers[pkey] + if !ok { + // We may have missed the peer down notification? + c.r.Info().Msgf("received route monitoring from exporter %s for peer %s, but no peer up", + exporterStr, peerStr) + c.metrics.peers.WithLabelValues(exporterStr).Inc() + pinfo = c.addPeer(pkey) + } - var nh netip.Addr - var rta routeAttributes - for _, attr := range update.PathAttributes { - switch attr := attr.(type) { - case *bgp.PathAttributeNextHop: - nh, _ = netip.AddrFromSlice(attr.Value.To16()) - case *bgp.PathAttributeAsPath: - if c.config.CollectASNs || c.config.CollectASPaths { - rta.asPath = asPathFlat(attr) - } - case *bgp.PathAttributeCommunities: - if c.config.CollectCommunities { - rta.communities = attr.Value - } - case *bgp.PathAttributeLargeCommunities: - if c.config.CollectCommunities { - rta.largeCommunities = make([]bgp.LargeCommunity, len(attr.Values)) - for idx, c := range attr.Values { - rta.largeCommunities[idx] = *c - } + var nh netip.Addr + var rta routeAttributes + for _, attr := range update.PathAttributes { + switch attr := attr.(type) { + case *bgp.PathAttributeNextHop: + nh, _ = netip.AddrFromSlice(attr.Value.To16()) + case *bgp.PathAttributeAsPath: + if c.config.CollectASNs || c.config.CollectASPaths { + rta.asPath = asPathFlat(attr) + } + case *bgp.PathAttributeCommunities: + if c.config.CollectCommunities { + rta.communities = attr.Value + } + case *bgp.PathAttributeLargeCommunities: + if c.config.CollectCommunities { + rta.largeCommunities = make([]bgp.LargeCommunity, len(attr.Values)) + for idx, c := range attr.Values { + rta.largeCommunities[idx] = *c } } } - // If no AS path, consider the peer AS as the origin AS, - // otherwise the last AS. - if c.config.CollectASNs { - if path := rta.asPath; len(path) == 0 { - rta.asn = pkey.asn - } else { - rta.asn = path[len(path)-1] - } - } - if !c.config.CollectASPaths { - rta.asPath = rta.asPath[:0] + } + // If no AS path, consider the peer AS as the origin AS, + // otherwise the last AS. 
+ if c.config.CollectASNs { + if path := rta.asPath; len(path) == 0 { + rta.asn = pkey.asn + } else { + rta.asn = path[len(path)-1] } + } + if !c.config.CollectASPaths { + rta.asPath = rta.asPath[:0] + } - added := 0 - removed := 0 + added := 0 + removed := 0 - // Regular NLRI and withdrawn routes - if pkey.ptype == bmp.BMP_PEER_TYPE_L3VPN || c.isAcceptedRD(0) { - for _, ipprefix := range update.NLRI { - prefix := ipprefix.Prefix - plen := int(ipprefix.Length) - if prefix.To4() != nil { - prefix = prefix.To16() - plen += 96 - } - p, _ := netip.AddrFromSlice(prefix) - added += s.rib.addPrefix(p, plen, route{ - peer: pinfo.reference, - nlri: s.rib.nlris.Put(nlri{ - family: bgp.RF_IPv4_UC, - path: ipprefix.PathIdentifier(), - rd: pkey.distinguisher, - }), - nextHop: s.rib.nextHops.Put(nextHop(nh)), - attributes: s.rib.rtas.Put(rta), - }) + // Regular NLRI and withdrawn routes + if pkey.ptype == bmp.BMP_PEER_TYPE_L3VPN || c.isAcceptedRD(0) { + for _, ipprefix := range update.NLRI { + prefix := ipprefix.Prefix + plen := int(ipprefix.Length) + if prefix.To4() != nil { + prefix = prefix.To16() + plen += 96 } - for _, ipprefix := range update.WithdrawnRoutes { - prefix := ipprefix.Prefix - plen := int(ipprefix.Length) - if prefix.To4() != nil { - prefix = prefix.To16() - plen += 96 - } - p, _ := netip.AddrFromSlice(prefix) - if nlriRef, ok := s.rib.nlris.Ref(nlri{ + p, _ := netip.AddrFromSlice(prefix) + added += c.rib.addPrefix(p, plen, route{ + peer: pinfo.reference, + nlri: c.rib.nlris.Put(nlri{ family: bgp.RF_IPv4_UC, path: ipprefix.PathIdentifier(), rd: pkey.distinguisher, + }), + nextHop: c.rib.nextHops.Put(nextHop(nh)), + attributes: c.rib.rtas.Put(rta), + }) + } + for _, ipprefix := range update.WithdrawnRoutes { + prefix := ipprefix.Prefix + plen := int(ipprefix.Length) + if prefix.To4() != nil { + prefix = prefix.To16() + plen += 96 + } + p, _ := netip.AddrFromSlice(prefix) + if nlriRef, ok := c.rib.nlris.Ref(nlri{ + family: bgp.RF_IPv4_UC, + path: ipprefix.PathIdentifier(), + rd: pkey.distinguisher, + }); ok { + removed += c.rib.removePrefix(p, plen, route{ + peer: pinfo.reference, + nlri: nlriRef, + }) + } + } + } + + // MP reach and unreach NLRI + for _, attr := range update.PathAttributes { + var p netip.Addr + var plen int + var rd RD + var ipprefixes []bgp.AddrPrefixInterface + switch attr := attr.(type) { + case *bgp.PathAttributeMpReachNLRI: + nh, _ = netip.AddrFromSlice(attr.Nexthop.To16()) + ipprefixes = attr.Value + case *bgp.PathAttributeMpUnreachNLRI: + ipprefixes = attr.Value + } + for _, ipprefix := range ipprefixes { + switch ipprefix := ipprefix.(type) { + case *bgp.IPAddrPrefix: + p, _ = netip.AddrFromSlice(ipprefix.Prefix.To16()) + plen = int(ipprefix.Length + 96) + rd = pkey.distinguisher + case *bgp.IPv6AddrPrefix: + p, _ = netip.AddrFromSlice(ipprefix.Prefix.To16()) + plen = int(ipprefix.Length) + rd = pkey.distinguisher + case *bgp.LabeledIPAddrPrefix: + p, _ = netip.AddrFromSlice(ipprefix.Prefix.To16()) + plen = int(ipprefix.IPPrefixLen() + 96) + rd = pkey.distinguisher + case *bgp.LabeledIPv6AddrPrefix: + p, _ = netip.AddrFromSlice(ipprefix.Prefix.To16()) + plen = int(ipprefix.IPPrefixLen()) + rd = pkey.distinguisher + case *bgp.LabeledVPNIPAddrPrefix: + p, _ = netip.AddrFromSlice(ipprefix.Prefix.To16()) + plen = int(ipprefix.IPPrefixLen() + 96) + rd = RDFromRouteDistinguisherInterface(ipprefix.RD) + case *bgp.LabeledVPNIPv6AddrPrefix: + p, _ = netip.AddrFromSlice(ipprefix.Prefix.To16()) + plen = int(ipprefix.IPPrefixLen()) + rd = 
RDFromRouteDistinguisherInterface(ipprefix.RD) + case *bgp.EVPNNLRI: + switch route := ipprefix.RouteTypeData.(type) { + case *bgp.EVPNIPPrefixRoute: + prefix := route.IPPrefix + plen = int(route.IPPrefixLength) + if prefix.To4() != nil { + prefix = prefix.To16() + plen += 96 + } + p, _ = netip.AddrFromSlice(prefix.To16()) + rd = RDFromRouteDistinguisherInterface(route.RD) + } + default: + c.metrics.ignoredNlri.WithLabelValues(exporterStr, + bgp.AfiSafiToRouteFamily(ipprefix.AFI(), ipprefix.SAFI()).String()).Inc() + continue + } + if pkey.ptype != bmp.BMP_PEER_TYPE_L3VPN && !c.isAcceptedRD(rd) { + continue + } + switch attr.(type) { + case *bgp.PathAttributeMpReachNLRI: + added += c.rib.addPrefix(p, plen, route{ + peer: pinfo.reference, + nlri: c.rib.nlris.Put(nlri{ + family: bgp.AfiSafiToRouteFamily(ipprefix.AFI(), ipprefix.SAFI()), + rd: rd, + path: ipprefix.PathIdentifier(), + }), + nextHop: c.rib.nextHops.Put(nextHop(nh)), + attributes: c.rib.rtas.Put(rta), + }) + case *bgp.PathAttributeMpUnreachNLRI: + if nlriRef, ok := c.rib.nlris.Ref(nlri{ + family: bgp.AfiSafiToRouteFamily(ipprefix.AFI(), ipprefix.SAFI()), + rd: rd, + path: ipprefix.PathIdentifier(), }); ok { - removed += s.rib.removePrefix(p, plen, route{ + removed += c.rib.removePrefix(p, plen, route{ peer: pinfo.reference, nlri: nlriRef, }) } } } + } - // MP reach and unreach NLRI - for _, attr := range update.PathAttributes { - var p netip.Addr - var plen int - var rd RD - var ipprefixes []bgp.AddrPrefixInterface - switch attr := attr.(type) { - case *bgp.PathAttributeMpReachNLRI: - nh, _ = netip.AddrFromSlice(attr.Nexthop.To16()) - ipprefixes = attr.Value - case *bgp.PathAttributeMpUnreachNLRI: - ipprefixes = attr.Value - } - for _, ipprefix := range ipprefixes { - switch ipprefix := ipprefix.(type) { - case *bgp.IPAddrPrefix: - p, _ = netip.AddrFromSlice(ipprefix.Prefix.To16()) - plen = int(ipprefix.Length + 96) - rd = pkey.distinguisher - case *bgp.IPv6AddrPrefix: - p, _ = netip.AddrFromSlice(ipprefix.Prefix.To16()) - plen = int(ipprefix.Length) - rd = pkey.distinguisher - case *bgp.LabeledIPAddrPrefix: - p, _ = netip.AddrFromSlice(ipprefix.Prefix.To16()) - plen = int(ipprefix.IPPrefixLen() + 96) - rd = pkey.distinguisher - case *bgp.LabeledIPv6AddrPrefix: - p, _ = netip.AddrFromSlice(ipprefix.Prefix.To16()) - plen = int(ipprefix.IPPrefixLen()) - rd = pkey.distinguisher - case *bgp.LabeledVPNIPAddrPrefix: - p, _ = netip.AddrFromSlice(ipprefix.Prefix.To16()) - plen = int(ipprefix.IPPrefixLen() + 96) - rd = RDFromRouteDistinguisherInterface(ipprefix.RD) - case *bgp.LabeledVPNIPv6AddrPrefix: - p, _ = netip.AddrFromSlice(ipprefix.Prefix.To16()) - plen = int(ipprefix.IPPrefixLen()) - rd = RDFromRouteDistinguisherInterface(ipprefix.RD) - case *bgp.EVPNNLRI: - switch route := ipprefix.RouteTypeData.(type) { - case *bgp.EVPNIPPrefixRoute: - prefix := route.IPPrefix - plen = int(route.IPPrefixLength) - if prefix.To4() != nil { - prefix = prefix.To16() - plen += 96 - } - p, _ = netip.AddrFromSlice(prefix.To16()) - rd = RDFromRouteDistinguisherInterface(route.RD) - } - default: - c.metrics.ignoredNlri.WithLabelValues(exporterStr, - bgp.AfiSafiToRouteFamily(ipprefix.AFI(), ipprefix.SAFI()).String()).Inc() - continue - } - if pkey.ptype != bmp.BMP_PEER_TYPE_L3VPN && !c.isAcceptedRD(rd) { - continue - } - switch attr.(type) { - case *bgp.PathAttributeMpReachNLRI: - added += s.rib.addPrefix(p, plen, route{ - peer: pinfo.reference, - nlri: s.rib.nlris.Put(nlri{ - family: bgp.AfiSafiToRouteFamily(ipprefix.AFI(), ipprefix.SAFI()), - rd: rd, - 
path: ipprefix.PathIdentifier(), - }), - nextHop: s.rib.nextHops.Put(nextHop(nh)), - attributes: s.rib.rtas.Put(rta), - }) - case *bgp.PathAttributeMpUnreachNLRI: - if nlriRef, ok := s.rib.nlris.Ref(nlri{ - family: bgp.AfiSafiToRouteFamily(ipprefix.AFI(), ipprefix.SAFI()), - rd: rd, - path: ipprefix.PathIdentifier(), - }); ok { - removed += s.rib.removePrefix(p, plen, route{ - peer: pinfo.reference, - nlri: nlriRef, - }) - } - } - } - } - - c.metrics.routes.WithLabelValues(exporterStr).Add(float64(added - removed)) - return nil - }) + c.metrics.routes.WithLabelValues(exporterStr).Add(float64(added - removed)) } func (c *Component) isAcceptedRD(rd RD) bool { diff --git a/inlet/bmp/lookup.go b/inlet/bmp/lookup.go index bca6a810..b42963a2 100644 --- a/inlet/bmp/lookup.go +++ b/inlet/bmp/lookup.go @@ -23,55 +23,45 @@ type LookupResult struct { // provided next hop if provided. This is somewhat approximate because // we use the best route we have, while the exporter may not have this // best route available. The returned result should not be modified! -func (c *Component) Lookup(addrIP net.IP, nextHopIP net.IP) (result LookupResult) { +func (c *Component) Lookup(addrIP net.IP, nextHopIP net.IP) LookupResult { if !c.config.CollectASNs && !c.config.CollectASPaths && !c.config.CollectCommunities { - return + return LookupResult{} } ip, _ := netip.AddrFromSlice(addrIP.To16()) nh, _ := netip.AddrFromSlice(nextHopIP.To16()) v6 := patricia.NewIPv6Address(ip.AsSlice(), 128) - lookup := func(rib *rib) error { - bestFound := false - found := false - _, routes := rib.tree.FindDeepestTagsWithFilter(v6, func(route route) bool { - if bestFound { - // We already have the best route, skip remaining routes - return false - } - if rib.nextHops.Get(route.nextHop) == nextHop(nh) { - // Exact match found, use it and don't search further - bestFound = true - return true - } - // If we don't have a match already, use this one. - if !found { - found = true - return true - } - // Otherwise, skip it - return false - }) - if len(routes) == 0 { - return nil - } - attributes := rib.rtas.Get(routes[len(routes)-1].attributes) - result = LookupResult{ - ASN: attributes.asn, - ASPath: attributes.asPath, - Communities: attributes.communities, - LargeCommunities: attributes.largeCommunities, - } - return nil - } + c.mu.RLock() + defer c.mu.RUnlock() - switch c.config.RIBMode { - case RIBModeMemory: - c.ribWorkerQueue(func(s *ribWorkerState) error { - return lookup(s.rib) - }, ribWorkerHighPriority) - case RIBModePerformance: - lookup(c.ribReadonly.Load()) + bestFound := false + found := false + _, routes := c.rib.tree.FindDeepestTagsWithFilter(v6, func(route route) bool { + if bestFound { + // We already have the best route, skip remaining routes + return false + } + if c.rib.nextHops.Get(route.nextHop) == nextHop(nh) { + // Exact match found, use it and don't search further + bestFound = true + return true + } + // If we don't have a match already, use this one. 
+ if !found { + found = true + return true + } + // Otherwise, skip it + return false + }) + if len(routes) == 0 { + return LookupResult{} + } + attributes := c.rib.rtas.Get(routes[len(routes)-1].attributes) + return LookupResult{ + ASN: attributes.asn, + ASPath: attributes.asPath, + Communities: attributes.communities, + LargeCommunities: attributes.largeCommunities, } - return } diff --git a/inlet/bmp/metrics.go b/inlet/bmp/metrics.go index 0ee402c7..bf294b5e 100644 --- a/inlet/bmp/metrics.go +++ b/inlet/bmp/metrics.go @@ -6,18 +6,19 @@ package bmp import "akvorado/common/reporter" type metrics struct { - openedConnections *reporter.CounterVec - closedConnections *reporter.CounterVec - peers *reporter.GaugeVec - routes *reporter.GaugeVec - ignoredNlri *reporter.CounterVec - messages *reporter.CounterVec - errors *reporter.CounterVec - ignored *reporter.CounterVec - panics *reporter.CounterVec - ribCopies *reporter.SummaryVec - peerRemovalPartial *reporter.CounterVec - peerRemovalDone *reporter.CounterVec + openedConnections *reporter.CounterVec + closedConnections *reporter.CounterVec + peers *reporter.GaugeVec + routes *reporter.GaugeVec + ignoredNlri *reporter.CounterVec + messages *reporter.CounterVec + errors *reporter.CounterVec + ignored *reporter.CounterVec + panics *reporter.CounterVec + locked *reporter.SummaryVec + peerRemovalDone *reporter.CounterVec + peerRemovalPartial *reporter.CounterVec + peerRemovalQueueFull *reporter.CounterVec } // initMetrics initialize the metrics for the BMP component. @@ -85,13 +86,13 @@ func (c *Component) initMetrics() { }, []string{"exporter"}, ) - c.metrics.ribCopies = c.r.SummaryVec( + c.metrics.locked = c.r.SummaryVec( reporter.SummaryOpts{ - Name: "rib_copies_total", - Help: "Duration of RIB copies to read-only version.", + Name: "locked_duration_seconds", + Help: "Duration during which the RIB is locked.", Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, }, - []string{"timer"}, + []string{"reason"}, ) c.metrics.peerRemovalDone = c.r.CounterVec( reporter.CounterOpts{ @@ -107,4 +108,11 @@ func (c *Component) initMetrics() { }, []string{"exporter"}, ) + c.metrics.peerRemovalQueueFull = c.r.CounterVec( + reporter.CounterOpts{ + Name: "peer_removal_queue_full_total", + Help: "Number of time the removal queue was full.", + }, + []string{"exporter"}, + ) } diff --git a/inlet/bmp/remove.go b/inlet/bmp/remove.go new file mode 100644 index 00000000..b4d200b9 --- /dev/null +++ b/inlet/bmp/remove.go @@ -0,0 +1,56 @@ +// SPDX-FileCopyrightText: 2022 Free Mobile +// SPDX-License-Identifier: AGPL-3.0-only + +package bmp + +import ( + "context" + "time" +) + +func (c *Component) peerRemovalWorker() error { + for { + select { + case <-c.t.Dying(): + return nil + case pkey := <-c.peerRemovalChan: + exporterStr := pkey.exporter.Addr().Unmap().String() + for { + // Do one run of removal. 
+				removed, done := func() (int, bool) {
+					ctx, cancel := context.WithTimeout(c.t.Context(context.Background()),
+						c.config.PeerRemovalMaxTime)
+					defer cancel()
+					start := c.d.Clock.Now()
+					c.mu.Lock()
+					defer func() {
+						c.mu.Unlock()
+						c.metrics.locked.WithLabelValues("peer-removal").Observe(
+							float64(c.d.Clock.Now().Sub(start).Nanoseconds()) / 1000 / 1000 / 1000)
+					}()
+					pinfo := c.peers[pkey]
+					removed, done := c.rib.flushPeer(ctx, pinfo.reference, c.config.PeerRemovalMinRoutes)
+					if done {
+						// Run was complete, remove the peer (we need the lock)
+						delete(c.peers, pkey)
+					}
+					return removed, done
+				}()
+				c.metrics.routes.WithLabelValues(exporterStr).Sub(float64(removed))
+				if done {
+					// Run was complete, update metrics
+					c.metrics.peers.WithLabelValues(exporterStr).Dec()
+					c.metrics.peerRemovalDone.WithLabelValues(exporterStr).Inc()
+					break
+				}
+				// Run is incomplete, update metrics and sleep a bit
+				c.metrics.peerRemovalPartial.WithLabelValues(exporterStr).Inc()
+				select {
+				case <-c.t.Dying():
+					return nil
+				case <-time.After(c.config.PeerRemovalSleepInterval):
+				}
+			}
+		}
+	}
+}
diff --git a/inlet/bmp/rib.go b/inlet/bmp/rib.go
index 3e3ef1e3..4dec57ae 100644
--- a/inlet/bmp/rib.go
+++ b/inlet/bmp/rib.go
@@ -169,29 +169,19 @@ func (r *rib) removePrefix(ip netip.Addr, bits int, old route) int {
 
 // flushPeer removes a whole peer from the RIB, returning the number
 // of removed routes.
-func (r *rib) flushPeer(peer uint32) int {
-	removed, _ := r.flushPeerContext(nil, peer, 0)
-	return removed
-}
-
-// flushPeerContext removes a whole peer from the RIB, with a context returning
-// the number of removed routes and a bool to say if the operation was completed
-// before cancellation.
-func (r *rib) flushPeerContext(ctx context.Context, peer uint32, steps int) (int, bool) {
-	done := atomic.Bool{}
-	stop := make(chan struct{})
-	lastStep := 0
-	if ctx != nil {
-		defer close(stop)
-		go func() {
-			select {
-			case <-stop:
-				return
-			case <-ctx.Done():
-				done.Store(true)
-			}
-		}()
-	}
+func (r *rib) flushPeer(ctx context.Context, peer uint32, min int) (int, bool) {
+	// Handle context done state
+	done := atomic.Bool{}       // signal for the loop to stop when true
+	stop := make(chan struct{}) // signal for the goroutine to stop
+	defer close(stop)
+	go func() {
+		select {
+		case <-stop:
+			return
+		case <-ctx.Done():
+			done.Store(true)
+		}
+	}()
 
 	// Flush routes
 	removed := 0
@@ -208,10 +198,7 @@ func (r *rib) flushPeerContext(ctx context.Context, peer uint32, steps int) (int
 		}
 		return false
 	}, route{})
-	if ctx != nil && removed/steps > lastStep {
-		runtime.Gosched()
-		instrumentFlushPeer()
-		lastStep = removed / steps
+	if removed >= min {
 		if done.Load() {
 			return removed, false
 		}
@@ -220,18 +207,6 @@ func (r *rib) flushPeerContext(ctx context.Context, peer uint32, steps int) (int
 	}
 	return removed, true
 }
-var instrumentFlushPeer = func() {}
-
-// clone clone an existing RIB.
-func (r *rib) clone() *rib {
-	return &rib{
-		tree:     r.tree.Clone(),
-		nlris:    r.nlris.Clone(),
-		nextHops: r.nextHops.Clone(),
-		rtas:     r.rtas.Clone(),
-	}
-}
-
 // newRIB initializes a new RIB.
func newRIB() *rib { return &rib{ diff --git a/inlet/bmp/rib_test.go b/inlet/bmp/rib_test.go index c15393c9..b9952a89 100644 --- a/inlet/bmp/rib_test.go +++ b/inlet/bmp/rib_test.go @@ -4,6 +4,7 @@ package bmp import ( + "context" "fmt" "math/rand" "net/netip" @@ -294,7 +295,7 @@ func TestRIB(t *testing.T) { // Remove everything for _, peer := range peers { - r.flushPeer(peer) + r.flushPeer(context.Background(), peer, 100) } // Check for leak of interned values diff --git a/inlet/bmp/root.go b/inlet/bmp/root.go index fa92d749..aab7835e 100644 --- a/inlet/bmp/root.go +++ b/inlet/bmp/root.go @@ -8,7 +8,7 @@ package bmp import ( "fmt" "net" - "sync/atomic" + "sync" "time" "github.com/benbjohnson/clock" @@ -29,11 +29,13 @@ type Component struct { address net.Addr metrics metrics - // RIB management - ribReadonly atomic.Pointer[rib] - ribWorkerChan chan ribWorkerPayload - ribWorkerPrioChan chan ribWorkerPayload - peerStaleTimer *clock.Timer + // RIB management with peers + rib *rib + peers map[peerKey]*peerInfo + peerRemovalChan chan peerKey + lastPeerReference uint32 + staleTimer *clock.Timer + mu sync.RWMutex } // Dependencies define the dependencies of the BMP component. @@ -52,8 +54,9 @@ func New(r *reporter.Reporter, configuration Configuration, dependencies Depende d: &dependencies, config: configuration, - ribWorkerChan: make(chan ribWorkerPayload, 100), - ribWorkerPrioChan: make(chan ribWorkerPayload, 100), + rib: newRIB(), + peers: make(map[peerKey]*peerInfo), + peerRemovalChan: make(chan peerKey, configuration.PeerRemovalMaxQueue), } if len(c.config.RDs) > 0 { c.acceptedRDs = make(map[uint64]struct{}) @@ -61,10 +64,7 @@ func New(r *reporter.Reporter, configuration Configuration, dependencies Depende c.acceptedRDs[uint64(rd)] = struct{}{} } } - c.peerStaleTimer = c.d.Clock.AfterFunc(time.Hour, c.handleStalePeers) - if c.config.RIBMode == RIBModePerformance { - c.ribReadonly.Store(newRIB()) - } + c.staleTimer = c.d.Clock.AfterFunc(time.Hour, c.removeStalePeers) c.d.Daemon.Track(&c.t, "inlet/bmp") c.initMetrics() @@ -80,8 +80,8 @@ func (c *Component) Start() error { } c.address = listener.Addr() - // RIB worker - c.t.Go(c.ribWorker) + // Peer removal + c.t.Go(c.peerRemovalWorker) // Listener c.t.Go(func() error { @@ -109,8 +109,7 @@ func (c *Component) Start() error { // Stop stops the BMP component func (c *Component) Stop() error { defer func() { - close(c.ribWorkerChan) - close(c.ribWorkerPrioChan) + close(c.peerRemovalChan) c.r.Info().Msg("BMP component stopped") }() c.r.Info().Msg("stopping BMP component") diff --git a/inlet/bmp/root_test.go b/inlet/bmp/root_test.go index 52d02378..897d6c36 100644 --- a/inlet/bmp/root_test.go +++ b/inlet/bmp/root_test.go @@ -8,6 +8,7 @@ import ( "net" "net/netip" "path" + "strconv" "testing" "time" @@ -38,37 +39,36 @@ func TestBMP(t *testing.T) { } dumpRIB := func(t *testing.T, c *Component) map[netip.Addr][]string { t.Helper() + c.mu.RLock() + defer c.mu.RUnlock() result := map[netip.Addr][]string{} - c.ribWorkerQueue(func(s *ribWorkerState) error { - iter := s.rib.tree.Iterate() - for iter.Next() { - addr := iter.Address() - for _, route := range iter.Tags() { - nlriRef := s.rib.nlris.Get(route.nlri) - nh := s.rib.nextHops.Get(route.nextHop) - attrs := s.rib.rtas.Get(route.attributes) - var peer netip.Addr - for pkey, pinfo := range s.peers { - if pinfo.reference == route.peer { - peer = pkey.ip - break - } + iter := c.rib.tree.Iterate() + for iter.Next() { + addr := iter.Address() + for _, route := range iter.Tags() { + nlriRef := 
c.rib.nlris.Get(route.nlri) + nh := c.rib.nextHops.Get(route.nextHop) + attrs := c.rib.rtas.Get(route.attributes) + var peer netip.Addr + for pkey, pinfo := range c.peers { + if pinfo.reference == route.peer { + peer = pkey.ip + break } - if _, ok := result[peer.Unmap()]; !ok { - result[peer.Unmap()] = []string{} - } - result[peer.Unmap()] = append(result[peer.Unmap()], - fmt.Sprintf("[%s] %s via %s %s/%d %d %v %v %v", - nlriRef.family, - addr, netip.Addr(nh).Unmap(), - nlriRef.rd, - nlriRef.path, - attrs.asn, attrs.asPath, - attrs.communities, attrs.largeCommunities)) } + if _, ok := result[peer.Unmap()]; !ok { + result[peer.Unmap()] = []string{} + } + result[peer.Unmap()] = append(result[peer.Unmap()], + fmt.Sprintf("[%s] %s via %s %s/%d %d %v %v %v", + nlriRef.family, + addr, netip.Addr(nh).Unmap(), + nlriRef.rd, + nlriRef.path, + attrs.asn, attrs.asPath, + attrs.communities, attrs.largeCommunities)) } - return nil - }) + } return result } @@ -295,8 +295,8 @@ func TestBMP(t *testing.T) { `messages_received_total{exporter="127.0.0.1",type="statistics-report"}`: "5", `opened_connections_total{exporter="127.0.0.1"}`: "1", `peers_total{exporter="127.0.0.1"}`: "3", - `peer_removal_done_total{exporter="127.0.0.1"}`: "1", `routes_total{exporter="127.0.0.1"}`: "14", + `peer_removal_done_total{exporter="127.0.0.1"}`: "1", } if diff := helpers.Diff(gotMetrics, expectedMetrics); diff != "" { t.Errorf("Metrics (-got, +want):\n%s", diff) @@ -800,8 +800,8 @@ func TestBMP(t *testing.T) { `opened_connections_total{exporter="127.0.0.1"}`: "1", `closed_connections_total{exporter="127.0.0.1"}`: "1", `peers_total{exporter="127.0.0.1"}`: "0", - `peer_removal_done_total{exporter="127.0.0.1"}`: "1", `routes_total{exporter="127.0.0.1"}`: "0", + `peer_removal_done_total{exporter="127.0.0.1"}`: "1", } if diff := helpers.Diff(gotMetrics, expectedMetrics); diff != "" { t.Errorf("Metrics (-got, +want):\n%s", diff) @@ -908,8 +908,8 @@ func TestBMP(t *testing.T) { `opened_connections_total{exporter="127.0.0.1"}`: "2", `closed_connections_total{exporter="127.0.0.1"}`: "1", `peers_total{exporter="127.0.0.1"}`: "1", - `peer_removal_done_total{exporter="127.0.0.1"}`: "1", `routes_total{exporter="127.0.0.1"}`: "2", + `peer_removal_done_total{exporter="127.0.0.1"}`: "1", } if diff := helpers.Diff(gotMetrics, expectedMetrics); diff != "" { t.Errorf("Metrics (-got, +want):\n%s", diff) @@ -938,8 +938,8 @@ func TestBMP(t *testing.T) { `opened_connections_total{exporter="127.0.0.1"}`: "2", `closed_connections_total{exporter="127.0.0.1"}`: "2", `peers_total{exporter="127.0.0.1"}`: "1", - `peer_removal_done_total{exporter="127.0.0.1"}`: "1", `routes_total{exporter="127.0.0.1"}`: "2", + `peer_removal_done_total{exporter="127.0.0.1"}`: "1", } if diff := helpers.Diff(gotMetrics, expectedMetrics); diff != "" { t.Errorf("Metrics (-got, +want):\n%s", diff) @@ -961,8 +961,8 @@ func TestBMP(t *testing.T) { `opened_connections_total{exporter="127.0.0.1"}`: "2", `closed_connections_total{exporter="127.0.0.1"}`: "2", `peers_total{exporter="127.0.0.1"}`: "0", - `peer_removal_done_total{exporter="127.0.0.1"}`: "2", `routes_total{exporter="127.0.0.1"}`: "0", + `peer_removal_done_total{exporter="127.0.0.1"}`: "2", } if diff := helpers.Diff(gotMetrics, expectedMetrics); diff != "" { t.Errorf("Metrics (-got, +want):\n%s", diff) @@ -972,55 +972,40 @@ func TestBMP(t *testing.T) { if diff := helpers.Diff(gotRIB, expectedRIB); diff != "" { t.Errorf("RIB (-got, +want):\n%s", diff) } + }) t.Run("init, peers up, eor, reach NLRI, conn down, immediate 
timeout", func(t *testing.T) { r := reporter.NewMock(t) config := DefaultConfiguration() - config.RIBPeerRemovalBatchRoutes = 1 + config.PeerRemovalMaxTime = 1 + config.PeerRemovalSleepInterval = 1 + config.PeerRemovalMinRoutes = 1 c, mockClock := NewMock(t, r, config) helpers.StartStop(t, c) conn := dial(t, c) - pauseFlushPeer := make(chan bool) - instrumentFlushPeer = func() { <-pauseFlushPeer } - defer func() { instrumentFlushPeer = func() {} }() - send(t, conn, "bmp-init.pcap") send(t, conn, "bmp-peers-up.pcap") send(t, conn, "bmp-eor.pcap") send(t, conn, "bmp-reach.pcap") conn.Close() - time.Sleep(20 * time.Millisecond) mockClock.Add(2 * time.Hour) - - for i := 0; i < 5; i++ { - t.Logf("iteration %d", i) - // Sleep a bit to be sure flush is blocked - time.Sleep(10 * time.Millisecond) - done := make(chan bool) - go func() { - t.Logf("start lookup %d", i) - c.Lookup(net.ParseIP("2001:db8:1::10"), net.ParseIP("2001:db8::a")) - t.Logf("got lookup result %d", i) - close(done) - }() - // Sleep a bit to let some time to queue the request - time.Sleep(10 * time.Millisecond) - pauseFlushPeer <- false - // Wait for request to be completed - select { - case <-done: - case <-time.After(50 * time.Millisecond): - close(pauseFlushPeer) - t.Fatalf("no lookup answer") - } - } - // Process remaining requests - close(pauseFlushPeer) time.Sleep(20 * time.Millisecond) - + if helpers.RaceEnabled { + t.Skip("unreliable results when running with the race detector") + } gotMetrics := r.GetMetrics("akvorado_inlet_bmp_", "-locked_duration") + // For peer_removal_partial_total, we have 18 routes, but only 14 routes + // can be removed while keeping 1 route on each peer. 14 is the max, but + // we rely on good-willing from the scheduler to get this number. + peerRemovalPartial, _ := strconv.Atoi(gotMetrics[`peer_removal_partial_total{exporter="127.0.0.1"}`]) + if peerRemovalPartial > 14 { + t.Errorf("Metrics: peer_removal_partial_total %d > 14", peerRemovalPartial) + } + if peerRemovalPartial < 5 { + t.Errorf("Metrics: peer_removal_partial_total %d < 5", peerRemovalPartial) + } expectedMetrics := map[string]string{ `messages_received_total{exporter="127.0.0.1",type="initiation"}`: "1", `messages_received_total{exporter="127.0.0.1",type="peer-up-notification"}`: "4", @@ -1031,78 +1016,48 @@ func TestBMP(t *testing.T) { `peers_total{exporter="127.0.0.1"}`: "0", `routes_total{exporter="127.0.0.1"}`: "0", `peer_removal_done_total{exporter="127.0.0.1"}`: "4", - `peer_removal_partial_total{exporter="127.0.0.1"}`: "5", + `peer_removal_partial_total{exporter="127.0.0.1"}`: fmt.Sprintf("%d", peerRemovalPartial), } if diff := helpers.Diff(gotMetrics, expectedMetrics); diff != "" { t.Errorf("Metrics (-got, +want):\n%s", diff) } }) - for _, mode := range []RIBMode{RIBModeMemory, RIBModePerformance} { - t.Run(fmt.Sprintf("lookup %s", mode), func(t *testing.T) { - r := reporter.NewMock(t) - config := DefaultConfiguration() - config.RIBMode = mode - config.RIBIdleUpdateDelay = 40 * time.Millisecond - config.RIBMinimumUpdateDelay = 300 * time.Millisecond - config.RIBMaximumUpdateDelay = 5 * time.Second - c, _ := NewMock(t, r, config) - helpers.StartStop(t, c) - conn := dial(t, c) + t.Run("lookup", func(t *testing.T) { + r := reporter.NewMock(t) + config := DefaultConfiguration() + c, _ := NewMock(t, r, config) + helpers.StartStop(t, c) + conn := dial(t, c) - send(t, conn, "bmp-init.pcap") - send(t, conn, "bmp-peers-up.pcap") - send(t, conn, "bmp-reach.pcap") - send(t, conn, "bmp-eor.pcap") + send(t, conn, "bmp-init.pcap") + 
send(t, conn, "bmp-peers-up.pcap") + send(t, conn, "bmp-reach.pcap") + send(t, conn, "bmp-eor.pcap") + time.Sleep(20 * time.Millisecond) - for i := 0; i < 50; i++ { - lookup := c.Lookup(net.ParseIP("2001:db8:1::10"), net.ParseIP("2001:db8::a")) - if lookup.ASN != 174 { - if i == 99 { - t.Errorf("Lookup() == %d, expected 174", lookup.ASN) - } - } else { - break - } - time.Sleep(5 * time.Millisecond) - } + lookup := c.Lookup(net.ParseIP("2001:db8:1::10"), net.ParseIP("2001:db8::a")) + if lookup.ASN != 174 { + t.Errorf("Lookup() == %d, expected 174", lookup.ASN) + } - // Add another prefix - c.ribWorkerQueue(func(s *ribWorkerState) error { - s.rib.addPrefix(netip.MustParseAddr("2001:db8:1::"), 64, route{ - peer: 1, - nlri: s.rib.nlris.Put(nlri{family: bgp.RF_FS_IPv4_UC}), - nextHop: s.rib.nextHops.Put(nextHop(netip.MustParseAddr("2001:db8::a"))), - attributes: s.rib.rtas.Put(routeAttributes{asn: 176}), - }) - return nil - }) - if mode == RIBModePerformance { - time.Sleep(100 * time.Millisecond) // < 300 ms but > 40 ms - // Despite that, we hit the minimum update delay - lookup := c.Lookup(net.ParseIP("2001:db8:1::10"), net.ParseIP("2001:db8::a")) - if lookup.ASN != 174 { - t.Errorf("Lookup() == %d, expected 174", lookup.ASN) - } - } - - for i := 0; i < 100; i++ { - lookup := c.Lookup(net.ParseIP("2001:db8:1::10"), net.ParseIP("2001:db8::a")) - if lookup.ASN != 176 { - if i == 99 { - t.Errorf("Lookup() == %d, expected 176", lookup.ASN) - } else { - break - } - time.Sleep(5 * time.Millisecond) - } - } - lookup := c.Lookup(net.ParseIP("2001:db8:1::10"), net.ParseIP("2001:db8::b")) - if lookup.ASN != 174 { - t.Errorf("Lookup() == %d, expected 174", lookup.ASN) - } + // Add another prefix + c.rib.addPrefix(netip.MustParseAddr("2001:db8:1::"), 64, route{ + peer: 1, + nlri: c.rib.nlris.Put(nlri{family: bgp.RF_FS_IPv4_UC}), + nextHop: c.rib.nextHops.Put(nextHop(netip.MustParseAddr("2001:db8::a"))), + attributes: c.rib.rtas.Put(routeAttributes{asn: 176}), }) - } + + lookup = c.Lookup(net.ParseIP("2001:db8:1::10"), net.ParseIP("2001:db8::a")) + if lookup.ASN != 176 { + t.Errorf("Lookup() == %d, expected 176", lookup.ASN) + } + lookup = c.Lookup(net.ParseIP("2001:db8:1::10"), net.ParseIP("2001:db8::b")) + if lookup.ASN != 174 { + t.Errorf("Lookup() == %d, expected 174", lookup.ASN) + } + }) t.Run("populate", func(t *testing.T) { r := reporter.NewMock(t) diff --git a/inlet/bmp/serve.go b/inlet/bmp/serve.go index 6041e4a6..19b3d265 100644 --- a/inlet/bmp/serve.go +++ b/inlet/bmp/serve.go @@ -61,7 +61,6 @@ func (c *Component) serveConnection(conn *net.TCPConn) error { // Reading from connection c.handleConnectionUp(exporter) - peerAddPathModes := map[peerKey]map[bgp.RouteFamily]bgp.BGPAddPathMode{} init := false header := make([]byte, bmp.BMP_HEADER_SIZE) for { @@ -139,9 +138,11 @@ func (c *Component) serveConnection(conn *net.TCPConn) error { } body = body[bmp.BMP_PEER_HEADER_SIZE:] pkey = peerKeyFromBMPPeerHeader(exporter, &msg.PeerHeader) - if modes, ok := peerAddPathModes[pkey]; ok { - marshallingOptions = []*bgp.MarshallingOption{{AddPath: modes}} + c.mu.RLock() + if pinfo, ok := c.peers[pkey]; ok { + marshallingOptions = pinfo.marshallingOptions } + c.mu.RUnlock() } if err := msg.Body.ParseBody(&msg, body, marshallingOptions...); err != nil { @@ -201,44 +202,6 @@ func (c *Component) serveConnection(conn *net.TCPConn) error { logger.Info().Msg("termination message received") return nil case *bmp.BMPPeerUpNotification: - // Check for ADD-PATH support. 
- receivedAddPath := map[bgp.RouteFamily]bgp.BGPAddPathMode{} - received, _ := body.ReceivedOpenMsg.Body.(*bgp.BGPOpen) - for _, param := range received.OptParams { - switch param := param.(type) { - case *bgp.OptionParameterCapability: - for _, cap := range param.Capability { - switch cap := cap.(type) { - case *bgp.CapAddPath: - for _, tuple := range cap.Tuples { - receivedAddPath[tuple.RouteFamily] = tuple.Mode - } - } - } - } - } - sent, _ := body.SentOpenMsg.Body.(*bgp.BGPOpen) - addPathOption := map[bgp.RouteFamily]bgp.BGPAddPathMode{} - for _, param := range sent.OptParams { - switch param := param.(type) { - case *bgp.OptionParameterCapability: - for _, cap := range param.Capability { - switch cap := cap.(type) { - case *bgp.CapAddPath: - for _, sent := range cap.Tuples { - receivedMode := receivedAddPath[sent.RouteFamily] - if receivedMode == bgp.BGP_ADD_PATH_BOTH || receivedMode == bgp.BGP_ADD_PATH_SEND { - if sent.Mode == bgp.BGP_ADD_PATH_BOTH || sent.Mode == bgp.BGP_ADD_PATH_RECEIVE { - // We have at least the receive mode. We only do decoding. - addPathOption[sent.RouteFamily] = bgp.BGP_ADD_PATH_RECEIVE - } - } - } - } - } - } - } - peerAddPathModes[pkey] = addPathOption c.handlePeerUpNotification(pkey, body) case *bmp.BMPPeerDownNotification: c.handlePeerDownNotification(pkey) diff --git a/inlet/bmp/tests.go b/inlet/bmp/tests.go index b5b420bb..6ef03d31 100644 --- a/inlet/bmp/tests.go +++ b/inlet/bmp/tests.go @@ -37,53 +37,50 @@ func NewMock(t *testing.T, r *reporter.Reporter, conf Configuration) (*Component // PopulateRIB populates the RIB with a few entries. func (c *Component) PopulateRIB(t *testing.T) { t.Helper() - c.ribWorkerQueue(func(s *ribWorkerState) error { - pinfo := c.addPeer(s, peerKey{ - exporter: netip.MustParseAddrPort("[::ffff:127.0.0.1]:47389"), - ip: netip.MustParseAddr("::ffff:203.0.113.4"), - ptype: bmp.BMP_PEER_TYPE_GLOBAL, - asn: 64500, - }) - s.rib.addPrefix(netip.MustParseAddr("::ffff:192.0.2.0"), 96+27, route{ - peer: pinfo.reference, - nlri: s.rib.nlris.Put(nlri{family: bgp.RF_FS_IPv4_UC, path: 1}), - nextHop: s.rib.nextHops.Put(nextHop(netip.MustParseAddr("::ffff:198.51.100.4"))), - attributes: s.rib.rtas.Put(routeAttributes{ - asn: 174, - asPath: []uint32{64200, 1299, 174}, - communities: []uint32{100, 200, 400}, - largeCommunities: []bgp.LargeCommunity{{ASN: 64200, LocalData1: 2, LocalData2: 3}}, - }), - }) - s.rib.addPrefix(netip.MustParseAddr("::ffff:192.0.2.0"), 96+27, route{ - peer: pinfo.reference, - nlri: s.rib.nlris.Put(nlri{family: bgp.RF_FS_IPv4_UC, path: 2}), - nextHop: s.rib.nextHops.Put(nextHop(netip.MustParseAddr("::ffff:198.51.100.8"))), - attributes: s.rib.rtas.Put(routeAttributes{ - asn: 174, - asPath: []uint32{64200, 174, 174, 174}, - communities: []uint32{100}, - }), - }) - s.rib.addPrefix(netip.MustParseAddr("::ffff:192.0.2.128"), 96+27, route{ - peer: pinfo.reference, - nlri: s.rib.nlris.Put(nlri{family: bgp.RF_FS_IPv4_UC}), - nextHop: s.rib.nextHops.Put(nextHop(netip.MustParseAddr("::ffff:198.51.100.8"))), - attributes: s.rib.rtas.Put(routeAttributes{ - asn: 1299, - asPath: []uint32{64200, 1299}, - communities: []uint32{500}, - }), - }) - s.rib.addPrefix(netip.MustParseAddr("::ffff:1.0.0.0"), 96+24, route{ - peer: pinfo.reference, - nlri: s.rib.nlris.Put(nlri{family: bgp.RF_FS_IPv4_UC}), - nextHop: s.rib.nextHops.Put(nextHop(netip.MustParseAddr("::ffff:198.51.100.8"))), - attributes: s.rib.rtas.Put(routeAttributes{ - asn: 65300, - }), - }) - return nil + pinfo := c.addPeer(peerKey{ + exporter: 
netip.MustParseAddrPort("[::ffff:127.0.0.1]:47389"), + ip: netip.MustParseAddr("::ffff:203.0.113.4"), + ptype: bmp.BMP_PEER_TYPE_GLOBAL, + asn: 64500, + }) + c.rib.addPrefix(netip.MustParseAddr("::ffff:192.0.2.0"), 96+27, route{ + peer: pinfo.reference, + nlri: c.rib.nlris.Put(nlri{family: bgp.RF_FS_IPv4_UC, path: 1}), + nextHop: c.rib.nextHops.Put(nextHop(netip.MustParseAddr("::ffff:198.51.100.4"))), + attributes: c.rib.rtas.Put(routeAttributes{ + asn: 174, + asPath: []uint32{64200, 1299, 174}, + communities: []uint32{100, 200, 400}, + largeCommunities: []bgp.LargeCommunity{{ASN: 64200, LocalData1: 2, LocalData2: 3}}, + }), + }) + c.rib.addPrefix(netip.MustParseAddr("::ffff:192.0.2.0"), 96+27, route{ + peer: pinfo.reference, + nlri: c.rib.nlris.Put(nlri{family: bgp.RF_FS_IPv4_UC, path: 2}), + nextHop: c.rib.nextHops.Put(nextHop(netip.MustParseAddr("::ffff:198.51.100.8"))), + attributes: c.rib.rtas.Put(routeAttributes{ + asn: 174, + asPath: []uint32{64200, 174, 174, 174}, + communities: []uint32{100}, + }), + }) + c.rib.addPrefix(netip.MustParseAddr("::ffff:192.0.2.128"), 96+27, route{ + peer: pinfo.reference, + nlri: c.rib.nlris.Put(nlri{family: bgp.RF_FS_IPv4_UC}), + nextHop: c.rib.nextHops.Put(nextHop(netip.MustParseAddr("::ffff:198.51.100.8"))), + attributes: c.rib.rtas.Put(routeAttributes{ + asn: 1299, + asPath: []uint32{64200, 1299}, + communities: []uint32{500}, + }), + }) + c.rib.addPrefix(netip.MustParseAddr("::ffff:1.0.0.0"), 96+24, route{ + peer: pinfo.reference, + nlri: c.rib.nlris.Put(nlri{family: bgp.RF_FS_IPv4_UC}), + nextHop: c.rib.nextHops.Put(nextHop(netip.MustParseAddr("::ffff:198.51.100.8"))), + attributes: c.rib.rtas.Put(routeAttributes{ + asn: 65300, + }), }) } diff --git a/inlet/core/enricher_test.go b/inlet/core/enricher_test.go index 12f8c4d2..c2ff08ab 100644 --- a/inlet/core/enricher_test.go +++ b/inlet/core/enricher_test.go @@ -351,7 +351,6 @@ ClassifyProviderRegex(Interface.Description, "^Transit: ([^ ]+)", "$1")`, kafkaComponent, kafkaProducer := kafka.NewMock(t, r, kafka.DefaultConfiguration()) httpComponent := http.NewMock(t, r) bmpComponent, _ := bmp.NewMock(t, r, bmp.DefaultConfiguration()) - helpers.StartStop(t, bmpComponent) bmpComponent.PopulateRIB(t) // Prepare a configuration @@ -460,8 +459,6 @@ func TestGetASNumber(t *testing.T) { configuration := DefaultConfiguration() configuration.ASNProviders = tc.Providers bmpComponent, _ := bmp.NewMock(t, r, bmp.DefaultConfiguration()) - bmpComponent.Start() - defer bmpComponent.Stop() bmpComponent.PopulateRIB(t) c, err := New(r, configuration, Dependencies{ diff --git a/inlet/core/root_test.go b/inlet/core/root_test.go index f2d7d042..23ed9910 100644 --- a/inlet/core/root_test.go +++ b/inlet/core/root_test.go @@ -42,7 +42,6 @@ func TestCore(t *testing.T) { kafkaComponent, kafkaProducer := kafka.NewMock(t, r, kafka.DefaultConfiguration()) httpComponent := http.NewMock(t, r) bmpComponent, _ := bmp.NewMock(t, r, bmp.DefaultConfiguration()) - helpers.StartStop(t, bmpComponent) bmpComponent.PopulateRIB(t) // Instantiate and start core
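For reference, a minimal sketch of how the peer-removal knobs introduced in `inlet/bmp/config.go` above could be overridden from Go. The field names and `DefaultConfiguration()` come from this patch; the `main` wrapper and the chosen values are illustrative assumptions only (not recommendations), and the snippet assumes it is built inside the Akvorado module so that the `akvorado/inlet/bmp` import resolves.

```go
package main

import (
	"fmt"
	"time"

	"akvorado/inlet/bmp"
)

func main() {
	// Start from the defaults shipped in this patch.
	config := bmp.DefaultConfiguration()
	// Roughly the longest a single removal run may hold the RIB write lock
	// (the deadline is only checked every PeerRemovalMinRoutes routes).
	config.PeerRemovalMaxTime = 500 * time.Millisecond
	// Pause between two removal runs, letting Lookup() grab the read lock.
	config.PeerRemovalSleepInterval = time.Second
	// Pending peer-removal requests buffered before removePeer() has to wait.
	config.PeerRemovalMaxQueue = 1000
	// Minimum number of routes flushed in one run before yielding.
	config.PeerRemovalMinRoutes = 10000
	fmt.Printf("%+v\n", config)
}
```

Roughly speaking, these four values replace the removed `rib-mode` choice: shorter runs and longer sleeps keep the RIB lock available for `Lookup()`, at the cost of flushing routes more slowly after a session loss.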