common/schema: allow to move some data in or out the main table

This commit is contained in:
Vincent Bernat
2023-01-26 00:33:06 +01:00
parent 6482c4c258
commit d3b1ae7ad2
8 changed files with 114 additions and 18 deletions

View File

@@ -5,12 +5,20 @@ paths:
- SrcCountry - SrcCountry
- DstCountry - DstCountry
enabled: enabled:
- SrcVlan - SrcMAC
- DstVlan - DstMAC
maintableonly:
- SrcMAC
- DstMAC
notmaintableonly: []
console.0.schema: console.0.schema:
disabled: disabled:
- SrcCountry - SrcCountry
- DstCountry - DstCountry
enabled: enabled:
- SrcVlan - SrcMAC
- DstVlan - DstMAC
maintableonly:
- SrcMAC
- DstMAC
notmaintableonly: []

View File

@@ -4,5 +4,8 @@ schema:
- SrcCountry - SrcCountry
- DstCountry - DstCountry
enabled: enabled:
- SrcVlan - SrcMAC
- DstVlan - DstMAC
main-table-only:
- SrcMAC
- DstMAC

View File

@@ -56,7 +56,7 @@ func isListen(fl validator.FieldLevel) bool {
if err != nil { if err != nil {
return false return false
} }
// Port must be a iny <= 65535. // Port must be a <= 65535.
if portNum, err := strconv.ParseInt(port, 10, 32); err != nil || portNum > 65535 || portNum < 0 { if portNum, err := strconv.ParseInt(port, 10, 32); err != nil || portNum > 65535 || portNum < 0 {
return false return false
} }
@@ -68,9 +68,33 @@ func isListen(fl validator.FieldLevel) bool {
return true return true
} }
// noIntersectField validates a field value does not intersect with another one
// (both fields should be a slice)
func noIntersectField(fl validator.FieldLevel) bool {
field := fl.Field()
currentField, _, ok := fl.GetStructFieldOK()
if !ok {
return false
}
if field.Kind() != reflect.Slice || currentField.Kind() != reflect.Slice {
return false
}
for i := 0; i < field.Len(); i++ {
el1 := field.Index(i).Interface()
for j := 0; j < currentField.Len(); j++ {
el2 := currentField.Index(i).Interface()
if el1 == el2 {
return false
}
}
}
return true
}
func init() { func init() {
Validate = validator.New() Validate = validator.New()
Validate.RegisterValidation("listen", isListen) Validate.RegisterValidation("listen", isListen)
Validate.RegisterValidation("ninterfield", noIntersectField)
Validate.RegisterCustomTypeFunc(netipValidation, netip.Addr{}, netip.Prefix{}) Validate.RegisterCustomTypeFunc(netipValidation, netip.Addr{}, netip.Prefix{})
RegisterSubnetMapValidation[string]() RegisterSubnetMapValidation[string]()
} }

View File

@@ -32,9 +32,39 @@ func TestListenValidator(t *testing.T) {
s.Listen = tc.Listen s.Listen = tc.Listen
err := helpers.Validate.Struct(s) err := helpers.Validate.Struct(s)
if err == nil && tc.Err { if err == nil && tc.Err {
t.Error("Validate.Struct() expected an error") t.Errorf("Validate.Struct(%q) expected an error", tc.Listen)
} else if err != nil && !tc.Err { } else if err != nil && !tc.Err {
t.Errorf("Validate.Struct() error:\n%+v", err) t.Errorf("Validate.Struct(%q) error:\n%+v", tc.Listen, err)
}
}
}
func TestNoIntersectWithValidator(t *testing.T) {
s := struct {
Set1 []string
Set2 []string `validate:"ninterfield=Set1"`
}{}
cases := []struct {
Set1 []string
Set2 []string
Err bool
}{
{nil, nil, false},
{nil, []string{"aaa"}, false},
{[]string{"bbb"}, nil, false},
{[]string{"aaa"}, []string{"bbb"}, false},
{[]string{"aaa"}, []string{"aaa"}, true},
{[]string{"aaa", "ccc"}, []string{"bbb", "ddd"}, false},
{[]string{"aaa", "ccc"}, []string{"bbb", "ccc"}, true},
}
for _, tc := range cases {
s.Set1 = tc.Set1
s.Set2 = tc.Set2
err := helpers.Validate.Struct(s)
if err == nil && tc.Err {
t.Errorf("Validate.Struct(%+v) expected an error", s)
} else if err != nil && !tc.Err {
t.Errorf("Validate.Struct(%+v) error:\n%+v", s, err)
} }
} }
} }

