mirror of https://github.com/akvorado/akvorado.git (synced 2025-12-11 22:14:02 +01:00)

orchestrator/clickhouse: test migrations from different states
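The change reworks TestMigration: instead of running migrations twice against an empty database, the test now iterates over CSV snapshots in testdata/states/, loads each one with loadAllTables before starting the component, and checks that every starting state converges to the same final schema, with no further migration steps applied on a final run.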
@@ -2,7 +2,13 @@ package clickhouse
 
 import (
 	"context"
+	"encoding/csv"
+	"fmt"
+	"io"
+	"io/ioutil"
 	"net/url"
+	"os"
+	"path"
 	"strings"
 	"testing"
 	"time"
@@ -14,6 +20,85 @@ import (
 	"akvorado/common/reporter"
 )
 
+func clearAllTables(t *testing.T, ch *clickhousedb.Component) {
+	rows, err := ch.Query(context.Background(), `
+SELECT engine, table
+FROM system.tables
+WHERE database=currentDatabase() AND table NOT LIKE '.%'`)
+	if err != nil {
+		t.Fatalf("Query() error:\n%+v", err)
+	}
+	for rows.Next() {
+		var engine, table, sql string
+		if err := rows.Scan(&engine, &table); err != nil {
+			t.Fatalf("Scan() error:\n%+v", err)
+		}
+		switch engine {
+		case "Dictionary":
+			sql = "DROP DICTIONARY %s"
+		default:
+			sql = "DROP TABLE %s"
+		}
+		if err := ch.Exec(context.Background(), fmt.Sprintf(sql, table)); err != nil {
+			t.Fatalf("Exec() error:\n%+v", err)
+		}
+	}
+}
+
+func dumpAllTables(t *testing.T, ch *clickhousedb.Component) map[string]string {
+	rows, err := ch.Query(context.Background(), `
+SELECT table, create_table_query
+FROM system.tables
+WHERE database=currentDatabase() AND table NOT LIKE '.%'`)
+	if err != nil {
+		t.Fatalf("Query() error:\n%+v", err)
+	}
+	schemas := map[string]string{}
+	for rows.Next() {
+		var schema, table string
+		if err := rows.Scan(&table, &schema); err != nil {
+			t.Fatalf("Scan() error:\n%+v", err)
+		}
+		schemas[table] = schema
+	}
+	return schemas
+}
+
+func loadTables(t *testing.T, ch *clickhousedb.Component, schemas map[string]string) {
+	for _, schema := range schemas {
+		if err := ch.Exec(context.Background(), schema); err != nil {
+			t.Fatalf("Exec() error:\n%+v", err)
+		}
+	}
+}
+
+// loadAllTables loads tables from a CSV file. Use `FORMAT CSV` with the
+// query from dumpAllTables to produce such a file.
+func loadAllTables(t *testing.T, ch *clickhousedb.Component, filename string) {
+	input, err := os.Open(filename)
+	if err != nil {
+		t.Fatalf("Open(%q) error:\n%+v", filename, err)
+	}
+	defer input.Close()
+	schemas := map[string]string{}
+	r := csv.NewReader(input)
+	for {
+		record, err := r.Read()
+		if err == io.EOF {
+			break
+		}
+		if err != nil {
+			t.Fatalf("Read(%q) error:\n%+v", filename, err)
+		}
+		if len(record) == 0 {
+			continue
+		}
+		schemas[record[0]] = record[1]
+	}
+	clearAllTables(t, ch)
+	loadTables(t, ch, schemas)
+}
+
 func TestGetHTTPBaseURL(t *testing.T) {
 	r := reporter.NewMock(t)
 	http := http.NewMock(t, r)
@@ -49,61 +134,81 @@ func TestMigration(t *testing.T) {
 	r := reporter.NewMock(t)
 	chComponent := clickhousedb.SetupClickHouse(t, r)
 
-	func() {
-		// First time
-		configuration := DefaultConfiguration()
-		configuration.OrchestratorURL = "http://something"
-		ch, err := New(r, configuration, Dependencies{
-			Daemon:     daemon.NewMock(t),
-			HTTP:       http.NewMock(t, r),
-			ClickHouse: chComponent,
-		})
-		if err != nil {
-			t.Fatalf("New() error:\n%+v", err)
-		}
-		helpers.StartStop(t, ch)
-		select {
-		case <-ch.migrationsDone:
-		case <-time.After(3 * time.Second):
-			t.Fatalf("Migrations not done")
-		}
-
-		// Check with the ClickHouse client we have our tables
-		rows, err := chComponent.Query(context.Background(), "SHOW TABLES")
-		if err != nil {
-			t.Fatalf("Query() error:\n%+v", err)
-		}
-		got := []string{}
-		for rows.Next() {
-			var table string
-			if err := rows.Scan(&table); err != nil {
-				t.Fatalf("Scan() error:\n%+v", err)
+	var lastRun map[string]string
+	files, err := ioutil.ReadDir("testdata/states")
+	if err != nil {
+		t.Fatalf("ReadDir(%q) error:\n%+v", "testdata/states", err)
+	}
+	for _, f := range files {
+		t.Run(f.Name(), func(t *testing.T) {
+			loadAllTables(t, chComponent, path.Join("testdata/states", f.Name()))
+			r := reporter.NewMock(t)
+			configuration := DefaultConfiguration()
+			configuration.OrchestratorURL = "http://something"
+			ch, err := New(r, configuration, Dependencies{
+				Daemon:     daemon.NewMock(t),
+				HTTP:       http.NewMock(t, r),
+				ClickHouse: chComponent,
+			})
+			if err != nil {
+				t.Fatalf("New() error:\n%+v", err)
 			}
-			if !strings.HasPrefix(table, ".") {
+			helpers.StartStop(t, ch)
+			select {
+			case <-ch.migrationsDone:
+			case <-time.After(3 * time.Second):
+				t.Fatalf("Migrations not done")
+			}
+
+			// Check with the ClickHouse client we have our tables
+			rows, err := chComponent.Query(context.Background(), `
+SELECT table
+FROM system.tables
+WHERE database=currentDatabase() AND table NOT LIKE '.%'`)
+			if err != nil {
+				t.Fatalf("Query() error:\n%+v", err)
+			}
+			got := []string{}
+			for rows.Next() {
+				var table string
+				if err := rows.Scan(&table); err != nil {
+					t.Fatalf("Scan() error:\n%+v", err)
+				}
 				got = append(got, table)
 			}
-		}
-		expected := []string{
-			"asns",
-			"exporters",
-			"flows",
-			"flows_1_raw",
-			"flows_1_raw_consumer",
-			"flows_1h0m0s",
-			"flows_1h0m0s_consumer",
-			"flows_1m0s",
-			"flows_1m0s_consumer",
-			"flows_5m0s",
-			"flows_5m0s_consumer",
-			"protocols",
-		}
-		if diff := helpers.Diff(got, expected); diff != "" {
-			t.Fatalf("SHOW TABLES (-got, +want):\n%s", diff)
-		}
-	}()
+			expected := []string{
+				"asns",
+				"exporters",
+				"flows",
+				"flows_1_raw",
+				"flows_1_raw_consumer",
+				"flows_1h0m0s",
+				"flows_1h0m0s_consumer",
+				"flows_1m0s",
+				"flows_1m0s_consumer",
+				"flows_5m0s",
+				"flows_5m0s_consumer",
+				"protocols",
+			}
+			if diff := helpers.Diff(got, expected); diff != "" {
+				t.Fatalf("SHOW TABLES (-got, +want):\n%s", diff)
+			}
 
-	func() {
-		// Second time
+			currentRun := dumpAllTables(t, chComponent)
+			if lastRun != nil {
+				if diff := helpers.Diff(lastRun, currentRun); diff != "" {
+					t.Fatalf("Final state is different (-last, +current):\n%s", diff)
+				}
+			}
+			lastRun = currentRun
+		})
+		if t.Failed() {
+			break
+		}
+	}
+
+	if !t.Failed() {
+		// One more time
 		r := reporter.NewMock(t)
 		configuration := DefaultConfiguration()
 		configuration.OrchestratorURL = "http://something"
@@ -118,11 +223,11 @@ func TestMigration(t *testing.T) {
 		helpers.StartStop(t, ch)
 		select {
 		case <-ch.migrationsDone:
-		case <-time.After(3 * time.Second):
+		case <-time.After(time.Second):
			t.Fatalf("Migrations not done")
 		}
 
-		// No migration should have been applied the second time
+		// No migration should have been applied the last time
 		gotMetrics := r.GetMetrics("akvorado_orchestrator_clickhouse_migrations_",
 			"applied_steps")
 		expectedMetrics := map[string]string{
@@ -131,6 +236,5 @@ func TestMigration(t *testing.T) {
 		if diff := helpers.Diff(gotMetrics, expectedMetrics); diff != "" {
 			t.Fatalf("Metrics (-got, +want):\n%s", diff)
 		}
-
-	}()
+	}
 }
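The new helpers round-trip schemas through CSV: dumpAllTables produces (table, create_table_query) pairs and loadAllTables replays them after clearAllTables wipes the database. A state fixture could therefore be captured from a migrated database with a few lines of Go. The following is a sketch only; dumpSchemasToCSV is a hypothetical helper, not part of the commit:

// dumpSchemasToCSV writes each (table, create_table_query) pair returned
// by dumpAllTables to a CSV file in the format loadAllTables expects.
// Hypothetical helper for producing new testdata/states fixtures.
func dumpSchemasToCSV(t *testing.T, ch *clickhousedb.Component, filename string) {
	out, err := os.Create(filename)
	if err != nil {
		t.Fatalf("Create(%q) error:\n%+v", filename, err)
	}
	defer out.Close()
	w := csv.NewWriter(out)
	for table, schema := range dumpAllTables(t, ch) {
		if err := w.Write([]string{table, schema}); err != nil {
			t.Fatalf("Write(%q) error:\n%+v", filename, err)
		}
	}
	w.Flush()
	if err := w.Error(); err != nil {
		t.Fatalf("Flush(%q) error:\n%+v", filename, err)
	}
}

Since dumpAllTables returns a map, the rows come out in no particular order; loadAllTables keys the schemas by table name, so row order in the fixture does not matter.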
||||
0 orchestrator/clickhouse/testdata/states/000.csv (vendored, new file)

12 orchestrator/clickhouse/testdata/states/001.csv (vendored, new file)
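Two state fixtures are added: 000.csv is empty, so its test run drops everything and migrates from a pristine database, while 001.csv restores the schema below (as dumped by dumpAllTables) before migrating. The lastRun/currentRun comparison in TestMigration then checks that both starting points converge to an identical final schema.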
@@ -0,0 +1,12 @@
"asns","CREATE DICTIONARY default.asns (`asn` UInt32 INJECTIVE, `name` String) PRIMARY KEY asn SOURCE(HTTP(URL 'http://something/api/v0/orchestrator/clickhouse/asns.csv' FORMAT 'CSVWithNames')) LIFETIME(MIN 0 MAX 3600) LAYOUT(HASHED())"
"exporters","CREATE MATERIALIZED VIEW default.exporters (`TimeReceived` DateTime, `ExporterAddress` LowCardinality(IPv6), `ExporterName` LowCardinality(String), `ExporterGroup` LowCardinality(String), `IfName` String, `IfDescription` String, `IfSpeed` UInt32, `IfConnectivity` String, `IfProvider` String, `IfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2)) ENGINE = ReplacingMergeTree(TimeReceived) ORDER BY (ExporterAddress, IfName) SETTINGS index_granularity = 8192 AS SELECT DISTINCT TimeReceived, ExporterAddress, ExporterName, ExporterGroup, [InIfName, OutIfName][num] AS IfName, [InIfDescription, OutIfDescription][num] AS IfDescription, [InIfSpeed, OutIfSpeed][num] AS IfSpeed, [InIfConnectivity, OutIfConnectivity][num] AS IfConnectivity, [InIfProvider, OutIfProvider][num] AS IfProvider, [InIfBoundary, OutIfBoundary][num] AS IfBoundary FROM default.flows ARRAY JOIN arrayEnumerate([1, 2]) AS num"
"flows","CREATE TABLE default.flows (`TimeReceived` DateTime CODEC(DoubleDelta, LZ4), `SamplingRate` UInt64, `ExporterAddress` LowCardinality(IPv6), `ExporterName` LowCardinality(String), `ExporterGroup` LowCardinality(String), `SrcAddr` IPv6, `DstAddr` IPv6, `SrcAS` UInt32, `DstAS` UInt32, `SrcCountry` FixedString(2), `DstCountry` FixedString(2), `InIfName` LowCardinality(String), `OutIfName` LowCardinality(String), `InIfDescription` String, `OutIfDescription` String, `InIfSpeed` UInt32, `OutIfSpeed` UInt32, `InIfConnectivity` LowCardinality(String), `OutIfConnectivity` LowCardinality(String), `InIfProvider` LowCardinality(String), `OutIfProvider` LowCardinality(String), `InIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `OutIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `EType` UInt32, `Proto` UInt32, `SrcPort` UInt32, `DstPort` UInt32, `Bytes` UInt64, `Packets` UInt64, `PacketSize` UInt64 ALIAS intDiv(Bytes, Packets), `PacketSizeBucket` LowCardinality(String) ALIAS multiIf(PacketSize < 64, '0-63', PacketSize < 128, '64-127', PacketSize < 256, '128-255', PacketSize < 512, '256-511', PacketSize < 768, '512-767', PacketSize < 1024, '768-1023', PacketSize < 1280, '1024-1279', PacketSize < 1501, '1280-1500', PacketSize < 2048, '1501-2047', PacketSize < 3072, '2048-3071', PacketSize < 4096, '3072-4095', PacketSize < 8192, '4096-8191', PacketSize < 10240, '8192-10239', PacketSize < 16384, '10240-16383', PacketSize < 32768, '16384-32767', PacketSize < 65536, '32768-65535', '65536-Inf'), `ForwardingStatus` UInt32) ENGINE = MergeTree PARTITION BY toYYYYMMDDhhmmss(toStartOfInterval(TimeReceived, toIntervalHour(6))) ORDER BY (TimeReceived, ExporterAddress, InIfName, OutIfName) TTL TimeReceived + toIntervalSecond(1296000) SETTINGS index_granularity = 8192"
"flows_1_raw","CREATE TABLE default.flows_1_raw (`TimeReceived` DateTime CODEC(DoubleDelta, LZ4), `SamplingRate` UInt64, `ExporterAddress` LowCardinality(IPv6), `ExporterName` LowCardinality(String), `ExporterGroup` LowCardinality(String), `SrcAddr` IPv6, `DstAddr` IPv6, `SrcAS` UInt32, `DstAS` UInt32, `SrcCountry` FixedString(2), `DstCountry` FixedString(2), `InIfName` LowCardinality(String), `OutIfName` LowCardinality(String), `InIfDescription` String, `OutIfDescription` String, `InIfSpeed` UInt32, `OutIfSpeed` UInt32, `InIfConnectivity` LowCardinality(String), `OutIfConnectivity` LowCardinality(String), `InIfProvider` LowCardinality(String), `OutIfProvider` LowCardinality(String), `InIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `OutIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `EType` UInt32, `Proto` UInt32, `SrcPort` UInt32, `DstPort` UInt32, `Bytes` UInt64, `Packets` UInt64, `ForwardingStatus` UInt32) ENGINE = Kafka SETTINGS kafka_broker_list = '', kafka_topic_list = '-v1', kafka_group_name = 'clickhouse', kafka_format = 'Protobuf', kafka_schema = 'flow-1.proto:FlowMessage', kafka_num_consumers = 1, kafka_thread_per_consumer = 1"
"flows_1_raw_consumer","CREATE MATERIALIZED VIEW default.flows_1_raw_consumer TO default.flows (`TimeReceived` DateTime, `SamplingRate` UInt64, `ExporterAddress` LowCardinality(IPv6), `ExporterName` LowCardinality(String), `ExporterGroup` LowCardinality(String), `SrcAddr` IPv6, `DstAddr` IPv6, `SrcAS` UInt32, `DstAS` UInt32, `SrcCountry` FixedString(2), `DstCountry` FixedString(2), `InIfName` LowCardinality(String), `OutIfName` LowCardinality(String), `InIfDescription` String, `OutIfDescription` String, `InIfSpeed` UInt32, `OutIfSpeed` UInt32, `InIfConnectivity` LowCardinality(String), `OutIfConnectivity` LowCardinality(String), `InIfProvider` LowCardinality(String), `OutIfProvider` LowCardinality(String), `InIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `OutIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `EType` UInt32, `Proto` UInt32, `SrcPort` UInt32, `DstPort` UInt32, `Bytes` UInt64, `Packets` UInt64, `ForwardingStatus` UInt32) AS SELECT * FROM default.flows_1_raw"
"flows_1h0m0s","CREATE TABLE default.flows_1h0m0s (`TimeReceived` DateTime CODEC(DoubleDelta, LZ4), `SamplingRate` UInt64, `ExporterAddress` LowCardinality(IPv6), `ExporterName` LowCardinality(String), `ExporterGroup` LowCardinality(String), `SrcAS` UInt32, `DstAS` UInt32, `SrcCountry` FixedString(2), `DstCountry` FixedString(2), `InIfName` LowCardinality(String), `OutIfName` LowCardinality(String), `InIfDescription` String, `OutIfDescription` String, `InIfSpeed` UInt32, `OutIfSpeed` UInt32, `InIfConnectivity` LowCardinality(String), `OutIfConnectivity` LowCardinality(String), `InIfProvider` LowCardinality(String), `OutIfProvider` LowCardinality(String), `InIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `OutIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `EType` UInt32, `Proto` UInt32, `Bytes` UInt64, `Packets` UInt64, `PacketSize` UInt64 ALIAS intDiv(Bytes, Packets), `PacketSizeBucket` LowCardinality(String) ALIAS multiIf(PacketSize < 64, '0-63', PacketSize < 128, '64-127', PacketSize < 256, '128-255', PacketSize < 512, '256-511', PacketSize < 768, '512-767', PacketSize < 1024, '768-1023', PacketSize < 1280, '1024-1279', PacketSize < 1501, '1280-1500', PacketSize < 2048, '1501-2047', PacketSize < 3072, '2048-3071', PacketSize < 4096, '3072-4095', PacketSize < 8192, '4096-8191', PacketSize < 10240, '8192-10239', PacketSize < 16384, '10240-16383', PacketSize < 32768, '16384-32767', PacketSize < 65536, '32768-65535', '65536-Inf'), `ForwardingStatus` UInt32) ENGINE = SummingMergeTree((Bytes, Packets)) PARTITION BY toYYYYMMDDhhmmss(toStartOfInterval(TimeReceived, toIntervalHour(6))) ORDER BY (TimeReceived, ExporterAddress, EType, Proto, InIfName, SrcAS, ForwardingStatus, OutIfName, DstAS, SamplingRate) TTL TimeReceived + toIntervalSecond(31104000) SETTINGS index_granularity = 8192"
"flows_1h0m0s_consumer","CREATE MATERIALIZED VIEW default.flows_1h0m0s_consumer TO default.flows_1h0m0s (`TimeReceived` DateTime, `SamplingRate` UInt64, `ExporterAddress` LowCardinality(IPv6), `ExporterName` LowCardinality(String), `ExporterGroup` LowCardinality(String), `SrcAS` UInt32, `DstAS` UInt32, `SrcCountry` FixedString(2), `DstCountry` FixedString(2), `InIfName` LowCardinality(String), `OutIfName` LowCardinality(String), `InIfDescription` String, `OutIfDescription` String, `InIfSpeed` UInt32, `OutIfSpeed` UInt32, `InIfConnectivity` LowCardinality(String), `OutIfConnectivity` LowCardinality(String), `InIfProvider` LowCardinality(String), `OutIfProvider` LowCardinality(String), `InIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `OutIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `EType` UInt32, `Proto` UInt32, `Bytes` UInt64, `Packets` UInt64, `ForwardingStatus` UInt32) AS SELECT * EXCEPT (SrcAddr, DstAddr, SrcPort, DstPort) REPLACE toStartOfInterval(TimeReceived, toIntervalSecond(3600)) AS TimeReceived FROM default.flows"
"flows_1m0s","CREATE TABLE default.flows_1m0s (`TimeReceived` DateTime CODEC(DoubleDelta, LZ4), `SamplingRate` UInt64, `ExporterAddress` LowCardinality(IPv6), `ExporterName` LowCardinality(String), `ExporterGroup` LowCardinality(String), `SrcAS` UInt32, `DstAS` UInt32, `SrcCountry` FixedString(2), `DstCountry` FixedString(2), `InIfName` LowCardinality(String), `OutIfName` LowCardinality(String), `InIfDescription` String, `OutIfDescription` String, `InIfSpeed` UInt32, `OutIfSpeed` UInt32, `InIfConnectivity` LowCardinality(String), `OutIfConnectivity` LowCardinality(String), `InIfProvider` LowCardinality(String), `OutIfProvider` LowCardinality(String), `InIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `OutIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `EType` UInt32, `Proto` UInt32, `Bytes` UInt64, `Packets` UInt64, `PacketSize` UInt64 ALIAS intDiv(Bytes, Packets), `PacketSizeBucket` LowCardinality(String) ALIAS multiIf(PacketSize < 64, '0-63', PacketSize < 128, '64-127', PacketSize < 256, '128-255', PacketSize < 512, '256-511', PacketSize < 768, '512-767', PacketSize < 1024, '768-1023', PacketSize < 1280, '1024-1279', PacketSize < 1501, '1280-1500', PacketSize < 2048, '1501-2047', PacketSize < 3072, '2048-3071', PacketSize < 4096, '3072-4095', PacketSize < 8192, '4096-8191', PacketSize < 10240, '8192-10239', PacketSize < 16384, '10240-16383', PacketSize < 32768, '16384-32767', PacketSize < 65536, '32768-65535', '65536-Inf'), `ForwardingStatus` UInt32) ENGINE = SummingMergeTree((Bytes, Packets)) PARTITION BY toYYYYMMDDhhmmss(toStartOfInterval(TimeReceived, toIntervalHour(6))) ORDER BY (TimeReceived, ExporterAddress, EType, Proto, InIfName, SrcAS, ForwardingStatus, OutIfName, DstAS, SamplingRate) TTL TimeReceived + toIntervalSecond(604800) SETTINGS index_granularity = 8192"
"flows_1m0s_consumer","CREATE MATERIALIZED VIEW default.flows_1m0s_consumer TO default.flows_1m0s (`TimeReceived` DateTime, `SamplingRate` UInt64, `ExporterAddress` LowCardinality(IPv6), `ExporterName` LowCardinality(String), `ExporterGroup` LowCardinality(String), `SrcAS` UInt32, `DstAS` UInt32, `SrcCountry` FixedString(2), `DstCountry` FixedString(2), `InIfName` LowCardinality(String), `OutIfName` LowCardinality(String), `InIfDescription` String, `OutIfDescription` String, `InIfSpeed` UInt32, `OutIfSpeed` UInt32, `InIfConnectivity` LowCardinality(String), `OutIfConnectivity` LowCardinality(String), `InIfProvider` LowCardinality(String), `OutIfProvider` LowCardinality(String), `InIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `OutIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `EType` UInt32, `Proto` UInt32, `Bytes` UInt64, `Packets` UInt64, `ForwardingStatus` UInt32) AS SELECT * EXCEPT (SrcAddr, DstAddr, SrcPort, DstPort) REPLACE toStartOfInterval(TimeReceived, toIntervalSecond(60)) AS TimeReceived FROM default.flows"
"flows_5m0s","CREATE TABLE default.flows_5m0s (`TimeReceived` DateTime CODEC(DoubleDelta, LZ4), `SamplingRate` UInt64, `ExporterAddress` LowCardinality(IPv6), `ExporterName` LowCardinality(String), `ExporterGroup` LowCardinality(String), `SrcAS` UInt32, `DstAS` UInt32, `SrcCountry` FixedString(2), `DstCountry` FixedString(2), `InIfName` LowCardinality(String), `OutIfName` LowCardinality(String), `InIfDescription` String, `OutIfDescription` String, `InIfSpeed` UInt32, `OutIfSpeed` UInt32, `InIfConnectivity` LowCardinality(String), `OutIfConnectivity` LowCardinality(String), `InIfProvider` LowCardinality(String), `OutIfProvider` LowCardinality(String), `InIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `OutIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `EType` UInt32, `Proto` UInt32, `Bytes` UInt64, `Packets` UInt64, `PacketSize` UInt64 ALIAS intDiv(Bytes, Packets), `PacketSizeBucket` LowCardinality(String) ALIAS multiIf(PacketSize < 64, '0-63', PacketSize < 128, '64-127', PacketSize < 256, '128-255', PacketSize < 512, '256-511', PacketSize < 768, '512-767', PacketSize < 1024, '768-1023', PacketSize < 1280, '1024-1279', PacketSize < 1501, '1280-1500', PacketSize < 2048, '1501-2047', PacketSize < 3072, '2048-3071', PacketSize < 4096, '3072-4095', PacketSize < 8192, '4096-8191', PacketSize < 10240, '8192-10239', PacketSize < 16384, '10240-16383', PacketSize < 32768, '16384-32767', PacketSize < 65536, '32768-65535', '65536-Inf'), `ForwardingStatus` UInt32) ENGINE = SummingMergeTree((Bytes, Packets)) PARTITION BY toYYYYMMDDhhmmss(toStartOfInterval(TimeReceived, toIntervalHour(6))) ORDER BY (TimeReceived, ExporterAddress, EType, Proto, InIfName, SrcAS, ForwardingStatus, OutIfName, DstAS, SamplingRate) TTL TimeReceived + toIntervalSecond(7776000) SETTINGS index_granularity = 8192"
"flows_5m0s_consumer","CREATE MATERIALIZED VIEW default.flows_5m0s_consumer TO default.flows_5m0s (`TimeReceived` DateTime, `SamplingRate` UInt64, `ExporterAddress` LowCardinality(IPv6), `ExporterName` LowCardinality(String), `ExporterGroup` LowCardinality(String), `SrcAS` UInt32, `DstAS` UInt32, `SrcCountry` FixedString(2), `DstCountry` FixedString(2), `InIfName` LowCardinality(String), `OutIfName` LowCardinality(String), `InIfDescription` String, `OutIfDescription` String, `InIfSpeed` UInt32, `OutIfSpeed` UInt32, `InIfConnectivity` LowCardinality(String), `OutIfConnectivity` LowCardinality(String), `InIfProvider` LowCardinality(String), `OutIfProvider` LowCardinality(String), `InIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `OutIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `EType` UInt32, `Proto` UInt32, `Bytes` UInt64, `Packets` UInt64, `ForwardingStatus` UInt32) AS SELECT * EXCEPT (SrcAddr, DstAddr, SrcPort, DstPort) REPLACE toStartOfInterval(TimeReceived, toIntervalSecond(300)) AS TimeReceived FROM default.flows"
"protocols","CREATE DICTIONARY default.protocols (`proto` UInt8 INJECTIVE, `name` String, `description` String) PRIMARY KEY proto SOURCE(HTTP(URL 'http://something/api/v0/orchestrator/clickhouse/protocols.csv' FORMAT 'CSVWithNames')) LIFETIME(MIN 0 MAX 3600) LAYOUT(HASHED())"