common/schema: add Src/DstAddrNAT, Src/DstPortNAT, DstPortNAT`

Also parse them for IPFIX.

Fix #211
This commit is contained in:
Vincent Bernat
2023-01-19 19:31:23 +01:00
parent 4d30445099
commit 78caf8e07b
5 changed files with 32 additions and 23 deletions

View File

@@ -75,6 +75,10 @@ const (
ColumnOutIfConnectivity ColumnOutIfConnectivity
ColumnInIfBoundary ColumnInIfBoundary
ColumnOutIfBoundary ColumnOutIfBoundary
ColumnSrcAddrNAT
ColumnDstAddrNAT
ColumnSrcPortNAT
ColumnDstPortNAT
ColumnLast ColumnLast
) )
@@ -256,6 +260,8 @@ END`,
}(), }(),
}, },
{Key: ColumnForwardingStatus, ClickHouseType: "UInt32"}, // TODO: UInt8 but hard to change, primary key {Key: ColumnForwardingStatus, ClickHouseType: "UInt32"}, // TODO: UInt8 but hard to change, primary key
{Key: ColumnSrcAddrNAT, Disabled: true, ClickHouseType: "IPv6", ClickHouseMainOnly: true},
{Key: ColumnSrcPortNAT, Disabled: true, ClickHouseType: "UInt16", ClickHouseMainOnly: true},
}, },
}.finalize() }.finalize()
} }

View File

@@ -20,6 +20,7 @@ days. You can disable that by setting `orchestrator.clickhouse.system-logs-ttl`
to 0. to 0.
-*inlet*: add `schema.enable` and `schema.disable` to add or remove collected data -*inlet*: add `schema.enable` and `schema.disable` to add or remove collected data
-*inlet*: add `SrcAddrNAT`, `DstAddrNAT`, `SrcPortNAT`, `DstPortNAT` as disabled by default columns
- 🩹 *inlet*: handle correctly interfaces with high indexes for sFlow - 🩹 *inlet*: handle correctly interfaces with high indexes for sFlow
- 🩹 *docker*: fix Kafka healthcheck - 🩹 *docker*: fix Kafka healthcheck
- 🌱 *inlet*: improve decoding/encoding performance (twice faster!) - 🌱 *inlet*: improve decoding/encoding performance (twice faster!)

View File

