common/schema: add SrcMAC and DstMAC

This commit is contained in:
Vincent Bernat
2023-01-19 23:12:17 +01:00
parent 59b06635fd
commit 9eee46cade
12 changed files with 160 additions and 30 deletions

View File

@@ -79,6 +79,8 @@ const (
ColumnDstAddrNAT
ColumnSrcPortNAT
ColumnDstPortNAT
ColumnSrcMAC
ColumnDstMAC
ColumnLast
)
@@ -262,6 +264,7 @@ END`,
{Key: ColumnForwardingStatus, ClickHouseType: "UInt32"}, // TODO: UInt8 but hard to change, primary key
{Key: ColumnSrcAddrNAT, Disabled: true, ClickHouseType: "IPv6", ClickHouseMainOnly: true},
{Key: ColumnSrcPortNAT, Disabled: true, ClickHouseType: "UInt16", ClickHouseMainOnly: true},
{Key: ColumnSrcMAC, Disabled: true, ClickHouseType: "Uint64"},
},
}.finalize()
}

View File

@@ -20,7 +20,13 @@ days. You can disable that by setting `orchestrator.clickhouse.system-logs-ttl`
to 0.
-*inlet*: add `schema.enabled` and `schema.disabled` to add or remove collected data
-*inlet*: add `SrcAddrNAT`, `DstAddrNAT`, `SrcPortNAT`, `DstPortNAT` as disabled by default columns
-*inlet*: add the following collected data (disabled by default):
- `SrcAddrNAT`
- `DstAddrNAT`
- `SrcPortNAT`
- `DstPortNAT`
- `SrcMAC`
- `DstMAC`
- 🩹 *inlet*: handle correctly interfaces with high indexes for sFlow
- 🩹 *docker*: fix Kafka healthcheck
- 🌱 *inlet*: improve decoding/encoding performance (twice faster!)

View File

@@ -165,6 +165,31 @@ func (c *Component) filterCompleteHandlerFunc(gc *gin.Context) {
filterCompletion{"PIM", "protocol", true},
filterCompletion{"IPv4", "protocol", true},
filterCompletion{"IPv6", "protocol", true})
case "srcmac", "dstmac":
results := []struct {
Label string `ch:"label"`
}{}
columnName := c.fixQueryColumnName(input.Column)
sqlQuery := fmt.Sprintf(`
SELECT MACNumToString(%s) AS label
FROM flows
WHERE TimeReceived > date_sub(minute, 1, now())
AND positionCaseInsensitive(label, $1) >= 1
GROUP BY %s
ORDER BY COUNT(*) DESC
LIMIT 20`, columnName, columnName)
if err := c.d.ClickHouseDB.Conn.Select(ctx, &results, sqlQuery, input.Prefix); err != nil {
c.r.Err(err).Msg("unable to query database")
break
}
for _, result := range results {
completions = append(completions, filterCompletion{
Label: result.Label,
Detail: "MAC address",
Quoted: false,
})
}
input.Prefix = "" // We have handled this internally
case "dstcommunities":
results := []struct {
Label string `ch:"label"`

View File

@@ -5,6 +5,7 @@
package filter
import (
"errors"
"fmt"
"net/netip"
@@ -37,6 +38,7 @@ NotExpr "NOT expression" ← KW_NOT _ expr:Expr {
ConditionExpr "conditional" ←
ConditionIPExpr
/ ConditionPrefixExpr
/ ConditionMACExpr
/ ConditionStringExpr
/ ConditionBoundaryExpr
/ ConditionUintExpr
@@ -78,6 +80,13 @@ ConditionPrefixExpr "condition on prefix" ←
return "", nil
}
ConditionMACExpr "condition on MAC" ←
column:("SrcMAC"i !IdentStart #{ return c.metaColumn("SrcMAC") } { return c.acceptColumn() }
/ "DstMAC"i !IdentStart #{ return c.metaColumn("DstMAC") } { return c.acceptColumn() }) _
operator:("=" / "!=") _ mac:MAC {
return fmt.Sprintf("%s %s MACStringToNum(%s)", toString(column), toString(operator), quote(mac)), nil
}
ConditionStringExpr "condition on string" ←
column:("ExporterName"i !IdentStart #{ return c.metaColumn("ExporterName") } { return c.acceptColumn() }
/ "ExporterGroup"i !IdentStart #{ return c.metaColumn("ExporterGroup") } { return c.acceptColumn() }
@@ -194,7 +203,7 @@ ConditionPacketSizeExpr "condition on packet size" ←
IP "IP address" ← [0-9A-Fa-f:.]+ !IdentStart {
ip, err := netip.ParseAddr(string(c.text))
if err != nil {
return false, fmt.Errorf("expecting an IP address")
return "", errors.New("expecting an IP address")
}
return ip.String(), nil
}
@@ -202,7 +211,7 @@ IP "IP address" ← [0-9A-Fa-f:.]+ !IdentStart {
Subnet "IP subnet" ← [0-9A-Fa-f:.]+ "/" [0-9]+ !IdentStart {
net, err := netip.ParsePrefix(string(c.text))
if err != nil {
return false, fmt.Errorf("expecting a subnet")
return "", errors.New("expecting a subnet")
}
if net.Addr().Is6() {
return fmt.Sprintf("BETWEEN toIPv6('%s') AND toIPv6('%s')", net.Masked().Addr().String(), lastIP(net).String()), nil
@@ -214,7 +223,7 @@ Prefix "IP prefix" ← [0-9A-Fa-f:.]+ "/" [0-9]+ !IdentStart {
// This returns a string with a %s placeholder
net, err := netip.ParsePrefix(string(c.text))
if err != nil {
return false, fmt.Errorf("expecting a prefix")
return "", errors.New("expecting a prefix")
}
if net.Addr().Is6() {
return fmt.Sprintf("BETWEEN toIPv6('%s') AND toIPv6('%s') AND %%sNetMask = %d",
@@ -224,6 +233,17 @@ Prefix "IP prefix" ← [0-9A-Fa-f:.]+ "/" [0-9]+ !IdentStart {
net.Masked().Addr().String(), lastIP(net).String(), net.Bits()), nil
}
MAC "MAC address" ← [0-9A-Fa-f:.]+ !IdentStart {
hw, err := net.ParseMAC(string(c.text))
if err != nil {
return "", errors.New("expecting a MAC address")
}
if len(hw) != 6 {
return "", errors.New("expecting a 6-byte MAC address")
}
return hw.String(), nil
}
ASN "AS number" ← "AS"i? value:Unsigned32 !IdentStart {
return value, nil
}

View File

@@ -225,6 +225,10 @@ output provider */ = 'telia'`,
MetaOut: Meta{MainTableRequired: true}},
{Input: `DstPortNAT = 22`, Output: `DstPortNAT = 22`,
MetaOut: Meta{MainTableRequired: true}},
{Input: `SrcMAC = 00:11:22:33:44:55`, Output: `SrcMAC = MACStringToNum('00:11:22:33:44:55')`},
{Input: `DstMAC = 00:11:22:33:44:55`, Output: `DstMAC = MACStringToNum('00:11:22:33:44:55')`},
{Input: `SrcMAC != 00:0c:fF:33:44:55`, Output: `SrcMAC != MACStringToNum('00:0c:ff:33:44:55')`},
{Input: `SrcMAC = 0000.5e00.5301`, Output: `SrcMAC = MACStringToNum('00:00:5e:00:53:01')`},
}
for _, tc := range cases {
tc.MetaIn.Schema = schema.NewMock(t).EnableAllColumns()
@@ -245,34 +249,40 @@ output provider */ = 'telia'`,
func TestInvalidFilter(t *testing.T) {
cases := []struct {
Input string
Input string
EnableAll bool
}{
{`ExporterName`},
{`ExporterName = `},
{`ExporterName = 'something`},
{`ExporterName='something"`},
{`ExporterNamee="something"`},
{`ExporterName>"something"`},
{`ExporterAddress=203.0.113`},
{`ExporterAddress=2001:db8`},
{`ExporterAddress="2001:db8:0::1"`},
{`SrcAS=12322a`},
{`SrcAS=785473854857857485784`},
{`EType = ipv7`},
{`Proto = 100 AND`},
{`AND Proto = 100`},
{`Proto = 100AND Proto = 100`},
{`Proto = 100 ANDProto = 100`},
{`Proto = 100 AND (Proto = 100`},
{`Proto = 100 /* Hello !`},
{`SrcAS IN (AS12322, 29447`},
{`SrcAS IN (AS12322 29447)`},
{`SrcAS IN (AS12322,`},
{`SrcVlan = 1000`},
{`DstVlan = 1000`},
{Input: `ExporterName`},
{Input: `ExporterName = `},
{Input: `ExporterName = 'something`},
{Input: `ExporterName='something"`},
{Input: `ExporterNamee="something"`},
{Input: `ExporterName>"something"`},
{Input: `ExporterAddress=203.0.113`},
{Input: `ExporterAddress=2001:db8`},
{Input: `ExporterAddress="2001:db8:0::1"`},
{Input: `SrcAS=12322a`},
{Input: `SrcAS=785473854857857485784`},
{Input: `EType = ipv7`},
{Input: `Proto = 100 AND`},
{Input: `AND Proto = 100`},
{Input: `Proto = 100AND Proto = 100`},
{Input: `Proto = 100 ANDProto = 100`},
{Input: `Proto = 100 AND (Proto = 100`},
{Input: `Proto = 100 /* Hello !`},
{Input: `SrcAS IN (AS12322, 29447`},
{Input: `SrcAS IN (AS12322 29447)`},
{Input: `SrcAS IN (AS12322,`},
{Input: `SrcVlan = 1000`},
{Input: `DstVlan = 1000`},
{Input: `SrcMAC = 00:11:22:33:44:55:66`, EnableAll: true},
}
for _, tc := range cases {
out, err := Parse("", []byte(tc.Input), GlobalStore("meta", &Meta{Schema: schema.NewMock(t)}))
sch := schema.NewMock(t)
if tc.EnableAll {
sch.EnableAllColumns()
}
out, err := Parse("", []byte(tc.Input), GlobalStore("meta", &Meta{Schema: sch}))
if err == nil {
t.Errorf("Parse(%q) didn't throw an error (got %s)", tc.Input, out)
}

View File

@@ -11,6 +11,7 @@ import (
"github.com/golang/mock/gomock"
"akvorado/common/helpers"
"akvorado/common/schema"
)
func TestFilterHandlers(t *testing.T) {
@@ -350,3 +351,39 @@ LIMIT 20`, "6540").
},
})
}
func TestFilterHandlersMore(t *testing.T) {
c, h, mockConn, _ := NewMock(t, DefaultConfiguration())
c.d.Schema = schema.NewMock(t).EnableAllColumns()
mockConn.EXPECT().
Select(gomock.Any(), gomock.Any(), `
SELECT MACNumToString(SrcMAC) AS label
FROM flows
WHERE TimeReceived > date_sub(minute, 1, now())
AND positionCaseInsensitive(label, $1) >= 1
GROUP BY SrcMAC
ORDER BY COUNT(*) DESC
LIMIT 20`, "11:").
SetArg(1, []struct {
Label string `ch:"label"`
}{
{"11:22:33:44:55:66"},
{"11:33:33:44:55:66"},
{"11:ff:33:44:55:66"},
}).
Return(nil)
helpers.TestHTTPEndpoints(t, h.LocalAddr(), helpers.HTTPEndpointCases{
{
URL: "/api/v0/console/filter/complete",
StatusCode: 200,
JSONInput: gin.H{"what": "value", "column": "srcMAC", "prefix": "11:"},
JSONOutput: gin.H{"completions": []gin.H{
{"label": "11:22:33:44:55:66", "detail": "MAC address", "quoted": false},
{"label": "11:33:33:44:55:66", "detail": "MAC address", "quoted": false},
{"label": "11:ff:33:44:55:66", "detail": "MAC address", "quoted": false},
}},
},
})
}

View File

@@ -114,6 +114,8 @@ func (qc Column) ToSQLSelect() string {
strValue = `arrayStringConcat(DstASPath, ' ')`
case schema.ColumnDstCommunities:
strValue = `arrayStringConcat(arrayConcat(arrayMap(c -> concat(toString(bitShiftRight(c, 16)), ':', toString(bitAnd(c, 0xffff))), DstCommunities), arrayMap(c -> concat(toString(bitAnd(bitShiftRight(c, 64), 0xffffffff)), ':', toString(bitAnd(bitShiftRight(c, 32), 0xffffffff)), ':', toString(bitAnd(c, 0xffffffff))), DstLargeCommunities)), ' ')`
case schema.ColumnSrcMAC, schema.ColumnDstMAC:
strValue = fmt.Sprintf("MACNumToString(%s)", qc)
default:
strValue = qc.String()
}

View File

@@ -79,12 +79,15 @@ func TestQueryColumnSQLSelect(t *testing.T) {
}, {
Input: schema.ColumnDstCommunities,
Expected: `arrayStringConcat(arrayConcat(arrayMap(c -> concat(toString(bitShiftRight(c, 16)), ':', toString(bitAnd(c, 0xffff))), DstCommunities), arrayMap(c -> concat(toString(bitAnd(bitShiftRight(c, 64), 0xffffffff)), ':', toString(bitAnd(bitShiftRight(c, 32), 0xffffffff)), ':', toString(bitAnd(c, 0xffffffff))), DstLargeCommunities)), ' ')`,
}, {
Input: schema.ColumnDstMAC,
Expected: `MACNumToString(DstMAC)`,
},
}
for _, tc := range cases {
t.Run(tc.Input.String(), func(t *testing.T) {
column := query.NewColumn(tc.Input.String())
if err := column.Validate(schema.NewMock(t)); err != nil {
if err := column.Validate(schema.NewMock(t).EnableAllColumns()); err != nil {
t.Fatalf("Validate() error:\n%+v", err)
}
got := column.ToSQLSelect()

View File

@@ -23,6 +23,8 @@ func (c *Component) widgetFlowLastHandlerFunc(gc *gin.Context) {
schema.ColumnDstLargeCommunities: `arrayMap(c -> concat(toString(bitAnd(bitShiftRight(c, 64), 0xffffffff)), ':',
toString(bitAnd(bitShiftRight(c, 32), 0xffffffff)), ':',
toString(bitAnd(c, 0xffffffff))), DstLargeCommunities)`,
schema.ColumnSrcMAC: `MACNumToString(SrcMAC)`,
schema.ColumnDstMAC: `MACNumToString(DstMAC)`,
}
selectClause := []string{"SELECT *"}
except := []string{}

View File

@@ -82,6 +82,14 @@ func (nd *Decoder) decodeRecord(version int, fields []netflow.DataField) *schema
nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnSrcVlan, decodeUNumber(v))
case netflow.NFV9_FIELD_DST_VLAN:
nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnDstVlan, decodeUNumber(v))
case netflow.NFV9_FIELD_IN_SRC_MAC:
nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnSrcMAC, decodeUNumber(v))
case netflow.NFV9_FIELD_IN_DST_MAC:
nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnDstMAC, decodeUNumber(v))
case netflow.NFV9_FIELD_OUT_SRC_MAC:
nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnSrcMAC, decodeUNumber(v))
case netflow.NFV9_FIELD_OUT_DST_MAC:
nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnDstMAC, decodeUNumber(v))
// L3
case netflow.NFV9_FIELD_IPV4_SRC_ADDR:

View File

@@ -120,6 +120,10 @@ func (nd *Decoder) parseEthernetHeader(bf *schema.FlowMessage, data []byte) {
if len(data) < 14 {
return
}
nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnDstMAC,
binary.BigEndian.Uint64([]byte{0, 0, data[0], data[1], data[2], data[3], data[4], data[5]}))
nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnSrcMAC,
binary.BigEndian.Uint64([]byte{0, 0, data[6], data[7], data[8], data[9], data[10], data[11]}))
etherType := data[12:14]
data = data[14:]
if etherType[0] == 0x81 && etherType[1] == 0x00 {

View File

@@ -42,6 +42,8 @@ func TestDecode(t *testing.T) {
schema.ColumnDstPort: 22,
schema.ColumnSrcVlan: 100,
schema.ColumnDstVlan: 100,
schema.ColumnSrcMAC: 40057391053392,
schema.ColumnDstMAC: 40057381862408,
},
}, {
SamplingRate: 1024,
@@ -63,6 +65,8 @@ func TestDecode(t *testing.T) {
schema.ColumnSrcNetMask: 20,
schema.ColumnDstNetMask: 27,
schema.ColumnDstVlan: 100,
schema.ColumnSrcMAC: 216372595274807,
schema.ColumnDstMAC: 191421060163210,
},
}, {
SamplingRate: 1024,
@@ -80,6 +84,8 @@ func TestDecode(t *testing.T) {
schema.ColumnDstPort: 22,
schema.ColumnSrcVlan: 100,
schema.ColumnDstVlan: 100,
schema.ColumnSrcMAC: 40057391053392,
schema.ColumnDstMAC: 40057381862408,
},
}, {
SamplingRate: 1024,
@@ -101,6 +107,8 @@ func TestDecode(t *testing.T) {
schema.ColumnSrcNetMask: 27,
schema.ColumnDstNetMask: 17,
schema.ColumnSrcVlan: 100,
schema.ColumnSrcMAC: 138617863011056,
schema.ColumnDstMAC: 216372595274807,
},
}, {
SamplingRate: 1024,
@@ -118,6 +126,8 @@ func TestDecode(t *testing.T) {
schema.ColumnDstPort: 22,
schema.ColumnSrcVlan: 100,
schema.ColumnDstVlan: 100,
schema.ColumnSrcMAC: 40057391053392,
schema.ColumnDstMAC: 40057381862408,
},
},
}