From e6effd1335c35558a0fe5869b5f85ad38bbd9e0e Mon Sep 17 00:00:00 2001 From: Marvin Gaube Date: Tue, 8 Aug 2023 08:55:25 +0200 Subject: [PATCH] feat: add custom dictionaries for additional, customized flow hydration --- .../schema-customdict/expected.yaml | 48 +++++ .../configurations/schema-customdict/in.yaml | 12 ++ .../schema-enable-disable/expected.yaml | 2 + common/helpers/bimap/bimap.go | 6 + common/helpers/bimap/bimap_test.go | 24 +++ common/schema/config.go | 65 +++++- common/schema/definition.go | 4 +- common/schema/root.go | 68 ++++++ common/schema/root_test.go | 178 ++++++++++++++++ common/schema/types.go | 2 + console/data/docs/02-configuration.md | 66 ++++++ orchestrator/clickhouse/http.go | 24 +++ orchestrator/clickhouse/http_test.go | 33 ++- orchestrator/clickhouse/migrations.go | 35 ++++ orchestrator/clickhouse/migrations_test.go | 198 ++++++++++++++++++ .../clickhouse/testdata/dicts/test.csv | 2 + 16 files changed, 764 insertions(+), 3 deletions(-) create mode 100644 cmd/testdata/configurations/schema-customdict/expected.yaml create mode 100644 cmd/testdata/configurations/schema-customdict/in.yaml create mode 100644 orchestrator/clickhouse/testdata/dicts/test.csv diff --git a/cmd/testdata/configurations/schema-customdict/expected.yaml b/cmd/testdata/configurations/schema-customdict/expected.yaml new file mode 100644 index 00000000..8156a2b0 --- /dev/null +++ b/cmd/testdata/configurations/schema-customdict/expected.yaml @@ -0,0 +1,48 @@ +--- +paths: + inlet.0.schema: + customdictionaries: + test: + source: test.csv + layout: hashed + keys: + - name: addr + type: String + matchdimension: "" + matchdimensionsuffix: "" + attributes: + - name: AddrValue + type: String + default: "" + label: "" + dimensions: + - SrcAddr + - DstAddr + disabled: [] + enabled: [] + materialize: [] + maintableonly: [] + notmaintableonly: [] + console.0.schema: + customdictionaries: + test: + source: test.csv + layout: hashed + keys: + - name: addr + type: String + 
matchdimension: "" + matchdimensionsuffix: "" + attributes: + - name: AddrValue + type: String + default: "" + label: "" + dimensions: + - SrcAddr + - DstAddr + disabled: [] + enabled: [] + materialize: [] + maintableonly: [] + notmaintableonly: [] \ No newline at end of file diff --git a/cmd/testdata/configurations/schema-customdict/in.yaml b/cmd/testdata/configurations/schema-customdict/in.yaml new file mode 100644 index 00000000..451407de --- /dev/null +++ b/cmd/testdata/configurations/schema-customdict/in.yaml @@ -0,0 +1,12 @@ +--- +schema: + custom-dictionaries: + test: + source: test.csv + keys: + - name: addr + attributes: + - name: AddrValue + dimensions: + - SrcAddr + - DstAddr diff --git a/cmd/testdata/configurations/schema-enable-disable/expected.yaml b/cmd/testdata/configurations/schema-enable-disable/expected.yaml index 53c299b4..a65e9854 100644 --- a/cmd/testdata/configurations/schema-enable-disable/expected.yaml +++ b/cmd/testdata/configurations/schema-enable-disable/expected.yaml @@ -1,6 +1,7 @@ --- paths: inlet.0.schema: + customdictionaries: {} disabled: - SrcCountry - DstCountry @@ -13,6 +14,7 @@ paths: - DstMAC notmaintableonly: [] console.0.schema: + customdictionaries: {} disabled: - SrcCountry - DstCountry diff --git a/common/helpers/bimap/bimap.go b/common/helpers/bimap/bimap.go index 39635da6..785b48ac 100644 --- a/common/helpers/bimap/bimap.go +++ b/common/helpers/bimap/bimap.go @@ -59,3 +59,9 @@ func (bi *Bimap[K, V]) Values() []V { func (bi *Bimap[K, V]) String() string { return fmt.Sprintf("Bi%v", bi.forward) } + +// Insert inserts a new key/value pair +func (bi *Bimap[K, V]) Insert(k K, v V) { + bi.forward[k] = v + bi.inverse[v] = k +} diff --git a/common/helpers/bimap/bimap_test.go b/common/helpers/bimap/bimap_test.go index 2e95d457..8a800c2a 100644 --- a/common/helpers/bimap/bimap_test.go +++ b/common/helpers/bimap/bimap_test.go @@ -94,3 +94,27 @@ func TestBimapValues(t *testing.T) { t.Errorf("Values() (-got, +want):\n%s", diff) } } 
+ +func TestBimapInsert(t *testing.T) { + bmap := bimap.New(map[int]string{ + 1: "hello", + 2: "world", + 3: "happy", + }) + bmap.Insert(4, "test") + v, ok := bmap.LoadValue(4) + if !ok { + t.Errorf("LoadValue(4) ok: %v but expected %v", ok, true) + } + if v != "test" { + t.Errorf("LoadValue(4) got: %q but expected %q", v, "test") + } + var k int + k, ok = bmap.LoadKey("test") + if !ok { + t.Errorf("LoadKey(\"test\") ok: %v but expected %v", ok, true) + } + if k != 4 { + t.Errorf("LoadKey(\"test\") got: %d but expected %d", k, 4) + } +} diff --git a/common/schema/config.go b/common/schema/config.go index 1e73b5f2..0dfabc51 100644 --- a/common/schema/config.go +++ b/common/schema/config.go @@ -3,7 +3,11 @@ package schema -import "errors" +import ( + "errors" + + "akvorado/common/helpers" +) // Configuration describes the configuration for the schema component. type Configuration struct { @@ -17,6 +21,33 @@ type Configuration struct { NotMainTableOnly []ColumnKey `validate:"ninterfield=MainTableOnly"` // Materialize lists columns that shall be materialized at ingest instead of computed at query time Materialize []ColumnKey + // CustomDictionaries allows enrichment of flows with custom metadata + CustomDictionaries map[string]CustomDict `validate:"dive"` +} + +// CustomDict represents a single custom dictionary +type CustomDict struct { + Keys []CustomDictKey `validate:"required,dive"` + Attributes []CustomDictAttribute `validate:"required,dive"` + Source string `validate:"required"` + Layout string `validate:"required,oneof=hashed iptrie complex_key_hashed"` + Dimensions []string `validate:"required"` +} + +// CustomDictKey represents a single key (matching) column of a custom dictionary +type CustomDictKey struct { + Name string `validate:"required,alphanum"` + Type string `validate:"required,oneof=String UInt8 UInt16 UInt32 UInt64 IPv4 IPv6"` + MatchDimension string `validate:"omitempty,alphanum"` + MatchDimensionSuffix string `validate:"omitempty,alphanum"` +} + 
+// CustomDictAttribute represents a single value column of a custom dictionary +type CustomDictAttribute struct { + Name string `validate:"required,alphanum"` + Type string `validate:"required,oneof=String UInt8 UInt16 UInt32 UInt64 IPv4 IPv6"` + Label string `validate:"omitempty,alphanum"` // empty label is acceptable, in this case fallback to name + Default string `validate:"omitempty,alphanum"` } // DefaultConfiguration returns the default configuration for the schema component. @@ -47,3 +78,35 @@ func (ck *ColumnKey) UnmarshalText(input []byte) error { } return errors.New("unknown provider") } + +// GetCustomDictConfig returns the custom dicts encoded in this schema +func (c *Component) GetCustomDictConfig() map[string]CustomDict { + return c.c.CustomDictionaries +} + +// DefaultCustomDictConfiguration is the default config for a CustomDict +func DefaultCustomDictConfiguration() CustomDict { + return CustomDict{ + Layout: "hashed", + } +} + +// DefaultCustomDictKeyConfiguration is the default config for a CustomDictKey +func DefaultCustomDictKeyConfiguration() CustomDictKey { + return CustomDictKey{ + Type: "String", + } +} + +// DefaultCustomDictAttributeConfiguration is the default config for a CustomDictAttribute +func DefaultCustomDictAttributeConfiguration() CustomDictAttribute { + return CustomDictAttribute{ + Type: "String", + } +} + +func init() { + helpers.RegisterMapstructureUnmarshallerHook(helpers.DefaultValuesUnmarshallerHook[CustomDict](DefaultCustomDictConfiguration())) + helpers.RegisterMapstructureUnmarshallerHook(helpers.DefaultValuesUnmarshallerHook[CustomDictKey](DefaultCustomDictKeyConfiguration())) + helpers.RegisterMapstructureUnmarshallerHook(helpers.DefaultValuesUnmarshallerHook[CustomDictAttribute](DefaultCustomDictAttributeConfiguration())) +} diff --git a/common/schema/definition.go b/common/schema/definition.go index 11df7e1b..15630786 100644 --- a/common/schema/definition.go +++ b/common/schema/definition.go @@ -96,6 +96,7 @@ 
const ( ColumnICMPv6Code ColumnNextHop + // ColumnLast points to after the last static column, custom dictionaries (dynamic columns) come after ColumnLast ColumnLast ) @@ -385,6 +386,7 @@ END`, ClickHouseCodec: "ZSTD(1)", }, }, + dynamicColumns: 0, }.finalize() } @@ -510,7 +512,7 @@ func (schema Schema) finalize() Schema { schema.columns = ncolumns // Build column index - schema.columnIndex = make([]*Column, ColumnLast) + schema.columnIndex = make([]*Column, ColumnLast+schema.dynamicColumns) for i, column := range schema.columns { schema.columnIndex[column.Key] = &schema.columns[i] for j, column := range column.ClickHouseTransformFrom { diff --git a/common/schema/root.go b/common/schema/root.go index 7c0ad4f2..dcafd86d 100644 --- a/common/schema/root.go +++ b/common/schema/root.go @@ -8,8 +8,11 @@ package schema import ( "fmt" + "strings" "golang.org/x/exp/slices" + "golang.org/x/text/cases" + "golang.org/x/text/language" ) // Component represents the schema compomenent. @@ -74,6 +77,71 @@ func New(config Configuration) (*Component, error) { column.ClickHouseMainOnly = true } } + + customDictColumns := []Column{} + // add new columns from custom dictionaries after the static ones + // as we dont reference the dicts in the code and they are created during runtime from the config, this is enough for us. 
+ + for dname, v := range config.CustomDictionaries { + for _, d := range v.Dimensions { + // check if we can actually create the dictionary (we need to know what to match on) + if len(v.Keys) == 0 { + return nil, fmt.Errorf("custom dictionary %s has no keys, this is not supported", dname) + } + if len(v.Keys) > 1 { + // if more than one key is present, every key needs either a MatchDimension or a MatchDimensionSuffix + for _, kv := range v.Keys { + if kv.MatchDimension == "" && kv.MatchDimensionSuffix == "" { + return nil, fmt.Errorf("custom dictionary %s has more than one key, but key %s has neither MatchDimension nor MatchDimensionSuffix set", dname, kv.Name) + } + } + } + // first, we need to build the matching string for this + matchingList := []string{} + // prefer match dimension or match dimension suffix if available + for _, kv := range v.Keys { + if kv.MatchDimension != "" { + matchingList = append(matchingList, kv.MatchDimension) + continue + } + // match post is appended after the dimension name, and useful if we wanna match a subkey e.g. both in Src/Dst + if kv.MatchDimensionSuffix != "" { + matchingList = append(matchingList, fmt.Sprintf("%s%s", d, kv.MatchDimensionSuffix)) + } + } + matchingString := "" + if len(matchingList) > 0 { + matchingString = fmt.Sprintf("(%s)", strings.Join(matchingList, ",")) + } else { + // if match dimension and match dimension suffix are both not available, we use the dimension name (e.g. 
SrcAddr) + matchingString = d + } + + for _, a := range v.Attributes { + // add the dimension combined with capitalizing the name of the dimension field + l := a.Label + if l == "" { + l = cases.Title(language.Und).String(a.Name) + } + name := fmt.Sprintf("%s%s", d, l) + // compute the key for this new dynamic column, added after the last dynamic column + key := ColumnLast + schema.dynamicColumns + customDictColumns = append(customDictColumns, + Column{ + Key: key, + Name: name, + ClickHouseType: fmt.Sprintf("LowCardinality(%s)", a.Type), + ClickHouseGenerateFrom: fmt.Sprintf("dictGet('custom_dict_%s', '%s', %s)", dname, a.Name, + matchingString), + }) + columnNameMap.Insert(key, name) + schema.dynamicColumns++ + } + } + } + + schema.columns = append(schema.columns, customDictColumns...) + return &Component{ c: config, Schema: schema.finalize(), diff --git a/common/schema/root_test.go b/common/schema/root_test.go index 7f1934f6..4928a229 100644 --- a/common/schema/root_test.go +++ b/common/schema/root_test.go @@ -6,6 +6,7 @@ package schema_test import ( "testing" + "akvorado/common/helpers" "akvorado/common/schema" ) @@ -61,3 +62,180 @@ func TestDisableForbiddenColumns(t *testing.T) { t.Fatalf("New() error:\n%+v", err) } } + +func TestCustomDictionaries(t *testing.T) { + config := schema.DefaultConfiguration() + config.CustomDictionaries = make(map[string]schema.CustomDict) + config.CustomDictionaries["test"] = schema.CustomDict{ + Keys: []schema.CustomDictKey{ + {Name: "SrcAddr", Type: "string"}, + }, + Attributes: []schema.CustomDictAttribute{ + {Name: "csv_col_name", Type: "string", Label: "DimensionAttribute"}, + {Name: "role", Type: "string"}, + }, + Source: "test.csv", + Dimensions: []string{"SrcAddr", "DstAddr"}, + } + + s, err := schema.New(config) + if err != nil { + t.Fatalf("New() error:\n%+v", err) + } + + // Test if SrcAddrAttribute and DstAddrAttribute are in s.columns + srcFound := false + dstFound := false + srcRoleFound := false + dstRoleFound := 
false + + // check if srcAddrAttribute and dstAddrAttribute are in s.columns, and have the correct type/generatefrom + for _, column := range s.Columns() { + if column.Name == "SrcAddrDimensionAttribute" { + srcFound = true + if column.ClickHouseType != "LowCardinality(string)" { + t.Fatalf("SrcAddrDimensionAttribute should be LowCardinality(string), is %s", column.ClickHouseType) + } + if column.ClickHouseGenerateFrom != "dictGet('custom_dict_test', 'csv_col_name', SrcAddr)" { + t.Fatalf("SrcAddrDimensionAttribute should be generated from `dictGet('custom_dict_test', 'csv_col_name', SrcAddr)`, is %s", column.ClickHouseGenerateFrom) + } + } + if column.Name == "DstAddrDimensionAttribute" { + dstFound = true + if column.ClickHouseType != "LowCardinality(string)" { + t.Fatalf("DstAddrDimensionAttribute should be LowCardinality(string), is %s", column.ClickHouseType) + } + if column.ClickHouseGenerateFrom != "dictGet('custom_dict_test', 'csv_col_name', DstAddr)" { + t.Fatalf("DstAddrDimensionAttribute should be generated from `dictGet('custom_dict_test', 'csv_col_name', DstAddr)`, is %s", column.ClickHouseGenerateFrom) + } + } + // this part only tests default dimension name generation + if column.Name == "SrcAddrRole" { + srcRoleFound = true + } + if column.Name == "DstAddrRole" { + dstRoleFound = true + } + + } + + if !srcFound { + t.Fatal("SrcAddrDimensionAttribute not found") + } + if !dstFound { + t.Fatal("DstAddrDimensionAttribute not found") + } + if !srcRoleFound { + t.Fatal("SrcAddrRole not found") + } + if !dstRoleFound { + t.Fatal("DstAddrRole not found") + } + +} + +func TestCustomDictionariesMatcher(t *testing.T) { + config := schema.DefaultConfiguration() + config.CustomDictionaries = make(map[string]schema.CustomDict) + config.CustomDictionaries["test"] = schema.CustomDict{ + Keys: []schema.CustomDictKey{ + {Name: "exporter", Type: "string", MatchDimension: "ExporterAddress"}, + {Name: "interface", Type: "string", MatchDimensionSuffix: "Name"}, + }, + 
Attributes: []schema.CustomDictAttribute{ + {Name: "csv_col_name", Type: "string", Label: "DimensionAttribute"}, + }, + Source: "test.csv", + Dimensions: []string{"OutIf", "InIf"}, + Layout: "complex_key_hashed", + } + + s, err := schema.New(config) + if err != nil { + t.Fatalf("New() error:\n%+v", err) + } + + // Test if SrcAddrAttribute and DstAddrAttribute are in s.columns + outFound := false + inFound := false + + // check if srcAddrAttribute and dstAddrAttribute are in s.columns, and have the correct type/generatefrom + for _, column := range s.Columns() { + if column.Name == "OutIfDimensionAttribute" { + outFound = true + if column.ClickHouseType != "LowCardinality(string)" { + t.Fatalf("OutIfDimensionAttribute should be LowCardinality(string), is %s", column.ClickHouseType) + } + if column.ClickHouseGenerateFrom != "dictGet('custom_dict_test', 'csv_col_name', (ExporterAddress,OutIfName))" { + t.Fatalf("OutIfDimensionAttribute should be generated from `dictGet('custom_dict_test', 'csv_col_name', (ExporterAddress,OutIfName))`, is %s", column.ClickHouseGenerateFrom) + } + } + if column.Name == "InIfDimensionAttribute" { + inFound = true + if column.ClickHouseType != "LowCardinality(string)" { + t.Fatalf("InIfDimensionAttribute should be LowCardinality(string), is %s", column.ClickHouseType) + } + if column.ClickHouseGenerateFrom != "dictGet('custom_dict_test', 'csv_col_name', (ExporterAddress,InIfName))" { + t.Fatalf("InIfDimensionAttribute should be generated from `dictGet('custom_dict_test', 'csv_col_name', (ExporterAddress,InIfName)), is %s", column.ClickHouseGenerateFrom) + } + } + } + + if !outFound { + t.Fatal("OutIfDimensionAttribute not found") + } + if !inFound { + t.Fatal("InIfDimensionAttribute not found") + } +} + +// we need MatchDimension or MatchDimensionSuffix for multiple keys +func TestCustomDictMultiKeyErr(t *testing.T) { + config := schema.DefaultConfiguration() + config.CustomDictionaries = make(map[string]schema.CustomDict) + 
config.CustomDictionaries["test"] = schema.CustomDict{ + Keys: []schema.CustomDictKey{ + {Name: "exporter", Type: "string"}, + {Name: "interface", Type: "string"}, + }, + Attributes: []schema.CustomDictAttribute{ + {Name: "csv_col_name", Type: "string", Label: "DimensionAttribute"}, + }, + Source: "test.csv", + Dimensions: []string{"OutIf", "InIf"}, + Layout: "complex_key_hashed", + } + + _, err := schema.New(config) + if err == nil { + t.Fatal("New() did not error") + } + + if diff := helpers.Diff(err.Error(), "custom dictionary test has more than one key, but key exporter has neither MatchDimension nor MatchDimensionSuffix set"); diff != "" { + t.Fatalf("New() did not error correctly\n %s", diff) + } +} + +// a dict without key makes no sense, catch this +func TestCustomDictNoKeyErr(t *testing.T) { + config := schema.DefaultConfiguration() + config.CustomDictionaries = make(map[string]schema.CustomDict) + config.CustomDictionaries["test"] = schema.CustomDict{ + Keys: []schema.CustomDictKey{}, + Attributes: []schema.CustomDictAttribute{ + {Name: "csv_col_name", Type: "string", Label: "DimensionAttribute"}, + }, + Source: "test.csv", + Dimensions: []string{"OutIf", "InIf"}, + Layout: "complex_key_hashed", + } + + _, err := schema.New(config) + if err == nil { + t.Fatal("New() did not error") + } + + if diff := helpers.Diff(err.Error(), "custom dictionary test has no keys, this is not supported"); diff != "" { + t.Fatalf("New() did not error correctly\n %s", diff) + } +} diff --git a/common/schema/types.go b/common/schema/types.go index 91cf3ffa..06aa10ac 100644 --- a/common/schema/types.go +++ b/common/schema/types.go @@ -17,6 +17,8 @@ type Schema struct { columnIndex []*Column // Columns indexed by ColumnKey disabledGroups bitset.BitSet // Disabled column groups + // dynamicColumns is the number of columns that are generated at runtime and appended after columnLast + dynamicColumns ColumnKey // For ClickHouse. 
This is the set of primary keys (order is important and // may not follow column order) for the aggregated tables. clickHousePrimaryKeys []ColumnKey diff --git a/console/data/docs/02-configuration.md b/console/data/docs/02-configuration.md index 451784c8..74169bf1 100644 --- a/console/data/docs/02-configuration.md +++ b/console/data/docs/02-configuration.md @@ -552,6 +552,72 @@ For ICMP, you get `ICMPv4Type`, `ICMPv4Code`, `ICMPv6Type`, `ICMPv6Code`, `ICMPv4`, and `ICMPv6`. The two latest one are displayed as a string in the console (like `echo-reply` or `frag-needed`). +#### Custom Dictionaries +You can add custom dimensions to be looked up via a dictionary. This is useful to enrich your flow with additional information that cannot be obtained in the classifier. + +**Note:** +Filtering by dictionaries is not possible with the current state of development. + +This works by providing the database with a CSV file containing the values. + +```yaml +schema: + custom-dictionaries: + ips: + layout: complex_key_hashed + keys: + - name: addr + type: String + attributes: + - name: role + type: String + default: DefaultRole + label: IPRole + source: ips_annotation.csv + dimensions: + - SrcAddr + - DstAddr +``` + +This example expects a CSV file named `ips_annotation.csv` with the following format: +``` +addr,role +2001:db8::1,ExampleRole +``` + +If the SrcAddr has the value `2001:db8::1` (matches the key), the dimension `SrcAddrIPRole` will be set to `ExampleRole`. + +Independently, if the DstAddr has the value `2001:db8::1`, the dimension `DstAddrIPRole` will be set to `ExampleRole`. + +All other IPs will get "DefaultRole" in their "SrcAddrIPRole"/"DstAddrIPRole" dimension. + +The `label` and `default` keys are optional. + +It is possible to add the same dictionary to multiple dimensions, usually for the "Input" and "Output"-direction. + +By default, the value of the key tries to match a dimension. 
For multiple keys, it is necessary to explicitly specify the dimension name to match by either specifying `match-dimension` or `match-dimension-suffix`: +```yaml +schema: + custom-dictionaries: + interfaces: + layout: complex_key_hashed + keys: + - name: agent + type: String + match-dimension: ExporterAddress # csv col agent matches ExporterAddress dimension + - name: interface + type: String + match-dimension-suffix: Name # csv col interface matches either OutIfName or InIfName, match name is added as suffix to dimension + attributes: + - name: information # this column is added as OutIfInformation/InIfInformation to the flow on matches + type: String # no default: If no match of both agent and interface, the Information-Dimension is empty + source: interfaces.csv + dimensions: + - OutIf + - InIf + +``` + ### Kafka The Kafka component creates or updates the Kafka topic to receive diff --git a/orchestrator/clickhouse/http.go b/orchestrator/clickhouse/http.go index 1edf270f..9d02d34c 100644 --- a/orchestrator/clickhouse/http.go +++ b/orchestrator/clickhouse/http.go @@ -9,6 +9,7 @@ import ( "encoding/csv" "fmt" "io" + "io/ioutil" "net/http" "strconv" "text/template" @@ -90,6 +91,29 @@ func (c *Component) registerHTTPHandlers() error { w.Write(result.Bytes()) })) + // add handler for custom dicts + for name, dict := range c.d.Schema.GetCustomDictConfig() { + // we need to call this a func to avoid issues with the for loop + k := name + v := dict + c.d.HTTP.AddHandler(fmt.Sprintf("/api/v0/orchestrator/clickhouse/custom_dict_%s.csv", k), http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + select { + case <-c.networkSourcesReady: + case <-time.After(c.config.NetworkSourcesTimeout): + w.WriteHeader(http.StatusServiceUnavailable) + return + } + file, err := ioutil.ReadFile(v.Source) + if err != nil { + c.r.Err(err).Msg("unable to deliver custom dict csv file") + http.Error(w, fmt.Sprintf("unable to deliver custom dict csv file %s", v.Source), 
http.StatusNotFound) + return + } + w.Header().Set("Content-Type", "text/csv; charset=utf-8") + w.Write(file) + })) + } + // networks.csv c.d.HTTP.AddHandler("/api/v0/orchestrator/clickhouse/networks.csv", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { diff --git a/orchestrator/clickhouse/http_test.go b/orchestrator/clickhouse/http_test.go index 49435b61..9bf9f36b 100644 --- a/orchestrator/clickhouse/http_test.go +++ b/orchestrator/clickhouse/http_test.go @@ -21,10 +21,25 @@ func TestHTTPEndpoints(t *testing.T) { config.Networks = helpers.MustNewSubnetMap(map[string]NetworkAttributes{ "::ffff:192.0.2.0/120": {Name: "infra"}, }) + // setup schema config for custom dicts + schemaConfig := schema.DefaultConfiguration() + schemaConfig.CustomDictionaries = make(map[string]schema.CustomDict) + schemaConfig.CustomDictionaries["test"] = schema.CustomDict{ + Source: "testdata/dicts/test.csv", + } + schemaConfig.CustomDictionaries["none"] = schema.CustomDict{ + Source: "none.csv", + } + + sch, err := schema.New(schemaConfig) + if err != nil { + t.Fatalf("schema.New() error:\n%+v", err) + } + // create http entry c, err := New(r, config, Dependencies{ Daemon: daemon.NewMock(t), HTTP: httpserver.NewMock(t, r), - Schema: schema.NewMock(t), + Schema: sch, }) if err != nil { t.Fatalf("New() error:\n%+v", err) @@ -67,6 +82,22 @@ func TestHTTPEndpoints(t *testing.T) { `syntax = "proto3";`, }, }, + { + URL: "/api/v0/orchestrator/clickhouse/custom_dict_none.csv", + ContentType: "text/plain; charset=utf-8", + StatusCode: 404, + FirstLines: []string{ + "unable to deliver custom dict csv file none.csv", + }, + }, + { + URL: "/api/v0/orchestrator/clickhouse/custom_dict_test.csv", + ContentType: "text/csv; charset=utf-8", + FirstLines: []string{ + `col_a,col_b`, + `1,2`, + }, + }, } helpers.TestHTTPEndpoints(t, c.d.HTTP.LocalAddr(), cases) diff --git a/orchestrator/clickhouse/migrations.go b/orchestrator/clickhouse/migrations.go index 
7f5051b5..2467d9be 100644 --- a/orchestrator/clickhouse/migrations.go +++ b/orchestrator/clickhouse/migrations.go @@ -7,10 +7,12 @@ import ( "context" "fmt" "net" + "strings" "github.com/ClickHouse/clickhouse-go/v2" "akvorado/common/reporter" + "akvorado/common/schema" ) type migrationStep struct { @@ -82,6 +84,39 @@ func (c *Component) migrateDatabase() error { return err } + // prepare custom dictionary migrations + var dictMigrations []func() error + for k, v := range c.d.Schema.GetCustomDictConfig() { + var schemaStr []string + var keys []string + for _, a := range v.Keys { + // This is a key. We need it in the schema and in primary keys. + schemaStr = append(schemaStr, fmt.Sprintf("`%s` %s", a.Name, a.Type)) + keys = append(keys, a.Name) + } + + for _, a := range v.Attributes { + defaultValue := "None" + if a.Default != "" { + defaultValue = a.Default + } + // this is only an attribute. We only need it in the schema + schemaStr = append(schemaStr, fmt.Sprintf("`%s` %s DEFAULT '%s'", a.Name, a.Type, defaultValue)) + } + // we need to do this as function, otherwise we get problems with the for. + m := func(k string, v schema.CustomDict, schemaStr []string) func() error { + return func() error { + return c.createDictionary(ctx, fmt.Sprintf("custom_dict_%s", k), v.Layout, strings.Join(schemaStr[:], ", "), strings.Join(keys[:], ", ")) + } + }(k, v, schemaStr) + dictMigrations = append(dictMigrations, m) + } + // create custom dictionaries + err = c.wrapMigrations(dictMigrations...) 
+ if err != nil { + return err + } + // Create the various non-raw flow tables for _, resolution := range c.config.Resolutions { err := c.wrapMigrations( diff --git a/orchestrator/clickhouse/migrations_test.go b/orchestrator/clickhouse/migrations_test.go index 841a5225..9594f7a4 100644 --- a/orchestrator/clickhouse/migrations_test.go +++ b/orchestrator/clickhouse/migrations_test.go @@ -446,3 +446,201 @@ AND name LIKE $3`, "flows", ch.config.Database, "%NetPrefix") }) } } + +func TestCustomDictMigration(t *testing.T) { + r := reporter.NewMock(t) + chComponent := clickhousedb.SetupClickHouse(t, r) + if err := chComponent.Exec(context.Background(), "DROP TABLE IF EXISTS system.metric_log"); err != nil { + t.Fatalf("Exec() error:\n%+v", err) + } + // start clean + dropAllTables(t, chComponent) + // First, setup a default configuration + t.Run("default schema", func(t *testing.T) { + r := reporter.NewMock(t) + sch, err := schema.New(schema.DefaultConfiguration()) + if err != nil { + t.Fatalf("schema.New() error:\n%+v", err) + } + configuration := DefaultConfiguration() + configuration.OrchestratorURL = "http://something" + configuration.Kafka.Configuration = kafka.DefaultConfiguration() + ch, err := New(r, configuration, Dependencies{ + Daemon: daemon.NewMock(t), + HTTP: httpserver.NewMock(t, r), + Schema: sch, + ClickHouse: chComponent, + }) + if err != nil { + t.Fatalf("New() error:\n%+v", err) + } + helpers.StartStop(t, ch) + waitMigrations(t, ch) + + // We need to have at least one migration + gotMetrics := r.GetMetrics("akvorado_orchestrator_clickhouse_migrations_", "applied_steps") + if gotMetrics["applied_steps"] == "0" { + t.Fatal("No migration applied when applying a fresh default schema") + } + }) + // Now, create a custom dictionary on top + if !t.Failed() { + t.Run("custom dictionary", func(t *testing.T) { + r := reporter.NewMock(t) + schConfig := schema.DefaultConfiguration() + schConfig.CustomDictionaries = make(map[string]schema.CustomDict) + 
schConfig.CustomDictionaries["test"] = schema.CustomDict{ + Keys: []schema.CustomDictKey{ + {Name: "SrcAddr", Type: "String"}, + }, + Attributes: []schema.CustomDictAttribute{ + {Name: "csv_col_name", Type: "String", Label: "DimensionAttribute"}, + {Name: "csv_col_default", Type: "String", Label: "DefaultDimensionAttribute", Default: "Hello World"}, + }, + Source: "test.csv", + Dimensions: []string{"SrcAddr", "DstAddr"}, + Layout: "complex_key_hashed", + } + sch, err := schema.New(schConfig) + + if err != nil { + t.Fatalf("schema.New() error:\n%+v", err) + } + configuration := DefaultConfiguration() + configuration.OrchestratorURL = "http://something" + configuration.Kafka.Configuration = kafka.DefaultConfiguration() + ch, err := New(r, configuration, Dependencies{ + Daemon: daemon.NewMock(t), + HTTP: httpserver.NewMock(t, r), + Schema: sch, + ClickHouse: chComponent, + }) + if err != nil { + t.Fatalf("New() error:\n%+v", err) + } + helpers.StartStop(t, ch) + waitMigrations(t, ch) + + // We need to have at least one migration + gotMetrics := r.GetMetrics("akvorado_orchestrator_clickhouse_migrations_", "applied_steps") + if gotMetrics["applied_steps"] == "0" { + t.Fatal("No migration applied when enabling a custom dictionary") + } + + // check if the rows were created in the main flows table + row := ch.d.ClickHouse.QueryRow(context.Background(), ` + SELECT toString(groupArray(tuple(name, type, default_expression))) + FROM system.columns + WHERE table = $1 + AND database = $2 + AND name LIKE $3`, "flows", ch.config.Database, "%DimensionAttribute") + var existing string + if err := row.Scan(&existing); err != nil { + t.Fatalf("Scan() error:\n%+v", err) + } + if diff := helpers.Diff(existing, + "[('SrcAddrDimensionAttribute','LowCardinality(String)',''),('SrcAddrDefaultDimensionAttribute','LowCardinality(String)',''),('DstAddrDimensionAttribute','LowCardinality(String)',''),('DstAddrDefaultDimensionAttribute','LowCardinality(String)','')]"); diff != "" { + 
t.Fatalf("Unexpected state:\n%s", diff) + } + + // check if the rows were created in the consumer flows table + rowConsumer := ch.d.ClickHouse.QueryRow(context.Background(), ` + SHOW CREATE flows_ZUYGDTE3EBIXX352XPM3YEEFV4_raw_consumer`) + var existingConsumer string + if err := rowConsumer.Scan(&existingConsumer); err != nil { + t.Fatalf("Scan() error:\n%+v", err) + } + // check if the definitions are part of the consumer + expectedStatements := []string{ + "dictGet('default.custom_dict_test', 'csv_col_name', DstAddr) AS DstAddrDimensionAttribute", + "dictGet('default.custom_dict_test', 'csv_col_name', SrcAddr) AS SrcAddrDimensionAttribute", + "dictGet('default.custom_dict_test', 'csv_col_default', SrcAddr) AS SrcAddrDefaultDimensionAttribute", + "dictGet('default.custom_dict_test', 'csv_col_default', DstAddr) AS DstAddrDefaultDimensionAttribute", + } + for _, s := range expectedStatements { + if !strings.Contains(existingConsumer, s) { + t.Fatalf("Missing statement in consumer:\n%s", s) + } + } + + // check if the dictionary was created + dictCreate := ch.d.ClickHouse.QueryRow(context.Background(), ` + SHOW CREATE custom_dict_test`) + var dictCreateString string + if err := dictCreate.Scan(&dictCreateString); err != nil { + t.Fatalf("Scan() error:\n%+v", err) + } + if diff := helpers.Diff(dictCreateString, + "CREATE DICTIONARY default.custom_dict_test\n(\n `SrcAddr` String,\n `csv_col_name` String DEFAULT 'None',\n `csv_col_default` String DEFAULT 'Hello World'\n)\nPRIMARY KEY SrcAddr\nSOURCE(HTTP(URL 'http://something/api/v0/orchestrator/clickhouse/custom_dict_test.csv' FORMAT 'CSVWithNames'))\nLIFETIME(MIN 0 MAX 3600)\nLAYOUT(COMPLEX_KEY_HASHED())\nSETTINGS(format_csv_allow_single_quotes = 0)"); diff != "" { + t.Fatalf("Unexpected state:\n%s", diff) + } + }) + } + // next test: with the custom dict removed again, the cols should still exist, but the consumer should be gone + if !t.Failed() { + t.Run("remove custom dictionary", func(t *testing.T) { + r := 
reporter.NewMock(t) + sch, err := schema.New(schema.DefaultConfiguration()) + + if err != nil { + t.Fatalf("schema.New() error:\n%+v", err) + } + configuration := DefaultConfiguration() + configuration.OrchestratorURL = "http://something" + configuration.Kafka.Configuration = kafka.DefaultConfiguration() + ch, err := New(r, configuration, Dependencies{ + Daemon: daemon.NewMock(t), + HTTP: httpserver.NewMock(t, r), + Schema: sch, + ClickHouse: chComponent, + }) + if err != nil { + t.Fatalf("New() error:\n%+v", err) + } + helpers.StartStop(t, ch) + waitMigrations(t, ch) + + // We need to have at least one migration + gotMetrics := r.GetMetrics("akvorado_orchestrator_clickhouse_migrations_", "applied_steps") + if gotMetrics["applied_steps"] == "0" { + t.Fatal("No migration applied when disabling the custom dict") + } + + // check if the rows were created in the main flows table + row := ch.d.ClickHouse.QueryRow(context.Background(), ` + SELECT toString(groupArray(tuple(name, type, default_expression))) + FROM system.columns + WHERE table = $1 + AND database = $2 + AND name LIKE $3`, "flows", ch.config.Database, "%DimensionAttribute") + var existing string + if err := row.Scan(&existing); err != nil { + t.Fatalf("Scan() error:\n%+v", err) + } + if diff := helpers.Diff(existing, + "[('SrcAddrDimensionAttribute','LowCardinality(String)',''),('SrcAddrDefaultDimensionAttribute','LowCardinality(String)',''),('DstAddrDimensionAttribute','LowCardinality(String)',''),('DstAddrDefaultDimensionAttribute','LowCardinality(String)','')]"); diff != "" { + t.Fatalf("Unexpected state:\n%s", diff) + } + + // check if the rows were removed in the consumer flows table + rowConsumer := ch.d.ClickHouse.QueryRow(context.Background(), ` + SHOW CREATE flows_ZUYGDTE3EBIXX352XPM3YEEFV4_raw_consumer`) + var existingConsumer string + if err := rowConsumer.Scan(&existingConsumer); err != nil { + t.Fatalf("Scan() error:\n%+v", err) + } + // check if the definitions are missing in the consumer + 
expectedStatements := []string{ + "dictGet('default.custom_dict_test', 'csv_col_name', DstAddr) AS DstAddrDimensionAttribute", + "dictGet('default.custom_dict_test', 'csv_col_name', SrcAddr) AS SrcAddrDimensionAttribute", + "dictGet('default.custom_dict_test', 'csv_col_default', SrcAddr) AS SrcAddrDefaultDimensionAttribute", + "dictGet('default.custom_dict_test', 'csv_col_default', DstAddr) AS DstAddrDefaultDimensionAttribute", + } + for _, s := range expectedStatements { + if strings.Contains(existingConsumer, s) { + t.Fatalf("Unexpected Statement found in consumer:\n%s", s) + } + } + }) + } +} diff --git a/orchestrator/clickhouse/testdata/dicts/test.csv b/orchestrator/clickhouse/testdata/dicts/test.csv new file mode 100644 index 00000000..3da8d4f0 --- /dev/null +++ b/orchestrator/clickhouse/testdata/dicts/test.csv @@ -0,0 +1,2 @@ +col_a,col_b +1,2