diff --git a/common/schema/definition.go b/common/schema/definition.go index 5be75d81..e41d0016 100644 --- a/common/schema/definition.go +++ b/common/schema/definition.go @@ -75,6 +75,10 @@ const ( ColumnOutIfConnectivity ColumnInIfBoundary ColumnOutIfBoundary + ColumnSrcAddrNAT + ColumnDstAddrNAT + ColumnSrcPortNAT + ColumnDstPortNAT ColumnLast ) @@ -256,6 +260,8 @@ END`, }(), }, {Key: ColumnForwardingStatus, ClickHouseType: "UInt32"}, // TODO: UInt8 but hard to change, primary key + {Key: ColumnSrcAddrNAT, Disabled: true, ClickHouseType: "IPv6", ClickHouseMainOnly: true}, + {Key: ColumnSrcPortNAT, Disabled: true, ClickHouseType: "UInt16", ClickHouseMainOnly: true}, }, }.finalize() } diff --git a/console/data/docs/99-changelog.md b/console/data/docs/99-changelog.md index 2586d926..5d870dc1 100644 --- a/console/data/docs/99-changelog.md +++ b/console/data/docs/99-changelog.md @@ -20,6 +20,7 @@ days. You can disable that by setting `orchestrator.clickhouse.system-logs-ttl` to 0. - ✨ *inlet*: add `schema.enable` and `schema.disable` to add or remove collected data +- ✨ *inlet*: add `SrcAddrNAT`, `DstAddrNAT`, `SrcPortNAT`, `DstPortNAT` as disabled by default columns - 🩹 *inlet*: handle correctly interfaces with high indexes for sFlow - 🩹 *docker*: fix Kafka healthcheck - 🌱 *inlet*: improve decoding/encoding performance (twice faster!) diff --git a/console/filter/parser.peg b/console/filter/parser.peg index d1590ca4..7556293e 100644 --- a/console/filter/parser.peg +++ b/console/filter/parser.peg @@ -51,6 +51,8 @@ ColumnIP ← "ExporterAddress"i #{ return c.metaColumn("ExporterAddress") } { return c.acceptColumn() } / "SrcAddr"i #{ return c.metaColumn("SrcAddr") } { return c.acceptColumn() } / "DstAddr"i #{ return c.metaColumn("DstAddr") } { return c.acceptColumn() } + / "SrcAddrNAT"i #{ return c.metaColumn("SrcAddrNAT") } { return c.acceptColumn() } + / "DstAddrNAT"i #{ return c.metaColumn("DstAddrNAT") } { return c.acceptColumn() } ConditionIPExpr "condition on IP" ← column:ColumnIP _ operator:("=" / "!=") _ ip:IP { @@ -128,6 +130,8 @@ ConditionUintExpr "condition on integer" ← / "OutIfSpeed"i #{ return c.metaColumn("OutIfSpeed") } { return c.acceptColumn() } / "SrcPort"i #{ return c.metaColumn("SrcPort") } { return c.acceptColumn() } / "DstPort"i #{ return c.metaColumn("DstPort") } { return c.acceptColumn() } + / "SrcPortNAT"i #{ return c.metaColumn("SrcPortNAT") } { return c.acceptColumn() } + / "DstPortNAT"i #{ return c.metaColumn("DstPortNAT") } { return c.acceptColumn() } / "SrcVlan"i #{ return c.metaColumn("SrcVlan") } { return c.acceptColumn() } / "DstVlan"i #{ return c.metaColumn("DstVlan") } { return c.acceptColumn() } / "ForwardingStatus"i #{ return c.metaColumn("ForwardingStatus") } { return c.acceptColumn() }) _ diff --git a/console/filter/parser_test.go b/console/filter/parser_test.go index 4288e1b5..42f36d48 100644 --- a/console/filter/parser_test.go +++ b/console/filter/parser_test.go @@ -215,9 +215,19 @@ output provider */ = 'telia'`, {Input: `DstCommunities != 65000:100`, Output: `NOT has(DstCommunities, 4259840100)`, MetaOut: Meta{MainTableRequired: true}}, {Input: `DstCommunities = 65000:100:200`, Output: `has(DstLargeCommunities, bitShiftLeft(65000::UInt128, 64) + bitShiftLeft(100::UInt128, 32) + 200::UInt128)`, MetaOut: Meta{MainTableRequired: true}}, {Input: `DstCommunities != 65000:100:200`, Output: `NOT has(DstLargeCommunities, bitShiftLeft(65000::UInt128, 64) + bitShiftLeft(100::UInt128, 32) + 200::UInt128)`, MetaOut: Meta{MainTableRequired: true}}, + {Input: `SrcVlan = 1000`, Output: `SrcVlan = 1000`}, + {Input: `DstVlan = 1000`, Output: `DstVlan = 1000`}, + {Input: `SrcAddrNAT = 203.0.113.4`, Output: `SrcAddrNAT = toIPv6('203.0.113.4')`, + MetaOut: Meta{MainTableRequired: true}}, + {Input: `DstAddrNAT = 203.0.113.4`, Output: `DstAddrNAT = toIPv6('203.0.113.4')`, + MetaOut: Meta{MainTableRequired: true}}, + {Input: `SrcPortNAT = 22`, Output: `SrcPortNAT = 22`, + MetaOut: Meta{MainTableRequired: true}}, + {Input: `DstPortNAT = 22`, Output: `DstPortNAT = 22`, + MetaOut: Meta{MainTableRequired: true}}, } for _, tc := range cases { - tc.MetaIn.Schema = schema.NewMock(t) + tc.MetaIn.Schema = schema.NewMock(t).EnableAllColumns() tc.MetaOut.Schema = tc.MetaIn.Schema got, err := Parse("", []byte(tc.Input), GlobalStore("meta", &tc.MetaIn)) if err != nil { @@ -233,28 +243,6 @@ output provider */ = 'telia'`, } } -func TestFilterDisabled(t *testing.T) { - cases := []struct { - Input string - Output string - }{ - {`SrcVlan = 1000`, `SrcVlan = 1000`}, - {`DstVlan = 1000`, `DstVlan = 1000`}, - } - for _, tc := range cases { - got, err := Parse("", []byte(tc.Input), - GlobalStore("meta", &Meta{Schema: schema.NewMock(t).EnableAllColumns()})) - if err != nil { - t.Errorf("Parse(%q) error:\n%+v", tc.Input, err) - continue - } - if diff := helpers.Diff(got.(string), tc.Output); diff != "" { - t.Errorf("Parse(%q) (-got, +want):\n%s", tc.Input, diff) - } - } - -} - func TestInvalidFilter(t *testing.T) { cases := []struct { Input string diff --git a/inlet/flow/decoder/netflow/decode.go b/inlet/flow/decoder/netflow/decode.go index 961637a9..0926ed9b 100644 --- a/inlet/flow/decoder/netflow/decode.go +++ b/inlet/flow/decoder/netflow/decode.go @@ -123,6 +123,16 @@ func (nd *Decoder) decodeRecord(version int, fields []netflow.DataField) *schema case netflow.NFV9_FIELD_OUTPUT_SNMP: bf.OutIf = uint32(decodeUNumber(v)) + // NAT + case netflow.IPFIX_FIELD_postNATSourceIPv4Address: + nd.d.Schema.ProtobufAppendIP(bf, schema.ColumnSrcAddrNAT, decodeIP(v)) + case netflow.IPFIX_FIELD_postNATDestinationIPv4Address: + nd.d.Schema.ProtobufAppendIP(bf, schema.ColumnDstAddrNAT, decodeIP(v)) + case netflow.IPFIX_FIELD_postNAPTSourceTransportPort: + nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnSrcPortNAT, decodeUNumber(v)) + case netflow.IPFIX_FIELD_postNAPTDestinationTransportPort: + nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnDstPortNAT, decodeUNumber(v)) + // Remaining case netflow.NFV9_FIELD_FORWARDING_STATUS: nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnForwardingStatus, decodeUNumber(v))