diff --git a/demoexporter/flows/root.go b/demoexporter/flows/root.go index 5e2c78ed..a5221414 100644 --- a/demoexporter/flows/root.go +++ b/demoexporter/flows/root.go @@ -10,7 +10,6 @@ import ( "net" "time" - "github.com/benbjohnson/clock" "gopkg.in/tomb.v2" "akvorado/common/daemon" @@ -33,14 +32,10 @@ type Component struct { // Dependencies define the dependencies of the flows component. type Dependencies struct { Daemon daemon.Component - Clock clock.Clock } // New creates a new flows component. func New(r *reporter.Reporter, config Configuration, dependencies Dependencies) (*Component, error) { - if dependencies.Clock == nil { - dependencies.Clock = clock.New() - } c := Component{ r: r, d: &dependencies, @@ -75,8 +70,8 @@ func (c *Component) Start() error { } sequenceNumber := uint32(1) - start := c.d.Clock.Now() - ticker := c.d.Clock.Ticker(time.Second) + start := time.Now() + ticker := time.NewTicker(time.Second) errLogger := c.r.Sample(reporter.BurstSampler(time.Minute, 10)) c.t.Go(func() error { diff --git a/demoexporter/flows/root_test.go b/demoexporter/flows/root_test.go index 5472259f..30db12a8 100644 --- a/demoexporter/flows/root_test.go +++ b/demoexporter/flows/root_test.go @@ -11,96 +11,94 @@ import ( "net/netip" "os" "testing" + "testing/synctest" "time" - "github.com/benbjohnson/clock" - "akvorado/common/daemon" "akvorado/common/helpers" "akvorado/common/reporter" ) func TestReceiveFlows(t *testing.T) { - // UDP listener - receiver, err := net.ListenUDP("udp", &net.UDPAddr{ - IP: net.ParseIP("127.0.0.1"), - Port: 0, - }) - if err != nil { - t.Fatalf("ListenUDP() error:\n%+v", err) - } - defer receiver.Close() - - // Flow generator - r := reporter.NewMock(t) - mockClock := clock.NewMock() - config := DefaultConfiguration() - config.Target = receiver.LocalAddr().String() - config.Flows = []FlowConfiguration{ - { - PerSecond: 1, - InIfIndex: []int{10}, - OutIfIndex: []int{20}, - PeakHour: 21 * time.Hour, - Multiplier: 1, - SrcNet: netip.MustParsePrefix("192.0.2.0/24"), - DstNet: netip.MustParsePrefix("203.0.113.0/24"), - SrcAS: []uint32{65201}, - DstAS: []uint32{65202}, - SrcPort: []uint16{443}, - Protocol: []string{"tcp"}, - Size: 1400, - }, - } - c, err := New(r, config, Dependencies{ - Daemon: daemon.NewMock(t), - Clock: mockClock, - }) - if err != nil { - t.Fatalf("New() error:\n%+v", err) - } - mockClock.Set(time.Date(2022, 3, 15, 9, 14, 12, 0, time.UTC)) - helpers.StartStop(t, c) - mockClock.Add(1 * time.Second) - - receiver.SetReadDeadline(time.Now().Add(200 * time.Millisecond)) - got := []nfv9Header{} - for { - payload := make([]byte, 9000) - _, err := receiver.Read(payload) + synctest.Test(t, func(t *testing.T) { + // UDP listener + receiver, err := net.ListenUDP("udp", &net.UDPAddr{ + IP: net.ParseIP("127.0.0.1"), + Port: 0, + }) if err != nil { - if errors.Is(err, os.ErrDeadlineExceeded) { - break + t.Fatalf("ListenUDP() error:\n%+v", err) + } + defer receiver.Close() + + // Flow generator + r := reporter.NewMock(t) + config := DefaultConfiguration() + config.Target = receiver.LocalAddr().String() + config.Flows = []FlowConfiguration{ + { + PerSecond: 1, + InIfIndex: []int{10}, + OutIfIndex: []int{20}, + PeakHour: 21 * time.Hour, + Multiplier: 1, + SrcNet: netip.MustParsePrefix("192.0.2.0/24"), + DstNet: netip.MustParsePrefix("203.0.113.0/24"), + SrcAS: []uint32{65201}, + DstAS: []uint32{65202}, + SrcPort: []uint16{443}, + Protocol: []string{"tcp"}, + Size: 1400, + }, + } + c, err := New(r, config, Dependencies{ + Daemon: daemon.NewMock(t), + }) + if err != nil { + t.Fatalf("New() error:\n%+v", err) + } + helpers.StartStop(t, c) + time.Sleep(1 * time.Second) + + receiver.SetReadDeadline(time.Now().Add(200 * time.Millisecond)) + got := []nfv9Header{} + for { + payload := make([]byte, 9000) + _, err := receiver.Read(payload) + if err != nil { + if errors.Is(err, os.ErrDeadlineExceeded) { + break + } + t.Fatalf("Read() error:\n%+v", err) + } + header := nfv9Header{} + if err := binary.Read(bytes.NewBuffer(payload), binary.BigEndian, &header); err != nil { + t.Errorf("binary.Read() error:\n%+v", err) + } else { + got = append(got, header) } - t.Fatalf("Read() error:\n%+v", err) } - header := nfv9Header{} - if err := binary.Read(bytes.NewBuffer(payload), binary.BigEndian, &header); err != nil { - t.Errorf("binary.Read() error:\n%+v", err) - } else { - got = append(got, header) + if len(got) != 2 { + t.Errorf("Read() got %d packets, expected 2", len(got)) } - } - if len(got) != 2 { - t.Errorf("Read() got %d packets, expected 2", len(got)) - } - // We only check the headers. Decoding is already tested in nfdata_test.go. - expected := []nfv9Header{ - { - Version: 9, - Count: 4, - SystemUptime: 1, - UnixSeconds: 1647335653, - SequenceNumber: 1, - }, { - Version: 9, - Count: 1, - SystemUptime: 1, - UnixSeconds: 1647335653, - SequenceNumber: 2, - }, - } - if diff := helpers.Diff(got, expected); diff != "" { - t.Fatalf("Read() (-got, +want):\n%s", diff) - } + // We only check the headers. Decoding is already tested in nfdata_test.go. + expected := []nfv9Header{ + { + Version: 9, + Count: 4, + SystemUptime: 1, + UnixSeconds: uint32(time.Now().Unix()), + SequenceNumber: 1, + }, { + Version: 9, + Count: 1, + SystemUptime: 1, + UnixSeconds: uint32(time.Now().Unix()), + SequenceNumber: 2, + }, + } + if diff := helpers.Diff(got, expected); diff != "" { + t.Fatalf("Read() (-got, +want):\n%s", diff) + } + }) }