mirror of
https://github.com/akvorado/akvorado.git
synced 2025-12-12 06:24:10 +01:00
common/schema: make enabled/disabled columns configurable
This commit is contained in:
@@ -26,6 +26,7 @@ type ConsoleConfiguration struct {
|
||||
ClickHouse clickhousedb.Configuration
|
||||
Auth authentication.Configuration
|
||||
Database database.Configuration
|
||||
Schema schema.Configuration
|
||||
}
|
||||
|
||||
// Reset resets the console configuration to its default value.
|
||||
@@ -37,6 +38,7 @@ func (c *ConsoleConfiguration) Reset() {
|
||||
ClickHouse: clickhousedb.DefaultConfiguration(),
|
||||
Auth: authentication.DefaultConfiguration(),
|
||||
Database: database.DefaultConfiguration(),
|
||||
Schema: schema.DefaultConfiguration(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -104,7 +106,7 @@ func consoleStart(r *reporter.Reporter, config ConsoleConfiguration, checkOnly b
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to initialize database component: %w", err)
|
||||
}
|
||||
schemaComponent, err := schema.New()
|
||||
schemaComponent, err := schema.New(config.Schema)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to initialize schema component: %w", err)
|
||||
}
|
||||
|
||||
@@ -30,6 +30,7 @@ type InletConfiguration struct {
|
||||
GeoIP geoip.Configuration
|
||||
Kafka kafka.Configuration
|
||||
Core core.Configuration
|
||||
Schema schema.Configuration
|
||||
}
|
||||
|
||||
// Reset resets the configuration for the inlet command to its default value.
|
||||
@@ -43,6 +44,7 @@ func (c *InletConfiguration) Reset() {
|
||||
GeoIP: geoip.DefaultConfiguration(),
|
||||
Kafka: kafka.DefaultConfiguration(),
|
||||
Core: core.DefaultConfiguration(),
|
||||
Schema: schema.DefaultConfiguration(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -96,7 +98,7 @@ func inletStart(r *reporter.Reporter, config InletConfiguration, checkOnly bool)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to initialize http component: %w", err)
|
||||
}
|
||||
schemaComponent, err := schema.New()
|
||||
schemaComponent, err := schema.New(config.Schema)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to initialize schema component: %w", err)
|
||||
}
|
||||
|
||||
@@ -26,6 +26,7 @@ type OrchestratorConfiguration struct {
|
||||
ClickHouse clickhouse.Configuration
|
||||
Kafka kafka.Configuration
|
||||
Orchestrator orchestrator.Configuration `mapstructure:",squash" yaml:",inline"`
|
||||
Schema schema.Configuration
|
||||
// Other service configurations
|
||||
Inlet []InletConfiguration `validate:"dive"`
|
||||
Console []ConsoleConfiguration `validate:"dive"`
|
||||
@@ -45,6 +46,7 @@ func (c *OrchestratorConfiguration) Reset() {
|
||||
ClickHouse: clickhouse.DefaultConfiguration(),
|
||||
Kafka: kafka.DefaultConfiguration(),
|
||||
Orchestrator: orchestrator.DefaultConfiguration(),
|
||||
Schema: schema.DefaultConfiguration(),
|
||||
// Other service configurations
|
||||
Inlet: []InletConfiguration{inletConfiguration},
|
||||
Console: []ConsoleConfiguration{consoleConfiguration},
|
||||
@@ -76,9 +78,11 @@ components and centralizes configuration of the various other components.`,
|
||||
config.ClickHouse.Kafka.Configuration = config.Kafka.Configuration
|
||||
for idx := range config.Inlet {
|
||||
config.Inlet[idx].Kafka.Configuration = config.Kafka.Configuration
|
||||
config.Inlet[idx].Schema = config.Schema
|
||||
}
|
||||
for idx := range config.Console {
|
||||
config.Console[idx].ClickHouse = config.ClickHouse.Configuration
|
||||
config.Console[idx].Schema = config.Schema
|
||||
}
|
||||
}
|
||||
if err := OrchestratorOptions.Parse(cmd.OutOrStdout(), "orchestrator", &config); err != nil {
|
||||
@@ -112,7 +116,7 @@ func orchestratorStart(r *reporter.Reporter, config OrchestratorConfiguration, c
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to initialize HTTP component: %w", err)
|
||||
}
|
||||
schemaComponent, err := schema.New()
|
||||
schemaComponent, err := schema.New(config.Schema)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to initialize schema component: %w", err)
|
||||
}
|
||||
|
||||
16
cmd/testdata/configurations/schema-enable-disable/expected.yaml
vendored
Normal file
16
cmd/testdata/configurations/schema-enable-disable/expected.yaml
vendored
Normal file
@@ -0,0 +1,16 @@
|
||||
---
|
||||
paths:
|
||||
inlet.0.schema:
|
||||
disabled:
|
||||
- SrcCountry
|
||||
- DstCountry
|
||||
enabled:
|
||||
- SrcVlan
|
||||
- DstVlan
|
||||
console.0.schema:
|
||||
disabled:
|
||||
- SrcCountry
|
||||
- DstCountry
|
||||
enabled:
|
||||
- SrcVlan
|
||||
- DstVlan
|
||||
8
cmd/testdata/configurations/schema-enable-disable/in.yaml
vendored
Normal file
8
cmd/testdata/configurations/schema-enable-disable/in.yaml
vendored
Normal file
@@ -0,0 +1,8 @@
|
||||
---
|
||||
schema:
|
||||
disabled:
|
||||
- SrcCountry
|
||||
- DstCountry
|
||||
enabled:
|
||||
- SrcVlan
|
||||
- DstVlan
|
||||
43
common/schema/config.go
Normal file
43
common/schema/config.go
Normal file
@@ -0,0 +1,43 @@
|
||||
// SPDX-FileCopyrightText: 2023 Free Mobile
|
||||
// SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
package schema
|
||||
|
||||
import "errors"
|
||||
|
||||
// Configuration describes the configuration for the schema component.
|
||||
type Configuration struct {
|
||||
// Disabled lists the columns disabled (in addition to the ones disabled by default).
|
||||
Disabled []ColumnKey
|
||||
// Enabled lists the columns enabled (in addition to the ones enabled by default).
|
||||
Enabled []ColumnKey
|
||||
}
|
||||
|
||||
// DefaultConfiguration returns the default configuration for the schema component.
|
||||
func DefaultConfiguration() Configuration {
|
||||
return Configuration{}
|
||||
}
|
||||
|
||||
// MarshalText turns a column key to text
|
||||
func (ck ColumnKey) MarshalText() ([]byte, error) {
|
||||
got, ok := columnNameMap.LoadValue(ck)
|
||||
if ok {
|
||||
return []byte(got), nil
|
||||
}
|
||||
return nil, errors.New("unknown column name")
|
||||
}
|
||||
|
||||
func (ck ColumnKey) String() string {
|
||||
name, _ := columnNameMap.LoadValue(ck)
|
||||
return name
|
||||
}
|
||||
|
||||
// UnmarshalText provides a column key from text
|
||||
func (ck *ColumnKey) UnmarshalText(input []byte) error {
|
||||
got, ok := columnNameMap.LoadKey(string(input))
|
||||
if ok {
|
||||
*ck = got
|
||||
return nil
|
||||
}
|
||||
return errors.New("unknown provider")
|
||||
}
|
||||
@@ -81,11 +81,6 @@ const (
|
||||
|
||||
// revive:enable
|
||||
|
||||
func (c ColumnKey) String() string {
|
||||
name, _ := columnNameMap.LoadValue(c)
|
||||
return name
|
||||
}
|
||||
|
||||
// Flows is the data schema for flows tables. Any column starting with Src/InIf
|
||||
// will be duplicated as Dst/OutIf during init. That's not the case for columns
|
||||
// in `PrimaryKeys'.
|
||||
@@ -106,12 +101,13 @@ func flows() Schema {
|
||||
columns: []Column{
|
||||
{
|
||||
Key: ColumnTimeReceived,
|
||||
NoDisable: true,
|
||||
ClickHouseType: "DateTime",
|
||||
ClickHouseCodec: "DoubleDelta, LZ4",
|
||||
ConsoleNotDimension: true,
|
||||
ProtobufType: protoreflect.Uint64Kind,
|
||||
},
|
||||
{Key: ColumnSamplingRate, ClickHouseType: "UInt64", ConsoleNotDimension: true},
|
||||
{Key: ColumnSamplingRate, NoDisable: true, ClickHouseType: "UInt64", ConsoleNotDimension: true},
|
||||
{Key: ColumnExporterAddress, ClickHouseType: "LowCardinality(IPv6)"},
|
||||
{Key: ColumnExporterName, ClickHouseType: "LowCardinality(String)", ClickHouseNotSortingKey: true},
|
||||
{Key: ColumnExporterGroup, ClickHouseType: "LowCardinality(String)", ClickHouseNotSortingKey: true},
|
||||
@@ -223,8 +219,20 @@ END`,
|
||||
{Key: ColumnEType, ClickHouseType: "UInt32"}, // TODO: UInt16 but hard to change, primary key
|
||||
{Key: ColumnProto, ClickHouseType: "UInt32"}, // TODO: UInt8 but hard to change, primary key
|
||||
{Key: ColumnSrcPort, ClickHouseType: "UInt16", ClickHouseMainOnly: true},
|
||||
{Key: ColumnBytes, ClickHouseType: "UInt64", ClickHouseNotSortingKey: true, ConsoleNotDimension: true},
|
||||
{Key: ColumnPackets, ClickHouseType: "UInt64", ClickHouseNotSortingKey: true, ConsoleNotDimension: true},
|
||||
{
|
||||
Key: ColumnBytes,
|
||||
NoDisable: true,
|
||||
ClickHouseType: "UInt64",
|
||||
ClickHouseNotSortingKey: true,
|
||||
ConsoleNotDimension: true,
|
||||
},
|
||||
{
|
||||
Key: ColumnPackets,
|
||||
NoDisable: true,
|
||||
ClickHouseType: "UInt64",
|
||||
ClickHouseNotSortingKey: true,
|
||||
ConsoleNotDimension: true,
|
||||
},
|
||||
{
|
||||
Key: ColumnPacketSize,
|
||||
ClickHouseType: "UInt64",
|
||||
@@ -275,7 +283,7 @@ func (schema Schema) finalize() Schema {
|
||||
}
|
||||
}
|
||||
|
||||
// Add non-main columns with an alias to NotSortingKey
|
||||
// Non-main columns with an alias are NotSortingKey
|
||||
if !column.ClickHouseMainOnly && column.ClickHouseAlias != "" {
|
||||
column.ClickHouseNotSortingKey = true
|
||||
}
|
||||
|
||||
@@ -6,14 +6,47 @@
|
||||
// the subsystem that will use it.
|
||||
package schema
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"golang.org/x/exp/slices"
|
||||
)
|
||||
|
||||
// Component represents the schema compomenent.
|
||||
type Component struct {
|
||||
c Configuration
|
||||
|
||||
Schema
|
||||
}
|
||||
|
||||
// New creates a new schema component.
|
||||
func New() (*Component, error) {
|
||||
func New(config Configuration) (*Component, error) {
|
||||
schema := flows()
|
||||
for _, k1 := range config.Enabled {
|
||||
for _, k2 := range config.Disabled {
|
||||
if k1 == k2 {
|
||||
return nil, fmt.Errorf("column %q contained in both EnabledColumns and DisabledColumns", k1)
|
||||
}
|
||||
}
|
||||
}
|
||||
for _, k := range config.Enabled {
|
||||
if column, ok := schema.LookupColumnByKey(k); ok {
|
||||
column.Disabled = false
|
||||
}
|
||||
}
|
||||
for _, k := range config.Disabled {
|
||||
if column, ok := schema.LookupColumnByKey(k); ok {
|
||||
if column.NoDisable {
|
||||
return nil, fmt.Errorf("column %q cannot be disabled", k)
|
||||
}
|
||||
if slices.Contains(schema.clickHousePrimaryKeys, k) {
|
||||
return nil, fmt.Errorf("column %q cannot be disabled (primary key)", k)
|
||||
}
|
||||
column.Disabled = true
|
||||
}
|
||||
}
|
||||
return &Component{
|
||||
Schema: flows(),
|
||||
c: config,
|
||||
Schema: schema,
|
||||
}, nil
|
||||
}
|
||||
|
||||
33
common/schema/root_test.go
Normal file
33
common/schema/root_test.go
Normal file
@@ -0,0 +1,33 @@
|
||||
// SPDX-FileCopyrightText: 2023 Free Mobile
|
||||
// SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
package schema_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"akvorado/common/schema"
|
||||
)
|
||||
|
||||
func TestEnableDisableColumns(t *testing.T) {
|
||||
config := schema.DefaultConfiguration()
|
||||
config.Enabled = []schema.ColumnKey{schema.ColumnDstVlan, schema.ColumnSrcVlan}
|
||||
config.Disabled = []schema.ColumnKey{schema.ColumnSrcCountry, schema.ColumnDstCountry}
|
||||
c, err := schema.New(config)
|
||||
if err != nil {
|
||||
t.Fatalf("New() error:\n%+v", err)
|
||||
}
|
||||
|
||||
if column, ok := c.LookupColumnByKey(schema.ColumnDstVlan); !ok {
|
||||
t.Fatal("DstVlan not found")
|
||||
} else if column.Disabled {
|
||||
t.Fatal("DstVlan is still disabled")
|
||||
}
|
||||
|
||||
if column, ok := c.LookupColumnByKey(schema.ColumnDstCountry); !ok {
|
||||
t.Fatal("DstCountry not found")
|
||||
} else if !column.Disabled {
|
||||
t.Fatal("DstCountry is not disabled")
|
||||
}
|
||||
|
||||
}
|
||||
@@ -34,7 +34,7 @@ func DisableDebug(t testing.TB) {
|
||||
// NewMock create a new schema component.
|
||||
func NewMock(t testing.TB) *Component {
|
||||
t.Helper()
|
||||
c, err := New()
|
||||
c, err := New(DefaultConfiguration())
|
||||
if err != nil {
|
||||
t.Fatalf("New() error:\n%+v", err)
|
||||
}
|
||||
|
||||
@@ -26,6 +26,7 @@ type Column struct {
|
||||
Key ColumnKey
|
||||
Name string
|
||||
Disabled bool
|
||||
NoDisable bool
|
||||
|
||||
// For ClickHouse. `NotSortingKey' is for columns generated from other
|
||||
// columns. It is only useful if not ClickHouseMainOnly and not Alias. `GenerateFrom'
|
||||
|
||||
@@ -344,6 +344,22 @@ default, no persistent cache is configured.
|
||||
`security-parameters` configuration option. Otherwise, it will use
|
||||
SNMPv2.
|
||||
|
||||
### Schema
|
||||
|
||||
It is possible to alter the data schema used by *Akvorado* by adding and
|
||||
removing columns. For example, to add the `SrcVlan` and `DstVlan` columns while
|
||||
removing the `SrcCountry` and `DstCountry`, one can use:
|
||||
|
||||
```yaml
|
||||
schema:
|
||||
disabled:
|
||||
- SrcCountry
|
||||
- DstCountry
|
||||
enabled:
|
||||
- SrcVlan
|
||||
- DstVlan
|
||||
```
|
||||
|
||||
### HTTP
|
||||
|
||||
The builtin HTTP server serves various pages. Its configuration
|
||||
@@ -383,10 +399,11 @@ nothing to configure either.
|
||||
|
||||
## Orchestrator service
|
||||
|
||||
The two main components of the orchestrator service are `clickhouse`
|
||||
and `kafka`. It also uses the [HTTP](#http) and
|
||||
[reporting](#reporting) component from the inlet service and accepts
|
||||
the same configuration settings.
|
||||
The two main components of the orchestrator service are `clickhouse` and
|
||||
`kafka`. It also uses the [HTTP](#http), [reporting](#reporting), and
|
||||
[schema](#schema) components from the inlet service and accepts the same
|
||||
configuration settings. For the schema, disabling a column won't delete existing
|
||||
data.
|
||||
|
||||
### Kafka
|
||||
|
||||
@@ -518,8 +535,9 @@ resolutions:
|
||||
## Console service
|
||||
|
||||
The main components of the console service are `http`, `console`,
|
||||
`authentication` and `database`. `http` accepts the [same
|
||||
configuration](#http) as for the inlet service.
|
||||
`authentication` and `database`. `http` accepts the [same configuration](#http)
|
||||
as for the inlet service, while `schema` also accepts the [same
|
||||
configuration](#schema) as for the inlet service.
|
||||
|
||||
The console itself accepts the following keys:
|
||||
|
||||
|
||||
@@ -19,6 +19,7 @@ installations may end up eating space because of that. The default TTL is 30
|
||||
days. You can disable that by setting `orchestrator.clickhouse.system-logs-ttl`
|
||||
to 0.
|
||||
|
||||
- ✨ *inlet*: add `schema.enable` and `schema.disable` to add or remove collected data
|
||||
- 🩹 *inlet*: handle correctly interfaces with high indexes for sFlow
|
||||
- 🩹 *docker*: fix Kafka healthcheck
|
||||
- 🌱 *inlet*: improve decoding/encoding performance (twice faster!)
|
||||
|
||||
Reference in New Issue
Block a user