common/schema: turns into a component

This is a first step to make it accept configuration. Most of the
changes are quite trivial, but I also ran into some difficulties with
query columns and filters. They need the schema for parsing, but parsing
happens before dependencies are instantiated (and even if it was not the
case, parsing is stateless). Therefore, I have added a `Validate()`
method that must be called after instantiation. Various bits `panic()`
if not validated to ensure we catch all cases.

The alternative to make the component manages a global state would have
been simpler but it would break once we add the ability to add or
disable columns.
This commit is contained in:
Vincent Bernat
2023-01-18 12:14:23 +01:00
parent 3c55f90fdf
commit c6a9319b57
58 changed files with 1049 additions and 655 deletions

View File

@@ -12,6 +12,7 @@ import (
"akvorado/common/daemon"
"akvorado/common/http"
"akvorado/common/reporter"
"akvorado/common/schema"
"akvorado/console"
"akvorado/console/authentication"
"akvorado/console/database"
@@ -103,12 +104,17 @@ func consoleStart(r *reporter.Reporter, config ConsoleConfiguration, checkOnly b
if err != nil {
return fmt.Errorf("unable to initialize database component: %w", err)
}
schemaComponent, err := schema.New()
if err != nil {
return fmt.Errorf("unable to initialize schema component: %w", err)
}
consoleComponent, err := console.New(r, config.Console, console.Dependencies{
Daemon: daemonComponent,
HTTP: httpComponent,
ClickHouseDB: clickhouseComponent,
Auth: authenticationComponent,
Database: databaseComponent,
Schema: schemaComponent,
})
if err != nil {
return fmt.Errorf("unable to initialize console component: %w", err)

View File

@@ -11,6 +11,7 @@ import (
"akvorado/common/daemon"
"akvorado/common/http"
"akvorado/common/reporter"
"akvorado/common/schema"
"akvorado/inlet/bmp"
"akvorado/inlet/core"
"akvorado/inlet/flow"
@@ -95,9 +96,14 @@ func inletStart(r *reporter.Reporter, config InletConfiguration, checkOnly bool)
if err != nil {
return fmt.Errorf("unable to initialize http component: %w", err)
}
schemaComponent, err := schema.New()
if err != nil {
return fmt.Errorf("unable to initialize schema component: %w", err)
}
flowComponent, err := flow.New(r, config.Flow, flow.Dependencies{
Daemon: daemonComponent,
HTTP: httpComponent,
Schema: schemaComponent,
})
if err != nil {
return fmt.Errorf("unable to initialize flow component: %w", err)
@@ -122,6 +128,7 @@ func inletStart(r *reporter.Reporter, config InletConfiguration, checkOnly bool)
}
kafkaComponent, err := kafka.New(r, config.Kafka, kafka.Dependencies{
Daemon: daemonComponent,
Schema: schemaComponent,
})
if err != nil {
return fmt.Errorf("unable to initialize Kafka component: %w", err)
@@ -134,6 +141,7 @@ func inletStart(r *reporter.Reporter, config InletConfiguration, checkOnly bool)
GeoIP: geoipComponent,
Kafka: kafkaComponent,
HTTP: httpComponent,
Schema: schemaComponent,
})
if err != nil {
return fmt.Errorf("unable to initialize core component: %w", err)

View File

@@ -12,6 +12,7 @@ import (
"akvorado/common/daemon"
"akvorado/common/http"
"akvorado/common/reporter"
"akvorado/common/schema"
"akvorado/orchestrator"
"akvorado/orchestrator/clickhouse"
"akvorado/orchestrator/kafka"
@@ -111,7 +112,11 @@ func orchestratorStart(r *reporter.Reporter, config OrchestratorConfiguration, c
if err != nil {
return fmt.Errorf("unable to initialize HTTP component: %w", err)
}
kafkaComponent, err := kafka.New(r, config.Kafka)
schemaComponent, err := schema.New()
if err != nil {
return fmt.Errorf("unable to initialize schema component: %w", err)
}
kafkaComponent, err := kafka.New(r, config.Kafka, kafka.Dependencies{Schema: schemaComponent})
if err != nil {
return fmt.Errorf("unable to initialize kafka component: %w", err)
}
@@ -125,6 +130,7 @@ func orchestratorStart(r *reporter.Reporter, config OrchestratorConfiguration, c
Daemon: daemonComponent,
HTTP: httpComponent,
ClickHouse: clickhouseDBComponent,
Schema: schemaComponent,
})
if err != nil {
return fmt.Errorf("unable to initialize clickhouse component: %w", err)

View File

@@ -23,12 +23,24 @@ var prettyC = pretty.Config{
}
func defaultPrettyFormatters() map[reflect.Type]interface{} {
return map[reflect.Type]interface{}{
result := map[reflect.Type]interface{}{
reflect.TypeOf(net.IP{}): fmt.Sprint,
reflect.TypeOf(netip.Addr{}): fmt.Sprint,
reflect.TypeOf(time.Time{}): fmt.Sprint,
reflect.TypeOf(SubnetMap[string]{}): fmt.Sprint,
}
for t, fn := range nonDefaultPrettyFormatters {
result[t] = fn
}
return result
}
var nonDefaultPrettyFormatters = map[reflect.Type]interface{}{}
// AddPrettyFormatter add a global pretty formatter. We cannot put everything in
// the default map due to cycles.
func AddPrettyFormatter(t reflect.Type, fn interface{}) {
nonDefaultPrettyFormatters[t] = fn
}
// DiffOption changes the behavior of the Diff function.

View File

@@ -151,7 +151,8 @@ func (c ColumnKey) String() string {
// Flows is the data schema for flows tables. Any column starting with Src/InIf
// will be duplicated as Dst/OutIf during init. That's not the case for columns
// in `PrimaryKeys'.
var Flows = Schema{
func flows() Schema {
return Schema{
clickHousePrimaryKeys: []ColumnKey{
ColumnTimeReceived,
ColumnExporterAddress,
@@ -310,6 +311,7 @@ END`,
{Key: ColumnForwardingStatus, ClickHouseType: "UInt32"}, // TODO: UInt8 but hard to change, primary key
},
}.finalize()
}
func (schema Schema) finalize() Schema {
ncolumns := []Column{}

View File

@@ -8,8 +8,9 @@ import (
)
func TestFlowsClickHouse(t *testing.T) {
for _, key := range Flows.clickHousePrimaryKeys {
if column := Flows.columnIndex[key]; column.Key == 0 {
c := NewMock(t)
for _, key := range c.clickHousePrimaryKeys {
if column := c.columnIndex[key]; column.Key == 0 {
t.Errorf("primary key %q not a column", key)
} else {
if column.ClickHouseNotSortingKey {
@@ -20,7 +21,8 @@ func TestFlowsClickHouse(t *testing.T) {
}
func TestFlowsProtobuf(t *testing.T) {
for _, column := range Flows.Columns() {
c := NewMock(t)
for _, column := range c.Columns() {
if column.ProtobufIndex >= 0 {
if column.ProtobufType == 0 {
t.Errorf("column %s has not protobuf type", column.Name)
@@ -30,8 +32,9 @@ func TestFlowsProtobuf(t *testing.T) {
}
func TestColumnIndex(t *testing.T) {
c := NewMock(t)
for i := ColumnTimeReceived; i < ColumnLast; i++ {
if _, ok := Flows.LookupColumnByKey(i); !ok {
if _, ok := c.LookupColumnByKey(i); !ok {
t.Errorf("column %s cannot be looked up by key", i)
}
}

View File

@@ -3,9 +3,12 @@
package schema
import "testing"
import (
"testing"
)
func TestLookupColumnByName(t *testing.T) {
c := NewMock(t)
cases := []string{
"TimeReceived",
"InIfProvider",
@@ -14,7 +17,7 @@ func TestLookupColumnByName(t *testing.T) {
"ForwardingStatus",
}
for _, name := range cases {
column, ok := Flows.LookupColumnByName(name)
column, ok := c.LookupColumnByName(name)
if !ok {
t.Fatalf("LookupByName(%q) not found", name)
}
@@ -25,6 +28,7 @@ func TestLookupColumnByName(t *testing.T) {
}
func TestReverseColumnDirection(t *testing.T) {
c := NewMock(t)
cases := []struct {
Input ColumnKey
Output ColumnKey
@@ -37,7 +41,7 @@ func TestReverseColumnDirection(t *testing.T) {
{ColumnExporterName, ColumnExporterName},
}
for _, tc := range cases {
got := Flows.ReverseColumnDirection(tc.Input)
got := c.ReverseColumnDirection(tc.Input)
if got != tc.Output {
t.Errorf("ReverseColumnDirection(%q) == %q but expected %q",
tc.Input.String(), got.String(), tc.Output.String())

View File

@@ -4,9 +4,7 @@
package schema
import (
"fmt"
"net/netip"
"reflect"
"strings"
"testing"
@@ -113,18 +111,19 @@ message FlowMessagevLH2TTFF7P352DSYYCJYWFCXHAM {
}
func TestProtobufMarshal(t *testing.T) {
c := NewMock(t)
exporterAddress := netip.MustParseAddr("::ffff:203.0.113.14")
bf := &FlowMessage{}
bf.TimeReceived = 1000
bf.SamplingRate = 20000
bf.ExporterAddress = exporterAddress
Flows.ProtobufAppendVarint(bf, ColumnDstAS, 65000)
Flows.ProtobufAppendVarint(bf, ColumnBytes, 200)
Flows.ProtobufAppendVarint(bf, ColumnPackets, 300)
Flows.ProtobufAppendVarint(bf, ColumnBytes, 300) // duplicate!
Flows.ProtobufAppendBytes(bf, ColumnDstCountry, []byte("FR"))
c.ProtobufAppendVarint(bf, ColumnDstAS, 65000)
c.ProtobufAppendVarint(bf, ColumnBytes, 200)
c.ProtobufAppendVarint(bf, ColumnPackets, 300)
c.ProtobufAppendVarint(bf, ColumnBytes, 300) // duplicate!
c.ProtobufAppendBytes(bf, ColumnDstCountry, []byte("FR"))
got := Flows.ProtobufMarshal(bf)
got := c.ProtobufMarshal(bf)
size, n := protowire.ConsumeVarint(got)
if uint64(len(got)-n) != size {
@@ -154,7 +153,7 @@ func TestProtobufMarshal(t *testing.T) {
})
t.Run("compare as protobuf message", func(t *testing.T) {
got := Flows.ProtobufDecode(t, got)
got := c.ProtobufDecode(t, got)
expected := FlowMessage{
TimeReceived: 1000,
SamplingRate: 20000,
@@ -166,13 +165,14 @@ func TestProtobufMarshal(t *testing.T) {
ColumnDstCountry: "FR",
},
}
if diff := helpers.Diff(got, expected, helpers.DiffFormatter(reflect.TypeOf(ColumnBytes), fmt.Sprint)); diff != "" {
if diff := helpers.Diff(got, expected); diff != "" {
t.Fatalf("ProtobufDecode() (-got, +want):\n%s", diff)
}
})
}
func BenchmarkProtobufMarshal(b *testing.B) {
c := NewMock(b)
exporterAddress := netip.MustParseAddr("::ffff:203.0.113.14")
DisableDebug(b)
for i := 0; i < b.N; i++ {
@@ -181,11 +181,11 @@ func BenchmarkProtobufMarshal(b *testing.B) {
SamplingRate: 20000,
ExporterAddress: exporterAddress,
}
Flows.ProtobufAppendVarint(bf, ColumnDstAS, 65000)
Flows.ProtobufAppendVarint(bf, ColumnBytes, 200)
Flows.ProtobufAppendVarint(bf, ColumnPackets, 300)
Flows.ProtobufAppendVarint(bf, ColumnBytes, 300) // duplicate!
Flows.ProtobufAppendBytes(bf, ColumnDstCountry, []byte("FR"))
Flows.ProtobufMarshal(bf)
c.ProtobufAppendVarint(bf, ColumnDstAS, 65000)
c.ProtobufAppendVarint(bf, ColumnBytes, 200)
c.ProtobufAppendVarint(bf, ColumnPackets, 300)
c.ProtobufAppendVarint(bf, ColumnBytes, 300) // duplicate!
c.ProtobufAppendBytes(bf, ColumnDstCountry, []byte("FR"))
c.ProtobufMarshal(bf)
}
}

19
common/schema/root.go Normal file
View File

@@ -0,0 +1,19 @@
// SPDX-FileCopyrightText: 2023 Free Mobile
// SPDX-License-Identifier: AGPL-3.0-only
// Package schema is an abstraction of the data schema for flows used by
// Akvorado. It is a leaky abstraction as there are multiple parts dependant of
// the subsystem that will use it.
package schema
// Component represents the schema compomenent.
type Component struct {
Schema
}
// New creates a new schema component.
func New() (*Component, error) {
return &Component{
Schema: flows(),
}, nil
}

View File

@@ -6,11 +6,14 @@
package schema
import (
"fmt"
"net/netip"
"reflect"
"strings"
"testing"
"akvorado/common/helpers"
"github.com/jhump/protoreflect/desc"
"github.com/jhump/protoreflect/desc/protoparse"
"github.com/jhump/protoreflect/dynamic"
@@ -19,7 +22,8 @@ import (
var debug = true
// DisableDebug disables debug during the provided test.
// DisableDebug disables debug during the provided test. We keep this as a
// global function for performance reason (when release, debug is a const).
func DisableDebug(t testing.TB) {
debug = false
t.Cleanup(func() {
@@ -27,6 +31,16 @@ func DisableDebug(t testing.TB) {
})
}
// NewMock create a new schema component.
func NewMock(t testing.TB) *Component {
t.Helper()
c, err := New()
if err != nil {
t.Fatalf("New() error:\n%+v", err)
}
return c
}
// ProtobufDecode decodes the provided protobuf message.
func (schema *Schema) ProtobufDecode(t *testing.T, input []byte) *FlowMessage {
parser := protoparse.Parser{
@@ -101,3 +115,7 @@ func (schema *Schema) ProtobufDecode(t *testing.T, input []byte) *FlowMessage {
return &flow
}
func init() {
helpers.AddPrettyFormatter(reflect.TypeOf(ColumnBytes), fmt.Sprint)
}

View File

@@ -1,9 +1,6 @@
// SPDX-FileCopyrightText: 2022 Free Mobile
// SPDX-License-Identifier: AGPL-3.0-only
// Package schema is an abstraction of the data schema for flows used by
// Akvorado. It is a leaky abstraction as there are multiple parts dependant of
// the subsystem that will use it.
package schema
import (

View File

@@ -12,6 +12,8 @@ import (
"strings"
"text/template"
"time"
"akvorado/console/query"
)
// flowsTable describe a consolidated or unconsolidated flows table.
@@ -122,11 +124,11 @@ func templateEscape(input string) string {
}
// templateWhere transforms a filter to a WHERE clause
func templateWhere(qf queryFilter) string {
if qf.Filter == "" {
func templateWhere(qf query.Filter) string {
if qf.Direct() == "" {
return `{{ .Timefilter }}`
}
return fmt.Sprintf(`{{ .Timefilter }} AND (%s)`, templateEscape(qf.Filter))
return fmt.Sprintf(`{{ .Timefilter }} AND (%s)`, templateEscape(qf.Direct()))
}
// templateTable builds a template directive to select the right table

View File

@@ -7,7 +7,7 @@ import (
"net/http"
"time"
"akvorado/common/schema"
"akvorado/console/query"
"github.com/gin-gonic/gin"
)
@@ -39,7 +39,7 @@ type VisualizeOptionsConfiguration struct {
// Filter is the the filter string
Filter string `json:"filter"`
// Dimensions is the array of dimensions to use
Dimensions []queryColumn `json:"dimensions"`
Dimensions []query.Column `json:"dimensions"`
// Limit is the default limit to use
Limit int `json:"limit" validate:"min=5"`
}
@@ -52,7 +52,7 @@ func DefaultConfiguration() Configuration {
Start: "6 hours ago",
End: "now",
Filter: "InIfBoundary = external",
Dimensions: []queryColumn{queryColumn(schema.ColumnSrcAS)},
Dimensions: []query.Column{query.NewColumn("SrcAS")},
Limit: 10,
},
HomepageTopWidgets: []string{"src-as", "src-port", "protocol", "src-country", "etype"},
@@ -63,7 +63,7 @@ func DefaultConfiguration() Configuration {
func (c *Component) configHandlerFunc(gc *gin.Context) {
dimensions := []string{}
for _, column := range schema.Flows.Columns() {
for _, column := range c.d.Schema.Columns() {
if column.ConsoleNotDimension {
continue
}

View File

@@ -42,7 +42,7 @@ func (c *Component) filterValidateHandlerFunc(gc *gin.Context) {
})
return
}
got, err := filter.Parse("", []byte(input.Filter), filter.GlobalStore("meta", &filter.Meta{}))
got, err := filter.Parse("", []byte(input.Filter), filter.GlobalStore("meta", &filter.Meta{Schema: c.d.Schema}))
if err == nil {
gc.JSON(http.StatusOK, filterValidateHandlerOutput{
Message: "ok",
@@ -85,7 +85,7 @@ func (c *Component) filterCompleteHandlerFunc(gc *gin.Context) {
switch input.What {
case "column":
_, err := filter.Parse("", []byte{},
filter.Entrypoint("ConditionExpr"), filter.GlobalStore("meta", &filter.Meta{}))
filter.Entrypoint("ConditionExpr"), filter.GlobalStore("meta", &filter.Meta{Schema: c.d.Schema}))
if err != nil {
for _, candidate := range filter.Expected(err) {
if !strings.HasSuffix(candidate, `"i`) {
@@ -102,7 +102,7 @@ func (c *Component) filterCompleteHandlerFunc(gc *gin.Context) {
_, err := filter.Parse("",
[]byte(fmt.Sprintf("%s ", input.Column)),
filter.Entrypoint("ConditionExpr"),
filter.GlobalStore("meta", &filter.Meta{}))
filter.GlobalStore("meta", &filter.Meta{Schema: c.d.Schema}))
if err != nil {
for _, candidate := range filter.Expected(err) {
if !strings.HasPrefix(candidate, `"`) {
@@ -213,7 +213,7 @@ LIMIT 20`
Label string `ch:"label"`
Detail string `ch:"detail"`
}{}
columnName := fixQueryColumnName(input.Column)
columnName := c.fixQueryColumnName(input.Column)
if columnName == "DstASPath" {
columnName = "DstAS"
}
@@ -270,7 +270,7 @@ LIMIT 20`, attributeName, attributeName, attributeName), input.Prefix); err != n
}
input.Prefix = ""
case "exportername", "exportergroup", "exporterrole", "exportersite", "exporterregion", "exportertenant":
column = fixQueryColumnName(inputColumn)
column = c.fixQueryColumnName(inputColumn)
detail = fmt.Sprintf("exporter %s", inputColumn[8:])
case "inifname", "outifname":
column = "IfName"

View File

@@ -7,13 +7,14 @@ import (
"testing"
"akvorado/common/helpers"
"akvorado/common/schema"
)
func TestFilterHumanError(t *testing.T) {
_, err := Parse("", []byte(`
InIfDescription = "Gi0/0/0/0"
AND Proto = 1000
OR `), GlobalStore("meta", &Meta{}))
OR `), GlobalStore("meta", &Meta{Schema: schema.NewMock(t)}))
expected := "at line 3, position 13: expecting an unsigned 8-bit integer"
if diff := helpers.Diff(HumanError(err), expected); diff != "" {
t.Errorf("HumanError() (-got, +want):\n%s", diff)
@@ -24,7 +25,7 @@ func TestAllErrors(t *testing.T) {
_, err := Parse("", []byte(`
InIfDescription = "Gi0/0/0/0"
AND Proto = 1000
OR`), GlobalStore("meta", &Meta{}))
OR`), GlobalStore("meta", &Meta{Schema: schema.NewMock(t)}))
// Currently, the parser stops at the first error.
expected := Errors{
oneError{
@@ -40,7 +41,8 @@ OR`), GlobalStore("meta", &Meta{}))
}
func TestExpected(t *testing.T) {
_, err := Parse("", []byte{}, Entrypoint("ConditionBoundaryExpr"), GlobalStore("meta", &Meta{}))
_, err := Parse("", []byte{}, Entrypoint("ConditionBoundaryExpr"),
GlobalStore("meta", &Meta{Schema: schema.NewMock(t)}))
expected := []string{`"InIfBoundary"i`, `"OutIfBoundary"i`}
if diff := helpers.Diff(Expected(err), expected); diff != "" {
t.Errorf("AllErrors() (-got, +want):\n%s", diff)

View File

@@ -14,14 +14,16 @@ import (
// Meta is used to inject/retrieve state from the parser.
type Meta struct {
// Schema is the data schema (used as input)
Schema *schema.Component
// ReverseDirection tells if we require the reverse direction for the provided filter (used as input)
ReverseDirection bool
// MainTableRequired tells if the main table is required to execute the expression (used as output)
MainTableRequired bool
}
// ReverseColumnDirection reverts the direction of a provided column name.
func ReverseColumnDirection(name string) string {
// reverseColumnDirection reverts the direction of a provided column name.
func reverseColumnDirection(schema *schema.Component, name string) string {
var candidate string
if strings.HasPrefix(name, "Src") {
candidate = "Dst" + name[3:]
@@ -35,7 +37,7 @@ func ReverseColumnDirection(name string) string {
if strings.HasPrefix(name, "Out") {
candidate = "In" + name[3:]
}
if column, ok := schema.Flows.LookupColumnByName(candidate); ok {
if column, ok := schema.LookupColumnByName(candidate); ok {
return column.Name
}
return name
@@ -45,10 +47,11 @@ func ReverseColumnDirection(name string) string {
// in predicate code blocks.
func (c *current) acceptColumn() (string, error) {
name := string(c.text)
for _, column := range schema.Flows.Columns() {
schema := c.globalStore["meta"].(*Meta).Schema
for _, column := range schema.Columns() {
if strings.EqualFold(name, column.Name) {
if c.globalStore["meta"].(*Meta).ReverseDirection {
return ReverseColumnDirection(column.Name), nil
return reverseColumnDirection(schema, column.Name), nil
}
return column.Name, nil
}
@@ -60,7 +63,7 @@ func (c *current) acceptColumn() (string, error) {
// in state change blocks. Unfortunately, it cannot extract matched text, so it
// should be provided.
func (c *current) metaColumn(name string) error {
if column, ok := schema.Flows.LookupColumnByName(name); ok {
if column, ok := c.globalStore["meta"].(*Meta).Schema.LookupColumnByName(name); ok {
if column.MainOnly {
c.state["main-table-only"] = true
}

View File

@@ -7,6 +7,7 @@ import (
"testing"
"akvorado/common/helpers"
"akvorado/common/schema"
)
func TestValidFilter(t *testing.T) {
@@ -216,6 +217,8 @@ output provider */ = 'telia'`,
{Input: `DstCommunities != 65000:100:200`, Output: `NOT has(DstLargeCommunities, bitShiftLeft(65000::UInt128, 64) + bitShiftLeft(100::UInt128, 32) + 200::UInt128)`, MetaOut: Meta{MainTableRequired: true}},
}
for _, tc := range cases {
tc.MetaIn.Schema = schema.NewMock(t)
tc.MetaOut.Schema = tc.MetaIn.Schema
got, err := Parse("", []byte(tc.Input), GlobalStore("meta", &tc.MetaIn))
if err != nil {
t.Errorf("Parse(%q) error:\n%+v", tc.Input, err)
@@ -260,7 +263,7 @@ func TestInvalidFilter(t *testing.T) {
{`SrcAS IN (AS12322,`},
}
for _, tc := range cases {
out, err := Parse("", []byte(tc.Input), GlobalStore("meta", &Meta{}))
out, err := Parse("", []byte(tc.Input), GlobalStore("meta", &Meta{Schema: schema.NewMock(t)}))
t.Logf("out: %v", out)
if err == nil {
t.Errorf("Parse(%q) didn't throw an error", tc.Input)

View File

@@ -11,19 +11,22 @@ import (
"time"
"github.com/gin-gonic/gin"
"golang.org/x/exp/slices"
"akvorado/common/helpers"
"akvorado/common/schema"
"akvorado/console/query"
)
// graphHandlerInput describes the input for the /graph endpoint.
type graphHandlerInput struct {
schema *schema.Component
Start time.Time `json:"start" binding:"required"`
End time.Time `json:"end" binding:"required,gtfield=Start"`
Points uint `json:"points" binding:"required,min=5,max=2000"` // minimum number of points
Dimensions []queryColumn `json:"dimensions"` // group by ...
Dimensions []query.Column `json:"dimensions"` // group by ...
Limit int `json:"limit" binding:"min=1"` // limit product of dimensions
Filter queryFilter `json:"filter"` // where ...
Filter query.Filter `json:"filter"` // where ...
Units string `json:"units" binding:"required,oneof=pps l2bps l3bps"`
Bidirectional bool `json:"bidirectional"`
PreviousPeriod bool `json:"previous-period"`
@@ -45,14 +48,12 @@ type graphHandlerOutput struct {
NinetyFivePercentile []int `json:"95th"` // row → 95th xps
}
// reverseDirection reverts the direction of a provided input
// reverseDirection reverts the direction of a provided input. It does not
// modify the original.
func (input graphHandlerInput) reverseDirection() graphHandlerInput {
input.Filter.Filter, input.Filter.ReverseFilter = input.Filter.ReverseFilter, input.Filter.Filter
dimensions := input.Dimensions
input.Dimensions = make([]queryColumn, len(dimensions))
for i := range dimensions {
input.Dimensions[i] = queryColumn(schema.Flows.ReverseColumnDirection(schema.ColumnKey(dimensions[i])))
}
input.Filter.Swap()
input.Dimensions = slices.Clone(input.Dimensions)
query.Columns(input.Dimensions).Reverse(input.schema)
return input
}
@@ -82,7 +83,7 @@ func nearestPeriod(period time.Duration) (time.Duration, string) {
// for less than 2-months, this is the month, otherwise, this is the
// year. Also, dimensions are stripped.
func (input graphHandlerInput) previousPeriod() graphHandlerInput {
input.Dimensions = []queryColumn{}
input.Dimensions = []query.Column{}
diff := input.End.Sub(input.Start)
period, _ := nearestPeriod(diff)
if period == 0 {
@@ -123,7 +124,7 @@ func (input graphHandlerInput) toSQL1(axis int, options toSQL1Options) string {
dimensionsInterpolate := ""
others := []string{}
for _, column := range input.Dimensions {
field := column.toSQLSelect()
field := column.ToSQLSelect()
selectFields = append(selectFields, field)
dimensions = append(dimensions, column.String())
others = append(others, "'Other'")
@@ -172,7 +173,7 @@ ORDER BY time WITH FILL
Start: input.Start,
End: input.End,
StartForInterval: startForInterval,
MainTableRequired: requireMainTable(input.Dimensions, input.Filter),
MainTableRequired: requireMainTable(input.schema, input.Dimensions, input.Filter),
Points: input.Points,
Units: input.Units,
}),
@@ -207,11 +208,19 @@ func (input graphHandlerInput) toSQL() string {
func (c *Component) graphHandlerFunc(gc *gin.Context) {
ctx := c.t.Context(gc.Request.Context())
var input graphHandlerInput
input := graphHandlerInput{schema: c.d.Schema}
if err := gc.ShouldBindJSON(&input); err != nil {
gc.JSON(http.StatusBadRequest, gin.H{"message": helpers.Capitalize(err.Error())})
return
}
if err := query.Columns(input.Dimensions).Validate(input.schema); err != nil {
gc.JSON(http.StatusBadRequest, gin.H{"message": helpers.Capitalize(err.Error())})
return
}
if err := input.Filter.Validate(input.schema); err != nil {
gc.JSON(http.StatusBadRequest, gin.H{"message": helpers.Capitalize(err.Error())})
return
}
if input.Limit > c.config.DimensionsLimit {
gc.JSON(http.StatusBadRequest,
gin.H{"message": fmt.Sprintf("Limit is set beyond maximum value (%d)",

View File

@@ -14,21 +14,20 @@ import (
"akvorado/common/helpers"
"akvorado/common/schema"
"akvorado/console/query"
)
func TestGraphInputReverseDirection(t *testing.T) {
input := graphHandlerInput{
schema: schema.NewMock(t),
Start: time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC),
End: time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC),
Points: 100,
Dimensions: []queryColumn{
queryColumn(schema.ColumnExporterName),
queryColumn(schema.ColumnInIfProvider),
},
Filter: queryFilter{
Filter: "DstCountry = 'FR' AND SrcCountry = 'US'",
ReverseFilter: "SrcCountry = 'FR' AND DstCountry = 'US'",
Dimensions: query.Columns{
query.NewColumn("ExporterName"),
query.NewColumn("InIfProvider"),
},
Filter: query.NewFilter("DstCountry = 'FR' AND SrcCountry = 'US'"),
Units: "l3bps",
}
original1 := fmt.Sprintf("%+v", input)
@@ -36,16 +35,15 @@ func TestGraphInputReverseDirection(t *testing.T) {
Start: time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC),
End: time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC),
Points: 100,
Dimensions: []queryColumn{
queryColumn(schema.ColumnExporterName),
queryColumn(schema.ColumnOutIfProvider),
},
Filter: queryFilter{
Filter: "SrcCountry = 'FR' AND DstCountry = 'US'",
ReverseFilter: "DstCountry = 'FR' AND SrcCountry = 'US'",
Dimensions: query.Columns{
query.NewColumn("ExporterName"),
query.NewColumn("OutIfProvider"),
},
Filter: query.NewFilter("SrcCountry = 'FR' AND DstCountry = 'US'"),
Units: "l3bps",
}
query.Columns(input.Dimensions).Validate(input.schema)
query.Columns(expected.Dimensions).Validate(input.schema)
got := input.reverseDirection()
original2 := fmt.Sprintf("%+v", input)
if diff := helpers.Diff(got, expected); diff != "" {
@@ -118,18 +116,20 @@ func TestGraphPreviousPeriod(t *testing.T) {
t.Fatalf("time.Parse(%q) error:\n%+v", tc.ExpectedEnd, err)
}
input := graphHandlerInput{
schema: schema.NewMock(t),
Start: start,
End: end,
Dimensions: []queryColumn{
queryColumn(schema.ColumnExporterAddress),
queryColumn(schema.ColumnExporterName),
Dimensions: query.Columns{
query.NewColumn("ExporterAddress"),
query.NewColumn("ExporterName"),
},
}
query.Columns(input.Dimensions).Validate(input.schema)
got := input.previousPeriod()
expected := graphHandlerInput{
Start: expectedStart,
End: expectedEnd,
Dimensions: []queryColumn{},
Dimensions: []query.Column{},
}
if diff := helpers.Diff(got, expected); diff != "" {
t.Fatalf("previousPeriod() (-got, +want):\n%s", diff)
@@ -150,8 +150,8 @@ func TestGraphQuerySQL(t *testing.T) {
Start: time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC),
End: time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC),
Points: 100,
Dimensions: []queryColumn{},
Filter: queryFilter{},
Dimensions: []query.Column{},
Filter: query.Filter{},
Units: "l3bps",
},
Expected: `
@@ -176,8 +176,8 @@ ORDER BY time WITH FILL
Start: time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC),
End: time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC),
Points: 100,
Dimensions: []queryColumn{},
Filter: queryFilter{},
Dimensions: []query.Column{},
Filter: query.Filter{},
Units: "l2bps",
},
Expected: `
@@ -203,8 +203,8 @@ ORDER BY time WITH FILL
Start: time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC),
End: time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC),
Points: 100,
Dimensions: []queryColumn{},
Filter: queryFilter{},
Dimensions: []query.Column{},
Filter: query.Filter{},
Units: "pps",
},
Expected: `
@@ -229,8 +229,8 @@ ORDER BY time WITH FILL
Start: time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC),
End: time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC),
Points: 100,
Dimensions: []queryColumn{},
Filter: queryFilter{Filter: "DstCountry = 'FR' AND SrcCountry = 'US'"},
Dimensions: []query.Column{},
Filter: query.NewFilter("DstCountry = 'FR' AND SrcCountry = 'US'"),
Units: "l3bps",
},
Expected: `
@@ -255,8 +255,8 @@ ORDER BY time WITH FILL
Start: time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC),
End: time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC),
Points: 100,
Dimensions: []queryColumn{},
Filter: queryFilter{Filter: "InIfDescription = '{{ hello }}' AND SrcCountry = 'US'"},
Dimensions: []query.Column{},
Filter: query.NewFilter("InIfDescription = '{{ hello }}' AND SrcCountry = 'US'"),
Units: "l3bps",
},
Expected: `
@@ -281,11 +281,8 @@ ORDER BY time WITH FILL
Start: time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC),
End: time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC),
Points: 100,
Dimensions: []queryColumn{},
Filter: queryFilter{
Filter: "DstCountry = 'FR' AND SrcCountry = 'US'",
ReverseFilter: "SrcCountry = 'FR' AND DstCountry = 'US'",
},
Dimensions: []query.Column{},
Filter: query.NewFilter("DstCountry = 'FR' AND SrcCountry = 'US'"),
Units: "l3bps",
Bidirectional: true,
},
@@ -328,11 +325,11 @@ ORDER BY time WITH FILL
End: time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC),
Points: 100,
Limit: 20,
Dimensions: []queryColumn{
queryColumn(schema.ColumnExporterName),
queryColumn(schema.ColumnInIfProvider),
Dimensions: []query.Column{
query.NewColumn("ExporterName"),
query.NewColumn("InIfProvider"),
},
Filter: queryFilter{},
Filter: query.Filter{},
Units: "l3bps",
},
Expected: `
@@ -360,11 +357,11 @@ ORDER BY time WITH FILL
End: time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC),
Points: 100,
Limit: 20,
Dimensions: []queryColumn{
queryColumn(schema.ColumnExporterName),
queryColumn(schema.ColumnInIfProvider),
Dimensions: []query.Column{
query.NewColumn("ExporterName"),
query.NewColumn("InIfProvider"),
},
Filter: queryFilter{},
Filter: query.Filter{},
Units: "l3bps",
Bidirectional: true,
},
@@ -409,11 +406,11 @@ ORDER BY time WITH FILL
End: time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC),
Points: 100,
Limit: 20,
Dimensions: []queryColumn{
queryColumn(schema.ColumnExporterName),
queryColumn(schema.ColumnInIfProvider),
Dimensions: []query.Column{
query.NewColumn("ExporterName"),
query.NewColumn("InIfProvider"),
},
Filter: queryFilter{},
Filter: query.Filter{},
Units: "l3bps",
PreviousPeriod: true,
},
@@ -454,6 +451,13 @@ ORDER BY time WITH FILL
},
}
for _, tc := range cases {
tc.Input.schema = schema.NewMock(t)
if err := query.Columns(tc.Input.Dimensions).Validate(tc.Input.schema); err != nil {
t.Fatalf("Validate() error:\n%+v", err)
}
if err := tc.Input.Filter.Validate(tc.Input.schema); err != nil {
t.Fatalf("Validate() error:\n%+v", err)
}
tc.Expected = strings.ReplaceAll(tc.Expected, "@@", "`")
t.Run(tc.Description, func(t *testing.T) {
got := tc.Input.toSQL()

View File

@@ -4,109 +4,28 @@
package console
import (
"errors"
"fmt"
"strings"
"akvorado/common/helpers"
"akvorado/common/schema"
"akvorado/console/filter"
"akvorado/console/query"
)
type queryColumn schema.ColumnKey
func (qc queryColumn) MarshalText() ([]byte, error) {
return []byte(schema.ColumnKey(qc).String()), nil
}
func (qc queryColumn) String() string {
return schema.ColumnKey(qc).String()
}
func (qc *queryColumn) UnmarshalText(input []byte) error {
name := string(input)
if column, ok := schema.Flows.LookupColumnByName(name); ok && !column.ConsoleNotDimension {
*qc = queryColumn(column.Key)
return nil
}
return errors.New("unknown field")
}
func requireMainTable(qcs []queryColumn, qf queryFilter) bool {
if qf.MainTableRequired {
func requireMainTable(sch *schema.Component, qcs []query.Column, qf query.Filter) bool {
if qf.MainTableRequired() {
return true
}
for _, qc := range qcs {
if column, ok := schema.Flows.LookupColumnByKey(schema.ColumnKey(qc)); ok && column.MainOnly {
if column, ok := sch.LookupColumnByKey(qc.Key()); ok && column.MainOnly {
return true
}
}
return false
}
type queryFilter struct {
Filter string
ReverseFilter string
MainTableRequired bool
}
func (qf queryFilter) String() string {
return qf.Filter
}
func (qf queryFilter) MarshalText() ([]byte, error) {
return []byte(qf.Filter), nil
}
func (qf *queryFilter) UnmarshalText(input []byte) error {
if strings.TrimSpace(string(input)) == "" {
*qf = queryFilter{}
return nil
}
meta := &filter.Meta{}
direct, err := filter.Parse("", input, filter.GlobalStore("meta", meta))
if err != nil {
return fmt.Errorf("cannot parse filter: %s", filter.HumanError(err))
}
meta = &filter.Meta{ReverseDirection: true}
reverse, err := filter.Parse("", input, filter.GlobalStore("meta", meta))
if err != nil {
return fmt.Errorf("cannot parse reverse filter: %s", filter.HumanError(err))
}
*qf = queryFilter{
Filter: direct.(string),
ReverseFilter: reverse.(string),
MainTableRequired: meta.MainTableRequired,
}
return nil
}
// toSQLSelect transforms a column into an expression to use in SELECT
func (qc queryColumn) toSQLSelect() string {
var strValue string
switch schema.ColumnKey(qc) {
case schema.ColumnExporterAddress, schema.ColumnSrcAddr, schema.ColumnDstAddr:
strValue = fmt.Sprintf("replaceRegexpOne(IPv6NumToString(%s), '^::ffff:', '')", qc)
case schema.ColumnSrcAS, schema.ColumnDstAS, schema.ColumnDst1stAS, schema.ColumnDst2ndAS, schema.ColumnDst3rdAS:
strValue = fmt.Sprintf(`concat(toString(%s), ': ', dictGetOrDefault('asns', 'name', %s, '???'))`,
qc, qc)
case schema.ColumnEType:
strValue = fmt.Sprintf(`if(EType = %d, 'IPv4', if(EType = %d, 'IPv6', '???'))`,
helpers.ETypeIPv4, helpers.ETypeIPv6)
case schema.ColumnProto:
strValue = `dictGetOrDefault('protocols', 'name', Proto, '???')`
case schema.ColumnInIfSpeed, schema.ColumnOutIfSpeed, schema.ColumnSrcPort, schema.ColumnDstPort, schema.ColumnForwardingStatus, schema.ColumnInIfBoundary, schema.ColumnOutIfBoundary:
strValue = fmt.Sprintf("toString(%s)", qc)
case schema.ColumnDstASPath:
strValue = `arrayStringConcat(DstASPath, ' ')`
case schema.ColumnDstCommunities:
strValue = `arrayStringConcat(arrayConcat(arrayMap(c -> concat(toString(bitShiftRight(c, 16)), ':', toString(bitAnd(c, 0xffff))), DstCommunities), arrayMap(c -> concat(toString(bitAnd(bitShiftRight(c, 64), 0xffffffff)), ':', toString(bitAnd(bitShiftRight(c, 32), 0xffffffff)), ':', toString(bitAnd(c, 0xffffffff))), DstLargeCommunities)), ' ')`
default:
strValue = qc.String()
}
return strValue
}
// fixQueryColumnName fix capitalization of the provided column name
func fixQueryColumnName(name string) string {
func (c *Component) fixQueryColumnName(name string) string {
name = strings.ToLower(name)
for _, column := range schema.Flows.Columns() {
for _, column := range c.d.Schema.Columns() {
if strings.ToLower(column.Name) == name {
return column.Name
}

121
console/query/column.go Normal file
View File

@@ -0,0 +1,121 @@
// SPDX-FileCopyrightText: 2023 Free Mobile
// SPDX-License-Identifier: AGPL-3.0-only
// Package query provides query columns and query filters. These
// types are special as they need a schema to be validated.
package query
import (
"fmt"
"akvorado/common/helpers"
"akvorado/common/schema"
)
// Column represents a query column. It should be instantiated with NewColumn() or
// Unmarshal(), then call Validate().
type Column struct {
validated bool
name string
key schema.ColumnKey
}
// Columns is a set of query columns.
type Columns []Column
// NewColumn creates a new column. Validate() should be called before using it.
func NewColumn(name string) Column {
return Column{name: name}
}
func (qc Column) check() {
if !qc.validated {
panic("query column not validated")
}
}
func (qc Column) String() string {
return qc.name
}
// MarshalText turns a column into a string.
func (qc Column) MarshalText() ([]byte, error) {
return []byte(qc.name), nil
}
// UnmarshalText parses a column. Validate() should be called before use.
func (qc *Column) UnmarshalText(input []byte) error {
name := string(input)
*qc = Column{name: name}
return nil
}
// Key returns the key for the column.
func (qc *Column) Key() schema.ColumnKey {
qc.check()
return qc.key
}
// Validate should be called before using the column. We need a schema component
// for that.
func (qc *Column) Validate(schema *schema.Component) error {
if column, ok := schema.LookupColumnByName(qc.name); ok && !column.ConsoleNotDimension {
qc.key = column.Key
qc.validated = true
return nil
}
return fmt.Errorf("unknown column name %s", qc.name)
}
// Reverse reverses the column direction
func (qc *Column) Reverse(schema *schema.Component) {
name := schema.ReverseColumnDirection(qc.Key()).String()
reverted := Column{name: name}
if reverted.Validate(schema) == nil {
*qc = reverted
}
// No modification otherwise
}
// Reverse reverses the direction of all columns
func (qcs Columns) Reverse(schema *schema.Component) {
for i := range qcs {
qcs[i].Reverse(schema)
}
}
// Validate call Validate on each column.
func (qcs Columns) Validate(schema *schema.Component) error {
for i := range qcs {
if err := qcs[i].Validate(schema); err != nil {
return err
}
}
return nil
}
// ToSQLSelect transforms a column into an expression to use in SELECT
func (qc Column) ToSQLSelect() string {
var strValue string
switch qc.Key() {
case schema.ColumnExporterAddress, schema.ColumnSrcAddr, schema.ColumnDstAddr:
strValue = fmt.Sprintf("replaceRegexpOne(IPv6NumToString(%s), '^::ffff:', '')", qc)
case schema.ColumnSrcAS, schema.ColumnDstAS, schema.ColumnDst1stAS, schema.ColumnDst2ndAS, schema.ColumnDst3rdAS:
strValue = fmt.Sprintf(`concat(toString(%s), ': ', dictGetOrDefault('asns', 'name', %s, '???'))`,
qc, qc)
case schema.ColumnEType:
strValue = fmt.Sprintf(`if(EType = %d, 'IPv4', if(EType = %d, 'IPv6', '???'))`,
helpers.ETypeIPv4, helpers.ETypeIPv6)
case schema.ColumnProto:
strValue = `dictGetOrDefault('protocols', 'name', Proto, '???')`
case schema.ColumnInIfSpeed, schema.ColumnOutIfSpeed, schema.ColumnSrcPort, schema.ColumnDstPort, schema.ColumnForwardingStatus, schema.ColumnInIfBoundary, schema.ColumnOutIfBoundary:
strValue = fmt.Sprintf("toString(%s)", qc)
case schema.ColumnDstASPath:
strValue = `arrayStringConcat(DstASPath, ' ')`
case schema.ColumnDstCommunities:
strValue = `arrayStringConcat(arrayConcat(arrayMap(c -> concat(toString(bitShiftRight(c, 16)), ':', toString(bitAnd(c, 0xffff))), DstCommunities), arrayMap(c -> concat(toString(bitAnd(bitShiftRight(c, 64), 0xffffffff)), ':', toString(bitAnd(bitShiftRight(c, 32), 0xffffffff)), ':', toString(bitAnd(c, 0xffffffff))), DstLargeCommunities)), ' ')`
default:
strValue = qc.String()
}
return strValue
}

View File

@@ -0,0 +1,119 @@
// SPDX-FileCopyrightText: 2023 Free Mobile
// SPDX-License-Identifier: AGPL-3.0-only
package query_test
import (
"fmt"
"reflect"
"testing"
"akvorado/common/helpers"
"akvorado/common/schema"
"akvorado/console/query"
)
func TestUnmarshalQueryColumn(t *testing.T) {
cases := []struct {
Input string
Expected schema.ColumnKey
Error bool
}{
{"DstAddr", schema.ColumnDstAddr, false},
{"TimeReceived", 0, true},
{"Nothing", 0, true},
}
for _, tc := range cases {
var qc query.Column
if err := qc.UnmarshalText([]byte(tc.Input)); err != nil {
t.Fatalf("UnmarshalText() error:\n%+v", err)
}
err := qc.Validate(schema.NewMock(t))
if err != nil && !tc.Error {
t.Fatalf("Validate(%q) error:\n%+v", tc.Input, err)
}
if err == nil && tc.Error {
t.Fatalf("Validate(%q) did not error", tc.Input)
}
if err != nil {
continue
}
if diff := helpers.Diff(qc.Key(), tc.Expected, helpers.DiffFormatter(reflect.TypeOf(schema.ColumnBytes), fmt.Sprint)); diff != "" {
t.Fatalf("UnmarshalText(%q) (-got, +want):\n%s", tc.Input, diff)
}
}
}
func TestQueryColumnSQLSelect(t *testing.T) {
cases := []struct {
Input schema.ColumnKey
Expected string
}{
{
Input: schema.ColumnSrcAddr,
Expected: `replaceRegexpOne(IPv6NumToString(SrcAddr), '^::ffff:', '')`,
}, {
Input: schema.ColumnDstAS,
Expected: `concat(toString(DstAS), ': ', dictGetOrDefault('asns', 'name', DstAS, '???'))`,
}, {
Input: schema.ColumnDst2ndAS,
Expected: `concat(toString(Dst2ndAS), ': ', dictGetOrDefault('asns', 'name', Dst2ndAS, '???'))`,
}, {
Input: schema.ColumnProto,
Expected: `dictGetOrDefault('protocols', 'name', Proto, '???')`,
}, {
Input: schema.ColumnEType,
Expected: `if(EType = 2048, 'IPv4', if(EType = 34525, 'IPv6', '???'))`,
}, {
Input: schema.ColumnOutIfSpeed,
Expected: `toString(OutIfSpeed)`,
}, {
Input: schema.ColumnExporterName,
Expected: `ExporterName`,
}, {
Input: schema.ColumnPacketSizeBucket,
Expected: `PacketSizeBucket`,
}, {
Input: schema.ColumnDstASPath,
Expected: `arrayStringConcat(DstASPath, ' ')`,
}, {
Input: schema.ColumnDstCommunities,
Expected: `arrayStringConcat(arrayConcat(arrayMap(c -> concat(toString(bitShiftRight(c, 16)), ':', toString(bitAnd(c, 0xffff))), DstCommunities), arrayMap(c -> concat(toString(bitAnd(bitShiftRight(c, 64), 0xffffffff)), ':', toString(bitAnd(bitShiftRight(c, 32), 0xffffffff)), ':', toString(bitAnd(c, 0xffffffff))), DstLargeCommunities)), ' ')`,
},
}
for _, tc := range cases {
t.Run(tc.Input.String(), func(t *testing.T) {
column := query.NewColumn(tc.Input.String())
if err := column.Validate(schema.NewMock(t)); err != nil {
t.Fatalf("Validate() error:\n%+v", err)
}
got := column.ToSQLSelect()
if diff := helpers.Diff(got, tc.Expected); diff != "" {
t.Errorf("toSQLWhere (-got, +want):\n%s", diff)
}
})
}
}
func TestReverseDirection(t *testing.T) {
columns := query.Columns{
query.NewColumn("SrcAS"),
query.NewColumn("DstAS"),
query.NewColumn("ExporterName"),
query.NewColumn("InIfProvider"),
}
sch := schema.NewMock(t)
if err := columns.Validate(sch); err != nil {
t.Fatalf("Validate() error:\n%+v", err)
}
columns.Reverse(sch)
expected := query.Columns{
query.NewColumn("DstAS"),
query.NewColumn("SrcAS"),
query.NewColumn("ExporterName"),
query.NewColumn("OutIfProvider"),
}
if diff := helpers.Diff(columns, expected, helpers.DiffFormatter(reflect.TypeOf(query.Column{}), fmt.Sprint)); diff != "" {
t.Fatalf("Reverse() (-got, +want):\n%s", diff)
}
}

95
console/query/filter.go Normal file
View File

@@ -0,0 +1,95 @@
// SPDX-FileCopyrightText: 2023 Free Mobile
// SPDX-License-Identifier: AGPL-3.0-only
package query
import (
"fmt"
"strings"
"akvorado/common/schema"
"akvorado/console/filter"
)
// Filter represents a query filter. It should be instantiated with NewFilter() and validated with Validate().
type Filter struct {
validated bool
filter string
reverseFilter string
mainTableRequired bool
}
// NewFilter creates a new filter. It should be validated with Validate() before use.
func NewFilter(input string) Filter {
return Filter{filter: input}
}
func (qf Filter) check() {
if !qf.validated {
panic("query filter not validated")
}
}
func (qf Filter) String() string {
return qf.filter
}
// MarshalText turns a filter into a string.
func (qf Filter) MarshalText() ([]byte, error) {
return []byte(qf.filter), nil
}
// UnmarshalText parses a filter. Validate() should be called before use.
func (qf *Filter) UnmarshalText(input []byte) error {
*qf = Filter{
filter: strings.TrimSpace(string(input)),
}
return nil
}
// Validate validates a query filter with the provided schema.
func (qf *Filter) Validate(sch *schema.Component) error {
if qf.filter == "" {
qf.validated = true
return nil
}
input := []byte(qf.filter)
meta := &filter.Meta{Schema: sch}
direct, err := filter.Parse("", input, filter.GlobalStore("meta", meta))
if err != nil {
return fmt.Errorf("cannot parse filter: %s", filter.HumanError(err))
}
meta = &filter.Meta{Schema: sch, ReverseDirection: true}
reverse, err := filter.Parse("", input, filter.GlobalStore("meta", meta))
if err != nil {
return fmt.Errorf("cannot parse reverse filter: %s", filter.HumanError(err))
}
qf.filter = direct.(string)
qf.reverseFilter = reverse.(string)
qf.mainTableRequired = meta.MainTableRequired
qf.validated = true
return nil
}
// MainTableRequired tells if the main table is required for this filter.
func (qf Filter) MainTableRequired() bool {
qf.check()
return qf.mainTableRequired
}
// Reverse provides the reverse filter.
func (qf Filter) Reverse() string {
qf.check()
return qf.reverseFilter
}
// Direct provides the filter.
func (qf Filter) Direct() string {
qf.check()
return qf.filter
}
// Swap swap direct and reverse filter.
func (qf *Filter) Swap() {
qf.filter, qf.reverseFilter = qf.reverseFilter, qf.filter
}

View File

@@ -0,0 +1,61 @@
// SPDX-FileCopyrightText: 2023 Free Mobile
// SPDX-License-Identifier: AGPL-3.0-only
package query_test
import (
"testing"
"akvorado/common/helpers"
"akvorado/common/schema"
"akvorado/console/query"
)
func TestUnmarshalFilter(t *testing.T) {
cases := []struct {
Input string
Expected string
Error bool
}{
{"", "", false},
{" ", "", false},
{"SrcPort=12322", "SrcPort = 12322", false},
{"NoPort = 12322", "", true},
}
sch := schema.NewMock(t)
for _, tc := range cases {
t.Run(tc.Input, func(t *testing.T) {
var qf query.Filter
if err := qf.UnmarshalText([]byte(tc.Input)); err != nil {
t.Fatalf("UnmarshalText() error:\n%+v", err)
}
err := qf.Validate(sch)
if err != nil && !tc.Error {
t.Fatalf("Validate() error:\n%+v", err)
}
if err == nil && tc.Error {
t.Fatal("Validate() did not error")
}
if err != nil {
return
}
if diff := helpers.Diff(qf.Direct(), tc.Expected); diff != "" {
t.Fatalf("UnmarshalText(%q) (-got, +want):\n%s", tc.Input, diff)
}
})
}
}
func TestFilterSwap(t *testing.T) {
filter := query.NewFilter("SrcAS = 12322")
if err := filter.Validate(schema.NewMock(t)); err != nil {
t.Fatalf("Validate() error:\n%+v", err)
}
filter.Swap()
if diff := helpers.Diff(filter.Direct(), "DstAS = 12322"); diff != "" {
t.Fatalf("Swap() (-got, +want):\n%s", diff)
}
if diff := helpers.Diff(filter.Reverse(), "SrcAS = 12322"); diff != "" {
t.Fatalf("Swap() (-got, +want):\n%s", diff)
}
}

View File

@@ -6,126 +6,38 @@ package console
import (
"testing"
"akvorado/common/helpers"
"akvorado/common/schema"
"akvorado/console/query"
)
func TestRequireMainTable(t *testing.T) {
cases := []struct {
Columns []queryColumn
Filter queryFilter
Columns []query.Column
Filter query.Filter
Expected bool
}{
{[]queryColumn{}, queryFilter{}, false},
{[]queryColumn{queryColumn(schema.ColumnSrcAS)}, queryFilter{}, false},
{[]queryColumn{queryColumn(schema.ColumnExporterAddress)}, queryFilter{}, false},
{[]queryColumn{queryColumn(schema.ColumnSrcPort)}, queryFilter{}, true},
{[]queryColumn{queryColumn(schema.ColumnSrcAddr)}, queryFilter{}, true},
{[]queryColumn{queryColumn(schema.ColumnDstPort)}, queryFilter{}, true},
{[]queryColumn{queryColumn(schema.ColumnDstAddr)}, queryFilter{}, true},
{[]queryColumn{queryColumn(schema.ColumnSrcAS), queryColumn(schema.ColumnDstAddr)}, queryFilter{}, true},
{[]queryColumn{queryColumn(schema.ColumnDstAddr), queryColumn(schema.ColumnSrcAS)}, queryFilter{}, true},
{[]queryColumn{}, queryFilter{MainTableRequired: true}, true},
{[]query.Column{}, query.NewFilter(""), false},
{[]query.Column{query.NewColumn("SrcAS")}, query.NewFilter(""), false},
{[]query.Column{query.NewColumn("ExporterAddress")}, query.NewFilter(""), false},
{[]query.Column{query.NewColumn("SrcPort")}, query.NewFilter(""), true},
{[]query.Column{query.NewColumn("SrcAddr")}, query.NewFilter(""), true},
{[]query.Column{query.NewColumn("DstPort")}, query.NewFilter(""), true},
{[]query.Column{query.NewColumn("DstAddr")}, query.NewFilter(""), true},
{[]query.Column{query.NewColumn("SrcAS"), query.NewColumn("DstAddr")}, query.NewFilter(""), true},
{[]query.Column{query.NewColumn("DstAddr"), query.NewColumn("SrcAS")}, query.NewFilter(""), true},
{[]query.Column{}, query.NewFilter("SrcAddr = 203.0.113.15"), true},
}
sch := schema.NewMock(t)
for idx, tc := range cases {
got := requireMainTable(tc.Columns, tc.Filter)
if err := query.Columns(tc.Columns).Validate(sch); err != nil {
t.Fatalf("Validate() error:\n%+v", err)
}
if err := tc.Filter.Validate(sch); err != nil {
t.Fatalf("Validate() error:\n%+v", err)
}
got := requireMainTable(sch, tc.Columns, tc.Filter)
if got != tc.Expected {
t.Errorf("requireMainTable(%d) == %v but expected %v", idx, got, tc.Expected)
}
}
}
func TestUnmarshalQueryColumn(t *testing.T) {
cases := []struct {
Input string
Expected schema.ColumnKey
Error bool
}{
{"DstAddr", schema.ColumnDstAddr, false},
{"TimeReceived", 0, true},
{"Nothing", 0, true},
}
for _, tc := range cases {
var qc queryColumn
err := qc.UnmarshalText([]byte(tc.Input))
if err != nil && !tc.Error {
t.Fatalf("UnmarshalText(%q) error:\n%+v", tc.Input, err)
}
if err == nil && tc.Error {
t.Fatalf("UnmarshalText(%q) did not error", tc.Input)
}
if diff := helpers.Diff(qc, tc.Expected); diff != "" {
t.Fatalf("UnmarshalText(%q) (-got, +want):\n%s", tc.Input, diff)
}
}
}
func TestQueryColumnSQLSelect(t *testing.T) {
cases := []struct {
Input schema.ColumnKey
Expected string
}{
{
Input: schema.ColumnSrcAddr,
Expected: `replaceRegexpOne(IPv6NumToString(SrcAddr), '^::ffff:', '')`,
}, {
Input: schema.ColumnDstAS,
Expected: `concat(toString(DstAS), ': ', dictGetOrDefault('asns', 'name', DstAS, '???'))`,
}, {
Input: schema.ColumnDst2ndAS,
Expected: `concat(toString(Dst2ndAS), ': ', dictGetOrDefault('asns', 'name', Dst2ndAS, '???'))`,
}, {
Input: schema.ColumnProto,
Expected: `dictGetOrDefault('protocols', 'name', Proto, '???')`,
}, {
Input: schema.ColumnEType,
Expected: `if(EType = 2048, 'IPv4', if(EType = 34525, 'IPv6', '???'))`,
}, {
Input: schema.ColumnOutIfSpeed,
Expected: `toString(OutIfSpeed)`,
}, {
Input: schema.ColumnExporterName,
Expected: `ExporterName`,
}, {
Input: schema.ColumnPacketSizeBucket,
Expected: `PacketSizeBucket`,
}, {
Input: schema.ColumnDstASPath,
Expected: `arrayStringConcat(DstASPath, ' ')`,
}, {
Input: schema.ColumnDstCommunities,
Expected: `arrayStringConcat(arrayConcat(arrayMap(c -> concat(toString(bitShiftRight(c, 16)), ':', toString(bitAnd(c, 0xffff))), DstCommunities), arrayMap(c -> concat(toString(bitAnd(bitShiftRight(c, 64), 0xffffffff)), ':', toString(bitAnd(bitShiftRight(c, 32), 0xffffffff)), ':', toString(bitAnd(c, 0xffffffff))), DstLargeCommunities)), ' ')`,
},
}
for _, tc := range cases {
t.Run(queryColumn(tc.Input).String(), func(t *testing.T) {
got := queryColumn(tc.Input).toSQLSelect()
if diff := helpers.Diff(got, tc.Expected); diff != "" {
t.Errorf("toSQLWhere (-got, +want):\n%s", diff)
}
})
}
}
func TestUnmarshalFilter(t *testing.T) {
cases := []struct {
Input string
Expected string
}{
{"", ""},
{" ", ""},
{"SrcPort=12322", "SrcPort = 12322"},
}
for _, tc := range cases {
t.Run(tc.Input, func(t *testing.T) {
var qf queryFilter
err := qf.UnmarshalText([]byte(tc.Input))
if err != nil {
t.Fatalf("UnmarshalText(%q) error:\n%+v", tc.Input, err)
}
if diff := helpers.Diff(qf.Filter, tc.Expected); diff != "" {
t.Fatalf("UnmarshalText(%q) (-got, +want):\n%s", tc.Input, diff)
}
})
}
}

View File

@@ -21,8 +21,10 @@ import (
"akvorado/common/daemon"
"akvorado/common/http"
"akvorado/common/reporter"
"akvorado/common/schema"
"akvorado/console/authentication"
"akvorado/console/database"
"akvorado/console/query"
)
// Component represents the console component.
@@ -48,6 +50,7 @@ type Dependencies struct {
Clock clock.Clock
Auth *authentication.Component
Database *database.Component
Schema *schema.Component
}
// New creates a new console component.
@@ -55,6 +58,9 @@ func New(r *reporter.Reporter, config Configuration, dependencies Dependencies)
if dependencies.Clock == nil {
dependencies.Clock = clock.New()
}
if err := query.Columns(config.DefaultVisualizeOptions.Dimensions).Validate(dependencies.Schema); err != nil {
return nil, err
}
c := Component{
r: r,
d: &dependencies,

View File

@@ -13,15 +13,18 @@ import (
"github.com/gin-gonic/gin"
"akvorado/common/helpers"
"akvorado/common/schema"
"akvorado/console/query"
)
// sankeyHandlerInput describes the input for the /sankey endpoint.
type sankeyHandlerInput struct {
schema *schema.Component
Start time.Time `json:"start" binding:"required"`
End time.Time `json:"end" binding:"required,gtfield=Start"`
Dimensions []queryColumn `json:"dimensions" binding:"required,min=2"` // group by ...
Dimensions []query.Column `json:"dimensions" binding:"required,min=2"` // group by ...
Limit int `json:"limit" binding:"min=1,max=50"` // limit product of dimensions
Filter queryFilter `json:"filter"` // where ...
Filter query.Filter `json:"filter"` // where ...
Units string `json:"units" binding:"required,oneof=pps l3bps l2bps"`
}
@@ -51,7 +54,7 @@ func (input sankeyHandlerInput) toSQL() (string, error) {
arrayFields = append(arrayFields, fmt.Sprintf(`if(%s IN (SELECT %s FROM rows), %s, 'Other')`,
column.String(),
column.String(),
column.toSQLSelect()))
column.ToSQLSelect()))
dimensions = append(dimensions, column.String())
}
fields := []string{
@@ -84,7 +87,7 @@ ORDER BY xps DESC
templateContext(inputContext{
Start: input.Start,
End: input.End,
MainTableRequired: requireMainTable(input.Dimensions, input.Filter),
MainTableRequired: requireMainTable(input.schema, input.Dimensions, input.Filter),
Points: 20,
Units: input.Units,
}),
@@ -94,11 +97,19 @@ ORDER BY xps DESC
func (c *Component) sankeyHandlerFunc(gc *gin.Context) {
ctx := c.t.Context(gc.Request.Context())
var input sankeyHandlerInput
input := sankeyHandlerInput{schema: c.d.Schema}
if err := gc.ShouldBindJSON(&input); err != nil {
gc.JSON(http.StatusBadRequest, gin.H{"message": helpers.Capitalize(err.Error())})
return
}
if err := query.Columns(input.Dimensions).Validate(input.schema); err != nil {
gc.JSON(http.StatusBadRequest, gin.H{"message": helpers.Capitalize(err.Error())})
return
}
if err := input.Filter.Validate(input.schema); err != nil {
gc.JSON(http.StatusBadRequest, gin.H{"message": helpers.Capitalize(err.Error())})
return
}
sqlQuery, err := input.toSQL()
if err != nil {

View File

@@ -13,6 +13,7 @@ import (
"akvorado/common/helpers"
"akvorado/common/schema"
"akvorado/console/query"
)
func TestSankeyQuerySQL(t *testing.T) {
@@ -26,9 +27,11 @@ func TestSankeyQuerySQL(t *testing.T) {
Input: sankeyHandlerInput{
Start: time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC),
End: time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC),
Dimensions: []queryColumn{queryColumn(schema.ColumnSrcAS), queryColumn(schema.ColumnExporterName)},
Dimensions: []query.Column{
query.NewColumn("SrcAS"),
query.NewColumn("ExporterName")},
Limit: 5,
Filter: queryFilter{},
Filter: query.Filter{},
Units: "l3bps",
},
Expected: `
@@ -50,9 +53,11 @@ ORDER BY xps DESC
Input: sankeyHandlerInput{
Start: time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC),
End: time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC),
Dimensions: []queryColumn{queryColumn(schema.ColumnSrcAS), queryColumn(schema.ColumnExporterName)},
Dimensions: []query.Column{
query.NewColumn("SrcAS"),
query.NewColumn("ExporterName")},
Limit: 5,
Filter: queryFilter{},
Filter: query.Filter{},
Units: "l2bps",
},
Expected: `
@@ -75,9 +80,11 @@ ORDER BY xps DESC
Input: sankeyHandlerInput{
Start: time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC),
End: time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC),
Dimensions: []queryColumn{queryColumn(schema.ColumnSrcAS), queryColumn(schema.ColumnExporterName)},
Dimensions: []query.Column{
query.NewColumn("SrcAS"),
query.NewColumn("ExporterName")},
Limit: 5,
Filter: queryFilter{},
Filter: query.Filter{},
Units: "pps",
},
Expected: `
@@ -99,9 +106,11 @@ ORDER BY xps DESC
Input: sankeyHandlerInput{
Start: time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC),
End: time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC),
Dimensions: []queryColumn{queryColumn(schema.ColumnSrcAS), queryColumn(schema.ColumnExporterName)},
Dimensions: []query.Column{
query.NewColumn("SrcAS"),
query.NewColumn("ExporterName")},
Limit: 10,
Filter: queryFilter{Filter: "DstCountry = 'FR'"},
Filter: query.NewFilter("DstCountry = 'FR'"),
Units: "l3bps",
},
Expected: `
@@ -121,6 +130,13 @@ ORDER BY xps DESC
},
}
for _, tc := range cases {
tc.Input.schema = schema.NewMock(t)
if err := query.Columns(tc.Input.Dimensions).Validate(tc.Input.schema); err != nil {
t.Fatalf("Validate() error:\n%+v", err)
}
if err := tc.Input.Filter.Validate(tc.Input.schema); err != nil {
t.Fatalf("Validate() error:\n%+v", err)
}
tc.Expected = strings.ReplaceAll(tc.Expected, "@@", "`")
t.Run(tc.Description, func(t *testing.T) {
got, _ := tc.Input.toSQL()

View File

@@ -16,6 +16,7 @@ import (
"akvorado/common/helpers"
"akvorado/common/http"
"akvorado/common/reporter"
"akvorado/common/schema"
"akvorado/console/authentication"
"akvorado/console/database"
)
@@ -34,6 +35,7 @@ func NewMock(t *testing.T, config Configuration) (*Component, *http.Component, *
Clock: mockClock,
Auth: authentication.NewMock(t, r),
Database: database.NewMock(t, r, database.DefaultConfiguration()),
Schema: schema.NewMock(t),
})
if err != nil {
t.Fatalf("New() error:\n%+v", err)

View File

@@ -19,7 +19,7 @@ import (
func TestGetNetflowData(t *testing.T) {
r := reporter.NewMock(t)
nfdecoder := netflow.New(r)
nfdecoder := netflow.New(r, decoder.Dependencies{Schema: schema.NewMock(t)})
ch := getNetflowTemplates(
context.Background(),

View File

@@ -98,30 +98,30 @@ func (c *Component) enrichFlow(exporterIP netip.Addr, exporterStr string, flow *
destBMP := c.d.BMP.Lookup(flow.DstAddr, flow.NextHop)
flow.SrcAS = c.getASNumber(flow.SrcAddr, flow.SrcAS, sourceBMP.ASN)
flow.DstAS = c.getASNumber(flow.DstAddr, flow.DstAS, destBMP.ASN)
schema.Flows.ProtobufAppendBytes(flow, schema.ColumnSrcCountry, []byte(c.d.GeoIP.LookupCountry(flow.SrcAddr)))
schema.Flows.ProtobufAppendBytes(flow, schema.ColumnDstCountry, []byte(c.d.GeoIP.LookupCountry(flow.DstAddr)))
c.d.Schema.ProtobufAppendBytes(flow, schema.ColumnSrcCountry, []byte(c.d.GeoIP.LookupCountry(flow.SrcAddr)))
c.d.Schema.ProtobufAppendBytes(flow, schema.ColumnDstCountry, []byte(c.d.GeoIP.LookupCountry(flow.DstAddr)))
for _, comm := range destBMP.Communities {
schema.Flows.ProtobufAppendVarint(flow, schema.ColumnDstCommunities, uint64(comm))
c.d.Schema.ProtobufAppendVarint(flow, schema.ColumnDstCommunities, uint64(comm))
}
for _, asn := range destBMP.ASPath {
schema.Flows.ProtobufAppendVarint(flow, schema.ColumnDstASPath, uint64(asn))
c.d.Schema.ProtobufAppendVarint(flow, schema.ColumnDstASPath, uint64(asn))
}
for _, comm := range destBMP.LargeCommunities {
schema.Flows.ProtobufAppendVarintForce(flow,
c.d.Schema.ProtobufAppendVarintForce(flow,
schema.ColumnDstLargeCommunitiesASN, uint64(comm.ASN))
schema.Flows.ProtobufAppendVarintForce(flow,
c.d.Schema.ProtobufAppendVarintForce(flow,
schema.ColumnDstLargeCommunitiesLocalData1, uint64(comm.LocalData1))
schema.Flows.ProtobufAppendVarintForce(flow,
c.d.Schema.ProtobufAppendVarintForce(flow,
schema.ColumnDstLargeCommunitiesLocalData2, uint64(comm.LocalData2))
}
schema.Flows.ProtobufAppendBytes(flow, schema.ColumnExporterName, []byte(flowExporterName))
schema.Flows.ProtobufAppendBytes(flow, schema.ColumnInIfName, []byte(flowInIfName))
schema.Flows.ProtobufAppendBytes(flow, schema.ColumnInIfDescription, []byte(flowInIfDescription))
schema.Flows.ProtobufAppendBytes(flow, schema.ColumnOutIfName, []byte(flowOutIfName))
schema.Flows.ProtobufAppendBytes(flow, schema.ColumnOutIfDescription, []byte(flowOutIfDescription))
schema.Flows.ProtobufAppendVarint(flow, schema.ColumnInIfSpeed, uint64(flowInIfSpeed))
schema.Flows.ProtobufAppendVarint(flow, schema.ColumnOutIfSpeed, uint64(flowOutIfSpeed))
c.d.Schema.ProtobufAppendBytes(flow, schema.ColumnExporterName, []byte(flowExporterName))
c.d.Schema.ProtobufAppendBytes(flow, schema.ColumnInIfName, []byte(flowInIfName))
c.d.Schema.ProtobufAppendBytes(flow, schema.ColumnInIfDescription, []byte(flowInIfDescription))
c.d.Schema.ProtobufAppendBytes(flow, schema.ColumnOutIfName, []byte(flowOutIfName))
c.d.Schema.ProtobufAppendBytes(flow, schema.ColumnOutIfDescription, []byte(flowOutIfDescription))
c.d.Schema.ProtobufAppendVarint(flow, schema.ColumnInIfSpeed, uint64(flowInIfSpeed))
c.d.Schema.ProtobufAppendVarint(flow, schema.ColumnOutIfSpeed, uint64(flowOutIfSpeed))
return
}
@@ -154,12 +154,12 @@ func (c *Component) getASNumber(flowAddr netip.Addr, flowAS, bmpAS uint32) (asn
return asn
}
func writeExporter(flow *schema.FlowMessage, classification exporterClassification) {
schema.Flows.ProtobufAppendBytes(flow, schema.ColumnExporterGroup, []byte(classification.Group))
schema.Flows.ProtobufAppendBytes(flow, schema.ColumnExporterRole, []byte(classification.Role))
schema.Flows.ProtobufAppendBytes(flow, schema.ColumnExporterSite, []byte(classification.Site))
schema.Flows.ProtobufAppendBytes(flow, schema.ColumnExporterRegion, []byte(classification.Region))
schema.Flows.ProtobufAppendBytes(flow, schema.ColumnExporterTenant, []byte(classification.Tenant))
func (c *Component) writeExporter(flow *schema.FlowMessage, classification exporterClassification) {
c.d.Schema.ProtobufAppendBytes(flow, schema.ColumnExporterGroup, []byte(classification.Group))
c.d.Schema.ProtobufAppendBytes(flow, schema.ColumnExporterRole, []byte(classification.Role))
c.d.Schema.ProtobufAppendBytes(flow, schema.ColumnExporterSite, []byte(classification.Site))
c.d.Schema.ProtobufAppendBytes(flow, schema.ColumnExporterRegion, []byte(classification.Region))
c.d.Schema.ProtobufAppendBytes(flow, schema.ColumnExporterTenant, []byte(classification.Tenant))
}
func (c *Component) classifyExporter(ip string, name string, flow *schema.FlowMessage) {
@@ -168,7 +168,7 @@ func (c *Component) classifyExporter(ip string, name string, flow *schema.FlowMe
}
si := exporterInfo{IP: ip, Name: name}
if classification, ok := c.classifierExporterCache.Get(si); ok {
writeExporter(flow, classification)
c.writeExporter(flow, classification)
return
}
@@ -190,18 +190,18 @@ func (c *Component) classifyExporter(ip string, name string, flow *schema.FlowMe
break
}
c.classifierExporterCache.Set(si, classification)
writeExporter(flow, classification)
c.writeExporter(flow, classification)
}
func writeInterface(flow *schema.FlowMessage, classification interfaceClassification, directionIn bool) {
func (c *Component) writeInterface(flow *schema.FlowMessage, classification interfaceClassification, directionIn bool) {
if directionIn {
schema.Flows.ProtobufAppendBytes(flow, schema.ColumnInIfConnectivity, []byte(classification.Connectivity))
schema.Flows.ProtobufAppendBytes(flow, schema.ColumnInIfProvider, []byte(classification.Provider))
schema.Flows.ProtobufAppendVarint(flow, schema.ColumnInIfBoundary, uint64(classification.Boundary))
c.d.Schema.ProtobufAppendBytes(flow, schema.ColumnInIfConnectivity, []byte(classification.Connectivity))
c.d.Schema.ProtobufAppendBytes(flow, schema.ColumnInIfProvider, []byte(classification.Provider))
c.d.Schema.ProtobufAppendVarint(flow, schema.ColumnInIfBoundary, uint64(classification.Boundary))
} else {
schema.Flows.ProtobufAppendBytes(flow, schema.ColumnOutIfConnectivity, []byte(classification.Connectivity))
schema.Flows.ProtobufAppendBytes(flow, schema.ColumnOutIfProvider, []byte(classification.Provider))
schema.Flows.ProtobufAppendVarint(flow, schema.ColumnOutIfBoundary, uint64(classification.Boundary))
c.d.Schema.ProtobufAppendBytes(flow, schema.ColumnOutIfConnectivity, []byte(classification.Connectivity))
c.d.Schema.ProtobufAppendBytes(flow, schema.ColumnOutIfProvider, []byte(classification.Provider))
c.d.Schema.ProtobufAppendVarint(flow, schema.ColumnOutIfBoundary, uint64(classification.Boundary))
}
}
@@ -216,7 +216,7 @@ func (c *Component) classifyInterface(ip string, exporterName string, fl *schema
Interface: ii,
}
if classification, ok := c.classifierInterfaceCache.Get(key); ok {
writeInterface(fl, classification, directionIn)
c.writeInterface(fl, classification, directionIn)
return
}
@@ -243,7 +243,7 @@ func (c *Component) classifyInterface(ip string, exporterName string, fl *schema
break
}
c.classifierInterfaceCache.Set(key, classification)
writeInterface(fl, classification, directionIn)
c.writeInterface(fl, classification, directionIn)
}
func isPrivateAS(as uint32) bool {

View File

@@ -6,7 +6,6 @@ package core
import (
"fmt"
"net/netip"
"reflect"
"testing"
"time"
@@ -372,6 +371,7 @@ ClassifyProviderRegex(Interface.Description, "^Transit: ([^ ]+)", "$1")`,
Kafka: kafkaComponent,
HTTP: httpComponent,
BMP: bmpComponent,
Schema: schema.NewMock(t),
})
if err != nil {
t.Fatalf("New() error:\n%+v", err)
@@ -388,8 +388,8 @@ ClassifyProviderRegex(Interface.Description, "^Transit: ([^ ]+)", "$1")`,
t.Fatalf("Kafka message encoding error:\n%+v", err)
}
t.Logf("Raw message: %v", b)
got := schema.Flows.ProtobufDecode(t, b)
if diff := helpers.Diff(&got, tc.OutputFlow, helpers.DiffFormatter(reflect.TypeOf(schema.ColumnBytes), fmt.Sprint)); diff != "" {
got := c.d.Schema.ProtobufDecode(t, b)
if diff := helpers.Diff(&got, tc.OutputFlow); diff != "" {
t.Errorf("Classifier (-got, +want):\n%s", diff)
}
return nil
@@ -461,6 +461,7 @@ func TestGetASNumber(t *testing.T) {
Daemon: daemon.NewMock(t),
GeoIP: geoip.NewMock(t, r),
BMP: bmpComponent,
Schema: schema.NewMock(t),
})
if err != nil {
t.Fatalf("New() error:\n%+v", err)

View File

@@ -51,6 +51,7 @@ type Dependencies struct {
GeoIP *geoip.Component
Kafka *kafka.Component
HTTP *http.Component
Schema *schema.Component
}
// New creates a new core component.
@@ -119,7 +120,7 @@ func (c *Component) runWorker(workerID int) error {
}
// Serialize flow to Protobuf
buf := schema.Flows.ProtobufMarshal(flow)
buf := c.d.Schema.ProtobufMarshal(flow)
c.metrics.flowsProcessingTime.Observe(time.Now().Sub(start).Seconds())
// Forward to Kafka. This could block and buf is now owned by the

View File

@@ -53,6 +53,7 @@ func TestCore(t *testing.T) {
Kafka: kafkaComponent,
HTTP: httpComponent,
BMP: bmpComponent,
Schema: schema.NewMock(t),
})
if err != nil {
t.Fatalf("New() error:\n%+v", err)

View File

@@ -15,7 +15,7 @@ import (
"github.com/netsampler/goflow2/producer"
)
func decode(msgDec interface{}, samplingRateSys producer.SamplingRateSystem) []*schema.FlowMessage {
func (nd *Decoder) decode(msgDec interface{}, samplingRateSys producer.SamplingRateSystem) []*schema.FlowMessage {
flowMessageSet := []*schema.FlowMessage{}
var obsDomainID uint32
var dataFlowSet []netflow.DataFlowSet
@@ -47,7 +47,7 @@ func decode(msgDec interface{}, samplingRateSys producer.SamplingRateSystem) []*
// Parse fields
for _, dataFlowSetItem := range dataFlowSet {
for _, record := range dataFlowSetItem.Records {
flow := decodeRecord(version, record.Values)
flow := nd.decodeRecord(version, record.Values)
if flow != nil {
flow.SamplingRate = samplingRate
flowMessageSet = append(flowMessageSet, flow)
@@ -58,7 +58,7 @@ func decode(msgDec interface{}, samplingRateSys producer.SamplingRateSystem) []*
return flowMessageSet
}
func decodeRecord(version int, fields []netflow.DataField) *schema.FlowMessage {
func (nd *Decoder) decodeRecord(version int, fields []netflow.DataField) *schema.FlowMessage {
var etype uint16
bf := &schema.FlowMessage{}
for _, field := range fields {
@@ -73,9 +73,9 @@ func decodeRecord(version int, fields []netflow.DataField) *schema.FlowMessage {
switch field.Type {
// Statistics
case netflow.NFV9_FIELD_IN_BYTES, netflow.NFV9_FIELD_OUT_BYTES:
schema.Flows.ProtobufAppendVarint(bf, schema.ColumnBytes, decodeUNumber(v))
nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnBytes, decodeUNumber(v))
case netflow.NFV9_FIELD_IN_PKTS, netflow.NFV9_FIELD_OUT_PKTS:
schema.Flows.ProtobufAppendVarint(bf, schema.ColumnPackets, decodeUNumber(v))
nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnPackets, decodeUNumber(v))
// L3
case netflow.NFV9_FIELD_IPV4_SRC_ADDR:
@@ -91,19 +91,19 @@ func decodeRecord(version int, fields []netflow.DataField) *schema.FlowMessage {
etype = helpers.ETypeIPv6
bf.DstAddr = decodeIP(v)
case netflow.NFV9_FIELD_SRC_MASK, netflow.NFV9_FIELD_IPV6_SRC_MASK:
schema.Flows.ProtobufAppendVarint(bf, schema.ColumnSrcNetMask, decodeUNumber(v))
nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnSrcNetMask, decodeUNumber(v))
case netflow.NFV9_FIELD_DST_MASK, netflow.NFV9_FIELD_IPV6_DST_MASK:
schema.Flows.ProtobufAppendVarint(bf, schema.ColumnDstNetMask, decodeUNumber(v))
nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnDstNetMask, decodeUNumber(v))
case netflow.NFV9_FIELD_IPV4_NEXT_HOP, netflow.NFV9_FIELD_BGP_IPV4_NEXT_HOP, netflow.NFV9_FIELD_IPV6_NEXT_HOP, netflow.NFV9_FIELD_BGP_IPV6_NEXT_HOP:
bf.NextHop = decodeIP(v)
// L4
case netflow.NFV9_FIELD_L4_SRC_PORT:
schema.Flows.ProtobufAppendVarint(bf, schema.ColumnSrcPort, decodeUNumber(v))
nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnSrcPort, decodeUNumber(v))
case netflow.NFV9_FIELD_L4_DST_PORT:
schema.Flows.ProtobufAppendVarint(bf, schema.ColumnDstPort, decodeUNumber(v))
nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnDstPort, decodeUNumber(v))
case netflow.NFV9_FIELD_PROTOCOL:
schema.Flows.ProtobufAppendVarint(bf, schema.ColumnProto, decodeUNumber(v))
nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnProto, decodeUNumber(v))
// Network
case netflow.NFV9_FIELD_SRC_AS:
@@ -119,10 +119,10 @@ func decodeRecord(version int, fields []netflow.DataField) *schema.FlowMessage {
// Remaining
case netflow.NFV9_FIELD_FORWARDING_STATUS:
schema.Flows.ProtobufAppendVarint(bf, schema.ColumnForwardingStatus, decodeUNumber(v))
nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnForwardingStatus, decodeUNumber(v))
}
}
schema.Flows.ProtobufAppendVarint(bf, schema.ColumnEType, uint64(etype))
nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnEType, uint64(etype))
return bf
}

View File

@@ -21,6 +21,7 @@ import (
// Decoder contains the state for the Netflow v9 decoder.
type Decoder struct {
r *reporter.Reporter
d decoder.Dependencies
// Templates and sampling systems
systemsLock sync.RWMutex
@@ -37,9 +38,10 @@ type Decoder struct {
}
// New instantiates a new netflow decoder.
func New(r *reporter.Reporter) decoder.Decoder {
func New(r *reporter.Reporter, dependencies decoder.Dependencies) decoder.Decoder {
nd := &Decoder{
r: r,
d: dependencies,
templates: map[string]*templateSystem{},
sampling: map[string]producer.SamplingRateSystem{},
}
@@ -208,7 +210,7 @@ func (nd *Decoder) Decode(in decoder.RawFlow) []*schema.FlowMessage {
}
}
flowMessageSet := decode(msgDec, sampling)
flowMessageSet := nd.decode(msgDec, sampling)
exporterAddress, _ := netip.AddrFromSlice(in.Source.To16())
for _, fmsg := range flowMessageSet {
fmsg.TimeReceived = ts

View File

@@ -4,11 +4,9 @@
package netflow
import (
"fmt"
"net"
"net/netip"
"path/filepath"
"reflect"
"testing"
"akvorado/common/helpers"
@@ -19,7 +17,7 @@ import (
func TestDecode(t *testing.T) {
r := reporter.NewMock(t)
nfdecoder := New(r)
nfdecoder := New(r, decoder.Dependencies{Schema: schema.NewMock(t)})
// Send an option template
template := helpers.ReadPcapPayload(t, filepath.Join("testdata", "options-template-257.pcap"))
@@ -183,7 +181,7 @@ func TestDecode(t *testing.T) {
f.TimeReceived = 0
}
if diff := helpers.Diff(got, expectedFlows, helpers.DiffFormatter(reflect.TypeOf(schema.ColumnBytes), fmt.Sprint)); diff != "" {
if diff := helpers.Diff(got, expectedFlows); diff != "" {
t.Fatalf("Decode() (-got, +want):\n%s", diff)
}
gotMetrics = r.GetMetrics(

View File

@@ -24,6 +24,11 @@ type Decoder interface {
Name() string
}
// Dependencies are the dependencies for the decoder
type Dependencies struct {
Schema *schema.Component
}
// RawFlow is an undecoded flow.
type RawFlow struct {
TimeReceived time.Time
@@ -32,4 +37,4 @@ type RawFlow struct {
}
// NewDecoderFunc is the signature of a function to instantiate a decoder.
type NewDecoderFunc func(*reporter.Reporter) Decoder
type NewDecoderFunc func(*reporter.Reporter, Dependencies) Decoder

View File

@@ -14,7 +14,7 @@ import (
"github.com/netsampler/goflow2/decoders/sflow"
)
func decode(msgDec interface{}) []*schema.FlowMessage {
func (nd *Decoder) decode(msgDec interface{}) []*schema.FlowMessage {
flowMessageSet := []*schema.FlowMessage{}
switch msgDec.(type) {
case sflow.Packet:
@@ -54,33 +54,33 @@ func decode(msgDec interface{}) []*schema.FlowMessage {
}
bf.ExporterAddress = decodeIP(packet.AgentIP)
schema.Flows.ProtobufAppendVarint(bf, schema.ColumnPackets, 1)
schema.Flows.ProtobufAppendVarint(bf, schema.ColumnForwardingStatus, uint64(forwardingStatus))
nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnPackets, 1)
nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnForwardingStatus, uint64(forwardingStatus))
for _, record := range records {
switch recordData := record.Data.(type) {
case sflow.SampledHeader:
schema.Flows.ProtobufAppendVarint(bf, schema.ColumnBytes, uint64(recordData.FrameLength))
parseSampledHeader(bf, &recordData)
nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnBytes, uint64(recordData.FrameLength))
nd.parseSampledHeader(bf, &recordData)
case sflow.SampledIPv4:
bf.SrcAddr = decodeIP(recordData.Base.SrcIP)
bf.DstAddr = decodeIP(recordData.Base.DstIP)
schema.Flows.ProtobufAppendVarint(bf, schema.ColumnBytes, uint64(recordData.Base.Length))
schema.Flows.ProtobufAppendVarint(bf, schema.ColumnProto, uint64(recordData.Base.Protocol))
schema.Flows.ProtobufAppendVarint(bf, schema.ColumnSrcPort, uint64(recordData.Base.SrcPort))
schema.Flows.ProtobufAppendVarint(bf, schema.ColumnDstPort, uint64(recordData.Base.DstPort))
schema.Flows.ProtobufAppendVarint(bf, schema.ColumnEType, helpers.ETypeIPv4)
nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnBytes, uint64(recordData.Base.Length))
nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnProto, uint64(recordData.Base.Protocol))
nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnSrcPort, uint64(recordData.Base.SrcPort))
nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnDstPort, uint64(recordData.Base.DstPort))
nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnEType, helpers.ETypeIPv4)
case sflow.SampledIPv6:
bf.SrcAddr = decodeIP(recordData.Base.SrcIP)
bf.DstAddr = decodeIP(recordData.Base.DstIP)
schema.Flows.ProtobufAppendVarint(bf, schema.ColumnBytes, uint64(recordData.Base.Length))
schema.Flows.ProtobufAppendVarint(bf, schema.ColumnProto, uint64(recordData.Base.Protocol))
schema.Flows.ProtobufAppendVarint(bf, schema.ColumnSrcPort, uint64(recordData.Base.SrcPort))
schema.Flows.ProtobufAppendVarint(bf, schema.ColumnDstPort, uint64(recordData.Base.DstPort))
schema.Flows.ProtobufAppendVarint(bf, schema.ColumnEType, helpers.ETypeIPv6)
nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnBytes, uint64(recordData.Base.Length))
nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnProto, uint64(recordData.Base.Protocol))
nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnSrcPort, uint64(recordData.Base.SrcPort))
nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnDstPort, uint64(recordData.Base.DstPort))
nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnEType, helpers.ETypeIPv6)
case sflow.ExtendedRouter:
schema.Flows.ProtobufAppendVarint(bf, schema.ColumnSrcNetMask, uint64(recordData.SrcMaskLen))
schema.Flows.ProtobufAppendVarint(bf, schema.ColumnDstNetMask, uint64(recordData.DstMaskLen))
nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnSrcNetMask, uint64(recordData.SrcMaskLen))
nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnDstNetMask, uint64(recordData.DstMaskLen))
bf.NextHop = decodeIP(recordData.NextHop)
case sflow.ExtendedGateway:
bf.NextHop = decodeIP(recordData.NextHop)
@@ -101,15 +101,15 @@ func decode(msgDec interface{}) []*schema.FlowMessage {
return flowMessageSet
}
func parseSampledHeader(bf *schema.FlowMessage, header *sflow.SampledHeader) {
func (nd *Decoder) parseSampledHeader(bf *schema.FlowMessage, header *sflow.SampledHeader) {
data := header.HeaderData
switch header.Protocol {
case 1: // Ethernet
parseEthernetHeader(bf, data)
nd.parseEthernetHeader(bf, data)
}
}
func parseEthernetHeader(bf *schema.FlowMessage, data []byte) {
func (nd *Decoder) parseEthernetHeader(bf *schema.FlowMessage, data []byte) {
if len(data) < 14 {
return
}
@@ -150,7 +150,7 @@ func parseEthernetHeader(bf *schema.FlowMessage, data []byte) {
if len(data) < 20 {
return
}
schema.Flows.ProtobufAppendVarint(bf, schema.ColumnEType, helpers.ETypeIPv4)
nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnEType, helpers.ETypeIPv4)
bf.SrcAddr = decodeIP(data[12:16])
bf.DstAddr = decodeIP(data[16:20])
proto = data[9]
@@ -165,19 +165,19 @@ func parseEthernetHeader(bf *schema.FlowMessage, data []byte) {
if len(data) < 40 {
return
}
schema.Flows.ProtobufAppendVarint(bf, schema.ColumnEType, helpers.ETypeIPv6)
nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnEType, helpers.ETypeIPv6)
bf.SrcAddr = decodeIP(data[8:24])
bf.DstAddr = decodeIP(data[24:40])
proto = data[6]
data = data[40:]
}
schema.Flows.ProtobufAppendVarint(bf, schema.ColumnProto, uint64(proto))
nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnProto, uint64(proto))
if proto == 6 || proto == 17 {
if len(data) > 4 {
schema.Flows.ProtobufAppendVarint(bf, schema.ColumnSrcPort,
nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnSrcPort,
uint64(binary.BigEndian.Uint16(data[0:2])))
schema.Flows.ProtobufAppendVarint(bf, schema.ColumnDstPort,
nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnDstPort,
uint64(binary.BigEndian.Uint16(data[2:4])))
}
}

View File

@@ -30,6 +30,7 @@ const (
// Decoder contains the state for the sFlow v5 decoder.
type Decoder struct {
r *reporter.Reporter
d decoder.Dependencies
metrics struct {
errors *reporter.CounterVec
@@ -40,9 +41,10 @@ type Decoder struct {
}
// New instantiates a new sFlow decoder.
func New(r *reporter.Reporter) decoder.Decoder {
func New(r *reporter.Reporter, dependencies decoder.Dependencies) decoder.Decoder {
nd := &Decoder{
r: r,
d: dependencies,
}
nd.metrics.errors = nd.r.CounterVec(
@@ -129,7 +131,7 @@ func (nd *Decoder) Decode(in decoder.RawFlow) []*schema.FlowMessage {
}
}
flowMessageSet := decode(msgDec)
flowMessageSet := nd.decode(msgDec)
for _, fmsg := range flowMessageSet {
fmsg.TimeReceived = ts
}

View File

@@ -4,11 +4,9 @@
package sflow
import (
"fmt"
"net"
"net/netip"
"path/filepath"
"reflect"
"testing"
"akvorado/common/helpers"
@@ -19,7 +17,7 @@ import (
func TestDecode(t *testing.T) {
r := reporter.NewMock(t)
sdecoder := New(r)
sdecoder := New(r, decoder.Dependencies{Schema: schema.NewMock(t)})
// Send data
data := helpers.ReadPcapPayload(t, filepath.Join("testdata", "data-1140.pcap"))
@@ -119,7 +117,7 @@ func TestDecode(t *testing.T) {
f.TimeReceived = 0
}
if diff := helpers.Diff(got, expectedFlows, helpers.DiffFormatter(reflect.TypeOf(schema.ColumnBytes), fmt.Sprint)); diff != "" {
if diff := helpers.Diff(got, expectedFlows); diff != "" {
t.Fatalf("Decode() (-got, +want):\n%s", diff)
}
gotMetrics := r.GetMetrics(
@@ -139,7 +137,7 @@ func TestDecode(t *testing.T) {
func TestDecodeInterface(t *testing.T) {
r := reporter.NewMock(t)
sdecoder := New(r)
sdecoder := New(r, decoder.Dependencies{Schema: schema.NewMock(t)})
t.Run("local interface", func(t *testing.T) {
// Send data
@@ -170,7 +168,7 @@ func TestDecodeInterface(t *testing.T) {
f.TimeReceived = 0
}
if diff := helpers.Diff(got, expectedFlows, helpers.DiffFormatter(reflect.TypeOf(schema.ColumnBytes), fmt.Sprint)); diff != "" {
if diff := helpers.Diff(got, expectedFlows); diff != "" {
t.Fatalf("Decode() (-got, +want):\n%s", diff)
}
})
@@ -205,7 +203,7 @@ func TestDecodeInterface(t *testing.T) {
f.TimeReceived = 0
}
if diff := helpers.Diff(got, expectedFlows, helpers.DiffFormatter(reflect.TypeOf(schema.ColumnBytes), fmt.Sprint)); diff != "" {
if diff := helpers.Diff(got, expectedFlows); diff != "" {
t.Fatalf("Decode() (-got, +want):\n%s", diff)
}
})
@@ -239,7 +237,7 @@ func TestDecodeInterface(t *testing.T) {
f.TimeReceived = 0
}
if diff := helpers.Diff(got, expectedFlows, helpers.DiffFormatter(reflect.TypeOf(schema.ColumnBytes), fmt.Sprint)); diff != "" {
if diff := helpers.Diff(got, expectedFlows); diff != "" {
t.Fatalf("Decode() (-got, +want):\n%s", diff)
}
})
@@ -278,7 +276,7 @@ func TestDecodeInterface(t *testing.T) {
f.TimeReceived = 0
}
if diff := helpers.Diff(got, expectedFlows, helpers.DiffFormatter(reflect.TypeOf(schema.ColumnBytes), fmt.Sprint)); diff != "" {
if diff := helpers.Diff(got, expectedFlows); diff != "" {
t.Fatalf("Decode() (-got, +want):\n%s", diff)
}

View File

@@ -13,7 +13,9 @@ import (
// DummyDecoder is a simple decoder producing flows from random data.
// The payload is copied in IfDescription
type DummyDecoder struct{}
type DummyDecoder struct {
Schema *schema.Component
}
// Decode returns uninteresting flow messages.
func (dc *DummyDecoder) Decode(in RawFlow) []*schema.FlowMessage {
@@ -22,9 +24,9 @@ func (dc *DummyDecoder) Decode(in RawFlow) []*schema.FlowMessage {
TimeReceived: uint64(in.TimeReceived.UTC().Unix()),
ExporterAddress: exporterAddress,
}
schema.Flows.ProtobufAppendVarint(f, schema.ColumnBytes, uint64(len(in.Payload)))
schema.Flows.ProtobufAppendVarint(f, schema.ColumnPackets, 1)
schema.Flows.ProtobufAppendBytes(f, schema.ColumnInIfDescription, in.Payload)
dc.Schema.ProtobufAppendVarint(f, schema.ColumnBytes, uint64(len(in.Payload)))
dc.Schema.ProtobufAppendVarint(f, schema.ColumnPackets, 1)
dc.Schema.ProtobufAppendBytes(f, schema.ColumnInIfDescription, in.Payload)
return []*schema.FlowMessage{f}
}

View File

@@ -21,7 +21,8 @@ import (
func BenchmarkDecodeEncodeNetflow(b *testing.B) {
schema.DisableDebug(b)
r := reporter.NewMock(b)
nfdecoder := netflow.New(r)
sch := schema.NewMock(b)
nfdecoder := netflow.New(r, decoder.Dependencies{Schema: sch})
template := helpers.ReadPcapPayload(b, filepath.Join("decoder", "netflow", "testdata", "options-template-257.pcap"))
got := nfdecoder.Decode(decoder.RawFlow{Payload: template, Source: net.ParseIP("127.0.0.1")})
@@ -50,7 +51,7 @@ func BenchmarkDecodeEncodeNetflow(b *testing.B) {
got = nfdecoder.Decode(decoder.RawFlow{Payload: data, Source: net.ParseIP("127.0.0.1")})
if withEncoding {
for _, flow := range got {
schema.Flows.ProtobufMarshal(flow)
sch.ProtobufMarshal(flow)
}
}
}
@@ -64,7 +65,8 @@ func BenchmarkDecodeEncodeNetflow(b *testing.B) {
func BenchmarkDecodeEncodeSflow(b *testing.B) {
schema.DisableDebug(b)
r := reporter.NewMock(b)
sdecoder := sflow.New(r)
sch := schema.NewMock(b)
sdecoder := sflow.New(r, decoder.Dependencies{Schema: sch})
data := helpers.ReadPcapPayload(b, filepath.Join("decoder", "sflow", "testdata", "data-1140.pcap"))
for _, withEncoding := range []bool{true, false} {
@@ -78,7 +80,7 @@ func BenchmarkDecodeEncodeSflow(b *testing.B) {
got = sdecoder.Decode(decoder.RawFlow{Payload: data, Source: net.ParseIP("127.0.0.1")})
if withEncoding {
for _, flow := range got {
schema.Flows.ProtobufMarshal(flow)
sch.ProtobufMarshal(flow)
}
}
}

View File

@@ -19,7 +19,9 @@ func TestFileInput(t *testing.T) {
r := reporter.NewMock(t)
configuration := DefaultConfiguration().(*Configuration)
configuration.Paths = []string{path.Join("testdata", "file1.txt"), path.Join("testdata", "file2.txt")}
in, err := configuration.New(r, daemon.NewMock(t), &decoder.DummyDecoder{})
in, err := configuration.New(r, daemon.NewMock(t), &decoder.DummyDecoder{
Schema: schema.NewMock(t),
})
if err != nil {
t.Fatalf("New() error:\n%+v", err)
}

View File

@@ -4,10 +4,8 @@
package udp
import (
"fmt"
"net"
"net/netip"
"reflect"
"testing"
"time"
@@ -22,7 +20,7 @@ func TestUDPInput(t *testing.T) {
r := reporter.NewMock(t)
configuration := DefaultConfiguration().(*Configuration)
configuration.Listen = "127.0.0.1:0"
in, err := configuration.New(r, daemon.NewMock(t), &decoder.DummyDecoder{})
in, err := configuration.New(r, daemon.NewMock(t), &decoder.DummyDecoder{Schema: schema.NewMock(t)})
if err != nil {
t.Fatalf("New() error:\n%+v", err)
}
@@ -73,7 +71,7 @@ func TestUDPInput(t *testing.T) {
},
},
}
if diff := helpers.Diff(got, expected, helpers.DiffFormatter(reflect.TypeOf(schema.ColumnBytes), fmt.Sprint)); diff != "" {
if diff := helpers.Diff(got, expected); diff != "" {
t.Fatalf("Input data (-got, +want):\n%s", diff)
}
@@ -99,7 +97,9 @@ func TestOverflow(t *testing.T) {
configuration := DefaultConfiguration().(*Configuration)
configuration.Listen = "127.0.0.1:0"
configuration.QueueSize = 1
in, err := configuration.New(r, daemon.NewMock(t), &decoder.DummyDecoder{})
in, err := configuration.New(r, daemon.NewMock(t), &decoder.DummyDecoder{
Schema: schema.NewMock(t),
})
if err != nil {
t.Fatalf("New() error:\n%+v", err)
}

View File

@@ -47,6 +47,7 @@ type Component struct {
type Dependencies struct {
Daemon daemon.Component
HTTP *http.Component
Schema *schema.Component
}
// New creates a new flow component.
@@ -77,7 +78,7 @@ func New(r *reporter.Reporter, configuration Configuration, dependencies Depende
if !ok {
return nil, fmt.Errorf("unknown decoder %q", input.Decoder)
}
dec = decoderfunc(r)
dec = decoderfunc(r, decoder.Dependencies{Schema: c.d.Schema})
alreadyInitialized[input.Decoder] = dec
decs[idx] = c.wrapDecoder(dec, input.UseSrcAddrForExporterAddr)
}
@@ -120,7 +121,7 @@ func New(r *reporter.Reporter, configuration Configuration, dependencies Depende
c.d.HTTP.AddHandler("/api/v0/inlet/flow/schema.proto",
netHTTP.HandlerFunc(func(w netHTTP.ResponseWriter, r *netHTTP.Request) {
w.Header().Set("Content-Type", "text/plain")
w.Write([]byte(schema.Flows.ProtobufDefinition()))
w.Write([]byte(c.d.Schema.ProtobufDefinition()))
}))
return &c, nil

View File

@@ -34,6 +34,7 @@ func NewMock(t *testing.T, r *reporter.Reporter, config Configuration) *Componen
c, err := New(r, config, Dependencies{
Daemon: daemon.NewMock(t),
HTTP: http.NewMock(t, r),
Schema: schema.NewMock(t),
})
if err != nil {
t.Fatalf("New() error:\n%+v", err)

View File

@@ -29,9 +29,9 @@ func TestRealKafka(t *testing.T) {
configuration.Brokers = brokers
configuration.Version = kafka.Version(sarama.V2_8_1_0)
configuration.FlushInterval = 100 * time.Millisecond
expectedTopicName := fmt.Sprintf("%s-%s", topicName, schema.Flows.ProtobufMessageHash())
expectedTopicName := fmt.Sprintf("%s-%s", topicName, schema.NewMock(t).ProtobufMessageHash())
r := reporter.NewMock(t)
c, err := New(r, configuration, Dependencies{Daemon: daemon.NewMock(t)})
c, err := New(r, configuration, Dependencies{Daemon: daemon.NewMock(t), Schema: schema.NewMock(t)})
if err != nil {
t.Fatalf("New() error:\n%+v", err)
}

View File

@@ -37,6 +37,7 @@ type Component struct {
// Dependencies define the dependencies of the Kafka exporter.
type Dependencies struct {
Daemon daemon.Component
Schema *schema.Component
}
// New creates a new HTTP component.
@@ -65,7 +66,7 @@ func New(reporter *reporter.Reporter, configuration Configuration, dependencies
config: configuration,
kafkaConfig: kafkaConfig,
kafkaTopic: fmt.Sprintf("%s-%s", configuration.Topic, schema.Flows.ProtobufMessageHash()),
kafkaTopic: fmt.Sprintf("%s-%s", configuration.Topic, dependencies.Schema.ProtobufMessageHash()),
}
c.initMetrics()
c.createKafkaProducer = func() (sarama.AsyncProducer, error) {

View File

@@ -27,7 +27,7 @@ func TestKafka(t *testing.T) {
mockProducer.ExpectInputWithMessageCheckerFunctionAndSucceed(func(got *sarama.ProducerMessage) error {
defer close(received)
expected := sarama.ProducerMessage{
Topic: fmt.Sprintf("flows-%s", schema.Flows.ProtobufMessageHash()),
Topic: fmt.Sprintf("flows-%s", c.d.Schema.ProtobufMessageHash()),
Key: got.Key,
Value: sarama.ByteEncoder("hello world!"),
Partition: got.Partition,
@@ -52,7 +52,7 @@ func TestKafka(t *testing.T) {
gotMetrics := r.GetMetrics("akvorado_inlet_kafka_")
expectedMetrics := map[string]string{
`sent_bytes_total{exporter="127.0.0.1"}`: "26",
fmt.Sprintf(`errors_total{error="kafka: Failed to produce message to topic flows-%s: noooo"}`, schema.Flows.ProtobufMessageHash()): "1",
fmt.Sprintf(`errors_total{error="kafka: Failed to produce message to topic flows-%s: noooo"}`, c.d.Schema.ProtobufMessageHash()): "1",
`sent_messages_total{exporter="127.0.0.1"}`: "2",
}
if diff := helpers.Diff(gotMetrics, expectedMetrics); diff != "" {
@@ -62,7 +62,7 @@ func TestKafka(t *testing.T) {
func TestKafkaMetrics(t *testing.T) {
r := reporter.NewMock(t)
c, err := New(r, DefaultConfiguration(), Dependencies{Daemon: daemon.NewMock(t)})
c, err := New(r, DefaultConfiguration(), Dependencies{Daemon: daemon.NewMock(t), Schema: schema.NewMock(t)})
if err != nil {
t.Fatalf("New() error:\n%+v", err)
}

View File

@@ -14,13 +14,17 @@ import (
"akvorado/common/daemon"
"akvorado/common/helpers"
"akvorado/common/reporter"
"akvorado/common/schema"
)
// NewMock creates a new Kafka component with a mocked Kafka. It will
// panic if it cannot be started.
func NewMock(t *testing.T, reporter *reporter.Reporter, configuration Configuration) (*Component, *mocks.AsyncProducer) {
t.Helper()
c, err := New(reporter, configuration, Dependencies{Daemon: daemon.NewMock(t)})
c, err := New(reporter, configuration, Dependencies{
Daemon: daemon.NewMock(t),
Schema: schema.NewMock(t),
})
if err != nil {
t.Fatalf("New() error:\n%+v", err)
}

View File

@@ -13,8 +13,6 @@ import (
"strconv"
"text/template"
"time"
"akvorado/common/schema"
)
var (
@@ -71,8 +69,8 @@ func (c *Component) registerHTTPHandlers() error {
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var result bytes.Buffer
if err := initShTemplate.Execute(&result, initShVariables{
FlowSchemaHash: schema.Flows.ProtobufMessageHash(),
FlowSchema: schema.Flows.ProtobufDefinition(),
FlowSchemaHash: c.d.Schema.ProtobufMessageHash(),
FlowSchema: c.d.Schema.ProtobufDefinition(),
SystemLogTTL: int(c.config.SystemLogTTL.Seconds()),
SystemLogTables: []string{
"asynchronous_metric_log",

View File

@@ -28,6 +28,7 @@ func TestHTTPEndpoints(t *testing.T) {
c, err := New(r, config, Dependencies{
Daemon: daemon.NewMock(t),
HTTP: http.NewMock(t, r),
Schema: schema.NewMock(t),
})
if err != nil {
t.Fatalf("New() error:\n%+v", err)
@@ -65,7 +66,7 @@ func TestHTTPEndpoints(t *testing.T) {
``,
`# Install Protobuf schema`,
fmt.Sprintf(`cat > /var/lib/clickhouse/format_schemas/flow-%s.proto <<'EOPROTO'`,
schema.Flows.ProtobufMessageHash()),
c.d.Schema.ProtobufMessageHash()),
"",
`syntax = "proto3";`,
},
@@ -84,6 +85,7 @@ func TestAdditionalASNs(t *testing.T) {
c, err := New(r, config, Dependencies{
Daemon: daemon.NewMock(t),
HTTP: http.NewMock(t, r),
Schema: schema.NewMock(t),
})
if err != nil {
t.Fatalf("New() error:\n%+v", err)
@@ -177,6 +179,7 @@ func TestNetworkSources(t *testing.T) {
c, err := New(r, config, Dependencies{
Daemon: daemon.NewMock(t),
HTTP: http.NewMock(t, r),
Schema: schema.NewMock(t),
})
if err != nil {
t.Fatalf("New() error:\n%+v", err)

View File

@@ -124,7 +124,7 @@ LAYOUT({{ .Layout }}())
func (c *Component) createExportersView(ctx context.Context) error {
// Select the columns we need
cols := []string{}
for _, column := range schema.Flows.Columns() {
for _, column := range c.d.Schema.Columns() {
if column.Key == schema.ColumnTimeReceived || strings.HasPrefix(column.Name, "Exporter") {
cols = append(cols, column.Name)
}
@@ -177,7 +177,7 @@ AS %s
// createRawFlowsTable creates the raw flow table
func (c *Component) createRawFlowsTable(ctx context.Context) error {
hash := schema.Flows.ProtobufMessageHash()
hash := c.d.Schema.ProtobufMessageHash()
tableName := fmt.Sprintf("flows_%s_raw", hash)
kafkaEngine := fmt.Sprintf("Kafka SETTINGS %s", strings.Join([]string{
fmt.Sprintf(`kafka_broker_list = '%s'`,
@@ -198,7 +198,7 @@ func (c *Component) createRawFlowsTable(ctx context.Context) error {
gin.H{
"Database": c.config.Database,
"Table": tableName,
"Schema": schema.Flows.ClickHouseCreateTable(
"Schema": c.d.Schema.ClickHouseCreateTable(
schema.ClickHouseSkipGeneratedColumns,
schema.ClickHouseUseTransformFromType,
schema.ClickHouseSkipAliasedColumns),
@@ -235,7 +235,7 @@ func (c *Component) createRawFlowsTable(ctx context.Context) error {
}
func (c *Component) createRawFlowsConsumerView(ctx context.Context) error {
tableName := fmt.Sprintf("flows_%s_raw", schema.Flows.ProtobufMessageHash())
tableName := fmt.Sprintf("flows_%s_raw", c.d.Schema.ProtobufMessageHash())
viewName := fmt.Sprintf("%s_consumer", tableName)
// Build SELECT query
@@ -243,7 +243,7 @@ func (c *Component) createRawFlowsConsumerView(ctx context.Context) error {
`{{ .With }} SELECT {{ .Columns }} FROM {{ .Database }}.{{ .Table }} WHERE length(_error) = 0`,
gin.H{
"With": "WITH arrayCompact(DstASPath) AS c_DstASPath",
"Columns": strings.Join(schema.Flows.ClickHouseSelectColumns(
"Columns": strings.Join(c.d.Schema.ClickHouseSelectColumns(
schema.ClickHouseSubstituteGenerates,
schema.ClickHouseSubstituteTransforms,
schema.ClickHouseSkipAliasedColumns), ", "),
@@ -277,7 +277,7 @@ func (c *Component) createRawFlowsConsumerView(ctx context.Context) error {
}
func (c *Component) createRawFlowsErrorsView(ctx context.Context) error {
tableName := fmt.Sprintf("flows_%s_raw", schema.Flows.ProtobufMessageHash())
tableName := fmt.Sprintf("flows_%s_raw", c.d.Schema.ProtobufMessageHash())
viewName := fmt.Sprintf("%s_errors", tableName)
// Build SELECT query
@@ -350,7 +350,7 @@ PARTITION BY toYYYYMMDDhhmmss(toStartOfInterval(TimeReceived, INTERVAL {{ .Parti
ORDER BY (TimeReceived, ExporterAddress, InIfName, OutIfName)
TTL TimeReceived + toIntervalSecond({{ .TTL }})
`, gin.H{
"Schema": schema.Flows.ClickHouseCreateTable(),
"Schema": c.d.Schema.ClickHouseCreateTable(),
"PartitionInterval": partitionInterval,
"TTL": ttl,
})
@@ -364,10 +364,10 @@ ORDER BY ({{ .SortingKey }})
TTL TimeReceived + toIntervalSecond({{ .TTL }})
`, gin.H{
"Table": tableName,
"Schema": schema.Flows.ClickHouseCreateTable(schema.ClickHouseSkipMainOnlyColumns),
"Schema": c.d.Schema.ClickHouseCreateTable(schema.ClickHouseSkipMainOnlyColumns),
"PartitionInterval": partitionInterval,
"PrimaryKey": strings.Join(schema.Flows.ClickHousePrimaryKeys(), ", "),
"SortingKey": strings.Join(schema.Flows.ClickHouseSortingKeys(), ", "),
"PrimaryKey": strings.Join(c.d.Schema.ClickHousePrimaryKeys(), ", "),
"SortingKey": strings.Join(c.d.Schema.ClickHouseSortingKeys(), ", "),
"TTL": ttl,
})
}
@@ -402,7 +402,7 @@ ORDER BY position ASC
modifications := []string{}
previousColumn := ""
outer:
for _, wantedColumn := range schema.Flows.Columns() {
for _, wantedColumn := range c.d.Schema.Columns() {
if resolution.Interval > 0 && wantedColumn.MainOnly {
continue
}
@@ -411,12 +411,12 @@ outer:
if wantedColumn.Name == existingColumn.Name {
// Do a few sanity checks
if wantedColumn.ClickHouseType != existingColumn.Type {
if slices.Contains(schema.Flows.ClickHousePrimaryKeys(), wantedColumn.Name) {
if slices.Contains(c.d.Schema.ClickHousePrimaryKeys(), wantedColumn.Name) {
return fmt.Errorf("table %s, primary key column %s has a non-matching type: %s vs %s",
tableName, wantedColumn.Name, existingColumn.Type, wantedColumn.ClickHouseType)
}
}
if resolution.Interval > 0 && slices.Contains(schema.Flows.ClickHousePrimaryKeys(), wantedColumn.Name) && existingColumn.IsPrimaryKey == 0 {
if resolution.Interval > 0 && slices.Contains(c.d.Schema.ClickHousePrimaryKeys(), wantedColumn.Name) && existingColumn.IsPrimaryKey == 0 {
return fmt.Errorf("table %s, column %s should be a primary key, cannot change that",
tableName, wantedColumn.Name)
}
@@ -440,7 +440,7 @@ outer:
}
}
// Add the missing column. Only if not primary.
if resolution.Interval > 0 && slices.Contains(schema.Flows.ClickHousePrimaryKeys(), wantedColumn.Name) {
if resolution.Interval > 0 && slices.Contains(c.d.Schema.ClickHousePrimaryKeys(), wantedColumn.Name) {
return fmt.Errorf("table %s, column %s is missing but it is a primary key",
tableName, wantedColumn.Name)
}
@@ -453,7 +453,7 @@ outer:
// Also update ORDER BY
if resolution.Interval > 0 {
modifications = append(modifications,
fmt.Sprintf("MODIFY ORDER BY (%s)", strings.Join(schema.Flows.ClickHouseSortingKeys(), ", ")))
fmt.Sprintf("MODIFY ORDER BY (%s)", strings.Join(c.d.Schema.ClickHouseSortingKeys(), ", ")))
}
c.r.Info().Msgf("apply %d modifications to %s", len(modifications), tableName)
if resolution.Interval > 0 {
@@ -503,7 +503,7 @@ SELECT
FROM {{ .Database }}.flows`, gin.H{
"Database": c.config.Database,
"Seconds": uint64(resolution.Interval.Seconds()),
"Columns": strings.Join(schema.Flows.ClickHouseSelectColumns(
"Columns": strings.Join(c.d.Schema.ClickHouseSelectColumns(
schema.ClickHouseSkipTimeReceived,
schema.ClickHouseSkipMainOnlyColumns,
schema.ClickHouseSkipAliasedColumns), ",\n "),

View File

@@ -62,7 +62,7 @@ FROM system.tables
WHERE database=currentDatabase() AND table NOT LIKE '.%'
ORDER BY length(table) ASC`
func dumpAllTables(t *testing.T, ch *clickhousedb.Component) map[string]string {
func dumpAllTables(t *testing.T, ch *clickhousedb.Component, schemaComponent *schema.Component) map[string]string {
// TODO: find the right ordering, this one does not totally work
rows, err := ch.Query(context.Background(), dumpAllTablesQuery)
if err != nil {
@@ -74,7 +74,7 @@ func dumpAllTables(t *testing.T, ch *clickhousedb.Component) map[string]string {
if err := rows.Scan(&table, &schema); err != nil {
t.Fatalf("Scan() error:\n%+v", err)
}
if !oldTable(table) {
if !oldTable(schemaComponent, table) {
schemas[table] = schema
}
}
@@ -86,9 +86,9 @@ type tableWithSchema struct {
schema string
}
func loadTables(t *testing.T, ch *clickhousedb.Component, schemas []tableWithSchema) {
func loadTables(t *testing.T, ch *clickhousedb.Component, sch *schema.Component, schemas []tableWithSchema) {
for _, tws := range schemas {
if oldTable(tws.table) {
if oldTable(sch, tws.table) {
continue
}
t.Logf("Load table %s", tws.table)
@@ -98,8 +98,8 @@ func loadTables(t *testing.T, ch *clickhousedb.Component, schemas []tableWithSch
}
}
func oldTable(table string) bool {
if strings.Contains(table, schema.Flows.ProtobufMessageHash()) {
func oldTable(schema *schema.Component, table string) bool {
if strings.Contains(table, schema.ProtobufMessageHash()) {
return false
}
if strings.HasSuffix(table, "_raw") || strings.HasSuffix(table, "_raw_consumer") || strings.HasSuffix(table, "_raw_errors") {
@@ -110,7 +110,7 @@ func oldTable(table string) bool {
// loadAllTables load tables from a CSV file. Use `format CSV` with
// query from dumpAllTables.
func loadAllTables(t *testing.T, ch *clickhousedb.Component, filename string) {
func loadAllTables(t *testing.T, ch *clickhousedb.Component, sch *schema.Component, filename string) {
input, err := os.Open(filename)
if err != nil {
t.Fatalf("Open(%q) error:\n%+v", filename, err)
@@ -136,7 +136,7 @@ func loadAllTables(t *testing.T, ch *clickhousedb.Component, filename string) {
}
dropAllTables(t, ch)
t.Logf("(%s) Load all tables from dump %s", time.Now(), filename)
loadTables(t, ch, schemas)
loadTables(t, ch, sch, schemas)
t.Logf("(%s) Loaded all tables from dump %s", time.Now(), filename)
}
@@ -161,6 +161,7 @@ func TestGetHTTPBaseURL(t *testing.T) {
c, err := New(r, DefaultConfiguration(), Dependencies{
Daemon: daemon.NewMock(t),
HTTP: http,
Schema: schema.NewMock(t),
})
if err != nil {
t.Fatalf("New() error:\n%+v", err)
@@ -201,7 +202,7 @@ func TestMigration(t *testing.T) {
}
for _, f := range files {
t.Run(f.Name(), func(t *testing.T) {
loadAllTables(t, chComponent, path.Join("testdata/states", f.Name()))
loadAllTables(t, chComponent, schema.NewMock(t), path.Join("testdata/states", f.Name()))
r := reporter.NewMock(t)
configuration := DefaultConfiguration()
configuration.OrchestratorURL = "http://something"
@@ -209,6 +210,7 @@ func TestMigration(t *testing.T) {
ch, err := New(r, configuration, Dependencies{
Daemon: daemon.NewMock(t),
HTTP: http.NewMock(t, r),
Schema: schema.NewMock(t),
ClickHouse: chComponent,
})
if err != nil {
@@ -225,14 +227,14 @@ WHERE database=currentDatabase() AND table NOT LIKE '.%'`)
if err != nil {
t.Fatalf("Query() error:\n%+v", err)
}
hash := schema.Flows.ProtobufMessageHash()
hash := ch.d.Schema.ProtobufMessageHash()
got := []string{}
for rows.Next() {
var table string
if err := rows.Scan(&table); err != nil {
t.Fatalf("Scan() error:\n%+v", err)
}
if !oldTable(table) {
if !oldTable(ch.d.Schema, table) {
got = append(got, table)
}
}
@@ -256,7 +258,7 @@ WHERE database=currentDatabase() AND table NOT LIKE '.%'`)
t.Fatalf("SHOW TABLES (-got, +want):\n%s", diff)
}
currentRun := dumpAllTables(t, chComponent)
currentRun := dumpAllTables(t, chComponent, ch.d.Schema)
if lastRun != nil {
if diff := helpers.Diff(lastRun, currentRun); diff != "" {
t.Fatalf("Final state is different (-last, +current):\n%s", diff)
@@ -296,6 +298,7 @@ LIMIT 1`, proto.ClientName)
ch, err := New(r, configuration, Dependencies{
Daemon: daemon.NewMock(t),
HTTP: http.NewMock(t, r),
Schema: schema.NewMock(t),
ClickHouse: chComponent,
})
if err != nil {

View File

@@ -18,6 +18,7 @@ import (
"akvorado/common/daemon"
"akvorado/common/http"
"akvorado/common/reporter"
"akvorado/common/schema"
)
// Component represents the ClickHouse configurator.
@@ -40,6 +41,7 @@ type Dependencies struct {
Daemon daemon.Component
HTTP *http.Component
ClickHouse *clickhousedb.Component
Schema *schema.Component
}
// New creates a new ClickHouse component.

View File

@@ -22,11 +22,11 @@ func TestTopicCreation(t *testing.T) {
rand.Seed(time.Now().UnixMicro())
topicName := fmt.Sprintf("test-topic-%d", rand.Int())
expectedTopicName := fmt.Sprintf("%s-%s", topicName, schema.Flows.ProtobufMessageHash())
retentionMs := "76548"
segmentBytes := "107374184"
segmentBytes2 := "10737184"
cleanupPolicy := "delete"
expectedTopicName := fmt.Sprintf("%s-%s", topicName, schema.NewMock(t).ProtobufMessageHash())
cases := []struct {
Name string
@@ -65,7 +65,7 @@ func TestTopicCreation(t *testing.T) {
}
configuration.Brokers = brokers
configuration.Version = kafka.Version(sarama.V2_8_1_0)
c, err := New(reporter.NewMock(t), configuration)
c, err := New(reporter.NewMock(t), configuration, Dependencies{Schema: schema.NewMock(t)})
if err != nil {
t.Fatalf("New() error:\n%+v", err)
}
@@ -96,7 +96,7 @@ func TestTopicMorePartitions(t *testing.T) {
rand.Seed(time.Now().UnixMicro())
topicName := fmt.Sprintf("test-topic-%d", rand.Int())
expectedTopicName := fmt.Sprintf("%s-%s", topicName, schema.Flows.ProtobufMessageHash())
expectedTopicName := fmt.Sprintf("%s-%s", topicName, schema.NewMock(t).ProtobufMessageHash())
configuration := DefaultConfiguration()
configuration.Topic = topicName
@@ -108,7 +108,7 @@ func TestTopicMorePartitions(t *testing.T) {
configuration.Brokers = brokers
configuration.Version = kafka.Version(sarama.V2_8_1_0)
c, err := New(reporter.NewMock(t), configuration)
c, err := New(reporter.NewMock(t), configuration, Dependencies{Schema: schema.NewMock(t)})
if err != nil {
t.Fatalf("New() error:\n%+v", err)
}
@@ -133,7 +133,7 @@ func TestTopicMorePartitions(t *testing.T) {
// Increase number of partitions
configuration.TopicConfiguration.NumPartitions = 4
c, err = New(reporter.NewMock(t), configuration)
c, err = New(reporter.NewMock(t), configuration, Dependencies{Schema: schema.NewMock(t)})
if err != nil {
t.Fatalf("New() error:\n%+v", err)
}

View File

@@ -18,14 +18,20 @@ import (
// Component represents the Kafka configurator.
type Component struct {
r *reporter.Reporter
d Dependencies
config Configuration
kafkaConfig *sarama.Config
kafkaTopic string
}
// Dependencies are the dependencies for the Kafka component
type Dependencies struct {
Schema *schema.Component
}
// New creates a new Kafka configurator.
func New(r *reporter.Reporter, config Configuration) (*Component, error) {
func New(r *reporter.Reporter, config Configuration, dependencies Dependencies) (*Component, error) {
kafkaConfig, err := kafka.NewConfig(config.Configuration)
if err != nil {
return nil, err
@@ -36,10 +42,11 @@ func New(r *reporter.Reporter, config Configuration) (*Component, error) {
return &Component{
r: r,
d: dependencies,
config: config,
kafkaConfig: kafkaConfig,
kafkaTopic: fmt.Sprintf("%s-%s", config.Topic, schema.Flows.ProtobufMessageHash()),
kafkaTopic: fmt.Sprintf("%s-%s", config.Topic, dependencies.Schema.ProtobufMessageHash()),
}, nil
}