outlet/flow: implement RFC 5103 support

This commit is contained in:
Gregor Düster
2025-08-20 12:05:50 +02:00
committed by Vincent Bernat
parent a1e29071a9
commit 73d005d229
7 changed files with 399 additions and 211 deletions

View File

@@ -12,17 +12,17 @@ package schema
import "akvorado/common/helpers/bimap" import "akvorado/common/helpers/bimap"
var columnNameMap = bimap.New(map[ColumnKey]string{ var columnNameMap = bimap.New(map[ColumnKey]string{
$(sed -En '/.*ColumnKey = iota/,/^\s+ColumnLast/p' common/schema/definition.go \ $(sed -En '/.*ColumnKey = iota/,/^[[:space:]]+ColumnLast/p' common/schema/definition.go \
| head -n-1 \ | sed \$d \
| sed -En 's/^\s+Column//p' \ | sed -En 's/^[[:space:]]+Column//p' \
| awk '{ print "Column"$1": \""$1"\","}') | awk '{ print "Column"$1": \""$1"\","}')
}) })
var columnReverseTable = [...]ColumnKey{ var columnReverseTable = [...]ColumnKey{
$(sed -En '/.*ColumnKey = iota/,/^\s+ColumnLast/p' common/schema/definition.go \ $(sed -En '/.*ColumnKey = iota/,/^[[:space:]]ColumnLast/p' common/schema/definition.go \
| head -n-1 \ | sed \$d \
| sed -En 's/^\s+(Column\w+).*/\1/p' \ | sed -En 's/^[[:space:]]+(Column[^ ]+).*/\1/p' \
| sed -E \ | sed -E \
-e 's/(ColumnDst(ASPath|Communities|LargeCommunities|[123]..AS))/\1: 0,/;t' \ -e 's/(ColumnDst(ASPath|Communities|LargeCommunities|[123]..AS))/\1: 0,/;t' \
-e 's/(ColumnIn)([A-Z0-9].*)/ColumnOut\2: \1\2,/;t' \ -e 's/(ColumnIn)([A-Z0-9].*)/ColumnOut\2: \1\2,/;t' \

View File

@@ -627,11 +627,12 @@ You need to run one `ipfixprobe` instance for each interface. Each interface
should have its own `id` and `dir`. As with *pmacct*, use the static metadata should have its own `id` and `dir`. As with *pmacct*, use the static metadata
provider to provide interface names and descriptions to Akvorado. provider to provide interface names and descriptions to Akvorado.
By default, ipfixprobe utilises bidirectional flows (RFC 5103) which are
supported by Akvorado.
> [!WARNING] > [!WARNING]
> Until Akvorado supports bidirectional flows (RFC 5103), only incoming flows > The `split` option for the cache plugin results to incorrect input interfaces
> are correctly counted. The `split` option for the cache plugin would > for outgoing flows.
> help to count both directions, but the input interface would be
> incorrect for outgoing flows.
## Kafka ## Kafka

View File

@@ -14,6 +14,7 @@ identified with a specific icon:
- 🩹 *inlet*: disable kernel timestamping on Linux kernel older than 5.1 - 🩹 *inlet*: disable kernel timestamping on Linux kernel older than 5.1
- 🩹 *doc*: fix documentation for SNMPv3 configuration - 🩹 *doc*: fix documentation for SNMPv3 configuration
- 🌱 *inlet*: add support for RFC 5103 (bidirectional flows)
## 2.0.0 - 2025-09-22 ## 2.0.0 - 2025-09-22

View File

@@ -14,6 +14,7 @@ import (
"akvorado/common/schema" "akvorado/common/schema"
"akvorado/outlet/flow/decoder" "akvorado/outlet/flow/decoder"
"github.com/bits-and-blooms/bitset"
"github.com/netsampler/goflow2/v2/decoders/netflow" "github.com/netsampler/goflow2/v2/decoders/netflow"
"github.com/netsampler/goflow2/v2/decoders/netflowlegacy" "github.com/netsampler/goflow2/v2/decoders/netflowlegacy"
) )
@@ -23,6 +24,15 @@ import (
// values in the sub-range of 1-127 are compatible with field types used by // values in the sub-range of 1-127 are compatible with field types used by
// NetFlow version 9 [RFC3954]." // NetFlow version 9 [RFC3954]."
// Used for RFC 5103
type direction uint
const (
reversePEN = 29305
directionForward direction = iota
directionReverse
)
func (nd *Decoder) decodeNFv5(packet *netflowlegacy.PacketNetFlowV5, ts, sysUptime uint64, options decoder.Option, bf *schema.FlowMessage, finalize decoder.FinalizeFlowFunc) { func (nd *Decoder) decodeNFv5(packet *netflowlegacy.PacketNetFlowV5, ts, sysUptime uint64, options decoder.Option, bf *schema.FlowMessage, finalize decoder.FinalizeFlowFunc) {
for _, record := range packet.Records { for _, record := range packet.Records {
bf.SamplingRate = uint64(packet.SamplingInterval) bf.SamplingRate = uint64(packet.SamplingInterval)
@@ -91,14 +101,16 @@ func (nd *Decoder) decodeNFv9IPFIX(version uint16, obsDomainID uint32, flowSets
} }
case netflow.DataFlowSet: case netflow.DataFlowSet:
for _, record := range tFlowSet.Records { for _, record := range tFlowSet.Records {
nd.decodeRecord(version, obsDomainID, samplingRateSys, record.Values, ts, sysUptime, options, bf) nd.decodeRecord(version, obsDomainID, samplingRateSys, record.Values, ts, sysUptime, options, bf, finalize)
finalize()
} }
} }
} }
} }
func (nd *Decoder) decodeRecord(version uint16, obsDomainID uint32, samplingRateSys *samplingRateSystem, fields []netflow.DataField, ts, sysUptime uint64, options decoder.Option, bf *schema.FlowMessage) { func (nd *Decoder) decodeRecord(version uint16, obsDomainID uint32, samplingRateSys *samplingRateSystem, fields []netflow.DataField, ts, sysUptime uint64, options decoder.Option, bf *schema.FlowMessage, finalize decoder.FinalizeFlowFunc) {
var foundReverseElement bool
reversePresent := bitset.New(65535)
for _, dir := range []direction{directionForward, directionReverse} {
var etype, dstPort, srcPort uint16 var etype, dstPort, srcPort uint16
var proto, icmpType, icmpCode uint8 var proto, icmpType, icmpCode uint8
var foundIcmpTypeCode bool var foundIcmpTypeCode bool
@@ -106,7 +118,23 @@ func (nd *Decoder) decodeRecord(version uint16, obsDomainID uint32, samplingRate
dataLinkFrameSectionIdx := -1 dataLinkFrameSectionIdx := -1
for idx, field := range fields { for idx, field := range fields {
v, ok := field.Value.([]byte) v, ok := field.Value.([]byte)
if !ok || field.PenProvided { if !ok {
continue
}
// RFC 5103 handling.
if field.PenProvided {
if field.Pen != reversePEN {
continue
}
if dir == directionForward {
// Reverse PEN and current direction is forward. Record we saw it and skip it.
foundReverseElement = true
reversePresent.Set(uint(field.Type))
continue
}
} else if dir == directionReverse && reversePresent.Test(uint(field.Type)) {
// No reverse PEN but we saw this one and so we should use the reversed value.
continue continue
} }
@@ -115,7 +143,13 @@ func (nd *Decoder) decodeRecord(version uint16, obsDomainID uint32, samplingRate
case netflow.IPFIX_FIELD_octetDeltaCount, netflow.IPFIX_FIELD_postOctetDeltaCount, netflow.IPFIX_FIELD_initiatorOctets, netflow.IPFIX_FIELD_responderOctets: case netflow.IPFIX_FIELD_octetDeltaCount, netflow.IPFIX_FIELD_postOctetDeltaCount, netflow.IPFIX_FIELD_initiatorOctets, netflow.IPFIX_FIELD_responderOctets:
bf.AppendUint(schema.ColumnBytes, decodeUNumber(v)) bf.AppendUint(schema.ColumnBytes, decodeUNumber(v))
case netflow.IPFIX_FIELD_packetDeltaCount, netflow.IPFIX_FIELD_postPacketDeltaCount: case netflow.IPFIX_FIELD_packetDeltaCount, netflow.IPFIX_FIELD_postPacketDeltaCount:
bf.AppendUint(schema.ColumnPackets, decodeUNumber(v)) n := decodeUNumber(v)
if dir == directionReverse && n == 0 {
// We are in the reverse direction, but the flow is empty.
bf.Undo()
return
}
bf.AppendUint(schema.ColumnPackets, n)
case netflow.IPFIX_FIELD_samplingInterval, netflow.IPFIX_FIELD_samplerRandomInterval: case netflow.IPFIX_FIELD_samplingInterval, netflow.IPFIX_FIELD_samplerRandomInterval:
bf.SamplingRate = decodeUNumber(v) bf.SamplingRate = decodeUNumber(v)
case netflow.IPFIX_FIELD_samplerId, netflow.IPFIX_FIELD_selectorId: case netflow.IPFIX_FIELD_samplerId, netflow.IPFIX_FIELD_selectorId:
@@ -317,6 +351,17 @@ func (nd *Decoder) decodeRecord(version uint16, obsDomainID uint32, samplingRate
if bf.SamplingRate == 0 { if bf.SamplingRate == 0 {
bf.SamplingRate = uint64(samplingRateSys.GetSamplingRate(version, obsDomainID, 0)) bf.SamplingRate = uint64(samplingRateSys.GetSamplingRate(version, obsDomainID, 0))
} }
if dir == directionForward && !foundReverseElement {
finalize()
break
} else if dir == directionForward {
finalize()
bf.Reverse()
} else {
bf.Reverse()
finalize()
}
}
} }
func decodeUNumber(b []byte) uint64 { func decodeUNumber(b []byte) uint64 {

View File

@@ -4,8 +4,10 @@
package netflow package netflow
import ( import (
"fmt"
"net/netip" "net/netip"
"path/filepath" "path/filepath"
"strings"
"testing" "testing"
"akvorado/common/helpers" "akvorado/common/helpers"
@@ -13,6 +15,8 @@ import (
"akvorado/common/reporter" "akvorado/common/reporter"
"akvorado/common/schema" "akvorado/common/schema"
"akvorado/outlet/flow/decoder" "akvorado/outlet/flow/decoder"
"github.com/google/go-cmp/cmp/cmpopts"
) )
func setup(t *testing.T, clearTS bool) (*reporter.Reporter, decoder.Decoder, *schema.FlowMessage, *[]*schema.FlowMessage, decoder.FinalizeFlowFunc) { func setup(t *testing.T, clearTS bool) (*reporter.Reporter, decoder.Decoder, *schema.FlowMessage, *[]*schema.FlowMessage, decoder.FinalizeFlowFunc) {
@@ -763,8 +767,6 @@ func TestDecodePhysicalInterfaces(t *testing.T) {
_, nfdecoder, bf, got, finalize := setup(t, true) _, nfdecoder, bf, got, finalize := setup(t, true)
options := decoder.Option{TimestampSource: pb.RawFlow_TS_INPUT} options := decoder.Option{TimestampSource: pb.RawFlow_TS_INPUT}
// The following PCAP is a NAT event, there is no sampling rate, no bytes,
// no packets. We can't do much with it.
data := helpers.ReadPcapL4(t, filepath.Join("testdata", "physicalinterfaces.pcap")) data := helpers.ReadPcapL4(t, filepath.Join("testdata", "physicalinterfaces.pcap"))
_, err := nfdecoder.Decode( _, err := nfdecoder.Decode(
decoder.RawFlow{Payload: data, Source: netip.MustParseAddr("::ffff:127.0.0.1")}, decoder.RawFlow{Payload: data, Source: netip.MustParseAddr("::ffff:127.0.0.1")},
@@ -802,3 +804,142 @@ func TestDecodePhysicalInterfaces(t *testing.T) {
t.Fatalf("Decode() (-got, +want):\n%s", diff) t.Fatalf("Decode() (-got, +want):\n%s", diff)
} }
} }
func TestDecodeRFC5103(t *testing.T) {
_, nfdecoder, bf, got, finalize := setup(t, true)
options := decoder.Option{TimestampSource: pb.RawFlow_TS_INPUT}
data := helpers.ReadPcapL4(t, filepath.Join("testdata", "ipfixprobe-templates.pcap"))
_, err := nfdecoder.Decode(
decoder.RawFlow{Payload: data, Source: netip.MustParseAddr("::ffff:127.0.0.1")},
options, bf, finalize)
if err != nil {
t.Fatalf("Decode() error:\n%+v", err)
}
data = helpers.ReadPcapL4(t, filepath.Join("testdata", "ipfixprobe-data.pcap"))
_, err = nfdecoder.Decode(
decoder.RawFlow{Payload: data, Source: netip.MustParseAddr("::ffff:127.0.0.1")},
options, bf, finalize)
if err != nil {
t.Fatalf("Decode() error:\n%+v", err)
}
expectedFlows := []*schema.FlowMessage{
{
// First biflow, direct
SamplingRate: 0,
InIf: 10,
OutIf: 0,
ExporterAddress: netip.MustParseAddr("::ffff:127.0.0.1"),
SrcAddr: netip.MustParseAddr("::ffff:10.10.1.4"),
DstAddr: netip.MustParseAddr("::ffff:10.10.1.1"),
OtherColumns: map[schema.ColumnKey]any{
schema.ColumnSrcMAC: uint64(0x00e01c3c17c2),
schema.ColumnDstMAC: uint64(0x001f33d98160),
schema.ColumnPackets: uint64(1),
schema.ColumnBytes: uint64(62),
schema.ColumnSrcPort: uint16(56166),
schema.ColumnDstPort: uint16(53),
schema.ColumnEType: uint32(helpers.ETypeIPv4),
schema.ColumnProto: uint32(17),
},
}, {
// First biflow, reverse
SamplingRate: 0,
InIf: 0,
OutIf: 10,
ExporterAddress: netip.MustParseAddr("::ffff:127.0.0.1"),
SrcAddr: netip.MustParseAddr("::ffff:10.10.1.1"),
DstAddr: netip.MustParseAddr("::ffff:10.10.1.4"),
OtherColumns: map[schema.ColumnKey]any{
schema.ColumnDstMAC: uint64(0x00e01c3c17c2),
schema.ColumnSrcMAC: uint64(0x001f33d98160),
schema.ColumnPackets: uint64(1),
schema.ColumnBytes: uint64(128),
schema.ColumnDstPort: uint16(56166),
schema.ColumnSrcPort: uint16(53),
schema.ColumnEType: uint32(helpers.ETypeIPv4),
schema.ColumnProto: uint32(17),
},
}, {
// Second biflow, direct, no reverse
SamplingRate: 0,
InIf: 10,
OutIf: 0,
ExporterAddress: netip.MustParseAddr("::ffff:127.0.0.1"),
SrcAddr: netip.MustParseAddr("::ffff:10.10.1.20"),
DstAddr: netip.MustParseAddr("::ffff:10.10.1.255"),
OtherColumns: map[schema.ColumnKey]any{
schema.ColumnSrcMAC: uint64(0x00023fec6111),
schema.ColumnDstMAC: uint64(0xffffffffffff),
schema.ColumnPackets: uint64(1),
schema.ColumnBytes: uint64(229),
schema.ColumnSrcPort: uint16(138),
schema.ColumnDstPort: uint16(138),
schema.ColumnEType: uint32(helpers.ETypeIPv4),
schema.ColumnProto: uint32(17),
},
}, {
// Third biflow, direct
SamplingRate: 0,
InIf: 10,
OutIf: 0,
ExporterAddress: netip.MustParseAddr("::ffff:127.0.0.1"),
SrcAddr: netip.MustParseAddr("::ffff:10.10.1.4"),
DstAddr: netip.MustParseAddr("::ffff:74.53.140.153"),
OtherColumns: map[schema.ColumnKey]any{
schema.ColumnSrcMAC: uint64(0x00e01c3c17c2),
schema.ColumnDstMAC: uint64(0x001f33d98160),
schema.ColumnPackets: uint64(28),
schema.ColumnBytes: uint64(21673),
schema.ColumnSrcPort: uint16(1470),
schema.ColumnDstPort: uint16(25),
schema.ColumnEType: uint32(helpers.ETypeIPv4),
schema.ColumnProto: uint32(6),
schema.ColumnTCPFlags: uint16(0x1b),
},
}, {
// Third biflow, reverse
SamplingRate: 0,
InIf: 0,
OutIf: 10,
ExporterAddress: netip.MustParseAddr("::ffff:127.0.0.1"),
SrcAddr: netip.MustParseAddr("::ffff:74.53.140.153"),
DstAddr: netip.MustParseAddr("::ffff:10.10.1.4"),
OtherColumns: map[schema.ColumnKey]any{
schema.ColumnSrcMAC: uint64(0x001f33d98160),
schema.ColumnDstMAC: uint64(0x00e01c3c17c2),
schema.ColumnPackets: uint64(25),
schema.ColumnBytes: uint64(1546),
schema.ColumnSrcPort: uint16(25),
schema.ColumnDstPort: uint16(1470),
schema.ColumnEType: uint32(helpers.ETypeIPv4),
schema.ColumnProto: uint32(6),
schema.ColumnTCPFlags: uint16(0x1b),
},
}, {
// Last biflow, direct, no reverse
SamplingRate: 0,
InIf: 10,
OutIf: 0,
ExporterAddress: netip.MustParseAddr("::ffff:127.0.0.1"),
SrcAddr: netip.MustParseAddr("::ffff:192.168.1.1"),
DstAddr: netip.MustParseAddr("::ffff:10.10.1.4"),
OtherColumns: map[schema.ColumnKey]any{
schema.ColumnSrcMAC: uint64(0x001f33d98160),
schema.ColumnDstMAC: uint64(0x00e01c3c17c2),
schema.ColumnPackets: uint64(4),
schema.ColumnBytes: uint64(2304),
schema.ColumnEType: uint32(helpers.ETypeIPv4),
schema.ColumnProto: uint32(1),
},
},
}
if diff := helpers.Diff((*got), expectedFlows, cmpopts.SortSlices(func(a, b *schema.FlowMessage) int {
return strings.Compare(fmt.Sprintf("%+v", a), fmt.Sprintf("%+v", b))
})); diff != "" {
t.Fatalf("Decode() (-got, +want):\n%s", diff)
}
}

Binary file not shown.

Binary file not shown.