@@ -51,6 +51,8 @@ ColumnIP ←
"ExporterAddress"i #{ return c.metaColumn("ExporterAddress") } { return c.acceptColumn() } "ExporterAddress"i #{ return c.metaColumn("ExporterAddress") } { return c.acceptColumn() }
/ "SrcAddr"i #{ return c.metaColumn("SrcAddr") } { return c.acceptColumn() } / "SrcAddr"i #{ return c.metaColumn("SrcAddr") } { return c.acceptColumn() }
/ "DstAddr"i #{ return c.metaColumn("DstAddr") } { return c.acceptColumn() } / "DstAddr"i #{ return c.metaColumn("DstAddr") } { return c.acceptColumn() }
/ "SrcAddrNAT"i #{ return c.metaColumn("SrcAddrNAT") } { return c.acceptColumn() }
/ "DstAddrNAT"i #{ return c.metaColumn("DstAddrNAT") } { return c.acceptColumn() }
ConditionIPExpr "condition on IP" ← ConditionIPExpr "condition on IP" ←
column:ColumnIP _ column:ColumnIP _
operator:("=" / "!=") _ ip:IP { operator:("=" / "!=") _ ip:IP {
@@ -128,6 +130,8 @@ ConditionUintExpr "condition on integer" ←
/ "OutIfSpeed"i #{ return c.metaColumn("OutIfSpeed") } { return c.acceptColumn() } / "OutIfSpeed"i #{ return c.metaColumn("OutIfSpeed") } { return c.acceptColumn() }
/ "SrcPort"i #{ return c.metaColumn("SrcPort") } { return c.acceptColumn() } / "SrcPort"i #{ return c.metaColumn("SrcPort") } { return c.acceptColumn() }
/ "DstPort"i #{ return c.metaColumn("DstPort") } { return c.acceptColumn() } / "DstPort"i #{ return c.metaColumn("DstPort") } { return c.acceptColumn() }
/ "SrcPortNAT"i #{ return c.metaColumn("SrcPortNAT") } { return c.acceptColumn() }
/ "DstPortNAT"i #{ return c.metaColumn("DstPortNAT") } { return c.acceptColumn() }
/ "SrcVlan"i #{ return c.metaColumn("SrcVlan") } { return c.acceptColumn() } / "SrcVlan"i #{ return c.metaColumn("SrcVlan") } { return c.acceptColumn() }
/ "DstVlan"i #{ return c.metaColumn("DstVlan") } { return c.acceptColumn() } / "DstVlan"i #{ return c.metaColumn("DstVlan") } { return c.acceptColumn() }
/ "ForwardingStatus"i #{ return c.metaColumn("ForwardingStatus") } { return c.acceptColumn() }) _ / "ForwardingStatus"i #{ return c.metaColumn("ForwardingStatus") } { return c.acceptColumn() }) _

View File

@@ -215,9 +215,19 @@ output provider */ = 'telia'`,
{Input: `DstCommunities != 65000:100`, Output: `NOT has(DstCommunities, 4259840100)`, MetaOut: Meta{MainTableRequired: true}}, {Input: `DstCommunities != 65000:100`, Output: `NOT has(DstCommunities, 4259840100)`, MetaOut: Meta{MainTableRequired: true}},
{Input: `DstCommunities = 65000:100:200`, Output: `has(DstLargeCommunities, bitShiftLeft(65000::UInt128, 64) + bitShiftLeft(100::UInt128, 32) + 200::UInt128)`, MetaOut: Meta{MainTableRequired: true}}, {Input: `DstCommunities = 65000:100:200`, Output: `has(DstLargeCommunities, bitShiftLeft(65000::UInt128, 64) + bitShiftLeft(100::UInt128, 32) + 200::UInt128)`, MetaOut: Meta{MainTableRequired: true}},
{Input: `DstCommunities != 65000:100:200`, Output: `NOT has(DstLargeCommunities, bitShiftLeft(65000::UInt128, 64) + bitShiftLeft(100::UInt128, 32) + 200::UInt128)`, MetaOut: Meta{MainTableRequired: true}}, {Input: `DstCommunities != 65000:100:200`, Output: `NOT has(DstLargeCommunities, bitShiftLeft(65000::UInt128, 64) + bitShiftLeft(100::UInt128, 32) + 200::UInt128)`, MetaOut: Meta{MainTableRequired: true}},
{Input: `SrcVlan = 1000`, Output: `SrcVlan = 1000`},
{Input: `DstVlan = 1000`, Output: `DstVlan = 1000`},
{Input: `SrcAddrNAT = 203.0.113.4`, Output: `SrcAddrNAT = toIPv6('203.0.113.4')`,
MetaOut: Meta{MainTableRequired: true}},
{Input: `DstAddrNAT = 203.0.113.4`, Output: `DstAddrNAT = toIPv6('203.0.113.4')`,
MetaOut: Meta{MainTableRequired: true}},
{Input: `SrcPortNAT = 22`, Output: `SrcPortNAT = 22`,
MetaOut: Meta{MainTableRequired: true}},
{Input: `DstPortNAT = 22`, Output: `DstPortNAT = 22`,
MetaOut: Meta{MainTableRequired: true}},
} }
for _, tc := range cases { for _, tc := range cases {
tc.MetaIn.Schema = schema.NewMock(t) tc.MetaIn.Schema = schema.NewMock(t).EnableAllColumns()
tc.MetaOut.Schema = tc.MetaIn.Schema tc.MetaOut.Schema = tc.MetaIn.Schema
got, err := Parse("", []byte(tc.Input), GlobalStore("meta", &tc.MetaIn)) got, err := Parse("", []byte(tc.Input), GlobalStore("meta", &tc.MetaIn))
if err != nil { if err != nil {
@@ -233,28 +243,6 @@ output provider */ = 'telia'`,
} }
} }
func TestFilterDisabled(t *testing.T) {
cases := []struct {
Input string
Output string
}{
{`SrcVlan = 1000`, `SrcVlan = 1000`},
{`DstVlan = 1000`, `DstVlan = 1000`},
}
for _, tc := range cases {
got, err := Parse("", []byte(tc.Input),
GlobalStore("meta", &Meta{Schema: schema.NewMock(t).EnableAllColumns()}))
if err != nil {
t.Errorf("Parse(%q) error:\n%+v", tc.Input, err)
continue
}
if diff := helpers.Diff(got.(string), tc.Output); diff != "" {
t.Errorf("Parse(%q) (-got, +want):\n%s", tc.Input, diff)
}
}
}
func TestInvalidFilter(t *testing.T) { func TestInvalidFilter(t *testing.T) {
cases := []struct { cases := []struct {
Input string Input string

View File

@@ -123,6 +123,16 @@ func (nd *Decoder) decodeRecord(version int, fields []netflow.DataField) *schema
case netflow.NFV9_FIELD_OUTPUT_SNMP: case netflow.NFV9_FIELD_OUTPUT_SNMP:
bf.OutIf = uint32(decodeUNumber(v)) bf.OutIf = uint32(decodeUNumber(v))
// NAT
case netflow.IPFIX_FIELD_postNATSourceIPv4Address:
nd.d.Schema.ProtobufAppendIP(bf, schema.ColumnSrcAddrNAT, decodeIP(v))
case netflow.IPFIX_FIELD_postNATDestinationIPv4Address:
nd.d.Schema.ProtobufAppendIP(bf, schema.ColumnDstAddrNAT, decodeIP(v))
case netflow.IPFIX_FIELD_postNAPTSourceTransportPort:
nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnSrcPortNAT, decodeUNumber(v))
case netflow.IPFIX_FIELD_postNAPTDestinationTransportPort:
nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnDstPortNAT, decodeUNumber(v))
// Remaining // Remaining
case netflow.NFV9_FIELD_FORWARDING_STATUS: case netflow.NFV9_FIELD_FORWARDING_STATUS:
nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnForwardingStatus, decodeUNumber(v)) nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnForwardingStatus, decodeUNumber(v))