orchestrator/clickhouse: optimize dictionary lookup for networks

This commit is contained in:
Vincent Bernat
2024-03-13 20:32:20 +01:00
parent 345aa6c09b
commit dcdbf208d1
3 changed files with 81 additions and 25 deletions

View File

@@ -246,111 +246,111 @@ END`,
{
Key: ColumnSrcAS,
ClickHouseType: "UInt32",
ClickHouseGenerateFrom: fmt.Sprintf("if(SrcAS = 0, dictGetOrDefault('%s', 'asn', SrcAddr, 0), SrcAS)", DictionaryNetworks),
ClickHouseGenerateFrom: "if(SrcAS = 0, c_SrcNetworks[asn], SrcAS)",
ClickHouseSelfGenerated: true,
},
{
Key: ColumnDstAS,
ClickHouseType: "UInt32",
ClickHouseGenerateFrom: fmt.Sprintf("if(DstAS = 0, dictGetOrDefault('%s', 'asn', DstAddr, 0), DstAS)", DictionaryNetworks),
ClickHouseGenerateFrom: "if(DstAS = 0, c_DstNetworks[asn], DstAS)",
ClickHouseSelfGenerated: true,
},
{
Key: ColumnSrcNetName,
ParserType: "string",
ClickHouseType: "LowCardinality(String)",
ClickHouseGenerateFrom: fmt.Sprintf("dictGetOrDefault('%s', 'name', SrcAddr, '')", DictionaryNetworks),
ClickHouseGenerateFrom: "c_SrcNetworks[name]",
},
{
Key: ColumnDstNetName,
ParserType: "string",
ClickHouseType: "LowCardinality(String)",
ClickHouseGenerateFrom: fmt.Sprintf("dictGetOrDefault('%s', 'name', DstAddr, '')", DictionaryNetworks),
ClickHouseGenerateFrom: "c_DstNetworks[name]",
},
{
Key: ColumnSrcNetRole,
ParserType: "string",
ClickHouseType: "LowCardinality(String)",
ClickHouseGenerateFrom: fmt.Sprintf("dictGetOrDefault('%s', 'role', SrcAddr, '')", DictionaryNetworks),
ClickHouseGenerateFrom: "c_SrcNetworks[role]",
},
{
Key: ColumnDstNetRole,
ParserType: "string",
ClickHouseType: "LowCardinality(String)",
ClickHouseGenerateFrom: fmt.Sprintf("dictGetOrDefault('%s', 'role', DstAddr, '')", DictionaryNetworks),
ClickHouseGenerateFrom: "c_DstNetworks[role]",
},
{
Key: ColumnSrcNetSite,
ParserType: "string",
ClickHouseType: "LowCardinality(String)",
ClickHouseGenerateFrom: fmt.Sprintf("dictGetOrDefault('%s', 'site', SrcAddr, '')", DictionaryNetworks),
ClickHouseGenerateFrom: "c_SrcNetworks[site]",
},
{
Key: ColumnDstNetSite,
ParserType: "string",
ClickHouseType: "LowCardinality(String)",
ClickHouseGenerateFrom: fmt.Sprintf("dictGetOrDefault('%s', 'site', DstAddr, '')", DictionaryNetworks),
ClickHouseGenerateFrom: "c_DstNetworks[site]",
},
{
Key: ColumnSrcNetRegion,
ParserType: "string",
ClickHouseType: "LowCardinality(String)",
ClickHouseGenerateFrom: fmt.Sprintf("dictGetOrDefault('%s', 'region', SrcAddr, '')", DictionaryNetworks),
ClickHouseGenerateFrom: "c_SrcNetworks[region]",
},
{
Key: ColumnDstNetRegion,
ParserType: "string",
ClickHouseType: "LowCardinality(String)",
ClickHouseGenerateFrom: fmt.Sprintf("dictGetOrDefault('%s', 'region', DstAddr, '')", DictionaryNetworks),
ClickHouseGenerateFrom: "c_DstNetworks[region]",
},
{
Key: ColumnSrcNetTenant,
ParserType: "string",
ClickHouseType: "LowCardinality(String)",
ClickHouseGenerateFrom: fmt.Sprintf("dictGetOrDefault('%s', 'tenant', SrcAddr, '')", DictionaryNetworks),
ClickHouseGenerateFrom: "c_SrcNetworks[tenant]",
},
{
Key: ColumnDstNetTenant,
ParserType: "string",
ClickHouseType: "LowCardinality(String)",
ClickHouseGenerateFrom: fmt.Sprintf("dictGetOrDefault('%s', 'tenant', DstAddr, '')", DictionaryNetworks),
ClickHouseGenerateFrom: "c_DstNetworks[tenant]",
},
{Key: ColumnSrcVlan, ParserType: "uint", ClickHouseType: "UInt16", Disabled: true, Group: ColumnGroupL2},
{
Key: ColumnSrcCountry,
ParserType: "string",
ClickHouseType: "FixedString(2)",
ClickHouseGenerateFrom: fmt.Sprintf("dictGetOrDefault('%s', 'country', SrcAddr, '')", DictionaryNetworks),
ClickHouseGenerateFrom: "c_SrcNetworks[country]",
},
{
Key: ColumnDstCountry,
ParserType: "string",
ClickHouseType: "FixedString(2)",
ClickHouseGenerateFrom: fmt.Sprintf("dictGetOrDefault('%s', 'country', DstAddr, '')", DictionaryNetworks),
ClickHouseGenerateFrom: "c_DstNetworks[country]",
},
{
Key: ColumnSrcGeoCity,
ParserType: "string",
ClickHouseType: "LowCardinality(String)",
ClickHouseGenerateFrom: fmt.Sprintf("dictGetOrDefault('%s', 'city', SrcAddr, '')", DictionaryNetworks),
ClickHouseGenerateFrom: "c_SrcNetworks[city]",
},
{
Key: ColumnDstGeoCity,
ParserType: "string",
ClickHouseType: "LowCardinality(String)",
ClickHouseGenerateFrom: fmt.Sprintf("dictGetOrDefault('%s', 'city', DstAddr, '')", DictionaryNetworks),
ClickHouseGenerateFrom: "c_DstNetworks[city]",
},
{
Key: ColumnSrcGeoState,
ParserType: "string",
ClickHouseType: "LowCardinality(String)",
ClickHouseGenerateFrom: fmt.Sprintf("dictGetOrDefault('%s', 'state', SrcAddr, '')", DictionaryNetworks),
ClickHouseGenerateFrom: "c_SrcNetworks[state]",
},
{
Key: ColumnDstGeoState,
ParserType: "string",
ClickHouseType: "LowCardinality(String)",
ClickHouseGenerateFrom: fmt.Sprintf("dictGetOrDefault('%s', 'state', DstAddr, '')", DictionaryNetworks),
ClickHouseGenerateFrom: "c_DstNetworks[state]",
},
{
Key: ColumnDstASPath,

View File

@@ -70,6 +70,9 @@ func (c *Component) tableAlreadyExists(ctx context.Context, table, column, targe
existing = strings.ReplaceAll(existing,
fmt.Sprintf(`dictGetOrDefault('%s.`, c.config.Database),
"dictGetOrDefault('")
existing = strings.ReplaceAll(existing,
fmt.Sprintf(`dictGet('%s.`, c.config.Database),
"dictGet('")
// Compare!
if existing == target {
@@ -245,6 +248,8 @@ func (c *Component) createRawFlowsTable(ctx context.Context) error {
return nil
}
var dictionaryNetworksLookupRegex = regexp.MustCompile(`\bc_(Src|Dst)Networks\[([[:lower:]]+)\]\B`)
func (c *Component) createRawFlowsConsumerView(ctx context.Context) error {
tableName := fmt.Sprintf("flows_%s_raw", c.d.Schema.ProtobufMessageHash())
viewName := fmt.Sprintf("%s_consumer", tableName)
@@ -258,17 +263,68 @@ func (c *Component) createRawFlowsConsumerView(ctx context.Context) error {
"Database": c.config.Database,
"Table": tableName,
}
if column, ok := c.d.Schema.LookupColumnByKey(schema.ColumnDstASPath); ok && !column.Disabled {
args["With"] = "WITH arrayCompact(DstASPath) AS c_DstASPath "
} else {
args["With"] = ""
}
selectQuery, err := stemplate(
`{{ .With }}SELECT {{ .Columns }} FROM {{ .Database }}.{{ .Table }} WHERE length(_error) = 0`,
`SELECT {{ .Columns }} FROM {{ .Database }}.{{ .Table }} WHERE length(_error) = 0`,
args)
if err != nil {
return fmt.Errorf("cannot build select statement for raw flows consumer view: %w", err)
}
with := []string{}
// c_DstAsPath
if column, ok := c.d.Schema.LookupColumnByKey(schema.ColumnDstASPath); ok && !column.Disabled {
with = append(with, "arrayCompact(DstASPath) AS c_DstASPath")
}
// c_SrcNetworks and c_DstNetworks
lookups := dictionaryNetworksLookupRegex.FindAllStringSubmatch(selectQuery, -1)
if len(lookups) > 0 {
// Build the with clause
srcColumns := []string{}
dstColumns := []string{}
for _, lookup := range lookups {
if lookup[1] == "Src" {
srcColumns = append(srcColumns, lookup[2])
} else if lookup[1] == "Dst" {
dstColumns = append(dstColumns, lookup[2])
}
}
for _, columns := range []struct {
direction string
names []string
}{
{direction: "Src", names: srcColumns},
{direction: "Dst", names: dstColumns},
} {
if len(columns.names) > 0 {
names := []string{}
for _, column := range columns.names {
names = append(names, fmt.Sprintf("'%s'", column))
}
with = append(with,
fmt.Sprintf("dictGet('%s', (%s), %sAddr) AS c_%sNetworks",
schema.DictionaryNetworks,
strings.Join(names, ", "),
columns.direction,
columns.direction,
))
}
}
// Replace in query to use the index
srcIdx := 0
dstIdx := 0
selectQuery = dictionaryNetworksLookupRegex.ReplaceAllStringFunc(selectQuery, func(match string) string {
if strings.Contains(match, "Src") {
srcIdx++
return fmt.Sprintf("c_SrcNetworks.%d", srcIdx)
} else if strings.Contains(match, "Dst") {
dstIdx++
return fmt.Sprintf("c_DstNetworks.%d", dstIdx)
}
return match
})
}
if len(with) > 0 {
selectQuery = fmt.Sprintf("WITH %s %s", strings.Join(with, ", "), selectQuery)
}
// Check the existing one
if ok, err := c.tableAlreadyExists(ctx, viewName, "as_select", selectQuery); err != nil {

View File

@@ -12,4 +12,4 @@
"flows_1h0m0s_consumer","CREATE MATERIALIZED VIEW default.flows_1h0m0s_consumer TO default.flows_1h0m0s (`TimeReceived` DateTime, `SamplingRate` UInt64, `ExporterAddress` LowCardinality(IPv6), `ExporterName` LowCardinality(String), `ExporterGroup` LowCardinality(String), `ExporterRole` LowCardinality(String), `ExporterSite` LowCardinality(String), `ExporterRegion` LowCardinality(String), `ExporterTenant` LowCardinality(String), `SrcAS` UInt32, `DstAS` UInt32, `SrcNetName` LowCardinality(String), `DstNetName` LowCardinality(String), `SrcNetRole` LowCardinality(String), `DstNetRole` LowCardinality(String), `SrcNetSite` LowCardinality(String), `DstNetSite` LowCardinality(String), `SrcNetRegion` LowCardinality(String), `DstNetRegion` LowCardinality(String), `SrcNetTenant` LowCardinality(String), `DstNetTenant` LowCardinality(String), `SrcCountry` FixedString(2), `DstCountry` FixedString(2), `SrcGeoCity` LowCardinality(String), `DstGeoCity` LowCardinality(String), `SrcGeoState` LowCardinality(String), `DstGeoState` LowCardinality(String), `Dst1stAS` UInt32, `Dst2ndAS` UInt32, `Dst3rdAS` UInt32, `InIfName` LowCardinality(String), `OutIfName` LowCardinality(String), `InIfDescription` LowCardinality(String), `OutIfDescription` LowCardinality(String), `InIfSpeed` UInt32, `OutIfSpeed` UInt32, `InIfConnectivity` LowCardinality(String), `OutIfConnectivity` LowCardinality(String), `InIfProvider` LowCardinality(String), `OutIfProvider` LowCardinality(String), `InIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `OutIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `EType` UInt32, `Proto` UInt32, `Bytes` UInt64, `Packets` UInt64, `ForwardingStatus` UInt32) AS SELECT toStartOfInterval(TimeReceived, toIntervalSecond(3600)) AS TimeReceived, SamplingRate, ExporterAddress, ExporterName, ExporterGroup, ExporterRole, ExporterSite, ExporterRegion, ExporterTenant, SrcAS, DstAS, SrcNetName, DstNetName, SrcNetRole, DstNetRole, SrcNetSite, DstNetSite, SrcNetRegion, DstNetRegion, SrcNetTenant, DstNetTenant, SrcCountry, DstCountry, SrcGeoCity, DstGeoCity, SrcGeoState, DstGeoState, Dst1stAS, Dst2ndAS, Dst3rdAS, InIfName, OutIfName, InIfDescription, OutIfDescription, InIfSpeed, OutIfSpeed, InIfConnectivity, OutIfConnectivity, InIfProvider, OutIfProvider, InIfBoundary, OutIfBoundary, EType, Proto, Bytes, Packets, ForwardingStatus FROM default.flows"
"flows_LAABIGYMRYZPTGOYIIFZNYDEQM_raw","CREATE TABLE default.flows_LAABIGYMRYZPTGOYIIFZNYDEQM_raw (`TimeReceived` DateTime CODEC(DoubleDelta, LZ4), `SamplingRate` UInt64, `ExporterAddress` LowCardinality(IPv6), `ExporterName` LowCardinality(String), `ExporterGroup` LowCardinality(String), `ExporterRole` LowCardinality(String), `ExporterSite` LowCardinality(String), `ExporterRegion` LowCardinality(String), `ExporterTenant` LowCardinality(String), `SrcAddr` IPv6 CODEC(ZSTD(1)), `DstAddr` IPv6 CODEC(ZSTD(1)), `SrcNetMask` UInt8, `DstNetMask` UInt8, `SrcAS` UInt32, `DstAS` UInt32, `DstASPath` Array(UInt32), `DstCommunities` Array(UInt32), `DstLargeCommunitiesASN` Array(UInt32), `DstLargeCommunitiesLocalData1` Array(UInt32), `DstLargeCommunitiesLocalData2` Array(UInt32), `InIfName` LowCardinality(String), `OutIfName` LowCardinality(String), `InIfDescription` LowCardinality(String), `OutIfDescription` LowCardinality(String), `InIfSpeed` UInt32, `OutIfSpeed` UInt32, `InIfConnectivity` LowCardinality(String), `OutIfConnectivity` LowCardinality(String), `InIfProvider` LowCardinality(String), `OutIfProvider` LowCardinality(String), `InIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `OutIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `EType` UInt32, `Proto` UInt32, `SrcPort` UInt16, `DstPort` UInt16, `Bytes` UInt64 CODEC(T64, LZ4), `Packets` UInt64 CODEC(T64, LZ4), `ForwardingStatus` UInt32) ENGINE = Kafka SETTINGS kafka_broker_list = '127.0.0.1:9092', kafka_topic_list = 'flows-LAABIGYMRYZPTGOYIIFZNYDEQM', kafka_group_name = 'clickhouse', kafka_format = 'Protobuf', kafka_schema = 'flow-LAABIGYMRYZPTGOYIIFZNYDEQM.proto:FlowMessagevLAABIGYMRYZPTGOYIIFZNYDEQM', kafka_num_consumers = 1, kafka_thread_per_consumer = 1, kafka_handle_error_mode = 'stream'"
"flows_LAABIGYMRYZPTGOYIIFZNYDEQM_raw_errors","CREATE MATERIALIZED VIEW default.flows_LAABIGYMRYZPTGOYIIFZNYDEQM_raw_errors (`timestamp` DateTime, `topic` LowCardinality(String), `partition` UInt64, `offset` UInt64, `raw` String, `error` String) ENGINE = MergeTree PARTITION BY toYYYYMMDDhhmmss(toStartOfHour(timestamp)) ORDER BY (timestamp, topic, partition, offset) TTL timestamp + toIntervalDay(1) SETTINGS index_granularity = 8192 AS SELECT now() AS timestamp, _topic AS topic, _partition AS partition, _offset AS offset, _raw_message AS raw, _error AS error FROM default.flows_LAABIGYMRYZPTGOYIIFZNYDEQM_raw WHERE length(_error) > 0"
"flows_LAABIGYMRYZPTGOYIIFZNYDEQM_raw_consumer","CREATE MATERIALIZED VIEW default.flows_LAABIGYMRYZPTGOYIIFZNYDEQM_raw_consumer TO default.flows (`TimeReceived` DateTime, `SamplingRate` UInt64, `ExporterAddress` LowCardinality(IPv6), `ExporterName` LowCardinality(String), `ExporterGroup` LowCardinality(String), `ExporterRole` LowCardinality(String), `ExporterSite` LowCardinality(String), `ExporterRegion` LowCardinality(String), `ExporterTenant` LowCardinality(String), `SrcAddr` IPv6, `DstAddr` IPv6, `SrcNetMask` UInt8, `DstNetMask` UInt8, `SrcAS` UInt32, `DstAS` UInt32, `SrcNetName` String, `DstNetName` String, `SrcNetRole` String, `DstNetRole` String, `SrcNetSite` String, `DstNetSite` String, `SrcNetRegion` String, `DstNetRegion` String, `SrcNetTenant` String, `DstNetTenant` String, `SrcCountry` String, `DstCountry` String, `SrcGeoCity` String, `DstGeoCity` String, `SrcGeoState` String, `DstGeoState` String, `DstASPath` Array(UInt32), `Dst1stAS` UInt32, `Dst2ndAS` UInt32, `Dst3rdAS` UInt32, `DstCommunities` Array(UInt32), `DstLargeCommunities` Array(UInt128), `InIfName` LowCardinality(String), `OutIfName` LowCardinality(String), `InIfDescription` LowCardinality(String), `OutIfDescription` LowCardinality(String), `InIfSpeed` UInt32, `OutIfSpeed` UInt32, `InIfConnectivity` LowCardinality(String), `OutIfConnectivity` LowCardinality(String), `InIfProvider` LowCardinality(String), `OutIfProvider` LowCardinality(String), `InIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `OutIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `EType` UInt32, `Proto` UInt32, `SrcPort` UInt16, `DstPort` UInt16, `Bytes` UInt64, `Packets` UInt64, `ForwardingStatus` UInt32) AS WITH arrayCompact(DstASPath) AS c_DstASPath SELECT TimeReceived, SamplingRate, ExporterAddress, ExporterName, ExporterGroup, ExporterRole, ExporterSite, ExporterRegion, ExporterTenant, SrcAddr, DstAddr, SrcNetMask, DstNetMask, if(SrcAS = 0, dictGetOrDefault('default.networks', 'asn', SrcAddr, 0), SrcAS) AS SrcAS, if(DstAS = 0, dictGetOrDefault('default.networks', 'asn', DstAddr, 0), DstAS) AS DstAS, dictGetOrDefault('default.networks', 'name', SrcAddr, '') AS SrcNetName, dictGetOrDefault('default.networks', 'name', DstAddr, '') AS DstNetName, dictGetOrDefault('default.networks', 'role', SrcAddr, '') AS SrcNetRole, dictGetOrDefault('default.networks', 'role', DstAddr, '') AS DstNetRole, dictGetOrDefault('default.networks', 'site', SrcAddr, '') AS SrcNetSite, dictGetOrDefault('default.networks', 'site', DstAddr, '') AS DstNetSite, dictGetOrDefault('default.networks', 'region', SrcAddr, '') AS SrcNetRegion, dictGetOrDefault('default.networks', 'region', DstAddr, '') AS DstNetRegion, dictGetOrDefault('default.networks', 'tenant', SrcAddr, '') AS SrcNetTenant, dictGetOrDefault('default.networks', 'tenant', DstAddr, '') AS DstNetTenant, dictGetOrDefault('default.networks', 'country', SrcAddr, '') AS SrcCountry, dictGetOrDefault('default.networks', 'country', DstAddr, '') AS DstCountry, dictGetOrDefault('default.networks', 'city', SrcAddr, '') AS SrcGeoCity, dictGetOrDefault('default.networks', 'city', DstAddr, '') AS DstGeoCity, dictGetOrDefault('default.networks', 'state', SrcAddr, '') AS SrcGeoState, dictGetOrDefault('default.networks', 'state', DstAddr, '') AS DstGeoState, DstASPath, c_DstASPath[1] AS Dst1stAS, c_DstASPath[2] AS Dst2ndAS, c_DstASPath[3] AS Dst3rdAS, DstCommunities, arrayMap((asn, l1, l2) -> ((bitShiftLeft(CAST(asn, 'UInt128'), 64) + bitShiftLeft(CAST(l1, 'UInt128'), 32)) + CAST(l2, 'UInt128')), DstLargeCommunitiesASN, DstLargeCommunitiesLocalData1, DstLargeCommunitiesLocalData2) AS DstLargeCommunities, InIfName, OutIfName, InIfDescription, OutIfDescription, InIfSpeed, OutIfSpeed, InIfConnectivity, OutIfConnectivity, InIfProvider, OutIfProvider, InIfBoundary, OutIfBoundary, EType, Proto, SrcPort, DstPort, Bytes, Packets, ForwardingStatus FROM default.flows_LAABIGYMRYZPTGOYIIFZNYDEQM_raw WHERE length(_error) = 0"
"flows_LAABIGYMRYZPTGOYIIFZNYDEQM_raw_consumer","CREATE MATERIALIZED VIEW default.flows_LAABIGYMRYZPTGOYIIFZNYDEQM_raw_consumer TO default.flows (`TimeReceived` DateTime, `SamplingRate` UInt64, `ExporterAddress` LowCardinality(IPv6), `ExporterName` LowCardinality(String), `ExporterGroup` LowCardinality(String), `ExporterRole` LowCardinality(String), `ExporterSite` LowCardinality(String), `ExporterRegion` LowCardinality(String), `ExporterTenant` LowCardinality(String), `SrcAddr` IPv6, `DstAddr` IPv6, `SrcNetMask` UInt8, `DstNetMask` UInt8, `SrcAS` UInt32, `DstAS` UInt32, `SrcNetName` String, `DstNetName` String, `SrcNetRole` String, `DstNetRole` String, `SrcNetSite` String, `DstNetSite` String, `SrcNetRegion` String, `DstNetRegion` String, `SrcNetTenant` String, `DstNetTenant` String, `SrcCountry` String, `DstCountry` String, `SrcGeoCity` String, `DstGeoCity` String, `SrcGeoState` String, `DstGeoState` String, `DstASPath` Array(UInt32), `Dst1stAS` UInt32, `Dst2ndAS` UInt32, `Dst3rdAS` UInt32, `DstCommunities` Array(UInt32), `DstLargeCommunities` Array(UInt128), `InIfName` LowCardinality(String), `OutIfName` LowCardinality(String), `InIfDescription` LowCardinality(String), `OutIfDescription` LowCardinality(String), `InIfSpeed` UInt32, `OutIfSpeed` UInt32, `InIfConnectivity` LowCardinality(String), `OutIfConnectivity` LowCardinality(String), `InIfProvider` LowCardinality(String), `OutIfProvider` LowCardinality(String), `InIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `OutIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `EType` UInt32, `Proto` UInt32, `SrcPort` UInt16, `DstPort` UInt16, `Bytes` UInt64, `Packets` UInt64, `ForwardingStatus` UInt32) AS WITH arrayCompact(DstASPath) AS c_DstASPath, dictGet('default.networks', ('asn', 'name', 'role', 'site', 'region', 'tenant', 'country', 'city', 'state'), SrcAddr) AS c_SrcNetworks, dictGet('default.networks', ('asn', 'name', 'role', 'site', 'region', 'tenant', 'country', 'city', 'state'), DstAddr) AS c_DstNetworks SELECT TimeReceived, SamplingRate, ExporterAddress, ExporterName, ExporterGroup, ExporterRole, ExporterSite, ExporterRegion, ExporterTenant, SrcAddr, DstAddr, SrcNetMask, DstNetMask, if(SrcAS = 0, c_SrcNetworks.1, SrcAS) AS SrcAS, if(DstAS = 0, c_DstNetworks.1, DstAS) AS DstAS, c_SrcNetworks.2 AS SrcNetName, c_DstNetworks.2 AS DstNetName, c_SrcNetworks.3 AS SrcNetRole, c_DstNetworks.3 AS DstNetRole, c_SrcNetworks.4 AS SrcNetSite, c_DstNetworks.4 AS DstNetSite, c_SrcNetworks.5 AS SrcNetRegion, c_DstNetworks.5 AS DstNetRegion, c_SrcNetworks.6 AS SrcNetTenant, c_DstNetworks.6 AS DstNetTenant, c_SrcNetworks.7 AS SrcCountry, c_DstNetworks.7 AS DstCountry, c_SrcNetworks.8 AS SrcGeoCity, c_DstNetworks.8 AS DstGeoCity, c_SrcNetworks.9 AS SrcGeoState, c_DstNetworks.9 AS DstGeoState, DstASPath, c_DstASPath[1] AS Dst1stAS, c_DstASPath[2] AS Dst2ndAS, c_DstASPath[3] AS Dst3rdAS, DstCommunities, arrayMap((asn, l1, l2) -> ((bitShiftLeft(CAST(asn, 'UInt128'), 64) + bitShiftLeft(CAST(l1, 'UInt128'), 32)) + CAST(l2, 'UInt128')), DstLargeCommunitiesASN, DstLargeCommunitiesLocalData1, DstLargeCommunitiesLocalData2) AS DstLargeCommunities, InIfName, OutIfName, InIfDescription, OutIfDescription, InIfSpeed, OutIfSpeed, InIfConnectivity, OutIfConnectivity, InIfProvider, OutIfProvider, InIfBoundary, OutIfBoundary, EType, Proto, SrcPort, DstPort, Bytes, Packets, ForwardingStatus FROM default.flows_LAABIGYMRYZPTGOYIIFZNYDEQM_raw WHERE length(_error) = 0"
1 asns CREATE DICTIONARY default.asns (`asn` UInt32 INJECTIVE, `name` String) PRIMARY KEY asn SOURCE(HTTP(URL 'http://127.0.0.1:0/api/v0/orchestrator/clickhouse/asns.csv' FORMAT 'CSVWithNames')) LIFETIME(MIN 0 MAX 3600) LAYOUT(HASHED()) SETTINGS(format_csv_allow_single_quotes = 0)
12 flows_1h0m0s_consumer CREATE MATERIALIZED VIEW default.flows_1h0m0s_consumer TO default.flows_1h0m0s (`TimeReceived` DateTime, `SamplingRate` UInt64, `ExporterAddress` LowCardinality(IPv6), `ExporterName` LowCardinality(String), `ExporterGroup` LowCardinality(String), `ExporterRole` LowCardinality(String), `ExporterSite` LowCardinality(String), `ExporterRegion` LowCardinality(String), `ExporterTenant` LowCardinality(String), `SrcAS` UInt32, `DstAS` UInt32, `SrcNetName` LowCardinality(String), `DstNetName` LowCardinality(String), `SrcNetRole` LowCardinality(String), `DstNetRole` LowCardinality(String), `SrcNetSite` LowCardinality(String), `DstNetSite` LowCardinality(String), `SrcNetRegion` LowCardinality(String), `DstNetRegion` LowCardinality(String), `SrcNetTenant` LowCardinality(String), `DstNetTenant` LowCardinality(String), `SrcCountry` FixedString(2), `DstCountry` FixedString(2), `SrcGeoCity` LowCardinality(String), `DstGeoCity` LowCardinality(String), `SrcGeoState` LowCardinality(String), `DstGeoState` LowCardinality(String), `Dst1stAS` UInt32, `Dst2ndAS` UInt32, `Dst3rdAS` UInt32, `InIfName` LowCardinality(String), `OutIfName` LowCardinality(String), `InIfDescription` LowCardinality(String), `OutIfDescription` LowCardinality(String), `InIfSpeed` UInt32, `OutIfSpeed` UInt32, `InIfConnectivity` LowCardinality(String), `OutIfConnectivity` LowCardinality(String), `InIfProvider` LowCardinality(String), `OutIfProvider` LowCardinality(String), `InIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `OutIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `EType` UInt32, `Proto` UInt32, `Bytes` UInt64, `Packets` UInt64, `ForwardingStatus` UInt32) AS SELECT toStartOfInterval(TimeReceived, toIntervalSecond(3600)) AS TimeReceived, SamplingRate, ExporterAddress, ExporterName, ExporterGroup, ExporterRole, ExporterSite, ExporterRegion, ExporterTenant, SrcAS, DstAS, SrcNetName, DstNetName, SrcNetRole, DstNetRole, SrcNetSite, DstNetSite, SrcNetRegion, DstNetRegion, SrcNetTenant, DstNetTenant, SrcCountry, DstCountry, SrcGeoCity, DstGeoCity, SrcGeoState, DstGeoState, Dst1stAS, Dst2ndAS, Dst3rdAS, InIfName, OutIfName, InIfDescription, OutIfDescription, InIfSpeed, OutIfSpeed, InIfConnectivity, OutIfConnectivity, InIfProvider, OutIfProvider, InIfBoundary, OutIfBoundary, EType, Proto, Bytes, Packets, ForwardingStatus FROM default.flows
13 flows_LAABIGYMRYZPTGOYIIFZNYDEQM_raw CREATE TABLE default.flows_LAABIGYMRYZPTGOYIIFZNYDEQM_raw (`TimeReceived` DateTime CODEC(DoubleDelta, LZ4), `SamplingRate` UInt64, `ExporterAddress` LowCardinality(IPv6), `ExporterName` LowCardinality(String), `ExporterGroup` LowCardinality(String), `ExporterRole` LowCardinality(String), `ExporterSite` LowCardinality(String), `ExporterRegion` LowCardinality(String), `ExporterTenant` LowCardinality(String), `SrcAddr` IPv6 CODEC(ZSTD(1)), `DstAddr` IPv6 CODEC(ZSTD(1)), `SrcNetMask` UInt8, `DstNetMask` UInt8, `SrcAS` UInt32, `DstAS` UInt32, `DstASPath` Array(UInt32), `DstCommunities` Array(UInt32), `DstLargeCommunitiesASN` Array(UInt32), `DstLargeCommunitiesLocalData1` Array(UInt32), `DstLargeCommunitiesLocalData2` Array(UInt32), `InIfName` LowCardinality(String), `OutIfName` LowCardinality(String), `InIfDescription` LowCardinality(String), `OutIfDescription` LowCardinality(String), `InIfSpeed` UInt32, `OutIfSpeed` UInt32, `InIfConnectivity` LowCardinality(String), `OutIfConnectivity` LowCardinality(String), `InIfProvider` LowCardinality(String), `OutIfProvider` LowCardinality(String), `InIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `OutIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `EType` UInt32, `Proto` UInt32, `SrcPort` UInt16, `DstPort` UInt16, `Bytes` UInt64 CODEC(T64, LZ4), `Packets` UInt64 CODEC(T64, LZ4), `ForwardingStatus` UInt32) ENGINE = Kafka SETTINGS kafka_broker_list = '127.0.0.1:9092', kafka_topic_list = 'flows-LAABIGYMRYZPTGOYIIFZNYDEQM', kafka_group_name = 'clickhouse', kafka_format = 'Protobuf', kafka_schema = 'flow-LAABIGYMRYZPTGOYIIFZNYDEQM.proto:FlowMessagevLAABIGYMRYZPTGOYIIFZNYDEQM', kafka_num_consumers = 1, kafka_thread_per_consumer = 1, kafka_handle_error_mode = 'stream'
14 flows_LAABIGYMRYZPTGOYIIFZNYDEQM_raw_errors CREATE MATERIALIZED VIEW default.flows_LAABIGYMRYZPTGOYIIFZNYDEQM_raw_errors (`timestamp` DateTime, `topic` LowCardinality(String), `partition` UInt64, `offset` UInt64, `raw` String, `error` String) ENGINE = MergeTree PARTITION BY toYYYYMMDDhhmmss(toStartOfHour(timestamp)) ORDER BY (timestamp, topic, partition, offset) TTL timestamp + toIntervalDay(1) SETTINGS index_granularity = 8192 AS SELECT now() AS timestamp, _topic AS topic, _partition AS partition, _offset AS offset, _raw_message AS raw, _error AS error FROM default.flows_LAABIGYMRYZPTGOYIIFZNYDEQM_raw WHERE length(_error) > 0
15 flows_LAABIGYMRYZPTGOYIIFZNYDEQM_raw_consumer CREATE MATERIALIZED VIEW default.flows_LAABIGYMRYZPTGOYIIFZNYDEQM_raw_consumer TO default.flows (`TimeReceived` DateTime, `SamplingRate` UInt64, `ExporterAddress` LowCardinality(IPv6), `ExporterName` LowCardinality(String), `ExporterGroup` LowCardinality(String), `ExporterRole` LowCardinality(String), `ExporterSite` LowCardinality(String), `ExporterRegion` LowCardinality(String), `ExporterTenant` LowCardinality(String), `SrcAddr` IPv6, `DstAddr` IPv6, `SrcNetMask` UInt8, `DstNetMask` UInt8, `SrcAS` UInt32, `DstAS` UInt32, `SrcNetName` String, `DstNetName` String, `SrcNetRole` String, `DstNetRole` String, `SrcNetSite` String, `DstNetSite` String, `SrcNetRegion` String, `DstNetRegion` String, `SrcNetTenant` String, `DstNetTenant` String, `SrcCountry` String, `DstCountry` String, `SrcGeoCity` String, `DstGeoCity` String, `SrcGeoState` String, `DstGeoState` String, `DstASPath` Array(UInt32), `Dst1stAS` UInt32, `Dst2ndAS` UInt32, `Dst3rdAS` UInt32, `DstCommunities` Array(UInt32), `DstLargeCommunities` Array(UInt128), `InIfName` LowCardinality(String), `OutIfName` LowCardinality(String), `InIfDescription` LowCardinality(String), `OutIfDescription` LowCardinality(String), `InIfSpeed` UInt32, `OutIfSpeed` UInt32, `InIfConnectivity` LowCardinality(String), `OutIfConnectivity` LowCardinality(String), `InIfProvider` LowCardinality(String), `OutIfProvider` LowCardinality(String), `InIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `OutIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `EType` UInt32, `Proto` UInt32, `SrcPort` UInt16, `DstPort` UInt16, `Bytes` UInt64, `Packets` UInt64, `ForwardingStatus` UInt32) AS WITH arrayCompact(DstASPath) AS c_DstASPath SELECT TimeReceived, SamplingRate, ExporterAddress, ExporterName, ExporterGroup, ExporterRole, ExporterSite, ExporterRegion, ExporterTenant, SrcAddr, DstAddr, SrcNetMask, DstNetMask, if(SrcAS = 0, dictGetOrDefault('default.networks', 'asn', SrcAddr, 0), SrcAS) AS SrcAS, if(DstAS = 0, dictGetOrDefault('default.networks', 'asn', DstAddr, 0), DstAS) AS DstAS, dictGetOrDefault('default.networks', 'name', SrcAddr, '') AS SrcNetName, dictGetOrDefault('default.networks', 'name', DstAddr, '') AS DstNetName, dictGetOrDefault('default.networks', 'role', SrcAddr, '') AS SrcNetRole, dictGetOrDefault('default.networks', 'role', DstAddr, '') AS DstNetRole, dictGetOrDefault('default.networks', 'site', SrcAddr, '') AS SrcNetSite, dictGetOrDefault('default.networks', 'site', DstAddr, '') AS DstNetSite, dictGetOrDefault('default.networks', 'region', SrcAddr, '') AS SrcNetRegion, dictGetOrDefault('default.networks', 'region', DstAddr, '') AS DstNetRegion, dictGetOrDefault('default.networks', 'tenant', SrcAddr, '') AS SrcNetTenant, dictGetOrDefault('default.networks', 'tenant', DstAddr, '') AS DstNetTenant, dictGetOrDefault('default.networks', 'country', SrcAddr, '') AS SrcCountry, dictGetOrDefault('default.networks', 'country', DstAddr, '') AS DstCountry, dictGetOrDefault('default.networks', 'city', SrcAddr, '') AS SrcGeoCity, dictGetOrDefault('default.networks', 'city', DstAddr, '') AS DstGeoCity, dictGetOrDefault('default.networks', 'state', SrcAddr, '') AS SrcGeoState, dictGetOrDefault('default.networks', 'state', DstAddr, '') AS DstGeoState, DstASPath, c_DstASPath[1] AS Dst1stAS, c_DstASPath[2] AS Dst2ndAS, c_DstASPath[3] AS Dst3rdAS, DstCommunities, arrayMap((asn, l1, l2) -> ((bitShiftLeft(CAST(asn, 'UInt128'), 64) + bitShiftLeft(CAST(l1, 'UInt128'), 32)) + CAST(l2, 'UInt128')), DstLargeCommunitiesASN, DstLargeCommunitiesLocalData1, DstLargeCommunitiesLocalData2) AS DstLargeCommunities, InIfName, OutIfName, InIfDescription, OutIfDescription, InIfSpeed, OutIfSpeed, InIfConnectivity, OutIfConnectivity, InIfProvider, OutIfProvider, InIfBoundary, OutIfBoundary, EType, Proto, SrcPort, DstPort, Bytes, Packets, ForwardingStatus FROM default.flows_LAABIGYMRYZPTGOYIIFZNYDEQM_raw WHERE length(_error) = 0 CREATE MATERIALIZED VIEW default.flows_LAABIGYMRYZPTGOYIIFZNYDEQM_raw_consumer TO default.flows (`TimeReceived` DateTime, `SamplingRate` UInt64, `ExporterAddress` LowCardinality(IPv6), `ExporterName` LowCardinality(String), `ExporterGroup` LowCardinality(String), `ExporterRole` LowCardinality(String), `ExporterSite` LowCardinality(String), `ExporterRegion` LowCardinality(String), `ExporterTenant` LowCardinality(String), `SrcAddr` IPv6, `DstAddr` IPv6, `SrcNetMask` UInt8, `DstNetMask` UInt8, `SrcAS` UInt32, `DstAS` UInt32, `SrcNetName` String, `DstNetName` String, `SrcNetRole` String, `DstNetRole` String, `SrcNetSite` String, `DstNetSite` String, `SrcNetRegion` String, `DstNetRegion` String, `SrcNetTenant` String, `DstNetTenant` String, `SrcCountry` String, `DstCountry` String, `SrcGeoCity` String, `DstGeoCity` String, `SrcGeoState` String, `DstGeoState` String, `DstASPath` Array(UInt32), `Dst1stAS` UInt32, `Dst2ndAS` UInt32, `Dst3rdAS` UInt32, `DstCommunities` Array(UInt32), `DstLargeCommunities` Array(UInt128), `InIfName` LowCardinality(String), `OutIfName` LowCardinality(String), `InIfDescription` LowCardinality(String), `OutIfDescription` LowCardinality(String), `InIfSpeed` UInt32, `OutIfSpeed` UInt32, `InIfConnectivity` LowCardinality(String), `OutIfConnectivity` LowCardinality(String), `InIfProvider` LowCardinality(String), `OutIfProvider` LowCardinality(String), `InIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `OutIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `EType` UInt32, `Proto` UInt32, `SrcPort` UInt16, `DstPort` UInt16, `Bytes` UInt64, `Packets` UInt64, `ForwardingStatus` UInt32) AS WITH arrayCompact(DstASPath) AS c_DstASPath, dictGet('default.networks', ('asn', 'name', 'role', 'site', 'region', 'tenant', 'country', 'city', 'state'), SrcAddr) AS c_SrcNetworks, dictGet('default.networks', ('asn', 'name', 'role', 'site', 'region', 'tenant', 'country', 'city', 'state'), DstAddr) AS c_DstNetworks SELECT TimeReceived, SamplingRate, ExporterAddress, ExporterName, ExporterGroup, ExporterRole, ExporterSite, ExporterRegion, ExporterTenant, SrcAddr, DstAddr, SrcNetMask, DstNetMask, if(SrcAS = 0, c_SrcNetworks.1, SrcAS) AS SrcAS, if(DstAS = 0, c_DstNetworks.1, DstAS) AS DstAS, c_SrcNetworks.2 AS SrcNetName, c_DstNetworks.2 AS DstNetName, c_SrcNetworks.3 AS SrcNetRole, c_DstNetworks.3 AS DstNetRole, c_SrcNetworks.4 AS SrcNetSite, c_DstNetworks.4 AS DstNetSite, c_SrcNetworks.5 AS SrcNetRegion, c_DstNetworks.5 AS DstNetRegion, c_SrcNetworks.6 AS SrcNetTenant, c_DstNetworks.6 AS DstNetTenant, c_SrcNetworks.7 AS SrcCountry, c_DstNetworks.7 AS DstCountry, c_SrcNetworks.8 AS SrcGeoCity, c_DstNetworks.8 AS DstGeoCity, c_SrcNetworks.9 AS SrcGeoState, c_DstNetworks.9 AS DstGeoState, DstASPath, c_DstASPath[1] AS Dst1stAS, c_DstASPath[2] AS Dst2ndAS, c_DstASPath[3] AS Dst3rdAS, DstCommunities, arrayMap((asn, l1, l2) -> ((bitShiftLeft(CAST(asn, 'UInt128'), 64) + bitShiftLeft(CAST(l1, 'UInt128'), 32)) + CAST(l2, 'UInt128')), DstLargeCommunitiesASN, DstLargeCommunitiesLocalData1, DstLargeCommunitiesLocalData2) AS DstLargeCommunities, InIfName, OutIfName, InIfDescription, OutIfDescription, InIfSpeed, OutIfSpeed, InIfConnectivity, OutIfConnectivity, InIfProvider, OutIfProvider, InIfBoundary, OutIfBoundary, EType, Proto, SrcPort, DstPort, Bytes, Packets, ForwardingStatus FROM default.flows_LAABIGYMRYZPTGOYIIFZNYDEQM_raw WHERE length(_error) = 0