diff --git a/common/schema/definition_gen.sh b/common/schema/definition_gen.sh index 07cc8bd2..765af6a6 100755 --- a/common/schema/definition_gen.sh +++ b/common/schema/definition_gen.sh @@ -12,17 +12,17 @@ package schema import "akvorado/common/helpers/bimap" var columnNameMap = bimap.New(map[ColumnKey]string{ -$(sed -En '/.*ColumnKey = iota/,/^\s+ColumnLast/p' common/schema/definition.go \ - | head -n-1 \ - | sed -En 's/^\s+Column//p' \ +$(sed -En '/.*ColumnKey = iota/,/^[[:space:]]+ColumnLast/p' common/schema/definition.go \ + | sed \$d \ + | sed -En 's/^[[:space:]]+Column//p' \ | awk '{ print "Column"$1": \""$1"\","}') }) var columnReverseTable = [...]ColumnKey{ -$(sed -En '/.*ColumnKey = iota/,/^\s+ColumnLast/p' common/schema/definition.go \ - | head -n-1 \ - | sed -En 's/^\s+(Column\w+).*/\1/p' \ +$(sed -En '/.*ColumnKey = iota/,/^[[:space:]]ColumnLast/p' common/schema/definition.go \ + | sed \$d \ + | sed -En 's/^[[:space:]]+(Column[^ ]+).*/\1/p' \ | sed -E \ -e 's/(ColumnDst(ASPath|Communities|LargeCommunities|[123]..AS))/\1: 0,/;t' \ -e 's/(ColumnIn)([A-Z0-9].*)/ColumnOut\2: \1\2,/;t' \ diff --git a/console/data/docs/04-operations.md b/console/data/docs/04-operations.md index 7d4cf478..c5db25d6 100644 --- a/console/data/docs/04-operations.md +++ b/console/data/docs/04-operations.md @@ -627,11 +627,12 @@ You need to run one `ipfixprobe` instance for each interface. Each interface should have its own `id` and `dir`. As with *pmacct*, use the static metadata provider to provide interface names and descriptions to Akvorado. +By default, ipfixprobe utilises bidirectional flows (RFC 5103) which are +supported by Akvorado. + > [!WARNING] -> Until Akvorado supports bidirectional flows (RFC 5103), only incoming flows -> are correctly counted. The `split` option for the cache plugin would -> help to count both directions, but the input interface would be -> incorrect for outgoing flows. +> The `split` option for the cache plugin results to incorrect input interfaces +> for outgoing flows. ## Kafka diff --git a/console/data/docs/99-changelog.md b/console/data/docs/99-changelog.md index 1a1e979f..c5dc4a97 100644 --- a/console/data/docs/99-changelog.md +++ b/console/data/docs/99-changelog.md @@ -14,6 +14,7 @@ identified with a specific icon: - 🩹 *inlet*: disable kernel timestamping on Linux kernel older than 5.1 - 🩹 *doc*: fix documentation for SNMPv3 configuration +- 🌱 *inlet*: add support for RFC 5103 (bidirectional flows) ## 2.0.0 - 2025-09-22 diff --git a/outlet/flow/decoder/netflow/decode.go b/outlet/flow/decoder/netflow/decode.go index b9dd555a..9212e85a 100644 --- a/outlet/flow/decoder/netflow/decode.go +++ b/outlet/flow/decoder/netflow/decode.go @@ -14,6 +14,7 @@ import ( "akvorado/common/schema" "akvorado/outlet/flow/decoder" + "github.com/bits-and-blooms/bitset" "github.com/netsampler/goflow2/v2/decoders/netflow" "github.com/netsampler/goflow2/v2/decoders/netflowlegacy" ) @@ -23,6 +24,15 @@ import ( // values in the sub-range of 1-127 are compatible with field types used by // NetFlow version 9 [RFC3954]." +// Used for RFC 5103 +type direction uint + +const ( + reversePEN = 29305 + directionForward direction = iota + directionReverse +) + func (nd *Decoder) decodeNFv5(packet *netflowlegacy.PacketNetFlowV5, ts, sysUptime uint64, options decoder.Option, bf *schema.FlowMessage, finalize decoder.FinalizeFlowFunc) { for _, record := range packet.Records { bf.SamplingRate = uint64(packet.SamplingInterval) @@ -91,232 +101,267 @@ func (nd *Decoder) decodeNFv9IPFIX(version uint16, obsDomainID uint32, flowSets } case netflow.DataFlowSet: for _, record := range tFlowSet.Records { - nd.decodeRecord(version, obsDomainID, samplingRateSys, record.Values, ts, sysUptime, options, bf) - finalize() + nd.decodeRecord(version, obsDomainID, samplingRateSys, record.Values, ts, sysUptime, options, bf, finalize) } } } } -func (nd *Decoder) decodeRecord(version uint16, obsDomainID uint32, samplingRateSys *samplingRateSystem, fields []netflow.DataField, ts, sysUptime uint64, options decoder.Option, bf *schema.FlowMessage) { - var etype, dstPort, srcPort uint16 - var proto, icmpType, icmpCode uint8 - var foundIcmpTypeCode bool - mplsLabels := make([]uint32, 0, 5) - dataLinkFrameSectionIdx := -1 - for idx, field := range fields { - v, ok := field.Value.([]byte) - if !ok || field.PenProvided { - continue - } - - switch field.Type { - // Statistics - case netflow.IPFIX_FIELD_octetDeltaCount, netflow.IPFIX_FIELD_postOctetDeltaCount, netflow.IPFIX_FIELD_initiatorOctets, netflow.IPFIX_FIELD_responderOctets: - bf.AppendUint(schema.ColumnBytes, decodeUNumber(v)) - case netflow.IPFIX_FIELD_packetDeltaCount, netflow.IPFIX_FIELD_postPacketDeltaCount: - bf.AppendUint(schema.ColumnPackets, decodeUNumber(v)) - case netflow.IPFIX_FIELD_samplingInterval, netflow.IPFIX_FIELD_samplerRandomInterval: - bf.SamplingRate = decodeUNumber(v) - case netflow.IPFIX_FIELD_samplerId, netflow.IPFIX_FIELD_selectorId: - bf.SamplingRate = uint64(samplingRateSys.GetSamplingRate(version, obsDomainID, decodeUNumber(v))) - - // L3 - case netflow.IPFIX_FIELD_sourceIPv4Address: - if !isAllZeroIP(v) { - etype = helpers.ETypeIPv4 - bf.SrcAddr = decodeIPFromBytes(v) +func (nd *Decoder) decodeRecord(version uint16, obsDomainID uint32, samplingRateSys *samplingRateSystem, fields []netflow.DataField, ts, sysUptime uint64, options decoder.Option, bf *schema.FlowMessage, finalize decoder.FinalizeFlowFunc) { + var foundReverseElement bool + reversePresent := bitset.New(65535) + for _, dir := range []direction{directionForward, directionReverse} { + var etype, dstPort, srcPort uint16 + var proto, icmpType, icmpCode uint8 + var foundIcmpTypeCode bool + mplsLabels := make([]uint32, 0, 5) + dataLinkFrameSectionIdx := -1 + for idx, field := range fields { + v, ok := field.Value.([]byte) + if !ok { + continue } - case netflow.IPFIX_FIELD_destinationIPv4Address: - if !isAllZeroIP(v) { - etype = helpers.ETypeIPv4 - bf.DstAddr = decodeIPFromBytes(v) - } - case netflow.IPFIX_FIELD_sourceIPv6Address: - if !isAllZeroIP(v) { - etype = helpers.ETypeIPv6 - bf.SrcAddr = decodeIPFromBytes(v) - } - case netflow.IPFIX_FIELD_destinationIPv6Address: - if !isAllZeroIP(v) { - etype = helpers.ETypeIPv6 - bf.DstAddr = decodeIPFromBytes(v) - } - case netflow.IPFIX_FIELD_sourceIPv4PrefixLength, netflow.IPFIX_FIELD_sourceIPv6PrefixLength: - bf.SrcNetMask = uint8(decodeUNumber(v)) - case netflow.IPFIX_FIELD_destinationIPv4PrefixLength, netflow.IPFIX_FIELD_destinationIPv6PrefixLength: - bf.DstNetMask = uint8(decodeUNumber(v)) - case netflow.IPFIX_FIELD_ipNextHopIPv4Address, netflow.IPFIX_FIELD_bgpNextHopIPv4Address, netflow.IPFIX_FIELD_ipNextHopIPv6Address, netflow.IPFIX_FIELD_bgpNextHopIPv6Address: - bf.NextHop = decodeIPFromBytes(v) - // L4 - case netflow.IPFIX_FIELD_sourceTransportPort: - srcPort = uint16(decodeUNumber(v)) - bf.AppendUint(schema.ColumnSrcPort, uint64(srcPort)) - case netflow.IPFIX_FIELD_destinationTransportPort: - dstPort = uint16(decodeUNumber(v)) - bf.AppendUint(schema.ColumnDstPort, uint64(dstPort)) - case netflow.IPFIX_FIELD_protocolIdentifier: - proto = uint8(decodeUNumber(v)) - bf.AppendUint(schema.ColumnProto, uint64(proto)) + // RFC 5103 handling. + if field.PenProvided { + if field.Pen != reversePEN { + continue + } + if dir == directionForward { + // Reverse PEN and current direction is forward. Record we saw it and skip it. + foundReverseElement = true + reversePresent.Set(uint(field.Type)) + continue + } + } else if dir == directionReverse && reversePresent.Test(uint(field.Type)) { + // No reverse PEN but we saw this one and so we should use the reversed value. + continue + } - // Network - case netflow.IPFIX_FIELD_bgpSourceAsNumber: - bf.SrcAS = uint32(decodeUNumber(v)) - case netflow.IPFIX_FIELD_bgpDestinationAsNumber: - bf.DstAS = uint32(decodeUNumber(v)) + switch field.Type { + // Statistics + case netflow.IPFIX_FIELD_octetDeltaCount, netflow.IPFIX_FIELD_postOctetDeltaCount, netflow.IPFIX_FIELD_initiatorOctets, netflow.IPFIX_FIELD_responderOctets: + bf.AppendUint(schema.ColumnBytes, decodeUNumber(v)) + case netflow.IPFIX_FIELD_packetDeltaCount, netflow.IPFIX_FIELD_postPacketDeltaCount: + n := decodeUNumber(v) + if dir == directionReverse && n == 0 { + // We are in the reverse direction, but the flow is empty. + bf.Undo() + return + } + bf.AppendUint(schema.ColumnPackets, n) + case netflow.IPFIX_FIELD_samplingInterval, netflow.IPFIX_FIELD_samplerRandomInterval: + bf.SamplingRate = decodeUNumber(v) + case netflow.IPFIX_FIELD_samplerId, netflow.IPFIX_FIELD_selectorId: + bf.SamplingRate = uint64(samplingRateSys.GetSamplingRate(version, obsDomainID, decodeUNumber(v))) - // Interfaces - case netflow.IPFIX_FIELD_ingressInterface: - bf.InIf = uint32(decodeUNumber(v)) - case netflow.IPFIX_FIELD_egressInterface: - bf.OutIf = uint32(decodeUNumber(v)) - case netflow.IPFIX_FIELD_ingressPhysicalInterface: - if bf.InIf == 0 { + // L3 + case netflow.IPFIX_FIELD_sourceIPv4Address: + if !isAllZeroIP(v) { + etype = helpers.ETypeIPv4 + bf.SrcAddr = decodeIPFromBytes(v) + } + case netflow.IPFIX_FIELD_destinationIPv4Address: + if !isAllZeroIP(v) { + etype = helpers.ETypeIPv4 + bf.DstAddr = decodeIPFromBytes(v) + } + case netflow.IPFIX_FIELD_sourceIPv6Address: + if !isAllZeroIP(v) { + etype = helpers.ETypeIPv6 + bf.SrcAddr = decodeIPFromBytes(v) + } + case netflow.IPFIX_FIELD_destinationIPv6Address: + if !isAllZeroIP(v) { + etype = helpers.ETypeIPv6 + bf.DstAddr = decodeIPFromBytes(v) + } + case netflow.IPFIX_FIELD_sourceIPv4PrefixLength, netflow.IPFIX_FIELD_sourceIPv6PrefixLength: + bf.SrcNetMask = uint8(decodeUNumber(v)) + case netflow.IPFIX_FIELD_destinationIPv4PrefixLength, netflow.IPFIX_FIELD_destinationIPv6PrefixLength: + bf.DstNetMask = uint8(decodeUNumber(v)) + case netflow.IPFIX_FIELD_ipNextHopIPv4Address, netflow.IPFIX_FIELD_bgpNextHopIPv4Address, netflow.IPFIX_FIELD_ipNextHopIPv6Address, netflow.IPFIX_FIELD_bgpNextHopIPv6Address: + bf.NextHop = decodeIPFromBytes(v) + + // L4 + case netflow.IPFIX_FIELD_sourceTransportPort: + srcPort = uint16(decodeUNumber(v)) + bf.AppendUint(schema.ColumnSrcPort, uint64(srcPort)) + case netflow.IPFIX_FIELD_destinationTransportPort: + dstPort = uint16(decodeUNumber(v)) + bf.AppendUint(schema.ColumnDstPort, uint64(dstPort)) + case netflow.IPFIX_FIELD_protocolIdentifier: + proto = uint8(decodeUNumber(v)) + bf.AppendUint(schema.ColumnProto, uint64(proto)) + + // Network + case netflow.IPFIX_FIELD_bgpSourceAsNumber: + bf.SrcAS = uint32(decodeUNumber(v)) + case netflow.IPFIX_FIELD_bgpDestinationAsNumber: + bf.DstAS = uint32(decodeUNumber(v)) + + // Interfaces + case netflow.IPFIX_FIELD_ingressInterface: bf.InIf = uint32(decodeUNumber(v)) - } - case netflow.IPFIX_FIELD_egressPhysicalInterface: - if bf.OutIf == 0 { + case netflow.IPFIX_FIELD_egressInterface: bf.OutIf = uint32(decodeUNumber(v)) - } - - // RFC7133: process it later to not override other fields - case netflow.IPFIX_FIELD_dataLinkFrameSize: - // We are going to ignore it as we don't know L3 size yet. - case netflow.IPFIX_FIELD_dataLinkFrameSection: - dataLinkFrameSectionIdx = idx - - // MPLS - case netflow.IPFIX_FIELD_mplsTopLabelStackSection, netflow.IPFIX_FIELD_mplsLabelStackSection2, netflow.IPFIX_FIELD_mplsLabelStackSection3, netflow.IPFIX_FIELD_mplsLabelStackSection4, netflow.IPFIX_FIELD_mplsLabelStackSection5, netflow.IPFIX_FIELD_mplsLabelStackSection6, netflow.IPFIX_FIELD_mplsLabelStackSection7, netflow.IPFIX_FIELD_mplsLabelStackSection8, netflow.IPFIX_FIELD_mplsLabelStackSection9, netflow.IPFIX_FIELD_mplsLabelStackSection10: - uv := decodeUNumber(v) >> 4 - if uv > 0 { - mplsLabels = append(mplsLabels, uint32(uv)) - } - - // Remaining - case netflow.IPFIX_FIELD_forwardingStatus: - bf.AppendUint(schema.ColumnForwardingStatus, decodeUNumber(v)) - default: - if options.TimestampSource == pb.RawFlow_TS_NETFLOW_FIRST_SWITCHED { - switch field.Type { - case netflow.NFV9_FIELD_FIRST_SWITCHED: - bf.TimeReceived = uint32(ts - sysUptime + decodeUNumber(v)) - case netflow.IPFIX_FIELD_flowStartSeconds: - bf.TimeReceived = uint32(decodeUNumber(v)) - case netflow.IPFIX_FIELD_flowStartMilliseconds: - bf.TimeReceived = uint32(decodeUNumber(v) / 1000) - case netflow.IPFIX_FIELD_flowStartMicroseconds: - bf.TimeReceived = uint32(decodeUNumber(v) / 1_000_000) - case netflow.IPFIX_FIELD_flowStartNanoseconds: - bf.TimeReceived = uint32(ts + decodeUNumber(v)/1_000_000_000) + case netflow.IPFIX_FIELD_ingressPhysicalInterface: + if bf.InIf == 0 { + bf.InIf = uint32(decodeUNumber(v)) } - } - - if !nd.d.Schema.IsDisabled(schema.ColumnGroupNAT) { - // NAT - switch field.Type { - case netflow.IPFIX_FIELD_postNATSourceIPv4Address: - bf.AppendIPv6(schema.ColumnSrcAddrNAT, decodeIPFromBytes(v)) - case netflow.IPFIX_FIELD_postNATDestinationIPv4Address: - bf.AppendIPv6(schema.ColumnDstAddrNAT, decodeIPFromBytes(v)) - case netflow.IPFIX_FIELD_postNAPTSourceTransportPort: - bf.AppendUint(schema.ColumnSrcPortNAT, decodeUNumber(v)) - case netflow.IPFIX_FIELD_postNAPTDestinationTransportPort: - bf.AppendUint(schema.ColumnDstPortNAT, decodeUNumber(v)) + case netflow.IPFIX_FIELD_egressPhysicalInterface: + if bf.OutIf == 0 { + bf.OutIf = uint32(decodeUNumber(v)) } - } - if !nd.d.Schema.IsDisabled(schema.ColumnGroupL2) { - // L2 - switch field.Type { - case netflow.IPFIX_FIELD_vlanId, netflow.IPFIX_FIELD_dot1qVlanId: - if bf.SrcVlan == 0 { - bf.SrcVlan = uint16(decodeUNumber(v)) + // RFC7133: process it later to not override other fields + case netflow.IPFIX_FIELD_dataLinkFrameSize: + // We are going to ignore it as we don't know L3 size yet. + case netflow.IPFIX_FIELD_dataLinkFrameSection: + dataLinkFrameSectionIdx = idx + + // MPLS + case netflow.IPFIX_FIELD_mplsTopLabelStackSection, netflow.IPFIX_FIELD_mplsLabelStackSection2, netflow.IPFIX_FIELD_mplsLabelStackSection3, netflow.IPFIX_FIELD_mplsLabelStackSection4, netflow.IPFIX_FIELD_mplsLabelStackSection5, netflow.IPFIX_FIELD_mplsLabelStackSection6, netflow.IPFIX_FIELD_mplsLabelStackSection7, netflow.IPFIX_FIELD_mplsLabelStackSection8, netflow.IPFIX_FIELD_mplsLabelStackSection9, netflow.IPFIX_FIELD_mplsLabelStackSection10: + uv := decodeUNumber(v) >> 4 + if uv > 0 { + mplsLabels = append(mplsLabels, uint32(uv)) + } + + // Remaining + case netflow.IPFIX_FIELD_forwardingStatus: + bf.AppendUint(schema.ColumnForwardingStatus, decodeUNumber(v)) + default: + if options.TimestampSource == pb.RawFlow_TS_NETFLOW_FIRST_SWITCHED { + switch field.Type { + case netflow.NFV9_FIELD_FIRST_SWITCHED: + bf.TimeReceived = uint32(ts - sysUptime + decodeUNumber(v)) + case netflow.IPFIX_FIELD_flowStartSeconds: + bf.TimeReceived = uint32(decodeUNumber(v)) + case netflow.IPFIX_FIELD_flowStartMilliseconds: + bf.TimeReceived = uint32(decodeUNumber(v) / 1000) + case netflow.IPFIX_FIELD_flowStartMicroseconds: + bf.TimeReceived = uint32(decodeUNumber(v) / 1_000_000) + case netflow.IPFIX_FIELD_flowStartNanoseconds: + bf.TimeReceived = uint32(ts + decodeUNumber(v)/1_000_000_000) } - case netflow.IPFIX_FIELD_postVlanId, netflow.IPFIX_FIELD_postDot1qVlanId: - if bf.DstVlan == 0 { - bf.DstVlan = uint16(decodeUNumber(v)) + } + + if !nd.d.Schema.IsDisabled(schema.ColumnGroupNAT) { + // NAT + switch field.Type { + case netflow.IPFIX_FIELD_postNATSourceIPv4Address: + bf.AppendIPv6(schema.ColumnSrcAddrNAT, decodeIPFromBytes(v)) + case netflow.IPFIX_FIELD_postNATDestinationIPv4Address: + bf.AppendIPv6(schema.ColumnDstAddrNAT, decodeIPFromBytes(v)) + case netflow.IPFIX_FIELD_postNAPTSourceTransportPort: + bf.AppendUint(schema.ColumnSrcPortNAT, decodeUNumber(v)) + case netflow.IPFIX_FIELD_postNAPTDestinationTransportPort: + bf.AppendUint(schema.ColumnDstPortNAT, decodeUNumber(v)) } - case netflow.IPFIX_FIELD_sourceMacAddress: - bf.AppendUint(schema.ColumnSrcMAC, decodeUNumber(v)) - case netflow.IPFIX_FIELD_destinationMacAddress: - bf.AppendUint(schema.ColumnDstMAC, decodeUNumber(v)) - case netflow.IPFIX_FIELD_postSourceMacAddress: - bf.AppendUint(schema.ColumnSrcMAC, decodeUNumber(v)) - case netflow.IPFIX_FIELD_postDestinationMacAddress: - bf.AppendUint(schema.ColumnDstMAC, decodeUNumber(v)) } - } - if !nd.d.Schema.IsDisabled(schema.ColumnGroupL3L4) { - // Misc L3/L4 fields - switch field.Type { - case netflow.IPFIX_FIELD_ipTTL, netflow.IPFIX_FIELD_minimumTTL: - bf.AppendUint(schema.ColumnIPTTL, decodeUNumber(v)) - case netflow.IPFIX_FIELD_ipClassOfService: - bf.AppendUint(schema.ColumnIPTos, decodeUNumber(v)) - case netflow.IPFIX_FIELD_flowLabelIPv6: - bf.AppendUint(schema.ColumnIPv6FlowLabel, decodeUNumber(v)) - case netflow.IPFIX_FIELD_tcpControlBits: - bf.AppendUint(schema.ColumnTCPFlags, decodeUNumber(v)) - case netflow.IPFIX_FIELD_fragmentIdentification: - bf.AppendUint(schema.ColumnIPFragmentID, decodeUNumber(v)) - case netflow.IPFIX_FIELD_fragmentOffset: - bf.AppendUint(schema.ColumnIPFragmentOffset, decodeUNumber(v)) + if !nd.d.Schema.IsDisabled(schema.ColumnGroupL2) { + // L2 + switch field.Type { + case netflow.IPFIX_FIELD_vlanId, netflow.IPFIX_FIELD_dot1qVlanId: + if bf.SrcVlan == 0 { + bf.SrcVlan = uint16(decodeUNumber(v)) + } + case netflow.IPFIX_FIELD_postVlanId, netflow.IPFIX_FIELD_postDot1qVlanId: + if bf.DstVlan == 0 { + bf.DstVlan = uint16(decodeUNumber(v)) + } + case netflow.IPFIX_FIELD_sourceMacAddress: + bf.AppendUint(schema.ColumnSrcMAC, decodeUNumber(v)) + case netflow.IPFIX_FIELD_destinationMacAddress: + bf.AppendUint(schema.ColumnDstMAC, decodeUNumber(v)) + case netflow.IPFIX_FIELD_postSourceMacAddress: + bf.AppendUint(schema.ColumnSrcMAC, decodeUNumber(v)) + case netflow.IPFIX_FIELD_postDestinationMacAddress: + bf.AppendUint(schema.ColumnDstMAC, decodeUNumber(v)) + } + } - // ICMP - case netflow.IPFIX_FIELD_icmpTypeCodeIPv4, netflow.IPFIX_FIELD_icmpTypeCodeIPv6: - icmpTypeCode := decodeUNumber(v) - icmpType = uint8(icmpTypeCode >> 8) - icmpCode = uint8(icmpTypeCode & 0xff) - foundIcmpTypeCode = true - case netflow.IPFIX_FIELD_icmpTypeIPv4, netflow.IPFIX_FIELD_icmpTypeIPv6: - icmpType = uint8(decodeUNumber(v)) - foundIcmpTypeCode = true - case netflow.IPFIX_FIELD_icmpCodeIPv4, netflow.IPFIX_FIELD_icmpCodeIPv6: - icmpCode = uint8(decodeUNumber(v)) - foundIcmpTypeCode = true + if !nd.d.Schema.IsDisabled(schema.ColumnGroupL3L4) { + // Misc L3/L4 fields + switch field.Type { + case netflow.IPFIX_FIELD_ipTTL, netflow.IPFIX_FIELD_minimumTTL: + bf.AppendUint(schema.ColumnIPTTL, decodeUNumber(v)) + case netflow.IPFIX_FIELD_ipClassOfService: + bf.AppendUint(schema.ColumnIPTos, decodeUNumber(v)) + case netflow.IPFIX_FIELD_flowLabelIPv6: + bf.AppendUint(schema.ColumnIPv6FlowLabel, decodeUNumber(v)) + case netflow.IPFIX_FIELD_tcpControlBits: + bf.AppendUint(schema.ColumnTCPFlags, decodeUNumber(v)) + case netflow.IPFIX_FIELD_fragmentIdentification: + bf.AppendUint(schema.ColumnIPFragmentID, decodeUNumber(v)) + case netflow.IPFIX_FIELD_fragmentOffset: + bf.AppendUint(schema.ColumnIPFragmentOffset, decodeUNumber(v)) + + // ICMP + case netflow.IPFIX_FIELD_icmpTypeCodeIPv4, netflow.IPFIX_FIELD_icmpTypeCodeIPv6: + icmpTypeCode := decodeUNumber(v) + icmpType = uint8(icmpTypeCode >> 8) + icmpCode = uint8(icmpTypeCode & 0xff) + foundIcmpTypeCode = true + case netflow.IPFIX_FIELD_icmpTypeIPv4, netflow.IPFIX_FIELD_icmpTypeIPv6: + icmpType = uint8(decodeUNumber(v)) + foundIcmpTypeCode = true + case netflow.IPFIX_FIELD_icmpCodeIPv4, netflow.IPFIX_FIELD_icmpCodeIPv6: + icmpCode = uint8(decodeUNumber(v)) + foundIcmpTypeCode = true + } } } } - } - if dataLinkFrameSectionIdx >= 0 { - data := fields[dataLinkFrameSectionIdx].Value.([]byte) - if l3Length := decoder.ParseEthernet(nd.d.Schema, bf, data); l3Length > 0 { - bf.AppendUint(schema.ColumnBytes, l3Length) - bf.AppendUint(schema.ColumnPackets, 1) - } - } - if !nd.d.Schema.IsDisabled(schema.ColumnGroupL3L4) && (proto == 1 || proto == 58) { - // ICMP - if !foundIcmpTypeCode { - // Some implementations may use source and destination ports, some - // other only destination port. The following heuristic is safe as - // the only valid code for type 0 is 0 (echo reply). - if srcPort == 0 { - // Use destination port instead (Cisco on NFv5 that still exists - // today with NFv9 and IPFIX). - icmpType = uint8(dstPort >> 8) - icmpCode = uint8(dstPort & 0xff) + if dataLinkFrameSectionIdx >= 0 { + data := fields[dataLinkFrameSectionIdx].Value.([]byte) + if l3Length := decoder.ParseEthernet(nd.d.Schema, bf, data); l3Length > 0 { + bf.AppendUint(schema.ColumnBytes, l3Length) + bf.AppendUint(schema.ColumnPackets, 1) } - // Unsure how to do the mapping when using source and destination - // port. Let's ignore. } - if proto == 1 { - bf.AppendUint(schema.ColumnICMPv4Type, uint64(icmpType)) - bf.AppendUint(schema.ColumnICMPv4Code, uint64(icmpCode)) + if !nd.d.Schema.IsDisabled(schema.ColumnGroupL3L4) && (proto == 1 || proto == 58) { + // ICMP + if !foundIcmpTypeCode { + // Some implementations may use source and destination ports, some + // other only destination port. The following heuristic is safe as + // the only valid code for type 0 is 0 (echo reply). + if srcPort == 0 { + // Use destination port instead (Cisco on NFv5 that still exists + // today with NFv9 and IPFIX). + icmpType = uint8(dstPort >> 8) + icmpCode = uint8(dstPort & 0xff) + } + // Unsure how to do the mapping when using source and destination + // port. Let's ignore. + } + if proto == 1 { + bf.AppendUint(schema.ColumnICMPv4Type, uint64(icmpType)) + bf.AppendUint(schema.ColumnICMPv4Code, uint64(icmpCode)) + } else { + bf.AppendUint(schema.ColumnICMPv6Type, uint64(icmpType)) + bf.AppendUint(schema.ColumnICMPv6Code, uint64(icmpCode)) + } + } + bf.AppendUint(schema.ColumnEType, uint64(etype)) + if len(mplsLabels) > 0 { + bf.AppendArrayUInt32(schema.ColumnMPLSLabels, mplsLabels) + } + if bf.SamplingRate == 0 { + bf.SamplingRate = uint64(samplingRateSys.GetSamplingRate(version, obsDomainID, 0)) + } + if dir == directionForward && !foundReverseElement { + finalize() + break + } else if dir == directionForward { + finalize() + bf.Reverse() } else { - bf.AppendUint(schema.ColumnICMPv6Type, uint64(icmpType)) - bf.AppendUint(schema.ColumnICMPv6Code, uint64(icmpCode)) + bf.Reverse() + finalize() } } - bf.AppendUint(schema.ColumnEType, uint64(etype)) - if len(mplsLabels) > 0 { - bf.AppendArrayUInt32(schema.ColumnMPLSLabels, mplsLabels) - } - if bf.SamplingRate == 0 { - bf.SamplingRate = uint64(samplingRateSys.GetSamplingRate(version, obsDomainID, 0)) - } } func decodeUNumber(b []byte) uint64 { diff --git a/outlet/flow/decoder/netflow/root_test.go b/outlet/flow/decoder/netflow/root_test.go index b19fb725..61643663 100644 --- a/outlet/flow/decoder/netflow/root_test.go +++ b/outlet/flow/decoder/netflow/root_test.go @@ -4,8 +4,10 @@ package netflow import ( + "fmt" "net/netip" "path/filepath" + "strings" "testing" "akvorado/common/helpers" @@ -13,6 +15,8 @@ import ( "akvorado/common/reporter" "akvorado/common/schema" "akvorado/outlet/flow/decoder" + + "github.com/google/go-cmp/cmp/cmpopts" ) func setup(t *testing.T, clearTS bool) (*reporter.Reporter, decoder.Decoder, *schema.FlowMessage, *[]*schema.FlowMessage, decoder.FinalizeFlowFunc) { @@ -763,8 +767,6 @@ func TestDecodePhysicalInterfaces(t *testing.T) { _, nfdecoder, bf, got, finalize := setup(t, true) options := decoder.Option{TimestampSource: pb.RawFlow_TS_INPUT} - // The following PCAP is a NAT event, there is no sampling rate, no bytes, - // no packets. We can't do much with it. data := helpers.ReadPcapL4(t, filepath.Join("testdata", "physicalinterfaces.pcap")) _, err := nfdecoder.Decode( decoder.RawFlow{Payload: data, Source: netip.MustParseAddr("::ffff:127.0.0.1")}, @@ -802,3 +804,142 @@ func TestDecodePhysicalInterfaces(t *testing.T) { t.Fatalf("Decode() (-got, +want):\n%s", diff) } } + +func TestDecodeRFC5103(t *testing.T) { + _, nfdecoder, bf, got, finalize := setup(t, true) + options := decoder.Option{TimestampSource: pb.RawFlow_TS_INPUT} + + data := helpers.ReadPcapL4(t, filepath.Join("testdata", "ipfixprobe-templates.pcap")) + _, err := nfdecoder.Decode( + decoder.RawFlow{Payload: data, Source: netip.MustParseAddr("::ffff:127.0.0.1")}, + options, bf, finalize) + if err != nil { + t.Fatalf("Decode() error:\n%+v", err) + } + data = helpers.ReadPcapL4(t, filepath.Join("testdata", "ipfixprobe-data.pcap")) + _, err = nfdecoder.Decode( + decoder.RawFlow{Payload: data, Source: netip.MustParseAddr("::ffff:127.0.0.1")}, + options, bf, finalize) + if err != nil { + t.Fatalf("Decode() error:\n%+v", err) + } + + expectedFlows := []*schema.FlowMessage{ + { + // First biflow, direct + SamplingRate: 0, + InIf: 10, + OutIf: 0, + ExporterAddress: netip.MustParseAddr("::ffff:127.0.0.1"), + SrcAddr: netip.MustParseAddr("::ffff:10.10.1.4"), + DstAddr: netip.MustParseAddr("::ffff:10.10.1.1"), + OtherColumns: map[schema.ColumnKey]any{ + schema.ColumnSrcMAC: uint64(0x00e01c3c17c2), + schema.ColumnDstMAC: uint64(0x001f33d98160), + schema.ColumnPackets: uint64(1), + schema.ColumnBytes: uint64(62), + schema.ColumnSrcPort: uint16(56166), + schema.ColumnDstPort: uint16(53), + schema.ColumnEType: uint32(helpers.ETypeIPv4), + schema.ColumnProto: uint32(17), + }, + }, { + // First biflow, reverse + SamplingRate: 0, + InIf: 0, + OutIf: 10, + ExporterAddress: netip.MustParseAddr("::ffff:127.0.0.1"), + SrcAddr: netip.MustParseAddr("::ffff:10.10.1.1"), + DstAddr: netip.MustParseAddr("::ffff:10.10.1.4"), + OtherColumns: map[schema.ColumnKey]any{ + schema.ColumnDstMAC: uint64(0x00e01c3c17c2), + schema.ColumnSrcMAC: uint64(0x001f33d98160), + schema.ColumnPackets: uint64(1), + schema.ColumnBytes: uint64(128), + schema.ColumnDstPort: uint16(56166), + schema.ColumnSrcPort: uint16(53), + schema.ColumnEType: uint32(helpers.ETypeIPv4), + schema.ColumnProto: uint32(17), + }, + }, { + // Second biflow, direct, no reverse + SamplingRate: 0, + InIf: 10, + OutIf: 0, + ExporterAddress: netip.MustParseAddr("::ffff:127.0.0.1"), + SrcAddr: netip.MustParseAddr("::ffff:10.10.1.20"), + DstAddr: netip.MustParseAddr("::ffff:10.10.1.255"), + OtherColumns: map[schema.ColumnKey]any{ + schema.ColumnSrcMAC: uint64(0x00023fec6111), + schema.ColumnDstMAC: uint64(0xffffffffffff), + schema.ColumnPackets: uint64(1), + schema.ColumnBytes: uint64(229), + schema.ColumnSrcPort: uint16(138), + schema.ColumnDstPort: uint16(138), + schema.ColumnEType: uint32(helpers.ETypeIPv4), + schema.ColumnProto: uint32(17), + }, + }, { + // Third biflow, direct + SamplingRate: 0, + InIf: 10, + OutIf: 0, + ExporterAddress: netip.MustParseAddr("::ffff:127.0.0.1"), + SrcAddr: netip.MustParseAddr("::ffff:10.10.1.4"), + DstAddr: netip.MustParseAddr("::ffff:74.53.140.153"), + OtherColumns: map[schema.ColumnKey]any{ + schema.ColumnSrcMAC: uint64(0x00e01c3c17c2), + schema.ColumnDstMAC: uint64(0x001f33d98160), + schema.ColumnPackets: uint64(28), + schema.ColumnBytes: uint64(21673), + schema.ColumnSrcPort: uint16(1470), + schema.ColumnDstPort: uint16(25), + schema.ColumnEType: uint32(helpers.ETypeIPv4), + schema.ColumnProto: uint32(6), + schema.ColumnTCPFlags: uint16(0x1b), + }, + }, { + // Third biflow, reverse + SamplingRate: 0, + InIf: 0, + OutIf: 10, + ExporterAddress: netip.MustParseAddr("::ffff:127.0.0.1"), + SrcAddr: netip.MustParseAddr("::ffff:74.53.140.153"), + DstAddr: netip.MustParseAddr("::ffff:10.10.1.4"), + OtherColumns: map[schema.ColumnKey]any{ + schema.ColumnSrcMAC: uint64(0x001f33d98160), + schema.ColumnDstMAC: uint64(0x00e01c3c17c2), + schema.ColumnPackets: uint64(25), + schema.ColumnBytes: uint64(1546), + schema.ColumnSrcPort: uint16(25), + schema.ColumnDstPort: uint16(1470), + schema.ColumnEType: uint32(helpers.ETypeIPv4), + schema.ColumnProto: uint32(6), + schema.ColumnTCPFlags: uint16(0x1b), + }, + }, { + // Last biflow, direct, no reverse + SamplingRate: 0, + InIf: 10, + OutIf: 0, + ExporterAddress: netip.MustParseAddr("::ffff:127.0.0.1"), + SrcAddr: netip.MustParseAddr("::ffff:192.168.1.1"), + DstAddr: netip.MustParseAddr("::ffff:10.10.1.4"), + OtherColumns: map[schema.ColumnKey]any{ + schema.ColumnSrcMAC: uint64(0x001f33d98160), + schema.ColumnDstMAC: uint64(0x00e01c3c17c2), + schema.ColumnPackets: uint64(4), + schema.ColumnBytes: uint64(2304), + schema.ColumnEType: uint32(helpers.ETypeIPv4), + schema.ColumnProto: uint32(1), + }, + }, + } + + if diff := helpers.Diff((*got), expectedFlows, cmpopts.SortSlices(func(a, b *schema.FlowMessage) int { + return strings.Compare(fmt.Sprintf("%+v", a), fmt.Sprintf("%+v", b)) + })); diff != "" { + t.Fatalf("Decode() (-got, +want):\n%s", diff) + } + +} diff --git a/outlet/flow/decoder/netflow/testdata/ipfixprobe-data.pcap b/outlet/flow/decoder/netflow/testdata/ipfixprobe-data.pcap new file mode 100644 index 00000000..928930bc Binary files /dev/null and b/outlet/flow/decoder/netflow/testdata/ipfixprobe-data.pcap differ diff --git a/outlet/flow/decoder/netflow/testdata/ipfixprobe-templates.pcap b/outlet/flow/decoder/netflow/testdata/ipfixprobe-templates.pcap new file mode 100644 index 00000000..b03d56a6 Binary files /dev/null and b/outlet/flow/decoder/netflow/testdata/ipfixprobe-templates.pcap differ