From 28783ff4f3bc2997b8ca971bb8d1cb41aa267f41 Mon Sep 17 00:00:00 2001 From: Vincent Bernat Date: Sun, 24 Mar 2024 12:22:57 +0100 Subject: [PATCH] orchestrator/clickhouse: add support for distributed/replicated tables Fix #605 All MergeTree tables are now replicated. For some tables, a `_local` variant is added and the non-`_local` variant is now distributed. The distributed tables are the `flows` table, the `flows_DDDD` tables (where `DDDD` is a duration), as well as the `flows_raw_errors` table. The `exporters` table is not distributed and stays local. The data is following this schema: - data is coming from `flows_HHHH_raw` table, using the Kafka engine - the `flows_HHHH_raw_consumer` reads data from `flows_HHHH_raw` (local) and sends it to `flows` (distributed) when there is no error - the `flows_raw_errors_consumer` reads data from `flows_HHHH_raw` (local) and sends it to `flows_raw_errors` (distributed) - the `flows_DDDD_consumer` reads data from `flows_local` (local) and sends it to `flows_DDDD_local` (local) - the `exporters_consumer` reads data from `flows` (distributed) and sends it to `exporters` (local) The reason for `flows_HHHH_raw_consumer` to send data to the distributed `flows` table, and not the local one is to ensure flows are balanced (for example, if there are not enough Kafka partitions). But sending it to `flows_local` would have been possible. On the other hand, it is important for `flows_DDDD_consumer` to read from local to avoid duplication. It could have sent to distributed, but the data is now balanced correctly and we just send it to local instead for better performance. The `exporters_consumer` is allowed to read from the distributed `flows` table because it writes the result to the local `exporters` table. 
--- Makefile | 2 +- common/clickhousedb/config.go | 9 + common/clickhousedb/root_test.go | 2 +- common/clickhousedb/tests.go | 20 +- common/clickhousedb/utils.go | 80 +++ common/clickhousedb/utils_test.go | 74 ++ console/clickhouse.go | 4 +- console/clickhouse_test.go | 4 +- console/data/docs/02-configuration.md | 17 +- console/data/docs/99-changelog.md | 7 + docker/clickhouse-cluster-1.xml | 6 + docker/clickhouse-cluster-2.xml | 6 + docker/clickhouse-cluster-3.xml | 6 + docker/clickhouse-cluster-4.xml | 6 + docker/clickhouse-cluster.xml | 39 + docker/docker-compose-dev.yml | 28 +- orchestrator/clickhouse/geoip_test.go | 2 +- orchestrator/clickhouse/http_test.go | 4 +- orchestrator/clickhouse/migrations.go | 55 +- orchestrator/clickhouse/migrations_helpers.go | 213 ++++-- orchestrator/clickhouse/migrations_test.go | 666 +++++++----------- orchestrator/clickhouse/source_test.go | 2 +- .../testdata/states/000-cluster.csv | 0 .../testdata/states/001-cluster.csv | 22 + 24 files changed, 775 insertions(+), 499 deletions(-) create mode 100644 common/clickhousedb/utils.go create mode 100644 common/clickhousedb/utils_test.go create mode 100644 docker/clickhouse-cluster-1.xml create mode 100644 docker/clickhouse-cluster-2.xml create mode 100644 docker/clickhouse-cluster-3.xml create mode 100644 docker/clickhouse-cluster-4.xml create mode 100644 docker/clickhouse-cluster.xml create mode 100644 orchestrator/clickhouse/testdata/states/000-cluster.csv create mode 100644 orchestrator/clickhouse/testdata/states/001-cluster.csv diff --git a/Makefile b/Makefile index e2a8c37f..1c61be7e 100644 --- a/Makefile +++ b/Makefile @@ -6,7 +6,7 @@ PKGS = $(or $(PKG),$(shell env GO111MODULE=on $(GO) list ./...)) BIN = bin GO = go -TIMEOUT = 30 +TIMEOUT = 45 LSFILES = git ls-files -cmo --exclude-standard -- V = 0 Q = $(if $(filter 1,$V),,@) diff --git a/common/clickhousedb/config.go b/common/clickhousedb/config.go index cc4d9624..8ded068d 100644 --- a/common/clickhousedb/config.go +++ 
b/common/clickhousedb/config.go @@ -13,6 +13,10 @@ import ( type Configuration struct { // Servers define the list of clickhouse servers to connect to (with ports) Servers []string `validate:"min=1,dive,listen"` + // Cluster defines the cluster to operate on. This should not change + // anything from a client point of view, but this switch some mode of + // operations. + Cluster string // Database defines the database to use Database string `validate:"required"` // Username defines the username to use for authentication @@ -41,3 +45,8 @@ func DefaultConfiguration() Configuration { }, } } + +// IsInCluster returns true if the ClickHouse server runs on a cluster. +func (c *Component) IsInCluster() bool { + return c.config.Cluster != "" +} diff --git a/common/clickhousedb/root_test.go b/common/clickhousedb/root_test.go index 68a0b7c6..bd3b932f 100644 --- a/common/clickhousedb/root_test.go +++ b/common/clickhousedb/root_test.go @@ -116,7 +116,7 @@ func TestMock(t *testing.T) { func TestRealClickHouse(t *testing.T) { r := reporter.NewMock(t) - chComponent := SetupClickHouse(t, r) + chComponent := SetupClickHouse(t, r, false) // Check a select query t.Run("select", func(t *testing.T) { diff --git a/common/clickhousedb/tests.go b/common/clickhousedb/tests.go index 59571a06..aaad298e 100644 --- a/common/clickhousedb/tests.go +++ b/common/clickhousedb/tests.go @@ -19,24 +19,30 @@ import ( ) // SetupClickHouse configures a client to use for testing. 
-func SetupClickHouse(t *testing.T, r *reporter.Reporter) *Component { +func SetupClickHouse(t *testing.T, r *reporter.Reporter, cluster bool) *Component { t.Helper() - chServer := helpers.CheckExternalService(t, "ClickHouse", - []string{"clickhouse:9000", "127.0.0.1:9000"}) config := DefaultConfiguration() - config.Servers = []string{chServer} + config.Servers = []string{ + helpers.CheckExternalService(t, "ClickHouse", + []string{"clickhouse:9000", "127.0.0.1:9000"})} + if cluster { + config.Cluster = "akvorado" + } config.DialTimeout = 100 * time.Millisecond c, err := New(r, config, Dependencies{Daemon: daemon.NewMock(t)}) if err != nil { t.Fatalf("New() error:\n%+v", err) } helpers.StartStop(t, c) - if err := c.Exec(context.Background(), "DROP TABLE IF EXISTS system.metric_log"); err != nil { - t.Fatalf("Exec() error:\n%+v", err) - } + c.ExecOnCluster(context.Background(), "DROP TABLE IF EXISTS system.metric_log") return c } +// ClusterName returns the name of the cluster (for testing only) +func (c *Component) ClusterName() string { + return c.config.Cluster +} + // NewMock creates a new component using a mock driver. It returns // both the component and the mock driver. func NewMock(t *testing.T, r *reporter.Reporter) (*Component, *mocks.MockConn) { diff --git a/common/clickhousedb/utils.go b/common/clickhousedb/utils.go new file mode 100644 index 00000000..e687c817 --- /dev/null +++ b/common/clickhousedb/utils.go @@ -0,0 +1,80 @@ +// SPDX-FileCopyrightText: 2024 Free Mobile +// SPDX-License-Identifier: AGPL-3.0-only + +package clickhousedb + +import ( + "context" + "fmt" + "regexp" + "strings" +) + +// ExecOnCluster executes a query on a cluster. It's a wrapper around Exec() +// invoking TransformQueryOnCluster. +func (c *Component) ExecOnCluster(ctx context.Context, query string, args ...any) error { + if c.config.Cluster != "" { + query = TransformQueryOnCluster(query, c.config.Cluster) + } + return c.Exec(ctx, query, args...) 
+} + +var ( + spacesRegexp = regexp.MustCompile("\\s+") + statementBeforeOnClusterRegexp = regexp.MustCompile(fmt.Sprintf("^((?i)%s)", strings.Join([]string{ + `ALTER TABLE \S+`, + `ATTACH DICTIONARY \S+`, + `(ATTACH|CREATE) DATABASE( IF NOT EXISTS)? \S+`, + `(ATTACH|CREATE( OR REPLACE)?|REPLACE) DICTIONARY( IF NOT EXISTS)? \S+`, + `(ATTACH|CREATE) LIVE VIEW (IF NOT EXISTS)? \S+`, + `(ATTACH|CREATE) MATERIALIZED VIEW( IF NOT EXISTS)? \S+`, + `(ATTACH|CREATE( OR REPLACE)?|REPLACE)( TEMPORARY)? TABLE( IF NOT EXISTS)? \S+`, + `(DETACH|DROP) DATABASE( IF EXISTS)? \S+`, + `(DETACH|DROP) (DICTIONARY|(TEMPORARY )?TABLE|VIEW)( IF EXISTS?) \S+`, + `KILL MUTATION`, + `OPTIMIZE TABLE \S+`, + `RENAME TABLE \S+ TO \S+`, // this is incomplete + `TRUNCATE( TEMPORARY)?( TABLE)?( IF EXISTS)? \S+`, + // not part of the grammar + `SYSTEM RELOAD DICTIONARIES`, + `SYSTEM RELOAD DICTIONARY`, + }, "|"))) +) + +// TransformQueryOnCluster turns a ClickHouse query into its equivalent to be +// run on a cluster by adding the ON CLUSTER directive. +func TransformQueryOnCluster(query string, cluster string) string { + // From utils/antlr/ClickHouseParser.g4: + // + // ALTER TABLE tableIdentifier clusterClause? alterTableClause (COMMA alterTableClause)* + // ATTACH DICTIONARY tableIdentifier clusterClause? + // (ATTACH | CREATE) DATABASE (IF NOT EXISTS)? databaseIdentifier clusterClause? engineExpr? + // (ATTACH | CREATE (OR REPLACE)? | REPLACE) DICTIONARY (IF NOT EXISTS)? tableIdentifier uuidClause? clusterClause? dictionarySchemaClause dictionaryEngineClause + // (ATTACH | CREATE) LIVE VIEW (IF NOT EXISTS)? tableIdentifier uuidClause? clusterClause? (WITH TIMEOUT DECIMAL_LITERAL?)? destinationClause? tableSchemaClause? subqueryClause + // (ATTACH | CREATE) MATERIALIZED VIEW (IF NOT EXISTS)? tableIdentifier uuidClause? clusterClause? tableSchemaClause? (destinationClause | engineClause POPULATE?) subqueryClause + // (ATTACH | CREATE (OR REPLACE)? | REPLACE) TEMPORARY? TABLE (IF NOT EXISTS)? 
tableIdentifier uuidClause? clusterClause? tableSchemaClause? engineClause? subqueryClause? + // (ATTACH | CREATE) (OR REPLACE)? VIEW (IF NOT EXISTS)? tableIdentifier uuidClause? clusterClause? tableSchemaClause? subqueryClause + // (DETACH | DROP) DATABASE (IF EXISTS)? databaseIdentifier clusterClause? + // (DETACH | DROP) (DICTIONARY | TEMPORARY? TABLE | VIEW) (IF EXISTS)? tableIdentifier clusterClause? (NO DELAY)? + // KILL MUTATION clusterClause? whereClause (SYNC | ASYNC | TEST)? + // OPTIMIZE TABLE tableIdentifier clusterClause? partitionClause? FINAL? DEDUPLICATE?; + // RENAME TABLE tableIdentifier TO tableIdentifier (COMMA tableIdentifier TO tableIdentifier)* clusterClause?; + // TRUNCATE TEMPORARY? TABLE? (IF EXISTS)? tableIdentifier clusterClause?; + + // In ClickHouse, an identifier uses the following syntax: + // + // IDENTIFIER + // : (LETTER | UNDERSCORE) (LETTER | UNDERSCORE | DEC_DIGIT)* + // | BACKQUOTE ( ~([\\`]) | (BACKSLASH .) | (BACKQUOTE BACKQUOTE) )* BACKQUOTE + // | QUOTE_DOUBLE ( ~([\\"]) | (BACKSLASH .) | (QUOTE_DOUBLE QUOTE_DOUBLE) )* QUOTE_DOUBLE + // ; + // + // Since we don't have to accept everything, we simplify it to \S+. 
+ query = strings.TrimSpace(spacesRegexp.ReplaceAllString(query, " ")) + prefix := statementBeforeOnClusterRegexp.FindString(query) + if prefix == "" { + return query + } + + return fmt.Sprintf("%s ON CLUSTER %s%s", prefix, cluster, query[len(prefix):]) +} diff --git a/common/clickhousedb/utils_test.go b/common/clickhousedb/utils_test.go new file mode 100644 index 00000000..e051fedd --- /dev/null +++ b/common/clickhousedb/utils_test.go @@ -0,0 +1,74 @@ +// SPDX-FileCopyrightText: 2024 Free Mobile +// SPDX-License-Identifier: AGPL-3.0-only + +package clickhousedb + +import ( + "testing" + + "akvorado/common/helpers" +) + +func TestTransformQueryOnCluster(t *testing.T) { + cases := []struct { + Input string + Expected string + }{ + {"SYSTEM RELOAD DICTIONARIES", "SYSTEM RELOAD DICTIONARIES ON CLUSTER akvorado"}, + {"system reload dictionaries", "system reload dictionaries ON CLUSTER akvorado"}, + {" system reload dictionaries ", "system reload dictionaries ON CLUSTER akvorado"}, + {"DROP DATABASE IF EXISTS 02028_db", "DROP DATABASE IF EXISTS 02028_db ON CLUSTER akvorado"}, + { + "CREATE TABLE test_01148_atomic.rmt2 (n int, PRIMARY KEY n) ENGINE=ReplicatedMergeTree", + "CREATE TABLE test_01148_atomic.rmt2 ON CLUSTER akvorado (n int, PRIMARY KEY n) ENGINE=ReplicatedMergeTree", + }, + { + "DROP TABLE IF EXISTS test_repl NO DELAY", + "DROP TABLE IF EXISTS test_repl ON CLUSTER akvorado NO DELAY", + }, + { + "ALTER TABLE 02577_keepermap_delete_update UPDATE value2 = value2 * 10 + 2 WHERE value2 < 100", + "ALTER TABLE 02577_keepermap_delete_update ON CLUSTER akvorado UPDATE value2 = value2 * 10 + 2 WHERE value2 < 100", + }, + {"ATTACH DICTIONARY db_01018.dict1", "ATTACH DICTIONARY db_01018.dict1 ON CLUSTER akvorado"}, + { + `CREATE DICTIONARY default.asns +( + asn UInt32 INJECTIVE, + name String +) +PRIMARY KEY asn +SOURCE(HTTP(URL 'http://akvorado-orchestrator:8080/api/v0/orchestrator/clickhouse/asns.csv' FORMAT 'CSVWithNames')) +LIFETIME(MIN 0 MAX 3600) +LAYOUT(HASHED()) 
+SETTINGS(format_csv_allow_single_quotes = 0)`, + `CREATE DICTIONARY default.asns ON CLUSTER akvorado ( asn UInt32 INJECTIVE, name String ) PRIMARY KEY asn SOURCE(HTTP(URL 'http://akvorado-orchestrator:8080/api/v0/orchestrator/clickhouse/asns.csv' FORMAT 'CSVWithNames')) LIFETIME(MIN 0 MAX 3600) LAYOUT(HASHED()) SETTINGS(format_csv_allow_single_quotes = 0)`, + }, + { + ` +CREATE TABLE queue ( + timestamp UInt64, + level String, + message String + ) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow') +`, + `CREATE TABLE queue ON CLUSTER akvorado ( timestamp UInt64, level String, message String ) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow')`, + }, + { + ` +CREATE MATERIALIZED VIEW consumer TO daily +AS SELECT toDate(toDateTime(timestamp)) AS day, level, count() as total +FROM queue GROUP BY day, level +`, + `CREATE MATERIALIZED VIEW consumer ON CLUSTER akvorado TO daily AS SELECT toDate(toDateTime(timestamp)) AS day, level, count() as total FROM queue GROUP BY day, level`, + }, + // Not modified + {"SELECT 1", "SELECT 1"}, + } + for _, tc := range cases { + got := TransformQueryOnCluster(tc.Input, "akvorado") + if diff := helpers.Diff(got, tc.Expected); diff != "" { + t.Errorf("TransformQueryOnCluster(%q) (-got +want):\n%s", tc.Input, diff) + } + } +} diff --git a/console/clickhouse.go b/console/clickhouse.go index d1b81f98..7ac3901c 100644 --- a/console/clickhouse.go +++ b/console/clickhouse.go @@ -36,7 +36,9 @@ SELECT name FROM system.tables WHERE database=currentDatabase() AND table LIKE 'flows%' -AND engine LIKE '%MergeTree' +AND table NOT LIKE '%_local' +AND table != 'flows_raw_errors' +AND (engine LIKE '%MergeTree' OR engine = 'Distributed') `) if err != nil { return fmt.Errorf("cannot query flows table metadata: %w", err) diff --git a/console/clickhouse_test.go b/console/clickhouse_test.go index 8ee0fb15..a75baf2d 100644 --- a/console/clickhouse_test.go +++ b/console/clickhouse_test.go @@ -21,7 +21,9 @@ SELECT name FROM 
system.tables WHERE database=currentDatabase() AND table LIKE 'flows%' -AND engine LIKE '%MergeTree' +AND table NOT LIKE '%_local' +AND table != 'flows_raw_errors' +AND (engine LIKE '%MergeTree' OR engine = 'Distributed') `). Return(nil). SetArg(1, []struct { diff --git a/console/data/docs/02-configuration.md b/console/data/docs/02-configuration.md index 4d61d3d5..8e4b9554 100644 --- a/console/data/docs/02-configuration.md +++ b/console/data/docs/02-configuration.md @@ -790,6 +790,7 @@ provided: - `username` is the username to use for authentication - `password` is the password to use for authentication - `database` defines the database to use to create tables +- `cluster` defines the cluster for replicated and distributed tables, see below for more information - `kafka` defines the configuration for the Kafka consumer. The accepted keys are: - `consumers` defines the number of consumers to use to consume messages from the Kafka topic. It is silently bound by the maximum number of threads @@ -838,7 +839,7 @@ resolution has two keys: `interval` and `ttl`. The first one is the consolidation interval. The second is how long to keep the data in the database. If `ttl` is 0, then the data is kept forever. If `interval` is 0, it applies to the raw data (the one in the `flows` table). For -each resolution, a materialized view `flows_XXXX` is created with the +each resolution, a materialized view `flows_DDDD` is created with the specified interval. It should be noted that consolidated tables do not contain information about source/destination IP addresses and ports. That's why you may want to keep the interval-0 table data a bit @@ -868,6 +869,20 @@ ClickHouse database and will continue to be populated. It is mandatory to specify a configuration for `interval: 0`. +When specifying a cluster name with `cluster`, the orchestrator will manage a +set of replicated and distributed tables. 
No migration is done between the +cluster and the non-cluster modes, therefore, you shouldn't change this setting +without also changing the database. If you already have an existing setup, this +means you need to start from scratch and copy data. There is currently no +instruction for that, but it's mostly a matter of copying `flows` table to +`flows_local`, and `flows_DDDD` (where `DDDD` is an interval) tables to +`flows_DDDD_local`. + +When using `docker compose`, the provided setup for ClickHouse is not +cluster-ready. Have a look at `docker/docker-compose-dev.yml` to see how to +setup a ClickHouse cluster (but it makes little sense to have a single-node +`docker compose` setup with a ClickHouse cluster). + ### GeoIP The `geoip` directive allows one to configure two databases using the [MaxMind diff --git a/console/data/docs/99-changelog.md b/console/data/docs/99-changelog.md index 8e18f273..c2b6f27e 100644 --- a/console/data/docs/99-changelog.md +++ b/console/data/docs/99-changelog.md @@ -19,6 +19,12 @@ automatically migrated from the inlet component to the clickhouse component. This also changes how geo IP is used for AS numbers: geo IP is used as last resort when configured. It also increases memory usage (1.3GB for ClickHouse). +Another new feature is the ability to use a ClickHouse cluster deployment. This +is enabled when specifying a cluster name in `clickhouse`→`cluster`. There is no +automatic migration of an existing database. You should start from scratch and +copy data from the previous setup. Do not try to enable the cluster mode on +existing setup! + Support for Docker Compose V1 (`docker-compose` command) has been removed in favor of Docker Compose V2 (`docker compose` command). On Ubuntu/Debian systems, this means you can no longer use the `docker-compose` package. On Ubuntu, you @@ -28,6 +34,7 @@ can install the `docker-compose-v2` package. 
For other options, check the - 💥 *inlet*: GeoIP data is moved from inlets to ClickHouse, add city and region - 💥 *console*: persist console database on the default `docker compose` setup - 💥 *docker*: remove support for `docker-compose` V1 +- ✨ *orchestrator*: add support for ClickHouse clusters - ✨ *inlet*: add gNMI metadata provider - ✨ *inlet*: static metadata provider can provide exporter and interface metadata - ✨ *inlet*: static metadata provider can fetch its configuration from an HTTP endpoint diff --git a/docker/clickhouse-cluster-1.xml b/docker/clickhouse-cluster-1.xml new file mode 100644 index 00000000..aa6600ba --- /dev/null +++ b/docker/clickhouse-cluster-1.xml @@ -0,0 +1,6 @@ + + + 01 + 01 + + diff --git a/docker/clickhouse-cluster-2.xml b/docker/clickhouse-cluster-2.xml new file mode 100644 index 00000000..f27d73f4 --- /dev/null +++ b/docker/clickhouse-cluster-2.xml @@ -0,0 +1,6 @@ + + + 01 + 02 + + diff --git a/docker/clickhouse-cluster-3.xml b/docker/clickhouse-cluster-3.xml new file mode 100644 index 00000000..d1751faf --- /dev/null +++ b/docker/clickhouse-cluster-3.xml @@ -0,0 +1,6 @@ + + + 02 + 01 + + diff --git a/docker/clickhouse-cluster-4.xml b/docker/clickhouse-cluster-4.xml new file mode 100644 index 00000000..647fa72d --- /dev/null +++ b/docker/clickhouse-cluster-4.xml @@ -0,0 +1,6 @@ + + + 02 + 02 + + diff --git a/docker/clickhouse-cluster.xml b/docker/clickhouse-cluster.xml new file mode 100644 index 00000000..f40148d5 --- /dev/null +++ b/docker/clickhouse-cluster.xml @@ -0,0 +1,39 @@ + + + + + + true + + clickhouse + 9000 + + + clickhouse-2 + 9000 + + + + true + + clickhouse-3 + 9000 + + + clickhouse-4 + 9000 + + + + + + + zookeeper + 2181 + + + + 0 + diff --git a/docker/docker-compose-dev.yml b/docker/docker-compose-dev.yml index 913e35a7..3c5c81c4 100644 --- a/docker/docker-compose-dev.yml +++ b/docker/docker-compose-dev.yml @@ -32,13 +32,39 @@ services: ports: - 127.0.0.1:6379:6379/tcp - clickhouse: + clickhouse: &clickhouse 
extends: file: versions.yml service: clickhouse + depends_on: + - zookeeper + volumes: + - ./clickhouse-cluster.xml:/etc/clickhouse-server/config.d/cluster.xml + - ./clickhouse-cluster-1.xml:/etc/clickhouse-server/config.d/cluster-1.xml ports: - 127.0.0.1:8123:8123/tcp - 127.0.0.1:9000:9000/tcp + clickhouse-2: + <<: *clickhouse + volumes: + - ./clickhouse-cluster.xml:/etc/clickhouse-server/config.d/cluster.xml + - ./clickhouse-cluster-2.xml:/etc/clickhouse-server/config.d/cluster-2.xml + ports: + - 127.0.0.1:9001:9000/tcp + clickhouse-3: + <<: *clickhouse + volumes: + - ./clickhouse-cluster.xml:/etc/clickhouse-server/config.d/cluster.xml + - ./clickhouse-cluster-3.xml:/etc/clickhouse-server/config.d/cluster-3.xml + ports: + - 127.0.0.1:9002:9000/tcp + clickhouse-4: + <<: *clickhouse + volumes: + - ./clickhouse-cluster.xml:/etc/clickhouse-server/config.d/cluster.xml + - ./clickhouse-cluster-4.xml:/etc/clickhouse-server/config.d/cluster-4.xml + ports: + - 127.0.0.1:9003:9000/tcp srlinux: extends: diff --git a/orchestrator/clickhouse/geoip_test.go b/orchestrator/clickhouse/geoip_test.go index 847d3110..483a11b4 100644 --- a/orchestrator/clickhouse/geoip_test.go +++ b/orchestrator/clickhouse/geoip_test.go @@ -19,7 +19,7 @@ func TestNetworkGeoip(t *testing.T) { config := DefaultConfiguration() config.SkipMigrations = true r := reporter.NewMock(t) - clickHouseComponent := clickhousedb.SetupClickHouse(t, r) + clickHouseComponent := clickhousedb.SetupClickHouse(t, r, false) c, err := New(r, config, Dependencies{ Daemon: daemon.NewMock(t), diff --git a/orchestrator/clickhouse/http_test.go b/orchestrator/clickhouse/http_test.go index 1fb5331c..5e95d4e0 100644 --- a/orchestrator/clickhouse/http_test.go +++ b/orchestrator/clickhouse/http_test.go @@ -18,7 +18,7 @@ import ( func TestHTTPEndpoints(t *testing.T) { r := reporter.NewMock(t) - clickHouseComponent := clickhousedb.SetupClickHouse(t, r) + clickHouseComponent := clickhousedb.SetupClickHouse(t, r, false) config := 
DefaultConfiguration() config.SkipMigrations = true config.Networks = helpers.MustNewSubnetMap(map[string]NetworkAttributes{ @@ -113,7 +113,7 @@ func TestHTTPEndpoints(t *testing.T) { func TestAdditionalASNs(t *testing.T) { r := reporter.NewMock(t) - clickHouseComponent := clickhousedb.SetupClickHouse(t, r) + clickHouseComponent := clickhousedb.SetupClickHouse(t, r, false) config := DefaultConfiguration() config.ASNs = map[uint32]string{ 1: "New network", diff --git a/orchestrator/clickhouse/migrations.go b/orchestrator/clickhouse/migrations.go index 437363ee..d859691a 100644 --- a/orchestrator/clickhouse/migrations.go +++ b/orchestrator/clickhouse/migrations.go @@ -66,16 +66,17 @@ func (c *Component) migrateDatabase() error { // Create dictionaries err := c.wrapMigrations( - func() error { + ctx, + func(ctx context.Context) error { return c.createDictionary(ctx, schema.DictionaryASNs, "hashed", "`asn` UInt32 INJECTIVE, `name` String", "asn") - }, func() error { + }, func(ctx context.Context) error { return c.createDictionary(ctx, schema.DictionaryProtocols, "hashed", "`proto` UInt8 INJECTIVE, `name` String, `description` String", "proto") - }, func() error { + }, func(ctx context.Context) error { return c.createDictionary(ctx, schema.DictionaryICMP, "complex_key_hashed", "`proto` UInt8, `type` UInt8, `code` UInt8, `name` String", "proto, type, code") - }, func() error { + }, func(ctx context.Context) error { return c.createDictionary(ctx, schema.DictionaryNetworks, "ip_trie", "`network` String, `name` String, `role` String, `site` String, `region` String, `city` String, `state` String, `country` String, `tenant` String, `asn` UInt32", "network") @@ -85,7 +86,7 @@ func (c *Component) migrateDatabase() error { } // Prepare custom dictionary migrations - var dictMigrations []func() error + var dictMigrations []func(context.Context) error for k, v := range c.d.Schema.GetCustomDictConfig() { var schemaStr []string var keys []string @@ -103,7 +104,7 @@ func (c 
*Component) migrateDatabase() error { // This is only an attribute. We only need it in the schema schemaStr = append(schemaStr, fmt.Sprintf("`%s` %s DEFAULT '%s'", a.Name, a.Type, defaultValue)) } - dictMigrations = append(dictMigrations, func() error { + dictMigrations = append(dictMigrations, func(ctx context.Context) error { return c.createDictionary( ctx, fmt.Sprintf("custom_dict_%s", k), @@ -113,17 +114,22 @@ func (c *Component) migrateDatabase() error { }) } // Create custom dictionaries - err = c.wrapMigrations(dictMigrations...) + err = c.wrapMigrations(ctx, dictMigrations...) if err != nil { return err } // Create the various non-raw flow tables for _, resolution := range c.config.Resolutions { - err := c.wrapMigrations( - func() error { + err := c.wrapMigrations(ctx, + func(ctx context.Context) error { return c.createOrUpdateFlowsTable(ctx, resolution) - }, func() error { + }, func(ctx context.Context) error { + if resolution.Interval == 0 { + return c.createDistributedTable(ctx, "flows") + } + return c.createDistributedTable(ctx, fmt.Sprintf("flows_%s", resolution.Interval)) + }, func(ctx context.Context) error { return c.createFlowsConsumerView(ctx, resolution) }) if err != nil { @@ -132,22 +138,17 @@ func (c *Component) migrateDatabase() error { } // Remaining tables - err = c.wrapMigrations( - func() error { - return c.createExportersTable(ctx) - }, func() error { - return c.createExportersConsumerView(ctx) - }, func() error { - return c.createRawFlowsTable(ctx) - }, func() error { - return c.createRawFlowsConsumerView(ctx) - }, func() error { - return c.createRawFlowsErrors(ctx) - }, func() error { - return c.createRawFlowsErrorsConsumerView(ctx) - }, func() error { - return c.deleteOldRawFlowsErrorsView(ctx) + err = c.wrapMigrations(ctx, + c.createExportersTable, + c.createExportersConsumerView, + c.createRawFlowsTable, + c.createRawFlowsConsumerView, + c.createRawFlowsErrors, + func(ctx context.Context) error { + return 
c.createDistributedTable(ctx, "flows_raw_errors") }, + c.createRawFlowsErrorsConsumerView, + c.deleteOldRawFlowsErrorsView, ) if err != nil { return err @@ -158,7 +159,7 @@ func (c *Component) migrateDatabase() error { c.r.Info().Msg("database migration done") // Reload dictionaries - if err := c.d.ClickHouse.Exec(ctx, "SYSTEM RELOAD DICTIONARIES"); err != nil { + if err := c.d.ClickHouse.ExecOnCluster(ctx, "SYSTEM RELOAD DICTIONARIES"); err != nil { c.r.Err(err).Msg("unable to reload dictionaries after migration") } @@ -190,5 +191,5 @@ func (c *Component) getHTTPBaseURL(address string) (string, error) { // ReloadDictionary will reload the specified dictionnary. func (c *Component) ReloadDictionary(ctx context.Context, dictName string) error { - return c.d.ClickHouse.Exec(ctx, fmt.Sprintf("SYSTEM RELOAD DICTIONARY %s", dictName)) + return c.d.ClickHouse.ExecOnCluster(ctx, fmt.Sprintf("SYSTEM RELOAD DICTIONARY %s", dictName)) } diff --git a/orchestrator/clickhouse/migrations_helpers.go b/orchestrator/clickhouse/migrations_helpers.go index 4e3cd38d..c757abe4 100644 --- a/orchestrator/clickhouse/migrations_helpers.go +++ b/orchestrator/clickhouse/migrations_helpers.go @@ -25,9 +25,9 @@ var errSkipStep = errors.New("migration: skip this step") // wrapMigrations can be used to wrap migration functions. It will keep the // metrics up-to-date as long as the migration function returns `errSkipStep` // when a step is skipped. -func (c *Component) wrapMigrations(fns ...func() error) error { +func (c *Component) wrapMigrations(ctx context.Context, fns ...func(context.Context) error) error { for _, fn := range fns { - if err := fn(); err == nil { + if err := fn(ctx); err == nil { c.metrics.migrationsApplied.Inc() } else if err == errSkipStep { c.metrics.migrationsNotApplied.Inc() @@ -82,10 +82,42 @@ func (c *Component) tableAlreadyExists(ctx context.Context, table, column, targe } c.r.Debug(). Str("target", target).Str("existing", existing). 
- Msgf("table %s is assumed to not exist in the expected state", table) + Msgf("table %s state difference detected", table) return false, nil } +// mergeTreeEngine returns a MergeTree engine definition, either plain or using +// Replicated if we are on a cluster. +func (c *Component) mergeTreeEngine(table string, variant string, args ...string) string { + if c.config.Cluster != "" { + return fmt.Sprintf(`Replicated%sMergeTree(%s)`, variant, strings.Join( + append([]string{ + fmt.Sprintf("'/clickhouse/tables/shard-{shard}/%s'", table), + "'replica-{replica}'", + }, args...), + ", ")) + } + if len(args) == 0 { + return fmt.Sprintf("%sMergeTree", variant) + } + return fmt.Sprintf("%sMergeTree(%s)", variant, strings.Join(args, ", ")) +} + +// distributedTable turns a table name to the matching distributed table if we +// are in a cluster. +func (c *Component) distributedTable(table string) string { + return table +} + +// localTable turns a table name to the matching local distributed table if we +// are in a cluster. +func (c *Component) localTable(table string) string { + if c.config.Cluster != "" { + return fmt.Sprintf("%s_local", table) + } + return table +} + // createDictionary creates the provided dictionary. func (c *Component) createDictionary(ctx context.Context, name, layout, schema, primary string) error { url := fmt.Sprintf("%s/api/v0/orchestrator/clickhouse/%s.csv", c.config.OrchestratorURL, name) @@ -120,13 +152,13 @@ LAYOUT({{ .Layout }}()) } c.r.Info().Msgf("create dictionary %s", name) createOrReplaceQuery := strings.Replace(createQuery, "CREATE ", "CREATE OR REPLACE ", 1) - if err := c.d.ClickHouse.Exec(ctx, createOrReplaceQuery); err != nil { + if err := c.d.ClickHouse.ExecOnCluster(ctx, createOrReplaceQuery); err != nil { return fmt.Errorf("cannot create dictionary %s: %w", name, err) } return nil } -// createExportersConsumerView creates the exporters table. +// createExportersTable creates the exporters table. This table is always local. 
func (c *Component) createExportersTable(ctx context.Context) error { // Select the columns we need cols := []string{} @@ -142,22 +174,25 @@ func (c *Component) createExportersTable(ctx context.Context) error { } // Build CREATE TABLE + name := "exporters" createQuery, err := stemplate( - `CREATE TABLE {{ .Database }}.exporters + `CREATE TABLE {{ .Database }}.{{ .Table }} ({{ .Schema }}) -ENGINE = ReplacingMergeTree(TimeReceived) +ENGINE = {{ .Engine }} ORDER BY (ExporterAddress, IfName) TTL TimeReceived + toIntervalDay(1)`, gin.H{ "Database": c.config.Database, + "Table": name, "Schema": strings.Join(cols, ", "), + "Engine": c.mergeTreeEngine(name, "Replacing", "TimeReceived"), }) if err != nil { return fmt.Errorf("cannot build query to create exporters view: %w", err) } // Check if the table already exists - if ok, err := c.tableAlreadyExists(ctx, "exporters", "create_table_query", createQuery); err != nil { + if ok, err := c.tableAlreadyExists(ctx, name, "create_table_query", createQuery); err != nil { return err } else if ok { c.r.Info().Msg("exporters table already exists, skip migration") @@ -166,13 +201,11 @@ TTL TimeReceived + toIntervalDay(1)`, // Drop existing table and recreate c.r.Info().Msg("create exporters table") - if err := c.d.ClickHouse.Exec(ctx, `DROP TABLE IF EXISTS exporters SYNC`); err != nil { - return fmt.Errorf("cannot drop existing exporters view: %w", err) - } ctx = clickhouse.Context(ctx, clickhouse.WithSettings(clickhouse.Settings{ "allow_suspicious_low_cardinality_types": 1, })) - if err := c.d.ClickHouse.Exec(ctx, createQuery); err != nil { + createOrReplaceQuery := strings.Replace(createQuery, "CREATE ", "CREATE OR REPLACE ", 1) + if err := c.d.ClickHouse.ExecOnCluster(ctx, createOrReplaceQuery); err != nil { return fmt.Errorf("cannot create exporters table: %w", err) } @@ -196,8 +229,9 @@ func (c *Component) createExportersConsumerView(ctx context.Context) error { // Build SELECT query selectQuery, err := stemplate( - `SELECT 
DISTINCT {{ .Columns }} FROM {{ .Database }}.flows ARRAY JOIN arrayEnumerate([1, 2]) AS num`, + `SELECT DISTINCT {{ .Columns }} FROM {{ .Database }}.{{ .Table }} ARRAY JOIN arrayEnumerate([1, 2]) AS num`, gin.H{ + "Table": c.distributedTable("flows"), "Database": c.config.Database, "Columns": strings.Join(cols, ", "), }) @@ -217,13 +251,12 @@ func (c *Component) createExportersConsumerView(ctx context.Context) error { // Drop existing table and recreate c.r.Info().Msg("create exporters view") - if err := c.d.ClickHouse.Exec(ctx, `DROP TABLE IF EXISTS exporters_consumer SYNC`); err != nil { + if err := c.d.ClickHouse.ExecOnCluster(ctx, `DROP TABLE IF EXISTS exporters_consumer SYNC`); err != nil { return fmt.Errorf("cannot drop existing exporters view: %w", err) } - if err := c.d.ClickHouse.Exec(ctx, fmt.Sprintf(` -CREATE MATERIALIZED VIEW exporters_consumer TO exporters -AS %s -`, selectQuery)); err != nil { + if err := c.d.ClickHouse.ExecOnCluster(ctx, fmt.Sprintf(` +CREATE MATERIALIZED VIEW exporters_consumer TO %s AS %s +`, "exporters", selectQuery)); err != nil { return fmt.Errorf("cannot create exporters view: %w", err) } @@ -282,14 +315,14 @@ func (c *Component) createRawFlowsTable(ctx context.Context) error { fmt.Sprintf("%s_errors", tableName), tableName, } { - if err := c.d.ClickHouse.Exec(ctx, fmt.Sprintf(`DROP TABLE IF EXISTS %s SYNC`, table)); err != nil { + if err := c.d.ClickHouse.ExecOnCluster(ctx, fmt.Sprintf(`DROP TABLE IF EXISTS %s SYNC`, table)); err != nil { return fmt.Errorf("cannot drop %s: %w", table, err) } } ctx = clickhouse.Context(ctx, clickhouse.WithSettings(clickhouse.Settings{ "allow_suspicious_low_cardinality_types": 1, })) - if err := c.d.ClickHouse.Exec(ctx, createQuery); err != nil { + if err := c.d.ClickHouse.ExecOnCluster(ctx, createQuery); err != nil { return fmt.Errorf("cannot create raw flows table: %w", err) } @@ -384,12 +417,12 @@ func (c *Component) createRawFlowsConsumerView(ctx context.Context) error { // Drop and create 
c.r.Info().Msg("create raw flows consumer view") - if err := c.d.ClickHouse.Exec(ctx, fmt.Sprintf(`DROP TABLE IF EXISTS %s SYNC`, viewName)); err != nil { + if err := c.d.ClickHouse.ExecOnCluster(ctx, fmt.Sprintf(`DROP TABLE IF EXISTS %s SYNC`, viewName)); err != nil { return fmt.Errorf("cannot drop table %s: %w", viewName, err) } - if err := c.d.ClickHouse.Exec(ctx, - fmt.Sprintf("CREATE MATERIALIZED VIEW %s TO flows AS %s", - viewName, selectQuery)); err != nil { + if err := c.d.ClickHouse.ExecOnCluster(ctx, + fmt.Sprintf("CREATE MATERIALIZED VIEW %s TO %s AS %s", + viewName, c.distributedTable("flows"), selectQuery)); err != nil { return fmt.Errorf("cannot create raw flows consumer view: %w", err) } @@ -397,39 +430,42 @@ func (c *Component) createRawFlowsConsumerView(ctx context.Context) error { } func (c *Component) createRawFlowsErrors(ctx context.Context) error { - createQuery, err := stemplate(`CREATE TABLE {{ .Database }}.flows_raw_errors + name := c.localTable("flows_raw_errors") + createQuery, err := stemplate(`CREATE TABLE {{ .Database }}.{{ .Table }} (`+"`timestamp`"+` DateTime, `+"`topic`"+` LowCardinality(String), `+"`partition`"+` UInt64, `+"`offset`"+` UInt64, `+"`raw`"+` String, `+"`error`"+` String) -ENGINE = MergeTree +ENGINE = {{ .Engine }} PARTITION BY toYYYYMMDDhhmmss(toStartOfHour(timestamp)) ORDER BY (timestamp, topic, partition, offset) TTL timestamp + toIntervalDay(1) `, gin.H{ + "Table": name, "Database": c.config.Database, + "Engine": c.mergeTreeEngine(name, ""), }) if err != nil { return fmt.Errorf("cannot build query to create flow error table: %w", err) } - if ok, err := c.tableAlreadyExists(ctx, "flows_raw_errors", "create_table_query", createQuery); err != nil { + if ok, err := c.tableAlreadyExists(ctx, name, "create_table_query", createQuery); err != nil { return err } else if ok { - c.r.Info().Msgf("table flows_raw_errors already exists, skip migration") + c.r.Info().Msgf("table %s already exists, skip migration", name) return 
errSkipStep } - c.r.Info().Msg("create table flows_raw_errors") + c.r.Info().Msgf("create table %s", name) createOrReplaceQuery := strings.Replace(createQuery, "CREATE ", "CREATE OR REPLACE ", 1) - if err := c.d.ClickHouse.Exec(ctx, createOrReplaceQuery); err != nil { - return fmt.Errorf("cannot create table flows_raw_errors: %w", err) + if err := c.d.ClickHouse.ExecOnCluster(ctx, createOrReplaceQuery); err != nil { + return fmt.Errorf("cannot create table %s: %w", name, err) } return nil } func (c *Component) createRawFlowsErrorsConsumerView(ctx context.Context) error { - tableName := fmt.Sprintf("flows_%s_raw", c.d.Schema.ProtobufMessageHash()) + source := fmt.Sprintf("flows_%s_raw", c.d.Schema.ProtobufMessageHash()) viewName := "flows_raw_errors_consumer" // Build SELECT query @@ -444,7 +480,7 @@ SELECT FROM {{ .Database }}.{{ .Table }} WHERE length(_error) > 0`, gin.H{ "Database": c.config.Database, - "Table": tableName, + "Table": source, }) if err != nil { return fmt.Errorf("cannot build select statement for raw flows error: %w", err) @@ -460,12 +496,12 @@ WHERE length(_error) > 0`, gin.H{ // Drop and create c.r.Info().Msg("create raw flows errors view") - if err := c.d.ClickHouse.Exec(ctx, fmt.Sprintf(`DROP TABLE IF EXISTS %s SYNC`, viewName)); err != nil { + if err := c.d.ClickHouse.ExecOnCluster(ctx, fmt.Sprintf(`DROP TABLE IF EXISTS %s SYNC`, viewName)); err != nil { return fmt.Errorf("cannot drop table %s: %w", viewName, err) } - if err := c.d.ClickHouse.Exec(ctx, + if err := c.d.ClickHouse.ExecOnCluster(ctx, fmt.Sprintf(`CREATE MATERIALIZED VIEW %s TO %s AS %s`, - viewName, "flows_raw_errors", selectQuery)); err != nil { + viewName, c.distributedTable("flows_raw_errors"), selectQuery)); err != nil { return fmt.Errorf("cannot create raw flows errors view: %w", err) } @@ -486,7 +522,7 @@ func (c *Component) deleteOldRawFlowsErrorsView(ctx context.Context) error { // Drop c.r.Info().Msg("delete old raw flows errors view") - if err := 
c.d.ClickHouse.Exec(ctx, fmt.Sprintf(`DROP TABLE IF EXISTS %s SYNC`, viewName)); err != nil { + if err := c.d.ClickHouse.ExecOnCluster(ctx, fmt.Sprintf(`DROP TABLE IF EXISTS %s SYNC`, viewName)); err != nil { return fmt.Errorf("cannot drop table %s: %w", viewName, err) } return nil @@ -502,6 +538,7 @@ func (c *Component) createOrUpdateFlowsTable(ctx context.Context, resolution Res } else { tableName = fmt.Sprintf("flows_%s", resolution.Interval) } + tableName = c.localTable(tableName) partitionInterval := uint64((resolution.TTL / time.Duration(c.config.MaxPartitions)).Seconds()) ttl := uint64(resolution.TTL.Seconds()) @@ -513,20 +550,22 @@ func (c *Component) createOrUpdateFlowsTable(ctx context.Context, resolution Res var err error if resolution.Interval == 0 { createQuery, err = stemplate(` -CREATE TABLE flows ({{ .Schema }}) -ENGINE = MergeTree +CREATE TABLE {{ .Table }} ({{ .Schema }}) +ENGINE = {{ .Engine }} PARTITION BY toYYYYMMDDhhmmss(toStartOfInterval(TimeReceived, INTERVAL {{ .PartitionInterval }} second)) ORDER BY (TimeReceived, ExporterAddress, InIfName, OutIfName) TTL TimeReceived + toIntervalSecond({{ .TTL }}) `, gin.H{ + "Table": tableName, "Schema": c.d.Schema.ClickHouseCreateTable(), "PartitionInterval": partitionInterval, "TTL": ttl, + "Engine": c.mergeTreeEngine(tableName, ""), }) } else { createQuery, err = stemplate(` CREATE TABLE {{ .Table }} ({{ .Schema }}) -ENGINE = SummingMergeTree((Bytes, Packets)) +ENGINE = {{ .Engine }} PARTITION BY toYYYYMMDDhhmmss(toStartOfInterval(TimeReceived, INTERVAL {{ .PartitionInterval }} second)) PRIMARY KEY ({{ .PrimaryKey }}) ORDER BY ({{ .SortingKey }}) @@ -538,12 +577,13 @@ TTL TimeReceived + toIntervalSecond({{ .TTL }}) "PrimaryKey": strings.Join(c.d.Schema.ClickHousePrimaryKeys(), ", "), "SortingKey": strings.Join(c.d.Schema.ClickHouseSortingKeys(), ", "), "TTL": ttl, + "Engine": c.mergeTreeEngine(tableName, "Summing", "(Bytes, Packets)"), }) } if err != nil { return fmt.Errorf("cannot build create table 
statement for %s: %w", tableName, err) } - if err := c.d.ClickHouse.Exec(ctx, createQuery); err != nil { + if err := c.d.ClickHouse.ExecOnCluster(ctx, createQuery); err != nil { return fmt.Errorf("cannot create %s: %w", tableName, err) } return nil @@ -598,7 +638,7 @@ outer: if (wantedColumn.ClickHouseAlias != "") != (existingColumn.DefaultKind == "ALIAS") { // either the column was an alias and should be none, or the other way around. Either way, we need to recreate. c.r.Logger.Debug().Msg(fmt.Sprintf("column %s alias content has changed, recreating. New ALIAS: %s", existingColumn.Name, wantedColumn.ClickHouseAlias)) - err := c.d.ClickHouse.Exec(ctx, + err := c.d.ClickHouse.ExecOnCluster(ctx, fmt.Sprintf("ALTER TABLE %s DROP COLUMN %s", tableName, existingColumn.Name)) if err != nil { return fmt.Errorf("cannot drop %s from %s to cleanup aliasing: %w", @@ -615,7 +655,7 @@ outer: } if resolution.Interval > 0 && !wantedColumn.ClickHouseNotSortingKey && existingColumn.IsSortingKey == 0 { // That's something we can fix, but we need to drop it before recreating it - err := c.d.ClickHouse.Exec(ctx, + err := c.d.ClickHouse.ExecOnCluster(ctx, fmt.Sprintf("ALTER TABLE %s DROP COLUMN %s", tableName, existingColumn.Name)) if err != nil { return fmt.Errorf("cannot drop %s from %s to fix ordering: %w", @@ -652,11 +692,11 @@ outer: if resolution.Interval > 0 { // Drop the view viewName := fmt.Sprintf("%s_consumer", tableName) - if err := c.d.ClickHouse.Exec(ctx, fmt.Sprintf(`DROP TABLE IF EXISTS %s SYNC`, viewName)); err != nil { + if err := c.d.ClickHouse.ExecOnCluster(ctx, fmt.Sprintf(`DROP TABLE IF EXISTS %s SYNC`, viewName)); err != nil { return fmt.Errorf("cannot drop %s: %w", viewName, err) } } - err := c.d.ClickHouse.Exec(ctx, fmt.Sprintf("ALTER TABLE %s %s", tableName, strings.Join(modifications, ", "))) + err := c.d.ClickHouse.ExecOnCluster(ctx, fmt.Sprintf("ALTER TABLE %s %s", tableName, strings.Join(modifications, ", "))) if err != nil { return fmt.Errorf("cannot 
update table %s: %w", tableName, err) } @@ -670,7 +710,7 @@ outer: } else if !ok { c.r.Warn(). Msgf("updating TTL of %s with interval %s, this can take a long time", tableName, resolution.Interval) - if err := c.d.ClickHouse.Exec(ctx, fmt.Sprintf("ALTER TABLE %s MODIFY %s", tableName, ttlClause)); err != nil { + if err := c.d.ClickHouse.ExecOnCluster(ctx, fmt.Sprintf("ALTER TABLE %s MODIFY %s", tableName, ttlClause)); err != nil { return fmt.Errorf("cannot modify TTL for table %s: %w", tableName, err) } return nil @@ -693,8 +733,9 @@ func (c *Component) createFlowsConsumerView(ctx context.Context, resolution Reso SELECT toStartOfInterval(TimeReceived, toIntervalSecond({{ .Seconds }})) AS TimeReceived, {{ .Columns }} -FROM {{ .Database }}.flows`, gin.H{ +FROM {{ .Database }}.{{ .Table }}`, gin.H{ "Database": c.config.Database, + "Table": c.localTable("flows"), "Seconds": uint64(resolution.Interval.Seconds()), "Columns": strings.Join(c.d.Schema.ClickHouseSelectColumns( schema.ClickHouseSkipTimeReceived, @@ -715,12 +756,84 @@ FROM {{ .Database }}.flows`, gin.H{ // Drop and create c.r.Info().Msgf("create %s", viewName) - if err := c.d.ClickHouse.Exec(ctx, fmt.Sprintf(`DROP TABLE IF EXISTS %s SYNC`, viewName)); err != nil { + if err := c.d.ClickHouse.ExecOnCluster(ctx, fmt.Sprintf(`DROP TABLE IF EXISTS %s SYNC`, viewName)); err != nil { return fmt.Errorf("cannot drop table %s: %w", viewName, err) } - if err := c.d.ClickHouse.Exec(ctx, - fmt.Sprintf(`CREATE MATERIALIZED VIEW %s TO %s AS %s`, viewName, tableName, selectQuery)); err != nil { + if err := c.d.ClickHouse.ExecOnCluster(ctx, + fmt.Sprintf(`CREATE MATERIALIZED VIEW %s TO %s AS %s`, viewName, + c.localTable(tableName), selectQuery)); err != nil { return fmt.Errorf("cannot create %s: %w", viewName, err) } return nil } + +// createDistributedTable creates the distributed version of an existing table. +// If the table already exists and does not match the definition, it is +// replaced. 
+func (c *Component) createDistributedTable(ctx context.Context, source string) error {
+	if c.config.Cluster == "" {
+		return errSkipStep
+	}
+	// Get the schema of the source table
+	var existingColumns []struct {
+		Name              string `ch:"name"`
+		Type              string `ch:"type"`
+		CompressionCodec  string `ch:"compression_codec"`
+		DefaultKind       string `ch:"default_kind"`
+		DefaultExpression string `ch:"default_expression"`
+	}
+	if err := c.d.ClickHouse.Select(ctx, &existingColumns, `
+SELECT name, type, compression_codec, default_kind, default_expression
+FROM system.columns
+WHERE database = $1 AND table = $2
+ORDER BY position ASC
+`, c.config.Database, c.localTable(source)); err != nil {
+		return fmt.Errorf("cannot query columns table: %w", err)
+	}
+	cols := []string{}
+	for _, column := range existingColumns {
+		col := fmt.Sprintf("`%s` %s", column.Name, column.Type)
+		if column.CompressionCodec != "" {
+			col = fmt.Sprintf("%s %s", col, column.CompressionCodec)
+		}
+		if column.DefaultKind != "" {
+			col = fmt.Sprintf("%s %s %s", col, column.DefaultKind, column.DefaultExpression)
+		}
+		cols = append(cols, col)
+	}
+
+	// Build the CREATE TABLE
+	createQuery, err := stemplate(
+		`CREATE TABLE {{ .Database }}.{{ .Target }}
+({{ .Schema }})
+ENGINE = Distributed('{{ .Cluster }}', '{{ .Database}}', '{{ .Source }}', rand())`,
+		gin.H{
+			"Cluster":  c.config.Cluster,
+			"Database": c.config.Database,
+			"Source":   c.localTable(source),
+			"Target":   c.distributedTable(source),
+			"Schema":   strings.Join(cols, ", "),
+		})
+	if err != nil {
+		return fmt.Errorf("cannot build query to create distributed table: %w", err)
+	}
+
+	// Check if the table already exists
+	if ok, err := c.tableAlreadyExists(ctx, c.distributedTable(source), "create_table_query", createQuery); err != nil {
+		return err
+	} else if ok {
+		c.r.Info().Msgf("%s already exists, skip migration", c.distributedTable(source))
+		return errSkipStep
+	}
+
+	// Recreate the table
+	c.r.Info().Msgf("create %s",
c.distributedTable(source)) + createOrReplaceQuery := strings.Replace(createQuery, "CREATE ", "CREATE OR REPLACE ", 1) + ctx = clickhouse.Context(ctx, clickhouse.WithSettings(clickhouse.Settings{ + "allow_suspicious_low_cardinality_types": 1, + })) + if err := c.d.ClickHouse.ExecOnCluster(ctx, createOrReplaceQuery); err != nil { + return fmt.Errorf("cannot create %s: %w", c.distributedTable(source), err) + } + return nil +} diff --git a/orchestrator/clickhouse/migrations_test.go b/orchestrator/clickhouse/migrations_test.go index b09652b5..38a19d22 100644 --- a/orchestrator/clickhouse/migrations_test.go +++ b/orchestrator/clickhouse/migrations_test.go @@ -28,26 +28,6 @@ import ( "github.com/ClickHouse/clickhouse-go/v2" ) -func dropAllTables(t *testing.T, ch *clickhousedb.Component) { - rows, err := ch.Query(context.Background(), `SELECT currentDatabase()`) - if err != nil { - t.Fatalf("Query() error:\n%+v", err) - } - for rows.Next() { - var database string - if err := rows.Scan(&database); err != nil { - t.Fatalf("Scan() error:\n%+v", err) - } - t.Logf("(%s) Drop database %s", time.Now(), database) - if err := ch.Exec(context.Background(), fmt.Sprintf("DROP DATABASE %s", database)); err != nil { - t.Fatalf("Exec() error:\n%+v", err) - } - if err := ch.Exec(context.Background(), fmt.Sprintf("CREATE DATABASE %s ENGINE = Atomic", database)); err != nil { - t.Fatalf("Exec() error:\n%+v", err) - } - } -} - type tableWithSchema struct { Table string Schema string @@ -57,7 +37,7 @@ const dumpAllTablesQuery = ` SELECT table, create_table_query FROM system.tables WHERE database=currentDatabase() AND table NOT LIKE '.%' -ORDER BY indexOf(['Dictionary'], engine) DESC, indexOf(['MaterializedView'], engine) ASC +ORDER BY indexOf(['Dictionary'], engine) DESC, indexOf(['Distributed', 'MaterializedView'], engine) ASC ` func dumpAllTables(t *testing.T, ch *clickhousedb.Component, schemaComponent *schema.Component) []tableWithSchema { @@ -72,41 +52,37 @@ func dumpAllTables(t 
*testing.T, ch *clickhousedb.Component, schemaComponent *sc if err := rows.Scan(&table, &schema); err != nil { t.Fatalf("Scan() error:\n%+v", err) } - if !oldTable(schemaComponent, table) { + if !isOldTable(schemaComponent, table) { schemas = append(schemas, tableWithSchema{table, schema}) } } return schemas } +func dropAllTables(t *testing.T, ch *clickhousedb.Component) { + t.Logf("(%s) Drop database default", time.Now()) + for _, sql := range []string{"DROP DATABASE IF EXISTS default SYNC", "CREATE DATABASE IF NOT EXISTS default"} { + if err := ch.ExecOnCluster(context.Background(), sql); err != nil { + t.Fatalf("Exec(%q) error:\n%+v", sql, err) + } + } +} + func loadTables(t *testing.T, ch *clickhousedb.Component, sch *schema.Component, schemas []tableWithSchema) { ctx := clickhouse.Context(context.Background(), clickhouse.WithSettings(clickhouse.Settings{ "allow_suspicious_low_cardinality_types": 1, })) for _, tws := range schemas { - if oldTable(sch, tws.Table) { + if isOldTable(sch, tws.Table) { continue } t.Logf("Load table %s", tws.Table) - if err := ch.Exec(ctx, tws.Schema); err != nil { + if err := ch.ExecOnCluster(ctx, tws.Schema); err != nil { t.Fatalf("Exec(%q) error:\n%+v", tws.Schema, err) } } } -func oldTable(schema *schema.Component, table string) bool { - if strings.Contains(table, schema.ProtobufMessageHash()) { - return false - } - if table == "flows_raw_errors" { - return false - } - if strings.HasSuffix(table, "_raw") || strings.HasSuffix(table, "_raw_consumer") || strings.HasSuffix(table, "_raw_errors") { - return true - } - return false -} - // loadAllTables load tables from a CSV file. Use `format CSV` with // query from dumpAllTables. 
func loadAllTables(t *testing.T, ch *clickhousedb.Component, sch *schema.Component, filename string) {
@@ -139,6 +115,46 @@ func loadAllTables(t *testing.T, ch *clickhousedb.Compone
 	t.Logf("(%s) Loaded all tables from dump %s", time.Now(), filename)
 }
 
+func isOldTable(schema *schema.Component, table string) bool {
+	if strings.Contains(table, schema.ProtobufMessageHash()) {
+		return false
+	}
+	if table == "flows_raw_errors" {
+		return false
+	}
+	if strings.HasSuffix(table, "_raw") || strings.HasSuffix(table, "_raw_consumer") || strings.HasSuffix(table, "_raw_errors") {
+		return true
+	}
+	return false
+}
+
+// startTestComponent starts a test component and waits for migrations to be done
+func startTestComponent(t *testing.T, r *reporter.Reporter, chComponent *clickhousedb.Component, sch *schema.Component) *Component {
+	t.Helper()
+	if sch == nil {
+		sch = schema.NewMock(t)
+	}
+	configuration := DefaultConfiguration()
+	configuration.OrchestratorURL = "http://127.0.0.1:0"
+	configuration.Kafka.Configuration = kafka.DefaultConfiguration()
+	// This is a bit hacky, in real setup, the same configuration block is
+	// used for both clickhousedb.Component and clickhouse.Component.
+ configuration.Cluster = chComponent.ClusterName() + ch, err := New(r, configuration, Dependencies{ + Daemon: daemon.NewMock(t), + HTTP: httpserver.NewMock(t, r), + Schema: sch, + ClickHouse: chComponent, + GeoIP: geoip.NewMock(t, r, true), + }) + if err != nil { + t.Fatalf("New() error:\n%+v", err) + } + helpers.StartStop(t, ch) + waitMigrations(t, ch) + return ch +} + func waitMigrations(t *testing.T, ch *Component) { t.Helper() select { @@ -157,7 +173,7 @@ func waitMigrations(t *testing.T, ch *Component) { func TestGetHTTPBaseURL(t *testing.T) { r := reporter.NewMock(t) - clickHouseComponent := clickhousedb.SetupClickHouse(t, r) + clickHouseComponent := clickhousedb.SetupClickHouse(t, r, false) http := httpserver.NewMock(t, r) c, err := New(r, DefaultConfiguration(), Dependencies{ Daemon: daemon.NewMock(t), @@ -190,35 +206,28 @@ func TestGetHTTPBaseURL(t *testing.T) { } } -func TestMigration(t *testing.T) { - r := reporter.NewMock(t) - chComponent := clickhousedb.SetupClickHouse(t, r) - +func testMigrationFromPreviousStates(t *testing.T, cluster bool) { var lastRun []tableWithSchema var lastSteps int files, err := os.ReadDir("testdata/states") if err != nil { t.Fatalf("ReadDir(%q) error:\n%+v", "testdata/states", err) } + + r := reporter.NewMock(t) + chComponent := clickhousedb.SetupClickHouse(t, r, cluster) + for _, f := range files { - t.Run(f.Name(), func(t *testing.T) { + if !cluster && strings.Contains(f.Name(), "cluster") { + continue + } + if cluster && !strings.Contains(f.Name(), "cluster") { + continue + } + if ok := t.Run(fmt.Sprintf("from %s", f.Name()), func(t *testing.T) { loadAllTables(t, chComponent, schema.NewMock(t), path.Join("testdata/states", f.Name())) r := reporter.NewMock(t) - configuration := DefaultConfiguration() - configuration.OrchestratorURL = "http://127.0.0.1:0" - configuration.Kafka.Configuration = kafka.DefaultConfiguration() - ch, err := New(r, configuration, Dependencies{ - Daemon: daemon.NewMock(t), - HTTP: 
httpserver.NewMock(t, r), - Schema: schema.NewMock(t), - ClickHouse: chComponent, - GeoIP: geoip.NewMock(t, r, true), - }) - if err != nil { - t.Fatalf("New() error:\n%+v", err) - } - helpers.StartStop(t, ch) - waitMigrations(t, ch) + ch := startTestComponent(t, r, chComponent, nil) // Check with the ClickHouse client we have our tables rows, err := chComponent.Query(context.Background(), ` @@ -235,7 +244,7 @@ WHERE database=currentDatabase() AND table NOT LIKE '.%'`) if err := rows.Scan(&table); err != nil { t.Fatalf("Scan() error:\n%+v", err) } - if !oldTable(ch.d.Schema, table) { + if !isOldTable(ch.d.Schema, table) { got = append(got, table) } } @@ -243,21 +252,36 @@ WHERE database=currentDatabase() AND table NOT LIKE '.%'`) schema.DictionaryASNs, "exporters", "exporters_consumer", + // No exporters_local, because exporters is always local "flows", "flows_1h0m0s", "flows_1h0m0s_consumer", + "flows_1h0m0s_local", "flows_1m0s", "flows_1m0s_consumer", + "flows_1m0s_local", "flows_5m0s", "flows_5m0s_consumer", + "flows_5m0s_local", fmt.Sprintf("flows_%s_raw", hash), fmt.Sprintf("flows_%s_raw_consumer", hash), + "flows_local", "flows_raw_errors", "flows_raw_errors_consumer", + "flows_raw_errors_local", schema.DictionaryICMP, schema.DictionaryNetworks, schema.DictionaryProtocols, } + if !cluster { + filteredExpected := []string{} + for _, item := range expected { + if !strings.HasSuffix(item, "_local") { + filteredExpected = append(filteredExpected, item) + } + } + expected = filteredExpected + } if diff := helpers.Diff(got, expected); diff != "" { t.Fatalf("SHOW TABLES (-got, +want):\n%s", diff) } @@ -272,318 +296,167 @@ WHERE database=currentDatabase() AND table NOT LIKE '.%'`) lastRun = currentRun lastSteps, _ = strconv.Atoi(gotMetrics["applied_steps_total"]) t.Logf("%d steps applied for this migration", lastSteps) - }) - if t.Failed() { - row := chComponent.QueryRow(context.Background(), ` -SELECT query, exception -FROM system.query_log -WHERE client_name LIKE 
'akvorado/%' -AND query NOT LIKE '%ORDER BY event_time_microseconds%' -ORDER BY event_time_microseconds DESC -LIMIT 1`) - var lastQuery, exception string - if err := row.Scan(&lastQuery, &exception); err == nil { - t.Logf("last ClickHouse query: %s", lastQuery) - if exception != "" { - t.Logf("last ClickHouse error: %s", exception) - } - } - break + }); !ok { + return } } - if !t.Failed() { - // One more time - t.Run("idempotency", func(t *testing.T) { - r := reporter.NewMock(t) - configuration := DefaultConfiguration() - configuration.OrchestratorURL = "http://127.0.0.1:0" - configuration.Kafka.Configuration = kafka.DefaultConfiguration() - ch, err := New(r, configuration, Dependencies{ - Daemon: daemon.NewMock(t), - HTTP: httpserver.NewMock(t, r), - Schema: schema.NewMock(t), - ClickHouse: chComponent, - GeoIP: geoip.NewMock(t, r, true), - }) + _ = t.Run("idempotency", func(t *testing.T) { + r := reporter.NewMock(t) + startTestComponent(t, r, chComponent, nil) + + // No migration should have been applied the last time + gotMetrics := r.GetMetrics("akvorado_orchestrator_clickhouse_migrations_", "applied_steps_total") + expectedMetrics := map[string]string{`applied_steps_total`: "0"} + if diff := helpers.Diff(gotMetrics, expectedMetrics); diff != "" { + t.Fatalf("Metrics (-got, +want):\n%s", diff) + } + }) && t.Run("final state", func(t *testing.T) { + if lastSteps != 0 { + f, err := os.CreateTemp("", "clickhouse-dump-*.csv") if err != nil { - t.Fatalf("New() error:\n%+v", err) + t.Fatalf("CreateTemp() error:\n%+v", err) } - helpers.StartStop(t, ch) - waitMigrations(t, ch) - - // No migration should have been applied the last time - gotMetrics := r.GetMetrics("akvorado_orchestrator_clickhouse_migrations_", "applied_steps_total") - expectedMetrics := map[string]string{`applied_steps_total`: "0"} - if diff := helpers.Diff(gotMetrics, expectedMetrics); diff != "" { - t.Fatalf("Metrics (-got, +want):\n%s", diff) + defer f.Close() + writer := csv.NewWriter(f) + defer 
writer.Flush() + allTables := dumpAllTables(t, chComponent, schema.NewMock(t)) + for _, item := range allTables { + writer.Write([]string{item.Table, item.Schema}) } - }) - } - - if !t.Failed() { - t.Run("final state", func(t *testing.T) { - if lastSteps != 0 { - f, err := os.CreateTemp("", "clickhouse-dump-*.csv") - if err != nil { - t.Fatalf("CreateTemp() error:\n%+v", err) - } - defer f.Close() - writer := csv.NewWriter(f) - defer writer.Flush() - allTables := dumpAllTables(t, chComponent, schema.NewMock(t)) - for _, item := range allTables { - writer.Write([]string{item.Table, item.Schema}) - } - t.Fatalf("Last step was not idempotent. Check %s for the current dump", f.Name()) - } - }) - } - - // Also try with a full schema - if !t.Failed() { - t.Run("full schema", func(t *testing.T) { - r := reporter.NewMock(t) - configuration := DefaultConfiguration() - configuration.OrchestratorURL = "http://127.0.0.1:0" - configuration.Kafka.Configuration = kafka.DefaultConfiguration() - ch, err := New(r, configuration, Dependencies{ - Daemon: daemon.NewMock(t), - HTTP: httpserver.NewMock(t, r), - Schema: schema.NewMock(t).EnableAllColumns(), - ClickHouse: chComponent, - GeoIP: geoip.NewMock(t, r, true), - }) - if err != nil { - t.Fatalf("New() error:\n%+v", err) - } - helpers.StartStop(t, ch) - waitMigrations(t, ch) - - // We need to have at least one migration - gotMetrics := r.GetMetrics("akvorado_orchestrator_clickhouse_migrations_", "applied_steps_total") - if gotMetrics["applied_steps_total"] == "0" { - t.Fatal("No migration applied when enabling all columns") - } - }) - } - - // And with a partial one - if !t.Failed() { - t.Run("partial schema", func(t *testing.T) { - r := reporter.NewMock(t) - schConfig := schema.DefaultConfiguration() - schConfig.Disabled = []schema.ColumnKey{ - schema.ColumnDst1stAS, schema.ColumnDst2ndAS, schema.ColumnDst3rdAS, - schema.ColumnDstASPath, - schema.ColumnDstCommunities, - schema.ColumnDstLargeCommunities, - 
schema.ColumnDstLargeCommunitiesASN, - schema.ColumnDstLargeCommunitiesLocalData1, - schema.ColumnDstLargeCommunitiesLocalData2, - } - sch, err := schema.New(schConfig) - if err != nil { - t.Fatalf("schema.New() error:\n%+v", err) - } - configuration := DefaultConfiguration() - configuration.OrchestratorURL = "http://127.0.0.1:0" - configuration.Kafka.Configuration = kafka.DefaultConfiguration() - ch, err := New(r, configuration, Dependencies{ - Daemon: daemon.NewMock(t), - HTTP: httpserver.NewMock(t, r), - Schema: sch, - ClickHouse: chComponent, - GeoIP: geoip.NewMock(t, r, true), - }) - if err != nil { - t.Fatalf("New() error:\n%+v", err) - } - helpers.StartStop(t, ch) - waitMigrations(t, ch) - - // We need to have at least one migration - gotMetrics := r.GetMetrics("akvorado_orchestrator_clickhouse_migrations_", "applied_steps_total") - if gotMetrics["applied_steps_total"] == "0" { - t.Fatal("No migration applied when disabling some columns") - } - }) - } - - // Convert a column from alias to materialize - if !t.Failed() { - t.Run("materialize alias", func(t *testing.T) { - r := reporter.NewMock(t) - schConfig := schema.DefaultConfiguration() - schConfig.Materialize = []schema.ColumnKey{ - schema.ColumnSrcNetPrefix, - } - sch, err := schema.New(schConfig) - if err != nil { - t.Fatalf("schema.New() error:\n%+v", err) - } - configuration := DefaultConfiguration() - configuration.OrchestratorURL = "http://127.0.0.1:0" - configuration.Kafka.Configuration = kafka.DefaultConfiguration() - ch, err := New(r, configuration, Dependencies{ - Daemon: daemon.NewMock(t), - HTTP: httpserver.NewMock(t, r), - Schema: sch, - ClickHouse: chComponent, - GeoIP: geoip.NewMock(t, r, true), - }) - if err != nil { - t.Fatalf("New() error:\n%+v", err) - } - helpers.StartStop(t, ch) - waitMigrations(t, ch) - - // We need to have at least one migration - gotMetrics := r.GetMetrics("akvorado_orchestrator_clickhouse_migrations_", "applied_steps_total") - if gotMetrics["applied_steps_total"] 
== "0" { - t.Fatal("No migration applied when disabling some columns") - } - - // We need SrcNetPrefix materialized and DstNetPrefix an alias - row := ch.d.ClickHouse.QueryRow(context.Background(), ` -SELECT toString(groupArray(tuple(name, default_kind))) -FROM system.columns -WHERE table = $1 -AND database = $2 -AND name LIKE $3`, "flows", ch.config.Database, "%NetPrefix") - var existing string - if err := row.Scan(&existing); err != nil { - t.Fatalf("Scan() error:\n%+v", err) - } - if diff := helpers.Diff(existing, "[('SrcNetPrefix',''),('DstNetPrefix','ALIAS')]"); diff != "" { - t.Fatalf("Unexpected state:\n%s", diff) - } - }) - } + t.Fatalf("Last step was not idempotent. Check %s for the current dump", f.Name()) + } + }) } -func TestCustomDictMigration(t *testing.T) { - r := reporter.NewMock(t) - chComponent := clickhousedb.SetupClickHouse(t, r) - dropAllTables(t, chComponent) - // First, setup a default configuration - t.Run("default schema", func(t *testing.T) { +func TestMigrationFromPreviousStates(t *testing.T) { + _ = t.Run("no cluster", func(t *testing.T) { + testMigrationFromPreviousStates(t, false) + }) && t.Run("full schema", func(t *testing.T) { r := reporter.NewMock(t) - sch, err := schema.New(schema.DefaultConfiguration()) - if err != nil { - t.Fatalf("schema.New() error:\n%+v", err) - } - configuration := DefaultConfiguration() - configuration.OrchestratorURL = "http://127.0.0.1:0" - configuration.Kafka.Configuration = kafka.DefaultConfiguration() - ch, err := New(r, configuration, Dependencies{ - Daemon: daemon.NewMock(t), - HTTP: httpserver.NewMock(t, r), - Schema: sch, - ClickHouse: chComponent, - GeoIP: geoip.NewMock(t, r, true), - }) - if err != nil { - t.Fatalf("New() error:\n%+v", err) - } - helpers.StartStop(t, ch) - waitMigrations(t, ch) + chComponent := clickhousedb.SetupClickHouse(t, r, false) + startTestComponent(t, r, chComponent, schema.NewMock(t).EnableAllColumns()) // We need to have at least one migration gotMetrics := 
r.GetMetrics("akvorado_orchestrator_clickhouse_migrations_", "applied_steps_total") if gotMetrics["applied_steps_total"] == "0" { - t.Fatal("No migration applied when applying a fresh default schema") + t.Fatal("No migration applied when enabling all columns") } + }) && t.Run("partial schema", func(t *testing.T) { + r := reporter.NewMock(t) + schConfig := schema.DefaultConfiguration() + schConfig.Disabled = []schema.ColumnKey{ + schema.ColumnDst1stAS, schema.ColumnDst2ndAS, schema.ColumnDst3rdAS, + schema.ColumnDstASPath, + schema.ColumnDstCommunities, + schema.ColumnDstLargeCommunities, + schema.ColumnDstLargeCommunitiesASN, + schema.ColumnDstLargeCommunitiesLocalData1, + schema.ColumnDstLargeCommunitiesLocalData2, + } + sch, err := schema.New(schConfig) + if err != nil { + t.Fatalf("schema.New() error:\n%+v", err) + } + chComponent := clickhousedb.SetupClickHouse(t, r, false) + startTestComponent(t, r, chComponent, sch) + + // We need to have at least one migration + gotMetrics := r.GetMetrics("akvorado_orchestrator_clickhouse_migrations_", "applied_steps_total") + if gotMetrics["applied_steps_total"] == "0" { + t.Fatal("No migration applied when disabling some columns") + } + }) && t.Run("cluster", func(t *testing.T) { + testMigrationFromPreviousStates(t, true) }) +} + +func TestCustomDictMigration(t *testing.T) { + r := reporter.NewMock(t) + chComponent := clickhousedb.SetupClickHouse(t, r, false) + dropAllTables(t, chComponent) + startTestComponent(t, r, chComponent, nil) + + // We need to have at least one migration + gotMetrics := r.GetMetrics("akvorado_orchestrator_clickhouse_migrations_", "applied_steps_total") + if gotMetrics["applied_steps_total"] == "0" { + t.Fatal("No migration applied when applying a fresh default schema") + } + // Now, create a custom dictionary on top - if !t.Failed() { - t.Run("custom dictionary", func(t *testing.T) { - r := reporter.NewMock(t) - schConfig := schema.DefaultConfiguration() - schConfig.CustomDictionaries = 
make(map[string]schema.CustomDict) - schConfig.CustomDictionaries["test"] = schema.CustomDict{ - Keys: []schema.CustomDictKey{ - {Name: "SrcAddr", Type: "String"}, - }, - Attributes: []schema.CustomDictAttribute{ - {Name: "csv_col_name", Type: "String", Label: "DimensionAttribute"}, - {Name: "csv_col_default", Type: "String", Label: "DefaultDimensionAttribute", Default: "Hello World"}, - }, - Source: "test.csv", - Dimensions: []string{"SrcAddr", "DstAddr"}, - Layout: "complex_key_hashed", - } - sch, err := schema.New(schConfig) + _ = t.Run("add", func(t *testing.T) { + r := reporter.NewMock(t) + schConfig := schema.DefaultConfiguration() + schConfig.CustomDictionaries = make(map[string]schema.CustomDict) + schConfig.CustomDictionaries["test"] = schema.CustomDict{ + Keys: []schema.CustomDictKey{ + {Name: "SrcAddr", Type: "String"}, + }, + Attributes: []schema.CustomDictAttribute{ + {Name: "csv_col_name", Type: "String", Label: "DimensionAttribute"}, + {Name: "csv_col_default", Type: "String", Label: "DefaultDimensionAttribute", Default: "Hello World"}, + }, + Source: "test.csv", + Dimensions: []string{"SrcAddr", "DstAddr"}, + Layout: "complex_key_hashed", + } + sch, err := schema.New(schConfig) - if err != nil { - t.Fatalf("schema.New() error:\n%+v", err) - } - configuration := DefaultConfiguration() - configuration.OrchestratorURL = "http://127.0.0.1:0" - configuration.Kafka.Configuration = kafka.DefaultConfiguration() - ch, err := New(r, configuration, Dependencies{ - Daemon: daemon.NewMock(t), - HTTP: httpserver.NewMock(t, r), - Schema: sch, - ClickHouse: chComponent, - GeoIP: geoip.NewMock(t, r, true), - }) - if err != nil { - t.Fatalf("New() error:\n%+v", err) - } - helpers.StartStop(t, ch) - waitMigrations(t, ch) + if err != nil { + t.Fatalf("schema.New() error:\n%+v", err) + } + ch := startTestComponent(t, r, chComponent, sch) - // We need to have at least one migration - gotMetrics := r.GetMetrics("akvorado_orchestrator_clickhouse_migrations_", 
"applied_steps_total") - if gotMetrics["applied_steps_total"] == "0" { - t.Fatal("No migration applied when enabling a custom dictionary") - } + // We need to have at least one migration + gotMetrics := r.GetMetrics("akvorado_orchestrator_clickhouse_migrations_", "applied_steps_total") + if gotMetrics["applied_steps_total"] == "0" { + t.Fatal("No migration applied when enabling a custom dictionary") + } - // Check if the rows were created in the main flows table - row := ch.d.ClickHouse.QueryRow(context.Background(), ` + // Check if the rows were created in the main flows table + row := ch.d.ClickHouse.QueryRow(context.Background(), ` SELECT toString(groupArray(tuple(name, type, default_expression))) FROM system.columns WHERE table = $1 AND database = $2 AND name LIKE $3`, "flows", ch.config.Database, "%DimensionAttribute") - var existing string - if err := row.Scan(&existing); err != nil { - t.Fatalf("Scan() error:\n%+v", err) - } - if diff := helpers.Diff(existing, - "[('SrcAddrDimensionAttribute','LowCardinality(String)',''),('SrcAddrDefaultDimensionAttribute','LowCardinality(String)',''),('DstAddrDimensionAttribute','LowCardinality(String)',''),('DstAddrDefaultDimensionAttribute','LowCardinality(String)','')]"); diff != "" { - t.Fatalf("Unexpected state:\n%s", diff) - } + var existing string + if err := row.Scan(&existing); err != nil { + t.Fatalf("Scan() error:\n%+v", err) + } + if diff := helpers.Diff(existing, + "[('SrcAddrDimensionAttribute','LowCardinality(String)',''),('SrcAddrDefaultDimensionAttribute','LowCardinality(String)',''),('DstAddrDimensionAttribute','LowCardinality(String)',''),('DstAddrDefaultDimensionAttribute','LowCardinality(String)','')]"); diff != "" { + t.Fatalf("Unexpected state:\n%s", diff) + } - // Check if the rows were created in the consumer flows table - rowConsumer := ch.d.ClickHouse.QueryRow(context.Background(), ` + // Check if the rows were created in the consumer flows table + rowConsumer := 
ch.d.ClickHouse.QueryRow(context.Background(), ` SHOW CREATE flows_LAABIGYMRYZPTGOYIIFZNYDEQM_raw_consumer`) - var existingConsumer string - if err := rowConsumer.Scan(&existingConsumer); err != nil { - t.Fatalf("Scan() error:\n%+v", err) - } - // Check if the definitions are part of the consumer - expectedStatements := []string{ - "dictGet('default.custom_dict_test', 'csv_col_name', DstAddr) AS DstAddrDimensionAttribute", - "dictGet('default.custom_dict_test', 'csv_col_name', SrcAddr) AS SrcAddrDimensionAttribute", - "dictGet('default.custom_dict_test', 'csv_col_default', SrcAddr) AS SrcAddrDefaultDimensionAttribute", - "dictGet('default.custom_dict_test', 'csv_col_default', DstAddr) AS DstAddrDefaultDimensionAttribute", - } - for _, s := range expectedStatements { - if !strings.Contains(existingConsumer, s) { - t.Fatalf("Missing statement in consumer:\n%s", s) - } + var existingConsumer string + if err := rowConsumer.Scan(&existingConsumer); err != nil { + t.Fatalf("Scan() error:\n%+v", err) + } + // Check if the definitions are part of the consumer + expectedStatements := []string{ + "dictGet('default.custom_dict_test', 'csv_col_name', DstAddr) AS DstAddrDimensionAttribute", + "dictGet('default.custom_dict_test', 'csv_col_name', SrcAddr) AS SrcAddrDimensionAttribute", + "dictGet('default.custom_dict_test', 'csv_col_default', SrcAddr) AS SrcAddrDefaultDimensionAttribute", + "dictGet('default.custom_dict_test', 'csv_col_default', DstAddr) AS DstAddrDefaultDimensionAttribute", + } + for _, s := range expectedStatements { + if !strings.Contains(existingConsumer, s) { + t.Fatalf("Missing statement in consumer:\n%s", s) } + } - // Check if the dictionary was created - dictCreate := ch.d.ClickHouse.QueryRow(context.Background(), ` + // Check if the dictionary was created + dictCreate := ch.d.ClickHouse.QueryRow(context.Background(), ` SHOW CREATE custom_dict_test`) - var got string - if err := dictCreate.Scan(&got); err != nil { - t.Fatalf("Scan() error:\n%+v", err) - 
} - expected := `CREATE DICTIONARY default.custom_dict_test + var got string + if err := dictCreate.Scan(&got); err != nil { + t.Fatalf("Scan() error:\n%+v", err) + } + expected := `CREATE DICTIONARY default.custom_dict_test ( ` + "`SrcAddr`" + ` String, ` + "`csv_col_name`" + ` String DEFAULT 'None', @@ -594,77 +467,60 @@ SOURCE(HTTP(URL 'http://127.0.0.1:0/api/v0/orchestrator/clickhouse/custom_dict_t LIFETIME(MIN 0 MAX 3600) LAYOUT(COMPLEX_KEY_HASHED()) SETTINGS(format_csv_allow_single_quotes = 0)` - if diff := helpers.Diff(got, expected); diff != "" { - t.Fatalf("Unexpected state:\n%s", diff) - } - }) - } - // Next test: with the custom dict removed again, the cols should still exist, but the consumer should be gone - if !t.Failed() { - t.Run("remove custom dictionary", func(t *testing.T) { - r := reporter.NewMock(t) - sch, err := schema.New(schema.DefaultConfiguration()) + if diff := helpers.Diff(got, expected); diff != "" { + t.Fatalf("Unexpected state:\n%s", diff) + } + }) && t.Run("remove", func(t *testing.T) { + // Next test: with the custom dict removed again, the cols should still exist, but the consumer should be gone + r := reporter.NewMock(t) + sch, err := schema.New(schema.DefaultConfiguration()) - if err != nil { - t.Fatalf("schema.New() error:\n%+v", err) - } - configuration := DefaultConfiguration() - configuration.OrchestratorURL = "http://127.0.0.1:0" - configuration.Kafka.Configuration = kafka.DefaultConfiguration() - ch, err := New(r, configuration, Dependencies{ - Daemon: daemon.NewMock(t), - HTTP: httpserver.NewMock(t, r), - Schema: sch, - ClickHouse: chComponent, - GeoIP: geoip.NewMock(t, r, true), - }) - if err != nil { - t.Fatalf("New() error:\n%+v", err) - } - helpers.StartStop(t, ch) - waitMigrations(t, ch) + if err != nil { + t.Fatalf("schema.New() error:\n%+v", err) + } + ch := startTestComponent(t, r, chComponent, sch) + waitMigrations(t, ch) - // We need to have at least one migration - gotMetrics := 
r.GetMetrics("akvorado_orchestrator_clickhouse_migrations_", "applied_steps_total") - if gotMetrics["applied_steps_total"] == "0" { - t.Fatal("No migration applied when disabling the custom dict") - } + // We need to have at least one migration + gotMetrics := r.GetMetrics("akvorado_orchestrator_clickhouse_migrations_", "applied_steps_total") + if gotMetrics["applied_steps_total"] == "0" { + t.Fatal("No migration applied when disabling the custom dict") + } - // Check if the rows were created in the main flows table - row := ch.d.ClickHouse.QueryRow(context.Background(), ` + // Check if the rows were created in the main flows table + row := ch.d.ClickHouse.QueryRow(context.Background(), ` SELECT toString(groupArray(tuple(name, type, default_expression))) FROM system.columns WHERE table = $1 AND database = $2 AND name LIKE $3`, "flows", ch.config.Database, "%DimensionAttribute") - var existing string - if err := row.Scan(&existing); err != nil { - t.Fatalf("Scan() error:\n%+v", err) - } - if diff := helpers.Diff(existing, - "[('SrcAddrDimensionAttribute','LowCardinality(String)',''),('SrcAddrDefaultDimensionAttribute','LowCardinality(String)',''),('DstAddrDimensionAttribute','LowCardinality(String)',''),('DstAddrDefaultDimensionAttribute','LowCardinality(String)','')]"); diff != "" { - t.Fatalf("Unexpected state:\n%s", diff) - } + var existing string + if err := row.Scan(&existing); err != nil { + t.Fatalf("Scan() error:\n%+v", err) + } + if diff := helpers.Diff(existing, + "[('SrcAddrDimensionAttribute','LowCardinality(String)',''),('SrcAddrDefaultDimensionAttribute','LowCardinality(String)',''),('DstAddrDimensionAttribute','LowCardinality(String)',''),('DstAddrDefaultDimensionAttribute','LowCardinality(String)','')]"); diff != "" { + t.Fatalf("Unexpected state:\n%s", diff) + } - // Check if the rows were removed in the consumer flows table - rowConsumer := ch.d.ClickHouse.QueryRow(context.Background(), - `SHOW CREATE flows_LAABIGYMRYZPTGOYIIFZNYDEQM_raw_consumer`) 
- var existingConsumer string - if err := rowConsumer.Scan(&existingConsumer); err != nil { - t.Fatalf("Scan() error:\n%+v", err) + // Check if the rows were removed in the consumer flows table + rowConsumer := ch.d.ClickHouse.QueryRow(context.Background(), + `SHOW CREATE flows_LAABIGYMRYZPTGOYIIFZNYDEQM_raw_consumer`) + var existingConsumer string + if err := rowConsumer.Scan(&existingConsumer); err != nil { + t.Fatalf("Scan() error:\n%+v", err) + } + // Check if the definitions are missing in the consumer + expectedStatements := []string{ + "dictGet('default.custom_dict_test', 'csv_col_name', DstAddr) AS DstAddrDimensionAttribute", + "dictGet('default.custom_dict_test', 'csv_col_name', SrcAddr) AS SrcAddrDimensionAttribute", + "dictGet('default.custom_dict_test', 'csv_col_default', SrcAddr) AS SrcAddrDefaultDimensionAttribute", + "dictGet('default.custom_dict_test', 'csv_col_default', DstAddr) AS DstAddrDefaultDimensionAttribute", + } + for _, s := range expectedStatements { + if strings.Contains(existingConsumer, s) { + t.Fatalf("Unexpected statement found in consumer:\n%s", s) } - // Check if the definitions are missing in the consumer - expectedStatements := []string{ - "dictGet('default.custom_dict_test', 'csv_col_name', DstAddr) AS DstAddrDimensionAttribute", - "dictGet('default.custom_dict_test', 'csv_col_name', SrcAddr) AS SrcAddrDimensionAttribute", - "dictGet('default.custom_dict_test', 'csv_col_default', SrcAddr) AS SrcAddrDefaultDimensionAttribute", - "dictGet('default.custom_dict_test', 'csv_col_default', DstAddr) AS DstAddrDefaultDimensionAttribute", - } - for _, s := range expectedStatements { - if strings.Contains(existingConsumer, s) { - t.Fatalf("Unexpected statement found in consumer:\n%s", s) - } - } - }) - } + } + }) } diff --git a/orchestrator/clickhouse/source_test.go b/orchestrator/clickhouse/source_test.go index 85490459..b6792bbc 100644 --- a/orchestrator/clickhouse/source_test.go +++ b/orchestrator/clickhouse/source_test.go @@ -24,7 
+24,7 @@ import ( func TestNetworkSources(t *testing.T) { r := reporter.NewMock(t) - clickHouseComponent := clickhousedb.SetupClickHouse(t, r) + clickHouseComponent := clickhousedb.SetupClickHouse(t, r, false) // Mux to answer requests ready := make(chan bool) diff --git a/orchestrator/clickhouse/testdata/states/000-cluster.csv b/orchestrator/clickhouse/testdata/states/000-cluster.csv new file mode 100644 index 00000000..e69de29b diff --git a/orchestrator/clickhouse/testdata/states/001-cluster.csv b/orchestrator/clickhouse/testdata/states/001-cluster.csv new file mode 100644 index 00000000..9504ee2e --- /dev/null +++ b/orchestrator/clickhouse/testdata/states/001-cluster.csv @@ -0,0 +1,22 @@ +protocols,"CREATE DICTIONARY default.protocols (`proto` UInt8 INJECTIVE, `name` String, `description` String) PRIMARY KEY proto SOURCE(HTTP(URL 'http://127.0.0.1:0/api/v0/orchestrator/clickhouse/protocols.csv' FORMAT 'CSVWithNames')) LIFETIME(MIN 0 MAX 3600) LAYOUT(HASHED()) SETTINGS(format_csv_allow_single_quotes = 0)" +asns,"CREATE DICTIONARY default.asns (`asn` UInt32 INJECTIVE, `name` String) PRIMARY KEY asn SOURCE(HTTP(URL 'http://127.0.0.1:0/api/v0/orchestrator/clickhouse/asns.csv' FORMAT 'CSVWithNames')) LIFETIME(MIN 0 MAX 3600) LAYOUT(HASHED()) SETTINGS(format_csv_allow_single_quotes = 0)" +icmp,"CREATE DICTIONARY default.icmp (`proto` UInt8, `type` UInt8, `code` UInt8, `name` String) PRIMARY KEY proto, type, code SOURCE(HTTP(URL 'http://127.0.0.1:0/api/v0/orchestrator/clickhouse/icmp.csv' FORMAT 'CSVWithNames')) LIFETIME(MIN 0 MAX 3600) LAYOUT(COMPLEX_KEY_HASHED()) SETTINGS(format_csv_allow_single_quotes = 0)" +networks,"CREATE DICTIONARY default.networks (`network` String, `name` String, `role` String, `site` String, `region` String, `city` String, `state` String, `country` String, `tenant` String, `asn` UInt32) PRIMARY KEY network SOURCE(HTTP(URL 'http://127.0.0.1:0/api/v0/orchestrator/clickhouse/networks.csv' FORMAT 'CSVWithNames')) LIFETIME(MIN 0 MAX 3600) 
LAYOUT(IP_TRIE()) SETTINGS(format_csv_allow_single_quotes = 0)" +exporters,"CREATE TABLE default.exporters (`TimeReceived` DateTime, `ExporterAddress` LowCardinality(IPv6), `ExporterName` LowCardinality(String), `ExporterGroup` LowCardinality(String), `ExporterRole` LowCardinality(String), `ExporterSite` LowCardinality(String), `ExporterRegion` LowCardinality(String), `ExporterTenant` LowCardinality(String), `IfName` LowCardinality(String), `IfDescription` LowCardinality(String), `IfSpeed` UInt32, `IfConnectivity` LowCardinality(String), `IfProvider` LowCardinality(String), `IfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2)) ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/shard-{shard}/exporters', 'replica-{replica}', TimeReceived) ORDER BY (ExporterAddress, IfName) TTL TimeReceived + toIntervalDay(1) SETTINGS index_granularity = 8192" +flows_local,"CREATE TABLE default.flows_local (`TimeReceived` DateTime CODEC(DoubleDelta, LZ4), `SamplingRate` UInt64, `ExporterAddress` LowCardinality(IPv6), `ExporterName` LowCardinality(String), `ExporterGroup` LowCardinality(String), `ExporterRole` LowCardinality(String), `ExporterSite` LowCardinality(String), `ExporterRegion` LowCardinality(String), `ExporterTenant` LowCardinality(String), `SrcAddr` IPv6 CODEC(ZSTD(1)), `DstAddr` IPv6 CODEC(ZSTD(1)), `SrcNetMask` UInt8, `DstNetMask` UInt8, `SrcNetPrefix` String ALIAS multiIf(EType = 2048, concat(replaceRegexpOne(CAST(IPv6CIDRToRange(SrcAddr, CAST(96 + SrcNetMask, 'UInt8')).1, 'String'), '^::ffff:', ''), '/', CAST(SrcNetMask, 'String')), EType = 34525, concat(CAST(IPv6CIDRToRange(SrcAddr, SrcNetMask).1, 'String'), '/', CAST(SrcNetMask, 'String')), ''), `DstNetPrefix` String ALIAS multiIf(EType = 2048, concat(replaceRegexpOne(CAST(IPv6CIDRToRange(DstAddr, CAST(96 + DstNetMask, 'UInt8')).1, 'String'), '^::ffff:', ''), '/', CAST(DstNetMask, 'String')), EType = 34525, concat(CAST(IPv6CIDRToRange(DstAddr, DstNetMask).1, 'String'), '/', CAST(DstNetMask, 
'String')), ''), `SrcAS` UInt32, `DstAS` UInt32, `SrcNetName` LowCardinality(String), `DstNetName` LowCardinality(String), `SrcNetRole` LowCardinality(String), `DstNetRole` LowCardinality(String), `SrcNetSite` LowCardinality(String), `DstNetSite` LowCardinality(String), `SrcNetRegion` LowCardinality(String), `DstNetRegion` LowCardinality(String), `SrcNetTenant` LowCardinality(String), `DstNetTenant` LowCardinality(String), `SrcCountry` FixedString(2), `DstCountry` FixedString(2), `SrcGeoCity` LowCardinality(String), `DstGeoCity` LowCardinality(String), `SrcGeoState` LowCardinality(String), `DstGeoState` LowCardinality(String), `DstASPath` Array(UInt32), `Dst1stAS` UInt32, `Dst2ndAS` UInt32, `Dst3rdAS` UInt32, `DstCommunities` Array(UInt32), `DstLargeCommunities` Array(UInt128), `InIfName` LowCardinality(String), `OutIfName` LowCardinality(String), `InIfDescription` LowCardinality(String), `OutIfDescription` LowCardinality(String), `InIfSpeed` UInt32, `OutIfSpeed` UInt32, `InIfConnectivity` LowCardinality(String), `OutIfConnectivity` LowCardinality(String), `InIfProvider` LowCardinality(String), `OutIfProvider` LowCardinality(String), `InIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `OutIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `EType` UInt32, `Proto` UInt32, `SrcPort` UInt16, `DstPort` UInt16, `Bytes` UInt64 CODEC(T64, LZ4), `Packets` UInt64 CODEC(T64, LZ4), `PacketSize` UInt64 ALIAS intDiv(Bytes, Packets), `PacketSizeBucket` LowCardinality(String) ALIAS multiIf(PacketSize < 64, '0-63', PacketSize < 128, '64-127', PacketSize < 256, '128-255', PacketSize < 512, '256-511', PacketSize < 768, '512-767', PacketSize < 1024, '768-1023', PacketSize < 1280, '1024-1279', PacketSize < 1501, '1280-1500', PacketSize < 2048, '1501-2047', PacketSize < 3072, '2048-3071', PacketSize < 4096, '3072-4095', PacketSize < 8192, '4096-8191', PacketSize < 10240, '8192-10239', PacketSize < 16384, '10240-16383', PacketSize < 32768, 
'16384-32767', PacketSize < 65536, '32768-65535', '65536-Inf'), `ForwardingStatus` UInt32) ENGINE = ReplicatedMergeTree('/clickhouse/tables/shard-{shard}/flows_local', 'replica-{replica}') PARTITION BY toYYYYMMDDhhmmss(toStartOfInterval(TimeReceived, toIntervalSecond(25920))) ORDER BY (TimeReceived, ExporterAddress, InIfName, OutIfName) TTL TimeReceived + toIntervalSecond(1296000) SETTINGS index_granularity = 8192" +flows_1h0m0s_local,"CREATE TABLE default.flows_1h0m0s_local (`TimeReceived` DateTime CODEC(DoubleDelta, LZ4), `SamplingRate` UInt64, `ExporterAddress` LowCardinality(IPv6), `ExporterName` LowCardinality(String), `ExporterGroup` LowCardinality(String), `ExporterRole` LowCardinality(String), `ExporterSite` LowCardinality(String), `ExporterRegion` LowCardinality(String), `ExporterTenant` LowCardinality(String), `SrcAS` UInt32, `DstAS` UInt32, `SrcNetName` LowCardinality(String), `DstNetName` LowCardinality(String), `SrcNetRole` LowCardinality(String), `DstNetRole` LowCardinality(String), `SrcNetSite` LowCardinality(String), `DstNetSite` LowCardinality(String), `SrcNetRegion` LowCardinality(String), `DstNetRegion` LowCardinality(String), `SrcNetTenant` LowCardinality(String), `DstNetTenant` LowCardinality(String), `SrcCountry` FixedString(2), `DstCountry` FixedString(2), `SrcGeoCity` LowCardinality(String), `DstGeoCity` LowCardinality(String), `SrcGeoState` LowCardinality(String), `DstGeoState` LowCardinality(String), `Dst1stAS` UInt32, `Dst2ndAS` UInt32, `Dst3rdAS` UInt32, `InIfName` LowCardinality(String), `OutIfName` LowCardinality(String), `InIfDescription` LowCardinality(String), `OutIfDescription` LowCardinality(String), `InIfSpeed` UInt32, `OutIfSpeed` UInt32, `InIfConnectivity` LowCardinality(String), `OutIfConnectivity` LowCardinality(String), `InIfProvider` LowCardinality(String), `OutIfProvider` LowCardinality(String), `InIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `OutIfBoundary` Enum8('undefined' = 0, 'external' = 1, 
'internal' = 2), `EType` UInt32, `Proto` UInt32, `Bytes` UInt64 CODEC(T64, LZ4), `Packets` UInt64 CODEC(T64, LZ4), `PacketSize` UInt64 ALIAS intDiv(Bytes, Packets), `PacketSizeBucket` LowCardinality(String) ALIAS multiIf(PacketSize < 64, '0-63', PacketSize < 128, '64-127', PacketSize < 256, '128-255', PacketSize < 512, '256-511', PacketSize < 768, '512-767', PacketSize < 1024, '768-1023', PacketSize < 1280, '1024-1279', PacketSize < 1501, '1280-1500', PacketSize < 2048, '1501-2047', PacketSize < 3072, '2048-3071', PacketSize < 4096, '3072-4095', PacketSize < 8192, '4096-8191', PacketSize < 10240, '8192-10239', PacketSize < 16384, '10240-16383', PacketSize < 32768, '16384-32767', PacketSize < 65536, '32768-65535', '65536-Inf'), `ForwardingStatus` UInt32) ENGINE = ReplicatedSummingMergeTree('/clickhouse/tables/shard-{shard}/flows_1h0m0s_local', 'replica-{replica}', (Bytes, Packets)) PARTITION BY toYYYYMMDDhhmmss(toStartOfInterval(TimeReceived, toIntervalSecond(622080))) PRIMARY KEY (TimeReceived, ExporterAddress, EType, Proto, InIfName, SrcAS, ForwardingStatus, OutIfName, DstAS, SamplingRate) ORDER BY (TimeReceived, ExporterAddress, EType, Proto, InIfName, SrcAS, ForwardingStatus, OutIfName, DstAS, SamplingRate, SrcNetName, DstNetName, SrcNetRole, DstNetRole, SrcNetSite, DstNetSite, SrcNetRegion, DstNetRegion, SrcNetTenant, DstNetTenant, SrcCountry, DstCountry, SrcGeoCity, DstGeoCity, SrcGeoState, DstGeoState, Dst1stAS, Dst2ndAS, Dst3rdAS) TTL TimeReceived + toIntervalSecond(31104000) SETTINGS index_granularity = 8192" +flows_1m0s_local,"CREATE TABLE default.flows_1m0s_local (`TimeReceived` DateTime CODEC(DoubleDelta, LZ4), `SamplingRate` UInt64, `ExporterAddress` LowCardinality(IPv6), `ExporterName` LowCardinality(String), `ExporterGroup` LowCardinality(String), `ExporterRole` LowCardinality(String), `ExporterSite` LowCardinality(String), `ExporterRegion` LowCardinality(String), `ExporterTenant` LowCardinality(String), `SrcAS` UInt32, `DstAS` UInt32, `SrcNetName` 
LowCardinality(String), `DstNetName` LowCardinality(String), `SrcNetRole` LowCardinality(String), `DstNetRole` LowCardinality(String), `SrcNetSite` LowCardinality(String), `DstNetSite` LowCardinality(String), `SrcNetRegion` LowCardinality(String), `DstNetRegion` LowCardinality(String), `SrcNetTenant` LowCardinality(String), `DstNetTenant` LowCardinality(String), `SrcCountry` FixedString(2), `DstCountry` FixedString(2), `SrcGeoCity` LowCardinality(String), `DstGeoCity` LowCardinality(String), `SrcGeoState` LowCardinality(String), `DstGeoState` LowCardinality(String), `Dst1stAS` UInt32, `Dst2ndAS` UInt32, `Dst3rdAS` UInt32, `InIfName` LowCardinality(String), `OutIfName` LowCardinality(String), `InIfDescription` LowCardinality(String), `OutIfDescription` LowCardinality(String), `InIfSpeed` UInt32, `OutIfSpeed` UInt32, `InIfConnectivity` LowCardinality(String), `OutIfConnectivity` LowCardinality(String), `InIfProvider` LowCardinality(String), `OutIfProvider` LowCardinality(String), `InIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `OutIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `EType` UInt32, `Proto` UInt32, `Bytes` UInt64 CODEC(T64, LZ4), `Packets` UInt64 CODEC(T64, LZ4), `PacketSize` UInt64 ALIAS intDiv(Bytes, Packets), `PacketSizeBucket` LowCardinality(String) ALIAS multiIf(PacketSize < 64, '0-63', PacketSize < 128, '64-127', PacketSize < 256, '128-255', PacketSize < 512, '256-511', PacketSize < 768, '512-767', PacketSize < 1024, '768-1023', PacketSize < 1280, '1024-1279', PacketSize < 1501, '1280-1500', PacketSize < 2048, '1501-2047', PacketSize < 3072, '2048-3071', PacketSize < 4096, '3072-4095', PacketSize < 8192, '4096-8191', PacketSize < 10240, '8192-10239', PacketSize < 16384, '10240-16383', PacketSize < 32768, '16384-32767', PacketSize < 65536, '32768-65535', '65536-Inf'), `ForwardingStatus` UInt32) ENGINE = ReplicatedSummingMergeTree('/clickhouse/tables/shard-{shard}/flows_1m0s_local', 'replica-{replica}', 
(Bytes, Packets)) PARTITION BY toYYYYMMDDhhmmss(toStartOfInterval(TimeReceived, toIntervalSecond(12096))) PRIMARY KEY (TimeReceived, ExporterAddress, EType, Proto, InIfName, SrcAS, ForwardingStatus, OutIfName, DstAS, SamplingRate) ORDER BY (TimeReceived, ExporterAddress, EType, Proto, InIfName, SrcAS, ForwardingStatus, OutIfName, DstAS, SamplingRate, SrcNetName, DstNetName, SrcNetRole, DstNetRole, SrcNetSite, DstNetSite, SrcNetRegion, DstNetRegion, SrcNetTenant, DstNetTenant, SrcCountry, DstCountry, SrcGeoCity, DstGeoCity, SrcGeoState, DstGeoState, Dst1stAS, Dst2ndAS, Dst3rdAS) TTL TimeReceived + toIntervalSecond(604800) SETTINGS index_granularity = 8192" +flows_5m0s_local,"CREATE TABLE default.flows_5m0s_local (`TimeReceived` DateTime CODEC(DoubleDelta, LZ4), `SamplingRate` UInt64, `ExporterAddress` LowCardinality(IPv6), `ExporterName` LowCardinality(String), `ExporterGroup` LowCardinality(String), `ExporterRole` LowCardinality(String), `ExporterSite` LowCardinality(String), `ExporterRegion` LowCardinality(String), `ExporterTenant` LowCardinality(String), `SrcAS` UInt32, `DstAS` UInt32, `SrcNetName` LowCardinality(String), `DstNetName` LowCardinality(String), `SrcNetRole` LowCardinality(String), `DstNetRole` LowCardinality(String), `SrcNetSite` LowCardinality(String), `DstNetSite` LowCardinality(String), `SrcNetRegion` LowCardinality(String), `DstNetRegion` LowCardinality(String), `SrcNetTenant` LowCardinality(String), `DstNetTenant` LowCardinality(String), `SrcCountry` FixedString(2), `DstCountry` FixedString(2), `SrcGeoCity` LowCardinality(String), `DstGeoCity` LowCardinality(String), `SrcGeoState` LowCardinality(String), `DstGeoState` LowCardinality(String), `Dst1stAS` UInt32, `Dst2ndAS` UInt32, `Dst3rdAS` UInt32, `InIfName` LowCardinality(String), `OutIfName` LowCardinality(String), `InIfDescription` LowCardinality(String), `OutIfDescription` LowCardinality(String), `InIfSpeed` UInt32, `OutIfSpeed` UInt32, `InIfConnectivity` LowCardinality(String), 
`OutIfConnectivity` LowCardinality(String), `InIfProvider` LowCardinality(String), `OutIfProvider` LowCardinality(String), `InIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `OutIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `EType` UInt32, `Proto` UInt32, `Bytes` UInt64 CODEC(T64, LZ4), `Packets` UInt64 CODEC(T64, LZ4), `PacketSize` UInt64 ALIAS intDiv(Bytes, Packets), `PacketSizeBucket` LowCardinality(String) ALIAS multiIf(PacketSize < 64, '0-63', PacketSize < 128, '64-127', PacketSize < 256, '128-255', PacketSize < 512, '256-511', PacketSize < 768, '512-767', PacketSize < 1024, '768-1023', PacketSize < 1280, '1024-1279', PacketSize < 1501, '1280-1500', PacketSize < 2048, '1501-2047', PacketSize < 3072, '2048-3071', PacketSize < 4096, '3072-4095', PacketSize < 8192, '4096-8191', PacketSize < 10240, '8192-10239', PacketSize < 16384, '10240-16383', PacketSize < 32768, '16384-32767', PacketSize < 65536, '32768-65535', '65536-Inf'), `ForwardingStatus` UInt32) ENGINE = ReplicatedSummingMergeTree('/clickhouse/tables/shard-{shard}/flows_5m0s_local', 'replica-{replica}', (Bytes, Packets)) PARTITION BY toYYYYMMDDhhmmss(toStartOfInterval(TimeReceived, toIntervalSecond(155520))) PRIMARY KEY (TimeReceived, ExporterAddress, EType, Proto, InIfName, SrcAS, ForwardingStatus, OutIfName, DstAS, SamplingRate) ORDER BY (TimeReceived, ExporterAddress, EType, Proto, InIfName, SrcAS, ForwardingStatus, OutIfName, DstAS, SamplingRate, SrcNetName, DstNetName, SrcNetRole, DstNetRole, SrcNetSite, DstNetSite, SrcNetRegion, DstNetRegion, SrcNetTenant, DstNetTenant, SrcCountry, DstCountry, SrcGeoCity, DstGeoCity, SrcGeoState, DstGeoState, Dst1stAS, Dst2ndAS, Dst3rdAS) TTL TimeReceived + toIntervalSecond(7776000) SETTINGS index_granularity = 8192" +flows_LAABIGYMRYZPTGOYIIFZNYDEQM_raw,"CREATE TABLE default.flows_LAABIGYMRYZPTGOYIIFZNYDEQM_raw (`TimeReceived` DateTime CODEC(DoubleDelta, LZ4), `SamplingRate` UInt64, `ExporterAddress` LowCardinality(IPv6), 
`ExporterName` LowCardinality(String), `ExporterGroup` LowCardinality(String), `ExporterRole` LowCardinality(String), `ExporterSite` LowCardinality(String), `ExporterRegion` LowCardinality(String), `ExporterTenant` LowCardinality(String), `SrcAddr` IPv6 CODEC(ZSTD(1)), `DstAddr` IPv6 CODEC(ZSTD(1)), `SrcNetMask` UInt8, `DstNetMask` UInt8, `SrcAS` UInt32, `DstAS` UInt32, `DstASPath` Array(UInt32), `DstCommunities` Array(UInt32), `DstLargeCommunitiesASN` Array(UInt32), `DstLargeCommunitiesLocalData1` Array(UInt32), `DstLargeCommunitiesLocalData2` Array(UInt32), `InIfName` LowCardinality(String), `OutIfName` LowCardinality(String), `InIfDescription` LowCardinality(String), `OutIfDescription` LowCardinality(String), `InIfSpeed` UInt32, `OutIfSpeed` UInt32, `InIfConnectivity` LowCardinality(String), `OutIfConnectivity` LowCardinality(String), `InIfProvider` LowCardinality(String), `OutIfProvider` LowCardinality(String), `InIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `OutIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `EType` UInt32, `Proto` UInt32, `SrcPort` UInt16, `DstPort` UInt16, `Bytes` UInt64 CODEC(T64, LZ4), `Packets` UInt64 CODEC(T64, LZ4), `ForwardingStatus` UInt32) ENGINE = Kafka SETTINGS kafka_broker_list = '127.0.0.1:9092', kafka_topic_list = 'flows-LAABIGYMRYZPTGOYIIFZNYDEQM', kafka_group_name = 'clickhouse', kafka_format = 'Protobuf', kafka_schema = 'flow-LAABIGYMRYZPTGOYIIFZNYDEQM.proto:FlowMessagevLAABIGYMRYZPTGOYIIFZNYDEQM', kafka_num_consumers = 1, kafka_thread_per_consumer = 1, kafka_handle_error_mode = 'stream'" +flows_raw_errors_local,"CREATE TABLE default.flows_raw_errors_local (`timestamp` DateTime, `topic` LowCardinality(String), `partition` UInt64, `offset` UInt64, `raw` String, `error` String) ENGINE = ReplicatedMergeTree('/clickhouse/tables/shard-{shard}/flows_raw_errors_local', 'replica-{replica}') PARTITION BY toYYYYMMDDhhmmss(toStartOfHour(timestamp)) ORDER BY (timestamp, topic, partition, offset) 
TTL timestamp + toIntervalDay(1) SETTINGS index_granularity = 8192" +flows_1h0m0s,"CREATE TABLE default.flows_1h0m0s (`TimeReceived` DateTime CODEC(DoubleDelta, LZ4), `SamplingRate` UInt64, `ExporterAddress` LowCardinality(IPv6), `ExporterName` LowCardinality(String), `ExporterGroup` LowCardinality(String), `ExporterRole` LowCardinality(String), `ExporterSite` LowCardinality(String), `ExporterRegion` LowCardinality(String), `ExporterTenant` LowCardinality(String), `SrcAS` UInt32, `DstAS` UInt32, `SrcNetName` LowCardinality(String), `DstNetName` LowCardinality(String), `SrcNetRole` LowCardinality(String), `DstNetRole` LowCardinality(String), `SrcNetSite` LowCardinality(String), `DstNetSite` LowCardinality(String), `SrcNetRegion` LowCardinality(String), `DstNetRegion` LowCardinality(String), `SrcNetTenant` LowCardinality(String), `DstNetTenant` LowCardinality(String), `SrcCountry` FixedString(2), `DstCountry` FixedString(2), `SrcGeoCity` LowCardinality(String), `DstGeoCity` LowCardinality(String), `SrcGeoState` LowCardinality(String), `DstGeoState` LowCardinality(String), `Dst1stAS` UInt32, `Dst2ndAS` UInt32, `Dst3rdAS` UInt32, `InIfName` LowCardinality(String), `OutIfName` LowCardinality(String), `InIfDescription` LowCardinality(String), `OutIfDescription` LowCardinality(String), `InIfSpeed` UInt32, `OutIfSpeed` UInt32, `InIfConnectivity` LowCardinality(String), `OutIfConnectivity` LowCardinality(String), `InIfProvider` LowCardinality(String), `OutIfProvider` LowCardinality(String), `InIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `OutIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `EType` UInt32, `Proto` UInt32, `Bytes` UInt64 CODEC(T64, LZ4), `Packets` UInt64 CODEC(T64, LZ4), `PacketSize` UInt64 ALIAS intDiv(Bytes, Packets), `PacketSizeBucket` LowCardinality(String) ALIAS multiIf(PacketSize < 64, '0-63', PacketSize < 128, '64-127', PacketSize < 256, '128-255', PacketSize < 512, '256-511', PacketSize < 768, '512-767', 
PacketSize < 1024, '768-1023', PacketSize < 1280, '1024-1279', PacketSize < 1501, '1280-1500', PacketSize < 2048, '1501-2047', PacketSize < 3072, '2048-3071', PacketSize < 4096, '3072-4095', PacketSize < 8192, '4096-8191', PacketSize < 10240, '8192-10239', PacketSize < 16384, '10240-16383', PacketSize < 32768, '16384-32767', PacketSize < 65536, '32768-65535', '65536-Inf'), `ForwardingStatus` UInt32) ENGINE = Distributed('akvorado', 'default', 'flows_1h0m0s_local', rand())" +flows_1m0s,"CREATE TABLE default.flows_1m0s (`TimeReceived` DateTime CODEC(DoubleDelta, LZ4), `SamplingRate` UInt64, `ExporterAddress` LowCardinality(IPv6), `ExporterName` LowCardinality(String), `ExporterGroup` LowCardinality(String), `ExporterRole` LowCardinality(String), `ExporterSite` LowCardinality(String), `ExporterRegion` LowCardinality(String), `ExporterTenant` LowCardinality(String), `SrcAS` UInt32, `DstAS` UInt32, `SrcNetName` LowCardinality(String), `DstNetName` LowCardinality(String), `SrcNetRole` LowCardinality(String), `DstNetRole` LowCardinality(String), `SrcNetSite` LowCardinality(String), `DstNetSite` LowCardinality(String), `SrcNetRegion` LowCardinality(String), `DstNetRegion` LowCardinality(String), `SrcNetTenant` LowCardinality(String), `DstNetTenant` LowCardinality(String), `SrcCountry` FixedString(2), `DstCountry` FixedString(2), `SrcGeoCity` LowCardinality(String), `DstGeoCity` LowCardinality(String), `SrcGeoState` LowCardinality(String), `DstGeoState` LowCardinality(String), `Dst1stAS` UInt32, `Dst2ndAS` UInt32, `Dst3rdAS` UInt32, `InIfName` LowCardinality(String), `OutIfName` LowCardinality(String), `InIfDescription` LowCardinality(String), `OutIfDescription` LowCardinality(String), `InIfSpeed` UInt32, `OutIfSpeed` UInt32, `InIfConnectivity` LowCardinality(String), `OutIfConnectivity` LowCardinality(String), `InIfProvider` LowCardinality(String), `OutIfProvider` LowCardinality(String), `InIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `OutIfBoundary` 
Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `EType` UInt32, `Proto` UInt32, `Bytes` UInt64 CODEC(T64, LZ4), `Packets` UInt64 CODEC(T64, LZ4), `PacketSize` UInt64 ALIAS intDiv(Bytes, Packets), `PacketSizeBucket` LowCardinality(String) ALIAS multiIf(PacketSize < 64, '0-63', PacketSize < 128, '64-127', PacketSize < 256, '128-255', PacketSize < 512, '256-511', PacketSize < 768, '512-767', PacketSize < 1024, '768-1023', PacketSize < 1280, '1024-1279', PacketSize < 1501, '1280-1500', PacketSize < 2048, '1501-2047', PacketSize < 3072, '2048-3071', PacketSize < 4096, '3072-4095', PacketSize < 8192, '4096-8191', PacketSize < 10240, '8192-10239', PacketSize < 16384, '10240-16383', PacketSize < 32768, '16384-32767', PacketSize < 65536, '32768-65535', '65536-Inf'), `ForwardingStatus` UInt32) ENGINE = Distributed('akvorado', 'default', 'flows_1m0s_local', rand())" +flows,"CREATE TABLE default.flows (`TimeReceived` DateTime CODEC(DoubleDelta, LZ4), `SamplingRate` UInt64, `ExporterAddress` LowCardinality(IPv6), `ExporterName` LowCardinality(String), `ExporterGroup` LowCardinality(String), `ExporterRole` LowCardinality(String), `ExporterSite` LowCardinality(String), `ExporterRegion` LowCardinality(String), `ExporterTenant` LowCardinality(String), `SrcAddr` IPv6 CODEC(ZSTD(1)), `DstAddr` IPv6 CODEC(ZSTD(1)), `SrcNetMask` UInt8, `DstNetMask` UInt8, `SrcNetPrefix` String ALIAS multiIf(EType = 2048, concat(replaceRegexpOne(CAST(IPv6CIDRToRange(SrcAddr, CAST(96 + SrcNetMask, 'UInt8')).1, 'String'), '^::ffff:', ''), '/', CAST(SrcNetMask, 'String')), EType = 34525, concat(CAST(IPv6CIDRToRange(SrcAddr, SrcNetMask).1, 'String'), '/', CAST(SrcNetMask, 'String')), ''), `DstNetPrefix` String ALIAS multiIf(EType = 2048, concat(replaceRegexpOne(CAST(IPv6CIDRToRange(DstAddr, CAST(96 + DstNetMask, 'UInt8')).1, 'String'), '^::ffff:', ''), '/', CAST(DstNetMask, 'String')), EType = 34525, concat(CAST(IPv6CIDRToRange(DstAddr, DstNetMask).1, 'String'), '/', CAST(DstNetMask, 'String')), 
''), `SrcAS` UInt32, `DstAS` UInt32, `SrcNetName` LowCardinality(String), `DstNetName` LowCardinality(String), `SrcNetRole` LowCardinality(String), `DstNetRole` LowCardinality(String), `SrcNetSite` LowCardinality(String), `DstNetSite` LowCardinality(String), `SrcNetRegion` LowCardinality(String), `DstNetRegion` LowCardinality(String), `SrcNetTenant` LowCardinality(String), `DstNetTenant` LowCardinality(String), `SrcCountry` FixedString(2), `DstCountry` FixedString(2), `SrcGeoCity` LowCardinality(String), `DstGeoCity` LowCardinality(String), `SrcGeoState` LowCardinality(String), `DstGeoState` LowCardinality(String), `DstASPath` Array(UInt32), `Dst1stAS` UInt32, `Dst2ndAS` UInt32, `Dst3rdAS` UInt32, `DstCommunities` Array(UInt32), `DstLargeCommunities` Array(UInt128), `InIfName` LowCardinality(String), `OutIfName` LowCardinality(String), `InIfDescription` LowCardinality(String), `OutIfDescription` LowCardinality(String), `InIfSpeed` UInt32, `OutIfSpeed` UInt32, `InIfConnectivity` LowCardinality(String), `OutIfConnectivity` LowCardinality(String), `InIfProvider` LowCardinality(String), `OutIfProvider` LowCardinality(String), `InIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `OutIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `EType` UInt32, `Proto` UInt32, `SrcPort` UInt16, `DstPort` UInt16, `Bytes` UInt64 CODEC(T64, LZ4), `Packets` UInt64 CODEC(T64, LZ4), `PacketSize` UInt64 ALIAS intDiv(Bytes, Packets), `PacketSizeBucket` LowCardinality(String) ALIAS multiIf(PacketSize < 64, '0-63', PacketSize < 128, '64-127', PacketSize < 256, '128-255', PacketSize < 512, '256-511', PacketSize < 768, '512-767', PacketSize < 1024, '768-1023', PacketSize < 1280, '1024-1279', PacketSize < 1501, '1280-1500', PacketSize < 2048, '1501-2047', PacketSize < 3072, '2048-3071', PacketSize < 4096, '3072-4095', PacketSize < 8192, '4096-8191', PacketSize < 10240, '8192-10239', PacketSize < 16384, '10240-16383', PacketSize < 32768, '16384-32767', 
PacketSize < 65536, '32768-65535', '65536-Inf'), `ForwardingStatus` UInt32) ENGINE = Distributed('akvorado', 'default', 'flows_local', rand())" +flows_raw_errors,"CREATE TABLE default.flows_raw_errors (`timestamp` DateTime, `topic` LowCardinality(String), `partition` UInt64, `offset` UInt64, `raw` String, `error` String) ENGINE = Distributed('akvorado', 'default', 'flows_raw_errors_local', rand())" +flows_5m0s,"CREATE TABLE default.flows_5m0s (`TimeReceived` DateTime CODEC(DoubleDelta, LZ4), `SamplingRate` UInt64, `ExporterAddress` LowCardinality(IPv6), `ExporterName` LowCardinality(String), `ExporterGroup` LowCardinality(String), `ExporterRole` LowCardinality(String), `ExporterSite` LowCardinality(String), `ExporterRegion` LowCardinality(String), `ExporterTenant` LowCardinality(String), `SrcAS` UInt32, `DstAS` UInt32, `SrcNetName` LowCardinality(String), `DstNetName` LowCardinality(String), `SrcNetRole` LowCardinality(String), `DstNetRole` LowCardinality(String), `SrcNetSite` LowCardinality(String), `DstNetSite` LowCardinality(String), `SrcNetRegion` LowCardinality(String), `DstNetRegion` LowCardinality(String), `SrcNetTenant` LowCardinality(String), `DstNetTenant` LowCardinality(String), `SrcCountry` FixedString(2), `DstCountry` FixedString(2), `SrcGeoCity` LowCardinality(String), `DstGeoCity` LowCardinality(String), `SrcGeoState` LowCardinality(String), `DstGeoState` LowCardinality(String), `Dst1stAS` UInt32, `Dst2ndAS` UInt32, `Dst3rdAS` UInt32, `InIfName` LowCardinality(String), `OutIfName` LowCardinality(String), `InIfDescription` LowCardinality(String), `OutIfDescription` LowCardinality(String), `InIfSpeed` UInt32, `OutIfSpeed` UInt32, `InIfConnectivity` LowCardinality(String), `OutIfConnectivity` LowCardinality(String), `InIfProvider` LowCardinality(String), `OutIfProvider` LowCardinality(String), `InIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `OutIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `EType` UInt32, 
`Proto` UInt32, `Bytes` UInt64 CODEC(T64, LZ4), `Packets` UInt64 CODEC(T64, LZ4), `PacketSize` UInt64 ALIAS intDiv(Bytes, Packets), `PacketSizeBucket` LowCardinality(String) ALIAS multiIf(PacketSize < 64, '0-63', PacketSize < 128, '64-127', PacketSize < 256, '128-255', PacketSize < 512, '256-511', PacketSize < 768, '512-767', PacketSize < 1024, '768-1023', PacketSize < 1280, '1024-1279', PacketSize < 1501, '1280-1500', PacketSize < 2048, '1501-2047', PacketSize < 3072, '2048-3071', PacketSize < 4096, '3072-4095', PacketSize < 8192, '4096-8191', PacketSize < 10240, '8192-10239', PacketSize < 16384, '10240-16383', PacketSize < 32768, '16384-32767', PacketSize < 65536, '32768-65535', '65536-Inf'), `ForwardingStatus` UInt32) ENGINE = Distributed('akvorado', 'default', 'flows_5m0s_local', rand())" +exporters_consumer,"CREATE MATERIALIZED VIEW default.exporters_consumer TO default.exporters (`TimeReceived` DateTime, `ExporterAddress` LowCardinality(IPv6), `ExporterName` LowCardinality(String), `ExporterGroup` LowCardinality(String), `ExporterRole` LowCardinality(String), `ExporterSite` LowCardinality(String), `ExporterRegion` LowCardinality(String), `ExporterTenant` LowCardinality(String), `IfName` String, `IfDescription` String, `IfSpeed` UInt32, `IfConnectivity` String, `IfProvider` String, `IfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2)) AS SELECT DISTINCT TimeReceived, ExporterAddress, ExporterName, ExporterGroup, ExporterRole, ExporterSite, ExporterRegion, ExporterTenant, [InIfName, OutIfName][num] AS IfName, [InIfDescription, OutIfDescription][num] AS IfDescription, [InIfSpeed, OutIfSpeed][num] AS IfSpeed, [InIfConnectivity, OutIfConnectivity][num] AS IfConnectivity, [InIfProvider, OutIfProvider][num] AS IfProvider, [InIfBoundary, OutIfBoundary][num] AS IfBoundary FROM default.flows ARRAY JOIN arrayEnumerate([1, 2]) AS num" +flows_1h0m0s_consumer,"CREATE MATERIALIZED VIEW default.flows_1h0m0s_consumer TO default.flows_1h0m0s_local 
(`TimeReceived` DateTime, `SamplingRate` UInt64, `ExporterAddress` LowCardinality(IPv6), `ExporterName` LowCardinality(String), `ExporterGroup` LowCardinality(String), `ExporterRole` LowCardinality(String), `ExporterSite` LowCardinality(String), `ExporterRegion` LowCardinality(String), `ExporterTenant` LowCardinality(String), `SrcAS` UInt32, `DstAS` UInt32, `SrcNetName` LowCardinality(String), `DstNetName` LowCardinality(String), `SrcNetRole` LowCardinality(String), `DstNetRole` LowCardinality(String), `SrcNetSite` LowCardinality(String), `DstNetSite` LowCardinality(String), `SrcNetRegion` LowCardinality(String), `DstNetRegion` LowCardinality(String), `SrcNetTenant` LowCardinality(String), `DstNetTenant` LowCardinality(String), `SrcCountry` FixedString(2), `DstCountry` FixedString(2), `SrcGeoCity` LowCardinality(String), `DstGeoCity` LowCardinality(String), `SrcGeoState` LowCardinality(String), `DstGeoState` LowCardinality(String), `Dst1stAS` UInt32, `Dst2ndAS` UInt32, `Dst3rdAS` UInt32, `InIfName` LowCardinality(String), `OutIfName` LowCardinality(String), `InIfDescription` LowCardinality(String), `OutIfDescription` LowCardinality(String), `InIfSpeed` UInt32, `OutIfSpeed` UInt32, `InIfConnectivity` LowCardinality(String), `OutIfConnectivity` LowCardinality(String), `InIfProvider` LowCardinality(String), `OutIfProvider` LowCardinality(String), `InIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `OutIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `EType` UInt32, `Proto` UInt32, `Bytes` UInt64, `Packets` UInt64, `ForwardingStatus` UInt32) AS SELECT toStartOfInterval(TimeReceived, toIntervalSecond(3600)) AS TimeReceived, SamplingRate, ExporterAddress, ExporterName, ExporterGroup, ExporterRole, ExporterSite, ExporterRegion, ExporterTenant, SrcAS, DstAS, SrcNetName, DstNetName, SrcNetRole, DstNetRole, SrcNetSite, DstNetSite, SrcNetRegion, DstNetRegion, SrcNetTenant, DstNetTenant, SrcCountry, DstCountry, SrcGeoCity, DstGeoCity, 
SrcGeoState, DstGeoState, Dst1stAS, Dst2ndAS, Dst3rdAS, InIfName, OutIfName, InIfDescription, OutIfDescription, InIfSpeed, OutIfSpeed, InIfConnectivity, OutIfConnectivity, InIfProvider, OutIfProvider, InIfBoundary, OutIfBoundary, EType, Proto, Bytes, Packets, ForwardingStatus FROM default.flows_local" +flows_1m0s_consumer,"CREATE MATERIALIZED VIEW default.flows_1m0s_consumer TO default.flows_1m0s_local (`TimeReceived` DateTime, `SamplingRate` UInt64, `ExporterAddress` LowCardinality(IPv6), `ExporterName` LowCardinality(String), `ExporterGroup` LowCardinality(String), `ExporterRole` LowCardinality(String), `ExporterSite` LowCardinality(String), `ExporterRegion` LowCardinality(String), `ExporterTenant` LowCardinality(String), `SrcAS` UInt32, `DstAS` UInt32, `SrcNetName` LowCardinality(String), `DstNetName` LowCardinality(String), `SrcNetRole` LowCardinality(String), `DstNetRole` LowCardinality(String), `SrcNetSite` LowCardinality(String), `DstNetSite` LowCardinality(String), `SrcNetRegion` LowCardinality(String), `DstNetRegion` LowCardinality(String), `SrcNetTenant` LowCardinality(String), `DstNetTenant` LowCardinality(String), `SrcCountry` FixedString(2), `DstCountry` FixedString(2), `SrcGeoCity` LowCardinality(String), `DstGeoCity` LowCardinality(String), `SrcGeoState` LowCardinality(String), `DstGeoState` LowCardinality(String), `Dst1stAS` UInt32, `Dst2ndAS` UInt32, `Dst3rdAS` UInt32, `InIfName` LowCardinality(String), `OutIfName` LowCardinality(String), `InIfDescription` LowCardinality(String), `OutIfDescription` LowCardinality(String), `InIfSpeed` UInt32, `OutIfSpeed` UInt32, `InIfConnectivity` LowCardinality(String), `OutIfConnectivity` LowCardinality(String), `InIfProvider` LowCardinality(String), `OutIfProvider` LowCardinality(String), `InIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `OutIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `EType` UInt32, `Proto` UInt32, `Bytes` UInt64, `Packets` UInt64, `ForwardingStatus` 
UInt32) AS SELECT toStartOfInterval(TimeReceived, toIntervalSecond(60)) AS TimeReceived, SamplingRate, ExporterAddress, ExporterName, ExporterGroup, ExporterRole, ExporterSite, ExporterRegion, ExporterTenant, SrcAS, DstAS, SrcNetName, DstNetName, SrcNetRole, DstNetRole, SrcNetSite, DstNetSite, SrcNetRegion, DstNetRegion, SrcNetTenant, DstNetTenant, SrcCountry, DstCountry, SrcGeoCity, DstGeoCity, SrcGeoState, DstGeoState, Dst1stAS, Dst2ndAS, Dst3rdAS, InIfName, OutIfName, InIfDescription, OutIfDescription, InIfSpeed, OutIfSpeed, InIfConnectivity, OutIfConnectivity, InIfProvider, OutIfProvider, InIfBoundary, OutIfBoundary, EType, Proto, Bytes, Packets, ForwardingStatus FROM default.flows_local" +flows_5m0s_consumer,"CREATE MATERIALIZED VIEW default.flows_5m0s_consumer TO default.flows_5m0s_local (`TimeReceived` DateTime, `SamplingRate` UInt64, `ExporterAddress` LowCardinality(IPv6), `ExporterName` LowCardinality(String), `ExporterGroup` LowCardinality(String), `ExporterRole` LowCardinality(String), `ExporterSite` LowCardinality(String), `ExporterRegion` LowCardinality(String), `ExporterTenant` LowCardinality(String), `SrcAS` UInt32, `DstAS` UInt32, `SrcNetName` LowCardinality(String), `DstNetName` LowCardinality(String), `SrcNetRole` LowCardinality(String), `DstNetRole` LowCardinality(String), `SrcNetSite` LowCardinality(String), `DstNetSite` LowCardinality(String), `SrcNetRegion` LowCardinality(String), `DstNetRegion` LowCardinality(String), `SrcNetTenant` LowCardinality(String), `DstNetTenant` LowCardinality(String), `SrcCountry` FixedString(2), `DstCountry` FixedString(2), `SrcGeoCity` LowCardinality(String), `DstGeoCity` LowCardinality(String), `SrcGeoState` LowCardinality(String), `DstGeoState` LowCardinality(String), `Dst1stAS` UInt32, `Dst2ndAS` UInt32, `Dst3rdAS` UInt32, `InIfName` LowCardinality(String), `OutIfName` LowCardinality(String), `InIfDescription` LowCardinality(String), `OutIfDescription` LowCardinality(String), `InIfSpeed` UInt32, `OutIfSpeed` 
UInt32, `InIfConnectivity` LowCardinality(String), `OutIfConnectivity` LowCardinality(String), `InIfProvider` LowCardinality(String), `OutIfProvider` LowCardinality(String), `InIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `OutIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `EType` UInt32, `Proto` UInt32, `Bytes` UInt64, `Packets` UInt64, `ForwardingStatus` UInt32) AS SELECT toStartOfInterval(TimeReceived, toIntervalSecond(300)) AS TimeReceived, SamplingRate, ExporterAddress, ExporterName, ExporterGroup, ExporterRole, ExporterSite, ExporterRegion, ExporterTenant, SrcAS, DstAS, SrcNetName, DstNetName, SrcNetRole, DstNetRole, SrcNetSite, DstNetSite, SrcNetRegion, DstNetRegion, SrcNetTenant, DstNetTenant, SrcCountry, DstCountry, SrcGeoCity, DstGeoCity, SrcGeoState, DstGeoState, Dst1stAS, Dst2ndAS, Dst3rdAS, InIfName, OutIfName, InIfDescription, OutIfDescription, InIfSpeed, OutIfSpeed, InIfConnectivity, OutIfConnectivity, InIfProvider, OutIfProvider, InIfBoundary, OutIfBoundary, EType, Proto, Bytes, Packets, ForwardingStatus FROM default.flows_local" +flows_LAABIGYMRYZPTGOYIIFZNYDEQM_raw_consumer,"CREATE MATERIALIZED VIEW default.flows_LAABIGYMRYZPTGOYIIFZNYDEQM_raw_consumer TO default.flows (`TimeReceived` DateTime, `SamplingRate` UInt64, `ExporterAddress` LowCardinality(IPv6), `ExporterName` LowCardinality(String), `ExporterGroup` LowCardinality(String), `ExporterRole` LowCardinality(String), `ExporterSite` LowCardinality(String), `ExporterRegion` LowCardinality(String), `ExporterTenant` LowCardinality(String), `SrcAddr` IPv6, `DstAddr` IPv6, `SrcNetMask` UInt8, `DstNetMask` UInt8, `SrcAS` UInt32, `DstAS` UInt32, `SrcNetName` String, `DstNetName` String, `SrcNetRole` String, `DstNetRole` String, `SrcNetSite` String, `DstNetSite` String, `SrcNetRegion` String, `DstNetRegion` String, `SrcNetTenant` String, `DstNetTenant` String, `SrcCountry` String, `DstCountry` String, `SrcGeoCity` String, `DstGeoCity` String, `SrcGeoState` 
String, `DstGeoState` String, `DstASPath` Array(UInt32), `Dst1stAS` UInt32, `Dst2ndAS` UInt32, `Dst3rdAS` UInt32, `DstCommunities` Array(UInt32), `DstLargeCommunities` Array(UInt128), `InIfName` LowCardinality(String), `OutIfName` LowCardinality(String), `InIfDescription` LowCardinality(String), `OutIfDescription` LowCardinality(String), `InIfSpeed` UInt32, `OutIfSpeed` UInt32, `InIfConnectivity` LowCardinality(String), `OutIfConnectivity` LowCardinality(String), `InIfProvider` LowCardinality(String), `OutIfProvider` LowCardinality(String), `InIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `OutIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `EType` UInt32, `Proto` UInt32, `SrcPort` UInt16, `DstPort` UInt16, `Bytes` UInt64, `Packets` UInt64, `ForwardingStatus` UInt32) AS WITH arrayCompact(DstASPath) AS c_DstASPath, dictGet('default.networks', ('asn', 'name', 'role', 'site', 'region', 'tenant', 'country', 'city', 'state'), SrcAddr) AS c_SrcNetworks, dictGet('default.networks', ('asn', 'name', 'role', 'site', 'region', 'tenant', 'country', 'city', 'state'), DstAddr) AS c_DstNetworks SELECT TimeReceived, SamplingRate, ExporterAddress, ExporterName, ExporterGroup, ExporterRole, ExporterSite, ExporterRegion, ExporterTenant, SrcAddr, DstAddr, SrcNetMask, DstNetMask, if(SrcAS = 0, c_SrcNetworks.1, SrcAS) AS SrcAS, if(DstAS = 0, c_DstNetworks.1, DstAS) AS DstAS, c_SrcNetworks.2 AS SrcNetName, c_DstNetworks.2 AS DstNetName, c_SrcNetworks.3 AS SrcNetRole, c_DstNetworks.3 AS DstNetRole, c_SrcNetworks.4 AS SrcNetSite, c_DstNetworks.4 AS DstNetSite, c_SrcNetworks.5 AS SrcNetRegion, c_DstNetworks.5 AS DstNetRegion, c_SrcNetworks.6 AS SrcNetTenant, c_DstNetworks.6 AS DstNetTenant, c_SrcNetworks.7 AS SrcCountry, c_DstNetworks.7 AS DstCountry, c_SrcNetworks.8 AS SrcGeoCity, c_DstNetworks.8 AS DstGeoCity, c_SrcNetworks.9 AS SrcGeoState, c_DstNetworks.9 AS DstGeoState, DstASPath, c_DstASPath[1] AS Dst1stAS, c_DstASPath[2] AS Dst2ndAS, 
c_DstASPath[3] AS Dst3rdAS, DstCommunities, arrayMap((asn, l1, l2) -> ((bitShiftLeft(CAST(asn, 'UInt128'), 64) + bitShiftLeft(CAST(l1, 'UInt128'), 32)) + CAST(l2, 'UInt128')), DstLargeCommunitiesASN, DstLargeCommunitiesLocalData1, DstLargeCommunitiesLocalData2) AS DstLargeCommunities, InIfName, OutIfName, InIfDescription, OutIfDescription, InIfSpeed, OutIfSpeed, InIfConnectivity, OutIfConnectivity, InIfProvider, OutIfProvider, InIfBoundary, OutIfBoundary, EType, Proto, SrcPort, DstPort, Bytes, Packets, ForwardingStatus FROM default.flows_LAABIGYMRYZPTGOYIIFZNYDEQM_raw WHERE length(_error) = 0" +flows_raw_errors_consumer,"CREATE MATERIALIZED VIEW default.flows_raw_errors_consumer TO default.flows_raw_errors (`timestamp` DateTime, `topic` LowCardinality(String), `partition` UInt64, `offset` UInt64, `raw` String, `error` String) AS SELECT now() AS timestamp, _topic AS topic, _partition AS partition, _offset AS offset, _raw_message AS raw, _error AS error FROM default.flows_LAABIGYMRYZPTGOYIIFZNYDEQM_raw WHERE length(_error) > 0"