common/schema: use a symbol to identify columns

This commit is contained in:
Vincent Bernat
2023-01-14 00:12:18 +01:00
parent e88d2a2974
commit 727807b937
15 changed files with 531 additions and 269 deletions

View File

@@ -62,10 +62,11 @@ func (schema Schema) ClickHouseSelectColumns(options ...ClickHouseTableOption) [
return cols
}
func (schema Schema) clickhouseIterate(fn func(column Column), options ...ClickHouseTableOption) {
func (schema Schema) clickhouseIterate(fn func(Column), options ...ClickHouseTableOption) {
for pair := schema.Columns.Front(); pair != nil; pair = pair.Next() {
key := pair.Key
column := pair.Value
if slices.Contains(options, ClickHouseSkipTimeReceived) && column.Name == "TimeReceived" {
if slices.Contains(options, ClickHouseSkipTimeReceived) && key == ColumnTimeReceived {
continue
}
if slices.Contains(options, ClickHouseSkipMainOnlyColumns) && column.MainOnly {
@@ -101,7 +102,10 @@ func (schema Schema) clickhouseIterate(fn func(column Column), options ...ClickH
// ClickHouseSortingKeys returns the list of sorting keys, prefixed by the primary keys.
func (schema Schema) ClickHouseSortingKeys() []string {
cols := append([]string{}, schema.ClickHousePrimaryKeys...)
cols := []string{}
for _, key := range schema.ClickHousePrimaryKeys {
cols = append(cols, key.String())
}
for pair := schema.Columns.Front(); pair != nil; pair = pair.Next() {
column := pair.Value
if column.ClickHouseNotSortingKey || column.MainOnly {

346
common/schema/definition.go Normal file
View File

@@ -0,0 +1,346 @@
// SPDX-FileCopyrightText: 2022 Free Mobile
// SPDX-License-Identifier: AGPL-3.0-only
package schema
import (
"fmt"
"strings"
"akvorado/common/helpers/bimap"
orderedmap "github.com/elliotchance/orderedmap/v2"
)
// revive:disable
const (
// Column keys are numbered from 1 (iota + 1), so the zero value of
// ColumnKey matches none of the columns declared in this block.
ColumnTimeReceived ColumnKey = iota + 1
ColumnSamplingRate
ColumnExporterAddress
ColumnExporterName
ColumnExporterGroup
ColumnExporterRole
ColumnExporterSite
ColumnExporterRegion
ColumnExporterTenant
ColumnSrcAddr
ColumnDstAddr
ColumnSrcNetMask
ColumnDstNetMask
ColumnSrcNetPrefix
ColumnDstNetPrefix
ColumnSrcAS
ColumnDstAS
ColumnSrcNetName
ColumnDstNetName
ColumnSrcNetRole
ColumnDstNetRole
ColumnSrcNetSite
ColumnDstNetSite
ColumnSrcNetRegion
ColumnDstNetRegion
ColumnSrcNetTenant
ColumnDstNetTenant
ColumnSrcCountry
ColumnDstCountry
ColumnDstASPath
ColumnDst1stAS
ColumnDst2ndAS
ColumnDst3rdAS
ColumnDstCommunities
ColumnDstLargeCommunities
ColumnDstLargeCommunitiesASN
ColumnDstLargeCommunitiesLocalData1
ColumnDstLargeCommunitiesLocalData2
ColumnInIfName
ColumnOutIfName
ColumnInIfDescription
ColumnOutIfDescription
ColumnInIfSpeed
ColumnOutIfSpeed
ColumnInIfProvider
ColumnOutIfProvider
ColumnInIfConnectivity
ColumnOutIfConnectivity
ColumnInIfBoundary
ColumnOutIfBoundary
ColumnEType
ColumnProto
ColumnSrcPort
ColumnDstPort
ColumnBytes
ColumnPackets
ColumnPacketSize
ColumnPacketSizeBucket
ColumnForwardingStatus
)
// revive:enable
// columnNameMap associates each column key with its column name. It is
// queried in both directions in this package: LoadValue (key to name) and
// LoadKey (name to key). Every key used in a schema needs an entry here,
// otherwise buildMapFromColumns panics.
var columnNameMap = bimap.New(map[ColumnKey]string{
ColumnTimeReceived: "TimeReceived",
ColumnSamplingRate: "SamplingRate",
ColumnExporterAddress: "ExporterAddress",
ColumnExporterName: "ExporterName",
ColumnExporterGroup: "ExporterGroup",
ColumnExporterRole: "ExporterRole",
ColumnExporterSite: "ExporterSite",
ColumnExporterRegion: "ExporterRegion",
ColumnExporterTenant: "ExporterTenant",
ColumnSrcAddr: "SrcAddr",
ColumnDstAddr: "DstAddr",
ColumnSrcNetMask: "SrcNetMask",
ColumnDstNetMask: "DstNetMask",
ColumnSrcNetPrefix: "SrcNetPrefix",
ColumnDstNetPrefix: "DstNetPrefix",
ColumnSrcAS: "SrcAS",
ColumnDstAS: "DstAS",
ColumnSrcNetName: "SrcNetName",
ColumnDstNetName: "DstNetName",
ColumnSrcNetRole: "SrcNetRole",
ColumnDstNetRole: "DstNetRole",
ColumnSrcNetSite: "SrcNetSite",
ColumnDstNetSite: "DstNetSite",
ColumnSrcNetRegion: "SrcNetRegion",
ColumnDstNetRegion: "DstNetRegion",
ColumnSrcNetTenant: "SrcNetTenant",
ColumnDstNetTenant: "DstNetTenant",
ColumnSrcCountry: "SrcCountry",
ColumnDstCountry: "DstCountry",
ColumnDstASPath: "DstASPath",
ColumnDst1stAS: "Dst1stAS",
ColumnDst2ndAS: "Dst2ndAS",
ColumnDst3rdAS: "Dst3rdAS",
ColumnDstCommunities: "DstCommunities",
ColumnDstLargeCommunities: "DstLargeCommunities",
ColumnDstLargeCommunitiesASN: "DstLargeCommunities.ASN",
ColumnDstLargeCommunitiesLocalData1: "DstLargeCommunities.LocalData1",
ColumnDstLargeCommunitiesLocalData2: "DstLargeCommunities.LocalData2",
ColumnInIfName: "InIfName",
ColumnOutIfName: "OutIfName",
ColumnInIfDescription: "InIfDescription",
ColumnOutIfDescription: "OutIfDescription",
ColumnInIfSpeed: "InIfSpeed",
ColumnOutIfSpeed: "OutIfSpeed",
ColumnInIfProvider: "InIfProvider",
ColumnOutIfProvider: "OutIfProvider",
ColumnInIfConnectivity: "InIfConnectivity",
ColumnOutIfConnectivity: "OutIfConnectivity",
ColumnInIfBoundary: "InIfBoundary",
ColumnOutIfBoundary: "OutIfBoundary",
ColumnEType: "EType",
ColumnProto: "Proto",
ColumnSrcPort: "SrcPort",
ColumnDstPort: "DstPort",
ColumnBytes: "Bytes",
ColumnPackets: "Packets",
ColumnPacketSize: "PacketSize",
ColumnPacketSizeBucket: "PacketSizeBucket",
ColumnForwardingStatus: "ForwardingStatus",
})
// String returns the name associated with the column key in columnNameMap.
// The lookup status is deliberately discarded: a key without a mapping yields
// whatever LoadValue returns on a miss (presumably the empty string; confirm
// against the bimap helper).
func (c ColumnKey) String() string {
name, _ := columnNameMap.LoadValue(c)
return name
}
// Flows is the data schema for flows tables. Columns are declared by key;
// their names are filled in by buildMapFromColumns using columnNameMap. Any
// column whose name starts with Src/InIf is duplicated as Dst/OutIf by
// buildMapFromColumns. That's not the case for the keys listed in
// ClickHousePrimaryKeys: both directions have to be listed there explicitly.
var Flows = Schema{
ClickHousePrimaryKeys: []ColumnKey{
ColumnTimeReceived,
ColumnExporterAddress,
ColumnEType,
ColumnProto,
ColumnInIfName,
ColumnSrcAS,
ColumnForwardingStatus,
ColumnOutIfName,
ColumnDstAS,
ColumnSamplingRate,
},
Columns: buildMapFromColumns([]Column{
{
Key: ColumnTimeReceived,
ClickHouseType: "DateTime",
ClickHouseCodec: "DoubleDelta, LZ4",
ConsoleNotDimension: true,
},
{Key: ColumnSamplingRate, ClickHouseType: "UInt64", ConsoleNotDimension: true},
{Key: ColumnExporterAddress, ClickHouseType: "LowCardinality(IPv6)"},
{Key: ColumnExporterName, ClickHouseType: "LowCardinality(String)", ClickHouseNotSortingKey: true},
{Key: ColumnExporterGroup, ClickHouseType: "LowCardinality(String)", ClickHouseNotSortingKey: true},
{Key: ColumnExporterRole, ClickHouseType: "LowCardinality(String)", ClickHouseNotSortingKey: true},
{Key: ColumnExporterSite, ClickHouseType: "LowCardinality(String)", ClickHouseNotSortingKey: true},
{Key: ColumnExporterRegion, ClickHouseType: "LowCardinality(String)", ClickHouseNotSortingKey: true},
{Key: ColumnExporterTenant, ClickHouseType: "LowCardinality(String)", ClickHouseNotSortingKey: true},
{
Key: ColumnSrcAddr,
MainOnly: true,
ClickHouseType: "IPv6",
}, {
Key: ColumnSrcNetMask,
MainOnly: true,
ClickHouseType: "UInt8",
ConsoleNotDimension: true,
}, {
Key: ColumnSrcNetPrefix,
MainOnly: true,
ClickHouseType: "String",
ClickHouseAlias: `CASE
WHEN EType = 0x800 THEN concat(replaceRegexpOne(IPv6CIDRToRange(SrcAddr, (96 + SrcNetMask)::UInt8).1::String, '^::ffff:', ''), '/', SrcNetMask::String)
WHEN EType = 0x86dd THEN concat(IPv6CIDRToRange(SrcAddr, SrcNetMask).1::String, '/', SrcNetMask::String)
ELSE ''
END`,
},
{Key: ColumnSrcAS, ClickHouseType: "UInt32"},
{
Key: ColumnSrcNetName,
ClickHouseType: "LowCardinality(String)",
ClickHouseGenerateFrom: "dictGetOrDefault('networks', 'name', SrcAddr, '')",
}, {
Key: ColumnSrcNetRole,
ClickHouseType: "LowCardinality(String)",
ClickHouseGenerateFrom: "dictGetOrDefault('networks', 'role', SrcAddr, '')",
}, {
Key: ColumnSrcNetSite,
ClickHouseType: "LowCardinality(String)",
ClickHouseGenerateFrom: "dictGetOrDefault('networks', 'site', SrcAddr, '')",
}, {
Key: ColumnSrcNetRegion,
ClickHouseType: "LowCardinality(String)",
ClickHouseGenerateFrom: "dictGetOrDefault('networks', 'region', SrcAddr, '')",
}, {
Key: ColumnSrcNetTenant,
ClickHouseType: "LowCardinality(String)",
ClickHouseGenerateFrom: "dictGetOrDefault('networks', 'tenant', SrcAddr, '')",
},
{Key: ColumnSrcCountry, ClickHouseType: "FixedString(2)"},
{
Key: ColumnDstASPath,
MainOnly: true,
ClickHouseType: "Array(UInt32)",
}, {
Key: ColumnDst1stAS,
ClickHouseType: "UInt32",
ClickHouseGenerateFrom: "c_DstASPath[1]",
}, {
Key: ColumnDst2ndAS,
ClickHouseType: "UInt32",
ClickHouseGenerateFrom: "c_DstASPath[2]",
}, {
Key: ColumnDst3rdAS,
ClickHouseType: "UInt32",
ClickHouseGenerateFrom: "c_DstASPath[3]",
}, {
Key: ColumnDstCommunities,
MainOnly: true,
ClickHouseType: "Array(UInt32)",
}, {
Key: ColumnDstLargeCommunities,
MainOnly: true,
ClickHouseType: "Array(UInt128)",
ClickHouseTransformFrom: []Column{
{Key: ColumnDstLargeCommunitiesASN, ClickHouseType: "Array(UInt32)"},
{Key: ColumnDstLargeCommunitiesLocalData1, ClickHouseType: "Array(UInt32)"},
{Key: ColumnDstLargeCommunitiesLocalData2, ClickHouseType: "Array(UInt32)"},
},
ClickHouseTransformTo: "arrayMap((asn, l1, l2) -> ((bitShiftLeft(CAST(asn, 'UInt128'), 64) + bitShiftLeft(CAST(l1, 'UInt128'), 32)) + CAST(l2, 'UInt128')), `DstLargeCommunities.ASN`, `DstLargeCommunities.LocalData1`, `DstLargeCommunities.LocalData2`)",
ConsoleNotDimension: true,
},
{Key: ColumnInIfName, ClickHouseType: "LowCardinality(String)"},
{Key: ColumnInIfDescription, ClickHouseType: "String", ClickHouseNotSortingKey: true},
{Key: ColumnInIfSpeed, ClickHouseType: "UInt32", ClickHouseNotSortingKey: true},
{Key: ColumnInIfConnectivity, ClickHouseType: "LowCardinality(String)", ClickHouseNotSortingKey: true},
{Key: ColumnInIfProvider, ClickHouseType: "LowCardinality(String)", ClickHouseNotSortingKey: true},
{Key: ColumnInIfBoundary, ClickHouseType: "Enum8('undefined' = 0, 'external' = 1, 'internal' = 2)", ClickHouseNotSortingKey: true},
{Key: ColumnEType, ClickHouseType: "UInt32"},
{Key: ColumnProto, ClickHouseType: "UInt32"},
{Key: ColumnSrcPort, ClickHouseType: "UInt32", MainOnly: true},
{Key: ColumnBytes, ClickHouseType: "UInt64", ClickHouseNotSortingKey: true, ConsoleNotDimension: true},
{Key: ColumnPackets, ClickHouseType: "UInt64", ClickHouseNotSortingKey: true, ConsoleNotDimension: true},
{
Key: ColumnPacketSize,
ClickHouseType: "UInt64",
ClickHouseAlias: "intDiv(Bytes, Packets)",
ConsoleNotDimension: true,
}, {
Key: ColumnPacketSizeBucket,
ClickHouseType: "LowCardinality(String)",
// The alias is a generated multiIf() expression mapping PacketSize to
// a bucket label: '0-63', '64-127', ... and finally '65536-Inf' as the
// fallback when PacketSize is not below any boundary.
ClickHouseAlias: func() string {
boundaries := []int{64, 128, 256, 512, 768, 1024, 1280, 1501,
2048, 3072, 4096, 8192, 10240, 16384, 32768, 65536}
conditions := []string{}
last := 0
for _, boundary := range boundaries {
conditions = append(conditions, fmt.Sprintf("PacketSize < %d, '%d-%d'",
boundary, last, boundary-1))
last = boundary
}
conditions = append(conditions, fmt.Sprintf("'%d-Inf'", last))
return fmt.Sprintf("multiIf(%s)", strings.Join(conditions, ", "))
}(),
},
{Key: ColumnForwardingStatus, ClickHouseType: "UInt32"},
}),
}
// buildMapFromColumns turns a list of column definitions into an ordered map
// indexed by column key, preserving declaration order.
//
// For each column it:
//   - fills in Name (and the Name of each ClickHouseTransformFrom entry) from
//     columnNameMap, panicking when a key has no registered name;
//   - marks non-main columns that have a ClickHouseAlias as non-sorting keys;
//   - when the name starts with "Src" (resp. "InIf"), inserts right after it a
//     copy for the opposite direction named "Dst…" (resp. "OutIf…"), panicking
//     when the mirrored name has no registered key.
//
// In the mirrored copy, the direction prefix is substituted in both
// ClickHouseAlias and ClickHouseGenerateFrom, so that expressions such as
// dictGetOrDefault('networks', 'name', SrcAddr, '') refer to DstAddr for the
// Dst column instead of silently reusing the source address.
func buildMapFromColumns(columns []Column) *orderedmap.OrderedMap[ColumnKey, Column] {
	omap := orderedmap.NewOrderedMap[ColumnKey, Column]()
	for _, column := range columns {
		// Add true name
		name, ok := columnNameMap.LoadValue(column.Key)
		if !ok {
			panic(fmt.Sprintf("missing name mapping for %d", column.Key))
		}
		column.Name = name

		// Also true name for columns in ClickHouseTransformFrom
		for idx, ecolumn := range column.ClickHouseTransformFrom {
			ename, ok := columnNameMap.LoadValue(ecolumn.Key)
			if !ok {
				panic(fmt.Sprintf("missing name mapping for %d", ecolumn.Key))
			}
			column.ClickHouseTransformFrom[idx].Name = ename
		}

		// Add non-main columns with an alias to NotSortingKey
		if !column.MainOnly && column.ClickHouseAlias != "" {
			column.ClickHouseNotSortingKey = true
		}
		omap.Set(column.Key, column)

		// Expand the schema Src → Dst and InIf → OutIf
		var from, to string
		switch {
		case strings.HasPrefix(name, "Src"):
			from, to = "Src", "Dst"
		case strings.HasPrefix(name, "InIf"):
			from, to = "InIf", "OutIf"
		default:
			continue
		}
		column.Name = to + name[len(from):]
		column.Key, ok = columnNameMap.LoadKey(column.Name)
		if !ok {
			panic(fmt.Sprintf("missing name mapping for %q", column.Name))
		}
		// Both SQL expressions may reference same-direction columns; rewrite
		// them together so the mirrored column never reads from the original
		// direction.
		column.ClickHouseAlias = strings.ReplaceAll(column.ClickHouseAlias, from, to)
		column.ClickHouseGenerateFrom = strings.ReplaceAll(column.ClickHouseGenerateFrom, from, to)
		omap.Set(column.Key, column)
	}
	return omap
}
// init validates the Flows schema at package load time: every primary key
// must be a declared column and must not be flagged as a non-sorting key.
// Any violation is a programming error and triggers a panic.
func init() {
	for _, primaryKey := range Flows.ClickHousePrimaryKeys {
		definition, found := Flows.Columns.Get(primaryKey)
		if !found {
			panic(fmt.Sprintf("primary key %q not a column", primaryKey))
		}
		if definition.ClickHouseNotSortingKey {
			panic(fmt.Sprintf("primary key %q is marked as a non-sorting key", primaryKey))
		}
	}
}

View File

@@ -1,187 +0,0 @@
// SPDX-FileCopyrightText: 2022 Free Mobile
// SPDX-License-Identifier: AGPL-3.0-only
package schema
import (
"fmt"
"strings"
orderedmap "github.com/elliotchance/orderedmap/v2"
)
// Flows is the data schema for flows tables. Any column whose name starts
// with Src/InIf is duplicated as Dst/OutIf by buildMapFromColumns. That's not
// the case for the names listed in ClickHousePrimaryKeys: both directions
// have to be listed there explicitly.
var Flows = Schema{
ClickHousePrimaryKeys: []string{
"TimeReceived",
"ExporterAddress",
"EType",
"Proto",
"InIfName",
"SrcAS",
"ForwardingStatus",
"OutIfName",
"DstAS",
"SamplingRate",
},
Columns: buildMapFromColumns([]Column{
{
Name: "TimeReceived",
ClickHouseType: "DateTime",
ClickHouseCodec: "DoubleDelta, LZ4",
ConsoleNotDimension: true,
},
{Name: "SamplingRate", ClickHouseType: "UInt64", ConsoleNotDimension: true},
{Name: "ExporterAddress", ClickHouseType: "LowCardinality(IPv6)"},
{Name: "ExporterName", ClickHouseType: "LowCardinality(String)", ClickHouseNotSortingKey: true},
{Name: "ExporterGroup", ClickHouseType: "LowCardinality(String)", ClickHouseNotSortingKey: true},
{Name: "ExporterRole", ClickHouseType: "LowCardinality(String)", ClickHouseNotSortingKey: true},
{Name: "ExporterSite", ClickHouseType: "LowCardinality(String)", ClickHouseNotSortingKey: true},
{Name: "ExporterRegion", ClickHouseType: "LowCardinality(String)", ClickHouseNotSortingKey: true},
{Name: "ExporterTenant", ClickHouseType: "LowCardinality(String)", ClickHouseNotSortingKey: true},
{
Name: "SrcAddr",
MainOnly: true,
ClickHouseType: "IPv6",
}, {
Name: "SrcNetMask",
MainOnly: true,
ClickHouseType: "UInt8",
ConsoleNotDimension: true,
}, {
Name: "SrcNetPrefix",
MainOnly: true,
ClickHouseType: "String",
ClickHouseAlias: `CASE
WHEN EType = 0x800 THEN concat(replaceRegexpOne(IPv6CIDRToRange(SrcAddr, (96 + SrcNetMask)::UInt8).1::String, '^::ffff:', ''), '/', SrcNetMask::String)
WHEN EType = 0x86dd THEN concat(IPv6CIDRToRange(SrcAddr, SrcNetMask).1::String, '/', SrcNetMask::String)
ELSE ''
END`,
},
{Name: "SrcAS", ClickHouseType: "UInt32"},
{
Name: "SrcNetName",
ClickHouseType: "LowCardinality(String)",
ClickHouseGenerateFrom: "dictGetOrDefault('networks', 'name', SrcAddr, '')",
}, {
Name: "SrcNetRole",
ClickHouseType: "LowCardinality(String)",
ClickHouseGenerateFrom: "dictGetOrDefault('networks', 'role', SrcAddr, '')",
}, {
Name: "SrcNetSite",
ClickHouseType: "LowCardinality(String)",
ClickHouseGenerateFrom: "dictGetOrDefault('networks', 'site', SrcAddr, '')",
}, {
Name: "SrcNetRegion",
ClickHouseType: "LowCardinality(String)",
ClickHouseGenerateFrom: "dictGetOrDefault('networks', 'region', SrcAddr, '')",
}, {
Name: "SrcNetTenant",
ClickHouseType: "LowCardinality(String)",
ClickHouseGenerateFrom: "dictGetOrDefault('networks', 'tenant', SrcAddr, '')",
},
{Name: "SrcCountry", ClickHouseType: "FixedString(2)"},
{
Name: "DstASPath",
MainOnly: true,
ClickHouseType: "Array(UInt32)",
}, {
Name: "Dst1stAS",
ClickHouseType: "UInt32",
ClickHouseGenerateFrom: "c_DstASPath[1]",
}, {
Name: "Dst2ndAS",
ClickHouseType: "UInt32",
ClickHouseGenerateFrom: "c_DstASPath[2]",
}, {
Name: "Dst3rdAS",
ClickHouseType: "UInt32",
ClickHouseGenerateFrom: "c_DstASPath[3]",
}, {
Name: "DstCommunities",
MainOnly: true,
ClickHouseType: "Array(UInt32)",
}, {
Name: "DstLargeCommunities",
MainOnly: true,
ClickHouseType: "Array(UInt128)",
ClickHouseTransformFrom: []Column{
{Name: "DstLargeCommunities.ASN", ClickHouseType: "Array(UInt32)"},
{Name: "DstLargeCommunities.LocalData1", ClickHouseType: "Array(UInt32)"},
{Name: "DstLargeCommunities.LocalData2", ClickHouseType: "Array(UInt32)"},
},
ClickHouseTransformTo: "arrayMap((asn, l1, l2) -> ((bitShiftLeft(CAST(asn, 'UInt128'), 64) + bitShiftLeft(CAST(l1, 'UInt128'), 32)) + CAST(l2, 'UInt128')), `DstLargeCommunities.ASN`, `DstLargeCommunities.LocalData1`, `DstLargeCommunities.LocalData2`)",
ConsoleNotDimension: true,
},
{Name: "InIfName", ClickHouseType: "LowCardinality(String)"},
{Name: "InIfDescription", ClickHouseType: "String", ClickHouseNotSortingKey: true},
{Name: "InIfSpeed", ClickHouseType: "UInt32", ClickHouseNotSortingKey: true},
{Name: "InIfConnectivity", ClickHouseType: "LowCardinality(String)", ClickHouseNotSortingKey: true},
{Name: "InIfProvider", ClickHouseType: "LowCardinality(String)", ClickHouseNotSortingKey: true},
{Name: "InIfBoundary", ClickHouseType: "Enum8('undefined' = 0, 'external' = 1, 'internal' = 2)", ClickHouseNotSortingKey: true},
{Name: "EType", ClickHouseType: "UInt32"},
{Name: "Proto", ClickHouseType: "UInt32"},
{Name: "SrcPort", ClickHouseType: "UInt32", MainOnly: true},
{Name: "Bytes", ClickHouseType: "UInt64", ClickHouseNotSortingKey: true, ConsoleNotDimension: true},
{Name: "Packets", ClickHouseType: "UInt64", ClickHouseNotSortingKey: true, ConsoleNotDimension: true},
{
Name: "PacketSize",
ClickHouseType: "UInt64",
ClickHouseAlias: "intDiv(Bytes, Packets)",
ConsoleNotDimension: true,
}, {
Name: "PacketSizeBucket",
ClickHouseType: "LowCardinality(String)",
// The alias is a generated multiIf() expression mapping PacketSize to
// a bucket label: '0-63', '64-127', ... and finally '65536-Inf' as the
// fallback when PacketSize is not below any boundary.
ClickHouseAlias: func() string {
boundaries := []int{64, 128, 256, 512, 768, 1024, 1280, 1501,
2048, 3072, 4096, 8192, 10240, 16384, 32768, 65536}
conditions := []string{}
last := 0
for _, boundary := range boundaries {
conditions = append(conditions, fmt.Sprintf("PacketSize < %d, '%d-%d'",
boundary, last, boundary-1))
last = boundary
}
conditions = append(conditions, fmt.Sprintf("'%d-Inf'", last))
return fmt.Sprintf("multiIf(%s)", strings.Join(conditions, ", "))
}(),
},
{Name: "ForwardingStatus", ClickHouseType: "UInt32"},
}),
}
// buildMapFromColumns turns a list of column definitions into an ordered map
// indexed by column name, preserving declaration order. Non-main columns
// with a ClickHouse alias are flagged as non-sorting keys. Each column whose
// name starts with "Src" (resp. "InIf") is followed by a copy named "Dst…"
// (resp. "OutIf…") whose ClickHouseAlias has the prefix substituted.
func buildMapFromColumns(columns []Column) *orderedmap.OrderedMap[string, Column] {
	result := orderedmap.NewOrderedMap[string, Column]()
	for _, col := range columns {
		// Add non-main columns with an alias to NotSortingKey
		if col.ClickHouseAlias != "" && !col.MainOnly {
			col.ClickHouseNotSortingKey = true
		}
		result.Set(col.Name, col)

		// Expand the schema Src → Dst and InIf → OutIf
		var from, to string
		switch {
		case strings.HasPrefix(col.Name, "Src"):
			from, to = "Src", "Dst"
		case strings.HasPrefix(col.Name, "InIf"):
			from, to = "InIf", "OutIf"
		default:
			continue
		}
		col.Name = to + col.Name[len(from):]
		col.ClickHouseAlias = strings.ReplaceAll(col.ClickHouseAlias, from, to)
		result.Set(col.Name, col)
	}
	return result
}
// init validates the Flows schema at package load time: every primary key
// must be a declared column and must not be flagged as a non-sorting key.
// Any violation is a programming error and triggers a panic.
func init() {
	for _, primaryKey := range Flows.ClickHousePrimaryKeys {
		definition, found := Flows.Columns.Get(primaryKey)
		if !found {
			panic(fmt.Sprintf("primary key %q not a column", primaryKey))
		}
		if definition.ClickHouseNotSortingKey {
			panic(fmt.Sprintf("primary key %q is marked as a non-sorting key", primaryKey))
		}
	}
}

39
common/schema/generic.go Normal file
View File

@@ -0,0 +1,39 @@
// SPDX-FileCopyrightText: 2023 Free Mobile
// SPDX-License-Identifier: AGPL-3.0-only
package schema
import "strings"
// LookupColumnByName returns the column of the schema with the provided
// name. The boolean is false when the name is not registered in
// columnNameMap or when the schema has no column for the matching key.
func (schema Schema) LookupColumnByName(name string) (Column, bool) {
	if key, known := columnNameMap.LoadKey(name); known {
		return schema.Columns.Get(key)
	}
	return Column{}, false
}
// ReverseColumnDirection returns the key of the column for the opposite
// direction of the provided column key (Src ↔ Dst, In ↔ Out). The lookup is
// based on the column name prefix. When the mirrored name is not registered
// or is not a column of the schema, the provided key is returned unchanged.
func (schema Schema) ReverseColumnDirection(key ColumnKey) ColumnKey {
	name := key.String()

	// The four prefixes are mutually exclusive, so at most one case applies.
	var mirrored string
	switch {
	case strings.HasPrefix(name, "Src"):
		mirrored = "Dst" + name[3:]
	case strings.HasPrefix(name, "Dst"):
		mirrored = "Src" + name[3:]
	case strings.HasPrefix(name, "In"):
		mirrored = "Out" + name[2:]
	case strings.HasPrefix(name, "Out"):
		mirrored = "In" + name[3:]
	}

	mirroredKey, registered := columnNameMap.LoadKey(mirrored)
	if !registered {
		return key
	}
	if _, present := schema.Columns.Get(mirroredKey); !present {
		return key
	}
	return mirroredKey
}

View File

@@ -0,0 +1,46 @@
// SPDX-FileCopyrightText: 2023 Free Mobile
// SPDX-License-Identifier: AGPL-3.0-only
package schema
import "testing"
// TestLookupColumnByName checks that a selection of column names, including
// one created by the InIf → OutIf expansion, resolve to a column of the Flows
// schema carrying the same name.
func TestLookupColumnByName(t *testing.T) {
	names := []string{
		"TimeReceived",
		"InIfProvider",
		"OutIfProvider",
		"SrcAS",
		"ForwardingStatus",
	}
	for idx := range names {
		want := names[idx]
		got, found := Flows.LookupColumnByName(want)
		switch {
		case !found:
			t.Fatalf("LookupByName(%q) not found", want)
		case got.Name != want:
			t.Fatalf("LookupByName(%q) == %q but should be %q", want, got.Name, want)
		}
	}
}
// TestReverseColumnDirection checks that directional columns are swapped
// (Src ↔ Dst, InIf ↔ OutIf) and that columns without a counterpart in the
// Flows schema are returned unchanged.
func TestReverseColumnDirection(t *testing.T) {
	type testCase struct {
		Input  ColumnKey
		Output ColumnKey
	}
	for _, tc := range []testCase{
		{Input: ColumnSrcAS, Output: ColumnDstAS},
		{Input: ColumnDstAS, Output: ColumnSrcAS},
		{Input: ColumnInIfProvider, Output: ColumnOutIfProvider},
		{Input: ColumnOutIfDescription, Output: ColumnInIfDescription},
		{Input: ColumnDstASPath, Output: ColumnDstASPath},
		{Input: ColumnExporterName, Output: ColumnExporterName},
	} {
		if got := Flows.ReverseColumnDirection(tc.Input); got != tc.Output {
			t.Errorf("ReverseColumnDirection(%q) == %q but expected %q",
				tc.Input.String(), got.String(), tc.Output.String())
		}
	}
}

View File

@@ -11,15 +11,16 @@ import orderedmap "github.com/elliotchance/orderedmap/v2"
// Schema is the data schema.
type Schema struct {
// We use an ordered map for direct access to columns.
Columns *orderedmap.OrderedMap[string, Column]
Columns *orderedmap.OrderedMap[ColumnKey, Column]
// For ClickHouse. This is the set of primary keys (order is important and
// may not follow column order).
ClickHousePrimaryKeys []string
ClickHousePrimaryKeys []ColumnKey
}
// Column represents a column of data.
type Column struct {
Key ColumnKey
Name string
MainOnly bool
@@ -40,3 +41,6 @@ type Column struct {
// For the console.
ConsoleNotDimension bool
}
// ColumnKey is the symbol (an integer identifier) of a column. It is used as
// the key of Schema.Columns instead of the column name.
type ColumnKey int

View File

@@ -52,7 +52,7 @@ func DefaultConfiguration() Configuration {
Start: "6 hours ago",
End: "now",
Filter: "InIfBoundary = external",
Dimensions: []queryColumn{"SrcAS"},
Dimensions: []queryColumn{queryColumn(schema.ColumnSrcAS)},
Limit: 10,
},
HomepageTopWidgets: []string{"src-as", "src-port", "protocol", "src-country", "etype"},

View File

@@ -35,7 +35,7 @@ func ReverseColumnDirection(name string) string {
if strings.HasPrefix(name, "Out") {
candidate = "In" + name[3:]
}
if column, ok := schema.Flows.Columns.Get(candidate); ok {
if column, ok := schema.Flows.LookupColumnByName(candidate); ok {
return column.Name
}
return name
@@ -46,11 +46,12 @@ func ReverseColumnDirection(name string) string {
func (c *current) acceptColumn() (string, error) {
name := string(c.text)
for _, columnName := range schema.Flows.Columns.Keys() {
if strings.EqualFold(name, columnName) {
columnNameStr := columnName.String()
if strings.EqualFold(name, columnNameStr) {
if c.globalStore["meta"].(*Meta).ReverseDirection {
return ReverseColumnDirection(columnName), nil
return ReverseColumnDirection(columnNameStr), nil
}
return columnName, nil
return columnNameStr, nil
}
}
return "", fmt.Errorf("unknown column %q", name)
@@ -60,7 +61,7 @@ func (c *current) acceptColumn() (string, error) {
// in state change blocks. Unfortunately, it cannot extract matched text, so it
// should be provided.
func (c *current) metaColumn(name string) error {
if column, ok := schema.Flows.Columns.Get(name); ok {
if column, ok := schema.Flows.LookupColumnByName(name); ok {
if column.MainOnly {
c.state["main-table-only"] = true
}

View File

@@ -13,6 +13,7 @@ import (
"github.com/gin-gonic/gin"
"akvorado/common/helpers"
"akvorado/common/schema"
)
// graphHandlerInput describes the input for the /graph endpoint.
@@ -50,7 +51,7 @@ func (input graphHandlerInput) reverseDirection() graphHandlerInput {
dimensions := input.Dimensions
input.Dimensions = make([]queryColumn, len(dimensions))
for i := range dimensions {
input.Dimensions[i] = dimensions[i].reverseDirection()
input.Dimensions[i] = queryColumn(schema.Flows.ReverseColumnDirection(schema.ColumnKey(dimensions[i])))
}
return input
}

View File

@@ -13,6 +13,7 @@ import (
"github.com/golang/mock/gomock"
"akvorado/common/helpers"
"akvorado/common/schema"
)
func TestGraphInputReverseDirection(t *testing.T) {
@@ -21,8 +22,8 @@ func TestGraphInputReverseDirection(t *testing.T) {
End: time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC),
Points: 100,
Dimensions: []queryColumn{
"ExporterName",
"InIfProvider",
queryColumn(schema.ColumnExporterName),
queryColumn(schema.ColumnInIfProvider),
},
Filter: queryFilter{
Filter: "DstCountry = 'FR' AND SrcCountry = 'US'",
@@ -36,8 +37,8 @@ func TestGraphInputReverseDirection(t *testing.T) {
End: time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC),
Points: 100,
Dimensions: []queryColumn{
"ExporterName",
"OutIfProvider",
queryColumn(schema.ColumnExporterName),
queryColumn(schema.ColumnOutIfProvider),
},
Filter: queryFilter{
Filter: "SrcCountry = 'FR' AND DstCountry = 'US'",
@@ -120,8 +121,8 @@ func TestGraphPreviousPeriod(t *testing.T) {
Start: start,
End: end,
Dimensions: []queryColumn{
"ExporterAddress",
"ExporterName",
queryColumn(schema.ColumnExporterAddress),
queryColumn(schema.ColumnExporterName),
},
}
got := input.previousPeriod()
@@ -328,8 +329,8 @@ ORDER BY time WITH FILL
Points: 100,
Limit: 20,
Dimensions: []queryColumn{
"ExporterName",
"InIfProvider",
queryColumn(schema.ColumnExporterName),
queryColumn(schema.ColumnInIfProvider),
},
Filter: queryFilter{},
Units: "l3bps",
@@ -360,8 +361,8 @@ ORDER BY time WITH FILL
Points: 100,
Limit: 20,
Dimensions: []queryColumn{
"ExporterName",
"InIfProvider",
queryColumn(schema.ColumnExporterName),
queryColumn(schema.ColumnInIfProvider),
},
Filter: queryFilter{},
Units: "l3bps",
@@ -409,8 +410,8 @@ ORDER BY time WITH FILL
Points: 100,
Limit: 20,
Dimensions: []queryColumn{
"ExporterName",
"InIfProvider",
queryColumn(schema.ColumnExporterName),
queryColumn(schema.ColumnInIfProvider),
},
Filter: queryFilter{},
Units: "l3bps",

View File

@@ -13,18 +13,18 @@ import (
"akvorado/console/filter"
)
type queryColumn string
type queryColumn schema.ColumnKey
func (qc queryColumn) MarshalText() ([]byte, error) {
return []byte(qc), nil
return []byte(schema.ColumnKey(qc).String()), nil
}
func (qc queryColumn) String() string {
return string(qc)
return schema.ColumnKey(qc).String()
}
func (qc *queryColumn) UnmarshalText(input []byte) error {
name := string(input)
if column, ok := schema.Flows.Columns.Get(name); ok && !column.ConsoleNotDimension {
*qc = queryColumn(name)
if column, ok := schema.Flows.LookupColumnByName(name); ok && !column.ConsoleNotDimension {
*qc = queryColumn(column.Key)
return nil
}
return errors.New("unknown field")
@@ -35,7 +35,7 @@ func requireMainTable(qcs []queryColumn, qf queryFilter) bool {
return true
}
for _, qc := range qcs {
if column, ok := schema.Flows.Columns.Get(string(qc)); ok && column.MainOnly {
if column, ok := schema.Flows.Columns.Get(schema.ColumnKey(qc)); ok && column.MainOnly {
return true
}
}
@@ -80,22 +80,22 @@ func (qf *queryFilter) UnmarshalText(input []byte) error {
// toSQLSelect transforms a column into an expression to use in SELECT
func (qc queryColumn) toSQLSelect() string {
var strValue string
switch qc {
case "ExporterAddress", "SrcAddr", "DstAddr":
switch schema.ColumnKey(qc) {
case schema.ColumnExporterAddress, schema.ColumnSrcAddr, schema.ColumnDstAddr:
strValue = fmt.Sprintf("replaceRegexpOne(IPv6NumToString(%s), '^::ffff:', '')", qc)
case "SrcAS", "DstAS", "Dst1stAS", "Dst2ndAS", "Dst3rdAS":
case schema.ColumnSrcAS, schema.ColumnDstAS, schema.ColumnDst1stAS, schema.ColumnDst2ndAS, schema.ColumnDst3rdAS:
strValue = fmt.Sprintf(`concat(toString(%s), ': ', dictGetOrDefault('asns', 'name', %s, '???'))`,
qc, qc)
case "EType":
case schema.ColumnEType:
strValue = fmt.Sprintf(`if(EType = %d, 'IPv4', if(EType = %d, 'IPv6', '???'))`,
helpers.ETypeIPv4, helpers.ETypeIPv6)
case "Proto":
case schema.ColumnProto:
strValue = `dictGetOrDefault('protocols', 'name', Proto, '???')`
case "InIfSpeed", "OutIfSpeed", "SrcPort", "DstPort", "ForwardingStatus", "InIfBoundary", "OutIfBoundary":
case schema.ColumnInIfSpeed, schema.ColumnOutIfSpeed, schema.ColumnSrcPort, schema.ColumnDstPort, schema.ColumnForwardingStatus, schema.ColumnInIfBoundary, schema.ColumnOutIfBoundary:
strValue = fmt.Sprintf("toString(%s)", qc)
case "DstASPath":
case schema.ColumnDstASPath:
strValue = `arrayStringConcat(DstASPath, ' ')`
case "DstCommunities":
case schema.ColumnDstCommunities:
strValue = `arrayStringConcat(arrayConcat(arrayMap(c -> concat(toString(bitShiftRight(c, 16)), ':', toString(bitAnd(c, 0xffff))), DstCommunities), arrayMap(c -> concat(toString(bitAnd(bitShiftRight(c, 64), 0xffffffff)), ':', toString(bitAnd(bitShiftRight(c, 32), 0xffffffff)), ':', toString(bitAnd(c, 0xffffffff))), DstLargeCommunities)), ' ')`
default:
strValue = qc.String()
@@ -103,17 +103,13 @@ func (qc queryColumn) toSQLSelect() string {
return strValue
}
// reverseDirection reverse the direction of a column (src/dst, in/out)
func (qc queryColumn) reverseDirection() queryColumn {
return queryColumn(filter.ReverseColumnDirection(string(qc)))
}
// fixQueryColumnName fix capitalization of the provided column name
func fixQueryColumnName(name string) string {
name = strings.ToLower(name)
for _, k := range schema.Flows.Columns.Keys() {
if strings.ToLower(k) == name {
return k
for pair := schema.Flows.Columns.Front(); pair != nil; pair = pair.Next() {
column := pair.Value
if strings.ToLower(column.Name) == name {
return column.Name
}
}
return ""

View File

@@ -7,6 +7,7 @@ import (
"testing"
"akvorado/common/helpers"
"akvorado/common/schema"
)
func TestRequireMainTable(t *testing.T) {
@@ -16,14 +17,14 @@ func TestRequireMainTable(t *testing.T) {
Expected bool
}{
{[]queryColumn{}, queryFilter{}, false},
{[]queryColumn{"SrcAS"}, queryFilter{}, false},
{[]queryColumn{"ExporterAddress"}, queryFilter{}, false},
{[]queryColumn{"SrcPort"}, queryFilter{}, true},
{[]queryColumn{"SrcAddr"}, queryFilter{}, true},
{[]queryColumn{"DstPort"}, queryFilter{}, true},
{[]queryColumn{"DstAddr"}, queryFilter{}, true},
{[]queryColumn{"SrcAS", "DstAddr"}, queryFilter{}, true},
{[]queryColumn{"DstAddr", "SrcAS"}, queryFilter{}, true},
{[]queryColumn{queryColumn(schema.ColumnSrcAS)}, queryFilter{}, false},
{[]queryColumn{queryColumn(schema.ColumnExporterAddress)}, queryFilter{}, false},
{[]queryColumn{queryColumn(schema.ColumnSrcPort)}, queryFilter{}, true},
{[]queryColumn{queryColumn(schema.ColumnSrcAddr)}, queryFilter{}, true},
{[]queryColumn{queryColumn(schema.ColumnDstPort)}, queryFilter{}, true},
{[]queryColumn{queryColumn(schema.ColumnDstAddr)}, queryFilter{}, true},
{[]queryColumn{queryColumn(schema.ColumnSrcAS), queryColumn(schema.ColumnDstAddr)}, queryFilter{}, true},
{[]queryColumn{queryColumn(schema.ColumnDstAddr), queryColumn(schema.ColumnSrcAS)}, queryFilter{}, true},
{[]queryColumn{}, queryFilter{MainTableRequired: true}, true},
}
for idx, tc := range cases {
@@ -37,12 +38,12 @@ func TestRequireMainTable(t *testing.T) {
func TestUnmarshalQueryColumn(t *testing.T) {
cases := []struct {
Input string
Expected string
Expected schema.ColumnKey
Error bool
}{
{"DstAddr", "DstAddr", false},
{"TimeReceived", "", true},
{"Nothing", "", true},
{"DstAddr", schema.ColumnDstAddr, false},
{"TimeReceived", 0, true},
{"Nothing", 0, true},
}
for _, tc := range cases {
var qc queryColumn
@@ -61,44 +62,44 @@ func TestUnmarshalQueryColumn(t *testing.T) {
func TestQueryColumnSQLSelect(t *testing.T) {
cases := []struct {
Input queryColumn
Input schema.ColumnKey
Expected string
}{
{
Input: "SrcAddr",
Input: schema.ColumnSrcAddr,
Expected: `replaceRegexpOne(IPv6NumToString(SrcAddr), '^::ffff:', '')`,
}, {
Input: "DstAS",
Input: schema.ColumnDstAS,
Expected: `concat(toString(DstAS), ': ', dictGetOrDefault('asns', 'name', DstAS, '???'))`,
}, {
Input: "Dst2ndAS",
Input: schema.ColumnDst2ndAS,
Expected: `concat(toString(Dst2ndAS), ': ', dictGetOrDefault('asns', 'name', Dst2ndAS, '???'))`,
}, {
Input: "Proto",
Input: schema.ColumnProto,
Expected: `dictGetOrDefault('protocols', 'name', Proto, '???')`,
}, {
Input: "EType",
Input: schema.ColumnEType,
Expected: `if(EType = 2048, 'IPv4', if(EType = 34525, 'IPv6', '???'))`,
}, {
Input: "OutIfSpeed",
Input: schema.ColumnOutIfSpeed,
Expected: `toString(OutIfSpeed)`,
}, {
Input: "ExporterName",
Input: schema.ColumnExporterName,
Expected: `ExporterName`,
}, {
Input: "PacketSizeBucket",
Input: schema.ColumnPacketSizeBucket,
Expected: `PacketSizeBucket`,
}, {
Input: "DstASPath",
Input: schema.ColumnDstASPath,
Expected: `arrayStringConcat(DstASPath, ' ')`,
}, {
Input: "DstCommunities",
Input: schema.ColumnDstCommunities,
Expected: `arrayStringConcat(arrayConcat(arrayMap(c -> concat(toString(bitShiftRight(c, 16)), ':', toString(bitAnd(c, 0xffff))), DstCommunities), arrayMap(c -> concat(toString(bitAnd(bitShiftRight(c, 64), 0xffffffff)), ':', toString(bitAnd(bitShiftRight(c, 32), 0xffffffff)), ':', toString(bitAnd(c, 0xffffffff))), DstLargeCommunities)), ' ')`,
},
}
for _, tc := range cases {
t.Run(tc.Input.String(), func(t *testing.T) {
got := tc.Input.toSQLSelect()
t.Run(queryColumn(tc.Input).String(), func(t *testing.T) {
got := queryColumn(tc.Input).toSQLSelect()
if diff := helpers.Diff(got, tc.Expected); diff != "" {
t.Errorf("toSQLWhere (-got, +want):\n%s", diff)
}

View File

@@ -12,6 +12,7 @@ import (
"github.com/golang/mock/gomock"
"akvorado/common/helpers"
"akvorado/common/schema"
)
func TestSankeyQuerySQL(t *testing.T) {
@@ -25,7 +26,7 @@ func TestSankeyQuerySQL(t *testing.T) {
Input: sankeyHandlerInput{
Start: time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC),
End: time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC),
Dimensions: []queryColumn{"SrcAS", "ExporterName"},
Dimensions: []queryColumn{queryColumn(schema.ColumnSrcAS), queryColumn(schema.ColumnExporterName)},
Limit: 5,
Filter: queryFilter{},
Units: "l3bps",
@@ -49,7 +50,7 @@ ORDER BY xps DESC
Input: sankeyHandlerInput{
Start: time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC),
End: time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC),
Dimensions: []queryColumn{"SrcAS", "ExporterName"},
Dimensions: []queryColumn{queryColumn(schema.ColumnSrcAS), queryColumn(schema.ColumnExporterName)},
Limit: 5,
Filter: queryFilter{},
Units: "l2bps",
@@ -74,7 +75,7 @@ ORDER BY xps DESC
Input: sankeyHandlerInput{
Start: time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC),
End: time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC),
Dimensions: []queryColumn{"SrcAS", "ExporterName"},
Dimensions: []queryColumn{queryColumn(schema.ColumnSrcAS), queryColumn(schema.ColumnExporterName)},
Limit: 5,
Filter: queryFilter{},
Units: "pps",
@@ -98,7 +99,7 @@ ORDER BY xps DESC
Input: sankeyHandlerInput{
Start: time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC),
End: time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC),
Dimensions: []queryColumn{"SrcAS", "ExporterName"},
Dimensions: []queryColumn{queryColumn(schema.ColumnSrcAS), queryColumn(schema.ColumnExporterName)},
Limit: 10,
Filter: queryFilter{Filter: "DstCountry = 'FR'"},
Units: "l3bps",

View File

@@ -127,7 +127,8 @@ func (c *Component) createExportersView(ctx context.Context) error {
cols := []string{}
for pair := schema.Flows.Columns.Front(); pair != nil; pair = pair.Next() {
column := pair.Value
if column.Name == "TimeReceived" || strings.HasPrefix(column.Name, "Exporter") {
key := pair.Key
if key == schema.ColumnTimeReceived || strings.HasPrefix(column.Name, "Exporter") {
cols = append(cols, column.Name)
}
if strings.HasPrefix(column.Name, "InIf") {
@@ -357,6 +358,10 @@ TTL TimeReceived + toIntervalSecond({{ .TTL }})
"TTL": ttl,
})
} else {
primaryKeys := []string{}
for _, key := range schema.Flows.ClickHousePrimaryKeys {
primaryKeys = append(primaryKeys, key.String())
}
createQuery, err = stemplate(`
CREATE TABLE {{ .Table }} ({{ .Schema }})
ENGINE = SummingMergeTree((Bytes, Packets))
@@ -368,7 +373,7 @@ TTL TimeReceived + toIntervalSecond({{ .TTL }})
"Table": tableName,
"Schema": schema.Flows.ClickHouseCreateTable(schema.ClickHouseSkipMainOnlyColumns),
"PartitionInterval": partitionInterval,
"PrimaryKey": strings.Join(schema.Flows.ClickHousePrimaryKeys, ", "),
"PrimaryKey": strings.Join(primaryKeys, ", "),
"SortingKey": strings.Join(schema.Flows.ClickHouseSortingKeys(), ", "),
"TTL": ttl,
})
@@ -406,6 +411,7 @@ ORDER BY position ASC
outer:
for pair := schema.Flows.Columns.Front(); pair != nil; pair = pair.Next() {
wantedColumn := pair.Value
wantedColumnKey := pair.Key
if resolution.Interval > 0 && wantedColumn.MainOnly {
continue
}
@@ -417,7 +423,7 @@ outer:
return fmt.Errorf("table %s, column %s has a non-matching type: %s vs %s",
tableName, wantedColumn.Name, existingColumn.Type, wantedColumn.ClickHouseType)
}
if resolution.Interval > 0 && slices.Contains(schema.Flows.ClickHousePrimaryKeys, wantedColumn.Name) && existingColumn.IsPrimaryKey == 0 {
if resolution.Interval > 0 && slices.Contains(schema.Flows.ClickHousePrimaryKeys, wantedColumnKey) && existingColumn.IsPrimaryKey == 0 {
return fmt.Errorf("table %s, column %s should be a primary key, cannot change that",
tableName, wantedColumn.Name)
}
@@ -438,7 +444,7 @@ outer:
}
}
// Add the missing column. Only if not primary.
if resolution.Interval > 0 && slices.Contains(schema.Flows.ClickHousePrimaryKeys, wantedColumn.Name) {
if resolution.Interval > 0 && slices.Contains(schema.Flows.ClickHousePrimaryKeys, wantedColumnKey) {
return fmt.Errorf("table %s, column %s is missing but it is a primary key",
tableName, wantedColumn.Name)
}

View File

@@ -261,15 +261,18 @@ WHERE database=currentDatabase() AND table NOT LIKE '.%'`)
})
if t.Failed() {
row := chComponent.QueryRow(context.Background(), `
SELECT query
SELECT query, exception
FROM system.query_log
WHERE client_name = $1
AND query NOT LIKE '%ORDER BY event_time_microseconds%'
ORDER BY event_time_microseconds DESC
LIMIT 1`, proto.ClientName)
var lastQuery string
if err := row.Scan(&lastQuery); err == nil {
var lastQuery, exception string
if err := row.Scan(&lastQuery, &exception); err == nil {
t.Logf("last ClickHouse query: %s", lastQuery)
if exception != "" {
t.Logf("last ClickHouse error: %s", exception)
}
}
break
}