demoexporter/flows: switch to testing/synctest package for clock

This commit is contained in:
Vincent Bernat
2025-08-31 07:51:28 +02:00
parent 55c763998a
commit 7016d89969
2 changed files with 80 additions and 87 deletions

View File

@@ -10,7 +10,6 @@ import (
"net" "net"
"time" "time"
"github.com/benbjohnson/clock"
"gopkg.in/tomb.v2" "gopkg.in/tomb.v2"
"akvorado/common/daemon" "akvorado/common/daemon"
@@ -33,14 +32,10 @@ type Component struct {
// Dependencies define the dependencies of the flows component. // Dependencies define the dependencies of the flows component.
type Dependencies struct { type Dependencies struct {
Daemon daemon.Component Daemon daemon.Component
Clock clock.Clock
} }
// New creates a new flows component. // New creates a new flows component.
func New(r *reporter.Reporter, config Configuration, dependencies Dependencies) (*Component, error) { func New(r *reporter.Reporter, config Configuration, dependencies Dependencies) (*Component, error) {
if dependencies.Clock == nil {
dependencies.Clock = clock.New()
}
c := Component{ c := Component{
r: r, r: r,
d: &dependencies, d: &dependencies,
@@ -75,8 +70,8 @@ func (c *Component) Start() error {
} }
sequenceNumber := uint32(1) sequenceNumber := uint32(1)
start := c.d.Clock.Now() start := time.Now()
ticker := c.d.Clock.Ticker(time.Second) ticker := time.NewTicker(time.Second)
errLogger := c.r.Sample(reporter.BurstSampler(time.Minute, 10)) errLogger := c.r.Sample(reporter.BurstSampler(time.Minute, 10))
c.t.Go(func() error { c.t.Go(func() error {

View File

@@ -11,96 +11,94 @@ import (
"net/netip" "net/netip"
"os" "os"
"testing" "testing"
"testing/synctest"
"time" "time"
"github.com/benbjohnson/clock"
"akvorado/common/daemon" "akvorado/common/daemon"
"akvorado/common/helpers" "akvorado/common/helpers"
"akvorado/common/reporter" "akvorado/common/reporter"
) )
func TestReceiveFlows(t *testing.T) { func TestReceiveFlows(t *testing.T) {
// UDP listener synctest.Test(t, func(t *testing.T) {
receiver, err := net.ListenUDP("udp", &net.UDPAddr{ // UDP listener
IP: net.ParseIP("127.0.0.1"), receiver, err := net.ListenUDP("udp", &net.UDPAddr{
Port: 0, IP: net.ParseIP("127.0.0.1"),
}) Port: 0,
if err != nil { })
t.Fatalf("ListenUDP() error:\n%+v", err)
}
defer receiver.Close()
// Flow generator
r := reporter.NewMock(t)
mockClock := clock.NewMock()
config := DefaultConfiguration()
config.Target = receiver.LocalAddr().String()
config.Flows = []FlowConfiguration{
{
PerSecond: 1,
InIfIndex: []int{10},
OutIfIndex: []int{20},
PeakHour: 21 * time.Hour,
Multiplier: 1,
SrcNet: netip.MustParsePrefix("192.0.2.0/24"),
DstNet: netip.MustParsePrefix("203.0.113.0/24"),
SrcAS: []uint32{65201},
DstAS: []uint32{65202},
SrcPort: []uint16{443},
Protocol: []string{"tcp"},
Size: 1400,
},
}
c, err := New(r, config, Dependencies{
Daemon: daemon.NewMock(t),
Clock: mockClock,
})
if err != nil {
t.Fatalf("New() error:\n%+v", err)
}
mockClock.Set(time.Date(2022, 3, 15, 9, 14, 12, 0, time.UTC))
helpers.StartStop(t, c)
mockClock.Add(1 * time.Second)
receiver.SetReadDeadline(time.Now().Add(200 * time.Millisecond))
got := []nfv9Header{}
for {
payload := make([]byte, 9000)
_, err := receiver.Read(payload)
if err != nil { if err != nil {
if errors.Is(err, os.ErrDeadlineExceeded) { t.Fatalf("ListenUDP() error:\n%+v", err)
break }
defer receiver.Close()
// Flow generator
r := reporter.NewMock(t)
config := DefaultConfiguration()
config.Target = receiver.LocalAddr().String()
config.Flows = []FlowConfiguration{
{
PerSecond: 1,
InIfIndex: []int{10},
OutIfIndex: []int{20},
PeakHour: 21 * time.Hour,
Multiplier: 1,
SrcNet: netip.MustParsePrefix("192.0.2.0/24"),
DstNet: netip.MustParsePrefix("203.0.113.0/24"),
SrcAS: []uint32{65201},
DstAS: []uint32{65202},
SrcPort: []uint16{443},
Protocol: []string{"tcp"},
Size: 1400,
},
}
c, err := New(r, config, Dependencies{
Daemon: daemon.NewMock(t),
})
if err != nil {
t.Fatalf("New() error:\n%+v", err)
}
helpers.StartStop(t, c)
time.Sleep(1 * time.Second)
receiver.SetReadDeadline(time.Now().Add(200 * time.Millisecond))
got := []nfv9Header{}
for {
payload := make([]byte, 9000)
_, err := receiver.Read(payload)
if err != nil {
if errors.Is(err, os.ErrDeadlineExceeded) {
break
}
t.Fatalf("Read() error:\n%+v", err)
}
header := nfv9Header{}
if err := binary.Read(bytes.NewBuffer(payload), binary.BigEndian, &header); err != nil {
t.Errorf("binary.Read() error:\n%+v", err)
} else {
got = append(got, header)
} }
t.Fatalf("Read() error:\n%+v", err)
} }
header := nfv9Header{} if len(got) != 2 {
if err := binary.Read(bytes.NewBuffer(payload), binary.BigEndian, &header); err != nil { t.Errorf("Read() got %d packets, expected 2", len(got))
t.Errorf("binary.Read() error:\n%+v", err)
} else {
got = append(got, header)
} }
} // We only check the headers. Decoding is already tested in nfdata_test.go.
if len(got) != 2 { expected := []nfv9Header{
t.Errorf("Read() got %d packets, expected 2", len(got)) {
} Version: 9,
// We only check the headers. Decoding is already tested in nfdata_test.go. Count: 4,
expected := []nfv9Header{ SystemUptime: 1,
{ UnixSeconds: uint32(time.Now().Unix()),
Version: 9, SequenceNumber: 1,
Count: 4, }, {
SystemUptime: 1, Version: 9,
UnixSeconds: 1647335653, Count: 1,
SequenceNumber: 1, SystemUptime: 1,
}, { UnixSeconds: uint32(time.Now().Unix()),
Version: 9, SequenceNumber: 2,
Count: 1, },
SystemUptime: 1, }
UnixSeconds: 1647335653, if diff := helpers.Diff(got, expected); diff != "" {
SequenceNumber: 2, t.Fatalf("Read() (-got, +want):\n%s", diff)
}, }
} })
if diff := helpers.Diff(got, expected); diff != "" {
t.Fatalf("Read() (-got, +want):\n%s", diff)
}
} }