inlet/bmp: revert new BMP design

It needs more work to be stable. Let's do a release without it.
Vincent Bernat
2022-11-26 11:42:46 +01:00
parent 074a672cbe
commit 7196ccf73b
18 changed files with 589 additions and 922 deletions

View File

@@ -146,18 +146,3 @@ func (p *InternPool[T]) Put(value T) InternReference[T] {
func (p *InternPool[T]) Len() int {
return len(p.values) - len(p.availableIndexes) - 1
}
// Clone returns a copy of the intern pool.
func (p *InternPool[T]) Clone() *InternPool[T] {
result := &InternPool[T]{
values: make([]internValue[T], len(p.values)),
availableIndexes: make([]InternReference[T], len(p.availableIndexes)),
valueIndexes: make(map[uint64]InternReference[T], len(p.valueIndexes)),
}
copy(result.values, p.values)
copy(result.availableIndexes, p.availableIndexes)
for k, v := range p.valueIndexes {
result.valueIndexes[k] = v
}
return result
}

View File

@@ -166,30 +166,3 @@ func TestTake(t *testing.T) {
t.Fatalf("Take() didn't free everything (%d remaining)", diff)
}
}
func TestClone(t *testing.T) {
p := NewInternPool[likeInt]()
a := p.Put(likeInt(10))
b := p.Put(likeInt(11))
c := p.Put(likeInt(12))
q := p.Clone()
q.Take(a)
q.Take(b)
d := q.Put(likeInt(12))
e := q.Put(likeInt(13))
f := p.Put(likeInt(13))
if p.Len() != 4 {
t.Errorf("p.Len() should be 4, not %d", p.Len())
}
if q.Len() != 2 {
t.Errorf("q.Len() should be 2, not %d", q.Len())
}
if d != c {
t.Error("12 should have the same ref in both")
}
if e == f {
t.Error("13 should not have the same ref in both")
}
}

View File

@@ -129,15 +129,6 @@ The following keys are accepted:
not supported)
- `keep` tells how long the routes sent from a terminated BMP
connection should be kept
- `rib-mode` gives a choice on how to handle the internal RIB. `memory` (the
default) keeps only one copy of the RIB and lookups may be slightly slower since
they are processed sequentially by a single routine. `performance` copies the
live RIB to a read-only version from time to time (after `rib-idle-update-delay`
when the RIB is not updated, no more often than every `rib-minimum-update-delay`,
but at least every `rib-maximum-update-delay`). With this mode, lookups can be
processed in parallel but there are three copies of the RIB in memory (in the
future, we may reduce this to two copies). With 1 million routes, a RIB is
about 50 MB. See the sketch below this list.
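For illustration only, a minimal sketch of selecting the `performance` mode,
assuming the Go API removed later in this commit (`bmp.DefaultConfiguration`
and the `RIB*` fields appear in the removed `Configuration` struct):

config := bmp.DefaultConfiguration()
config.RIBMode = bmp.RIBModePerformance
// Refresh the read-only copy after 5s of idleness, but no more often than
// every 20s, and at least every 2 minutes while updates keep coming.
config.RIBIdleUpdateDelay = 5 * time.Second
config.RIBMinimumUpdateDelay = 20 * time.Second
config.RIBMaximumUpdateDelay = 2 * time.Minute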
If you are not interested in AS paths and communities, disabling them
will decrease the memory usage of *Akvorado*, as well as the disk

View File

@@ -206,13 +206,6 @@ for other exporters. However, ensuring the exporters agree to answer
requests is the first fix. If that is not enough, you can increase the number
of workers. Workers handle SNMP requests synchronously.
#### BMP collector
The BMP collector may handle millions of routes. This can stress the system
during large updates. If you send many routes to the BMP collector, you can
switch the RIB mode of the collector to `performance`. Check the [documentation
about BMP](02-configuration.md#bmp) for more details.
### Reported traffic levels are incorrect
Use `curl -s http://akvorado/api/v0/inlet/flows\?limit=1 | grep

View File

@@ -13,18 +13,12 @@ identified with a specific icon:
## Unreleased
The BMP collector's memory usage has been reduced by more than 50% and locks
were removed to reduce latency during lookups. There are two modes available,
selectable with `inlet.bmp.rib-mode`. If you have several million routes, be
sure to check the [BMP documentation](02-configuration.md#bmp) for more
details.
- *console*: add *100% stacked* graph type
- 🩹 *inlet*: handle non-fatal BMP decoding errors more gracefully
- 🩹 *inlet*: fix a small memory leak in BMP collector
- 🩹 *console*: fix selection of the aggregate table to not get empty graphs
- 🩹 *console*: use configured dimensions limit for “Visualize” tab
- 🌱 *inlet*: optimize BMP collector (see above)
- 🌱 *inlet*: optimize BMP CPU usage, memory usage, and lock times
- 🌱 *inlet*: replace LRU cache for classifiers by a time-based cache
- 🌱 *inlet*: add TLS support for Kafka transport
- 🌱 *console*: <kbd>Ctrl-Enter</kbd> or <kbd>Cmd-Enter</kbd> when editing a filter now applies the changes

View File

@@ -3,11 +3,7 @@
package bmp
import (
"errors"
"strings"
"time"
)
import "time"
// Configuration describes the configuration for the BMP server.
type Configuration struct {
@@ -24,74 +20,27 @@ type Configuration struct {
CollectCommunities bool
// Keep tells how long to keep routes from a BMP client when it goes down
Keep time.Duration `validate:"min=1s"`
// RIBMode tells which mode to use for the RIB
RIBMode RIBMode
// RIBIdleUpdateDelay tells to update the read-only RIB after being idle for
// that duration. This is only when RIB is in performance mode.
RIBIdleUpdateDelay time.Duration `validate:"min=1s"`
// RIBMinimumUpdateDelay tells to not update the read-only RIB more often
// than that. This is only when RIB is in performance mode.
RIBMinimumUpdateDelay time.Duration `validate:"min=1s,gtfield=RIBIdleUpdateDelay"`
// RIBMaximumUpdateDelay tells to update the read-only RIB at least once
// per the specified delay (if there are updates). This is only if RIB is
// in performance mode.
RIBMaximumUpdateDelay time.Duration `validate:"min=1s,gtfield=RIBMinimumUpdateDelay"`
// RIBPeerRemovalBatchRoutes tells how many routes to remove before checking
// if we have a higher priority request. This is only if RIB is in memory
// mode.
RIBPeerRemovalBatchRoutes int `validate:"min=1"`
// PeerRemovalMaxTime tells the maximum time the removal worker should run to remove a peer
PeerRemovalMaxTime time.Duration `validate:"min=10ms"`
// PeerRemovalSleepInterval tells how much time to sleep between two runs of the removal worker
PeerRemovalSleepInterval time.Duration `validate:"min=10ms"`
// PeerRemovalMaxQueue tells how many pending removal requests to keep
PeerRemovalMaxQueue int `validate:"min=1"`
// PeerRemovalMinRoutes tells how many routes we have to remove in one run before yielding
PeerRemovalMinRoutes int `validate:"min=1"`
}
// DefaultConfiguration represents the default configuration for the BMP server
func DefaultConfiguration() Configuration {
return Configuration{
Listen: "0.0.0.0:10179",
CollectASNs: true,
CollectASPaths: true,
CollectCommunities: true,
Keep: 5 * time.Minute,
RIBMode: RIBModeMemory,
RIBIdleUpdateDelay: 5 * time.Second,
RIBMinimumUpdateDelay: 20 * time.Second,
RIBMaximumUpdateDelay: 2 * time.Minute,
RIBPeerRemovalBatchRoutes: 1000,
Listen: "0.0.0.0:10179",
CollectASNs: true,
CollectASPaths: true,
CollectCommunities: true,
Keep: 5 * time.Minute,
PeerRemovalMaxTime: 200 * time.Millisecond,
PeerRemovalSleepInterval: 500 * time.Millisecond,
PeerRemovalMaxQueue: 10000,
PeerRemovalMinRoutes: 5000,
}
}
// RIBMode is the mode used for the RIB
type RIBMode int
const (
// RIBModeMemory tries to minimize used memory
RIBModeMemory RIBMode = iota
// RIBModePerformance keeps a read-only copy of the RIB for lookups
RIBModePerformance
)
// UnmarshalText parses a RIBMode
func (m *RIBMode) UnmarshalText(text []byte) error {
modes := map[string]RIBMode{
"memory": RIBModeMemory,
"performance": RIBModePerformance,
}
mode, ok := modes[strings.ToLower(string(text))]
if !ok {
return errors.New("unknown RIB mode")
}
*m = mode
return nil
}
// String turns a RIB mode to a string
func (m RIBMode) String() string {
modes := map[RIBMode]string{
RIBModeMemory: "memory",
RIBModePerformance: "performance",
}
return modes[m]
}
// MarshalText turns a RIB mode to a string
func (m RIBMode) MarshalText() ([]byte, error) {
return []byte(m.String()), nil
}
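// For illustration only (not part of this commit): the marshalling pair
// above round-trips case-insensitively. Assumes fmt is imported.
var mode RIBMode
if err := mode.UnmarshalText([]byte("Performance")); err != nil {
panic(err)
}
text, _ := mode.MarshalText()
fmt.Println(string(text)) // prints "performance"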

View File

@@ -4,14 +4,11 @@
package bmp
import (
"context"
"encoding/binary"
"errors"
"fmt"
"net/netip"
"time"
"akvorado/common/reporter"
"github.com/osrg/gobgp/v3/pkg/packet/bgp"
"github.com/osrg/gobgp/v3/pkg/packet/bmp"
)
@@ -28,8 +25,9 @@ type peerKey struct {
// peerInfo contains some information attached to a peer.
type peerInfo struct {
reference uint32 // used as a reference in the RIB
staleUntil time.Time // when to remove because it is stale
reference uint32 // used as a reference in the RIB
staleUntil time.Time // when to remove because it is stale
marshallingOptions []*bgp.MarshallingOption // decoding option (add-path mostly)
}
// peerKeyFromBMPPeerHeader computes the peer key from the BMP peer header.
@@ -45,223 +43,11 @@ func peerKeyFromBMPPeerHeader(exporter netip.AddrPort, header *bmp.BMPPeerHeader
}
}
// ribWorkerState is the state of the rib worker (accessible through the worker
// only).
type ribWorkerState struct {
ctx context.Context
rib *rib
peers map[peerKey]*peerInfo
peerLastReference uint32
}
// ribWorkerPayload is what we provide the RIB worker with. The channel will be
// closed when done.
type ribWorkerPayload struct {
fn func(*ribWorkerState) error
interruptible bool
done chan<- struct{}
}
var errRIBWorkerCanceled = errors.New("RIB worker timeout")
// ribWorker handles RIB-related operations (everything involving updating the RIB
// and related structures). Tasks are functions queued inside the worker.
func (c *Component) ribWorker() error {
state := &ribWorkerState{
rib: newRIB(),
peers: make(map[peerKey]*peerInfo),
}
// Assume the last copy was done before minimum update delay
lastCopy := time.Now().Add(-c.config.RIBMinimumUpdateDelay)
nextTimer := time.NewTimer(c.config.RIBMaximumUpdateDelay)
timer := "maximum"
uptodate := true
priorityPayloads := make(chan ribWorkerPayload, 1)
pausedPayloads := make(chan ribWorkerPayload, 1)
if c.config.RIBMode == RIBModePerformance {
c.r.GaugeFunc(
reporter.GaugeOpts{
Name: "rib_lag_seconds",
Help: "How outdated is the readonly RIB.",
},
func() float64 {
if uptodate {
return 0
}
return time.Now().Sub(lastCopy).Seconds()
},
)
}
handleLowPriorityPayload := func(payload ribWorkerPayload) error {
// These low priority operations can be canceled when a high priority request happens.
if c.config.RIBMode == RIBModeMemory && payload.interruptible {
ctx, cancel := context.WithCancel(c.t.Context(context.Background()))
state.ctx = ctx
defer cancel()
go func() {
select {
case <-c.t.Dying():
case <-ctx.Done():
case payload := <-c.ribWorkerPrioChan:
priorityPayloads <- payload
cancel()
}
}()
}
uptodate = false
err := payload.fn(state)
if err == errRIBWorkerCanceled {
pausedPayloads <- payload
return nil
}
close(payload.done)
if err != nil {
return err
}
if c.config.RIBMode == RIBModePerformance {
if !nextTimer.Stop() {
select {
case <-nextTimer.C:
default:
}
}
now := time.Now()
delta := now.Sub(lastCopy)
if delta < c.config.RIBMinimumUpdateDelay {
nextTimer.Reset(c.config.RIBMinimumUpdateDelay - delta)
timer = "minimum"
} else if delta < c.config.RIBMaximumUpdateDelay-c.config.RIBIdleUpdateDelay {
nextTimer.Reset(c.config.RIBIdleUpdateDelay)
timer = "idle"
} else if delta >= c.config.RIBMaximumUpdateDelay {
c.updateRIBReadonly(state, "maximum")
lastCopy = now
uptodate = true
} else {
nextTimer.Reset(c.config.RIBMaximumUpdateDelay - delta)
timer = "maximum"
}
}
return nil
}
for {
state.ctx = nil
select {
// Two "high priority" chans: one for internal, one for external
case payload := <-priorityPayloads:
err := payload.fn(state)
close(payload.done)
if err != nil {
return err
}
case payload := <-c.ribWorkerPrioChan:
err := payload.fn(state)
close(payload.done)
if err != nil {
return err
}
default:
select {
case <-c.t.Dying():
return nil
// No need to watch for internal high priority, it should have been
// handled before. We can still get high priority requests from the
// external one.
case payload := <-c.ribWorkerPrioChan:
err := payload.fn(state)
close(payload.done)
if err != nil {
return err
}
case payload := <-pausedPayloads:
if err := handleLowPriorityPayload(payload); err != nil {
return err
}
case payload := <-c.ribWorkerChan:
if err := handleLowPriorityPayload(payload); err != nil {
return err
}
case <-nextTimer.C:
c.updateRIBReadonly(state, timer)
lastCopy = time.Now()
uptodate = true
}
}
}
}
type ribWorkerQueueOptions int
const (
ribWorkerHighPriority ribWorkerQueueOptions = iota
ribWorkerInterruptible
)
// ribWorkerQueue queues a new task for the RIB worker thread. We wait for the
// task to be handled. We don't want to queue up a lot of tasks asynchronously.
func (c *Component) ribWorkerQueue(fn func(*ribWorkerState) error, options ...ribWorkerQueueOptions) {
ch := c.ribWorkerChan
done := make(chan struct{})
payload := ribWorkerPayload{
fn: fn,
done: done,
}
for _, option := range options {
switch option {
case ribWorkerHighPriority:
ch = c.ribWorkerPrioChan
case ribWorkerInterruptible:
payload.interruptible = true
}
}
select {
case <-c.t.Dying():
case ch <- payload:
select {
case <-c.t.Dying():
case <-done:
}
}
}
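// For illustration only (not part of this commit): queuing a task on the
// worker above. The closure runs inside the RIB worker with exclusive access
// to its state; the high-priority option jumps ahead of interruptible tasks.
c.ribWorkerQueue(func(s *ribWorkerState) error {
c.r.Debug().Msgf("tracking %d peers", len(s.peers))
return nil
}, ribWorkerHighPriority)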
// updateRIBReadonly updates the read-only copy of the RIB
func (c *Component) updateRIBReadonly(s *ribWorkerState, timer string) {
if c.config.RIBMode == RIBModePerformance {
c.r.Debug().Msg("copy live RIB to read-only version")
start := time.Now()
defer c.metrics.ribCopies.WithLabelValues(timer).Observe(
float64(time.Now().Sub(start).Nanoseconds()) / 1000 / 1000 / 1000)
new := s.rib.clone()
c.ribReadonly.Store(new)
}
}
// addPeer provides a reference to a new peer.
func (c *Component) addPeer(s *ribWorkerState, pkey peerKey) *peerInfo {
s.peerLastReference++
if s.peerLastReference == 0 {
// This is a very unlikely event, but we don't
// have anything better. Let's crash (and
// hopefully be restarted).
c.r.Fatal().Msg("too many peer up events")
go c.Stop()
}
pinfo := &peerInfo{
reference: s.peerLastReference,
}
s.peers[pkey] = pinfo
return pinfo
}
// scheduleStalePeersRemoval schedules the next time a peer should be removed.
func (c *Component) scheduleStalePeersRemoval(s *ribWorkerState) {
// scheduleStalePeersRemoval schedules the next time a peer should be
// removed. This should be called with the lock held.
func (c *Component) scheduleStalePeersRemoval() {
var next time.Time
for _, pinfo := range s.peers {
for _, pinfo := range c.peers {
if pinfo.staleUntil.IsZero() {
continue
}
@@ -271,85 +57,102 @@ func (c *Component) scheduleStalePeersRemoval(s *ribWorkerState) {
}
if next.IsZero() {
c.r.Debug().Msg("no stale peer")
c.peerStaleTimer.Stop()
c.staleTimer.Stop()
} else {
c.r.Debug().Msgf("next removal for stale peer scheduled on %s", next)
c.peerStaleTimer.Reset(c.d.Clock.Until(next))
c.staleTimer.Reset(c.d.Clock.Until(next))
}
}
// removePeer removes a peer.
func (c *Component) removePeer(s *ribWorkerState, pkey peerKey, reason string) error {
// removeStalePeers removes the stale peers.
func (c *Component) removeStalePeers() {
start := c.d.Clock.Now()
c.r.Debug().Msg("remove stale peers")
c.mu.Lock()
defer c.mu.Unlock()
for pkey, pinfo := range c.peers {
if pinfo.staleUntil.IsZero() || pinfo.staleUntil.After(start) {
continue
}
c.removePeer(pkey, "stale")
}
c.scheduleStalePeersRemoval()
}
func (c *Component) addPeer(pkey peerKey) *peerInfo {
c.lastPeerReference++
if c.lastPeerReference == 0 {
// This is a very unlikely event, but we don't
// have anything better. Let's crash (and
// hopefully be restarted).
c.r.Fatal().Msg("too many peer up events")
go c.Stop()
}
pinfo := &peerInfo{
reference: c.lastPeerReference,
}
c.peers[pkey] = pinfo
return pinfo
}
// removePeer removes a peer (with lock held)
func (c *Component) removePeer(pkey peerKey, reason string) {
exporterStr := pkey.exporter.Addr().Unmap().String()
peerStr := pkey.ip.Unmap().String()
pinfo := s.peers[pkey]
c.r.Info().Msgf("remove peer %s for exporter %s (reason: %s)", peerStr, exporterStr, reason)
removed, done := s.rib.flushPeerContext(s.ctx, pinfo.reference, c.config.RIBPeerRemovalBatchRoutes)
c.metrics.routes.WithLabelValues(exporterStr).Sub(float64(removed))
if !done {
c.metrics.peerRemovalPartial.WithLabelValues(exporterStr).Inc()
return errRIBWorkerCanceled
select {
case c.peerRemovalChan <- pkey:
return
default:
}
c.metrics.peerRemovalDone.WithLabelValues(exporterStr).Inc()
c.metrics.peers.WithLabelValues(exporterStr).Dec()
delete(s.peers, pkey)
return nil
c.metrics.peerRemovalQueueFull.WithLabelValues(exporterStr).Inc()
c.mu.Unlock()
select {
case c.peerRemovalChan <- pkey:
case <-c.t.Dying():
}
c.mu.Lock()
}
// handleStalePeers removes the stale peers.
func (c *Component) handleStalePeers() {
c.ribWorkerQueue(func(s *ribWorkerState) error {
start := c.d.Clock.Now()
c.r.Debug().Msg("remove stale peers")
for pkey, pinfo := range s.peers {
if pinfo.staleUntil.IsZero() || pinfo.staleUntil.After(start) {
continue
}
if err := c.removePeer(s, pkey, "stale"); err != nil {
return err
}
// markExporterAsStale marks all peers from an exporter as stale.
func (c *Component) markExporterAsStale(exporter netip.AddrPort, until time.Time) {
c.mu.Lock()
defer c.mu.Unlock()
for pkey, pinfo := range c.peers {
if pkey.exporter != exporter {
continue
}
c.scheduleStalePeersRemoval(s)
return nil
}, ribWorkerInterruptible)
pinfo.staleUntil = until
}
c.scheduleStalePeersRemoval()
}
// handlePeerDownNotification handles a peer-down notification by
// removing the peer.
func (c *Component) handlePeerDownNotification(pkey peerKey) {
c.ribWorkerQueue(func(s *ribWorkerState) error {
_, ok := s.peers[pkey]
if !ok {
c.r.Info().Msgf("received peer down from exporter %s for peer %s, but no peer up",
pkey.exporter.Addr().Unmap().String(),
pkey.ip.Unmap().String())
return nil
}
return c.removePeer(s, pkey, "down")
}, ribWorkerInterruptible)
c.mu.Lock()
defer c.mu.Unlock()
_, ok := c.peers[pkey]
if !ok {
c.r.Info().Msgf("received peer down from exporter %s for peer %s, but no peer up",
pkey.exporter.Addr().Unmap().String(),
pkey.ip.Unmap().String())
return
}
c.removePeer(pkey, "down")
}
// handleConnectionDown handles a disconnect or a session termination
// by marking all associated peers as stale.
func (c *Component) handleConnectionDown(exporter netip.AddrPort) {
until := c.d.Clock.Now().Add(c.config.Keep)
c.ribWorkerQueue(func(s *ribWorkerState) error {
for pkey, pinfo := range s.peers {
if pkey.exporter != exporter {
continue
}
pinfo.staleUntil = until
}
c.scheduleStalePeersRemoval(s)
return nil
})
c.markExporterAsStale(exporter, until)
}
// handleConnectionUp handles the connection from a new exporter.
func (c *Component) handleConnectionUp(exporter netip.AddrPort) {
// Do it without RIB worker, we just update counters.
// Do not set to 0, exporterStr may cover several exporters.
exporterStr := exporter.Addr().Unmap().String()
// Do not set to 0, exporterStr may cover several exporters.
c.metrics.peers.WithLabelValues(exporterStr).Add(0)
c.metrics.routes.WithLabelValues(exporterStr).Add(0)
}
@@ -359,23 +162,63 @@ func (c *Component) handlePeerUpNotification(pkey peerKey, body *bmp.BMPPeerUpNo
if body.ReceivedOpenMsg == nil || body.SentOpenMsg == nil {
return
}
c.mu.Lock()
defer c.mu.Unlock()
c.ribWorkerQueue(func(s *ribWorkerState) error {
exporterStr := pkey.exporter.Addr().Unmap().String()
peerStr := pkey.ip.Unmap().String()
_, ok := s.peers[pkey]
if ok {
c.r.Info().Msgf("received extra peer up from exporter %s for peer %s",
exporterStr, peerStr)
} else {
// Peer does not exist at all
c.metrics.peers.WithLabelValues(exporterStr).Inc()
c.addPeer(s, pkey)
exporterStr := pkey.exporter.Addr().Unmap().String()
peerStr := pkey.ip.Unmap().String()
pinfo, ok := c.peers[pkey]
if ok {
c.r.Info().Msgf("received extra peer up from exporter %s for peer %s",
exporterStr, peerStr)
} else {
// Peer does not exist at all
c.metrics.peers.WithLabelValues(exporterStr).Inc()
pinfo = c.addPeer(pkey)
}
// Check for ADD-PATH support.
receivedAddPath := map[bgp.RouteFamily]bgp.BGPAddPathMode{}
received, _ := body.ReceivedOpenMsg.Body.(*bgp.BGPOpen)
for _, param := range received.OptParams {
switch param := param.(type) {
case *bgp.OptionParameterCapability:
for _, cap := range param.Capability {
switch cap := cap.(type) {
case *bgp.CapAddPath:
for _, tuple := range cap.Tuples {
receivedAddPath[tuple.RouteFamily] = tuple.Mode
}
}
}
}
}
sent, _ := body.SentOpenMsg.Body.(*bgp.BGPOpen)
addPathOption := map[bgp.RouteFamily]bgp.BGPAddPathMode{}
for _, param := range sent.OptParams {
switch param := param.(type) {
case *bgp.OptionParameterCapability:
for _, cap := range param.Capability {
switch cap := cap.(type) {
case *bgp.CapAddPath:
for _, sent := range cap.Tuples {
receivedMode := receivedAddPath[sent.RouteFamily]
if receivedMode == bgp.BGP_ADD_PATH_BOTH || receivedMode == bgp.BGP_ADD_PATH_SEND {
if sent.Mode == bgp.BGP_ADD_PATH_BOTH || sent.Mode == bgp.BGP_ADD_PATH_RECEIVE {
// We have at least the receive mode. We only do decoding.
addPathOption[sent.RouteFamily] = bgp.BGP_ADD_PATH_RECEIVE
}
}
}
}
}
}
}
pinfo.marshallingOptions = []*bgp.MarshallingOption{{AddPath: addPathOption}}
c.r.Debug().Msgf("new peer %s from exporter %s", peerStr, exporterStr)
return nil
})
c.r.Debug().
Str("addpath", fmt.Sprintf("%s", addPathOption)).
Msgf("new peer %s from exporter %s", peerStr, exporterStr)
}
func (c *Component) handleRouteMonitoring(pkey peerKey, body *bmp.BMPRouteMonitoring) {
@@ -388,195 +231,194 @@ func (c *Component) handleRouteMonitoring(pkey peerKey, body *bmp.BMPRouteMonito
return
}
c.ribWorkerQueue(func(s *ribWorkerState) error {
c.mu.Lock()
defer c.mu.Unlock()
// Ignore this peer if this is a L3VPN and it does not have
// the right RD.
if pkey.ptype == bmp.BMP_PEER_TYPE_L3VPN && !c.isAcceptedRD(pkey.distinguisher) {
return nil
}
// Ignore this peer if this is a L3VPN and it does not have
// the right RD.
if pkey.ptype == bmp.BMP_PEER_TYPE_L3VPN && !c.isAcceptedRD(pkey.distinguisher) {
return
}
exporterStr := pkey.exporter.Addr().Unmap().String()
peerStr := pkey.ip.Unmap().String()
pinfo, ok := s.peers[pkey]
if !ok {
// We may have missed the peer down notification?
c.r.Info().Msgf("received route monitoring from exporter %s for peer %s, but no peer up",
exporterStr, peerStr)
c.metrics.peers.WithLabelValues(exporterStr).Inc()
pinfo = c.addPeer(s, pkey)
}
exporterStr := pkey.exporter.Addr().Unmap().String()
peerStr := pkey.ip.Unmap().String()
pinfo, ok := c.peers[pkey]
if !ok {
// We may have missed the peer down notification?
c.r.Info().Msgf("received route monitoring from exporter %s for peer %s, but no peer up",
exporterStr, peerStr)
c.metrics.peers.WithLabelValues(exporterStr).Inc()
pinfo = c.addPeer(pkey)
}
var nh netip.Addr
var rta routeAttributes
for _, attr := range update.PathAttributes {
switch attr := attr.(type) {
case *bgp.PathAttributeNextHop:
nh, _ = netip.AddrFromSlice(attr.Value.To16())
case *bgp.PathAttributeAsPath:
if c.config.CollectASNs || c.config.CollectASPaths {
rta.asPath = asPathFlat(attr)
}
case *bgp.PathAttributeCommunities:
if c.config.CollectCommunities {
rta.communities = attr.Value
}
case *bgp.PathAttributeLargeCommunities:
if c.config.CollectCommunities {
rta.largeCommunities = make([]bgp.LargeCommunity, len(attr.Values))
for idx, c := range attr.Values {
rta.largeCommunities[idx] = *c
}
var nh netip.Addr
var rta routeAttributes
for _, attr := range update.PathAttributes {
switch attr := attr.(type) {
case *bgp.PathAttributeNextHop:
nh, _ = netip.AddrFromSlice(attr.Value.To16())
case *bgp.PathAttributeAsPath:
if c.config.CollectASNs || c.config.CollectASPaths {
rta.asPath = asPathFlat(attr)
}
case *bgp.PathAttributeCommunities:
if c.config.CollectCommunities {
rta.communities = attr.Value
}
case *bgp.PathAttributeLargeCommunities:
if c.config.CollectCommunities {
rta.largeCommunities = make([]bgp.LargeCommunity, len(attr.Values))
for idx, c := range attr.Values {
rta.largeCommunities[idx] = *c
}
}
}
// If no AS path, consider the peer AS as the origin AS,
// otherwise the last AS.
if c.config.CollectASNs {
if path := rta.asPath; len(path) == 0 {
rta.asn = pkey.asn
} else {
rta.asn = path[len(path)-1]
}
}
if !c.config.CollectASPaths {
rta.asPath = rta.asPath[:0]
}
// If no AS path, consider the peer AS as the origin AS,
// otherwise the last AS.
if c.config.CollectASNs {
if path := rta.asPath; len(path) == 0 {
rta.asn = pkey.asn
} else {
rta.asn = path[len(path)-1]
}
}
if !c.config.CollectASPaths {
rta.asPath = rta.asPath[:0]
}
added := 0
removed := 0
added := 0
removed := 0
// Regular NLRI and withdrawn routes
if pkey.ptype == bmp.BMP_PEER_TYPE_L3VPN || c.isAcceptedRD(0) {
for _, ipprefix := range update.NLRI {
prefix := ipprefix.Prefix
plen := int(ipprefix.Length)
if prefix.To4() != nil {
prefix = prefix.To16()
plen += 96
}
p, _ := netip.AddrFromSlice(prefix)
added += s.rib.addPrefix(p, plen, route{
peer: pinfo.reference,
nlri: s.rib.nlris.Put(nlri{
family: bgp.RF_IPv4_UC,
path: ipprefix.PathIdentifier(),
rd: pkey.distinguisher,
}),
nextHop: s.rib.nextHops.Put(nextHop(nh)),
attributes: s.rib.rtas.Put(rta),
})
// Regular NLRI and withdrawn routes
if pkey.ptype == bmp.BMP_PEER_TYPE_L3VPN || c.isAcceptedRD(0) {
for _, ipprefix := range update.NLRI {
prefix := ipprefix.Prefix
plen := int(ipprefix.Length)
if prefix.To4() != nil {
prefix = prefix.To16()
plen += 96
}
for _, ipprefix := range update.WithdrawnRoutes {
prefix := ipprefix.Prefix
plen := int(ipprefix.Length)
if prefix.To4() != nil {
prefix = prefix.To16()
plen += 96
}
p, _ := netip.AddrFromSlice(prefix)
if nlriRef, ok := s.rib.nlris.Ref(nlri{
p, _ := netip.AddrFromSlice(prefix)
added += c.rib.addPrefix(p, plen, route{
peer: pinfo.reference,
nlri: c.rib.nlris.Put(nlri{
family: bgp.RF_IPv4_UC,
path: ipprefix.PathIdentifier(),
rd: pkey.distinguisher,
}),
nextHop: c.rib.nextHops.Put(nextHop(nh)),
attributes: c.rib.rtas.Put(rta),
})
}
for _, ipprefix := range update.WithdrawnRoutes {
prefix := ipprefix.Prefix
plen := int(ipprefix.Length)
if prefix.To4() != nil {
prefix = prefix.To16()
plen += 96
}
p, _ := netip.AddrFromSlice(prefix)
if nlriRef, ok := c.rib.nlris.Ref(nlri{
family: bgp.RF_IPv4_UC,
path: ipprefix.PathIdentifier(),
rd: pkey.distinguisher,
}); ok {
removed += c.rib.removePrefix(p, plen, route{
peer: pinfo.reference,
nlri: nlriRef,
})
}
}
}
// MP reach and unreach NLRI
for _, attr := range update.PathAttributes {
var p netip.Addr
var plen int
var rd RD
var ipprefixes []bgp.AddrPrefixInterface
switch attr := attr.(type) {
case *bgp.PathAttributeMpReachNLRI:
nh, _ = netip.AddrFromSlice(attr.Nexthop.To16())
ipprefixes = attr.Value
case *bgp.PathAttributeMpUnreachNLRI:
ipprefixes = attr.Value
}
for _, ipprefix := range ipprefixes {
switch ipprefix := ipprefix.(type) {
case *bgp.IPAddrPrefix:
p, _ = netip.AddrFromSlice(ipprefix.Prefix.To16())
plen = int(ipprefix.Length + 96)
rd = pkey.distinguisher
case *bgp.IPv6AddrPrefix:
p, _ = netip.AddrFromSlice(ipprefix.Prefix.To16())
plen = int(ipprefix.Length)
rd = pkey.distinguisher
case *bgp.LabeledIPAddrPrefix:
p, _ = netip.AddrFromSlice(ipprefix.Prefix.To16())
plen = int(ipprefix.IPPrefixLen() + 96)
rd = pkey.distinguisher
case *bgp.LabeledIPv6AddrPrefix:
p, _ = netip.AddrFromSlice(ipprefix.Prefix.To16())
plen = int(ipprefix.IPPrefixLen())
rd = pkey.distinguisher
case *bgp.LabeledVPNIPAddrPrefix:
p, _ = netip.AddrFromSlice(ipprefix.Prefix.To16())
plen = int(ipprefix.IPPrefixLen() + 96)
rd = RDFromRouteDistinguisherInterface(ipprefix.RD)
case *bgp.LabeledVPNIPv6AddrPrefix:
p, _ = netip.AddrFromSlice(ipprefix.Prefix.To16())
plen = int(ipprefix.IPPrefixLen())
rd = RDFromRouteDistinguisherInterface(ipprefix.RD)
case *bgp.EVPNNLRI:
switch route := ipprefix.RouteTypeData.(type) {
case *bgp.EVPNIPPrefixRoute:
prefix := route.IPPrefix
plen = int(route.IPPrefixLength)
if prefix.To4() != nil {
prefix = prefix.To16()
plen += 96
}
p, _ = netip.AddrFromSlice(prefix.To16())
rd = RDFromRouteDistinguisherInterface(route.RD)
}
default:
c.metrics.ignoredNlri.WithLabelValues(exporterStr,
bgp.AfiSafiToRouteFamily(ipprefix.AFI(), ipprefix.SAFI()).String()).Inc()
continue
}
if pkey.ptype != bmp.BMP_PEER_TYPE_L3VPN && !c.isAcceptedRD(rd) {
continue
}
switch attr.(type) {
case *bgp.PathAttributeMpReachNLRI:
added += c.rib.addPrefix(p, plen, route{
peer: pinfo.reference,
nlri: c.rib.nlris.Put(nlri{
family: bgp.AfiSafiToRouteFamily(ipprefix.AFI(), ipprefix.SAFI()),
rd: rd,
path: ipprefix.PathIdentifier(),
}),
nextHop: c.rib.nextHops.Put(nextHop(nh)),
attributes: c.rib.rtas.Put(rta),
})
case *bgp.PathAttributeMpUnreachNLRI:
if nlriRef, ok := c.rib.nlris.Ref(nlri{
family: bgp.AfiSafiToRouteFamily(ipprefix.AFI(), ipprefix.SAFI()),
rd: rd,
path: ipprefix.PathIdentifier(),
}); ok {
removed += s.rib.removePrefix(p, plen, route{
removed += c.rib.removePrefix(p, plen, route{
peer: pinfo.reference,
nlri: nlriRef,
})
}
}
}
}
// MP reach and unreach NLRI
for _, attr := range update.PathAttributes {
var p netip.Addr
var plen int
var rd RD
var ipprefixes []bgp.AddrPrefixInterface
switch attr := attr.(type) {
case *bgp.PathAttributeMpReachNLRI:
nh, _ = netip.AddrFromSlice(attr.Nexthop.To16())
ipprefixes = attr.Value
case *bgp.PathAttributeMpUnreachNLRI:
ipprefixes = attr.Value
}
for _, ipprefix := range ipprefixes {
switch ipprefix := ipprefix.(type) {
case *bgp.IPAddrPrefix:
p, _ = netip.AddrFromSlice(ipprefix.Prefix.To16())
plen = int(ipprefix.Length + 96)
rd = pkey.distinguisher
case *bgp.IPv6AddrPrefix:
p, _ = netip.AddrFromSlice(ipprefix.Prefix.To16())
plen = int(ipprefix.Length)
rd = pkey.distinguisher
case *bgp.LabeledIPAddrPrefix:
p, _ = netip.AddrFromSlice(ipprefix.Prefix.To16())
plen = int(ipprefix.IPPrefixLen() + 96)
rd = pkey.distinguisher
case *bgp.LabeledIPv6AddrPrefix:
p, _ = netip.AddrFromSlice(ipprefix.Prefix.To16())
plen = int(ipprefix.IPPrefixLen())
rd = pkey.distinguisher
case *bgp.LabeledVPNIPAddrPrefix:
p, _ = netip.AddrFromSlice(ipprefix.Prefix.To16())
plen = int(ipprefix.IPPrefixLen() + 96)
rd = RDFromRouteDistinguisherInterface(ipprefix.RD)
case *bgp.LabeledVPNIPv6AddrPrefix:
p, _ = netip.AddrFromSlice(ipprefix.Prefix.To16())
plen = int(ipprefix.IPPrefixLen())
rd = RDFromRouteDistinguisherInterface(ipprefix.RD)
case *bgp.EVPNNLRI:
switch route := ipprefix.RouteTypeData.(type) {
case *bgp.EVPNIPPrefixRoute:
prefix := route.IPPrefix
plen = int(route.IPPrefixLength)
if prefix.To4() != nil {
prefix = prefix.To16()
plen += 96
}
p, _ = netip.AddrFromSlice(prefix.To16())
rd = RDFromRouteDistinguisherInterface(route.RD)
}
default:
c.metrics.ignoredNlri.WithLabelValues(exporterStr,
bgp.AfiSafiToRouteFamily(ipprefix.AFI(), ipprefix.SAFI()).String()).Inc()
continue
}
if pkey.ptype != bmp.BMP_PEER_TYPE_L3VPN && !c.isAcceptedRD(rd) {
continue
}
switch attr.(type) {
case *bgp.PathAttributeMpReachNLRI:
added += s.rib.addPrefix(p, plen, route{
peer: pinfo.reference,
nlri: s.rib.nlris.Put(nlri{
family: bgp.AfiSafiToRouteFamily(ipprefix.AFI(), ipprefix.SAFI()),
rd: rd,
path: ipprefix.PathIdentifier(),
}),
nextHop: s.rib.nextHops.Put(nextHop(nh)),
attributes: s.rib.rtas.Put(rta),
})
case *bgp.PathAttributeMpUnreachNLRI:
if nlriRef, ok := s.rib.nlris.Ref(nlri{
family: bgp.AfiSafiToRouteFamily(ipprefix.AFI(), ipprefix.SAFI()),
rd: rd,
path: ipprefix.PathIdentifier(),
}); ok {
removed += s.rib.removePrefix(p, plen, route{
peer: pinfo.reference,
nlri: nlriRef,
})
}
}
}
}
c.metrics.routes.WithLabelValues(exporterStr).Add(float64(added - removed))
return nil
})
c.metrics.routes.WithLabelValues(exporterStr).Add(float64(added - removed))
}
func (c *Component) isAcceptedRD(rd RD) bool {

View File

@@ -23,55 +23,45 @@ type LookupResult struct {
// provided next hop if provided. This is somewhat approximate because
// we use the best route we have, while the exporter may not have this
// best route available. The returned result should not be modified!
func (c *Component) Lookup(addrIP net.IP, nextHopIP net.IP) (result LookupResult) {
func (c *Component) Lookup(addrIP net.IP, nextHopIP net.IP) LookupResult {
if !c.config.CollectASNs && !c.config.CollectASPaths && !c.config.CollectCommunities {
return
return LookupResult{}
}
ip, _ := netip.AddrFromSlice(addrIP.To16())
nh, _ := netip.AddrFromSlice(nextHopIP.To16())
v6 := patricia.NewIPv6Address(ip.AsSlice(), 128)
lookup := func(rib *rib) error {
bestFound := false
found := false
_, routes := rib.tree.FindDeepestTagsWithFilter(v6, func(route route) bool {
if bestFound {
// We already have the best route, skip remaining routes
return false
}
if rib.nextHops.Get(route.nextHop) == nextHop(nh) {
// Exact match found, use it and don't search further
bestFound = true
return true
}
// If we don't have a match already, use this one.
if !found {
found = true
return true
}
// Otherwise, skip it
return false
})
if len(routes) == 0 {
return nil
}
attributes := rib.rtas.Get(routes[len(routes)-1].attributes)
result = LookupResult{
ASN: attributes.asn,
ASPath: attributes.asPath,
Communities: attributes.communities,
LargeCommunities: attributes.largeCommunities,
}
return nil
}
c.mu.RLock()
defer c.mu.RUnlock()
switch c.config.RIBMode {
case RIBModeMemory:
c.ribWorkerQueue(func(s *ribWorkerState) error {
return lookup(s.rib)
}, ribWorkerHighPriority)
case RIBModePerformance:
lookup(c.ribReadonly.Load())
bestFound := false
found := false
_, routes := c.rib.tree.FindDeepestTagsWithFilter(v6, func(route route) bool {
if bestFound {
// We already have the best route, skip remaining routes
return false
}
if c.rib.nextHops.Get(route.nextHop) == nextHop(nh) {
// Exact match found, use it and don't search further
bestFound = true
return true
}
// If we don't have a match already, use this one.
if !found {
found = true
return true
}
// Otherwise, skip it
return false
})
if len(routes) == 0 {
return LookupResult{}
}
attributes := c.rib.rtas.Get(routes[len(routes)-1].attributes)
return LookupResult{
ASN: attributes.asn,
ASPath: attributes.asPath,
Communities: attributes.communities,
LargeCommunities: attributes.largeCommunities,
}
return
}
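// For illustration only (not part of this commit): a lookup as exercised by
// the tests later in this diff. Assumes net and fmt are imported; the
// addresses come from documentation ranges.
result := c.Lookup(net.ParseIP("2001:db8:1::10"), net.ParseIP("2001:db8::a"))
fmt.Printf("origin AS %d, AS path %v\n", result.ASN, result.ASPath)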

View File

@@ -6,18 +6,19 @@ package bmp
import "akvorado/common/reporter"
type metrics struct {
openedConnections *reporter.CounterVec
closedConnections *reporter.CounterVec
peers *reporter.GaugeVec
routes *reporter.GaugeVec
ignoredNlri *reporter.CounterVec
messages *reporter.CounterVec
errors *reporter.CounterVec
ignored *reporter.CounterVec
panics *reporter.CounterVec
ribCopies *reporter.SummaryVec
peerRemovalPartial *reporter.CounterVec
peerRemovalDone *reporter.CounterVec
openedConnections *reporter.CounterVec
closedConnections *reporter.CounterVec
peers *reporter.GaugeVec
routes *reporter.GaugeVec
ignoredNlri *reporter.CounterVec
messages *reporter.CounterVec
errors *reporter.CounterVec
ignored *reporter.CounterVec
panics *reporter.CounterVec
locked *reporter.SummaryVec
peerRemovalDone *reporter.CounterVec
peerRemovalPartial *reporter.CounterVec
peerRemovalQueueFull *reporter.CounterVec
}
// initMetrics initializes the metrics for the BMP component.
@@ -85,13 +86,13 @@ func (c *Component) initMetrics() {
},
[]string{"exporter"},
)
c.metrics.ribCopies = c.r.SummaryVec(
c.metrics.locked = c.r.SummaryVec(
reporter.SummaryOpts{
Name: "rib_copies_total",
Help: "Duration of RIB copies to read-only version.",
Name: "locked_duration_seconds",
Help: "Duration during which the RIB is locked.",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
},
[]string{"timer"},
[]string{"reason"},
)
c.metrics.peerRemovalDone = c.r.CounterVec(
reporter.CounterOpts{
@@ -107,4 +108,11 @@ func (c *Component) initMetrics() {
},
[]string{"exporter"},
)
c.metrics.peerRemovalQueueFull = c.r.CounterVec(
reporter.CounterOpts{
Name: "peer_removal_queue_full_total",
Help: "Number of time the removal queue was full.",
},
[]string{"exporter"},
)
}

inlet/bmp/remove.go (new file, 56 lines)
View File

@@ -0,0 +1,56 @@
// SPDX-FileCopyrightText: 2022 Free Mobile
// SPDX-License-Identifier: AGPL-3.0-only
package bmp
import (
"context"
"time"
)
func (c *Component) peerRemovalWorker() error {
for {
select {
case <-c.t.Dying():
return nil
case pkey := <-c.peerRemovalChan:
exporterStr := pkey.exporter.Addr().Unmap().String()
for {
// Do one run of removal.
removed, done := func() (int, bool) {
ctx, cancel := context.WithTimeout(c.t.Context(context.Background()),
c.config.PeerRemovalMaxTime)
defer cancel()
start := c.d.Clock.Now()
c.mu.Lock()
defer func() {
c.mu.Unlock()
c.metrics.locked.WithLabelValues("peer-removal").Observe(
float64(c.d.Clock.Now().Sub(start).Nanoseconds()) / 1000 / 1000 / 1000)
}()
pinfo := c.peers[pkey]
removed, done := c.rib.flushPeer(ctx, pinfo.reference, c.config.PeerRemovalMinRoutes)
if done {
// Run was complete, remove the peer (we need the lock)
delete(c.peers, pkey)
}
return removed, done
}()
c.metrics.routes.WithLabelValues(exporterStr).Sub(float64(removed))
if done {
// Run was complete, update metrics
c.metrics.peers.WithLabelValues(exporterStr).Dec()
c.metrics.peerRemovalDone.WithLabelValues(exporterStr).Inc()
break
}
// Run is incomplete, update metrics and sleep a bit
c.metrics.peerRemovalPartial.WithLabelValues(exporterStr).Inc()
select {
case <-c.t.Dying():
return nil
case <-time.After(c.config.PeerRemovalSleepInterval):
}
}
}
}
}

View File

@@ -169,29 +169,19 @@ func (r *rib) removePrefix(ip netip.Addr, bits int, old route) int {
// flushPeer removes a whole peer from the RIB, returning the number
// of removed routes.
func (r *rib) flushPeer(peer uint32) int {
removed, _ := r.flushPeerContext(nil, peer, 0)
return removed
}
// flushPeerContext removes a whole peer from the RIB, with a context; it
// returns the number of removed routes and a bool saying whether the
// operation completed before cancellation.
func (r *rib) flushPeerContext(ctx context.Context, peer uint32, steps int) (int, bool) {
done := atomic.Bool{}
stop := make(chan struct{})
lastStep := 0
if ctx != nil {
defer close(stop)
go func() {
select {
case <-stop:
return
case <-ctx.Done():
done.Store(true)
}
}()
}
func (r *rib) flushPeer(ctx context.Context, peer uint32, min int) (int, bool) {
// Handle context done state
done := atomic.Bool{} // signal for the loop to stop when true
stop := make(chan struct{}) // signal for the goroutine to stop
defer close(stop)
go func() {
select {
case <-stop:
return
case <-ctx.Done():
done.Store(true)
}
}()
// Flush routes
removed := 0
@@ -208,10 +198,7 @@ func (r *rib) flushPeerContext(ctx context.Context, peer uint32, steps int) (int
}
return false
}, route{})
if ctx != nil && removed/steps > lastStep {
runtime.Gosched()
instrumentFlushPeer()
lastStep = removed / steps
if removed >= min {
if done.Load() {
return removed, false
}
@@ -220,18 +207,6 @@ func (r *rib) flushPeerContext(ctx context.Context, peer uint32, steps int) (int
return removed, true
}
var instrumentFlushPeer = func() {}
// clone clones an existing RIB.
func (r *rib) clone() *rib {
return &rib{
tree: r.tree.Clone(),
nlris: r.nlris.Clone(),
nextHops: r.nextHops.Clone(),
rtas: r.rtas.Clone(),
}
}
// newRIB initializes a new RIB.
func newRIB() *rib {
return &rib{

View File

@@ -4,6 +4,7 @@
package bmp
import (
"context"
"fmt"
"math/rand"
"net/netip"
@@ -294,7 +295,7 @@ func TestRIB(t *testing.T) {
// Remove everything
for _, peer := range peers {
r.flushPeer(peer)
r.flushPeer(context.Background(), peer, 100)
}
// Check for leak of interned values

View File

@@ -8,7 +8,7 @@ package bmp
import (
"fmt"
"net"
"sync/atomic"
"sync"
"time"
"github.com/benbjohnson/clock"
@@ -29,11 +29,13 @@ type Component struct {
address net.Addr
metrics metrics
// RIB management
ribReadonly atomic.Pointer[rib]
ribWorkerChan chan ribWorkerPayload
ribWorkerPrioChan chan ribWorkerPayload
peerStaleTimer *clock.Timer
// RIB management with peers
rib *rib
peers map[peerKey]*peerInfo
peerRemovalChan chan peerKey
lastPeerReference uint32
staleTimer *clock.Timer
mu sync.RWMutex
}
// Dependencies define the dependencies of the BMP component.
@@ -52,8 +54,9 @@ func New(r *reporter.Reporter, configuration Configuration, dependencies Depende
d: &dependencies,
config: configuration,
ribWorkerChan: make(chan ribWorkerPayload, 100),
ribWorkerPrioChan: make(chan ribWorkerPayload, 100),
rib: newRIB(),
peers: make(map[peerKey]*peerInfo),
peerRemovalChan: make(chan peerKey, configuration.PeerRemovalMaxQueue),
}
if len(c.config.RDs) > 0 {
c.acceptedRDs = make(map[uint64]struct{})
@@ -61,10 +64,7 @@ func New(r *reporter.Reporter, configuration Configuration, dependencies Depende
c.acceptedRDs[uint64(rd)] = struct{}{}
}
}
c.peerStaleTimer = c.d.Clock.AfterFunc(time.Hour, c.handleStalePeers)
if c.config.RIBMode == RIBModePerformance {
c.ribReadonly.Store(newRIB())
}
c.staleTimer = c.d.Clock.AfterFunc(time.Hour, c.removeStalePeers)
c.d.Daemon.Track(&c.t, "inlet/bmp")
c.initMetrics()
@@ -80,8 +80,8 @@ func (c *Component) Start() error {
}
c.address = listener.Addr()
// RIB worker
c.t.Go(c.ribWorker)
// Peer removal
c.t.Go(c.peerRemovalWorker)
// Listener
c.t.Go(func() error {
@@ -109,8 +109,7 @@ func (c *Component) Start() error {
// Stop stops the BMP component
func (c *Component) Stop() error {
defer func() {
close(c.ribWorkerChan)
close(c.ribWorkerPrioChan)
close(c.peerRemovalChan)
c.r.Info().Msg("BMP component stopped")
}()
c.r.Info().Msg("stopping BMP component")

View File

@@ -8,6 +8,7 @@ import (
"net"
"net/netip"
"path"
"strconv"
"testing"
"time"
@@ -38,37 +39,36 @@ func TestBMP(t *testing.T) {
}
dumpRIB := func(t *testing.T, c *Component) map[netip.Addr][]string {
t.Helper()
c.mu.RLock()
defer c.mu.RUnlock()
result := map[netip.Addr][]string{}
c.ribWorkerQueue(func(s *ribWorkerState) error {
iter := s.rib.tree.Iterate()
for iter.Next() {
addr := iter.Address()
for _, route := range iter.Tags() {
nlriRef := s.rib.nlris.Get(route.nlri)
nh := s.rib.nextHops.Get(route.nextHop)
attrs := s.rib.rtas.Get(route.attributes)
var peer netip.Addr
for pkey, pinfo := range s.peers {
if pinfo.reference == route.peer {
peer = pkey.ip
break
}
iter := c.rib.tree.Iterate()
for iter.Next() {
addr := iter.Address()
for _, route := range iter.Tags() {
nlriRef := c.rib.nlris.Get(route.nlri)
nh := c.rib.nextHops.Get(route.nextHop)
attrs := c.rib.rtas.Get(route.attributes)
var peer netip.Addr
for pkey, pinfo := range c.peers {
if pinfo.reference == route.peer {
peer = pkey.ip
break
}
if _, ok := result[peer.Unmap()]; !ok {
result[peer.Unmap()] = []string{}
}
result[peer.Unmap()] = append(result[peer.Unmap()],
fmt.Sprintf("[%s] %s via %s %s/%d %d %v %v %v",
nlriRef.family,
addr, netip.Addr(nh).Unmap(),
nlriRef.rd,
nlriRef.path,
attrs.asn, attrs.asPath,
attrs.communities, attrs.largeCommunities))
}
if _, ok := result[peer.Unmap()]; !ok {
result[peer.Unmap()] = []string{}
}
result[peer.Unmap()] = append(result[peer.Unmap()],
fmt.Sprintf("[%s] %s via %s %s/%d %d %v %v %v",
nlriRef.family,
addr, netip.Addr(nh).Unmap(),
nlriRef.rd,
nlriRef.path,
attrs.asn, attrs.asPath,
attrs.communities, attrs.largeCommunities))
}
return nil
})
}
return result
}
@@ -295,8 +295,8 @@ func TestBMP(t *testing.T) {
`messages_received_total{exporter="127.0.0.1",type="statistics-report"}`: "5",
`opened_connections_total{exporter="127.0.0.1"}`: "1",
`peers_total{exporter="127.0.0.1"}`: "3",
`peer_removal_done_total{exporter="127.0.0.1"}`: "1",
`routes_total{exporter="127.0.0.1"}`: "14",
`peer_removal_done_total{exporter="127.0.0.1"}`: "1",
}
if diff := helpers.Diff(gotMetrics, expectedMetrics); diff != "" {
t.Errorf("Metrics (-got, +want):\n%s", diff)
@@ -800,8 +800,8 @@ func TestBMP(t *testing.T) {
`opened_connections_total{exporter="127.0.0.1"}`: "1",
`closed_connections_total{exporter="127.0.0.1"}`: "1",
`peers_total{exporter="127.0.0.1"}`: "0",
`peer_removal_done_total{exporter="127.0.0.1"}`: "1",
`routes_total{exporter="127.0.0.1"}`: "0",
`peer_removal_done_total{exporter="127.0.0.1"}`: "1",
}
if diff := helpers.Diff(gotMetrics, expectedMetrics); diff != "" {
t.Errorf("Metrics (-got, +want):\n%s", diff)
@@ -908,8 +908,8 @@ func TestBMP(t *testing.T) {
`opened_connections_total{exporter="127.0.0.1"}`: "2",
`closed_connections_total{exporter="127.0.0.1"}`: "1",
`peers_total{exporter="127.0.0.1"}`: "1",
`peer_removal_done_total{exporter="127.0.0.1"}`: "1",
`routes_total{exporter="127.0.0.1"}`: "2",
`peer_removal_done_total{exporter="127.0.0.1"}`: "1",
}
if diff := helpers.Diff(gotMetrics, expectedMetrics); diff != "" {
t.Errorf("Metrics (-got, +want):\n%s", diff)
@@ -938,8 +938,8 @@ func TestBMP(t *testing.T) {
`opened_connections_total{exporter="127.0.0.1"}`: "2",
`closed_connections_total{exporter="127.0.0.1"}`: "2",
`peers_total{exporter="127.0.0.1"}`: "1",
`peer_removal_done_total{exporter="127.0.0.1"}`: "1",
`routes_total{exporter="127.0.0.1"}`: "2",
`peer_removal_done_total{exporter="127.0.0.1"}`: "1",
}
if diff := helpers.Diff(gotMetrics, expectedMetrics); diff != "" {
t.Errorf("Metrics (-got, +want):\n%s", diff)
@@ -961,8 +961,8 @@ func TestBMP(t *testing.T) {
`opened_connections_total{exporter="127.0.0.1"}`: "2",
`closed_connections_total{exporter="127.0.0.1"}`: "2",
`peers_total{exporter="127.0.0.1"}`: "0",
`peer_removal_done_total{exporter="127.0.0.1"}`: "2",
`routes_total{exporter="127.0.0.1"}`: "0",
`peer_removal_done_total{exporter="127.0.0.1"}`: "2",
}
if diff := helpers.Diff(gotMetrics, expectedMetrics); diff != "" {
t.Errorf("Metrics (-got, +want):\n%s", diff)
@@ -972,55 +972,40 @@ func TestBMP(t *testing.T) {
if diff := helpers.Diff(gotRIB, expectedRIB); diff != "" {
t.Errorf("RIB (-got, +want):\n%s", diff)
}
})
t.Run("init, peers up, eor, reach NLRI, conn down, immediate timeout", func(t *testing.T) {
r := reporter.NewMock(t)
config := DefaultConfiguration()
config.RIBPeerRemovalBatchRoutes = 1
config.PeerRemovalMaxTime = 1
config.PeerRemovalSleepInterval = 1
config.PeerRemovalMinRoutes = 1
c, mockClock := NewMock(t, r, config)
helpers.StartStop(t, c)
conn := dial(t, c)
pauseFlushPeer := make(chan bool)
instrumentFlushPeer = func() { <-pauseFlushPeer }
defer func() { instrumentFlushPeer = func() {} }()
send(t, conn, "bmp-init.pcap")
send(t, conn, "bmp-peers-up.pcap")
send(t, conn, "bmp-eor.pcap")
send(t, conn, "bmp-reach.pcap")
conn.Close()
time.Sleep(20 * time.Millisecond)
mockClock.Add(2 * time.Hour)
for i := 0; i < 5; i++ {
t.Logf("iteration %d", i)
// Sleep a bit to be sure flush is blocked
time.Sleep(10 * time.Millisecond)
done := make(chan bool)
go func() {
t.Logf("start lookup %d", i)
c.Lookup(net.ParseIP("2001:db8:1::10"), net.ParseIP("2001:db8::a"))
t.Logf("got lookup result %d", i)
close(done)
}()
// Sleep a bit to leave some time to queue the request
time.Sleep(10 * time.Millisecond)
pauseFlushPeer <- false
// Wait for request to be completed
select {
case <-done:
case <-time.After(50 * time.Millisecond):
close(pauseFlushPeer)
t.Fatalf("no lookup answer")
}
}
// Process remaining requests
close(pauseFlushPeer)
time.Sleep(20 * time.Millisecond)
if helpers.RaceEnabled {
t.Skip("unreliable results when running with the race detector")
}
gotMetrics := r.GetMetrics("akvorado_inlet_bmp_", "-locked_duration")
// For peer_removal_partial_total, we have 18 routes, but only 14 routes
// can be removed while keeping 1 route on each peer. 14 is the max, but
// we rely on goodwill from the scheduler to get this number.
peerRemovalPartial, _ := strconv.Atoi(gotMetrics[`peer_removal_partial_total{exporter="127.0.0.1"}`])
if peerRemovalPartial > 14 {
t.Errorf("Metrics: peer_removal_partial_total %d > 14", peerRemovalPartial)
}
if peerRemovalPartial < 5 {
t.Errorf("Metrics: peer_removal_partial_total %d < 5", peerRemovalPartial)
}
expectedMetrics := map[string]string{
`messages_received_total{exporter="127.0.0.1",type="initiation"}`: "1",
`messages_received_total{exporter="127.0.0.1",type="peer-up-notification"}`: "4",
@@ -1031,78 +1016,48 @@ func TestBMP(t *testing.T) {
`peers_total{exporter="127.0.0.1"}`: "0",
`routes_total{exporter="127.0.0.1"}`: "0",
`peer_removal_done_total{exporter="127.0.0.1"}`: "4",
`peer_removal_partial_total{exporter="127.0.0.1"}`: "5",
`peer_removal_partial_total{exporter="127.0.0.1"}`: fmt.Sprintf("%d", peerRemovalPartial),
}
if diff := helpers.Diff(gotMetrics, expectedMetrics); diff != "" {
t.Errorf("Metrics (-got, +want):\n%s", diff)
}
})
for _, mode := range []RIBMode{RIBModeMemory, RIBModePerformance} {
t.Run(fmt.Sprintf("lookup %s", mode), func(t *testing.T) {
r := reporter.NewMock(t)
config := DefaultConfiguration()
config.RIBMode = mode
config.RIBIdleUpdateDelay = 40 * time.Millisecond
config.RIBMinimumUpdateDelay = 300 * time.Millisecond
config.RIBMaximumUpdateDelay = 5 * time.Second
c, _ := NewMock(t, r, config)
helpers.StartStop(t, c)
conn := dial(t, c)
t.Run("lookup", func(t *testing.T) {
r := reporter.NewMock(t)
config := DefaultConfiguration()
c, _ := NewMock(t, r, config)
helpers.StartStop(t, c)
conn := dial(t, c)
send(t, conn, "bmp-init.pcap")
send(t, conn, "bmp-peers-up.pcap")
send(t, conn, "bmp-reach.pcap")
send(t, conn, "bmp-eor.pcap")
send(t, conn, "bmp-init.pcap")
send(t, conn, "bmp-peers-up.pcap")
send(t, conn, "bmp-reach.pcap")
send(t, conn, "bmp-eor.pcap")
time.Sleep(20 * time.Millisecond)
for i := 0; i < 50; i++ {
lookup := c.Lookup(net.ParseIP("2001:db8:1::10"), net.ParseIP("2001:db8::a"))
if lookup.ASN != 174 {
if i == 49 {
t.Errorf("Lookup() == %d, expected 174", lookup.ASN)
}
} else {
break
}
time.Sleep(5 * time.Millisecond)
}
lookup := c.Lookup(net.ParseIP("2001:db8:1::10"), net.ParseIP("2001:db8::a"))
if lookup.ASN != 174 {
t.Errorf("Lookup() == %d, expected 174", lookup.ASN)
}
// Add another prefix
c.ribWorkerQueue(func(s *ribWorkerState) error {
s.rib.addPrefix(netip.MustParseAddr("2001:db8:1::"), 64, route{
peer: 1,
nlri: s.rib.nlris.Put(nlri{family: bgp.RF_FS_IPv4_UC}),
nextHop: s.rib.nextHops.Put(nextHop(netip.MustParseAddr("2001:db8::a"))),
attributes: s.rib.rtas.Put(routeAttributes{asn: 176}),
})
return nil
})
if mode == RIBModePerformance {
time.Sleep(100 * time.Millisecond) // < 300 ms but > 40 ms
// Despite that, we hit the minimum update delay
lookup := c.Lookup(net.ParseIP("2001:db8:1::10"), net.ParseIP("2001:db8::a"))
if lookup.ASN != 174 {
t.Errorf("Lookup() == %d, expected 174", lookup.ASN)
}
}
for i := 0; i < 100; i++ {
lookup := c.Lookup(net.ParseIP("2001:db8:1::10"), net.ParseIP("2001:db8::a"))
if lookup.ASN != 176 {
if i == 99 {
t.Errorf("Lookup() == %d, expected 176", lookup.ASN)
} else {
break
}
time.Sleep(5 * time.Millisecond)
}
}
lookup := c.Lookup(net.ParseIP("2001:db8:1::10"), net.ParseIP("2001:db8::b"))
if lookup.ASN != 174 {
t.Errorf("Lookup() == %d, expected 174", lookup.ASN)
}
// Add another prefix
c.rib.addPrefix(netip.MustParseAddr("2001:db8:1::"), 64, route{
peer: 1,
nlri: c.rib.nlris.Put(nlri{family: bgp.RF_FS_IPv4_UC}),
nextHop: c.rib.nextHops.Put(nextHop(netip.MustParseAddr("2001:db8::a"))),
attributes: c.rib.rtas.Put(routeAttributes{asn: 176}),
})
}
lookup = c.Lookup(net.ParseIP("2001:db8:1::10"), net.ParseIP("2001:db8::a"))
if lookup.ASN != 176 {
t.Errorf("Lookup() == %d, expected 176", lookup.ASN)
}
lookup = c.Lookup(net.ParseIP("2001:db8:1::10"), net.ParseIP("2001:db8::b"))
if lookup.ASN != 174 {
t.Errorf("Lookup() == %d, expected 174", lookup.ASN)
}
})
t.Run("populate", func(t *testing.T) {
r := reporter.NewMock(t)

View File

@@ -61,7 +61,6 @@ func (c *Component) serveConnection(conn *net.TCPConn) error {
// Reading from connection
c.handleConnectionUp(exporter)
peerAddPathModes := map[peerKey]map[bgp.RouteFamily]bgp.BGPAddPathMode{}
init := false
header := make([]byte, bmp.BMP_HEADER_SIZE)
for {
@@ -139,9 +138,11 @@ func (c *Component) serveConnection(conn *net.TCPConn) error {
}
body = body[bmp.BMP_PEER_HEADER_SIZE:]
pkey = peerKeyFromBMPPeerHeader(exporter, &msg.PeerHeader)
if modes, ok := peerAddPathModes[pkey]; ok {
marshallingOptions = []*bgp.MarshallingOption{{AddPath: modes}}
c.mu.RLock()
if pinfo, ok := c.peers[pkey]; ok {
marshallingOptions = pinfo.marshallingOptions
}
c.mu.RUnlock()
}
if err := msg.Body.ParseBody(&msg, body, marshallingOptions...); err != nil {
@@ -201,44 +202,6 @@ func (c *Component) serveConnection(conn *net.TCPConn) error {
logger.Info().Msg("termination message received")
return nil
case *bmp.BMPPeerUpNotification:
// Check for ADD-PATH support.
receivedAddPath := map[bgp.RouteFamily]bgp.BGPAddPathMode{}
received, _ := body.ReceivedOpenMsg.Body.(*bgp.BGPOpen)
for _, param := range received.OptParams {
switch param := param.(type) {
case *bgp.OptionParameterCapability:
for _, cap := range param.Capability {
switch cap := cap.(type) {
case *bgp.CapAddPath:
for _, tuple := range cap.Tuples {
receivedAddPath[tuple.RouteFamily] = tuple.Mode
}
}
}
}
}
sent, _ := body.SentOpenMsg.Body.(*bgp.BGPOpen)
addPathOption := map[bgp.RouteFamily]bgp.BGPAddPathMode{}
for _, param := range sent.OptParams {
switch param := param.(type) {
case *bgp.OptionParameterCapability:
for _, cap := range param.Capability {
switch cap := cap.(type) {
case *bgp.CapAddPath:
for _, sent := range cap.Tuples {
receivedMode := receivedAddPath[sent.RouteFamily]
if receivedMode == bgp.BGP_ADD_PATH_BOTH || receivedMode == bgp.BGP_ADD_PATH_SEND {
if sent.Mode == bgp.BGP_ADD_PATH_BOTH || sent.Mode == bgp.BGP_ADD_PATH_RECEIVE {
// We have at least the receive mode. We only do decoding.
addPathOption[sent.RouteFamily] = bgp.BGP_ADD_PATH_RECEIVE
}
}
}
}
}
}
}
peerAddPathModes[pkey] = addPathOption
c.handlePeerUpNotification(pkey, body)
case *bmp.BMPPeerDownNotification:
c.handlePeerDownNotification(pkey)

View File

@@ -37,53 +37,50 @@ func NewMock(t *testing.T, r *reporter.Reporter, conf Configuration) (*Component
// PopulateRIB populates the RIB with a few entries.
func (c *Component) PopulateRIB(t *testing.T) {
t.Helper()
c.ribWorkerQueue(func(s *ribWorkerState) error {
pinfo := c.addPeer(s, peerKey{
exporter: netip.MustParseAddrPort("[::ffff:127.0.0.1]:47389"),
ip: netip.MustParseAddr("::ffff:203.0.113.4"),
ptype: bmp.BMP_PEER_TYPE_GLOBAL,
asn: 64500,
})
s.rib.addPrefix(netip.MustParseAddr("::ffff:192.0.2.0"), 96+27, route{
peer: pinfo.reference,
nlri: s.rib.nlris.Put(nlri{family: bgp.RF_FS_IPv4_UC, path: 1}),
nextHop: s.rib.nextHops.Put(nextHop(netip.MustParseAddr("::ffff:198.51.100.4"))),
attributes: s.rib.rtas.Put(routeAttributes{
asn: 174,
asPath: []uint32{64200, 1299, 174},
communities: []uint32{100, 200, 400},
largeCommunities: []bgp.LargeCommunity{{ASN: 64200, LocalData1: 2, LocalData2: 3}},
}),
})
s.rib.addPrefix(netip.MustParseAddr("::ffff:192.0.2.0"), 96+27, route{
peer: pinfo.reference,
nlri: s.rib.nlris.Put(nlri{family: bgp.RF_FS_IPv4_UC, path: 2}),
nextHop: s.rib.nextHops.Put(nextHop(netip.MustParseAddr("::ffff:198.51.100.8"))),
attributes: s.rib.rtas.Put(routeAttributes{
asn: 174,
asPath: []uint32{64200, 174, 174, 174},
communities: []uint32{100},
}),
})
s.rib.addPrefix(netip.MustParseAddr("::ffff:192.0.2.128"), 96+27, route{
peer: pinfo.reference,
nlri: s.rib.nlris.Put(nlri{family: bgp.RF_FS_IPv4_UC}),
nextHop: s.rib.nextHops.Put(nextHop(netip.MustParseAddr("::ffff:198.51.100.8"))),
attributes: s.rib.rtas.Put(routeAttributes{
asn: 1299,
asPath: []uint32{64200, 1299},
communities: []uint32{500},
}),
})
s.rib.addPrefix(netip.MustParseAddr("::ffff:1.0.0.0"), 96+24, route{
peer: pinfo.reference,
nlri: s.rib.nlris.Put(nlri{family: bgp.RF_FS_IPv4_UC}),
nextHop: s.rib.nextHops.Put(nextHop(netip.MustParseAddr("::ffff:198.51.100.8"))),
attributes: s.rib.rtas.Put(routeAttributes{
asn: 65300,
}),
})
return nil
pinfo := c.addPeer(peerKey{
exporter: netip.MustParseAddrPort("[::ffff:127.0.0.1]:47389"),
ip: netip.MustParseAddr("::ffff:203.0.113.4"),
ptype: bmp.BMP_PEER_TYPE_GLOBAL,
asn: 64500,
})
c.rib.addPrefix(netip.MustParseAddr("::ffff:192.0.2.0"), 96+27, route{
peer: pinfo.reference,
nlri: c.rib.nlris.Put(nlri{family: bgp.RF_FS_IPv4_UC, path: 1}),
nextHop: c.rib.nextHops.Put(nextHop(netip.MustParseAddr("::ffff:198.51.100.4"))),
attributes: c.rib.rtas.Put(routeAttributes{
asn: 174,
asPath: []uint32{64200, 1299, 174},
communities: []uint32{100, 200, 400},
largeCommunities: []bgp.LargeCommunity{{ASN: 64200, LocalData1: 2, LocalData2: 3}},
}),
})
c.rib.addPrefix(netip.MustParseAddr("::ffff:192.0.2.0"), 96+27, route{
peer: pinfo.reference,
nlri: c.rib.nlris.Put(nlri{family: bgp.RF_FS_IPv4_UC, path: 2}),
nextHop: c.rib.nextHops.Put(nextHop(netip.MustParseAddr("::ffff:198.51.100.8"))),
attributes: c.rib.rtas.Put(routeAttributes{
asn: 174,
asPath: []uint32{64200, 174, 174, 174},
communities: []uint32{100},
}),
})
c.rib.addPrefix(netip.MustParseAddr("::ffff:192.0.2.128"), 96+27, route{
peer: pinfo.reference,
nlri: c.rib.nlris.Put(nlri{family: bgp.RF_FS_IPv4_UC}),
nextHop: c.rib.nextHops.Put(nextHop(netip.MustParseAddr("::ffff:198.51.100.8"))),
attributes: c.rib.rtas.Put(routeAttributes{
asn: 1299,
asPath: []uint32{64200, 1299},
communities: []uint32{500},
}),
})
c.rib.addPrefix(netip.MustParseAddr("::ffff:1.0.0.0"), 96+24, route{
peer: pinfo.reference,
nlri: c.rib.nlris.Put(nlri{family: bgp.RF_FS_IPv4_UC}),
nextHop: c.rib.nextHops.Put(nextHop(netip.MustParseAddr("::ffff:198.51.100.8"))),
attributes: c.rib.rtas.Put(routeAttributes{
asn: 65300,
}),
})
}

View File

@@ -351,7 +351,6 @@ ClassifyProviderRegex(Interface.Description, "^Transit: ([^ ]+)", "$1")`,
kafkaComponent, kafkaProducer := kafka.NewMock(t, r, kafka.DefaultConfiguration())
httpComponent := http.NewMock(t, r)
bmpComponent, _ := bmp.NewMock(t, r, bmp.DefaultConfiguration())
helpers.StartStop(t, bmpComponent)
bmpComponent.PopulateRIB(t)
// Prepare a configuration
@@ -460,8 +459,6 @@ func TestGetASNumber(t *testing.T) {
configuration := DefaultConfiguration()
configuration.ASNProviders = tc.Providers
bmpComponent, _ := bmp.NewMock(t, r, bmp.DefaultConfiguration())
bmpComponent.Start()
defer bmpComponent.Stop()
bmpComponent.PopulateRIB(t)
c, err := New(r, configuration, Dependencies{

View File

@@ -42,7 +42,6 @@ func TestCore(t *testing.T) {
kafkaComponent, kafkaProducer := kafka.NewMock(t, r, kafka.DefaultConfiguration())
httpComponent := http.NewMock(t, r)
bmpComponent, _ := bmp.NewMock(t, r, bmp.DefaultConfiguration())
helpers.StartStop(t, bmpComponent)
bmpComponent.PopulateRIB(t)
// Instantiate and start core