View File

@@ -10,7 +10,11 @@ type Configuration struct {
// Disabled lists the columns disabled (in addition to the ones disabled by default). // Disabled lists the columns disabled (in addition to the ones disabled by default).
Disabled []ColumnKey Disabled []ColumnKey
// Enabled lists the columns enabled (in addition to the ones enabled by default). // Enabled lists the columns enabled (in addition to the ones enabled by default).
Enabled []ColumnKey Enabled []ColumnKey `validate:"ninterfield=Disabled"`
// MainTableOnly lists columns to be moved to the main table only
MainTableOnly []ColumnKey
// NotMainTableOnly lists columns to be moved out of the main table only
NotMainTableOnly []ColumnKey `validate:"ninterfield=NotMainTableOnly"`
} }
// DefaultConfiguration returns the default configuration for the schema component. // DefaultConfiguration returns the default configuration for the schema component.

View File

@@ -22,13 +22,6 @@ type Component struct {
// New creates a new schema component. // New creates a new schema component.
func New(config Configuration) (*Component, error) { func New(config Configuration) (*Component, error) {
schema := flows() schema := flows()
for _, k1 := range config.Enabled {
for _, k2 := range config.Disabled {
if k1 == k2 {
return nil, fmt.Errorf("column %q contained in both EnabledColumns and DisabledColumns", k1)
}
}
}
for _, k := range config.Enabled { for _, k := range config.Enabled {
if column, ok := schema.LookupColumnByKey(k); ok { if column, ok := schema.LookupColumnByKey(k); ok {
column.Disabled = false column.Disabled = false
@@ -45,6 +38,23 @@ func New(config Configuration) (*Component, error) {
column.Disabled = true column.Disabled = true
} }
} }
for _, k := range config.NotMainTableOnly {
if column, ok := schema.LookupColumnByKey(k); ok {
column.ClickHouseMainOnly = false
}
}
for _, k := range config.MainTableOnly {
if column, ok := schema.LookupColumnByKey(k); ok {
if column.NoDisable {
return nil, fmt.Errorf("column %q cannot be present on main table only", k)
}
if slices.Contains(schema.clickHousePrimaryKeys, k) {
// Primary keys are part of the sorting key.
return nil, fmt.Errorf("column %q cannot be present on main table only (primary key)", k)
}
column.ClickHouseMainOnly = true
}
}
return &Component{ return &Component{
c: config, c: config,
Schema: schema.finalize(), Schema: schema.finalize(),

View File

@@ -403,6 +403,22 @@ schema:
You can get the list of columns you can enable or disable with `akvorado You can get the list of columns you can enable or disable with `akvorado
version`. Disabling a column won't delete existing data. version`. Disabling a column won't delete existing data.
It is also possible to make make some columns available on the main table only
or on all tables with `main-table-only` and `not-main-table-only`. For example:
```yaml
schema:
enabled:
- SrcMAC
- DstMAC
main-table-only:
- SrcMAC
- DstMAC
not-main-table-only:
- SrcAddr
- DstAddr
```
### Kafka ### Kafka
The Kafka component creates or updates the Kafka topic to receive The Kafka component creates or updates the Kafka topic to receive

View File

@@ -19,7 +19,8 @@ installations may end up eating space because of that. The default TTL is 30
days. You can disable that by setting `orchestrator.clickhouse.system-logs-ttl` days. You can disable that by setting `orchestrator.clickhouse.system-logs-ttl`
to 0. to 0.
-*inlet*: add `schema.enabled` and `schema.disabled` to alter collected data -*inlet*: add `schema.enabled`, `schema.disabled`, `schema.main-table-only`,
and `schema.not-main-table-only` to alter collected data
-*inlet*: add the following collected data (disabled by default): -*inlet*: add the following collected data (disabled by default):
- `SrcAddrNAT` and `DstAddrNAT` - `SrcAddrNAT` and `DstAddrNAT`
- `SrcPortNAT` and `DstPortNAT` - `SrcPortNAT` and `DstPortNAT`