feat: add custom dictionaries for additional, customized flow hydration

Marvin Gaube
2023-08-08 08:55:25 +02:00
committed by Vincent Bernat
parent c1b2008ee9
commit e6effd1335
16 changed files with 764 additions and 3 deletions

View File

@@ -0,0 +1,48 @@
---
paths:
  inlet.0.schema:
    customdictionaries:
      test:
        source: test.csv
        layout: hashed
        keys:
        - name: addr
          type: String
          matchdimension: ""
          matchdimensionsuffix: ""
        attributes:
        - name: AddrValue
          type: String
          default: ""
          label: ""
        dimensions:
        - SrcAddr
        - DstAddr
    disabled: []
    enabled: []
    materialize: []
    maintableonly: []
    notmaintableonly: []
  console.0.schema:
    customdictionaries:
      test:
        source: test.csv
        layout: hashed
        keys:
        - name: addr
          type: String
          matchdimension: ""
          matchdimensionsuffix: ""
        attributes:
        - name: AddrValue
          type: String
          default: ""
          label: ""
        dimensions:
        - SrcAddr
        - DstAddr
    disabled: []
    enabled: []
    materialize: []
    maintableonly: []
    notmaintableonly: []

View File

@@ -0,0 +1,12 @@
---
schema:
  custom-dictionaries:
    test:
      source: test.csv
      keys:
      - name: addr
      attributes:
      - name: AddrValue
      dimensions:
      - SrcAddr
      - DstAddr

View File

@@ -1,6 +1,7 @@
---
paths:
  inlet.0.schema:
    customdictionaries: {}
    disabled:
    - SrcCountry
    - DstCountry
@@ -13,6 +14,7 @@ paths:
    - DstMAC
    notmaintableonly: []
  console.0.schema:
    customdictionaries: {}
    disabled:
    - SrcCountry
    - DstCountry

View File

@@ -59,3 +59,9 @@ func (bi *Bimap[K, V]) Values() []V {
func (bi *Bimap[K, V]) String() string {
return fmt.Sprintf("Bi%v", bi.forward)
}
// Insert inserts a new key/value pair
func (bi *Bimap[K, V]) Insert(k K, v V) {
bi.forward[k] = v
bi.inverse[v] = k
}

View File

@@ -94,3 +94,27 @@ func TestBimapValues(t *testing.T) {
t.Errorf("Values() (-got, +want):\n%s", diff) t.Errorf("Values() (-got, +want):\n%s", diff)
} }
} }
func TestBimapInsert(t *testing.T) {
bmap := bimap.New(map[int]string{
1: "hello",
2: "world",
3: "happy",
})
bmap.Insert(4, "test")
v, ok := bmap.LoadValue(4)
if !ok {
t.Errorf("LoadValue(4) ok: %v but expected %v", ok, true)
}
if v != "test" {
t.Errorf("LoadValue(4) got: %q but expected %q", v, "test")
}
var k int
k, ok = bmap.LoadKey("test")
if !ok {
t.Errorf("LoadKey(\"test\") ok: %v but expected %v", ok, true)
}
if k != 4 {
t.Errorf("LoadKey(\"test\") got: %q but expected %q", k, 4)
}
}

View File

@@ -3,7 +3,11 @@
package schema

import (
"errors"

"akvorado/common/helpers"
)

// Configuration describes the configuration for the schema component.
type Configuration struct {
@@ -17,6 +21,33 @@ type Configuration struct {
NotMainTableOnly []ColumnKey `validate:"ninterfield=MainTableOnly"`
// Materialize lists columns that shall be materialized at ingest instead of computed at query time
Materialize []ColumnKey
// CustomDictionaries allows enrichment of flows with custom metadata
CustomDictionaries map[string]CustomDict `validate:"dive"`
}
// CustomDict represents a single custom dictionary
type CustomDict struct {
Keys []CustomDictKey `validate:"required,dive"`
Attributes []CustomDictAttribute `validate:"required,dive"`
Source string `validate:"required"`
Layout string `validate:"required,oneof=hashed iptrie complex_key_hashed"`
Dimensions []string `validate:"required"`
}
// CustomDictKey represents a single key (matching) column of a custom dictionary
type CustomDictKey struct {
Name string `validate:"required,alphanum"`
Type string `validate:"required,oneof=String UInt8 UInt16 UInt32 UInt64 IPv4 IPv6"`
MatchDimension string `validate:"omitempty,alphanum"`
MatchDimensionSuffix string `validate:"omitempty,alphanum"`
}
// CustomDictAttribute represents a single value column of a custom dictionary
type CustomDictAttribute struct {
Name string `validate:"required,alphanum"`
Type string `validate:"required,oneof=String UInt8 UInt16 UInt32 UInt64 IPv4 IPv6"`
Label string `validate:"omitempty,alphanum"` // empty label is acceptable, in this case fallback to name
Default string `validate:"omitempty,alphanum"`
}

// DefaultConfiguration returns the default configuration for the schema component.
@@ -47,3 +78,35 @@ func (ck *ColumnKey) UnmarshalText(input []byte) error {
}
return errors.New("unknown provider")
}
// GetCustomDictConfig returns the custom dicts encoded in this schema
func (c *Component) GetCustomDictConfig() map[string]CustomDict {
return c.c.CustomDictionaries
}
// DefaultCustomDictConfiguration is the default config for a CustomDict
func DefaultCustomDictConfiguration() CustomDict {
return CustomDict{
Layout: "hashed",
}
}
// DefaultCustomDictKeyConfiguration is the default config for a CustomDictKey
func DefaultCustomDictKeyConfiguration() CustomDictKey {
return CustomDictKey{
Type: "String",
}
}
// DefaultCustomDictAttributeConfiguration is the default config for a CustomDictAttribute
func DefaultCustomDictAttributeConfiguration() CustomDictAttribute {
return CustomDictAttribute{
Type: "String",
}
}
func init() {
helpers.RegisterMapstructureUnmarshallerHook(helpers.DefaultValuesUnmarshallerHook[CustomDict](DefaultCustomDictConfiguration()))
helpers.RegisterMapstructureUnmarshallerHook(helpers.DefaultValuesUnmarshallerHook[CustomDictKey](DefaultCustomDictKeyConfiguration()))
helpers.RegisterMapstructureUnmarshallerHook(helpers.DefaultValuesUnmarshallerHook[CustomDictAttribute](DefaultCustomDictAttributeConfiguration()))
}

View File

@@ -96,6 +96,7 @@ const (
ColumnICMPv6Code
ColumnNextHop
// ColumnLast points to after the last static column, custom dictionaries (dynamic columns) come after ColumnLast
ColumnLast
)
@@ -385,6 +386,7 @@ END`,
ClickHouseCodec: "ZSTD(1)",
},
},
dynamicColumns: 0,
}.finalize()
}
@@ -510,7 +512,7 @@ func (schema Schema) finalize() Schema {
schema.columns = ncolumns
// Build column index
schema.columnIndex = make([]*Column, ColumnLast+schema.dynamicColumns)
for i, column := range schema.columns {
schema.columnIndex[column.Key] = &schema.columns[i]
for j, column := range column.ClickHouseTransformFrom {

View File

@@ -8,8 +8,11 @@ package schema
import (
"fmt"
"strings"

"golang.org/x/exp/slices"
"golang.org/x/text/cases"
"golang.org/x/text/language"
)

// Component represents the schema component.
@@ -74,6 +77,71 @@ func New(config Configuration) (*Component, error) {
column.ClickHouseMainOnly = true
}
}
customDictColumns := []Column{}
// add new columns from custom dictionaries after the static ones.
// As we don't reference the dictionaries in the code and they are created at runtime from the configuration, this is sufficient.
for dname, v := range config.CustomDictionaries {
for _, d := range v.Dimensions {
// check if we can actually create the dictionary (we need to know what to match on)
if len(v.Keys) == 0 {
return nil, fmt.Errorf("custom dictionary %s has no keys, this is not supported", dname)
}
if len(v.Keys) > 1 {
// if more than one key is present, every key needs either a MatchDimension or a MatchDimensionSuffix
for _, kv := range v.Keys {
if kv.MatchDimension == "" && kv.MatchDimensionSuffix == "" {
return nil, fmt.Errorf("custom dictionary %s has more than one key, but key %s has neither MatchDimension nor MatchDimensionSuffix set", dname, kv.Name)
}
}
}
// first, we need to build the matching string for this
matchingList := []string{}
// prefer match dimension or match dimension suffix if available
for _, kv := range v.Keys {
if kv.MatchDimension != "" {
matchingList = append(matchingList, kv.MatchDimension)
continue
}
// the match dimension suffix is appended to the dimension name, useful to match a subkey in e.g. both the Src and Dst variants
if kv.MatchDimensionSuffix != "" {
matchingList = append(matchingList, fmt.Sprintf("%s%s", d, kv.MatchDimensionSuffix))
}
}
matchingString := ""
if len(matchingList) > 0 {
matchingString = fmt.Sprintf("(%s)", strings.Join(matchingList, ","))
} else {
// if match dimension and match dimension suffix are both not available, we use the dimension name (e.g. SrcAddr)
matchingString = d
}
for _, a := range v.Attributes {
// build the column name by combining the dimension with the capitalized attribute name (or its label)
l := a.Label
if l == "" {
l = cases.Title(language.Und).String(a.Name)
}
name := fmt.Sprintf("%s%s", d, l)
// compute the key for this new dynamic column, added after the last dynamic column
key := ColumnLast + schema.dynamicColumns
customDictColumns = append(customDictColumns,
Column{
Key: key,
Name: name,
ClickHouseType: fmt.Sprintf("LowCardinality(%s)", a.Type),
ClickHouseGenerateFrom: fmt.Sprintf("dictGet('custom_dict_%s', '%s', %s)", dname, a.Name,
matchingString),
})
columnNameMap.Insert(key, name)
schema.dynamicColumns++
}
}
}
schema.columns = append(schema.columns, customDictColumns...)
return &Component{
c: config,
Schema: schema.finalize(),

View File

@@ -6,6 +6,7 @@ package schema_test
import (
"testing"

"akvorado/common/helpers"
"akvorado/common/schema"
)
@@ -61,3 +62,180 @@ func TestDisableForbiddenColumns(t *testing.T) {
t.Fatalf("New() error:\n%+v", err)
}
}
func TestCustomDictionaries(t *testing.T) {
config := schema.DefaultConfiguration()
config.CustomDictionaries = make(map[string]schema.CustomDict)
config.CustomDictionaries["test"] = schema.CustomDict{
Keys: []schema.CustomDictKey{
{Name: "SrcAddr", Type: "string"},
},
Attributes: []schema.CustomDictAttribute{
{Name: "csv_col_name", Type: "string", Label: "DimensionAttribute"},
{Name: "role", Type: "string"},
},
Source: "test.csv",
Dimensions: []string{"SrcAddr", "DstAddr"},
}
s, err := schema.New(config)
if err != nil {
t.Fatalf("New() error:\n%+v", err)
}
// Test if SrcAddrAttribute and DstAddrAttribute are in s.columns
srcFound := false
dstFound := false
srcRoleFound := false
dstRoleFound := false
// check if srcAddrAttribute and dstAddrAttribute are in s.columns, and have the correct type/generatefrom
for _, column := range s.Columns() {
if column.Name == "SrcAddrDimensionAttribute" {
srcFound = true
if column.ClickHouseType != "LowCardinality(string)" {
t.Fatalf("SrcAddrDimensionAttribute should be LowCardinality(string), is %s", column.ClickHouseType)
}
if column.ClickHouseGenerateFrom != "dictGet('custom_dict_test', 'csv_col_name', SrcAddr)" {
t.Fatalf("SrcAddrDimensionAttribute should be generated from `dictGet('custom_dict_test', 'csv_col_name', SrcAddr)`, is %s", column.ClickHouseGenerateFrom)
}
}
if column.Name == "DstAddrDimensionAttribute" {
dstFound = true
if column.ClickHouseType != "LowCardinality(string)" {
t.Fatalf("DstAddrDimensionAttribute should be LowCardinality(string), is %s", column.ClickHouseType)
}
if column.ClickHouseGenerateFrom != "dictGet('custom_dict_test', 'csv_col_name', DstAddr)" {
t.Fatalf("DstAddrDimensionAttribute should be generated from `dictGet('custom_dict_test', 'csv_col_name', DstAddr)`, is %s", column.ClickHouseGenerateFrom)
}
}
// this part only tests default dimension name generation
if column.Name == "SrcAddrRole" {
srcRoleFound = true
}
if column.Name == "DstAddrRole" {
dstRoleFound = true
}
}
if !srcFound {
t.Fatal("SrcAddrDimensionAttribute not found")
}
if !dstFound {
t.Fatal("DstAddrDimensionAttribute not found")
}
if !srcRoleFound {
t.Fatal("SrcAddrRole not found")
}
if !dstRoleFound {
t.Fatal("DstAddrRole not found")
}
}
func TestCustomDictionariesMatcher(t *testing.T) {
config := schema.DefaultConfiguration()
config.CustomDictionaries = make(map[string]schema.CustomDict)
config.CustomDictionaries["test"] = schema.CustomDict{
Keys: []schema.CustomDictKey{
{Name: "exporter", Type: "string", MatchDimension: "ExporterAddress"},
{Name: "interface", Type: "string", MatchDimensionSuffix: "Name"},
},
Attributes: []schema.CustomDictAttribute{
{Name: "csv_col_name", Type: "string", Label: "DimensionAttribute"},
},
Source: "test.csv",
Dimensions: []string{"OutIf", "InIf"},
Layout: "complex_key_hashed",
}
s, err := schema.New(config)
if err != nil {
t.Fatalf("New() error:\n%+v", err)
}
// Test if OutIfDimensionAttribute and InIfDimensionAttribute are in s.columns
outFound := false
inFound := false
// check if OutIfDimensionAttribute and InIfDimensionAttribute are in s.columns, and have the correct type/generatefrom
for _, column := range s.Columns() {
if column.Name == "OutIfDimensionAttribute" {
outFound = true
if column.ClickHouseType != "LowCardinality(string)" {
t.Fatalf("OutIfDimensionAttribute should be LowCardinality(string), is %s", column.ClickHouseType)
}
if column.ClickHouseGenerateFrom != "dictGet('custom_dict_test', 'csv_col_name', (ExporterAddress,OutIfName))" {
t.Fatalf("OutIfDimensionAttribute should be generated from `dictGet('custom_dict_test', 'csv_col_name', (ExporterAddress,OutIfName))`, is %s", column.ClickHouseGenerateFrom)
}
}
if column.Name == "InIfDimensionAttribute" {
inFound = true
if column.ClickHouseType != "LowCardinality(string)" {
t.Fatalf("InIfDimensionAttribute should be LowCardinality(string), is %s", column.ClickHouseType)
}
if column.ClickHouseGenerateFrom != "dictGet('custom_dict_test', 'csv_col_name', (ExporterAddress,InIfName))" {
t.Fatalf("InIfDimensionAttribute should be generated from `dictGet('custom_dict_test', 'csv_col_name', (ExporterAddress,InIfName)), is %s", column.ClickHouseGenerateFrom)
}
}
}
if !outFound {
t.Fatal("OutIfDimensionAttribute not found")
}
if !inFound {
t.Fatal("InIfDimensionAttribute not found")
}
}
// we need MatchDimension or MatchDimensionSuffix for multiple keys
func TestCustomDictMultiKeyErr(t *testing.T) {
config := schema.DefaultConfiguration()
config.CustomDictionaries = make(map[string]schema.CustomDict)
config.CustomDictionaries["test"] = schema.CustomDict{
Keys: []schema.CustomDictKey{
{Name: "exporter", Type: "string"},
{Name: "interface", Type: "string"},
},
Attributes: []schema.CustomDictAttribute{
{Name: "csv_col_name", Type: "string", Label: "DimensionAttribute"},
},
Source: "test.csv",
Dimensions: []string{"OutIf", "InIf"},
Layout: "complex_key_hashed",
}
_, err := schema.New(config)
if err == nil {
t.Fatal("New() did not error")
}
if diff := helpers.Diff(err.Error(), "custom dictionary test has more than one key, but key exporter has neither MatchDimension nor MatchDimensionSuffix set"); diff != "" {
t.Fatalf("New() did not error correctly\n %s", diff)
}
}
// a dict without key makes no sense, catch this
func TestCustomDictNoKeyErr(t *testing.T) {
config := schema.DefaultConfiguration()
config.CustomDictionaries = make(map[string]schema.CustomDict)
config.CustomDictionaries["test"] = schema.CustomDict{
Keys: []schema.CustomDictKey{},
Attributes: []schema.CustomDictAttribute{
{Name: "csv_col_name", Type: "string", Label: "DimensionAttribute"},
},
Source: "test.csv",
Dimensions: []string{"OutIf", "InIf"},
Layout: "complex_key_hashed",
}
_, err := schema.New(config)
if err == nil {
t.Fatal("New() did not error")
}
if diff := helpers.Diff(err.Error(), "custom dictionary test has no keys, this is not supported"); diff != "" {
t.Fatalf("New() did not error correctly\n %s", diff)
}
}

View File

@@ -17,6 +17,8 @@ type Schema struct {
columnIndex []*Column // Columns indexed by ColumnKey
disabledGroups bitset.BitSet // Disabled column groups
// dynamicColumns is the number of columns that are generated at runtime and appended after ColumnLast
dynamicColumns ColumnKey
// For ClickHouse. This is the set of primary keys (order is important and
// may not follow column order) for the aggregated tables.
clickHousePrimaryKeys []ColumnKey

View File

@@ -552,6 +552,72 @@ For ICMP, you get `ICMPv4Type`, `ICMPv4Code`, `ICMPv6Type`, `ICMPv6Code`,
`ICMPv4`, and `ICMPv6`. The latter two are displayed as a string in the
console (like `echo-reply` or `frag-needed`).
#### Custom Dictionaries
You can add custom dimensions that are looked up in a dictionary. This is useful to enrich flows with additional information that cannot be derived in the classifier.
**Note:**
Filtering by dictionaries is not yet supported.

This works by providing the database with a CSV file containing the values.
```yaml
schema:
  custom-dictionaries:
    ips:
      layout: complex_key_hashed
      keys:
        - name: addr
          type: String
      attributes:
        - name: role
          type: String
          default: DefaultRole
          label: IPRole
      source: ips_annotation.csv
      dimensions:
        - SrcAddr
        - DstAddr
```
This example expects a CSV file named `ips_annotation.csv` with the following format:
```
addr,role
2001:db8::1,ExampleRole
```
If `SrcAddr` has the value `2001:db8::1` (it matches the key), the dimension `SrcAddrIPRole` is set to `ExampleRole`.
Independently, if `DstAddr` has the value `2001:db8::1`, the dimension `DstAddrIPRole` is set to `ExampleRole`.
All other IPs get `DefaultRole` in their `SrcAddrIPRole`/`DstAddrIPRole` dimensions.
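Once the dictionary is active, the generated dimensions can be queried like any other column. The following is a minimal sketch, assuming the default `flows` table and the `ips` dictionary from the example above; only `SrcAddrIPRole` is specific to this example, the other column names are the stock ones:

```sql
-- Aggregate the last hour of traffic by the enriched role dimension.
-- Illustrative only: assumes the default `flows` table and the `ips` dictionary above.
SELECT SrcAddrIPRole, sum(Bytes) AS Total
FROM flows
WHERE TimeReceived > now() - INTERVAL 1 HOUR
GROUP BY SrcAddrIPRole
ORDER BY Total DESC
```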
The `label` and `default` keys are optional.
It is possible to attach the same dictionary to multiple dimensions, typically for the input and output directions.

By default, the key is matched against the dimension itself. With multiple keys, you have to state explicitly which dimension each key matches, using either `match-dimension` or `match-dimension-suffix`:
```yaml
schema:
  custom-dictionaries:
    interfaces:
      layout: complex_key_hashed
      keys:
        - name: agent
          type: String
          match-dimension: ExporterAddress # CSV column agent matches the ExporterAddress dimension
        - name: interface
          type: String
          match-dimension-suffix: Name # CSV column interface matches either OutIfName or InIfName: the suffix is appended to the dimension name
      attributes:
        - name: information # added as OutIfInformation/InIfInformation to the flow on matches
          type: String # no default: if agent and interface do not both match, the information dimension is empty
      source: interfaces.csv
      dimensions:
        - OutIf
        - InIf
```
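The dictionary source is served to ClickHouse as CSV with a header row, so the column names in the file have to match the configured key and attribute names. An illustrative `interfaces.csv` for the configuration above (the values are made up):

```
agent,interface,information
203.0.113.1,Ethernet1/1,Uplink to transit
203.0.113.2,ge-0/0/0,Peering with IXP
```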
### Kafka

The Kafka component creates or updates the Kafka topic to receive

View File

@@ -9,6 +9,7 @@ import (
"encoding/csv" "encoding/csv"
"fmt" "fmt"
"io" "io"
"io/ioutil"
"net/http" "net/http"
"strconv" "strconv"
"text/template" "text/template"
@@ -90,6 +91,29 @@ func (c *Component) registerHTTPHandlers() error {
w.Write(result.Bytes()) w.Write(result.Bytes())
})) }))
// add handler for custom dicts
for name, dict := range c.d.Schema.GetCustomDictConfig() {
// copy the loop variables so the closure below does not capture the loop iterator
k := name
v := dict
c.d.HTTP.AddHandler(fmt.Sprintf("/api/v0/orchestrator/clickhouse/custom_dict_%s.csv", k), http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
select {
case <-c.networkSourcesReady:
case <-time.After(c.config.NetworkSourcesTimeout):
w.WriteHeader(http.StatusServiceUnavailable)
return
}
file, err := ioutil.ReadFile(v.Source)
if err != nil {
c.r.Err(err).Msg("unable to deliver custom dict csv file")
http.Error(w, fmt.Sprintf("unable to deliver custom dict csv file %s", v.Source), http.StatusNotFound)
return
}
w.Header().Set("Content-Type", "text/csv; charset=utf-8")
w.WriteHeader(http.StatusOK)
w.Write(file)
}))
}
// networks.csv
c.d.HTTP.AddHandler("/api/v0/orchestrator/clickhouse/networks.csv",
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {

View File

@@ -21,10 +21,25 @@ func TestHTTPEndpoints(t *testing.T) {
config.Networks = helpers.MustNewSubnetMap(map[string]NetworkAttributes{
"::ffff:192.0.2.0/120": {Name: "infra"},
})
// setup schema config for custom dicts
schemaConfig := schema.DefaultConfiguration()
schemaConfig.CustomDictionaries = make(map[string]schema.CustomDict)
schemaConfig.CustomDictionaries["test"] = schema.CustomDict{
Source: "testdata/dicts/test.csv",
}
schemaConfig.CustomDictionaries["none"] = schema.CustomDict{
Source: "none.csv",
}
sch, err := schema.New(schemaConfig)
if err != nil {
t.Fatalf("schema.New() error:\n%+v", err)
}
// create http entry
c, err := New(r, config, Dependencies{
Daemon: daemon.NewMock(t),
HTTP: httpserver.NewMock(t, r),
Schema: sch,
})
if err != nil {
t.Fatalf("New() error:\n%+v", err)
@@ -67,6 +82,22 @@ func TestHTTPEndpoints(t *testing.T) {
`syntax = "proto3";`, `syntax = "proto3";`,
}, },
}, },
{
URL: "/api/v0/orchestrator/clickhouse/custom_dict_none.csv",
ContentType: "text/plain; charset=utf-8",
StatusCode: 404,
FirstLines: []string{
"unable to deliver custom dict csv file none.csv",
},
},
{
URL: "/api/v0/orchestrator/clickhouse/custom_dict_test.csv",
ContentType: "text/csv; charset=utf-8",
FirstLines: []string{
`col_a,col_b`,
`1,2`,
},
},
}
helpers.TestHTTPEndpoints(t, c.d.HTTP.LocalAddr(), cases)

View File

@@ -7,10 +7,12 @@ import (
"context" "context"
"fmt" "fmt"
"net" "net"
"strings"
"github.com/ClickHouse/clickhouse-go/v2" "github.com/ClickHouse/clickhouse-go/v2"
"akvorado/common/reporter" "akvorado/common/reporter"
"akvorado/common/schema"
) )
type migrationStep struct {
@@ -82,6 +84,39 @@ func (c *Component) migrateDatabase() error {
return err
}
// prepare custom dictionary migrations
var dictMigrations []func() error
for k, v := range c.d.Schema.GetCustomDictConfig() {
var schemaStr []string
var keys []string
for _, a := range v.Keys {
// This is a key. We need it in the schema and in primary keys.
schemaStr = append(schemaStr, fmt.Sprintf("`%s` %s", a.Name, a.Type))
keys = append(keys, a.Name)
}
for _, a := range v.Attributes {
defaultValue := "None"
if a.Default != "" {
defaultValue = a.Default
}
// this is only an attribute. We only need it in the schema
schemaStr = append(schemaStr, fmt.Sprintf("`%s` %s DEFAULT '%s'", a.Name, a.Type, defaultValue))
}
// wrap this in a function so the closure captures per-iteration copies of the loop variables
m := func(k string, v schema.CustomDict, schemaStr []string) func() error {
return func() error {
return c.createDictionary(ctx, fmt.Sprintf("custom_dict_%s", k), v.Layout, strings.Join(schemaStr[:], ", "), strings.Join(keys[:], ", "))
}
}(k, v, schemaStr)
dictMigrations = append(dictMigrations, m)
}
// create custom dictionaries
err = c.wrapMigrations(dictMigrations...)
if err != nil {
return err
}
// Create the various non-raw flow tables
for _, resolution := range c.config.Resolutions {
err := c.wrapMigrations(

View File

@@ -446,3 +446,201 @@ AND name LIKE $3`, "flows", ch.config.Database, "%NetPrefix")
})
}
}
func TestCustomDictMigration(t *testing.T) {
r := reporter.NewMock(t)
chComponent := clickhousedb.SetupClickHouse(t, r)
if err := chComponent.Exec(context.Background(), "DROP TABLE IF EXISTS system.metric_log"); err != nil {
t.Fatalf("Exec() error:\n%+v", err)
}
// start clean
dropAllTables(t, chComponent)
// First, setup a default configuration
t.Run("default schema", func(t *testing.T) {
r := reporter.NewMock(t)
sch, err := schema.New(schema.DefaultConfiguration())
if err != nil {
t.Fatalf("schema.New() error:\n%+v", err)
}
configuration := DefaultConfiguration()
configuration.OrchestratorURL = "http://something"
configuration.Kafka.Configuration = kafka.DefaultConfiguration()
ch, err := New(r, configuration, Dependencies{
Daemon: daemon.NewMock(t),
HTTP: httpserver.NewMock(t, r),
Schema: sch,
ClickHouse: chComponent,
})
if err != nil {
t.Fatalf("New() error:\n%+v", err)
}
helpers.StartStop(t, ch)
waitMigrations(t, ch)
// We need to have at least one migration
gotMetrics := r.GetMetrics("akvorado_orchestrator_clickhouse_migrations_", "applied_steps")
if gotMetrics["applied_steps"] == "0" {
t.Fatal("No migration applied when applying a fresh default schema")
}
})
// Now, create a custom dictionary on top
if !t.Failed() {
t.Run("custom dictionary", func(t *testing.T) {
r := reporter.NewMock(t)
schConfig := schema.DefaultConfiguration()
schConfig.CustomDictionaries = make(map[string]schema.CustomDict)
schConfig.CustomDictionaries["test"] = schema.CustomDict{
Keys: []schema.CustomDictKey{
{Name: "SrcAddr", Type: "String"},
},
Attributes: []schema.CustomDictAttribute{
{Name: "csv_col_name", Type: "String", Label: "DimensionAttribute"},
{Name: "csv_col_default", Type: "String", Label: "DefaultDimensionAttribute", Default: "Hello World"},
},
Source: "test.csv",
Dimensions: []string{"SrcAddr", "DstAddr"},
Layout: "complex_key_hashed",
}
sch, err := schema.New(schConfig)
if err != nil {
t.Fatalf("schema.New() error:\n%+v", err)
}
configuration := DefaultConfiguration()
configuration.OrchestratorURL = "http://something"
configuration.Kafka.Configuration = kafka.DefaultConfiguration()
ch, err := New(r, configuration, Dependencies{
Daemon: daemon.NewMock(t),
HTTP: httpserver.NewMock(t, r),
Schema: sch,
ClickHouse: chComponent,
})
if err != nil {
t.Fatalf("New() error:\n%+v", err)
}
helpers.StartStop(t, ch)
waitMigrations(t, ch)
// We need to have at least one migration
gotMetrics := r.GetMetrics("akvorado_orchestrator_clickhouse_migrations_", "applied_steps")
if gotMetrics["applied_steps"] == "0" {
t.Fatal("No migration applied when enabling a custom dictionary")
}
// check if the columns were created in the main flows table
row := ch.d.ClickHouse.QueryRow(context.Background(), `
SELECT toString(groupArray(tuple(name, type, default_expression)))
FROM system.columns
WHERE table = $1
AND database = $2
AND name LIKE $3`, "flows", ch.config.Database, "%DimensionAttribute")
var existing string
if err := row.Scan(&existing); err != nil {
t.Fatalf("Scan() error:\n%+v", err)
}
if diff := helpers.Diff(existing,
"[('SrcAddrDimensionAttribute','LowCardinality(String)',''),('SrcAddrDefaultDimensionAttribute','LowCardinality(String)',''),('DstAddrDimensionAttribute','LowCardinality(String)',''),('DstAddrDefaultDimensionAttribute','LowCardinality(String)','')]"); diff != "" {
t.Fatalf("Unexpected state:\n%s", diff)
}
// check if the rows were created in the consumer flows table
rowConsumer := ch.d.ClickHouse.QueryRow(context.Background(), `
SHOW CREATE flows_ZUYGDTE3EBIXX352XPM3YEEFV4_raw_consumer`)
var existingConsumer string
if err := rowConsumer.Scan(&existingConsumer); err != nil {
t.Fatalf("Scan() error:\n%+v", err)
}
// check if the definitions are part of the consumer
expectedStatements := []string{
"dictGet('default.custom_dict_test', 'csv_col_name', DstAddr) AS DstAddrDimensionAttribute",
"dictGet('default.custom_dict_test', 'csv_col_name', SrcAddr) AS SrcAddrDimensionAttribute",
"dictGet('default.custom_dict_test', 'csv_col_default', SrcAddr) AS SrcAddrDefaultDimensionAttribute",
"dictGet('default.custom_dict_test', 'csv_col_default', DstAddr) AS DstAddrDefaultDimensionAttribute",
}
for _, s := range expectedStatements {
if !strings.Contains(existingConsumer, s) {
t.Fatalf("Missing statement in consumer:\n%s", s)
}
}
// check if the dictionary was created
dictCreate := ch.d.ClickHouse.QueryRow(context.Background(), `
SHOW CREATE custom_dict_test`)
var dictCreateString string
if err := dictCreate.Scan(&dictCreateString); err != nil {
t.Fatalf("Scan() error:\n%+v", err)
}
if diff := helpers.Diff(dictCreateString,
"CREATE DICTIONARY default.custom_dict_test\n(\n `SrcAddr` String,\n `csv_col_name` String DEFAULT 'None',\n `csv_col_default` String DEFAULT 'Hello World'\n)\nPRIMARY KEY SrcAddr\nSOURCE(HTTP(URL 'http://something/api/v0/orchestrator/clickhouse/custom_dict_test.csv' FORMAT 'CSVWithNames'))\nLIFETIME(MIN 0 MAX 3600)\nLAYOUT(COMPLEX_KEY_HASHED())\nSETTINGS(format_csv_allow_single_quotes = 0)"); diff != "" {
t.Fatalf("Unexpected state:\n%s", diff)
}
})
}
// next test: with the custom dictionary removed again, the columns should still exist, but the consumer should no longer reference them
if !t.Failed() {
t.Run("remove custom dictionary", func(t *testing.T) {
r := reporter.NewMock(t)
sch, err := schema.New(schema.DefaultConfiguration())
if err != nil {
t.Fatalf("schema.New() error:\n%+v", err)
}
configuration := DefaultConfiguration()
configuration.OrchestratorURL = "http://something"
configuration.Kafka.Configuration = kafka.DefaultConfiguration()
ch, err := New(r, configuration, Dependencies{
Daemon: daemon.NewMock(t),
HTTP: httpserver.NewMock(t, r),
Schema: sch,
ClickHouse: chComponent,
})
if err != nil {
t.Fatalf("New() error:\n%+v", err)
}
helpers.StartStop(t, ch)
waitMigrations(t, ch)
// We need to have at least one migration
gotMetrics := r.GetMetrics("akvorado_orchestrator_clickhouse_migrations_", "applied_steps")
if gotMetrics["applied_steps"] == "0" {
t.Fatal("No migration applied when disabling the custom dict")
}
// check if the columns were created in the main flows table
row := ch.d.ClickHouse.QueryRow(context.Background(), `
SELECT toString(groupArray(tuple(name, type, default_expression)))
FROM system.columns
WHERE table = $1
AND database = $2
AND name LIKE $3`, "flows", ch.config.Database, "%DimensionAttribute")
var existing string
if err := row.Scan(&existing); err != nil {
t.Fatalf("Scan() error:\n%+v", err)
}
if diff := helpers.Diff(existing,
"[('SrcAddrDimensionAttribute','LowCardinality(String)',''),('SrcAddrDefaultDimensionAttribute','LowCardinality(String)',''),('DstAddrDimensionAttribute','LowCardinality(String)',''),('DstAddrDefaultDimensionAttribute','LowCardinality(String)','')]"); diff != "" {
t.Fatalf("Unexpected state:\n%s", diff)
}
// check that the columns were removed from the consumer flows table
rowConsumer := ch.d.ClickHouse.QueryRow(context.Background(), `
SHOW CREATE flows_ZUYGDTE3EBIXX352XPM3YEEFV4_raw_consumer`)
var existingConsumer string
if err := rowConsumer.Scan(&existingConsumer); err != nil {
t.Fatalf("Scan() error:\n%+v", err)
}
// check if the definitions are missing in the consumer
expectedStatements := []string{
"dictGet('default.custom_dict_test', 'csv_col_name', DstAddr) AS DstAddrDimensionAttribute",
"dictGet('default.custom_dict_test', 'csv_col_name', SrcAddr) AS SrcAddrDimensionAttribute",
"dictGet('default.custom_dict_test', 'csv_col_default', SrcAddr) AS SrcAddrDefaultDimensionAttribute",
"dictGet('default.custom_dict_test', 'csv_col_default', DstAddr) AS DstAddrDefaultDimensionAttribute",
}
for _, s := range expectedStatements {
if strings.Contains(existingConsumer, s) {
t.Fatalf("Unexpected Statement found in consumer:\n%s", s)
}
}
})
}
}

View File

@@ -0,0 +1,2 @@
col_a,col_b
1,2