mirror of
https://github.com/akvorado/akvorado.git
synced 2025-12-11 22:14:02 +01:00
inlet: decode MPLS labels
They are stored in an array and there are some aliases to get 1st, 2nd and third label. Support for sFlow would need a test to ensure it works as expected. Fix #960
This commit is contained in:
@@ -95,6 +95,10 @@ const (
|
||||
ColumnICMPv6Type
|
||||
ColumnICMPv6Code
|
||||
ColumnNextHop
|
||||
ColumnMPLSLabels
|
||||
ColumnMPLS1stLabel
|
||||
ColumnMPLS2ndLabel
|
||||
ColumnMPLS3rdLabel
|
||||
|
||||
// ColumnLast points to after the last static column, custom dictionaries
|
||||
// (dynamic columns) come after ColumnLast
|
||||
@@ -404,6 +408,40 @@ END`,
|
||||
ClickHouseType: "LowCardinality(IPv6)",
|
||||
ClickHouseCodec: "ZSTD(1)",
|
||||
},
|
||||
{
|
||||
Key: ColumnMPLSLabels,
|
||||
Disabled: true,
|
||||
ClickHouseMainOnly: true,
|
||||
ClickHouseType: "Array(UInt32)",
|
||||
ParserType: "array(uint)",
|
||||
},
|
||||
{
|
||||
Key: ColumnMPLS1stLabel,
|
||||
Disabled: true,
|
||||
Depends: []ColumnKey{ColumnMPLSLabels},
|
||||
ClickHouseMainOnly: true,
|
||||
ClickHouseType: "UInt32",
|
||||
ClickHouseAlias: "MPLSLabels[1]",
|
||||
ParserType: "uint",
|
||||
},
|
||||
{
|
||||
Key: ColumnMPLS2ndLabel,
|
||||
Disabled: true,
|
||||
Depends: []ColumnKey{ColumnMPLSLabels},
|
||||
ClickHouseMainOnly: true,
|
||||
ClickHouseType: "UInt32",
|
||||
ClickHouseAlias: "MPLSLabels[2]",
|
||||
ParserType: "uint",
|
||||
},
|
||||
{
|
||||
Key: ColumnMPLS3rdLabel,
|
||||
Disabled: true,
|
||||
Depends: []ColumnKey{ColumnMPLSLabels},
|
||||
ClickHouseMainOnly: true,
|
||||
ClickHouseType: "UInt32",
|
||||
ClickHouseAlias: "MPLSLabels[3]",
|
||||
ParserType: "uint",
|
||||
},
|
||||
},
|
||||
}.finalize()
|
||||
}
|
||||
|
||||
@@ -14,6 +14,8 @@ identified with a specific icon:
|
||||
## Unreleased
|
||||
|
||||
- 💥 *inlet*: many metrics renamed to match [Prometheus best practices](https://prometheus.io/docs/practices/naming/)
|
||||
- ✨ *inlet*: add the following collected data (disabled by default):
|
||||
`MPLSLabels`, `MPLS1stLabel`, `MPLS2ndLabel`, and `MPLS3rdLabel`
|
||||
- 🩹 *docker*: ensure ClickHouse init script is executed even when database already exists
|
||||
- 🌱 *docker*: update ClickHouse to 23.8 (this is not mandatory)
|
||||
- 🌱 *orchestrator*: add `orchestrator`→`clickhouse`→`prometheus-endpoint` to configure an endpoint to expose metrics to Prometheus
|
||||
|
||||
@@ -41,6 +41,7 @@ ConditionExpr "conditional" ←
|
||||
/ ConditionStringExpr
|
||||
/ ConditionBoundaryExpr
|
||||
/ ConditionUintExpr
|
||||
/ ConditionArrayUintExpr
|
||||
/ ConditionASExpr
|
||||
/ ConditionASPathExpr
|
||||
/ ConditionCommunitiesExpr
|
||||
@@ -129,6 +130,20 @@ ConditionUintExpr "condition on integer" ←
|
||||
return []any{column, operator, value}, nil
|
||||
}
|
||||
|
||||
ConditionArrayUintExpr "condition on array of integers" ←
|
||||
column:(value:[A-Za-z0-9]+ !IdentStart
|
||||
&{ return c.columnIsOfType(value, "array(uint)") }
|
||||
{ return c.acceptColumn() }) _
|
||||
"=" _ value:Unsigned64 {
|
||||
return []any{"has(", column, ",", value, ")"}, nil
|
||||
}
|
||||
/ column:(value:[A-Za-z0-9]+ !IdentStart
|
||||
&{ return c.columnIsOfType(value, "array(uint)") }
|
||||
{ return c.acceptColumn() }) _
|
||||
"!=" _ value:Unsigned64 {
|
||||
return []any{"NOT has(", column, ",", value, ")"}, nil
|
||||
}
|
||||
|
||||
ConditionASExpr "condition on AS number" ←
|
||||
column:("SrcAS"i !IdentStart { return c.acceptColumn() }
|
||||
/ "DstAS"i !IdentStart { return c.acceptColumn() }
|
||||
|
||||
@@ -326,6 +326,11 @@ output provider */ = 'telia'`,
|
||||
{Input: `DstAddrPriority = 200`, Output: `DstAddrPriority = 200`},
|
||||
{Input: `DstAddrSibling = 2001:db8::1`, Output: `DstAddrSibling = toIPv6('2001:db8::1')`},
|
||||
{Input: `SrcAddrDimensionAttribute IN ("Test", "None")`, Output: `SrcAddrDimensionAttribute IN ('Test', 'None')`},
|
||||
{Input: `MPLSLabels = 76876`, Output: `has(MPLSLabels, 76876)`, MetaOut: Meta{MainTableRequired: true}},
|
||||
{Input: `MPLSLabels != 76876`, Output: `NOT has(MPLSLabels, 76876)`, MetaOut: Meta{MainTableRequired: true}},
|
||||
{Input: `MPLS1stLabel = 76876`, Output: `MPLS1stLabel = 76876`, MetaOut: Meta{MainTableRequired: true}},
|
||||
{Input: `MPLS2ndLabel > 76876`, Output: `MPLS2ndLabel > 76876`, MetaOut: Meta{MainTableRequired: true}},
|
||||
{Input: `MPLS3rdLabel < 76876`, Output: `MPLS3rdLabel < 76876`, MetaOut: Meta{MainTableRequired: true}},
|
||||
}
|
||||
config := schema.DefaultConfiguration()
|
||||
config.CustomDictionaries = make(map[string]schema.CustomDict)
|
||||
|
||||
@@ -109,6 +109,8 @@ func (qc Column) ToSQLSelect(sch *schema.Component) string {
|
||||
helpers.ETypeIPv4, helpers.ETypeIPv6)
|
||||
case schema.ColumnProto:
|
||||
strValue = `dictGetOrDefault('protocols', 'name', Proto, '???')`
|
||||
case schema.ColumnMPLSLabels:
|
||||
strValue = `arrayStringConcat(MPLSLabels, ' ')`
|
||||
case schema.ColumnDstASPath:
|
||||
strValue = `arrayStringConcat(DstASPath, ' ')`
|
||||
case schema.ColumnDstCommunities:
|
||||
|
||||
@@ -92,6 +92,12 @@ func TestQueryColumnSQLSelect(t *testing.T) {
|
||||
}, {
|
||||
Input: schema.ColumnDstMAC,
|
||||
Expected: `MACNumToString(DstMAC)`,
|
||||
}, {
|
||||
Input: schema.ColumnMPLSLabels,
|
||||
Expected: `arrayStringConcat(MPLSLabels, ' ')`,
|
||||
}, {
|
||||
Input: schema.ColumnMPLS3rdLabel,
|
||||
Expected: `toString(MPLS3rdLabel)`,
|
||||
}, {
|
||||
Input: schema.ColumnTCPFlags,
|
||||
// Can be tested with "WITH 16 AS TCPFlags SELECT ..."
|
||||
|
||||
@@ -139,6 +139,7 @@ func ParseEthernet(sch *schema.Component, bf *schema.FlowMessage, data []byte) u
|
||||
label := binary.BigEndian.Uint32(append([]byte{0}, data[:3]...)) >> 4
|
||||
bottom := data[2] & 1
|
||||
data = data[4:]
|
||||
sch.ProtobufAppendVarint(bf, schema.ColumnMPLSLabels, uint64(label))
|
||||
if bottom == 1 || label <= 15 {
|
||||
if data[0]&0xf0>>4 == 4 {
|
||||
etherType = []byte{0x8, 0x0}
|
||||
|
||||
@@ -131,6 +131,10 @@ func (nd *Decoder) decodeRecord(fields []netflow.DataField) *schema.FlowMessage
|
||||
case netflow.IPFIX_FIELD_dataLinkFrameSection:
|
||||
dataLinkFrameSectionIdx = idx
|
||||
|
||||
// MPLS
|
||||
case netflow.NFV9_FIELD_MPLS_LABEL_1, netflow.NFV9_FIELD_MPLS_LABEL_2, netflow.NFV9_FIELD_MPLS_LABEL_3, netflow.NFV9_FIELD_MPLS_LABEL_4, netflow.NFV9_FIELD_MPLS_LABEL_5, netflow.NFV9_FIELD_MPLS_LABEL_6, netflow.NFV9_FIELD_MPLS_LABEL_7, netflow.NFV9_FIELD_MPLS_LABEL_8, netflow.NFV9_FIELD_MPLS_LABEL_9, netflow.NFV9_FIELD_MPLS_LABEL_10:
|
||||
nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnMPLSLabels, decodeUNumber(v)>>4)
|
||||
|
||||
// Remaining
|
||||
case netflow.NFV9_FIELD_FORWARDING_STATUS:
|
||||
nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnForwardingStatus, decodeUNumber(v))
|
||||
|
||||
@@ -385,3 +385,59 @@ func TestDecodeDataLink(t *testing.T) {
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestDecodeMPLS(t *testing.T) {
|
||||
r := reporter.NewMock(t)
|
||||
nfdecoder := New(r, decoder.Dependencies{Schema: schema.NewMock(t).EnableAllColumns()})
|
||||
|
||||
data := helpers.ReadPcapPayload(t, filepath.Join("testdata", "mpls.pcap"))
|
||||
got := nfdecoder.Decode(decoder.RawFlow{Payload: data, Source: net.ParseIP("127.0.0.1")})
|
||||
|
||||
expectedFlows := []*schema.FlowMessage{
|
||||
{
|
||||
ExporterAddress: netip.MustParseAddr("::ffff:127.0.0.1"),
|
||||
SrcAddr: netip.MustParseAddr("fd00::1:0:1:7:1"),
|
||||
DstAddr: netip.MustParseAddr("fd00::1:0:1:5:1"),
|
||||
NextHop: netip.MustParseAddr("::ffff:0.0.0.0"),
|
||||
SamplingRate: 1,
|
||||
OutIf: 16,
|
||||
ProtobufDebug: map[schema.ColumnKey]interface{}{
|
||||
schema.ColumnBytes: 89,
|
||||
schema.ColumnPackets: 1,
|
||||
schema.ColumnEType: helpers.ETypeIPv6,
|
||||
schema.ColumnForwardingStatus: 66,
|
||||
schema.ColumnIPTTL: 255,
|
||||
schema.ColumnProto: 17,
|
||||
schema.ColumnSrcPort: 49153,
|
||||
schema.ColumnDstPort: 862,
|
||||
schema.ColumnMPLSLabels: []uint32{20005, 524250},
|
||||
},
|
||||
}, {
|
||||
ExporterAddress: netip.MustParseAddr("::ffff:127.0.0.1"),
|
||||
SrcAddr: netip.MustParseAddr("fd00::1:0:1:7:1"),
|
||||
DstAddr: netip.MustParseAddr("fd00::1:0:1:6:1"),
|
||||
NextHop: netip.MustParseAddr("::ffff:0.0.0.0"),
|
||||
SamplingRate: 1,
|
||||
OutIf: 17,
|
||||
ProtobufDebug: map[schema.ColumnKey]interface{}{
|
||||
schema.ColumnBytes: 890,
|
||||
schema.ColumnPackets: 10,
|
||||
schema.ColumnEType: helpers.ETypeIPv6,
|
||||
schema.ColumnForwardingStatus: 66,
|
||||
schema.ColumnIPTTL: 255,
|
||||
schema.ColumnProto: 17,
|
||||
schema.ColumnSrcPort: 49153,
|
||||
schema.ColumnDstPort: 862,
|
||||
schema.ColumnMPLSLabels: []uint32{20006, 524275},
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, f := range got {
|
||||
f.TimeReceived = 0
|
||||
}
|
||||
|
||||
if diff := helpers.Diff(got, expectedFlows); diff != "" {
|
||||
t.Fatalf("Decode() (-got, +want):\n%s", diff)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
BIN
inlet/flow/decoder/netflow/testdata/mpls.pcap
vendored
Normal file
BIN
inlet/flow/decoder/netflow/testdata/mpls.pcap
vendored
Normal file
Binary file not shown.
Reference in New Issue
Block a user