diff --git a/common/schema/definition.go b/common/schema/definition.go index e41d0016..ff9ef231 100644 --- a/common/schema/definition.go +++ b/common/schema/definition.go @@ -79,6 +79,8 @@ const ( ColumnDstAddrNAT ColumnSrcPortNAT ColumnDstPortNAT + ColumnSrcMAC + ColumnDstMAC ColumnLast ) @@ -262,6 +264,7 @@ END`, {Key: ColumnForwardingStatus, ClickHouseType: "UInt32"}, // TODO: UInt8 but hard to change, primary key {Key: ColumnSrcAddrNAT, Disabled: true, ClickHouseType: "IPv6", ClickHouseMainOnly: true}, {Key: ColumnSrcPortNAT, Disabled: true, ClickHouseType: "UInt16", ClickHouseMainOnly: true}, + {Key: ColumnSrcMAC, Disabled: true, ClickHouseType: "Uint64"}, }, }.finalize() } diff --git a/console/data/docs/99-changelog.md b/console/data/docs/99-changelog.md index 70095564..571e5390 100644 --- a/console/data/docs/99-changelog.md +++ b/console/data/docs/99-changelog.md @@ -20,7 +20,13 @@ days. You can disable that by setting `orchestrator.clickhouse.system-logs-ttl` to 0. - ✨ *inlet*: add `schema.enabled` and `schema.disabled` to add or remove collected data -- ✨ *inlet*: add `SrcAddrNAT`, `DstAddrNAT`, `SrcPortNAT`, `DstPortNAT` as disabled by default columns +- ✨ *inlet*: add the following collected data (disabled by default): + - `SrcAddrNAT` + - `DstAddrNAT` + - `SrcPortNAT` + - `DstPortNAT` + - `SrcMAC` + - `DstMAC` - 🩹 *inlet*: handle correctly interfaces with high indexes for sFlow - 🩹 *docker*: fix Kafka healthcheck - 🌱 *inlet*: improve decoding/encoding performance (twice faster!) diff --git a/console/filter.go b/console/filter.go index edc92fb1..a34dbded 100644 --- a/console/filter.go +++ b/console/filter.go @@ -165,6 +165,31 @@ func (c *Component) filterCompleteHandlerFunc(gc *gin.Context) { filterCompletion{"PIM", "protocol", true}, filterCompletion{"IPv4", "protocol", true}, filterCompletion{"IPv6", "protocol", true}) + case "srcmac", "dstmac": + results := []struct { + Label string `ch:"label"` + }{} + columnName := c.fixQueryColumnName(input.Column) + sqlQuery := fmt.Sprintf(` +SELECT MACNumToString(%s) AS label +FROM flows +WHERE TimeReceived > date_sub(minute, 1, now()) +AND positionCaseInsensitive(label, $1) >= 1 +GROUP BY %s +ORDER BY COUNT(*) DESC +LIMIT 20`, columnName, columnName) + if err := c.d.ClickHouseDB.Conn.Select(ctx, &results, sqlQuery, input.Prefix); err != nil { + c.r.Err(err).Msg("unable to query database") + break + } + for _, result := range results { + completions = append(completions, filterCompletion{ + Label: result.Label, + Detail: "MAC address", + Quoted: false, + }) + } + input.Prefix = "" // We have handled this internally case "dstcommunities": results := []struct { Label string `ch:"label"` diff --git a/console/filter/parser.peg b/console/filter/parser.peg index 0ef896fc..cd69effb 100644 --- a/console/filter/parser.peg +++ b/console/filter/parser.peg @@ -5,6 +5,7 @@ package filter import ( + "errors" "fmt" "net/netip" @@ -37,6 +38,7 @@ NotExpr "NOT expression" ← KW_NOT _ expr:Expr { ConditionExpr "conditional" ← ConditionIPExpr / ConditionPrefixExpr + / ConditionMACExpr / ConditionStringExpr / ConditionBoundaryExpr / ConditionUintExpr @@ -78,6 +80,13 @@ ConditionPrefixExpr "condition on prefix" ← return "", nil } +ConditionMACExpr "condition on MAC" ← + column:("SrcMAC"i !IdentStart #{ return c.metaColumn("SrcMAC") } { return c.acceptColumn() } + / "DstMAC"i !IdentStart #{ return c.metaColumn("DstMAC") } { return c.acceptColumn() }) _ + operator:("=" / "!=") _ mac:MAC { + return fmt.Sprintf("%s %s MACStringToNum(%s)", toString(column), toString(operator), quote(mac)), nil + } + ConditionStringExpr "condition on string" ← column:("ExporterName"i !IdentStart #{ return c.metaColumn("ExporterName") } { return c.acceptColumn() } / "ExporterGroup"i !IdentStart #{ return c.metaColumn("ExporterGroup") } { return c.acceptColumn() } @@ -194,7 +203,7 @@ ConditionPacketSizeExpr "condition on packet size" ← IP "IP address" ← [0-9A-Fa-f:.]+ !IdentStart { ip, err := netip.ParseAddr(string(c.text)) if err != nil { - return false, fmt.Errorf("expecting an IP address") + return "", errors.New("expecting an IP address") } return ip.String(), nil } @@ -202,7 +211,7 @@ IP "IP address" ← [0-9A-Fa-f:.]+ !IdentStart { Subnet "IP subnet" ← [0-9A-Fa-f:.]+ "/" [0-9]+ !IdentStart { net, err := netip.ParsePrefix(string(c.text)) if err != nil { - return false, fmt.Errorf("expecting a subnet") + return "", errors.New("expecting a subnet") } if net.Addr().Is6() { return fmt.Sprintf("BETWEEN toIPv6('%s') AND toIPv6('%s')", net.Masked().Addr().String(), lastIP(net).String()), nil @@ -214,7 +223,7 @@ Prefix "IP prefix" ← [0-9A-Fa-f:.]+ "/" [0-9]+ !IdentStart { // This returns a string with a %s placeholder net, err := netip.ParsePrefix(string(c.text)) if err != nil { - return false, fmt.Errorf("expecting a prefix") + return "", errors.New("expecting a prefix") } if net.Addr().Is6() { return fmt.Sprintf("BETWEEN toIPv6('%s') AND toIPv6('%s') AND %%sNetMask = %d", @@ -224,6 +233,17 @@ Prefix "IP prefix" ← [0-9A-Fa-f:.]+ "/" [0-9]+ !IdentStart { net.Masked().Addr().String(), lastIP(net).String(), net.Bits()), nil } +MAC "MAC address" ← [0-9A-Fa-f:.]+ !IdentStart { + hw, err := net.ParseMAC(string(c.text)) + if err != nil { + return "", errors.New("expecting a MAC address") + } + if len(hw) != 6 { + return "", errors.New("expecting a 6-byte MAC address") + } + return hw.String(), nil +} + ASN "AS number" ← "AS"i? value:Unsigned32 !IdentStart { return value, nil } diff --git a/console/filter/parser_test.go b/console/filter/parser_test.go index 42f36d48..483e59bf 100644 --- a/console/filter/parser_test.go +++ b/console/filter/parser_test.go @@ -225,6 +225,10 @@ output provider */ = 'telia'`, MetaOut: Meta{MainTableRequired: true}}, {Input: `DstPortNAT = 22`, Output: `DstPortNAT = 22`, MetaOut: Meta{MainTableRequired: true}}, + {Input: `SrcMAC = 00:11:22:33:44:55`, Output: `SrcMAC = MACStringToNum('00:11:22:33:44:55')`}, + {Input: `DstMAC = 00:11:22:33:44:55`, Output: `DstMAC = MACStringToNum('00:11:22:33:44:55')`}, + {Input: `SrcMAC != 00:0c:fF:33:44:55`, Output: `SrcMAC != MACStringToNum('00:0c:ff:33:44:55')`}, + {Input: `SrcMAC = 0000.5e00.5301`, Output: `SrcMAC = MACStringToNum('00:00:5e:00:53:01')`}, } for _, tc := range cases { tc.MetaIn.Schema = schema.NewMock(t).EnableAllColumns() @@ -245,34 +249,40 @@ output provider */ = 'telia'`, func TestInvalidFilter(t *testing.T) { cases := []struct { - Input string + Input string + EnableAll bool }{ - {`ExporterName`}, - {`ExporterName = `}, - {`ExporterName = 'something`}, - {`ExporterName='something"`}, - {`ExporterNamee="something"`}, - {`ExporterName>"something"`}, - {`ExporterAddress=203.0.113`}, - {`ExporterAddress=2001:db8`}, - {`ExporterAddress="2001:db8:0::1"`}, - {`SrcAS=12322a`}, - {`SrcAS=785473854857857485784`}, - {`EType = ipv7`}, - {`Proto = 100 AND`}, - {`AND Proto = 100`}, - {`Proto = 100AND Proto = 100`}, - {`Proto = 100 ANDProto = 100`}, - {`Proto = 100 AND (Proto = 100`}, - {`Proto = 100 /* Hello !`}, - {`SrcAS IN (AS12322, 29447`}, - {`SrcAS IN (AS12322 29447)`}, - {`SrcAS IN (AS12322,`}, - {`SrcVlan = 1000`}, - {`DstVlan = 1000`}, + {Input: `ExporterName`}, + {Input: `ExporterName = `}, + {Input: `ExporterName = 'something`}, + {Input: `ExporterName='something"`}, + {Input: `ExporterNamee="something"`}, + {Input: `ExporterName>"something"`}, + {Input: `ExporterAddress=203.0.113`}, + {Input: `ExporterAddress=2001:db8`}, + {Input: `ExporterAddress="2001:db8:0::1"`}, + {Input: `SrcAS=12322a`}, + {Input: `SrcAS=785473854857857485784`}, + {Input: `EType = ipv7`}, + {Input: `Proto = 100 AND`}, + {Input: `AND Proto = 100`}, + {Input: `Proto = 100AND Proto = 100`}, + {Input: `Proto = 100 ANDProto = 100`}, + {Input: `Proto = 100 AND (Proto = 100`}, + {Input: `Proto = 100 /* Hello !`}, + {Input: `SrcAS IN (AS12322, 29447`}, + {Input: `SrcAS IN (AS12322 29447)`}, + {Input: `SrcAS IN (AS12322,`}, + {Input: `SrcVlan = 1000`}, + {Input: `DstVlan = 1000`}, + {Input: `SrcMAC = 00:11:22:33:44:55:66`, EnableAll: true}, } for _, tc := range cases { - out, err := Parse("", []byte(tc.Input), GlobalStore("meta", &Meta{Schema: schema.NewMock(t)})) + sch := schema.NewMock(t) + if tc.EnableAll { + sch.EnableAllColumns() + } + out, err := Parse("", []byte(tc.Input), GlobalStore("meta", &Meta{Schema: sch})) if err == nil { t.Errorf("Parse(%q) didn't throw an error (got %s)", tc.Input, out) } diff --git a/console/filter_test.go b/console/filter_test.go index 9aaea4d3..dfb6b7f9 100644 --- a/console/filter_test.go +++ b/console/filter_test.go @@ -11,6 +11,7 @@ import ( "github.com/golang/mock/gomock" "akvorado/common/helpers" + "akvorado/common/schema" ) func TestFilterHandlers(t *testing.T) { @@ -350,3 +351,39 @@ LIMIT 20`, "6540"). }, }) } + +func TestFilterHandlersMore(t *testing.T) { + c, h, mockConn, _ := NewMock(t, DefaultConfiguration()) + c.d.Schema = schema.NewMock(t).EnableAllColumns() + + mockConn.EXPECT(). + Select(gomock.Any(), gomock.Any(), ` +SELECT MACNumToString(SrcMAC) AS label +FROM flows +WHERE TimeReceived > date_sub(minute, 1, now()) +AND positionCaseInsensitive(label, $1) >= 1 +GROUP BY SrcMAC +ORDER BY COUNT(*) DESC +LIMIT 20`, "11:"). + SetArg(1, []struct { + Label string `ch:"label"` + }{ + {"11:22:33:44:55:66"}, + {"11:33:33:44:55:66"}, + {"11:ff:33:44:55:66"}, + }). + Return(nil) + + helpers.TestHTTPEndpoints(t, h.LocalAddr(), helpers.HTTPEndpointCases{ + { + URL: "/api/v0/console/filter/complete", + StatusCode: 200, + JSONInput: gin.H{"what": "value", "column": "srcMAC", "prefix": "11:"}, + JSONOutput: gin.H{"completions": []gin.H{ + {"label": "11:22:33:44:55:66", "detail": "MAC address", "quoted": false}, + {"label": "11:33:33:44:55:66", "detail": "MAC address", "quoted": false}, + {"label": "11:ff:33:44:55:66", "detail": "MAC address", "quoted": false}, + }}, + }, + }) +} diff --git a/console/query/column.go b/console/query/column.go index baecbc6d..3c6f0c79 100644 --- a/console/query/column.go +++ b/console/query/column.go @@ -114,6 +114,8 @@ func (qc Column) ToSQLSelect() string { strValue = `arrayStringConcat(DstASPath, ' ')` case schema.ColumnDstCommunities: strValue = `arrayStringConcat(arrayConcat(arrayMap(c -> concat(toString(bitShiftRight(c, 16)), ':', toString(bitAnd(c, 0xffff))), DstCommunities), arrayMap(c -> concat(toString(bitAnd(bitShiftRight(c, 64), 0xffffffff)), ':', toString(bitAnd(bitShiftRight(c, 32), 0xffffffff)), ':', toString(bitAnd(c, 0xffffffff))), DstLargeCommunities)), ' ')` + case schema.ColumnSrcMAC, schema.ColumnDstMAC: + strValue = fmt.Sprintf("MACNumToString(%s)", qc) default: strValue = qc.String() } diff --git a/console/query/column_test.go b/console/query/column_test.go index 63dea4da..2f437389 100644 --- a/console/query/column_test.go +++ b/console/query/column_test.go @@ -79,12 +79,15 @@ func TestQueryColumnSQLSelect(t *testing.T) { }, { Input: schema.ColumnDstCommunities, Expected: `arrayStringConcat(arrayConcat(arrayMap(c -> concat(toString(bitShiftRight(c, 16)), ':', toString(bitAnd(c, 0xffff))), DstCommunities), arrayMap(c -> concat(toString(bitAnd(bitShiftRight(c, 64), 0xffffffff)), ':', toString(bitAnd(bitShiftRight(c, 32), 0xffffffff)), ':', toString(bitAnd(c, 0xffffffff))), DstLargeCommunities)), ' ')`, + }, { + Input: schema.ColumnDstMAC, + Expected: `MACNumToString(DstMAC)`, }, } for _, tc := range cases { t.Run(tc.Input.String(), func(t *testing.T) { column := query.NewColumn(tc.Input.String()) - if err := column.Validate(schema.NewMock(t)); err != nil { + if err := column.Validate(schema.NewMock(t).EnableAllColumns()); err != nil { t.Fatalf("Validate() error:\n%+v", err) } got := column.ToSQLSelect() diff --git a/console/widgets.go b/console/widgets.go index 5ecb010a..077936ce 100644 --- a/console/widgets.go +++ b/console/widgets.go @@ -23,6 +23,8 @@ func (c *Component) widgetFlowLastHandlerFunc(gc *gin.Context) { schema.ColumnDstLargeCommunities: `arrayMap(c -> concat(toString(bitAnd(bitShiftRight(c, 64), 0xffffffff)), ':', toString(bitAnd(bitShiftRight(c, 32), 0xffffffff)), ':', toString(bitAnd(c, 0xffffffff))), DstLargeCommunities)`, + schema.ColumnSrcMAC: `MACNumToString(SrcMAC)`, + schema.ColumnDstMAC: `MACNumToString(DstMAC)`, } selectClause := []string{"SELECT *"} except := []string{} diff --git a/inlet/flow/decoder/netflow/decode.go b/inlet/flow/decoder/netflow/decode.go index 0926ed9b..f11a0c48 100644 --- a/inlet/flow/decoder/netflow/decode.go +++ b/inlet/flow/decoder/netflow/decode.go @@ -82,6 +82,14 @@ func (nd *Decoder) decodeRecord(version int, fields []netflow.DataField) *schema nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnSrcVlan, decodeUNumber(v)) case netflow.NFV9_FIELD_DST_VLAN: nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnDstVlan, decodeUNumber(v)) + case netflow.NFV9_FIELD_IN_SRC_MAC: + nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnSrcMAC, decodeUNumber(v)) + case netflow.NFV9_FIELD_IN_DST_MAC: + nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnDstMAC, decodeUNumber(v)) + case netflow.NFV9_FIELD_OUT_SRC_MAC: + nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnSrcMAC, decodeUNumber(v)) + case netflow.NFV9_FIELD_OUT_DST_MAC: + nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnDstMAC, decodeUNumber(v)) // L3 case netflow.NFV9_FIELD_IPV4_SRC_ADDR: diff --git a/inlet/flow/decoder/sflow/decode.go b/inlet/flow/decoder/sflow/decode.go index f6de6d01..f2a8e135 100644 --- a/inlet/flow/decoder/sflow/decode.go +++ b/inlet/flow/decoder/sflow/decode.go @@ -120,6 +120,10 @@ func (nd *Decoder) parseEthernetHeader(bf *schema.FlowMessage, data []byte) { if len(data) < 14 { return } + nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnDstMAC, + binary.BigEndian.Uint64([]byte{0, 0, data[0], data[1], data[2], data[3], data[4], data[5]})) + nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnSrcMAC, + binary.BigEndian.Uint64([]byte{0, 0, data[6], data[7], data[8], data[9], data[10], data[11]})) etherType := data[12:14] data = data[14:] if etherType[0] == 0x81 && etherType[1] == 0x00 { diff --git a/inlet/flow/decoder/sflow/root_test.go b/inlet/flow/decoder/sflow/root_test.go index cdb88d2d..e8d0c36f 100644 --- a/inlet/flow/decoder/sflow/root_test.go +++ b/inlet/flow/decoder/sflow/root_test.go @@ -42,6 +42,8 @@ func TestDecode(t *testing.T) { schema.ColumnDstPort: 22, schema.ColumnSrcVlan: 100, schema.ColumnDstVlan: 100, + schema.ColumnSrcMAC: 40057391053392, + schema.ColumnDstMAC: 40057381862408, }, }, { SamplingRate: 1024, @@ -63,6 +65,8 @@ func TestDecode(t *testing.T) { schema.ColumnSrcNetMask: 20, schema.ColumnDstNetMask: 27, schema.ColumnDstVlan: 100, + schema.ColumnSrcMAC: 216372595274807, + schema.ColumnDstMAC: 191421060163210, }, }, { SamplingRate: 1024, @@ -80,6 +84,8 @@ func TestDecode(t *testing.T) { schema.ColumnDstPort: 22, schema.ColumnSrcVlan: 100, schema.ColumnDstVlan: 100, + schema.ColumnSrcMAC: 40057391053392, + schema.ColumnDstMAC: 40057381862408, }, }, { SamplingRate: 1024, @@ -101,6 +107,8 @@ func TestDecode(t *testing.T) { schema.ColumnSrcNetMask: 27, schema.ColumnDstNetMask: 17, schema.ColumnSrcVlan: 100, + schema.ColumnSrcMAC: 138617863011056, + schema.ColumnDstMAC: 216372595274807, }, }, { SamplingRate: 1024, @@ -118,6 +126,8 @@ func TestDecode(t *testing.T) { schema.ColumnDstPort: 22, schema.ColumnSrcVlan: 100, schema.ColumnDstVlan: 100, + schema.ColumnSrcMAC: 40057391053392, + schema.ColumnDstMAC: 40057381862408, }, }, }