common/schema: namespace column members by subsystem

Otherwise, it will be a mess once we introduce IPFIX/sFlow specific fields
This commit is contained in:
Vincent Bernat
2023-01-07 19:47:28 +01:00
parent 56d7ef10f0
commit e88d2a2974
10 changed files with 187 additions and 184 deletions

View File

@@ -7,7 +7,7 @@ import (
"time"
)
// Configuration defines how we connect to a Clickhouse database
// Configuration defines how we connect to a ClickHouse database
type Configuration struct {
// Servers define the list of clickhouse servers to connect to (with ports)
Servers []string `validate:"min=1,dive,listen"`
@@ -23,7 +23,7 @@ type Configuration struct {
DialTimeout time.Duration `validate:"min=100ms"`
}
// DefaultConfiguration represents the default configuration for connecting to Clickhouse
// DefaultConfiguration represents the default configuration for connecting to ClickHouse
func DefaultConfiguration() Configuration {
return Configuration{
Servers: []string{"127.0.0.1:9000"},

View File

@@ -10,106 +10,101 @@ import (
"golang.org/x/exp/slices"
)
// String turns a column into a declaration for ClickHouse
func (column Column) String() string {
result := []string{fmt.Sprintf("`%s`", column.Name), column.Type}
if column.Codec != "" {
result = append(result, fmt.Sprintf("CODEC(%s)", column.Codec))
// ClickHouseDefinition turns a column into a declaration for ClickHouse
func (column Column) ClickHouseDefinition() string {
result := []string{fmt.Sprintf("`%s`", column.Name), column.ClickHouseType}
if column.ClickHouseCodec != "" {
result = append(result, fmt.Sprintf("CODEC(%s)", column.ClickHouseCodec))
}
if column.Alias != "" {
result = append(result, fmt.Sprintf("ALIAS %s", column.Alias))
if column.ClickHouseAlias != "" {
result = append(result, fmt.Sprintf("ALIAS %s", column.ClickHouseAlias))
}
return strings.Join(result, " ")
}
// TableOption is an option to alter the values returned by Table() and Columns().
type TableOption int
// ClickHouseTableOption is an option to alter the values returned by ClickHouseCreateTable() and ClickHouseSelectColumns().
type ClickHouseTableOption int
const (
// SkipMainOnlyColumns skips the columns for the main flows table only.
SkipMainOnlyColumns TableOption = iota
// SkipGeneratedColumns skips the columns with a GenerateFrom value
SkipGeneratedColumns
// SkipTransformColumns skips the columns with a TransformFrom value
SkipTransformColumns
// SkipAliasedColumns skips the columns with a Alias value
SkipAliasedColumns
// SkipTimeReceived skips the time received column
SkipTimeReceived
// SkipNotDimension skips columns that cannot be used as a dimension
SkipNotDimension
// UseTransformFromType uses the type from TransformFrom if any
UseTransformFromType
// SubstituteGenerates changes the column name to use the default generated value
SubstituteGenerates
// SubstituteTransforms changes the column name to use the transformed value
SubstituteTransforms
// ClickHouseSkipMainOnlyColumns skips the columns for the main flows table only.
ClickHouseSkipMainOnlyColumns ClickHouseTableOption = iota
// ClickHouseSkipGeneratedColumns skips the columns with a GenerateFrom value
ClickHouseSkipGeneratedColumns
// ClickHouseSkipTransformColumns skips the columns with a TransformFrom value
ClickHouseSkipTransformColumns
// ClickHouseSkipAliasedColumns skips the columns with a Alias value
ClickHouseSkipAliasedColumns
// ClickHouseSkipTimeReceived skips the time received column
ClickHouseSkipTimeReceived
// ClickHouseUseTransformFromType uses the type from TransformFrom if any
ClickHouseUseTransformFromType
// ClickHouseSubstituteGenerates changes the column name to use the default generated value
ClickHouseSubstituteGenerates
// ClickHouseSubstituteTransforms changes the column name to use the transformed value
ClickHouseSubstituteTransforms
)
// CreateTable returns the columns for the CREATE TABLE clause in ClickHouse.
func (schema Schema) CreateTable(options ...TableOption) string {
// ClickHouseCreateTable returns the columns for the CREATE TABLE clause in ClickHouse.
func (schema Schema) ClickHouseCreateTable(options ...ClickHouseTableOption) string {
lines := []string{}
schema.iterate(func(column Column) {
lines = append(lines, column.String())
schema.clickhouseIterate(func(column Column) {
lines = append(lines, column.ClickHouseDefinition())
}, options...)
return strings.Join(lines, ",\n")
}
// SelectColumns returns the column for the SELECT clause in ClickHouse.
func (schema Schema) SelectColumns(options ...TableOption) []string {
// ClickHouseSelectColumns returns the columns matching the options for use in SELECT
func (schema Schema) ClickHouseSelectColumns(options ...ClickHouseTableOption) []string {
cols := []string{}
schema.iterate(func(column Column) {
schema.clickhouseIterate(func(column Column) {
cols = append(cols, column.Name)
}, options...)
return cols
}
func (schema Schema) iterate(fn func(column Column), options ...TableOption) {
func (schema Schema) clickhouseIterate(fn func(column Column), options ...ClickHouseTableOption) {
for pair := schema.Columns.Front(); pair != nil; pair = pair.Next() {
column := pair.Value
if slices.Contains(options, SkipTimeReceived) && column.Name == "TimeReceived" {
if slices.Contains(options, ClickHouseSkipTimeReceived) && column.Name == "TimeReceived" {
continue
}
if slices.Contains(options, SkipMainOnlyColumns) && column.MainOnly {
if slices.Contains(options, ClickHouseSkipMainOnlyColumns) && column.MainOnly {
continue
}
if slices.Contains(options, SkipGeneratedColumns) && column.GenerateFrom != "" {
if slices.Contains(options, ClickHouseSkipGeneratedColumns) && column.ClickHouseGenerateFrom != "" {
continue
}
if slices.Contains(options, SkipTransformColumns) && column.TransformFrom != nil {
if slices.Contains(options, ClickHouseSkipTransformColumns) && column.ClickHouseTransformFrom != nil {
continue
}
if slices.Contains(options, SkipAliasedColumns) && column.Alias != "" {
if slices.Contains(options, ClickHouseSkipAliasedColumns) && column.ClickHouseAlias != "" {
continue
}
if slices.Contains(options, SkipNotDimension) && column.NotSelectable {
continue
}
if slices.Contains(options, UseTransformFromType) && column.TransformFrom != nil {
for _, ocol := range column.TransformFrom {
if slices.Contains(options, ClickHouseUseTransformFromType) && column.ClickHouseTransformFrom != nil {
for _, ocol := range column.ClickHouseTransformFrom {
// We assume we only need to use name/type
column.Name = ocol.Name
column.Type = ocol.Type
column.ClickHouseType = ocol.ClickHouseType
fn(column)
}
continue
}
if slices.Contains(options, SubstituteGenerates) && column.GenerateFrom != "" {
column.Name = fmt.Sprintf("%s AS %s", column.GenerateFrom, column.Name)
if slices.Contains(options, ClickHouseSubstituteGenerates) && column.ClickHouseGenerateFrom != "" {
column.Name = fmt.Sprintf("%s AS %s", column.ClickHouseGenerateFrom, column.Name)
}
if slices.Contains(options, SubstituteTransforms) && column.TransformFrom != nil {
column.Name = fmt.Sprintf("%s AS %s", column.TransformTo, column.Name)
if slices.Contains(options, ClickHouseSubstituteTransforms) && column.ClickHouseTransformFrom != nil {
column.Name = fmt.Sprintf("%s AS %s", column.ClickHouseTransformTo, column.Name)
}
fn(column)
}
}
// SortingKeys returns the list of sorting keys, prefixed by the primary keys.
func (schema Schema) SortingKeys() []string {
cols := append([]string{}, schema.PrimaryKeys...)
// ClickHouseSortingKeys returns the list of sorting keys, prefixed by the primary keys.
func (schema Schema) ClickHouseSortingKeys() []string {
cols := append([]string{}, schema.ClickHousePrimaryKeys...)
for pair := schema.Columns.Front(); pair != nil; pair = pair.Next() {
column := pair.Value
if column.NotSortingKey || column.MainOnly {
if column.ClickHouseNotSortingKey || column.MainOnly {
continue
}
if !slices.Contains(cols, column.Name) {

View File

@@ -14,7 +14,7 @@ import (
// will be duplicated as Dst/OutIf during init. That's not the case for columns
// in `PrimaryKeys'.
var Flows = Schema{
PrimaryKeys: []string{
ClickHousePrimaryKeys: []string{
"TimeReceived",
"ExporterAddress",
"EType",
@@ -28,113 +28,113 @@ var Flows = Schema{
},
Columns: buildMapFromColumns([]Column{
{
Name: "TimeReceived",
Type: "DateTime",
Codec: "DoubleDelta, LZ4",
NotSelectable: true,
Name: "TimeReceived",
ClickHouseType: "DateTime",
ClickHouseCodec: "DoubleDelta, LZ4",
ConsoleNotDimension: true,
},
{Name: "SamplingRate", Type: "UInt64", NotSelectable: true},
{Name: "ExporterAddress", Type: "LowCardinality(IPv6)"},
{Name: "ExporterName", Type: "LowCardinality(String)", NotSortingKey: true},
{Name: "ExporterGroup", Type: "LowCardinality(String)", NotSortingKey: true},
{Name: "ExporterRole", Type: "LowCardinality(String)", NotSortingKey: true},
{Name: "ExporterSite", Type: "LowCardinality(String)", NotSortingKey: true},
{Name: "ExporterRegion", Type: "LowCardinality(String)", NotSortingKey: true},
{Name: "ExporterTenant", Type: "LowCardinality(String)", NotSortingKey: true},
{Name: "SamplingRate", ClickHouseType: "UInt64", ConsoleNotDimension: true},
{Name: "ExporterAddress", ClickHouseType: "LowCardinality(IPv6)"},
{Name: "ExporterName", ClickHouseType: "LowCardinality(String)", ClickHouseNotSortingKey: true},
{Name: "ExporterGroup", ClickHouseType: "LowCardinality(String)", ClickHouseNotSortingKey: true},
{Name: "ExporterRole", ClickHouseType: "LowCardinality(String)", ClickHouseNotSortingKey: true},
{Name: "ExporterSite", ClickHouseType: "LowCardinality(String)", ClickHouseNotSortingKey: true},
{Name: "ExporterRegion", ClickHouseType: "LowCardinality(String)", ClickHouseNotSortingKey: true},
{Name: "ExporterTenant", ClickHouseType: "LowCardinality(String)", ClickHouseNotSortingKey: true},
{
Name: "SrcAddr",
Type: "IPv6",
MainOnly: true,
Name: "SrcAddr",
MainOnly: true,
ClickHouseType: "IPv6",
}, {
Name: "SrcNetMask",
Type: "UInt8",
MainOnly: true,
NotSelectable: true,
Name: "SrcNetMask",
MainOnly: true,
ClickHouseType: "UInt8",
ConsoleNotDimension: true,
}, {
Name: "SrcNetPrefix",
Type: "String",
MainOnly: true,
Alias: `CASE
Name: "SrcNetPrefix",
MainOnly: true,
ClickHouseType: "String",
ClickHouseAlias: `CASE
WHEN EType = 0x800 THEN concat(replaceRegexpOne(IPv6CIDRToRange(SrcAddr, (96 + SrcNetMask)::UInt8).1::String, '^::ffff:', ''), '/', SrcNetMask::String)
WHEN EType = 0x86dd THEN concat(IPv6CIDRToRange(SrcAddr, SrcNetMask).1::String, '/', SrcNetMask::String)
ELSE ''
END`,
},
{Name: "SrcAS", Type: "UInt32"},
{Name: "SrcAS", ClickHouseType: "UInt32"},
{
Name: "SrcNetName",
Type: "LowCardinality(String)",
GenerateFrom: "dictGetOrDefault('networks', 'name', SrcAddr, '')",
Name: "SrcNetName",
ClickHouseType: "LowCardinality(String)",
ClickHouseGenerateFrom: "dictGetOrDefault('networks', 'name', SrcAddr, '')",
}, {
Name: "SrcNetRole",
Type: "LowCardinality(String)",
GenerateFrom: "dictGetOrDefault('networks', 'role', SrcAddr, '')",
Name: "SrcNetRole",
ClickHouseType: "LowCardinality(String)",
ClickHouseGenerateFrom: "dictGetOrDefault('networks', 'role', SrcAddr, '')",
}, {
Name: "SrcNetSite",
Type: "LowCardinality(String)",
GenerateFrom: "dictGetOrDefault('networks', 'site', SrcAddr, '')",
Name: "SrcNetSite",
ClickHouseType: "LowCardinality(String)",
ClickHouseGenerateFrom: "dictGetOrDefault('networks', 'site', SrcAddr, '')",
}, {
Name: "SrcNetRegion",
Type: "LowCardinality(String)",
GenerateFrom: "dictGetOrDefault('networks', 'region', SrcAddr, '')",
Name: "SrcNetRegion",
ClickHouseType: "LowCardinality(String)",
ClickHouseGenerateFrom: "dictGetOrDefault('networks', 'region', SrcAddr, '')",
}, {
Name: "SrcNetTenant",
Type: "LowCardinality(String)",
GenerateFrom: "dictGetOrDefault('networks', 'tenant', SrcAddr, '')",
Name: "SrcNetTenant",
ClickHouseType: "LowCardinality(String)",
ClickHouseGenerateFrom: "dictGetOrDefault('networks', 'tenant', SrcAddr, '')",
},
{Name: "SrcCountry", Type: "FixedString(2)"},
{Name: "SrcCountry", ClickHouseType: "FixedString(2)"},
{
Name: "DstASPath",
Type: "Array(UInt32)",
MainOnly: true,
Name: "DstASPath",
MainOnly: true,
ClickHouseType: "Array(UInt32)",
}, {
Name: "Dst1stAS",
Type: "UInt32",
GenerateFrom: "c_DstASPath[1]",
Name: "Dst1stAS",
ClickHouseType: "UInt32",
ClickHouseGenerateFrom: "c_DstASPath[1]",
}, {
Name: "Dst2ndAS",
Type: "UInt32",
GenerateFrom: "c_DstASPath[2]",
Name: "Dst2ndAS",
ClickHouseType: "UInt32",
ClickHouseGenerateFrom: "c_DstASPath[2]",
}, {
Name: "Dst3rdAS",
Type: "UInt32",
GenerateFrom: "c_DstASPath[3]",
Name: "Dst3rdAS",
ClickHouseType: "UInt32",
ClickHouseGenerateFrom: "c_DstASPath[3]",
}, {
Name: "DstCommunities",
Type: "Array(UInt32)",
MainOnly: true,
Name: "DstCommunities",
MainOnly: true,
ClickHouseType: "Array(UInt32)",
}, {
Name: "DstLargeCommunities",
Type: "Array(UInt128)",
MainOnly: true,
TransformFrom: []Column{
{Name: "DstLargeCommunities.ASN", Type: "Array(UInt32)"},
{Name: "DstLargeCommunities.LocalData1", Type: "Array(UInt32)"},
{Name: "DstLargeCommunities.LocalData2", Type: "Array(UInt32)"},
Name: "DstLargeCommunities",
MainOnly: true,
ClickHouseType: "Array(UInt128)",
ClickHouseTransformFrom: []Column{
{Name: "DstLargeCommunities.ASN", ClickHouseType: "Array(UInt32)"},
{Name: "DstLargeCommunities.LocalData1", ClickHouseType: "Array(UInt32)"},
{Name: "DstLargeCommunities.LocalData2", ClickHouseType: "Array(UInt32)"},
},
TransformTo: "arrayMap((asn, l1, l2) -> ((bitShiftLeft(CAST(asn, 'UInt128'), 64) + bitShiftLeft(CAST(l1, 'UInt128'), 32)) + CAST(l2, 'UInt128')), `DstLargeCommunities.ASN`, `DstLargeCommunities.LocalData1`, `DstLargeCommunities.LocalData2`)",
NotSelectable: true,
ClickHouseTransformTo: "arrayMap((asn, l1, l2) -> ((bitShiftLeft(CAST(asn, 'UInt128'), 64) + bitShiftLeft(CAST(l1, 'UInt128'), 32)) + CAST(l2, 'UInt128')), `DstLargeCommunities.ASN`, `DstLargeCommunities.LocalData1`, `DstLargeCommunities.LocalData2`)",
ConsoleNotDimension: true,
},
{Name: "InIfName", Type: "LowCardinality(String)"},
{Name: "InIfDescription", Type: "String", NotSortingKey: true},
{Name: "InIfSpeed", Type: "UInt32", NotSortingKey: true},
{Name: "InIfConnectivity", Type: "LowCardinality(String)", NotSortingKey: true},
{Name: "InIfProvider", Type: "LowCardinality(String)", NotSortingKey: true},
{Name: "InIfBoundary", Type: "Enum8('undefined' = 0, 'external' = 1, 'internal' = 2)", NotSortingKey: true},
{Name: "EType", Type: "UInt32"},
{Name: "Proto", Type: "UInt32"},
{Name: "SrcPort", Type: "UInt32", MainOnly: true},
{Name: "Bytes", Type: "UInt64", NotSortingKey: true, NotSelectable: true},
{Name: "Packets", Type: "UInt64", NotSortingKey: true, NotSelectable: true},
{Name: "InIfName", ClickHouseType: "LowCardinality(String)"},
{Name: "InIfDescription", ClickHouseType: "String", ClickHouseNotSortingKey: true},
{Name: "InIfSpeed", ClickHouseType: "UInt32", ClickHouseNotSortingKey: true},
{Name: "InIfConnectivity", ClickHouseType: "LowCardinality(String)", ClickHouseNotSortingKey: true},
{Name: "InIfProvider", ClickHouseType: "LowCardinality(String)", ClickHouseNotSortingKey: true},
{Name: "InIfBoundary", ClickHouseType: "Enum8('undefined' = 0, 'external' = 1, 'internal' = 2)", ClickHouseNotSortingKey: true},
{Name: "EType", ClickHouseType: "UInt32"},
{Name: "Proto", ClickHouseType: "UInt32"},
{Name: "SrcPort", ClickHouseType: "UInt32", MainOnly: true},
{Name: "Bytes", ClickHouseType: "UInt64", ClickHouseNotSortingKey: true, ConsoleNotDimension: true},
{Name: "Packets", ClickHouseType: "UInt64", ClickHouseNotSortingKey: true, ConsoleNotDimension: true},
{
Name: "PacketSize",
Type: "UInt64",
Alias: "intDiv(Bytes, Packets)",
NotSelectable: true,
Name: "PacketSize",
ClickHouseType: "UInt64",
ClickHouseAlias: "intDiv(Bytes, Packets)",
ConsoleNotDimension: true,
}, {
Name: "PacketSizeBucket",
Type: "LowCardinality(String)",
Alias: func() string {
Name: "PacketSizeBucket",
ClickHouseType: "LowCardinality(String)",
ClickHouseAlias: func() string {
boundaries := []int{64, 128, 256, 512, 768, 1024, 1280, 1501,
2048, 3072, 4096, 8192, 10240, 16384, 32768, 65536}
conditions := []string{}
@@ -148,7 +148,7 @@ END`,
return fmt.Sprintf("multiIf(%s)", strings.Join(conditions, ", "))
}(),
},
{Name: "ForwardingStatus", Type: "UInt32"},
{Name: "ForwardingStatus", ClickHouseType: "UInt32"},
}),
}
@@ -156,18 +156,18 @@ func buildMapFromColumns(columns []Column) *orderedmap.OrderedMap[string, Column
omap := orderedmap.NewOrderedMap[string, Column]()
for _, column := range columns {
// Add non-main columns with an alias to NotSortingKey
if !column.MainOnly && column.Alias != "" {
column.NotSortingKey = true
if !column.MainOnly && column.ClickHouseAlias != "" {
column.ClickHouseNotSortingKey = true
}
omap.Set(column.Name, column)
// Expand the schema Src → Dst and InIf → OutIf
if strings.HasPrefix(column.Name, "Src") {
column.Name = fmt.Sprintf("Dst%s", column.Name[3:])
column.Alias = strings.ReplaceAll(column.Alias, "Src", "Dst")
column.ClickHouseAlias = strings.ReplaceAll(column.ClickHouseAlias, "Src", "Dst")
omap.Set(column.Name, column)
} else if strings.HasPrefix(column.Name, "InIf") {
column.Name = fmt.Sprintf("OutIf%s", column.Name[4:])
column.Alias = strings.ReplaceAll(column.Alias, "InIf", "OutIf")
column.ClickHouseAlias = strings.ReplaceAll(column.ClickHouseAlias, "InIf", "OutIf")
omap.Set(column.Name, column)
}
}
@@ -175,11 +175,11 @@ func buildMapFromColumns(columns []Column) *orderedmap.OrderedMap[string, Column
}
func init() {
for _, key := range Flows.PrimaryKeys {
for _, key := range Flows.ClickHousePrimaryKeys {
if column, ok := Flows.Columns.Get(key); !ok {
panic(fmt.Sprintf("primary key %q not a column", key))
} else {
if column.NotSortingKey {
if column.ClickHouseNotSortingKey {
panic(fmt.Sprintf("primary key %q is marked as a non-sorting key", key))
}
}

View File

@@ -15,7 +15,7 @@ type Schema struct {
// For ClickHouse. This is the set of primary keys (order is important and
// may not follow column order).
PrimaryKeys []string
ClickHousePrimaryKeys []string
}
// Column represents a column of data.
@@ -29,14 +29,14 @@ type Column struct {
// retrieved from the protobuf. `TransformFrom' and `TransformTo' work in
// pairs. The first one is the set of column in the raw table while the
// second one is how to transform it for the main table.
Type string
Codec string
Alias string
NotSortingKey bool
GenerateFrom string
TransformFrom []Column
TransformTo string
ClickHouseType string
ClickHouseCodec string
ClickHouseAlias string
ClickHouseNotSortingKey bool
ClickHouseGenerateFrom string
ClickHouseTransformFrom []Column
ClickHouseTransformTo string
// For the console.
NotSelectable bool
ConsoleNotDimension bool
}

View File

@@ -62,11 +62,19 @@ func DefaultConfiguration() Configuration {
}
func (c *Component) configHandlerFunc(gc *gin.Context) {
dimensions := []string{}
for pair := schema.Flows.Columns.Front(); pair != nil; pair = pair.Next() {
column := pair.Value
if column.ConsoleNotDimension {
continue
}
dimensions = append(dimensions, column.Name)
}
gc.JSON(http.StatusOK, gin.H{
"version": c.config.Version,
"defaultVisualizeOptions": c.config.DefaultVisualizeOptions,
"dimensionsLimit": c.config.DimensionsLimit,
"homepageTopWidgets": c.config.HomepageTopWidgets,
"dimensions": schema.Flows.SelectColumns(schema.SkipNotDimension),
"dimensions": dimensions,
})
}

View File

@@ -486,7 +486,7 @@ provided:
`akvorado.yaml` configuration file.
- `asns` maps AS number to names (overriding the builtin ones)
- `orchestrator-url` defines the URL of the orchestrator to be used
by Clickhouse (autodetection when not specified)
by ClickHouse (autodetection when not specified)
The `resolutions` setting contains a list of resolutions. Each
resolution has two keys: `interval` and `ttl`. The first one is the

View File

@@ -146,7 +146,7 @@ func (c *Component) filterCompleteHandlerFunc(gc *gin.Context) {
Detail: "ethernet type",
})
case "proto":
// Do not complete from Clickhouse, we want a subset of options
// Do not complete from ClickHouse, we want a subset of options
completions = append(completions,
filterCompletion{"TCP", "protocol", true},
filterCompletion{"UDP", "protocol", true},

View File

@@ -23,7 +23,7 @@ func (qc queryColumn) String() string {
}
func (qc *queryColumn) UnmarshalText(input []byte) error {
name := string(input)
if column, ok := schema.Flows.Columns.Get(name); ok && !column.NotSelectable {
if column, ok := schema.Flows.Columns.Get(name); ok && !column.ConsoleNotDimension {
*qc = queryColumn(name)
return nil
}

View File

@@ -45,7 +45,7 @@ type Configuration struct {
// sources to be ready. 503 is returned when not.
NetworkSourcesTimeout time.Duration `validate:"min=0"`
// OrchestratorURL allows one to override URL to reach
// orchestrator from Clickhouse
// orchestrator from ClickHouse
OrchestratorURL string `validate:"isdefault|url"`
}

View File

@@ -200,10 +200,10 @@ func (c *Component) createRawFlowsTable(ctx context.Context) error {
gin.H{
"Database": c.config.Database,
"Table": tableName,
"Schema": schema.Flows.CreateTable(
schema.SkipGeneratedColumns,
schema.UseTransformFromType,
schema.SkipAliasedColumns),
"Schema": schema.Flows.ClickHouseCreateTable(
schema.ClickHouseSkipGeneratedColumns,
schema.ClickHouseUseTransformFromType,
schema.ClickHouseSkipAliasedColumns),
"Engine": kafkaEngine,
})
if err != nil {
@@ -245,10 +245,10 @@ func (c *Component) createRawFlowsConsumerView(ctx context.Context) error {
`{{ .With }} SELECT {{ .Columns }} FROM {{ .Database }}.{{ .Table }} WHERE length(_error) = 0`,
gin.H{
"With": "WITH arrayCompact(DstASPath) AS c_DstASPath",
"Columns": strings.Join(schema.Flows.SelectColumns(
schema.SubstituteGenerates,
schema.SubstituteTransforms,
schema.SkipAliasedColumns), ", "),
"Columns": strings.Join(schema.Flows.ClickHouseSelectColumns(
schema.ClickHouseSubstituteGenerates,
schema.ClickHouseSubstituteTransforms,
schema.ClickHouseSkipAliasedColumns), ", "),
"Database": c.config.Database,
"Table": tableName,
})
@@ -352,7 +352,7 @@ PARTITION BY toYYYYMMDDhhmmss(toStartOfInterval(TimeReceived, INTERVAL {{ .Parti
ORDER BY (TimeReceived, ExporterAddress, InIfName, OutIfName)
TTL TimeReceived + toIntervalSecond({{ .TTL }})
`, gin.H{
"Schema": schema.Flows.CreateTable(),
"Schema": schema.Flows.ClickHouseCreateTable(),
"PartitionInterval": partitionInterval,
"TTL": ttl,
})
@@ -366,10 +366,10 @@ ORDER BY ({{ .SortingKey }})
TTL TimeReceived + toIntervalSecond({{ .TTL }})
`, gin.H{
"Table": tableName,
"Schema": schema.Flows.CreateTable(schema.SkipMainOnlyColumns),
"Schema": schema.Flows.ClickHouseCreateTable(schema.ClickHouseSkipMainOnlyColumns),
"PartitionInterval": partitionInterval,
"PrimaryKey": strings.Join(schema.Flows.PrimaryKeys, ", "),
"SortingKey": strings.Join(schema.Flows.SortingKeys(), ", "),
"PrimaryKey": strings.Join(schema.Flows.ClickHousePrimaryKeys, ", "),
"SortingKey": strings.Join(schema.Flows.ClickHouseSortingKeys(), ", "),
"TTL": ttl,
})
}
@@ -413,15 +413,15 @@ outer:
for _, existingColumn := range existingColumns {
if wantedColumn.Name == existingColumn.Name {
// Do a few sanity checks
if wantedColumn.Type != existingColumn.Type {
if wantedColumn.ClickHouseType != existingColumn.Type {
return fmt.Errorf("table %s, column %s has a non-matching type: %s vs %s",
tableName, wantedColumn.Name, existingColumn.Type, wantedColumn.Type)
tableName, wantedColumn.Name, existingColumn.Type, wantedColumn.ClickHouseType)
}
if resolution.Interval > 0 && slices.Contains(schema.Flows.PrimaryKeys, wantedColumn.Name) && existingColumn.IsPrimaryKey == 0 {
if resolution.Interval > 0 && slices.Contains(schema.Flows.ClickHousePrimaryKeys, wantedColumn.Name) && existingColumn.IsPrimaryKey == 0 {
return fmt.Errorf("table %s, column %s should be a primary key, cannot change that",
tableName, wantedColumn.Name)
}
if resolution.Interval > 0 && !wantedColumn.NotSortingKey && existingColumn.IsSortingKey == 0 {
if resolution.Interval > 0 && !wantedColumn.ClickHouseNotSortingKey && existingColumn.IsSortingKey == 0 {
// That's something we can fix, but we need to drop it before recreating it
err := c.d.ClickHouse.Exec(ctx,
fmt.Sprintf("ALTER TABLE %s DROP COLUMN %s", tableName, existingColumn.Name))
@@ -431,27 +431,27 @@ outer:
}
// Schedule adding it back
modifications = append(modifications,
fmt.Sprintf("ADD COLUMN %s AFTER %s", wantedColumn, previousColumn))
fmt.Sprintf("ADD COLUMN %s AFTER %s", wantedColumn.ClickHouseDefinition(), previousColumn))
}
previousColumn = wantedColumn.Name
continue outer
}
}
// Add the missing column. Only if not primary.
if resolution.Interval > 0 && slices.Contains(schema.Flows.PrimaryKeys, wantedColumn.Name) {
if resolution.Interval > 0 && slices.Contains(schema.Flows.ClickHousePrimaryKeys, wantedColumn.Name) {
return fmt.Errorf("table %s, column %s is missing but it is a primary key",
tableName, wantedColumn.Name)
}
c.r.Debug().Msgf("add missing column %s to %s", wantedColumn.Name, tableName)
modifications = append(modifications,
fmt.Sprintf("ADD COLUMN %s AFTER %s", wantedColumn, previousColumn))
fmt.Sprintf("ADD COLUMN %s AFTER %s", wantedColumn.ClickHouseDefinition(), previousColumn))
previousColumn = wantedColumn.Name
}
if len(modifications) > 0 {
// Also update ORDER BY
if resolution.Interval > 0 {
modifications = append(modifications,
fmt.Sprintf("MODIFY ORDER BY (%s)", strings.Join(schema.Flows.SortingKeys(), ", ")))
fmt.Sprintf("MODIFY ORDER BY (%s)", strings.Join(schema.Flows.ClickHouseSortingKeys(), ", ")))
}
c.r.Info().Msgf("apply %d modifications to %s", len(modifications), tableName)
if resolution.Interval > 0 {
@@ -501,10 +501,10 @@ SELECT
FROM {{ .Database }}.flows`, gin.H{
"Database": c.config.Database,
"Seconds": uint64(resolution.Interval.Seconds()),
"Columns": strings.Join(schema.Flows.SelectColumns(
schema.SkipTimeReceived,
schema.SkipMainOnlyColumns,
schema.SkipAliasedColumns), ",\n "),
"Columns": strings.Join(schema.Flows.ClickHouseSelectColumns(
schema.ClickHouseSkipTimeReceived,
schema.ClickHouseSkipMainOnlyColumns,
schema.ClickHouseSkipAliasedColumns), ",\n "),
})
if err != nil {
return fmt.Errorf("cannot build select statement for consumer %s: %w", viewName, err)