diff --git a/common/schema/definition.go b/common/schema/definition.go index ff9ef231..7e33ebe1 100644 --- a/common/schema/definition.go +++ b/common/schema/definition.go @@ -7,6 +7,7 @@ import ( "fmt" "strings" + "github.com/bits-and-blooms/bitset" "golang.org/x/exp/slices" "google.golang.org/protobuf/encoding/protowire" "google.golang.org/protobuf/reflect/protoreflect" @@ -85,6 +86,13 @@ const ( ColumnLast ) +const ( + ColumnGroupL2 ColumnGroup = iota + 1 + ColumnGroupNAT + + ColumnGroupLast +) + // revive:enable // Flows is the data schema for flows tables. Any column starting with Src/InIf @@ -162,7 +170,7 @@ END`, ClickHouseType: "LowCardinality(String)", ClickHouseGenerateFrom: "dictGetOrDefault('networks', 'tenant', SrcAddr, '')", }, - {Key: ColumnSrcVlan, ClickHouseType: "UInt16", Disabled: true}, + {Key: ColumnSrcVlan, ClickHouseType: "UInt16", Disabled: true, Group: ColumnGroupL2}, {Key: ColumnSrcCountry, ClickHouseType: "FixedString(2)"}, { Key: ColumnDstASPath, @@ -262,9 +270,21 @@ END`, }(), }, {Key: ColumnForwardingStatus, ClickHouseType: "UInt32"}, // TODO: UInt8 but hard to change, primary key - {Key: ColumnSrcAddrNAT, Disabled: true, ClickHouseType: "IPv6", ClickHouseMainOnly: true}, - {Key: ColumnSrcPortNAT, Disabled: true, ClickHouseType: "UInt16", ClickHouseMainOnly: true}, - {Key: ColumnSrcMAC, Disabled: true, ClickHouseType: "Uint64"}, + { + Key: ColumnSrcAddrNAT, + Disabled: true, + Group: ColumnGroupNAT, + ClickHouseType: "IPv6", + ClickHouseMainOnly: true, + }, + { + Key: ColumnSrcPortNAT, + Disabled: true, + Group: ColumnGroupNAT, + ClickHouseType: "UInt16", + ClickHouseMainOnly: true, + }, + {Key: ColumnSrcMAC, Disabled: true, Group: ColumnGroupL2, ClickHouseType: "Uint64"}, }, }.finalize() } @@ -300,24 +320,37 @@ func (schema Schema) finalize() Schema { ncolumns = append(ncolumns, column) // Expand the schema Src → Dst and InIf → OutIf + alreadyExists := func(name string) bool { + key, _ := columnNameMap.LoadKey(name) + for _, column := range schema.columns { + if column.Key == key { + return true + } + } + return false + } if strings.HasPrefix(column.Name, "Src") { column.Name = fmt.Sprintf("Dst%s", column.Name[3:]) - column.Key, ok = columnNameMap.LoadKey(column.Name) - if !ok { - panic(fmt.Sprintf("missing name mapping for %q", column.Name)) + if !alreadyExists(column.Name) { + column.Key, ok = columnNameMap.LoadKey(column.Name) + if !ok { + panic(fmt.Sprintf("missing name mapping for %q", column.Name)) + } + column.ClickHouseAlias = strings.ReplaceAll(column.ClickHouseAlias, "Src", "Dst") + column.ClickHouseTransformFrom = slices.Clone(column.ClickHouseTransformFrom) + ncolumns = append(ncolumns, column) } - column.ClickHouseAlias = strings.ReplaceAll(column.ClickHouseAlias, "Src", "Dst") - column.ClickHouseTransformFrom = slices.Clone(column.ClickHouseTransformFrom) - ncolumns = append(ncolumns, column) } else if strings.HasPrefix(column.Name, "InIf") { column.Name = fmt.Sprintf("OutIf%s", column.Name[4:]) - column.Key, ok = columnNameMap.LoadKey(column.Name) - if !ok { - panic(fmt.Sprintf("missing name mapping for %q", column.Name)) + if !alreadyExists(column.Name) { + column.Key, ok = columnNameMap.LoadKey(column.Name) + if !ok { + panic(fmt.Sprintf("missing name mapping for %q", column.Name)) + } + column.ClickHouseAlias = strings.ReplaceAll(column.ClickHouseAlias, "InIf", "OutIf") + column.ClickHouseTransformFrom = slices.Clone(column.ClickHouseTransformFrom) + ncolumns = append(ncolumns, column) } - column.ClickHouseAlias = strings.ReplaceAll(column.ClickHouseAlias, "InIf", "OutIf") - column.ClickHouseTransformFrom = slices.Clone(column.ClickHouseTransformFrom) - ncolumns = append(ncolumns, column) } } schema.columns = ncolumns @@ -375,5 +408,16 @@ func (schema Schema) finalize() Schema { } } + // Update disabledGroups + schema.disabledGroups = *bitset.New(uint(ColumnGroupLast)) + for group := ColumnGroup(0); group < ColumnGroupLast; group++ { + schema.disabledGroups.Set(uint(group)) + for _, column := range schema.columns { + if !column.Disabled && column.Group == group { + schema.disabledGroups.Clear(uint(group)) + } + } + } + return schema } diff --git a/common/schema/definition_gen.sh b/common/schema/definition_gen.sh index c0c237c5..b84d9cb9 100755 --- a/common/schema/definition_gen.sh +++ b/common/schema/definition_gen.sh @@ -16,6 +16,7 @@ $(grep -oEw 'Column[a-zA-Z0-9]+' common/schema/definition.go \ | sed 's/^Column//' \ | grep -vFx Last \ | grep -vFx Key \ + | grep -v '^Group' \ | sort | uniq \ | awk '{ print "Column"$1": \""$1"\","}') }) diff --git a/common/schema/definition_test.go b/common/schema/definition_test.go index f3bf50e8..8eeb949b 100644 --- a/common/schema/definition_test.go +++ b/common/schema/definition_test.go @@ -5,6 +5,8 @@ package schema import ( "testing" + + "akvorado/common/helpers" ) func TestFlowsClickHouse(t *testing.T) { @@ -39,3 +41,28 @@ func TestColumnIndex(t *testing.T) { } } } + +func TestFinalizeTwice(t *testing.T) { + c := NewMock(t) + old := c.Schema + new := c.finalize() + if diff := helpers.Diff(old, new, helpers.DiffUnexported); diff != "" { + t.Fatalf("finalize() (-old, +new):\n%s", diff) + } +} + +func TestDisabledGroup(t *testing.T) { + c := flows() + if !c.IsDisabled(ColumnGroupNAT) { + t.Error("ColumnGroupNAT is not disabled while it should") + } + if !c.IsDisabled(ColumnGroupL2) { + t.Error("ColumnGroupL2 is not disabled while it should") + } + column, _ := c.LookupColumnByKey(ColumnSrcAddrNAT) + column.Disabled = false + c = c.finalize() + if c.IsDisabled(ColumnGroupNAT) { + t.Error("ColumnGroupNAT is disabled while it should not") + } +} diff --git a/common/schema/generic.go b/common/schema/generic.go index 671c21b0..5348d99f 100644 --- a/common/schema/generic.go +++ b/common/schema/generic.go @@ -57,3 +57,8 @@ func (schema *Schema) Columns() []Column { } return columns } + +// IsDisabled tells if a column group is disabled. +func (schema *Schema) IsDisabled(group ColumnGroup) bool { + return schema.disabledGroups.Test(uint(group)) +} diff --git a/common/schema/root.go b/common/schema/root.go index 135ed4e2..eaa170e2 100644 --- a/common/schema/root.go +++ b/common/schema/root.go @@ -47,6 +47,6 @@ func New(config Configuration) (*Component, error) { } return &Component{ c: config, - Schema: schema, + Schema: schema.finalize(), }, nil } diff --git a/common/schema/tests.go b/common/schema/tests.go index c9f26313..81e6ef8a 100644 --- a/common/schema/tests.go +++ b/common/schema/tests.go @@ -121,6 +121,7 @@ func (schema *Component) EnableAllColumns() *Component { for i := range schema.columns { schema.columns[i].Disabled = false } + schema.Schema = schema.finalize() return schema } diff --git a/common/schema/types.go b/common/schema/types.go index 44e74838..b1227dda 100644 --- a/common/schema/types.go +++ b/common/schema/types.go @@ -13,8 +13,9 @@ import ( // Schema is the data schema. type Schema struct { - columns []Column // Ordered list of columns - columnIndex []*Column // Columns indexed by ColumnKey + columns []Column // Ordered list of columns + columnIndex []*Column // Columns indexed by ColumnKey + disabledGroups bitset.BitSet // Disabled column groups // For ClickHouse. This is the set of primary keys (order is important and // may not follow column order). @@ -27,6 +28,7 @@ type Column struct { Name string Disabled bool NoDisable bool + Group ColumnGroup // For ClickHouse. `NotSortingKey' is for columns generated from other // columns. It is only useful if not ClickHouseMainOnly and not Alias. `GenerateFrom' @@ -59,6 +61,9 @@ type Column struct { // ColumnKey is the name of a column type ColumnKey int +// ColumnGroup represents a group of columns +type ColumnGroup uint + // FlowMessage is the abstract representation of a flow through various subsystems. type FlowMessage struct { TimeReceived uint64 diff --git a/console/data/docs/06-internals.md b/console/data/docs/06-internals.md index 329552cb..be2781cc 100644 --- a/console/data/docs/06-internals.md +++ b/console/data/docs/06-internals.md @@ -172,6 +172,8 @@ needs to follow these steps: `Src` or `InIf`, don't add the opposite direction, this is done automatically. Use `ClickHouseMainOnly` if the column is expected to take a lot of space. Add the column to the end and set `Disabled` field to `true`. + If you add several fields, create a group and use it on decoding to keep + decoding/encoding fast for people not enabling them. 3. Make it usable in the filters by adding it to `console/filter/parser.peg`. Don't forget to add a test in `console/filter/parser_test.go`. 4. Modify `console/query/column.go` to alter the display of the column (it diff --git a/inlet/flow/decoder/netflow/decode.go b/inlet/flow/decoder/netflow/decode.go index f11a0c48..dcd30027 100644 --- a/inlet/flow/decoder/netflow/decode.go +++ b/inlet/flow/decoder/netflow/decode.go @@ -77,20 +77,6 @@ func (nd *Decoder) decodeRecord(version int, fields []netflow.DataField) *schema case netflow.NFV9_FIELD_IN_PKTS, netflow.NFV9_FIELD_OUT_PKTS: nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnPackets, decodeUNumber(v)) - // L2 - case netflow.NFV9_FIELD_SRC_VLAN: - nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnSrcVlan, decodeUNumber(v)) - case netflow.NFV9_FIELD_DST_VLAN: - nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnDstVlan, decodeUNumber(v)) - case netflow.NFV9_FIELD_IN_SRC_MAC: - nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnSrcMAC, decodeUNumber(v)) - case netflow.NFV9_FIELD_IN_DST_MAC: - nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnDstMAC, decodeUNumber(v)) - case netflow.NFV9_FIELD_OUT_SRC_MAC: - nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnSrcMAC, decodeUNumber(v)) - case netflow.NFV9_FIELD_OUT_DST_MAC: - nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnDstMAC, decodeUNumber(v)) - // L3 case netflow.NFV9_FIELD_IPV4_SRC_ADDR: etype = helpers.ETypeIPv4 @@ -131,19 +117,42 @@ func (nd *Decoder) decodeRecord(version int, fields []netflow.DataField) *schema case netflow.NFV9_FIELD_OUTPUT_SNMP: bf.OutIf = uint32(decodeUNumber(v)) - // NAT - case netflow.IPFIX_FIELD_postNATSourceIPv4Address: - nd.d.Schema.ProtobufAppendIP(bf, schema.ColumnSrcAddrNAT, decodeIP(v)) - case netflow.IPFIX_FIELD_postNATDestinationIPv4Address: - nd.d.Schema.ProtobufAppendIP(bf, schema.ColumnDstAddrNAT, decodeIP(v)) - case netflow.IPFIX_FIELD_postNAPTSourceTransportPort: - nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnSrcPortNAT, decodeUNumber(v)) - case netflow.IPFIX_FIELD_postNAPTDestinationTransportPort: - nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnDstPortNAT, decodeUNumber(v)) - // Remaining case netflow.NFV9_FIELD_FORWARDING_STATUS: nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnForwardingStatus, decodeUNumber(v)) + default: + + if !nd.d.Schema.IsDisabled(schema.ColumnGroupNAT) { + // NAT + switch field.Type { + case netflow.IPFIX_FIELD_postNATSourceIPv4Address: + nd.d.Schema.ProtobufAppendIP(bf, schema.ColumnSrcAddrNAT, decodeIP(v)) + case netflow.IPFIX_FIELD_postNATDestinationIPv4Address: + nd.d.Schema.ProtobufAppendIP(bf, schema.ColumnDstAddrNAT, decodeIP(v)) + case netflow.IPFIX_FIELD_postNAPTSourceTransportPort: + nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnSrcPortNAT, decodeUNumber(v)) + case netflow.IPFIX_FIELD_postNAPTDestinationTransportPort: + nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnDstPortNAT, decodeUNumber(v)) + } + } + + if !nd.d.Schema.IsDisabled(schema.ColumnGroupL2) { + // L2 + switch field.Type { + case netflow.NFV9_FIELD_SRC_VLAN: + nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnSrcVlan, decodeUNumber(v)) + case netflow.NFV9_FIELD_DST_VLAN: + nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnDstVlan, decodeUNumber(v)) + case netflow.NFV9_FIELD_IN_SRC_MAC: + nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnSrcMAC, decodeUNumber(v)) + case netflow.NFV9_FIELD_IN_DST_MAC: + nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnDstMAC, decodeUNumber(v)) + case netflow.NFV9_FIELD_OUT_SRC_MAC: + nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnSrcMAC, decodeUNumber(v)) + case netflow.NFV9_FIELD_OUT_DST_MAC: + nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnDstMAC, decodeUNumber(v)) + } + } } } nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnEType, uint64(etype)) diff --git a/inlet/flow/decoder/sflow/decode.go b/inlet/flow/decoder/sflow/decode.go index f2a8e135..2f40aaed 100644 --- a/inlet/flow/decoder/sflow/decode.go +++ b/inlet/flow/decoder/sflow/decode.go @@ -79,11 +79,13 @@ func (nd *Decoder) decode(msgDec interface{}) []*schema.FlowMessage { nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnDstPort, uint64(recordData.Base.DstPort)) nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnEType, helpers.ETypeIPv6) case sflow.ExtendedSwitch: - if recordData.SrcVlan < 4096 { - nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnSrcVlan, uint64(recordData.SrcVlan)) - } - if recordData.DstVlan < 4096 { - nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnDstVlan, uint64(recordData.DstVlan)) + if !nd.d.Schema.IsDisabled(schema.ColumnGroupL2) { + if recordData.SrcVlan < 4096 { + nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnSrcVlan, uint64(recordData.SrcVlan)) + } + if recordData.DstVlan < 4096 { + nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnDstVlan, uint64(recordData.DstVlan)) + } } case sflow.ExtendedRouter: nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnSrcNetMask, uint64(recordData.SrcMaskLen)) @@ -120,10 +122,12 @@ func (nd *Decoder) parseEthernetHeader(bf *schema.FlowMessage, data []byte) { if len(data) < 14 { return } - nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnDstMAC, - binary.BigEndian.Uint64([]byte{0, 0, data[0], data[1], data[2], data[3], data[4], data[5]})) - nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnSrcMAC, - binary.BigEndian.Uint64([]byte{0, 0, data[6], data[7], data[8], data[9], data[10], data[11]})) + if !nd.d.Schema.IsDisabled(schema.ColumnGroupL2) { + nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnDstMAC, + binary.BigEndian.Uint64([]byte{0, 0, data[0], data[1], data[2], data[3], data[4], data[5]})) + nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnSrcMAC, + binary.BigEndian.Uint64([]byte{0, 0, data[6], data[7], data[8], data[9], data[10], data[11]})) + } etherType := data[12:14] data = data[14:] if etherType[0] == 0x81 && etherType[1] == 0x00 { @@ -131,8 +135,10 @@ func (nd *Decoder) parseEthernetHeader(bf *schema.FlowMessage, data []byte) { if len(data) < 4 { return } - vlan := (uint64(data[0]&0xf) << 8) + uint64(data[1]) - nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnSrcVlan, uint64(vlan)) + if !nd.d.Schema.IsDisabled(schema.ColumnGroupL2) { + vlan := (uint64(data[0]&0xf) << 8) + uint64(data[1]) + nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnSrcVlan, uint64(vlan)) + } etherType = data[2:4] data = data[4:] }