mirror of https://github.com/akvorado/akvorado.git
orchestrator/clickhouse: rework migrations to use an abstract schema
We introduce a leaky abstraction for the flows schema and use it for migrations as a first step. For views and dictionaries, we stop relying on a hash to know if they need to be recreated; instead, we compare the SELECT statement stored in ClickHouse with our target statement. This is a bit fragile, but strictly better than the hash. For data tables, we add the missing columns. We give up on the abstraction of a migration step and just rely on helper functions to get the same result. The migration code is now shorter and we don't need to update it when adding new columns. This is preparatory work for #211 to allow a user to specify additional fields to collect.
Notes:
Vincent Bernat
2023-01-02 23:51:06 +01:00
Hashing was not as fragile as suggested, since we were only hashing column names, types and positions, so it is unknown if the new way is strictly better. The next steps are to use the schema abstraction in the other places where the schema is hard-coded: column names for the console (`console/query_consts.go`), the protobuf file, and flow decoding.
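For context, a sketch of the two checks being contrasted here; both queries appear verbatim further down in this diff, and the hash value stands for whatever constant each old migration step hard-coded:

    // Old check (removed below): hash the column layout and compare it with a
    // constant baked into the migration step.
    //   SELECT groupBitXor(cityHash64(name,type,position)) == <hash> AS v2
    //   FROM system.columns WHERE table = $1 AND database = currentDatabase()
    //
    // New check (added below): read back the statement ClickHouse stored and
    // compare it with the statement generated from the schema abstraction.
    //   SELECT as_select FROM system.tables WHERE name = $1 AND database = $2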
common/schema/clickhouse.go (new file, 113 lines)
@@ -0,0 +1,113 @@
// SPDX-FileCopyrightText: 2022 Free Mobile
// SPDX-License-Identifier: AGPL-3.0-only

package schema

import (
    "fmt"
    "strings"

    "golang.org/x/exp/slices"
)

// String turns a column into a declaration for ClickHouse
func (column Column) String() string {
    result := []string{fmt.Sprintf("`%s`", column.Name), column.Type}
    if column.Codec != "" {
        result = append(result, fmt.Sprintf("CODEC(%s)", column.Codec))
    }
    if column.Alias != "" {
        result = append(result, fmt.Sprintf("ALIAS %s", column.Alias))
    }
    return strings.Join(result, " ")
}

// TableOption is an option to alter the values returned by Table() and Columns().
type TableOption int

const (
    // SkipMainOnlyColumns skips the columns for the main flows table only.
    SkipMainOnlyColumns TableOption = iota
    // SkipGeneratedColumns skips the columns with a GenerateFrom value
    SkipGeneratedColumns
    // SkipTransformColumns skips the columns with a TransformFrom value
    SkipTransformColumns
    // SkipAliasedColumns skips the columns with an Alias value
    SkipAliasedColumns
    // SkipTimeReceived skips the time received column
    SkipTimeReceived
    // UseTransformFromType uses the type from TransformFrom if any
    UseTransformFromType
    // SubstituteGenerates changes the column name to use the default generated value
    SubstituteGenerates
    // SubstituteTransforms changes the column name to use the transformed value
    SubstituteTransforms
)

// CreateTable returns the columns for the CREATE TABLE clause in ClickHouse.
func (schema Schema) CreateTable(options ...TableOption) string {
    lines := []string{}
    schema.iterate(func(column Column) {
        lines = append(lines, column.String())
    }, options...)
    return strings.Join(lines, ",\n")
}

// SelectColumns returns the columns for the SELECT clause in ClickHouse.
func (schema Schema) SelectColumns(options ...TableOption) []string {
    cols := []string{}
    schema.iterate(func(column Column) {
        cols = append(cols, column.Name)
    }, options...)
    return cols
}

func (schema Schema) iterate(fn func(column Column), options ...TableOption) {
    for _, column := range schema.Columns {
        if slices.Contains(options, SkipTimeReceived) && column.Name == "TimeReceived" {
            continue
        }
        if slices.Contains(options, SkipMainOnlyColumns) && column.MainOnly {
            continue
        }
        if slices.Contains(options, SkipGeneratedColumns) && column.GenerateFrom != "" {
            continue
        }
        if slices.Contains(options, SkipTransformColumns) && column.TransformFrom != nil {
            continue
        }
        if slices.Contains(options, SkipAliasedColumns) && column.Alias != "" {
            continue
        }
        if slices.Contains(options, UseTransformFromType) && column.TransformFrom != nil {
            for _, ocol := range column.TransformFrom {
                // We assume we only need to use name/type
                column.Name = ocol.Name
                column.Type = ocol.Type
                fn(column)
            }
            continue
        }
        if slices.Contains(options, SubstituteGenerates) && column.GenerateFrom != "" {
            column.Name = fmt.Sprintf("%s AS %s", column.GenerateFrom, column.Name)
        }
        if slices.Contains(options, SubstituteTransforms) && column.TransformFrom != nil {
            column.Name = fmt.Sprintf("%s AS %s", column.TransformTo, column.Name)
        }
        fn(column)
    }
}

// SortingKeys returns the list of sorting keys, prefixed by the primary keys.
func (schema Schema) SortingKeys() []string {
    cols := append([]string{}, schema.PrimaryKeys...)
    for _, column := range schema.Columns {
        if column.NotSortingKey || column.MainOnly {
            continue
        }
        if !slices.Contains(cols, column.Name) {
            cols = append(cols, column.Name)
        }
    }
    return cols
}
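To make the helpers above concrete, here is a minimal sketch (not part of the commit; the two-column schema is made up):

    package main

    import (
        "fmt"

        "akvorado/common/schema"
    )

    func main() {
        // Hypothetical two-column schema to show what the helpers return.
        s := schema.Schema{
            PrimaryKeys: []string{"TimeReceived"},
            Columns: []schema.Column{
                {Name: "TimeReceived", Type: "DateTime", Codec: "DoubleDelta, LZ4"},
                {Name: "SrcAS", Type: "UInt32"},
            },
        }
        fmt.Println(s.CreateTable())
        // `TimeReceived` DateTime CODEC(DoubleDelta, LZ4),
        // `SrcAS` UInt32
        fmt.Println(s.SortingKeys()) // [TimeReceived SrcAS]
    }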
common/schema/flows.go (new file, 186 lines)
@@ -0,0 +1,186 @@
// SPDX-FileCopyrightText: 2022 Free Mobile
// SPDX-License-Identifier: AGPL-3.0-only

package schema

import (
    "fmt"
    "strings"
)

// Flows is the data schema for flows tables. Any column starting with Src/InIf
// will be duplicated as Dst/OutIf during init. That's not the case for columns
// in `PrimaryKeys'.
var Flows = Schema{
    PrimaryKeys: []string{
        "TimeReceived",
        "ExporterAddress",
        "EType",
        "Proto",
        "InIfName",
        "SrcAS",
        "ForwardingStatus",
        "OutIfName",
        "DstAS",
        "SamplingRate",
    },
    Columns: []Column{
        {
            Name:  "TimeReceived",
            Type:  "DateTime",
            Codec: "DoubleDelta, LZ4",
        },
        {Name: "SamplingRate", Type: "UInt64"},
        {Name: "ExporterAddress", Type: "LowCardinality(IPv6)"},
        {Name: "ExporterName", Type: "LowCardinality(String)", NotSortingKey: true},
        {Name: "ExporterGroup", Type: "LowCardinality(String)", NotSortingKey: true},
        {Name: "ExporterRole", Type: "LowCardinality(String)", NotSortingKey: true},
        {Name: "ExporterSite", Type: "LowCardinality(String)", NotSortingKey: true},
        {Name: "ExporterRegion", Type: "LowCardinality(String)", NotSortingKey: true},
        {Name: "ExporterTenant", Type: "LowCardinality(String)", NotSortingKey: true},
        {
            Name:     "SrcAddr",
            Type:     "IPv6",
            MainOnly: true,
        }, {
            Name:     "SrcNetMask",
            Type:     "UInt8",
            MainOnly: true,
        }, {
            Name:     "SrcNetPrefix",
            Type:     "String",
            MainOnly: true,
            Alias: `CASE
 WHEN EType = 0x800 THEN concat(replaceRegexpOne(IPv6CIDRToRange(SrcAddr, (96 + SrcNetMask)::UInt8).1::String, '^::ffff:', ''), '/', SrcNetMask::String)
 WHEN EType = 0x86dd THEN concat(IPv6CIDRToRange(SrcAddr, SrcNetMask).1::String, '/', SrcNetMask::String)
 ELSE ''
END`,
        },
        {Name: "SrcAS", Type: "UInt32"},
        {
            Name:         "SrcNetName",
            Type:         "LowCardinality(String)",
            GenerateFrom: "dictGetOrDefault('networks', 'name', SrcAddr, '')",
        }, {
            Name:         "SrcNetRole",
            Type:         "LowCardinality(String)",
            GenerateFrom: "dictGetOrDefault('networks', 'role', SrcAddr, '')",
        }, {
            Name:         "SrcNetSite",
            Type:         "LowCardinality(String)",
            GenerateFrom: "dictGetOrDefault('networks', 'site', SrcAddr, '')",
        }, {
            Name:         "SrcNetRegion",
            Type:         "LowCardinality(String)",
            GenerateFrom: "dictGetOrDefault('networks', 'region', SrcAddr, '')",
        }, {
            Name:         "SrcNetTenant",
            Type:         "LowCardinality(String)",
            GenerateFrom: "dictGetOrDefault('networks', 'tenant', SrcAddr, '')",
        },
        {Name: "SrcCountry", Type: "FixedString(2)"},
        {
            Name:     "DstASPath",
            Type:     "Array(UInt32)",
            MainOnly: true,
        }, {
            Name:         "Dst1stAS",
            Type:         "UInt32",
            GenerateFrom: "c_DstASPath[1]",
        }, {
            Name:         "Dst2ndAS",
            Type:         "UInt32",
            GenerateFrom: "c_DstASPath[2]",
        }, {
            Name:         "Dst3rdAS",
            Type:         "UInt32",
            GenerateFrom: "c_DstASPath[3]",
        }, {
            Name:     "DstCommunities",
            Type:     "Array(UInt32)",
            MainOnly: true,
        }, {
            Name:     "DstLargeCommunities",
            Type:     "Array(UInt128)",
            MainOnly: true,
            TransformFrom: []Column{
                {Name: "DstLargeCommunities.ASN", Type: "Array(UInt32)"},
                {Name: "DstLargeCommunities.LocalData1", Type: "Array(UInt32)"},
                {Name: "DstLargeCommunities.LocalData2", Type: "Array(UInt32)"},
            },
            TransformTo: "arrayMap((asn, l1, l2) -> ((bitShiftLeft(CAST(asn, 'UInt128'), 64) + bitShiftLeft(CAST(l1, 'UInt128'), 32)) + CAST(l2, 'UInt128')), `DstLargeCommunities.ASN`, `DstLargeCommunities.LocalData1`, `DstLargeCommunities.LocalData2`)",
        },
        {Name: "InIfName", Type: "LowCardinality(String)"},
        {Name: "InIfDescription", Type: "String", NotSortingKey: true},
        {Name: "InIfSpeed", Type: "UInt32", NotSortingKey: true},
        {Name: "InIfConnectivity", Type: "LowCardinality(String)", NotSortingKey: true},
        {Name: "InIfProvider", Type: "LowCardinality(String)", NotSortingKey: true},
        {Name: "InIfBoundary", Type: "Enum8('undefined' = 0, 'external' = 1, 'internal' = 2)", NotSortingKey: true},
        {Name: "EType", Type: "UInt32"},
        {Name: "Proto", Type: "UInt32"},
        {Name: "SrcPort", Type: "UInt32", MainOnly: true},
        {Name: "Bytes", Type: "UInt64", NotSortingKey: true},
        {Name: "Packets", Type: "UInt64", NotSortingKey: true},
        {
            Name:  "PacketSize",
            Type:  "UInt64",
            Alias: "intDiv(Bytes, Packets)",
        }, {
            Name: "PacketSizeBucket",
            Type: "LowCardinality(String)",
            Alias: func() string {
                boundaries := []int{64, 128, 256, 512, 768, 1024, 1280, 1501,
                    2048, 3072, 4096, 8192, 10240, 16384, 32768, 65536}
                conditions := []string{}
                last := 0
                for _, boundary := range boundaries {
                    conditions = append(conditions, fmt.Sprintf("PacketSize < %d, '%d-%d'",
                        boundary, last, boundary-1))
                    last = boundary
                }
                conditions = append(conditions, fmt.Sprintf("'%d-Inf'", last))
                return fmt.Sprintf("multiIf(%s)", strings.Join(conditions, ", "))
            }(),
        },
        {Name: "ForwardingStatus", Type: "UInt32"},
    },
}

func init() {
    // Expand the schema Src → Dst and InIf → OutIf
    newSchema := []Column{}
    for _, column := range Flows.Columns {
        newSchema = append(newSchema, column)
        if strings.HasPrefix(column.Name, "Src") {
            column.Name = fmt.Sprintf("Dst%s", column.Name[3:])
            column.Alias = strings.ReplaceAll(column.Alias, "Src", "Dst")
            newSchema = append(newSchema, column)
        } else if strings.HasPrefix(column.Name, "InIf") {
            column.Name = fmt.Sprintf("OutIf%s", column.Name[4:])
            column.Alias = strings.ReplaceAll(column.Alias, "InIf", "OutIf")
            newSchema = append(newSchema, column)
        }
    }
    Flows.Columns = newSchema

    // Add non-main columns with an alias to NotSortingKey
    for idx, column := range Flows.Columns {
        if !column.MainOnly && column.Alias != "" {
            Flows.Columns[idx].NotSortingKey = true
        }
    }

    // Also do some checks.
outer:
    for _, key := range Flows.PrimaryKeys {
        for _, column := range Flows.Columns {
            if column.Name == key {
                if column.NotSortingKey {
                    panic(fmt.Sprintf("primary key %q is marked as a non-sorting key", key))
                }
                continue outer
            }
        }
        panic(fmt.Sprintf("primary key %q not a column", key))
    }
}
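A small sketch of what the init() expansion above gives (not part of the commit): each Src*/InIf* column is immediately followed by a Dst*/OutIf* clone, with aliases rewritten the same way, so the expanded schema carries both directions.

    package main

    import (
        "fmt"

        "akvorado/common/schema"
    )

    func main() {
        for _, c := range schema.Flows.Columns {
            if c.Name == "SrcCountry" || c.Name == "DstCountry" {
                fmt.Println(c.Name, c.Type)
            }
        }
        // SrcCountry FixedString(2)
        // DstCountry FixedString(2)
    }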
common/schema/root.go (new file, 36 lines)
@@ -0,0 +1,36 @@
// SPDX-FileCopyrightText: 2022 Free Mobile
// SPDX-License-Identifier: AGPL-3.0-only

// Package schema is an abstraction of the data schema used by Akvorado. It is a
// leaky abstraction as there are multiple parts dependent on the subsystem that
// will use it.
package schema

// Schema is the data schema.
type Schema struct {
    Columns []Column

    // For ClickHouse. This is the set of primary keys (order is important and
    // may not follow column order).
    PrimaryKeys []string
}

// Column represents a column of data.
type Column struct {
    Name     string
    MainOnly bool

    // For ClickHouse. `NotSortingKey' is for columns generated from other
    // columns. It is only useful if not MainOnly and not Alias. `GenerateFrom'
    // is for a column that's generated from an SQL expression instead of being
    // retrieved from the protobuf. `TransformFrom' and `TransformTo' work in
    // pairs. The first one is the set of columns in the raw table while the
    // second one is how to transform it for the main table.
    Type          string
    Codec         string
    Alias         string
    NotSortingKey bool
    GenerateFrom  string
    TransformFrom []Column
    TransformTo   string
}
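A sketch of how these fields drive the ClickHouse helpers (not part of the commit; the two columns are hypothetical but mirror the flows schema above):

    package main

    import (
        "fmt"

        "akvorado/common/schema"
    )

    func main() {
        s := schema.Schema{Columns: []schema.Column{
            {Name: "SrcAS", Type: "UInt32"},
            {Name: "SrcNetName", Type: "LowCardinality(String)",
                GenerateFrom: "dictGetOrDefault('networks', 'name', SrcAddr, '')"},
        }}
        // Raw table: generated columns are skipped entirely.
        fmt.Println(s.SelectColumns(schema.SkipGeneratedColumns))
        // [SrcAS]
        // Consumer view: the generating expression is substituted in the SELECT.
        fmt.Println(s.SelectColumns(schema.SubstituteGenerates))
        // [SrcAS dictGetOrDefault('networks', 'name', SrcAddr, '') AS SrcNetName]
    }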
go.mod (1 line changed)
@@ -40,6 +40,7 @@ require (
    github.com/xdg-go/scram v1.1.2
    github.com/yuin/goldmark v1.5.3
    github.com/yuin/goldmark-highlighting v0.0.0-20220208100518-594be1970594
    golang.org/x/exp v0.0.0-20221217163422-3c43f8badb15
    golang.org/x/sys v0.3.0
    golang.org/x/time v0.0.0-20220722155302-e5dcc9cfc0b9
    gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637
go.sum (4 lines changed)
@@ -530,6 +530,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0
golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4=
golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM=
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU=
golang.org/x/exp v0.0.0-20221217163422-3c43f8badb15 h1:5oN1Pz/eDhCpbMbLstvIPa0b/BEQo6g6nwV3pLjfM6w=
golang.org/x/exp v0.0.0-20221217163422-3c43f8badb15/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
@@ -755,8 +757,8 @@ golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4f
golang.org/x/tools v0.0.0-20210112230658-8b4aab62c064/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0=
golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.12 h1:VveCTK38A2rkS8ZqFY25HIDFscX5X9OoEhJd3quQmXU=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/tools v0.2.0 h1:G6AHpWxTMGY1KyEYoAQ5WTtIekUUvDNjan3ugu60JvE=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
@@ -9,7 +9,6 @@ type metrics struct {
    migrationsRunning    reporter.Gauge
    migrationsApplied    reporter.Counter
    migrationsNotApplied reporter.Counter
    migrationsVersion    reporter.Gauge

    networkSourceUpdates *reporter.CounterVec
    networkSourceErrors  *reporter.CounterVec
@@ -35,12 +34,6 @@ func (c *Component) initMetrics() {
            Help: "Number of migration steps not applied",
        },
    )
    c.metrics.migrationsVersion = c.r.Gauge(
        reporter.GaugeOpts{
            Name: "migrations_version",
            Help: "Current version for migrations.",
        },
    )

    c.metrics.networkSourceUpdates = c.r.CounterVec(
        reporter.CounterOpts{
@@ -58,116 +58,54 @@ func (c *Component) migrateDatabase() error {
|
||||
c.config.Kafka.Consumers = int(threads)
|
||||
}
|
||||
|
||||
steps := []migrationStepWithDescription{
|
||||
{"create protocols dictionary", c.migrationStepCreateProtocolsDictionary},
|
||||
{"create asns dictionary", c.migrationStepCreateASNsDictionary},
|
||||
{"create networks dictionary", c.migrationStepCreateNetworksDictionary},
|
||||
// Create dictionaries
|
||||
err := c.wrapMigrations(
|
||||
func() error {
|
||||
return c.createDictionary(ctx, "asns", "hashed",
|
||||
"`asn` UInt32 INJECTIVE, `name` String", "asn")
|
||||
}, func() error {
|
||||
return c.createDictionary(ctx, "protocols", "hashed",
|
||||
"`proto` UInt8 INJECTIVE, `name` String, `description` String", "proto")
|
||||
}, func() error {
|
||||
return c.createDictionary(ctx, "networks", "ip_trie",
|
||||
"`network` String, `name` String, `role` String, `site` String, `region` String, `tenant` String",
|
||||
"network")
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Create the various non-raw flow tables
|
||||
for _, resolution := range c.config.Resolutions {
|
||||
steps = append(steps, []migrationStepWithDescription{
|
||||
{
|
||||
fmt.Sprintf("create flows table with resolution %s", resolution.Interval),
|
||||
c.migrationsStepCreateFlowsTable(resolution),
|
||||
}, {
|
||||
fmt.Sprintf("add PacketSizeBucket to flows table with resolution %s", resolution.Interval),
|
||||
c.migrationStepAddPacketSizeBucketColumn(resolution),
|
||||
}, {
|
||||
fmt.Sprintf("add SrcNetName/DstNetName to flows table with resolution %s", resolution.Interval),
|
||||
c.migrationStepAddSrcNetNameDstNetNameColumns(resolution),
|
||||
}, {
|
||||
fmt.Sprintf("add SrcNet*/DstNet* to flows table with resolution %s", resolution.Interval),
|
||||
c.migrationStepAddSrcNetNameDstNetOthersColumns(resolution),
|
||||
}, {
|
||||
fmt.Sprintf("add Exporter* to flows table with resolution %s", resolution.Interval),
|
||||
c.migrationStepAddExporterColumns(resolution),
|
||||
}, {
|
||||
fmt.Sprintf("add SrcCountry/DstCountry to ORDER BY for resolution %s", resolution.Interval),
|
||||
c.migrationStepFixOrderByCountry(resolution),
|
||||
}, {
|
||||
fmt.Sprintf("add DstASPath columns to flows table with resolution %s", resolution.Interval),
|
||||
c.migrationStepAddDstASPathColumns(resolution),
|
||||
},
|
||||
}...)
|
||||
if resolution.Interval == 0 {
|
||||
steps = append(steps, migrationStepWithDescription{
|
||||
"add DstCommunities column to flows table",
|
||||
c.migrationStepAddDstCommunitiesColumn,
|
||||
}, migrationStepWithDescription{
|
||||
"add DstLargeCommunities column to flows table",
|
||||
c.migrationStepAddDstLargeCommunitiesColumn,
|
||||
}, migrationStepWithDescription{
|
||||
"add SrcNetMask/DstNetMask column to flows table",
|
||||
c.migrationStepAddSrcNetMaskDstNetMaskColumns,
|
||||
}, migrationStepWithDescription{
|
||||
"add SrcNetPrefix/DstNetPrefix aliases to flows table",
|
||||
c.migrationStepAddSrcNetPrefixDstNetPrefixColumn,
|
||||
err := c.wrapMigrations(
|
||||
func() error {
|
||||
return c.createOrUpdateFlowsTable(ctx, resolution)
|
||||
}, func() error {
|
||||
return c.createFlowsConsumerView(ctx, resolution)
|
||||
})
|
||||
}
|
||||
steps = append(steps, []migrationStepWithDescription{
|
||||
{
|
||||
fmt.Sprintf("create flows table consumer with resolution %s", resolution.Interval),
|
||||
c.migrationsStepCreateFlowsConsumerTable(resolution),
|
||||
}, {
|
||||
fmt.Sprintf("configure TTL for flows table with resolution %s", resolution.Interval),
|
||||
c.migrationsStepSetTTLFlowsTable(resolution),
|
||||
},
|
||||
}...)
|
||||
}
|
||||
steps = append(steps, []migrationStepWithDescription{
|
||||
{"create exporters view", c.migrationStepCreateExportersView},
|
||||
{"create raw flows table", c.migrationStepCreateRawFlowsTable},
|
||||
{"create raw flows consumer view", c.migrationStepCreateRawFlowsConsumerView},
|
||||
{"create raw flows errors view", c.migrationStepCreateRawFlowsErrorsView},
|
||||
}...)
|
||||
|
||||
count := 0
|
||||
total := 0
|
||||
for _, step := range steps {
|
||||
total++
|
||||
l := c.r.Logger.With().Str("step", step.Description).Logger()
|
||||
l.Debug().Msg("checking migration step")
|
||||
step := step.Step(ctx, l, c.d.ClickHouse)
|
||||
rows, err := c.d.ClickHouse.Query(ctx, step.CheckQuery, step.Args...)
|
||||
if err != nil {
|
||||
l.Err(err).Msg("cannot execute check")
|
||||
return fmt.Errorf("cannot execute check: %w", err)
|
||||
return err
|
||||
}
|
||||
if rows.Next() {
|
||||
var val uint8
|
||||
if err := rows.Scan(&val); err != nil {
|
||||
rows.Close()
|
||||
l.Err(err).Msg("cannot parse check result")
|
||||
return fmt.Errorf("cannot parse check result: %w", err)
|
||||
}
|
||||
if val != 0 {
|
||||
rows.Close()
|
||||
l.Debug().Msg("result not equal to 0, skipping step")
|
||||
c.metrics.migrationsNotApplied.Inc()
|
||||
continue
|
||||
} else {
|
||||
l.Debug().Msg("got 0, executing step")
|
||||
}
|
||||
} else {
|
||||
l.Debug().Msg("no result, executing step")
|
||||
}
|
||||
rows.Close()
|
||||
if err := step.Do(); err != nil {
|
||||
l.Err(err).Msg("cannot execute migration step")
|
||||
return fmt.Errorf("during migration step: %w", err)
|
||||
}
|
||||
l.Info().Msg("migration step executed successfully")
|
||||
c.metrics.migrationsApplied.Inc()
|
||||
count++
|
||||
}
|
||||
|
||||
if count == 0 {
|
||||
c.r.Info().Msg("no migration needed")
|
||||
} else {
|
||||
c.r.Info().Msg("migrations done")
|
||||
// Remaining tables
|
||||
err = c.wrapMigrations(
|
||||
func() error {
|
||||
return c.createExportersView(ctx)
|
||||
}, func() error {
|
||||
return c.createRawFlowsTable(ctx)
|
||||
}, func() error {
|
||||
return c.createRawFlowsConsumerView(ctx)
|
||||
}, func() error {
|
||||
return c.createRawFlowsErrorsView(ctx)
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
close(c.migrationsDone)
|
||||
c.metrics.migrationsRunning.Set(0)
|
||||
c.metrics.migrationsVersion.Set(float64(total))
|
||||
|
||||
// Reload dictionaries
|
||||
if err := c.d.ClickHouse.Exec(ctx, "SYSTEM RELOAD DICTIONARIES"); err != nil {
|
||||
|
||||
524
orchestrator/clickhouse/migrations_helpers.go
Normal file
524
orchestrator/clickhouse/migrations_helpers.go
Normal file
@@ -0,0 +1,524 @@
|
||||
// SPDX-FileCopyrightText: 2023 Free Mobile
|
||||
// SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
package clickhouse
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
"regexp"
|
||||
"strings"
|
||||
"text/template"
|
||||
"time"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"golang.org/x/exp/slices"
|
||||
|
||||
"akvorado/common/schema"
|
||||
"akvorado/inlet/flow"
|
||||
)
|
||||
|
||||
var errSkipStep = errors.New("migration: skip this step")
|
||||
|
||||
// wrapMigrations can be used to wrap migration functions. It will keep the
|
||||
// metrics up-to-date as long as the migration function returns `errSkipStep`
|
||||
// when a step is skipped.
|
||||
func (c *Component) wrapMigrations(fns ...func() error) error {
|
||||
for _, fn := range fns {
|
||||
if err := fn(); err == nil {
|
||||
c.metrics.migrationsApplied.Inc()
|
||||
} else if err == errSkipStep {
|
||||
c.metrics.migrationsNotApplied.Inc()
|
||||
} else {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
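A fragment showing the intended calling convention (the shape follows migrateDatabase above, and the dictionary arguments are the ones used for the asns dictionary): a helper returns errSkipStep when the object is already in the desired state, nil when it had to (re)create it, and any other error aborts the migration.

    // Fragment, assumed to run inside a Component method with ctx in scope.
    err := c.wrapMigrations(
        func() error {
            return c.createDictionary(ctx, "asns", "hashed",
                "`asn` UInt32 INJECTIVE, `name` String", "asn")
        },
    )
    if err != nil {
        return err
    }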
|
||||
|
||||
// stemplate is a simple wrapper around text/template.
|
||||
func stemplate(t string, data any) (string, error) {
|
||||
tpl, err := template.New("tpl").Parse(t)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
var result strings.Builder
|
||||
if err := tpl.Execute(&result, data); err != nil {
|
||||
return "", err
|
||||
}
|
||||
return result.String(), nil
|
||||
}
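A sketch of what stemplate gives for a trivial template (not part of the commit; the database name is made up):

    // Fragment: stemplate only parses the template and executes it into a string.
    q, err := stemplate(`SELECT 1 FROM {{ .Database }}.flows`, gin.H{"Database": "default"})
    // q is "SELECT 1 FROM default.flows" and err is nil when the template parses.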
|
||||
|
||||
// tableAlreadyExists compare the provided table with the one in database.
|
||||
// `column` can either be "create_table_query" or "as_select". target is the
|
||||
// expected value.
|
||||
func (c *Component) tableAlreadyExists(ctx context.Context, table, column, target string) (bool, error) {
|
||||
// Normalize a bit the target. This is far from perfect, but we test that
|
||||
// and we hope this does not differ between ClickHouse versions!
|
||||
target = strings.TrimSpace(regexp.MustCompile("\\s+").ReplaceAllString(target, " "))
|
||||
|
||||
// Fetch the existing one
|
||||
row := c.d.ClickHouse.QueryRow(ctx,
|
||||
fmt.Sprintf("SELECT %s FROM system.tables WHERE name = $1 AND database = $2", column),
|
||||
table, c.config.Database)
|
||||
var existing string
|
||||
if err := row.Scan(&existing); err != nil && err != sql.ErrNoRows {
|
||||
return false, fmt.Errorf("cannot check if table %s already exists", table)
|
||||
}
|
||||
existing = strings.ReplaceAll(existing,
|
||||
fmt.Sprintf(`dictGetOrDefault('%s.`, c.config.Database),
|
||||
"dictGetOrDefault('")
|
||||
|
||||
// Compare!
|
||||
if existing == target {
|
||||
return true, nil
|
||||
}
|
||||
c.r.Debug().
|
||||
Str("target", target).Str("existing", existing).
|
||||
Msgf("table %s is not in the expected state", table)
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// createDictionary creates the provided dictionary.
|
||||
func (c *Component) createDictionary(ctx context.Context, name, layout, schema, primary string) error {
|
||||
url := fmt.Sprintf("%s/api/v0/orchestrator/clickhouse/%s.csv", c.config.OrchestratorURL, name)
|
||||
source := fmt.Sprintf(`SOURCE(HTTP(URL '%s' FORMAT 'CSVWithNames'))`, url)
|
||||
settings := `SETTINGS(format_csv_allow_single_quotes = 0)`
|
||||
createQuery, err := stemplate(`
|
||||
CREATE DICTIONARY {{ .Database }}.{{ .Name }} ({{ .Schema }})
|
||||
PRIMARY KEY {{ .PrimaryKey}}
|
||||
{{ .Source }}
|
||||
LIFETIME(MIN 0 MAX 3600)
|
||||
LAYOUT({{ .Layout }}())
|
||||
{{ .Settings }}
|
||||
`, gin.H{
|
||||
"Database": c.config.Database,
|
||||
"Name": name,
|
||||
"Schema": schema,
|
||||
"PrimaryKey": primary,
|
||||
"Layout": strings.ToUpper(layout),
|
||||
"Source": source,
|
||||
"Settings": settings,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot build query to create dictionary %s: %w", name, err)
|
||||
}
|
||||
|
||||
// Check if dictionary exists and create it if not
|
||||
if ok, err := c.tableAlreadyExists(ctx, name, "create_table_query", createQuery); err != nil {
|
||||
return err
|
||||
} else if ok {
|
||||
c.r.Info().Msgf("dictionary %s already exists, skip migration", name)
|
||||
return errSkipStep
|
||||
}
|
||||
c.r.Info().Msgf("create dictionary %s", name)
|
||||
createOrReplaceQuery := strings.Replace(createQuery, "CREATE ", "CREATE OR REPLACE ", 1)
|
||||
if err := c.d.ClickHouse.Exec(ctx, createOrReplaceQuery); err != nil {
|
||||
return fmt.Errorf("cannot create dictionary %s: %w", name, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// createExportersView creates the exporters table/view.
|
||||
func (c *Component) createExportersView(ctx context.Context) error {
|
||||
// Select the columns we need
|
||||
cols := []string{}
|
||||
for _, column := range schema.Flows.Columns {
|
||||
if column.Name == "TimeReceived" || strings.HasPrefix(column.Name, "Exporter") {
|
||||
cols = append(cols, column.Name)
|
||||
}
|
||||
if strings.HasPrefix(column.Name, "InIf") {
|
||||
cols = append(cols, fmt.Sprintf("[InIf%s, OutIf%s][num] AS If%s",
|
||||
column.Name[4:], column.Name[4:], column.Name[4:],
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
// Build SELECT query
|
||||
selectQuery, err := stemplate(
|
||||
`SELECT DISTINCT {{ .Columns }} FROM {{ .Database }}.flows ARRAY JOIN arrayEnumerate([1, 2]) AS num`,
|
||||
gin.H{
|
||||
"Database": c.config.Database,
|
||||
"Columns": strings.Join(cols, ", "),
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot build query to create exporters view: %w", err)
|
||||
}
|
||||
|
||||
// Check if the table already exists with these columns.
|
||||
if ok, err := c.tableAlreadyExists(ctx, "exporters", "as_select", selectQuery); err != nil {
|
||||
return err
|
||||
} else if ok {
|
||||
c.r.Info().Msg("exporters view already exists, skip migration")
|
||||
return errSkipStep
|
||||
}
|
||||
|
||||
// Drop existing table and recreate
|
||||
c.r.Info().Msg("create exporters view")
|
||||
if err := c.d.ClickHouse.Exec(ctx, `DROP TABLE IF EXISTS exporters SYNC`); err != nil {
|
||||
return fmt.Errorf("cannot drop existing exporters view: %w", err)
|
||||
}
|
||||
if err := c.d.ClickHouse.Exec(ctx, fmt.Sprintf(`
|
||||
CREATE MATERIALIZED VIEW exporters
|
||||
ENGINE = ReplacingMergeTree(TimeReceived)
|
||||
ORDER BY (ExporterAddress, IfName)
|
||||
AS %s`, selectQuery)); err != nil {
|
||||
return fmt.Errorf("cannot create exporters view: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
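For illustration (not part of the commit; remaining Exporter* and If* columns elided, database name assumed to be "default"), the kind of SELECT the code above ends up with. The ARRAY JOIN over [1, 2] lets each flow contribute both its input and its output interface to the exporters view.

    const exportersSelectExample = `
    SELECT DISTINCT TimeReceived, ExporterAddress, ExporterName, ExporterGroup,
           [InIfName, OutIfName][num] AS IfName,
           [InIfDescription, OutIfDescription][num] AS IfDescription
    FROM default.flows ARRAY JOIN arrayEnumerate([1, 2]) AS num`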
|
||||
|
||||
// createRawFlowsTable creates the raw flow table
|
||||
func (c *Component) createRawFlowsTable(ctx context.Context) error {
|
||||
tableName := fmt.Sprintf("flows_%d_raw", flow.CurrentSchemaVersion)
|
||||
kafkaEngine := fmt.Sprintf("Kafka SETTINGS %s", strings.Join([]string{
|
||||
fmt.Sprintf(`kafka_broker_list = '%s'`,
|
||||
strings.Join(c.config.Kafka.Brokers, ",")),
|
||||
fmt.Sprintf(`kafka_topic_list = '%s-v%d'`,
|
||||
c.config.Kafka.Topic, flow.CurrentSchemaVersion),
|
||||
`kafka_group_name = 'clickhouse'`,
|
||||
`kafka_format = 'Protobuf'`,
|
||||
fmt.Sprintf(`kafka_schema = 'flow-%d.proto:FlowMessagev%d'`,
|
||||
flow.CurrentSchemaVersion, flow.CurrentSchemaVersion),
|
||||
fmt.Sprintf(`kafka_num_consumers = %d`, c.config.Kafka.Consumers),
|
||||
`kafka_thread_per_consumer = 1`,
|
||||
`kafka_handle_error_mode = 'stream'`,
|
||||
}, ", "))
|
||||
|
||||
// Build CREATE query
|
||||
createQuery, err := stemplate(
|
||||
`CREATE TABLE {{ .Database }}.{{ .Table }} ({{ .Schema }}) ENGINE = {{ .Engine }}`,
|
||||
gin.H{
|
||||
"Database": c.config.Database,
|
||||
"Table": tableName,
|
||||
"Schema": schema.Flows.CreateTable(
|
||||
schema.SkipGeneratedColumns,
|
||||
schema.UseTransformFromType,
|
||||
schema.SkipAliasedColumns),
|
||||
"Engine": kafkaEngine,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot build query to create raw flows table: %w", err)
|
||||
}
|
||||
|
||||
// Check if the table already exists with the right schema
|
||||
if ok, err := c.tableAlreadyExists(ctx, tableName, "create_table_query", createQuery); err != nil {
|
||||
return err
|
||||
} else if ok {
|
||||
c.r.Info().Msg("raw flows table already exists, skip migration")
|
||||
return errSkipStep
|
||||
}
|
||||
|
||||
// Drop table if it exists as well as all the dependents and recreate the raw table
|
||||
c.r.Info().Msg("create raw flows table")
|
||||
for _, table := range []string{
|
||||
fmt.Sprintf("%s_consumer", tableName),
|
||||
fmt.Sprintf("%s_errors", tableName),
|
||||
tableName,
|
||||
} {
|
||||
if err := c.d.ClickHouse.Exec(ctx, fmt.Sprintf(`DROP TABLE IF EXISTS %s SYNC`, table)); err != nil {
|
||||
return fmt.Errorf("cannot drop %s: %w", table, err)
|
||||
}
|
||||
}
|
||||
if err := c.d.ClickHouse.Exec(ctx, createQuery); err != nil {
|
||||
return fmt.Errorf("cannot create raw flows table: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
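For illustration only, with assumed values (database "default", topic "flows", broker "kafka:9092", one consumer; N stands for flow.CurrentSchemaVersion and the column list is elided), the statement built above looks roughly like:

    const rawFlowsCreateExample = `
    CREATE TABLE default.flows_N_raw (...) ENGINE = Kafka SETTINGS
        kafka_broker_list = 'kafka:9092', kafka_topic_list = 'flows-vN',
        kafka_group_name = 'clickhouse', kafka_format = 'Protobuf',
        kafka_schema = 'flow-N.proto:FlowMessagevN',
        kafka_num_consumers = 1, kafka_thread_per_consumer = 1,
        kafka_handle_error_mode = 'stream'`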
|
||||
|
||||
func (c *Component) createRawFlowsConsumerView(ctx context.Context) error {
|
||||
tableName := fmt.Sprintf("flows_%d_raw", flow.CurrentSchemaVersion)
|
||||
viewName := fmt.Sprintf("%s_consumer", tableName)
|
||||
|
||||
// Build SELECT query
|
||||
selectQuery, err := stemplate(
|
||||
`{{ .With }} SELECT {{ .Columns }} FROM {{ .Database }}.{{ .Table }} WHERE length(_error) = 0`,
|
||||
gin.H{
|
||||
"With": "WITH arrayCompact(DstASPath) AS c_DstASPath",
|
||||
"Columns": strings.Join(schema.Flows.SelectColumns(
|
||||
schema.SubstituteGenerates,
|
||||
schema.SubstituteTransforms,
|
||||
schema.SkipAliasedColumns), ", "),
|
||||
"Database": c.config.Database,
|
||||
"Table": tableName,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot build select statement for raw flows consumer view: %w", err)
|
||||
}
|
||||
|
||||
// Check the existing one
|
||||
if ok, err := c.tableAlreadyExists(ctx, viewName, "as_select", selectQuery); err != nil {
|
||||
return err
|
||||
} else if ok {
|
||||
c.r.Info().Msg("raw flows consumer view already exists, skip migration")
|
||||
return errSkipStep
|
||||
}
|
||||
|
||||
// Drop and create
|
||||
c.r.Info().Msg("create raw flows consumer view")
|
||||
if err := c.d.ClickHouse.Exec(ctx, fmt.Sprintf(`DROP TABLE IF EXISTS %s SYNC`, viewName)); err != nil {
|
||||
return fmt.Errorf("cannot drop table %s: %w", viewName, err)
|
||||
}
|
||||
if err := c.d.ClickHouse.Exec(ctx,
|
||||
fmt.Sprintf("CREATE MATERIALIZED VIEW %s TO flows AS %s",
|
||||
viewName, selectQuery)); err != nil {
|
||||
return fmt.Errorf("cannot create raw flows consumer view: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Component) createRawFlowsErrorsView(ctx context.Context) error {
|
||||
tableName := fmt.Sprintf("flows_%d_raw", flow.CurrentSchemaVersion)
|
||||
viewName := fmt.Sprintf("%s_errors", tableName)
|
||||
|
||||
// Build SELECT query
|
||||
selectQuery, err := stemplate(`
|
||||
SELECT
|
||||
now() AS timestamp,
|
||||
_topic AS topic,
|
||||
_partition AS partition,
|
||||
_offset AS offset,
|
||||
_raw_message AS raw,
|
||||
_error AS error
|
||||
FROM {{ .Database }}.{{ .Table }}
|
||||
WHERE length(_error) > 0`, gin.H{
|
||||
"Database": c.config.Database,
|
||||
"Table": tableName,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot build select statement for raw flows error: %w", err)
|
||||
}
|
||||
|
||||
// Check the existing one
|
||||
if ok, err := c.tableAlreadyExists(ctx, viewName, "as_select", selectQuery); err != nil {
|
||||
return err
|
||||
} else if ok {
|
||||
c.r.Info().Msg("raw flows errors view already exists, skip migration")
|
||||
return errSkipStep
|
||||
}
|
||||
|
||||
// Drop and create
|
||||
c.r.Info().Msg("create raw flows errors view")
|
||||
if err := c.d.ClickHouse.Exec(ctx, fmt.Sprintf(`DROP TABLE IF EXISTS %s SYNC`, viewName)); err != nil {
|
||||
return fmt.Errorf("cannot drop table %s: %w", viewName, err)
|
||||
}
|
||||
if err := c.d.ClickHouse.Exec(ctx,
|
||||
fmt.Sprintf(`
|
||||
CREATE MATERIALIZED VIEW %s
|
||||
ENGINE = MergeTree
|
||||
ORDER BY (timestamp, topic, partition, offset)
|
||||
PARTITION BY toYYYYMMDDhhmmss(toStartOfHour(timestamp))
|
||||
TTL timestamp + INTERVAL 1 DAY
|
||||
AS %s`,
|
||||
viewName, selectQuery)); err != nil {
|
||||
return fmt.Errorf("cannot create raw flows errors view: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Component) createOrUpdateFlowsTable(ctx context.Context, resolution ResolutionConfiguration) error {
|
||||
var tableName string
|
||||
if resolution.Interval == 0 {
|
||||
tableName = "flows"
|
||||
} else {
|
||||
tableName = fmt.Sprintf("flows_%s", resolution.Interval)
|
||||
}
|
||||
partitionInterval := uint64((resolution.TTL / time.Duration(c.config.MaxPartitions)).Seconds())
|
||||
ttl := uint64(resolution.TTL.Seconds())
|
||||
|
||||
// Create table if it does not exist
|
||||
if ok, err := c.tableAlreadyExists(ctx, tableName, "name", tableName); err != nil {
|
||||
return err
|
||||
} else if !ok {
|
||||
var createQuery string
|
||||
var err error
|
||||
if resolution.Interval == 0 {
|
||||
createQuery, err = stemplate(`
|
||||
CREATE TABLE flows ({{ .Schema }})
|
||||
ENGINE = MergeTree
|
||||
PARTITION BY toYYYYMMDDhhmmss(toStartOfInterval(TimeReceived, INTERVAL {{ .PartitionInterval }} second))
|
||||
ORDER BY (TimeReceived, ExporterAddress, InIfName, OutIfName)
|
||||
TTL TimeReceived + toIntervalSecond({{ .TTL }})
|
||||
`, gin.H{
|
||||
"Schema": schema.Flows.CreateTable(),
|
||||
"PartitionInterval": partitionInterval,
|
||||
"TTL": ttl,
|
||||
})
|
||||
} else {
|
||||
createQuery, err = stemplate(`
|
||||
CREATE TABLE {{ .Table }} ({{ .Schema }})
|
||||
ENGINE = SummingMergeTree((Bytes, Packets))
|
||||
PARTITION BY toYYYYMMDDhhmmss(toStartOfInterval(TimeReceived, INTERVAL {{ .PartitionInterval }} second))
|
||||
PRIMARY KEY ({{ .PrimaryKey }})
|
||||
ORDER BY ({{ .SortingKey }})
|
||||
TTL TimeReceived + toIntervalSecond({{ .TTL }})
|
||||
`, gin.H{
|
||||
"Table": tableName,
|
||||
"Schema": schema.Flows.CreateTable(schema.SkipMainOnlyColumns),
|
||||
"PartitionInterval": partitionInterval,
|
||||
"PrimaryKey": strings.Join(schema.Flows.PrimaryKeys, ", "),
|
||||
"SortingKey": strings.Join(schema.Flows.SortingKeys(), ", "),
|
||||
"TTL": ttl,
|
||||
})
|
||||
}
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot build create table statement for %s: %w", tableName, err)
|
||||
}
|
||||
if err := c.d.ClickHouse.Exec(ctx, createQuery); err != nil {
|
||||
return fmt.Errorf("cannot create %s: %w", tableName, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Get existing columns
|
||||
var existingColumns []struct {
|
||||
Name string `ch:"name"`
|
||||
Type string `ch:"type"`
|
||||
IsSortingKey uint8 `ch:"is_in_sorting_key"`
|
||||
IsPrimaryKey uint8 `ch:"is_in_primary_key"`
|
||||
}
|
||||
if err := c.d.ClickHouse.Select(ctx, &existingColumns, `
|
||||
SELECT name, type, is_in_sorting_key, is_in_primary_key
|
||||
FROM system.columns
|
||||
WHERE database = $1
|
||||
AND table = $2
|
||||
ORDER BY position ASC
|
||||
`, c.config.Database, tableName); err != nil {
|
||||
return fmt.Errorf("cannot query columns table: %w", err)
|
||||
}
|
||||
|
||||
// Plan for modifications. We don't check everything: we assume the
|
||||
// modifications to be done are covered by the unit tests.
|
||||
modifications := []string{}
|
||||
previousColumn := ""
|
||||
outer:
|
||||
for _, wantedColumn := range schema.Flows.Columns {
|
||||
if resolution.Interval > 0 && wantedColumn.MainOnly {
|
||||
continue
|
||||
}
|
||||
// Check if the column already exists
|
||||
for _, existingColumn := range existingColumns {
|
||||
if wantedColumn.Name == existingColumn.Name {
|
||||
// Do a few sanity checks
|
||||
if wantedColumn.Type != existingColumn.Type {
|
||||
return fmt.Errorf("table %s, column %s has a non-matching type: %s vs %s",
|
||||
tableName, wantedColumn.Name, existingColumn.Type, wantedColumn.Type)
|
||||
}
|
||||
if resolution.Interval > 0 && slices.Contains(schema.Flows.PrimaryKeys, wantedColumn.Name) && existingColumn.IsPrimaryKey == 0 {
|
||||
return fmt.Errorf("table %s, column %s should be a primary key, cannot change that",
|
||||
tableName, wantedColumn.Name)
|
||||
}
|
||||
if resolution.Interval > 0 && !wantedColumn.NotSortingKey && existingColumn.IsSortingKey == 0 {
|
||||
// That's something we can fix, but we need to drop it before recreating it
|
||||
err := c.d.ClickHouse.Exec(ctx,
|
||||
fmt.Sprintf("ALTER TABLE %s DROP COLUMN %s", tableName, existingColumn.Name))
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot drop %s from %s to fix ordering: %w",
|
||||
existingColumn.Name, tableName, err)
|
||||
}
|
||||
// Schedule adding it back
|
||||
modifications = append(modifications,
|
||||
fmt.Sprintf("ADD COLUMN %s AFTER %s", wantedColumn, previousColumn))
|
||||
}
|
||||
previousColumn = wantedColumn.Name
|
||||
continue outer
|
||||
}
|
||||
}
|
||||
// Add the missing column. Only if not primary.
|
||||
if resolution.Interval > 0 && slices.Contains(schema.Flows.PrimaryKeys, wantedColumn.Name) {
|
||||
return fmt.Errorf("table %s, column %s is missing but it is a primary key",
|
||||
tableName, wantedColumn.Name)
|
||||
}
|
||||
c.r.Debug().Msgf("add missing column %s to %s", wantedColumn.Name, tableName)
|
||||
modifications = append(modifications,
|
||||
fmt.Sprintf("ADD COLUMN %s AFTER %s", wantedColumn, previousColumn))
|
||||
previousColumn = wantedColumn.Name
|
||||
}
|
||||
if len(modifications) > 0 {
|
||||
// Also update ORDER BY
|
||||
if resolution.Interval > 0 {
|
||||
modifications = append(modifications,
|
||||
fmt.Sprintf("MODIFY ORDER BY (%s)", strings.Join(schema.Flows.SortingKeys(), ", ")))
|
||||
}
|
||||
c.r.Info().Msgf("apply %d modifications to %s", len(modifications), tableName)
|
||||
if resolution.Interval > 0 {
|
||||
// Drop the view
|
||||
viewName := fmt.Sprintf("%s_consumer", tableName)
|
||||
if err := c.d.ClickHouse.Exec(ctx, fmt.Sprintf(`DROP TABLE IF EXISTS %s SYNC`, viewName)); err != nil {
|
||||
return fmt.Errorf("cannot drop %s: %w", viewName, err)
|
||||
}
|
||||
}
|
||||
err := c.d.ClickHouse.Exec(ctx, fmt.Sprintf("ALTER TABLE %s %s", tableName, strings.Join(modifications, ", ")))
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot update table %s: %w", tableName, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Check if we need to update the TTL
|
||||
ttlClause := fmt.Sprintf("TTL TimeReceived + toIntervalSecond(%d)", ttl)
|
||||
ttlClauseLike := fmt.Sprintf("CAST(engine_full LIKE '%% %s %%', 'String')", ttlClause)
|
||||
if ok, err := c.tableAlreadyExists(ctx, tableName, ttlClauseLike, "1"); err != nil {
|
||||
return err
|
||||
} else if !ok {
|
||||
c.r.Warn().
|
||||
Msgf("updating TTL of %s with interval %s, this can take a long time", tableName, resolution.Interval)
|
||||
if err := c.d.ClickHouse.Exec(ctx, fmt.Sprintf("ALTER TABLE %s MODIFY %s", tableName, ttlClause)); err != nil {
|
||||
return fmt.Errorf("cannot modify TTL for table %s: %w", tableName, err)
|
||||
}
|
||||
return nil
|
||||
} else if len(modifications) > 0 {
|
||||
return nil
|
||||
}
|
||||
return errSkipStep
|
||||
}
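For illustration (not part of the commit; the table and column are just an example): when ExporterRole is missing from an aggregated table, the loop above schedules an ADD COLUMN after the previous schema column and, because the table is not the main one, appends a MODIFY ORDER BY built from SortingKeys() to the same ALTER.

    // Roughly the statement produced (sorting key abridged):
    //
    //   ALTER TABLE flows_1h0m0s
    //     ADD COLUMN `ExporterRole` LowCardinality(String) AFTER ExporterGroup,
    //     MODIFY ORDER BY (TimeReceived, ExporterAddress, EType, Proto, InIfName,
    //                      SrcAS, ForwardingStatus, OutIfName, DstAS, SamplingRate,
    //                      SrcNetName, DstNetName, ..., Dst1stAS, Dst2ndAS, Dst3rdAS)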
|
||||
|
||||
func (c *Component) createFlowsConsumerView(ctx context.Context, resolution ResolutionConfiguration) error {
|
||||
if resolution.Interval == 0 {
|
||||
// The consumer for the main table is created elsewhere.
|
||||
return errSkipStep
|
||||
}
|
||||
tableName := fmt.Sprintf("flows_%s", resolution.Interval)
|
||||
viewName := fmt.Sprintf("%s_consumer", tableName)
|
||||
|
||||
// Build SELECT query
|
||||
selectQuery, err := stemplate(`
|
||||
SELECT
|
||||
toStartOfInterval(TimeReceived, toIntervalSecond({{ .Seconds }})) AS TimeReceived,
|
||||
{{ .Columns }}
|
||||
FROM {{ .Database }}.flows`, gin.H{
|
||||
"Database": c.config.Database,
|
||||
"Seconds": uint64(resolution.Interval.Seconds()),
|
||||
"Columns": strings.Join(schema.Flows.SelectColumns(
|
||||
schema.SkipTimeReceived,
|
||||
schema.SkipMainOnlyColumns,
|
||||
schema.SkipAliasedColumns), ",\n "),
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot build select statement for consumer %s: %w", viewName, err)
|
||||
}
|
||||
|
||||
// Check the existing one
|
||||
if ok, err := c.tableAlreadyExists(ctx, viewName, "as_select", selectQuery); err != nil {
|
||||
return err
|
||||
} else if ok {
|
||||
c.r.Info().Msgf("%s already exists, skip migration", viewName)
|
||||
return errSkipStep
|
||||
}
|
||||
|
||||
// Drop and create
|
||||
c.r.Info().Msgf("create %s", viewName)
|
||||
if err := c.d.ClickHouse.Exec(ctx, fmt.Sprintf(`DROP TABLE IF EXISTS %s SYNC`, viewName)); err != nil {
|
||||
return fmt.Errorf("cannot drop table %s: %w", viewName, err)
|
||||
}
|
||||
if err := c.d.ClickHouse.Exec(ctx,
|
||||
fmt.Sprintf(`CREATE MATERIALIZED VIEW %s TO flows AS %s`, viewName, selectQuery)); err != nil {
|
||||
return fmt.Errorf("cannot create %s: %w", viewName, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -22,6 +22,8 @@ import (
|
||||
"akvorado/common/http"
|
||||
"akvorado/common/kafka"
|
||||
"akvorado/common/reporter"
|
||||
|
||||
"github.com/ClickHouse/clickhouse-go/v2/lib/proto"
|
||||
)
|
||||
|
||||
var ignoredTables = []string{
|
||||
@@ -237,6 +239,16 @@ WHERE database=currentDatabase() AND table NOT LIKE '.%'`)
|
||||
lastRun = currentRun
|
||||
})
|
||||
if t.Failed() {
|
||||
row := chComponent.QueryRow(context.Background(), `
|
||||
SELECT query
|
||||
FROM system.query_log
|
||||
WHERE client_name = $1
|
||||
ORDER BY event_time_microseconds DESC
|
||||
LIMIT 1`, proto.ClientName)
|
||||
var lastQuery string
|
||||
if err := row.Scan(&lastQuery); err == nil {
|
||||
t.Logf("last ClickHouse query: %s", lastQuery)
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
@@ -263,24 +275,6 @@ WHERE database=currentDatabase() AND table NOT LIKE '.%'`)
|
||||
t.Fatalf("Migrations not done")
|
||||
}
|
||||
|
||||
// Compute hash for all tables
|
||||
rows, err := chComponent.Query(context.Background(), `
|
||||
SELECT table, groupBitXor(cityHash64(name,type,position))
|
||||
FROM system.columns
|
||||
WHERE database = currentDatabase()
|
||||
GROUP BY table`)
|
||||
if err != nil {
|
||||
t.Fatalf("Query() error:\n%+v", err)
|
||||
}
|
||||
for rows.Next() {
|
||||
var table string
|
||||
var hash uint64
|
||||
if err := rows.Scan(&table, &hash); err != nil {
|
||||
t.Fatalf("Scan() error:\n%+v", err)
|
||||
}
|
||||
t.Logf("table %s hash is %d", table, hash)
|
||||
}
|
||||
|
||||
// No migration should have been applied the last time
|
||||
gotMetrics := r.GetMetrics("akvorado_orchestrator_clickhouse_migrations_",
|
||||
"applied_steps")
|
||||
|
||||
@@ -1,849 +0,0 @@
|
||||
// SPDX-FileCopyrightText: 2022 Free Mobile
|
||||
// SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
package clickhouse
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/ClickHouse/clickhouse-go/v2"
|
||||
|
||||
"akvorado/common/reporter"
|
||||
"akvorado/inlet/flow"
|
||||
)
|
||||
|
||||
const (
|
||||
// flowsSchema is the canonical schema for flows table
|
||||
flowsSchema = `
|
||||
TimeReceived DateTime CODEC(DoubleDelta, LZ4),
|
||||
SamplingRate UInt64,
|
||||
ExporterAddress LowCardinality(IPv6),
|
||||
ExporterName LowCardinality(String),
|
||||
ExporterGroup LowCardinality(String),
|
||||
ExporterRole LowCardinality(String),
|
||||
ExporterSite LowCardinality(String),
|
||||
ExporterRegion LowCardinality(String),
|
||||
ExporterTenant LowCardinality(String),
|
||||
SrcAddr IPv6,
|
||||
DstAddr IPv6,
|
||||
SrcNetMask UInt8,
|
||||
DstNetMask UInt8,
|
||||
SrcAS UInt32,
|
||||
DstAS UInt32,
|
||||
SrcNetName LowCardinality(String),
|
||||
DstNetName LowCardinality(String),
|
||||
SrcNetRole LowCardinality(String),
|
||||
DstNetRole LowCardinality(String),
|
||||
SrcNetSite LowCardinality(String),
|
||||
DstNetSite LowCardinality(String),
|
||||
SrcNetRegion LowCardinality(String),
|
||||
DstNetRegion LowCardinality(String),
|
||||
SrcNetTenant LowCardinality(String),
|
||||
DstNetTenant LowCardinality(String),
|
||||
SrcCountry FixedString(2),
|
||||
DstCountry FixedString(2),
|
||||
DstASPath Array(UInt32),
|
||||
Dst1stAS UInt32,
|
||||
Dst2ndAS UInt32,
|
||||
Dst3rdAS UInt32,
|
||||
DstCommunities Array(UInt32),
|
||||
DstLargeCommunities Array(UInt128),
|
||||
InIfName LowCardinality(String),
|
||||
OutIfName LowCardinality(String),
|
||||
InIfDescription String,
|
||||
OutIfDescription String,
|
||||
InIfSpeed UInt32,
|
||||
OutIfSpeed UInt32,
|
||||
InIfConnectivity LowCardinality(String),
|
||||
OutIfConnectivity LowCardinality(String),
|
||||
InIfProvider LowCardinality(String),
|
||||
OutIfProvider LowCardinality(String),
|
||||
InIfBoundary Enum8('undefined' = 0, 'external' = 1, 'internal' = 2),
|
||||
OutIfBoundary Enum8('undefined' = 0, 'external' = 1, 'internal' = 2),
|
||||
EType UInt32,
|
||||
Proto UInt32,
|
||||
SrcPort UInt32,
|
||||
DstPort UInt32,
|
||||
Bytes UInt64,
|
||||
Packets UInt64,
|
||||
ForwardingStatus UInt32
|
||||
`
|
||||
)
|
||||
|
||||
// queryTableHash can be used to check if a table exists with the
|
||||
// specified schema. This is not foolproof as it needs help if
|
||||
// settings or populate query is changed.
|
||||
func queryTableHash(hash uint64, more string) string {
|
||||
return fmt.Sprintf(`
|
||||
SELECT bitAnd(v1, v2) FROM (
|
||||
SELECT 1 AS v1
|
||||
FROM system.tables
|
||||
WHERE name = $1 AND database = currentDatabase() %s
|
||||
) t1, (
|
||||
SELECT groupBitXor(cityHash64(name,type,position)) == %d AS v2
|
||||
FROM system.columns
|
||||
WHERE table = $1 AND database = currentDatabase()
|
||||
) t2`, more, hash)
|
||||
}
|
||||
|
||||
// partialSchema returns the above schema minus some columns
|
||||
func partialSchema(remove ...string) string {
|
||||
schema := []string{}
|
||||
outer:
|
||||
for _, l := range strings.Split(flowsSchema, "\n") {
|
||||
for _, p := range remove {
|
||||
if strings.HasPrefix(strings.TrimSpace(l), fmt.Sprintf("%s ", p)) {
|
||||
continue outer
|
||||
}
|
||||
}
|
||||
schema = append(schema, l)
|
||||
}
|
||||
return strings.Join(schema, "\n")
|
||||
}
|
||||
|
||||
// columnSpecToName extracts column name from its creation spec
|
||||
func columnSpecToName(spec string) string {
|
||||
spec = strings.TrimPrefix(spec, "IF NOT EXISTS ")
|
||||
return strings.Split(spec, " ")[0]
|
||||
}
|
||||
|
||||
// addColumnsAfter build a string to add columns after another column
|
||||
func addColumnsAfter(after string, columns ...string) string {
|
||||
modifications := []string{}
|
||||
last := after
|
||||
for _, column := range columns {
|
||||
modifications = append(modifications, fmt.Sprintf("ADD COLUMN %s AFTER %s", column, last))
|
||||
last = columnSpecToName(column)
|
||||
}
|
||||
return strings.Join(modifications, ", ")
|
||||
}
|
||||
|
||||
// appendToSortingKey returns a sorting key using the original one and
|
||||
// adding the specified columns.
|
||||
func appendToSortingKey(ctx context.Context, conn clickhouse.Conn, table string, columns ...string) (string, error) {
|
||||
row := conn.QueryRow(
|
||||
ctx,
|
||||
`SELECT sorting_key FROM system.tables WHERE name = $1 AND database = currentDatabase()`,
|
||||
table)
|
||||
if err := row.Err(); err != nil {
|
||||
return "", fmt.Errorf("cannot query sorting key for %q: %w", table, err)
|
||||
}
|
||||
var sortingKey string
|
||||
if err := row.Scan(&sortingKey); err != nil {
|
||||
return "", fmt.Errorf("unable to parse sorting key: %w", err)
|
||||
}
|
||||
return fmt.Sprintf("%s, %s", sortingKey, strings.Join(columns, ", ")), nil
|
||||
}
|
||||
|
||||
// addColumnsAndUpdateSortingKey combines addColumnsAfter and appendToSortingKey
|
||||
func addColumnsAndUpdateSortingKey(ctx context.Context, conn clickhouse.Conn, table string, after string, columns ...string) (string, error) {
|
||||
modifications := []string{addColumnsAfter(after, columns...)}
|
||||
columnNames := []string{}
|
||||
for _, column := range columns {
|
||||
columnNames = append(columnNames, columnSpecToName(column))
|
||||
}
|
||||
if table != "flows" {
|
||||
sortingKey, err := appendToSortingKey(ctx, conn, table, columnNames...)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
modifications = append(modifications, fmt.Sprintf("MODIFY ORDER BY (%s)", sortingKey))
|
||||
}
|
||||
return strings.Join(modifications, ", "), nil
|
||||
}
|
||||
|
||||
var nullMigrationStep = migrationStep{
|
||||
CheckQuery: `SELECT 1`,
|
||||
Args: []interface{}{},
|
||||
Do: func() error { return nil },
|
||||
}
|
||||
|
||||
func (c *Component) migrationsStepCreateFlowsTable(resolution ResolutionConfiguration) migrationStepFunc {
|
||||
return func(ctx context.Context, l reporter.Logger, conn clickhouse.Conn) migrationStep {
|
||||
if resolution.Interval == 0 {
|
||||
// Unconsolidated flows table
|
||||
partitionInterval := uint64((resolution.TTL / time.Duration(c.config.MaxPartitions)).Seconds())
|
||||
return migrationStep{
|
||||
CheckQuery: `SELECT 1 FROM system.tables WHERE name = $1 AND database = currentDatabase()`,
|
||||
Args: []interface{}{"flows"},
|
||||
Do: func() error {
|
||||
return conn.Exec(ctx, fmt.Sprintf(`
|
||||
CREATE TABLE flows (
|
||||
%s
|
||||
)
|
||||
ENGINE = MergeTree
|
||||
PARTITION BY toYYYYMMDDhhmmss(toStartOfInterval(TimeReceived, INTERVAL %d second))
|
||||
ORDER BY (TimeReceived, ExporterAddress, InIfName, OutIfName)`, flowsSchema, partitionInterval))
|
||||
},
|
||||
}
|
||||
}
|
||||
// Consolidated table. The ORDER BY clause excludes
|
||||
// field that are usually deduced from included
|
||||
// fields, assuming they won't change for the interval
|
||||
// of time considered. It excludes Bytes and Packets
|
||||
// that are summed. The order is the one we are most
|
||||
// likely to use when filtering. SrcAddr and DstAddr
|
||||
// are removed.
|
||||
tableName := fmt.Sprintf("flows_%s", resolution.Interval)
|
||||
viewName := fmt.Sprintf("%s_consumer", tableName)
|
||||
return migrationStep{
|
||||
CheckQuery: `SELECT 1 FROM system.tables WHERE name = $1 AND database = currentDatabase()`,
|
||||
Args: []interface{}{tableName},
|
||||
Do: func() error {
|
||||
l.Debug().Msgf("drop flows consumer table for interval %s", resolution.Interval)
|
||||
err := conn.Exec(ctx, fmt.Sprintf(`DROP TABLE IF EXISTS %s SYNC`, viewName))
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot drop flows consumer table for interval %s: %w",
|
||||
resolution.Interval, err)
|
||||
}
|
||||
|
||||
partitionInterval := uint64((resolution.TTL / time.Duration(c.config.MaxPartitions)).Seconds())
|
||||
// Primary key does not cover all the sorting key as we cannot modify it
|
||||
// and it would impact performance negatively to have a too long
|
||||
// primary key. We do not use ExporterName, ExporterGroup, ... in
|
||||
// ORDER BY because we assume that for a value of ExporterAddress, they
|
||||
// are constant. The same applies for InIfDescription, InIfProvider (for
|
||||
// a value of ExporterAddress and InIfName, they are constant).
|
||||
// That's not the case for SrcNetName and others (they depend on the
|
||||
// SrcAddr which we don't have anymore).
|
||||
return conn.Exec(ctx, fmt.Sprintf(`
|
||||
CREATE TABLE %s (
|
||||
%s
|
||||
)
|
||||
ENGINE = SummingMergeTree((Bytes, Packets))
|
||||
PARTITION BY toYYYYMMDDhhmmss(toStartOfInterval(TimeReceived, INTERVAL %d second))
|
||||
PRIMARY KEY (TimeReceived,
|
||||
ExporterAddress,
|
||||
EType, Proto,
|
||||
InIfName, SrcAS, ForwardingStatus,
|
||||
OutIfName, DstAS,
|
||||
SamplingRate)
|
||||
ORDER BY (TimeReceived,
|
||||
ExporterAddress,
|
||||
EType, Proto,
|
||||
InIfName, SrcAS, ForwardingStatus,
|
||||
OutIfName, DstAS,
|
||||
SamplingRate,
|
||||
SrcNetName, DstNetName,
|
||||
SrcNetRole, DstNetRole,
|
||||
SrcNetSite, DstNetSite,
|
||||
SrcNetRegion, DstNetRegion,
|
||||
SrcNetTenant, DstNetTenant,
|
||||
SrcCountry, DstCountry,
|
||||
Dst1stAS, Dst2ndAS, Dst3rdAS)`,
|
||||
tableName,
|
||||
partialSchema(
|
||||
"SrcAddr", "DstAddr",
|
||||
"SrcNetMask", "DstNetMask",
|
||||
"SrcPort", "DstPort",
|
||||
"DstASPath", "DstCommunities", "DstLargeCommunities"),
|
||||
partitionInterval))
|
||||
},
|
||||
}
|
||||
}
|
||||
}

func (c *Component) migrationStepAddPacketSizeBucketColumn(resolution ResolutionConfiguration) migrationStepFunc {
	return func(ctx context.Context, l reporter.Logger, conn clickhouse.Conn) migrationStep {
		var tableName string
		if resolution.Interval == 0 {
			tableName = "flows"
		} else {
			tableName = fmt.Sprintf("flows_%s", resolution.Interval)
		}
		return migrationStep{
			CheckQuery: `
SELECT 1 FROM system.columns
WHERE table = $1 AND database = currentDatabase() AND name = $2`,
			Args: []interface{}{tableName, "PacketSizeBucket"},
			Do: func() error {
				boundaries := []int{64, 128, 256, 512, 768, 1024, 1280, 1501,
					2048, 3072, 4096, 8192, 10240, 16384, 32768, 65536}
				conditions := []string{}
				last := 0
				for _, boundary := range boundaries {
					conditions = append(conditions, fmt.Sprintf("PacketSize < %d, '%d-%d'",
						boundary, last, boundary-1))
					last = boundary
				}
				conditions = append(conditions, fmt.Sprintf("'%d-Inf'", last))
				return conn.Exec(ctx, fmt.Sprintf("ALTER TABLE %s %s",
					tableName, addColumnsAfter("Packets",
						"PacketSize UInt64 ALIAS intDiv(Bytes, Packets)",
						fmt.Sprintf("PacketSizeBucket LowCardinality(String) ALIAS multiIf(%s)",
							strings.Join(conditions, ", ")))))
			},
		}
	}
}
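
As a sanity check, this standalone sketch reproduces the bucket-building loop above, truncated to the first three boundaries, and prints the multiIf() expression it generates (the real migration uses the full boundary list up to 65536):

package main

import (
	"fmt"
	"strings"
)

func main() {
	boundaries := []int{64, 128, 256}
	conditions := []string{}
	last := 0
	for _, boundary := range boundaries {
		conditions = append(conditions, fmt.Sprintf("PacketSize < %d, '%d-%d'",
			boundary, last, boundary-1))
		last = boundary
	}
	conditions = append(conditions, fmt.Sprintf("'%d-Inf'", last))
	fmt.Println("multiIf(" + strings.Join(conditions, ", ") + ")")
	// multiIf(PacketSize < 64, '0-63', PacketSize < 128, '64-127', PacketSize < 256, '128-255', '256-Inf')
}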

func (c *Component) migrationStepAddSrcNetNameDstNetNameColumns(resolution ResolutionConfiguration) migrationStepFunc {
	return func(ctx context.Context, l reporter.Logger, conn clickhouse.Conn) migrationStep {
		var tableName string
		if resolution.Interval == 0 {
			tableName = "flows"
		} else {
			tableName = fmt.Sprintf("flows_%s", resolution.Interval)
		}
		return migrationStep{
			CheckQuery: `
SELECT 1 FROM system.columns
WHERE table = $1 AND database = currentDatabase() AND name = $2`,
			Args: []interface{}{tableName, "DstNetName"},
			Do: func() error {
				modifications, err := addColumnsAndUpdateSortingKey(ctx, conn, tableName,
					"DstAS",
					`SrcNetName LowCardinality(String)`,
					`DstNetName LowCardinality(String)`,
				)
				if err != nil {
					return err
				}
				return conn.Exec(ctx, fmt.Sprintf(`ALTER TABLE %s %s`,
					tableName, modifications))
			},
		}
	}
}

func (c *Component) migrationStepAddSrcNetNameDstNetOthersColumns(resolution ResolutionConfiguration) migrationStepFunc {
	return func(ctx context.Context, l reporter.Logger, conn clickhouse.Conn) migrationStep {
		var tableName string
		if resolution.Interval == 0 {
			tableName = "flows"
		} else {
			tableName = fmt.Sprintf("flows_%s", resolution.Interval)
		}
		return migrationStep{
			CheckQuery: `
SELECT 1 FROM system.columns
WHERE table = $1 AND database = currentDatabase() AND name = $2`,
			Args: []interface{}{tableName, "DstNetRole"},
			Do: func() error {
				modifications, err := addColumnsAndUpdateSortingKey(ctx, conn, tableName,
					"DstNetName",
					`SrcNetRole LowCardinality(String)`,
					`DstNetRole LowCardinality(String)`,
					`SrcNetSite LowCardinality(String)`,
					`DstNetSite LowCardinality(String)`,
					`SrcNetRegion LowCardinality(String)`,
					`DstNetRegion LowCardinality(String)`,
					`SrcNetTenant LowCardinality(String)`,
					`DstNetTenant LowCardinality(String)`,
				)
				if err != nil {
					return err
				}
				return conn.Exec(ctx, fmt.Sprintf(`ALTER TABLE %s %s`,
					tableName, modifications))
			},
		}
	}
}

func (c *Component) migrationStepAddExporterColumns(resolution ResolutionConfiguration) migrationStepFunc {
	return func(ctx context.Context, l reporter.Logger, conn clickhouse.Conn) migrationStep {
		var tableName string
		if resolution.Interval == 0 {
			tableName = "flows"
		} else {
			tableName = fmt.Sprintf("flows_%s", resolution.Interval)
		}
		return migrationStep{
			CheckQuery: `
SELECT 1 FROM system.columns
WHERE table = $1 AND database = currentDatabase() AND name = $2`,
			Args: []interface{}{tableName, "ExporterTenant"},
			Do: func() error {
				return conn.Exec(ctx, fmt.Sprintf(`ALTER TABLE %s %s`,
					tableName, addColumnsAfter("ExporterGroup",
						`ExporterRole LowCardinality(String)`,
						`ExporterSite LowCardinality(String)`,
						`ExporterRegion LowCardinality(String)`,
						`ExporterTenant LowCardinality(String)`,
					)))
			},
		}
	}
}

func (c *Component) migrationStepFixOrderByCountry(resolution ResolutionConfiguration) migrationStepFunc {
	return func(ctx context.Context, l reporter.Logger, conn clickhouse.Conn) migrationStep {
		var tableName string
		if resolution.Interval == 0 {
			return nullMigrationStep
		}
		tableName = fmt.Sprintf("flows_%s", resolution.Interval)
		return migrationStep{
			CheckQuery: `
SELECT 1 FROM system.tables
WHERE name = $1 AND database = currentDatabase()
AND has(splitByRegexp(',\\s*', sorting_key), $2)`,
			Args: []interface{}{tableName, "SrcCountry"},
			Do: func() error {
				// Drop the columns
				l.Debug().Msg("drop SrcCountry/DstCountry columns")
				err := conn.Exec(ctx,
					fmt.Sprintf(`ALTER TABLE %s DROP COLUMN SrcCountry, DROP COLUMN DstCountry`,
						tableName))
				if err != nil {
					return fmt.Errorf("cannot drop SrcCountry/DstCountry columns: %w", err)
				}
				// Add them back
				l.Debug().Msg("add back SrcCountry/DstCountry columns")
				modifications, err := addColumnsAndUpdateSortingKey(ctx, conn, tableName,
					"DstNetTenant",
					`SrcCountry FixedString(2)`,
					`DstCountry FixedString(2)`,
				)
				if err != nil {
					return err
				}
				return conn.Exec(ctx, fmt.Sprintf(`ALTER TABLE %s %s`,
					tableName, modifications))
			},
		}
	}
}

func (c *Component) migrationStepAddDstASPathColumns(resolution ResolutionConfiguration) migrationStepFunc {
	return func(ctx context.Context, l reporter.Logger, conn clickhouse.Conn) migrationStep {
		var tableName string
		if resolution.Interval == 0 {
			tableName = "flows"
		} else {
			tableName = fmt.Sprintf("flows_%s", resolution.Interval)
		}
		return migrationStep{
			CheckQuery: `
SELECT 1 FROM system.columns
WHERE table = $1 AND database = currentDatabase() AND name = $2`,
			Args: []interface{}{tableName, "Dst1stAS"},
			Do: func() error {
				var modifications string
				var err error
				if tableName == "flows" {
					// The flows table will get DstASPath, 1st, 2nd, 3rd ASN.
					modifications, err = addColumnsAndUpdateSortingKey(ctx, conn, tableName,
						"DstCountry",
						`DstASPath Array(UInt32)`,
						`Dst1stAS UInt32`,
						`Dst2ndAS UInt32`,
						`Dst3rdAS UInt32`,
					)
				} else {
					// The consolidated tables will only get the first three ASNs.
					modifications, err = addColumnsAndUpdateSortingKey(ctx, conn, tableName,
						"DstCountry",
						`Dst1stAS UInt32`,
						`Dst2ndAS UInt32`,
						`Dst3rdAS UInt32`,
					)
				}
				if err != nil {
					return err
				}
				return conn.Exec(ctx, fmt.Sprintf(`ALTER TABLE %s %s`,
					tableName, modifications))
			},
		}
	}
}
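
The first three ASN columns are later derived from the AS path by the raw flows consumer view (further down in this diff), which applies arrayCompact() to collapse AS-path prepending before indexing. A standalone sketch of that derivation, with a made-up AS path:

package main

import "fmt"

func main() {
	// arrayCompact() in ClickHouse removes consecutive duplicates; the view then
	// takes elements 1, 2 and 3 as Dst1stAS, Dst2ndAS and Dst3rdAS.
	path := []uint32{65001, 65001, 65001, 174, 3356} // prepended path, made up
	compacted := []uint32{}
	var last uint32
	for i, asn := range path {
		if i == 0 || asn != last {
			compacted = append(compacted, asn)
		}
		last = asn
	}
	fmt.Println(compacted[0], compacted[1], compacted[2]) // 65001 174 3356
}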

func (c *Component) migrationStepAddDstCommunitiesColumn(ctx context.Context, l reporter.Logger, conn clickhouse.Conn) migrationStep {
	return migrationStep{
		CheckQuery: `
SELECT 1 FROM system.columns
WHERE table = $1 AND database = currentDatabase() AND name = $2`,
		Args: []interface{}{"flows", "DstCommunities"},
		Do: func() error {
			modifications, err := addColumnsAndUpdateSortingKey(ctx, conn, "flows",
				"Dst3rdAS",
				"DstCommunities Array(UInt32)")
			if err != nil {
				return err
			}
			return conn.Exec(ctx, fmt.Sprintf(`ALTER TABLE flows %s`, modifications))
		},
	}
}

func (c *Component) migrationStepAddDstLargeCommunitiesColumn(ctx context.Context, l reporter.Logger, conn clickhouse.Conn) migrationStep {
	return migrationStep{
		CheckQuery: `
SELECT 1 FROM system.columns
WHERE table = $1 AND database = currentDatabase() AND name = $2`,
		Args: []interface{}{"flows", "DstLargeCommunities"},
		Do: func() error {
			modifications, err := addColumnsAndUpdateSortingKey(ctx, conn, "flows",
				"DstCommunities",
				"DstLargeCommunities Array(UInt128)")
			if err != nil {
				return err
			}
			return conn.Exec(ctx, fmt.Sprintf(`ALTER TABLE flows %s`, modifications))
		},
	}
}

func (c *Component) migrationStepAddSrcNetMaskDstNetMaskColumns(ctx context.Context, l reporter.Logger, conn clickhouse.Conn) migrationStep {
	return migrationStep{
		CheckQuery: `
SELECT 1 FROM system.columns
WHERE table = $1 AND database = currentDatabase() AND name = $2`,
		Args: []interface{}{"flows", "SrcNetMask"},
		Do: func() error {
			modifications, err := addColumnsAndUpdateSortingKey(ctx, conn, "flows",
				"DstAddr",
				"SrcNetMask UInt8",
				"DstNetMask UInt8")
			if err != nil {
				return err
			}
			return conn.Exec(ctx, fmt.Sprintf(`ALTER TABLE flows %s`, modifications))
		},
	}
}

func (c *Component) migrationStepAddSrcNetPrefixDstNetPrefixColumn(ctx context.Context, l reporter.Logger, conn clickhouse.Conn) migrationStep {
	return migrationStep{
		CheckQuery: `
SELECT 1 FROM system.columns
WHERE table = $1 AND database = currentDatabase() AND name = $2`,
		Args: []interface{}{"flows", "SrcNetPrefix"},
		Do: func() error {
			sql := func(prefix string) string {
				return fmt.Sprintf(`
%sNetPrefix String ALIAS
CASE
 WHEN EType = 0x800 THEN concat(replaceRegexpOne(IPv6CIDRToRange(%sAddr, (96 + %sNetMask)::UInt8).1::String, '^::ffff:', ''), '/', %sNetMask::String)
 WHEN EType = 0x86dd THEN concat(IPv6CIDRToRange(%sAddr, %sNetMask).1::String, '/', %sNetMask::String)
 ELSE ''
END
`, prefix, prefix, prefix, prefix, prefix, prefix, prefix)
			}
			return conn.Exec(ctx, fmt.Sprintf("ALTER TABLE flows %s",
				addColumnsAfter("DstNetMask", sql("Src"), sql("Dst"))))
		},
	}
}
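
For intuition, the alias above masks the address with the network mask and renders it as network/mask, stripping the ::ffff: prefix of IPv4-mapped addresses. A standalone Go sketch of the same computation for a made-up IPv4 flow (SrcAddr 192.0.2.131, SrcNetMask 24):

package main

import (
	"fmt"
	"net/netip"
)

func main() {
	addr := netip.MustParseAddr("192.0.2.131") // hypothetical SrcAddr
	mask := 24                                 // hypothetical SrcNetMask
	prefix, err := addr.Prefix(mask)           // keep only the top 24 bits
	if err != nil {
		panic(err)
	}
	fmt.Println(prefix) // 192.0.2.0/24, the value SrcNetPrefix would expose
}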

func (c *Component) migrationsStepCreateFlowsConsumerTable(resolution ResolutionConfiguration) migrationStepFunc {
	return func(ctx context.Context, l reporter.Logger, conn clickhouse.Conn) migrationStep {
		if resolution.Interval == 0 {
			// Consumers for the flows table are created later.
			return nullMigrationStep
		}
		tableName := fmt.Sprintf("flows_%s", resolution.Interval)
		viewName := fmt.Sprintf("%s_consumer", tableName)
		selectClause := fmt.Sprintf(`
SELECT *
EXCEPT (SrcAddr, DstAddr, SrcNetMask, DstNetMask, SrcPort, DstPort, DstASPath, DstCommunities, DstLargeCommunities)
REPLACE toStartOfInterval(TimeReceived, toIntervalSecond(%d)) AS TimeReceived`,
			uint64(resolution.Interval.Seconds()))
		selectClause = strings.TrimSpace(strings.ReplaceAll(selectClause, "\n", " "))
		return migrationStep{
			CheckQuery: queryTableHash(10874532506016793032,
				fmt.Sprintf("AND as_select LIKE '%s FROM %%'", selectClause)),
			Args: []interface{}{viewName},
			// No GROUP BY, the SummingMergeTree will take care of that.
			Do: func() error {
				l.Debug().Msg("drop consumer table")
				err := conn.Exec(ctx, fmt.Sprintf(`DROP TABLE IF EXISTS %s SYNC`, viewName))
				if err != nil {
					return fmt.Errorf("cannot drop consumer table: %w", err)
				}
				l.Debug().Msg("create consumer table")
				return conn.Exec(ctx, fmt.Sprintf(`
CREATE MATERIALIZED VIEW %s TO %s
AS %s
FROM %s`, viewName, tableName, selectClause, "flows"))
			},
		}
	}
}
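
A standalone sketch of what selectClause evaluates to for a hypothetical 1-minute resolution; the flattened string is what gets compared against as_select and embedded in the materialized view:

package main

import (
	"fmt"
	"strings"
	"time"
)

func main() {
	interval := time.Minute // hypothetical resolution.Interval
	selectClause := fmt.Sprintf(`
SELECT *
EXCEPT (SrcAddr, DstAddr, SrcNetMask, DstNetMask, SrcPort, DstPort, DstASPath, DstCommunities, DstLargeCommunities)
REPLACE toStartOfInterval(TimeReceived, toIntervalSecond(%d)) AS TimeReceived`,
		uint64(interval.Seconds()))
	selectClause = strings.TrimSpace(strings.ReplaceAll(selectClause, "\n", " "))
	fmt.Println(selectClause)
	// SELECT * EXCEPT (SrcAddr, DstAddr, ...) REPLACE toStartOfInterval(TimeReceived, toIntervalSecond(60)) AS TimeReceived
}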

func (c *Component) migrationsStepSetTTLFlowsTable(resolution ResolutionConfiguration) migrationStepFunc {
	return func(ctx context.Context, l reporter.Logger, conn clickhouse.Conn) migrationStep {
		if resolution.TTL == 0 {
			l.Info().Msgf("not changing TTL for flows table with interval %s", resolution.Interval)
			return migrationStep{
				CheckQuery: `SELECT 1`,
				Args: []interface{}{},
				Do: func() error { return nil },
			}
		}
		tableName := "flows"
		if resolution.Interval != 0 {
			tableName = fmt.Sprintf("flows_%s", resolution.Interval)
		}
		seconds := uint64(resolution.TTL.Seconds())
		ttl := fmt.Sprintf("TTL TimeReceived + toIntervalSecond(%d)", seconds)
		return migrationStep{
			CheckQuery: `
SELECT 1 FROM system.tables
WHERE name = $1 AND database = currentDatabase() AND engine_full LIKE $2`,
			Args: []interface{}{
				tableName,
				fmt.Sprintf("%% %s %%", ttl),
			},
			Do: func() error {
				l.Warn().Msgf("updating TTL of flows table with interval %s, this can take a long time",
					resolution.Interval)
				return conn.Exec(ctx, fmt.Sprintf("ALTER TABLE %s MODIFY %s", tableName, ttl))
			},
		}
	}
}
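
To illustrate the check-then-alter logic, a standalone sketch of the two strings this step builds, using a hypothetical 90-day TTL on the 5-minute table (matching the 7776000 seconds visible in the test fixtures below):

package main

import (
	"fmt"
	"time"
)

func main() {
	tableName := "flows_5m0s"          // hypothetical consolidated table
	ttlDuration := 90 * 24 * time.Hour // hypothetical resolution.TTL
	seconds := uint64(ttlDuration.Seconds())
	ttl := fmt.Sprintf("TTL TimeReceived + toIntervalSecond(%d)", seconds)
	// Pattern matched against engine_full to decide whether the step already ran.
	fmt.Printf("%% %s %%\n", ttl)
	// Statement executed when it did not.
	fmt.Printf("ALTER TABLE %s MODIFY %s\n", tableName, ttl)
}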

func (c *Component) migrationStepCreateExportersView(ctx context.Context, l reporter.Logger, conn clickhouse.Conn) migrationStep {
	return migrationStep{
		CheckQuery: queryTableHash(9989732154180416521, ""),
		Args: []interface{}{"exporters"},
		Do: func() error {
			l.Debug().Msg("drop exporters table")
			err := conn.Exec(ctx, `DROP TABLE IF EXISTS exporters SYNC`)
			if err != nil {
				return fmt.Errorf("cannot drop exporters table: %w", err)
			}
			return conn.Exec(ctx, `
CREATE MATERIALIZED VIEW exporters
ENGINE = ReplacingMergeTree(TimeReceived)
ORDER BY (ExporterAddress, IfName)
AS
SELECT DISTINCT
 TimeReceived,
 ExporterAddress,
 ExporterName,
 ExporterGroup,
 ExporterRole,
 ExporterSite,
 ExporterRegion,
 ExporterTenant,
 [InIfName, OutIfName][num] AS IfName,
 [InIfDescription, OutIfDescription][num] AS IfDescription,
 [InIfSpeed, OutIfSpeed][num] AS IfSpeed,
 [InIfConnectivity, OutIfConnectivity][num] AS IfConnectivity,
 [InIfProvider, OutIfProvider][num] AS IfProvider,
 [InIfBoundary, OutIfBoundary][num] AS IfBoundary
FROM flows
ARRAY JOIN arrayEnumerate([1,2]) AS num
`)
		},
	}
}
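
The ARRAY JOIN arrayEnumerate([1,2]) construct turns every flow row into two exporter rows, one per direction, each picking either the input or the output interface fields. A standalone sketch of the same idea, with made-up values:

package main

import "fmt"

func main() {
	type flow struct{ inIfName, outIfName string }
	f := flow{inIfName: "Gi0/0/1", outIfName: "Gi0/0/2"} // hypothetical flow
	ifNames := []string{f.inIfName, f.outIfName}
	for num := 1; num <= 2; num++ { // mirrors ARRAY JOIN arrayEnumerate([1,2]) AS num
		fmt.Printf("exporter row %d: IfName=%s\n", num, ifNames[num-1])
	}
}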

func (c *Component) migrationStepCreateProtocolsDictionary(ctx context.Context, l reporter.Logger, conn clickhouse.Conn) migrationStep {
	protocolsURL := fmt.Sprintf("%s/api/v0/orchestrator/clickhouse/protocols.csv", c.config.OrchestratorURL)
	source := fmt.Sprintf(`SOURCE(HTTP(URL '%s' FORMAT 'CSVWithNames'))`, protocolsURL)
	settings := `SETTINGS(format_csv_allow_single_quotes = 0)`
	sourceLike := fmt.Sprintf("%% %s%% %s%%", source, settings)
	return migrationStep{
		CheckQuery: `
SELECT 1 FROM system.tables
WHERE name = $1 AND database = currentDatabase() AND create_table_query LIKE $2`,
		Args: []interface{}{"protocols", sourceLike},
		Do: func() error {
			return conn.Exec(ctx, fmt.Sprintf(`
CREATE OR REPLACE DICTIONARY protocols (
 proto UInt8 INJECTIVE,
 name String,
 description String
)
PRIMARY KEY proto
%s
LIFETIME(MIN 0 MAX 3600)
LAYOUT(HASHED())
%s
`, source, settings))
		},
	}
}

func (c *Component) migrationStepCreateASNsDictionary(ctx context.Context, l reporter.Logger, conn clickhouse.Conn) migrationStep {
	asnsURL := fmt.Sprintf("%s/api/v0/orchestrator/clickhouse/asns.csv", c.config.OrchestratorURL)
	source := fmt.Sprintf(`SOURCE(HTTP(URL '%s' FORMAT 'CSVWithNames'))`, asnsURL)
	settings := `SETTINGS(format_csv_allow_single_quotes = 0)`
	sourceLike := fmt.Sprintf("%% %s%% %s%%", source, settings)
	return migrationStep{
		CheckQuery: `
SELECT 1 FROM system.tables
WHERE name = $1 AND database = currentDatabase() AND create_table_query LIKE $2`,
		Args: []interface{}{"asns", sourceLike},
		Do: func() error {
			return conn.Exec(ctx, fmt.Sprintf(`
CREATE OR REPLACE DICTIONARY asns (
 asn UInt32 INJECTIVE,
 name String
)
PRIMARY KEY asn
%s
LIFETIME(MIN 0 MAX 3600)
LAYOUT(HASHED())
%s
`, source, settings))
		},
	}
}

func (c *Component) migrationStepCreateNetworksDictionary(ctx context.Context, l reporter.Logger, conn clickhouse.Conn) migrationStep {
	networksURL := fmt.Sprintf("%s/api/v0/orchestrator/clickhouse/networks.csv", c.config.OrchestratorURL)
	source := fmt.Sprintf(`SOURCE(HTTP(URL '%s' FORMAT 'CSVWithNames'))`, networksURL)
	settings := `SETTINGS(format_csv_allow_single_quotes = 0)`
	sourceLike := fmt.Sprintf("%% %s%% %s%%", source, settings)
	return migrationStep{
		CheckQuery: queryTableHash(5246378884861475308, "AND create_table_query LIKE $2"),
		Args: []interface{}{"networks", sourceLike},
		Do: func() error {
			return conn.Exec(ctx, fmt.Sprintf(`
CREATE OR REPLACE DICTIONARY networks (
 network String,
 name String,
 role String,
 site String,
 region String,
 tenant String
)
PRIMARY KEY network
%s
LIFETIME(MIN 0 MAX 3600)
LAYOUT(IP_TRIE())
%s
`, source, settings))
		},
	}
}

func (c *Component) migrationStepCreateRawFlowsTable(ctx context.Context, l reporter.Logger, conn clickhouse.Conn) migrationStep {
	tableName := fmt.Sprintf("flows_%d_raw", flow.CurrentSchemaVersion)
	kafkaEngine := fmt.Sprintf("Kafka SETTINGS %s", strings.Join([]string{
		fmt.Sprintf(`kafka_broker_list = '%s'`,
			strings.Join(c.config.Kafka.Brokers, ",")),
		fmt.Sprintf(`kafka_topic_list = '%s-v%d'`,
			c.config.Kafka.Topic, flow.CurrentSchemaVersion),
		`kafka_group_name = 'clickhouse'`,
		`kafka_format = 'Protobuf'`,
		fmt.Sprintf(`kafka_schema = 'flow-%d.proto:FlowMessagev%d'`,
			flow.CurrentSchemaVersion, flow.CurrentSchemaVersion),
		fmt.Sprintf(`kafka_num_consumers = %d`, c.config.Kafka.Consumers),
		`kafka_thread_per_consumer = 1`,
		`kafka_handle_error_mode = 'stream'`,
	}, ", "))
	return migrationStep{
		CheckQuery: queryTableHash(8163754828379578018, "AND engine_full = $2"),
		Args: []interface{}{tableName, kafkaEngine},
		Do: func() error {
			l.Debug().Msg("drop raw consumer table")
			err := conn.Exec(ctx, fmt.Sprintf(`DROP TABLE IF EXISTS %s_consumer SYNC`, tableName))
			if err != nil {
				return fmt.Errorf("cannot drop raw consumer table: %w", err)
			}
			l.Debug().Msg("drop raw table")
			err = conn.Exec(ctx, fmt.Sprintf(`DROP TABLE IF EXISTS %s SYNC`, tableName))
			if err != nil {
				return fmt.Errorf("cannot drop raw table: %w", err)
			}
			l.Debug().Msg("create raw table")
			return conn.Exec(ctx, fmt.Sprintf(`
CREATE TABLE %s
(
%s,
 DstLargeCommunities Nested(ASN UInt32, LocalData1 UInt32, LocalData2 UInt32)
)
ENGINE = %s`, tableName, partialSchema(
				"SrcNetName", "DstNetName",
				"SrcNetRole", "DstNetRole",
				"SrcNetSite", "DstNetSite",
				"SrcNetRegion", "DstNetRegion",
				"SrcNetTenant", "DstNetTenant",
				"Dst1stAS", "Dst2ndAS", "Dst3rdAS",
				"DstLargeCommunities",
			), kafkaEngine))
		},
	}
}

func (c *Component) migrationStepCreateRawFlowsConsumerView(ctx context.Context, l reporter.Logger, conn clickhouse.Conn) migrationStep {
	tableName := fmt.Sprintf("flows_%d_raw", flow.CurrentSchemaVersion)
	viewName := fmt.Sprintf("%s_consumer", tableName)
	return migrationStep{
		CheckQuery: queryTableHash(7925127510274634003, "AND as_select LIKE '% WHERE length(_error) = 0'"),
		Args: []interface{}{viewName},
		Do: func() error {
			l.Debug().Msg("drop consumer table")
			err := conn.Exec(ctx, fmt.Sprintf(`DROP TABLE IF EXISTS %s SYNC`, viewName))
			if err != nil {
				return fmt.Errorf("cannot drop consumer table: %w", err)
			}
			l.Debug().Msg("create consumer table")
			largeCommunitiesColumns := strings.Join([]string{
				"`DstLargeCommunities.ASN`",
				"`DstLargeCommunities.LocalData1`",
				"`DstLargeCommunities.LocalData2`"}, ",")
			return conn.Exec(ctx, fmt.Sprintf(`
CREATE MATERIALIZED VIEW %s TO flows
AS WITH arrayCompact(DstASPath) AS c_DstASPath SELECT
 * EXCEPT (%s),
 dictGetOrDefault('networks', 'name', SrcAddr, '') AS SrcNetName,
 dictGetOrDefault('networks', 'name', DstAddr, '') AS DstNetName,
 dictGetOrDefault('networks', 'role', SrcAddr, '') AS SrcNetRole,
 dictGetOrDefault('networks', 'role', DstAddr, '') AS DstNetRole,
 dictGetOrDefault('networks', 'site', SrcAddr, '') AS SrcNetSite,
 dictGetOrDefault('networks', 'site', DstAddr, '') AS DstNetSite,
 dictGetOrDefault('networks', 'region', SrcAddr, '') AS SrcNetRegion,
 dictGetOrDefault('networks', 'region', DstAddr, '') AS DstNetRegion,
 dictGetOrDefault('networks', 'tenant', SrcAddr, '') AS SrcNetTenant,
 dictGetOrDefault('networks', 'tenant', DstAddr, '') AS DstNetTenant,
 c_DstASPath[1] AS Dst1stAS,
 c_DstASPath[2] AS Dst2ndAS,
 c_DstASPath[3] AS Dst3rdAS,
 arrayMap((asn, l1, l2) -> bitShiftLeft(asn::UInt128, 64) + bitShiftLeft(l1::UInt128, 32) + l2::UInt128, %s) AS DstLargeCommunities
FROM %s
WHERE length(_error) = 0`,
				viewName,
				largeCommunitiesColumns, largeCommunitiesColumns,
				tableName))
		},
	}
}
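
The arrayMap() above folds each BGP large community (ASN, LocalData1, LocalData2) into a single 128-bit integer: ASN shifted left by 64 bits, plus LocalData1 shifted left by 32 bits, plus LocalData2. A standalone sketch of that packing with made-up values, using math/big to stand in for UInt128:

package main

import (
	"fmt"
	"math/big"
)

func main() {
	asn, l1, l2 := int64(65000), int64(1), int64(2) // hypothetical large community 65000:1:2
	v := new(big.Int).Lsh(big.NewInt(asn), 64)     // ASN << 64
	v.Add(v, new(big.Int).Lsh(big.NewInt(l1), 32)) // + LocalData1 << 32
	v.Add(v, big.NewInt(l2))                       // + LocalData2
	fmt.Println(v) // the packed value stored in the DstLargeCommunities array
}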

func (c *Component) migrationStepCreateRawFlowsErrorsView(ctx context.Context, l reporter.Logger, conn clickhouse.Conn) migrationStep {
	tableName := fmt.Sprintf("flows_%d_raw", flow.CurrentSchemaVersion)
	viewName := fmt.Sprintf("%s_errors", tableName)
	return migrationStep{
		CheckQuery: queryTableHash(9120662669408051900, ""),
		Args: []interface{}{viewName},
		Do: func() error {
			l.Debug().Msg("drop kafka errors table")
			err := conn.Exec(ctx, fmt.Sprintf(`DROP TABLE IF EXISTS %s SYNC`, viewName))
			if err != nil {
				return fmt.Errorf("cannot drop kafka errors table: %w", err)
			}
			l.Debug().Msg("create kafka errors table")
			return conn.Exec(ctx, fmt.Sprintf(`
CREATE MATERIALIZED VIEW %s
ENGINE = MergeTree
ORDER BY (timestamp, topic, partition, offset)
PARTITION BY toYYYYMMDDhhmmss(toStartOfHour(timestamp))
TTL timestamp + INTERVAL 1 DAY
AS SELECT
 now() AS timestamp,
 _topic AS topic,
 _partition AS partition,
 _offset AS offset,
 _raw_message AS raw,
 _error AS error
FROM %s
WHERE length(_error) > 0`,
				viewName,
				tableName))
		},
	}
}

@@ -81,7 +81,9 @@ func (c *Component) Start() error {
 	for {
 		if !c.config.SkipMigrations {
 			c.r.Info().Msg("attempting database migration")
-			if err := c.migrateDatabase(); err == nil {
+			if err := c.migrateDatabase(); err != nil {
+				c.r.Err(err).Msg("database migration error")
+			} else {
 				return nil
 			}
 		}
12
orchestrator/clickhouse/testdata/states/001-ttl.csv
vendored
Normal file
@@ -0,0 +1,12 @@
"asns","CREATE DICTIONARY default.asns (`asn` UInt32 INJECTIVE, `name` String) PRIMARY KEY asn SOURCE(HTTP(URL 'http://something/api/v0/orchestrator/clickhouse/asns.csv' FORMAT 'CSVWithNames')) LIFETIME(MIN 0 MAX 3600) LAYOUT(HASHED())"
"exporters","CREATE MATERIALIZED VIEW default.exporters (`TimeReceived` DateTime, `ExporterAddress` LowCardinality(IPv6), `ExporterName` LowCardinality(String), `ExporterGroup` LowCardinality(String), `IfName` String, `IfDescription` String, `IfSpeed` UInt32, `IfConnectivity` String, `IfProvider` String, `IfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2)) ENGINE = ReplacingMergeTree(TimeReceived) ORDER BY (ExporterAddress, IfName) SETTINGS index_granularity = 8192 AS SELECT DISTINCT TimeReceived, ExporterAddress, ExporterName, ExporterGroup, [InIfName, OutIfName][num] AS IfName, [InIfDescription, OutIfDescription][num] AS IfDescription, [InIfSpeed, OutIfSpeed][num] AS IfSpeed, [InIfConnectivity, OutIfConnectivity][num] AS IfConnectivity, [InIfProvider, OutIfProvider][num] AS IfProvider, [InIfBoundary, OutIfBoundary][num] AS IfBoundary FROM default.flows ARRAY JOIN arrayEnumerate([1, 2]) AS num"
"flows","CREATE TABLE default.flows (`TimeReceived` DateTime CODEC(DoubleDelta, LZ4), `SamplingRate` UInt64, `ExporterAddress` LowCardinality(IPv6), `ExporterName` LowCardinality(String), `ExporterGroup` LowCardinality(String), `SrcAddr` IPv6, `DstAddr` IPv6, `SrcAS` UInt32, `DstAS` UInt32, `SrcCountry` FixedString(2), `DstCountry` FixedString(2), `InIfName` LowCardinality(String), `OutIfName` LowCardinality(String), `InIfDescription` String, `OutIfDescription` String, `InIfSpeed` UInt32, `OutIfSpeed` UInt32, `InIfConnectivity` LowCardinality(String), `OutIfConnectivity` LowCardinality(String), `InIfProvider` LowCardinality(String), `OutIfProvider` LowCardinality(String), `InIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `OutIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `EType` UInt32, `Proto` UInt32, `SrcPort` UInt32, `DstPort` UInt32, `Bytes` UInt64, `Packets` UInt64, `PacketSize` UInt64 ALIAS intDiv(Bytes, Packets), `PacketSizeBucket` LowCardinality(String) ALIAS multiIf(PacketSize < 64, '0-63', PacketSize < 128, '64-127', PacketSize < 256, '128-255', PacketSize < 512, '256-511', PacketSize < 768, '512-767', PacketSize < 1024, '768-1023', PacketSize < 1280, '1024-1279', PacketSize < 1501, '1280-1500', PacketSize < 2048, '1501-2047', PacketSize < 3072, '2048-3071', PacketSize < 4096, '3072-4095', PacketSize < 8192, '4096-8191', PacketSize < 10240, '8192-10239', PacketSize < 16384, '10240-16383', PacketSize < 32768, '16384-32767', PacketSize < 65536, '32768-65535', '65536-Inf'), `ForwardingStatus` UInt32) ENGINE = MergeTree PARTITION BY toYYYYMMDDhhmmss(toStartOfInterval(TimeReceived, toIntervalSecond(25920))) ORDER BY (TimeReceived, ExporterAddress, InIfName, OutIfName) TTL TimeReceived + toIntervalSecond(1296002) SETTINGS index_granularity = 8192"
"flows_1_raw","CREATE TABLE default.flows_1_raw (`TimeReceived` DateTime CODEC(DoubleDelta, LZ4), `SamplingRate` UInt64, `ExporterAddress` LowCardinality(IPv6), `ExporterName` LowCardinality(String), `ExporterGroup` LowCardinality(String), `SrcAddr` IPv6, `DstAddr` IPv6, `SrcAS` UInt32, `DstAS` UInt32, `SrcCountry` FixedString(2), `DstCountry` FixedString(2), `InIfName` LowCardinality(String), `OutIfName` LowCardinality(String), `InIfDescription` String, `OutIfDescription` String, `InIfSpeed` UInt32, `OutIfSpeed` UInt32, `InIfConnectivity` LowCardinality(String), `OutIfConnectivity` LowCardinality(String), `InIfProvider` LowCardinality(String), `OutIfProvider` LowCardinality(String), `InIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `OutIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `EType` UInt32, `Proto` UInt32, `SrcPort` UInt32, `DstPort` UInt32, `Bytes` UInt64, `Packets` UInt64, `ForwardingStatus` UInt32) ENGINE = Kafka SETTINGS kafka_broker_list = '127.0.0.1:9092', kafka_topic_list = 'flows-v1', kafka_group_name = 'clickhouse', kafka_format = 'Protobuf', kafka_schema = 'flow-1.proto:FlowMessage', kafka_num_consumers = 1, kafka_thread_per_consumer = 1"
"flows_1_raw_consumer","CREATE MATERIALIZED VIEW default.flows_1_raw_consumer TO default.flows (`TimeReceived` DateTime, `SamplingRate` UInt64, `ExporterAddress` LowCardinality(IPv6), `ExporterName` LowCardinality(String), `ExporterGroup` LowCardinality(String), `SrcAddr` IPv6, `DstAddr` IPv6, `SrcAS` UInt32, `DstAS` UInt32, `SrcCountry` FixedString(2), `DstCountry` FixedString(2), `InIfName` LowCardinality(String), `OutIfName` LowCardinality(String), `InIfDescription` String, `OutIfDescription` String, `InIfSpeed` UInt32, `OutIfSpeed` UInt32, `InIfConnectivity` LowCardinality(String), `OutIfConnectivity` LowCardinality(String), `InIfProvider` LowCardinality(String), `OutIfProvider` LowCardinality(String), `InIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `OutIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `EType` UInt32, `Proto` UInt32, `SrcPort` UInt32, `DstPort` UInt32, `Bytes` UInt64, `Packets` UInt64, `ForwardingStatus` UInt32) AS SELECT * FROM default.flows_1_raw"
"flows_1h0m0s","CREATE TABLE default.flows_1h0m0s (`TimeReceived` DateTime CODEC(DoubleDelta, LZ4), `SamplingRate` UInt64, `ExporterAddress` LowCardinality(IPv6), `ExporterName` LowCardinality(String), `ExporterGroup` LowCardinality(String), `SrcAS` UInt32, `DstAS` UInt32, `SrcCountry` FixedString(2), `DstCountry` FixedString(2), `InIfName` LowCardinality(String), `OutIfName` LowCardinality(String), `InIfDescription` String, `OutIfDescription` String, `InIfSpeed` UInt32, `OutIfSpeed` UInt32, `InIfConnectivity` LowCardinality(String), `OutIfConnectivity` LowCardinality(String), `InIfProvider` LowCardinality(String), `OutIfProvider` LowCardinality(String), `InIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `OutIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `EType` UInt32, `Proto` UInt32, `Bytes` UInt64, `Packets` UInt64, `PacketSize` UInt64 ALIAS intDiv(Bytes, Packets), `PacketSizeBucket` LowCardinality(String) ALIAS multiIf(PacketSize < 64, '0-63', PacketSize < 128, '64-127', PacketSize < 256, '128-255', PacketSize < 512, '256-511', PacketSize < 768, '512-767', PacketSize < 1024, '768-1023', PacketSize < 1280, '1024-1279', PacketSize < 1501, '1280-1500', PacketSize < 2048, '1501-2047', PacketSize < 3072, '2048-3071', PacketSize < 4096, '3072-4095', PacketSize < 8192, '4096-8191', PacketSize < 10240, '8192-10239', PacketSize < 16384, '10240-16383', PacketSize < 32768, '16384-32767', PacketSize < 65536, '32768-65535', '65536-Inf'), `ForwardingStatus` UInt32) ENGINE = SummingMergeTree((Bytes, Packets)) PARTITION BY toYYYYMMDDhhmmss(toStartOfInterval(TimeReceived, toIntervalSecond(622080))) ORDER BY (TimeReceived, ExporterAddress, EType, Proto, InIfName, SrcAS, ForwardingStatus, OutIfName, DstAS, SamplingRate) TTL TimeReceived + toIntervalSecond(31104002) SETTINGS index_granularity = 8192"
"flows_1h0m0s_consumer","CREATE MATERIALIZED VIEW default.flows_1h0m0s_consumer TO default.flows_1h0m0s (`TimeReceived` DateTime, `SamplingRate` UInt64, `ExporterAddress` LowCardinality(IPv6), `ExporterName` LowCardinality(String), `ExporterGroup` LowCardinality(String), `SrcAS` UInt32, `DstAS` UInt32, `SrcCountry` FixedString(2), `DstCountry` FixedString(2), `InIfName` LowCardinality(String), `OutIfName` LowCardinality(String), `InIfDescription` String, `OutIfDescription` String, `InIfSpeed` UInt32, `OutIfSpeed` UInt32, `InIfConnectivity` LowCardinality(String), `OutIfConnectivity` LowCardinality(String), `InIfProvider` LowCardinality(String), `OutIfProvider` LowCardinality(String), `InIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `OutIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `EType` UInt32, `Proto` UInt32, `Bytes` UInt64, `Packets` UInt64, `ForwardingStatus` UInt32) AS SELECT * EXCEPT (SrcAddr, DstAddr, SrcPort, DstPort) REPLACE toStartOfInterval(TimeReceived, toIntervalSecond(3600)) AS TimeReceived FROM default.flows"
"flows_1m0s","CREATE TABLE default.flows_1m0s (`TimeReceived` DateTime CODEC(DoubleDelta, LZ4), `SamplingRate` UInt64, `ExporterAddress` LowCardinality(IPv6), `ExporterName` LowCardinality(String), `ExporterGroup` LowCardinality(String), `SrcAS` UInt32, `DstAS` UInt32, `SrcCountry` FixedString(2), `DstCountry` FixedString(2), `InIfName` LowCardinality(String), `OutIfName` LowCardinality(String), `InIfDescription` String, `OutIfDescription` String, `InIfSpeed` UInt32, `OutIfSpeed` UInt32, `InIfConnectivity` LowCardinality(String), `OutIfConnectivity` LowCardinality(String), `InIfProvider` LowCardinality(String), `OutIfProvider` LowCardinality(String), `InIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `OutIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `EType` UInt32, `Proto` UInt32, `Bytes` UInt64, `Packets` UInt64, `PacketSize` UInt64 ALIAS intDiv(Bytes, Packets), `PacketSizeBucket` LowCardinality(String) ALIAS multiIf(PacketSize < 64, '0-63', PacketSize < 128, '64-127', PacketSize < 256, '128-255', PacketSize < 512, '256-511', PacketSize < 768, '512-767', PacketSize < 1024, '768-1023', PacketSize < 1280, '1024-1279', PacketSize < 1501, '1280-1500', PacketSize < 2048, '1501-2047', PacketSize < 3072, '2048-3071', PacketSize < 4096, '3072-4095', PacketSize < 8192, '4096-8191', PacketSize < 10240, '8192-10239', PacketSize < 16384, '10240-16383', PacketSize < 32768, '16384-32767', PacketSize < 65536, '32768-65535', '65536-Inf'), `ForwardingStatus` UInt32) ENGINE = SummingMergeTree((Bytes, Packets)) PARTITION BY toYYYYMMDDhhmmss(toStartOfInterval(TimeReceived, toIntervalSecond(12096))) ORDER BY (TimeReceived, ExporterAddress, EType, Proto, InIfName, SrcAS, ForwardingStatus, OutIfName, DstAS, SamplingRate) TTL TimeReceived + toIntervalSecond(604802) SETTINGS index_granularity = 8192"
"flows_1m0s_consumer","CREATE MATERIALIZED VIEW default.flows_1m0s_consumer TO default.flows_1m0s (`TimeReceived` DateTime, `SamplingRate` UInt64, `ExporterAddress` LowCardinality(IPv6), `ExporterName` LowCardinality(String), `ExporterGroup` LowCardinality(String), `SrcAS` UInt32, `DstAS` UInt32, `SrcCountry` FixedString(2), `DstCountry` FixedString(2), `InIfName` LowCardinality(String), `OutIfName` LowCardinality(String), `InIfDescription` String, `OutIfDescription` String, `InIfSpeed` UInt32, `OutIfSpeed` UInt32, `InIfConnectivity` LowCardinality(String), `OutIfConnectivity` LowCardinality(String), `InIfProvider` LowCardinality(String), `OutIfProvider` LowCardinality(String), `InIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `OutIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `EType` UInt32, `Proto` UInt32, `Bytes` UInt64, `Packets` UInt64, `ForwardingStatus` UInt32) AS SELECT * EXCEPT (SrcAddr, DstAddr, SrcPort, DstPort) REPLACE toStartOfInterval(TimeReceived, toIntervalSecond(60)) AS TimeReceived FROM default.flows"
"flows_5m0s","CREATE TABLE default.flows_5m0s (`TimeReceived` DateTime CODEC(DoubleDelta, LZ4), `SamplingRate` UInt64, `ExporterAddress` LowCardinality(IPv6), `ExporterName` LowCardinality(String), `ExporterGroup` LowCardinality(String), `SrcAS` UInt32, `DstAS` UInt32, `SrcCountry` FixedString(2), `DstCountry` FixedString(2), `InIfName` LowCardinality(String), `OutIfName` LowCardinality(String), `InIfDescription` String, `OutIfDescription` String, `InIfSpeed` UInt32, `OutIfSpeed` UInt32, `InIfConnectivity` LowCardinality(String), `OutIfConnectivity` LowCardinality(String), `InIfProvider` LowCardinality(String), `OutIfProvider` LowCardinality(String), `InIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `OutIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `EType` UInt32, `Proto` UInt32, `Bytes` UInt64, `Packets` UInt64, `PacketSize` UInt64 ALIAS intDiv(Bytes, Packets), `PacketSizeBucket` LowCardinality(String) ALIAS multiIf(PacketSize < 64, '0-63', PacketSize < 128, '64-127', PacketSize < 256, '128-255', PacketSize < 512, '256-511', PacketSize < 768, '512-767', PacketSize < 1024, '768-1023', PacketSize < 1280, '1024-1279', PacketSize < 1501, '1280-1500', PacketSize < 2048, '1501-2047', PacketSize < 3072, '2048-3071', PacketSize < 4096, '3072-4095', PacketSize < 8192, '4096-8191', PacketSize < 10240, '8192-10239', PacketSize < 16384, '10240-16383', PacketSize < 32768, '16384-32767', PacketSize < 65536, '32768-65535', '65536-Inf'), `ForwardingStatus` UInt32) ENGINE = SummingMergeTree((Bytes, Packets)) PARTITION BY toYYYYMMDDhhmmss(toStartOfInterval(TimeReceived, toIntervalSecond(155520))) ORDER BY (TimeReceived, ExporterAddress, EType, Proto, InIfName, SrcAS, ForwardingStatus, OutIfName, DstAS, SamplingRate) TTL TimeReceived + toIntervalSecond(7776002) SETTINGS index_granularity = 8192"
"flows_5m0s_consumer","CREATE MATERIALIZED VIEW default.flows_5m0s_consumer TO default.flows_5m0s (`TimeReceived` DateTime, `SamplingRate` UInt64, `ExporterAddress` LowCardinality(IPv6), `ExporterName` LowCardinality(String), `ExporterGroup` LowCardinality(String), `SrcAS` UInt32, `DstAS` UInt32, `SrcCountry` FixedString(2), `DstCountry` FixedString(2), `InIfName` LowCardinality(String), `OutIfName` LowCardinality(String), `InIfDescription` String, `OutIfDescription` String, `InIfSpeed` UInt32, `OutIfSpeed` UInt32, `InIfConnectivity` LowCardinality(String), `OutIfConnectivity` LowCardinality(String), `InIfProvider` LowCardinality(String), `OutIfProvider` LowCardinality(String), `InIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `OutIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `EType` UInt32, `Proto` UInt32, `Bytes` UInt64, `Packets` UInt64, `ForwardingStatus` UInt32) AS SELECT * EXCEPT (SrcAddr, DstAddr, SrcPort, DstPort) REPLACE toStartOfInterval(TimeReceived, toIntervalSecond(300)) AS TimeReceived FROM default.flows"
"protocols","CREATE DICTIONARY default.protocols (`proto` UInt8 INJECTIVE, `name` String, `description` String) PRIMARY KEY proto SOURCE(HTTP(URL 'http://something/api/v0/orchestrator/clickhouse/protocols.csv' FORMAT 'CSVWithNames')) LIFETIME(MIN 0 MAX 3600) LAYOUT(HASHED())"
@@ -5,7 +5,7 @@
"protocols","CREATE DICTIONARY default.protocols (`proto` UInt8 INJECTIVE, `name` String, `description` String) PRIMARY KEY proto SOURCE(HTTP(URL 'http://something/api/v0/orchestrator/clickhouse/protocols.csv' FORMAT 'CSVWithNames')) LIFETIME(MIN 0 MAX 3600) LAYOUT(HASHED()) SETTINGS(format_csv_allow_single_quotes = 0)"
"flows_1m0s","CREATE TABLE default.flows_1m0s (`TimeReceived` DateTime CODEC(DoubleDelta, LZ4), `SamplingRate` UInt64, `ExporterAddress` LowCardinality(IPv6), `ExporterName` LowCardinality(String), `ExporterGroup` LowCardinality(String), `ExporterRole` LowCardinality(String), `ExporterSite` LowCardinality(String), `ExporterRegion` LowCardinality(String), `ExporterTenant` LowCardinality(String), `SrcAS` UInt32, `DstAS` UInt32, `SrcNetName` LowCardinality(String), `DstNetName` LowCardinality(String), `SrcNetRole` LowCardinality(String), `DstNetRole` LowCardinality(String), `SrcNetSite` LowCardinality(String), `DstNetSite` LowCardinality(String), `SrcNetRegion` LowCardinality(String), `DstNetRegion` LowCardinality(String), `SrcNetTenant` LowCardinality(String), `DstNetTenant` LowCardinality(String), `SrcCountry` FixedString(2), `DstCountry` FixedString(2), `Dst1stAS` UInt32, `Dst2ndAS` UInt32, `Dst3rdAS` UInt32, `InIfName` LowCardinality(String), `OutIfName` LowCardinality(String), `InIfDescription` String, `OutIfDescription` String, `InIfSpeed` UInt32, `OutIfSpeed` UInt32, `InIfConnectivity` LowCardinality(String), `OutIfConnectivity` LowCardinality(String), `InIfProvider` LowCardinality(String), `OutIfProvider` LowCardinality(String), `InIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `OutIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `EType` UInt32, `Proto` UInt32, `Bytes` UInt64, `Packets` UInt64, `PacketSize` UInt64 ALIAS intDiv(Bytes, Packets), `PacketSizeBucket` LowCardinality(String) ALIAS multiIf(PacketSize < 64, '0-63', PacketSize < 128, '64-127', PacketSize < 256, '128-255', PacketSize < 512, '256-511', PacketSize < 768, '512-767', PacketSize < 1024, '768-1023', PacketSize < 1280, '1024-1279', PacketSize < 1501, '1280-1500', PacketSize < 2048, '1501-2047', PacketSize < 3072, '2048-3071', PacketSize < 4096, '3072-4095', PacketSize < 8192, '4096-8191', PacketSize < 10240, '8192-10239', PacketSize < 16384, '10240-16383', PacketSize < 32768, '16384-32767', PacketSize < 65536, '32768-65535', '65536-Inf'), `ForwardingStatus` UInt32) ENGINE = SummingMergeTree((Bytes, Packets)) PARTITION BY toYYYYMMDDhhmmss(toStartOfInterval(TimeReceived, toIntervalSecond(12096))) PRIMARY KEY (TimeReceived, ExporterAddress, EType, Proto, InIfName, SrcAS, ForwardingStatus, OutIfName, DstAS, SamplingRate) ORDER BY (TimeReceived, ExporterAddress, EType, Proto, InIfName, SrcAS, ForwardingStatus, OutIfName, DstAS, SamplingRate, SrcNetName, DstNetName, SrcNetRole, DstNetRole, SrcNetSite, DstNetSite, SrcNetRegion, DstNetRegion, SrcNetTenant, DstNetTenant, SrcCountry, DstCountry, Dst1stAS, Dst2ndAS, Dst3rdAS) TTL TimeReceived + toIntervalSecond(604800) SETTINGS index_granularity = 8192"
"flows_5m0s","CREATE TABLE default.flows_5m0s (`TimeReceived` DateTime CODEC(DoubleDelta, LZ4), `SamplingRate` UInt64, `ExporterAddress` LowCardinality(IPv6), `ExporterName` LowCardinality(String), `ExporterGroup` LowCardinality(String), `ExporterRole` LowCardinality(String), `ExporterSite` LowCardinality(String), `ExporterRegion` LowCardinality(String), `ExporterTenant` LowCardinality(String), `SrcAS` UInt32, `DstAS` UInt32, `SrcNetName` LowCardinality(String), `DstNetName` LowCardinality(String), `SrcNetRole` LowCardinality(String), `DstNetRole` LowCardinality(String), `SrcNetSite` LowCardinality(String), `DstNetSite` LowCardinality(String), `SrcNetRegion` LowCardinality(String), `DstNetRegion` LowCardinality(String), `SrcNetTenant` LowCardinality(String), `DstNetTenant` LowCardinality(String), `SrcCountry` FixedString(2), `DstCountry` FixedString(2), `Dst1stAS` UInt32, `Dst2ndAS` UInt32, `Dst3rdAS` UInt32, `InIfName` LowCardinality(String), `OutIfName` LowCardinality(String), `InIfDescription` String, `OutIfDescription` String, `InIfSpeed` UInt32, `OutIfSpeed` UInt32, `InIfConnectivity` LowCardinality(String), `OutIfConnectivity` LowCardinality(String), `InIfProvider` LowCardinality(String), `OutIfProvider` LowCardinality(String), `InIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `OutIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `EType` UInt32, `Proto` UInt32, `Bytes` UInt64, `Packets` UInt64, `PacketSize` UInt64 ALIAS intDiv(Bytes, Packets), `PacketSizeBucket` LowCardinality(String) ALIAS multiIf(PacketSize < 64, '0-63', PacketSize < 128, '64-127', PacketSize < 256, '128-255', PacketSize < 512, '256-511', PacketSize < 768, '512-767', PacketSize < 1024, '768-1023', PacketSize < 1280, '1024-1279', PacketSize < 1501, '1280-1500', PacketSize < 2048, '1501-2047', PacketSize < 3072, '2048-3071', PacketSize < 4096, '3072-4095', PacketSize < 8192, '4096-8191', PacketSize < 10240, '8192-10239', PacketSize < 16384, '10240-16383', PacketSize < 32768, '16384-32767', PacketSize < 65536, '32768-65535', '65536-Inf'), `ForwardingStatus` UInt32) ENGINE = SummingMergeTree((Bytes, Packets)) PARTITION BY toYYYYMMDDhhmmss(toStartOfInterval(TimeReceived, toIntervalSecond(155520))) PRIMARY KEY (TimeReceived, ExporterAddress, EType, Proto, InIfName, SrcAS, ForwardingStatus, OutIfName, DstAS, SamplingRate) ORDER BY (TimeReceived, ExporterAddress, EType, Proto, InIfName, SrcAS, ForwardingStatus, OutIfName, DstAS, SamplingRate, SrcNetName, DstNetName, SrcNetRole, DstNetRole, SrcNetSite, DstNetSite, SrcNetRegion, DstNetRegion, SrcNetTenant, DstNetTenant, SrcCountry, DstCountry, Dst1stAS, Dst2ndAS, Dst3rdAS) TTL TimeReceived + toIntervalSecond(7776000) SETTINGS index_granularity = 8192"
"flows_4_raw","CREATE TABLE default.flows_4_raw (`TimeReceived` DateTime CODEC(DoubleDelta, LZ4), `SamplingRate` UInt64, `ExporterAddress` LowCardinality(IPv6), `ExporterName` LowCardinality(String), `ExporterGroup` LowCardinality(String), `ExporterRole` LowCardinality(String), `ExporterSite` LowCardinality(String), `ExporterRegion` LowCardinality(String), `ExporterTenant` LowCardinality(String), `SrcAddr` IPv6, `DstAddr` IPv6, `SrcNetMask` UInt8, `DstNetMask` UInt8, `SrcAS` UInt32, `DstAS` UInt32, `SrcCountry` FixedString(2), `DstCountry` FixedString(2), `DstASPath` Array(UInt32), `DstCommunities` Array(UInt32), `InIfName` LowCardinality(String), `OutIfName` LowCardinality(String), `InIfDescription` String, `OutIfDescription` String, `InIfSpeed` UInt32, `OutIfSpeed` UInt32, `InIfConnectivity` LowCardinality(String), `OutIfConnectivity` LowCardinality(String), `InIfProvider` LowCardinality(String), `OutIfProvider` LowCardinality(String), `InIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `OutIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `EType` UInt32, `Proto` UInt32, `SrcPort` UInt32, `DstPort` UInt32, `Bytes` UInt64, `Packets` UInt64, `ForwardingStatus` UInt32, `DstLargeCommunities.ASN` Array(UInt32), `DstLargeCommunities.LocalData1` Array(UInt32), `DstLargeCommunities.LocalData2` Array(UInt32)) ENGINE = Kafka SETTINGS kafka_broker_list = '127.0.0.1:9092', kafka_topic_list = 'flows-v4', kafka_group_name = 'clickhouse', kafka_format = 'Protobuf', kafka_schema = 'flow-4.proto:FlowMessagev4', kafka_num_consumers = 1, kafka_thread_per_consumer = 1, kafka_handle_error_mode = 'stream'"
"flows_4_raw","CREATE TABLE default.flows_4_raw (`TimeReceived` DateTime CODEC(DoubleDelta, LZ4), `SamplingRate` UInt64, `ExporterAddress` LowCardinality(IPv6), `ExporterName` LowCardinality(String), `ExporterGroup` LowCardinality(String), `ExporterRole` LowCardinality(String), `ExporterSite` LowCardinality(String), `ExporterRegion` LowCardinality(String), `ExporterTenant` LowCardinality(String), `SrcAddr` IPv6, `DstAddr` IPv6, `SrcNetMask` UInt8, `DstNetMask` UInt8, `SrcAS` UInt32, `DstAS` UInt32, `SrcCountry` FixedString(2), `DstCountry` FixedString(2), `DstASPath` Array(UInt32), `DstCommunities` Array(UInt32), `InIfName` LowCardinality(String), `OutIfName` LowCardinality(String), `InIfDescription` String, `OutIfDescription` String, `DstLargeCommunities.ASN` Array(UInt32), `DstLargeCommunities.LocalData1` Array(UInt32), `DstLargeCommunities.LocalData2` Array(UInt32), `InIfSpeed` UInt32, `OutIfSpeed` UInt32, `InIfConnectivity` LowCardinality(String), `OutIfConnectivity` LowCardinality(String), `InIfProvider` LowCardinality(String), `OutIfProvider` LowCardinality(String), `InIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `OutIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `EType` UInt32, `Proto` UInt32, `SrcPort` UInt32, `DstPort` UInt32, `Bytes` UInt64, `Packets` UInt64, `ForwardingStatus` UInt32) ENGINE = Kafka SETTINGS kafka_broker_list = '127.0.0.1:9092', kafka_topic_list = 'flows-v4', kafka_group_name = 'clickhouse', kafka_format = 'Protobuf', kafka_schema = 'flow-4.proto:FlowMessagev4', kafka_num_consumers = 1, kafka_thread_per_consumer = 1, kafka_handle_error_mode = 'stream'"
"flows_1h0m0s","CREATE TABLE default.flows_1h0m0s (`TimeReceived` DateTime CODEC(DoubleDelta, LZ4), `SamplingRate` UInt64, `ExporterAddress` LowCardinality(IPv6), `ExporterName` LowCardinality(String), `ExporterGroup` LowCardinality(String), `ExporterRole` LowCardinality(String), `ExporterSite` LowCardinality(String), `ExporterRegion` LowCardinality(String), `ExporterTenant` LowCardinality(String), `SrcAS` UInt32, `DstAS` UInt32, `SrcNetName` LowCardinality(String), `DstNetName` LowCardinality(String), `SrcNetRole` LowCardinality(String), `DstNetRole` LowCardinality(String), `SrcNetSite` LowCardinality(String), `DstNetSite` LowCardinality(String), `SrcNetRegion` LowCardinality(String), `DstNetRegion` LowCardinality(String), `SrcNetTenant` LowCardinality(String), `DstNetTenant` LowCardinality(String), `SrcCountry` FixedString(2), `DstCountry` FixedString(2), `Dst1stAS` UInt32, `Dst2ndAS` UInt32, `Dst3rdAS` UInt32, `InIfName` LowCardinality(String), `OutIfName` LowCardinality(String), `InIfDescription` String, `OutIfDescription` String, `InIfSpeed` UInt32, `OutIfSpeed` UInt32, `InIfConnectivity` LowCardinality(String), `OutIfConnectivity` LowCardinality(String), `InIfProvider` LowCardinality(String), `OutIfProvider` LowCardinality(String), `InIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `OutIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `EType` UInt32, `Proto` UInt32, `Bytes` UInt64, `Packets` UInt64, `PacketSize` UInt64 ALIAS intDiv(Bytes, Packets), `PacketSizeBucket` LowCardinality(String) ALIAS multiIf(PacketSize < 64, '0-63', PacketSize < 128, '64-127', PacketSize < 256, '128-255', PacketSize < 512, '256-511', PacketSize < 768, '512-767', PacketSize < 1024, '768-1023', PacketSize < 1280, '1024-1279', PacketSize < 1501, '1280-1500', PacketSize < 2048, '1501-2047', PacketSize < 3072, '2048-3071', PacketSize < 4096, '3072-4095', PacketSize < 8192, '4096-8191', PacketSize < 10240, '8192-10239', PacketSize < 16384, '10240-16383', PacketSize < 32768, '16384-32767', PacketSize < 65536, '32768-65535', '65536-Inf'), `ForwardingStatus` UInt32) ENGINE = SummingMergeTree((Bytes, Packets)) PARTITION BY toYYYYMMDDhhmmss(toStartOfInterval(TimeReceived, toIntervalSecond(622080))) PRIMARY KEY (TimeReceived, ExporterAddress, EType, Proto, InIfName, SrcAS, ForwardingStatus, OutIfName, DstAS, SamplingRate) ORDER BY (TimeReceived, ExporterAddress, EType, Proto, InIfName, SrcAS, ForwardingStatus, OutIfName, DstAS, SamplingRate, SrcNetName, DstNetName, SrcNetRole, DstNetRole, SrcNetSite, DstNetSite, SrcNetRegion, DstNetRegion, SrcNetTenant, DstNetTenant, SrcCountry, DstCountry, Dst1stAS, Dst2ndAS, Dst3rdAS) TTL TimeReceived + toIntervalSecond(31104000) SETTINGS index_granularity = 8192"
"flows_4_raw_errors","CREATE MATERIALIZED VIEW default.flows_4_raw_errors (`timestamp` DateTime, `topic` LowCardinality(String), `partition` UInt64, `offset` UInt64, `raw` String, `error` String) ENGINE = MergeTree PARTITION BY toYYYYMMDDhhmmss(toStartOfHour(timestamp)) ORDER BY (timestamp, topic, partition, offset) TTL timestamp + toIntervalDay(1) SETTINGS index_granularity = 8192 AS SELECT now() AS timestamp, _topic AS topic, _partition AS partition, _offset AS offset, _raw_message AS raw, _error AS error FROM default.flows_4_raw WHERE length(_error) > 0"
"flows_1m0s_consumer","CREATE MATERIALIZED VIEW default.flows_1m0s_consumer TO default.flows_1m0s (`TimeReceived` DateTime, `SamplingRate` UInt64, `ExporterAddress` LowCardinality(IPv6), `ExporterName` LowCardinality(String), `ExporterGroup` LowCardinality(String), `ExporterRole` LowCardinality(String), `ExporterSite` LowCardinality(String), `ExporterRegion` LowCardinality(String), `ExporterTenant` LowCardinality(String), `SrcAS` UInt32, `DstAS` UInt32, `SrcNetName` LowCardinality(String), `DstNetName` LowCardinality(String), `SrcNetRole` LowCardinality(String), `DstNetRole` LowCardinality(String), `SrcNetSite` LowCardinality(String), `DstNetSite` LowCardinality(String), `SrcNetRegion` LowCardinality(String), `DstNetRegion` LowCardinality(String), `SrcNetTenant` LowCardinality(String), `DstNetTenant` LowCardinality(String), `SrcCountry` FixedString(2), `DstCountry` FixedString(2), `Dst1stAS` UInt32, `Dst2ndAS` UInt32, `Dst3rdAS` UInt32, `InIfName` LowCardinality(String), `OutIfName` LowCardinality(String), `InIfDescription` String, `OutIfDescription` String, `InIfSpeed` UInt32, `OutIfSpeed` UInt32, `InIfConnectivity` LowCardinality(String), `OutIfConnectivity` LowCardinality(String), `InIfProvider` LowCardinality(String), `OutIfProvider` LowCardinality(String), `InIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `OutIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `EType` UInt32, `Proto` UInt32, `Bytes` UInt64, `Packets` UInt64, `ForwardingStatus` UInt32) AS SELECT * EXCEPT (SrcAddr, DstAddr, SrcNetMask, DstNetMask, SrcPort, DstPort, DstASPath, DstCommunities, DstLargeCommunities) REPLACE toStartOfInterval(TimeReceived, toIntervalSecond(60)) AS TimeReceived FROM default.flows"