mirror of
https://github.com/akvorado/akvorado.git
synced 2025-12-12 06:24:10 +01:00
orchestrator: add more attributes to classify networks
Like for exporters, we add role, site, region, and tenant. This time, this is done in ClickHouse.
This commit is contained in:
@@ -22,10 +22,18 @@ clickhouse:
|
||||
asns:
|
||||
64501: ACME Corporation
|
||||
networks:
|
||||
- 192.0.2.0/24: customers
|
||||
- 203.0.113.0/24: servers
|
||||
- 2a01:db8:cafe:1::/64: customers
|
||||
- 2a01:db8:cafe:2::/64: servers
|
||||
192.0.2.0/24:
|
||||
name: ipv4-customers
|
||||
role: customers
|
||||
203.0.113.0/24:
|
||||
name: ipv4-servers
|
||||
role: servers
|
||||
2a01:db8:cafe:1::/64:
|
||||
name: ipv6-customers
|
||||
role: customers
|
||||
2a01:db8:cafe:2::/64:
|
||||
name: ipv6-servers
|
||||
role: servers
|
||||
inlet:
|
||||
kafka:
|
||||
compression-codec: zstd
|
||||
|
||||
@@ -93,7 +93,7 @@ func (c ConfigRelatedOptions) Parse(out io.Writer, component string, config inte
|
||||
DecodeHook: mapstructure.ComposeDecodeHookFunc(
|
||||
defaultHook,
|
||||
flow.ConfigurationUnmarshalerHook(),
|
||||
clickhouse.NetworkNamesUnmarshalerHook(),
|
||||
clickhouse.NetworkMapUnmarshalerHook(),
|
||||
mapstructure.TextUnmarshallerHookFunc(),
|
||||
mapstructure.StringToTimeDurationHookFunc(),
|
||||
mapstructure.StringToSliceHookFunc(","),
|
||||
|
||||
@@ -69,3 +69,9 @@ It is also possible to get Akvorado as a
|
||||
```console
|
||||
# docker pull ghcr.io/vincentbernat/akvorado:latest
|
||||
```
|
||||
|
||||
## Upgrade
|
||||
|
||||
Be sure to read the [changelog](99-changelog.md) before attempting an
|
||||
upgrade. Upgrade the orchestrator first. This will update the
|
||||
ClickHouse database. Then, upgrade all inlets. Then the console.
|
||||
|
||||
@@ -331,7 +331,9 @@ provided:
|
||||
- `resolutions` defines the various resolutions to keep data
|
||||
- `max-partitions` defines the number of partitions to use when
|
||||
creating consolidated tables
|
||||
- `networks` maps subnets to names (used as `SrcNetName` and `DstNetName`)
|
||||
- `networks` maps subnets to attributes. Attributes are `name`,
|
||||
`role`, `site`, `region`, and `tenant`. They are exposed as
|
||||
`SrcNetName`, `DstNetName`, `SrcNetRole`, `DstNetRole`, etc.
|
||||
- `asns` maps AS number to names (overriding the builtin ones)
|
||||
- `orchestrator-url` defines the URL of the orchestrator to be used
|
||||
by Clickhouse (autodetection when not specified)
|
||||
|
||||
@@ -230,27 +230,23 @@ In the future, we may:
|
||||
have a system service running tcpdump dumping packets to a directory
|
||||
and use that as input. This would be allow *Akvorado* to block from
|
||||
end-to-end instead of trying to be realtime.
|
||||
- Collect routes by integrating biorouting or BIRD. This is low
|
||||
priority if we consider information from Maxmind good enough for our
|
||||
use. However, this would also allows us to get AS paths or at least
|
||||
the next AS. GoBGP would be another option, but it is not very
|
||||
efficient with fullviews. biorouting is using a compressed trie.
|
||||
BIRD is still on the balance because it shares data when receiving
|
||||
multiple views.
|
||||
- Add more annotations to exporters, interfaces and IP addresses. We
|
||||
already cover quite a lot with connectivity/boundary/provider for
|
||||
interfaces, but exporters can be classified in a single group and we
|
||||
can put names on networks, but we may want to do more: add role,
|
||||
site, region, tenant (for exporters), category, site, region, tenant
|
||||
(for networks). Currently, categorization is done both in the inlet
|
||||
service (when it requires an expression language, like for exporters
|
||||
and interfaces) and in ClickHouse (when a single dictionary lookup
|
||||
is enough). Look at [Flowhouse][] for inspirations.
|
||||
- Collect routes by integrating a BMP server. [bio-routing RIS
|
||||
server][] could serve as a base. All NLRI received are stored in a
|
||||
compressed trie ([cidranger][] seems a good candidate for that).
|
||||
Each node stores the origin AS and a map from exporter IP address
|
||||
and BGP next hop (or an indirection to keep memory usage down) to
|
||||
the next AS and the AS path (in this case, again, an indirection to
|
||||
keep memory down). We need a configuration knob to determine what
|
||||
source to use for origin AS: BGP, Netflow (likely the same
|
||||
information), or GeoIP. This could be dependant on the fact we have
|
||||
a private AS or not.
|
||||
- DDoS service to detect and mitigate DDoS (with Flowspec).
|
||||
- Support VRFS.
|
||||
- Support VRFs.
|
||||
- Add dynamic configuration with something like [go-archaius][] or
|
||||
[Harvester][].
|
||||
|
||||
[cidranger]: https://github.com/yl2chen/cidranger
|
||||
[bio-routing RIS server]: https://github.com/bio-routing/bio-rd/tree/master/cmd/ris
|
||||
[go-archaius]: https://github.com/go-chassis/go-archaius
|
||||
[Harvester]: https://github.com/beatlabs/harvester
|
||||
[Flowhouse]: https://github.com/bio-routing/flowhouse
|
||||
|
||||
@@ -13,6 +13,7 @@ identified with a specific icon:
|
||||
## Unreleased
|
||||
|
||||
- ✨ *inlet*: classify exporters to group, role, site, region, and tenant [PR #14][]
|
||||
- ✨ *orchestrator*: add role, site, region, and tenant attributes to networks [PR #15][]
|
||||
- 🩹 *console*: fix use of `InIfBoundary` and `OutIfBoundary` as dimensions [PR #11][]
|
||||
- 🩹 *docker-compose*: avoid starting bogus "akvorado-image" service
|
||||
- 🩹 *build*: make *Akvorado* compile on MacOS
|
||||
@@ -22,6 +23,7 @@ identified with a specific icon:
|
||||
|
||||
[PR #11]: https://github.com/vincentbernat/akvorado/pull/11
|
||||
[PR #14]: https://github.com/vincentbernat/akvorado/pull/14
|
||||
[PR #15]: https://github.com/vincentbernat/akvorado/pull/15
|
||||
[UI for Apache Kafka]: https://github.com/provectus/kafka-ui
|
||||
|
||||
## 1.4.2 - 2022-07-16
|
||||
|
||||
@@ -200,22 +200,23 @@ UNION DISTINCT
|
||||
})
|
||||
}
|
||||
input.Prefix = "" // We have handled this internally
|
||||
case "srcnetname", "dstnetname":
|
||||
case "srcnetname", "dstnetname", "srcnetrole", "dstnetrole", "srcnetsite", "dstnetsite", "srcnetregion", "dstnetregion", "srcnettenant", "dstnettenant":
|
||||
attributeName := inputColumn[6:]
|
||||
results := []struct {
|
||||
Name string `ch:"name"`
|
||||
Attribute string `ch:"attribute"`
|
||||
}{}
|
||||
if err := c.d.ClickHouseDB.Conn.Select(ctx, &results, `
|
||||
SELECT DISTINCT name
|
||||
if err := c.d.ClickHouseDB.Conn.Select(ctx, &results, fmt.Sprintf(`
|
||||
SELECT DISTINCT %s AS attribute
|
||||
FROM networks
|
||||
WHERE positionCaseInsensitive(name, $1) >= 1
|
||||
ORDER BY name
|
||||
LIMIT 20`, input.Prefix); err != nil {
|
||||
WHERE positionCaseInsensitive(%s, $1) >= 1
|
||||
ORDER BY %s
|
||||
LIMIT 20`, attributeName, attributeName, attributeName), input.Prefix); err != nil {
|
||||
c.r.Err(err).Msg("unable to query database")
|
||||
break
|
||||
}
|
||||
for _, result := range results {
|
||||
completions = append(completions, filterCompletion{
|
||||
Label: result.Name,
|
||||
Label: result.Attribute,
|
||||
Detail: "network name",
|
||||
Quoted: true,
|
||||
})
|
||||
|
||||
@@ -82,6 +82,14 @@ ConditionStringExpr "condition on string" ←
|
||||
/ "DstCountry"i { return "DstCountry", nil }
|
||||
/ "SrcNetName"i { return "SrcNetName", nil }
|
||||
/ "DstNetName"i { return "DstNetName", nil }
|
||||
/ "SrcNetRole"i { return "SrcNetRole", nil }
|
||||
/ "DstNetRole"i { return "DstNetRole", nil }
|
||||
/ "SrcNetSite"i { return "SrcNetSite", nil }
|
||||
/ "DstNetSite"i { return "DstNetSite", nil }
|
||||
/ "SrcNetRegion"i { return "SrcNetRegion", nil }
|
||||
/ "DstNetRegion"i { return "DstNetRegion", nil }
|
||||
/ "SrcNetTenant"i { return "SrcNetTenant", nil }
|
||||
/ "DstNetTenant"i { return "DstNetTenant", nil }
|
||||
/ "InIfName"i { return "InIfName", nil }
|
||||
/ "OutIfName"i { return "OutIfName", nil }
|
||||
/ "InIfDescription"i { return "InIfDescription", nil }
|
||||
|
||||
@@ -36,6 +36,8 @@ func TestValidFilter(t *testing.T) {
|
||||
{`DstAddr=203.0.113.2`, `DstAddr = IPv6StringToNum('203.0.113.2')`},
|
||||
{`SrcNetName="alpha"`, `SrcNetName = 'alpha'`},
|
||||
{`DstNetName="alpha"`, `DstNetName = 'alpha'`},
|
||||
{`DstNetRole="stuff"`, `DstNetRole = 'stuff'`},
|
||||
{`SrcNetTenant="mobile"`, `SrcNetTenant = 'mobile'`},
|
||||
{`SrcAS=12322`, `SrcAS = 12322`},
|
||||
{`SrcAS=AS12322`, `SrcAS = 12322`},
|
||||
{`SrcAS=as12322`, `SrcAS = 12322`},
|
||||
|
||||
@@ -34,17 +34,28 @@ LIMIT 20`,
|
||||
Return(nil)
|
||||
mockConn.EXPECT().
|
||||
Select(gomock.Any(), gomock.Any(), `
|
||||
SELECT DISTINCT name
|
||||
SELECT DISTINCT name AS attribute
|
||||
FROM networks
|
||||
WHERE positionCaseInsensitive(name, $1) >= 1
|
||||
ORDER BY name
|
||||
LIMIT 20`, "c").
|
||||
SetArg(1, []struct {
|
||||
Name string `ch:"name"`
|
||||
Attribute string `ch:"attribute"`
|
||||
}{{"customer-1"}, {"customer-2"}, {"customer-3"}}).
|
||||
Return(nil)
|
||||
mockConn.EXPECT().
|
||||
Select(gomock.Any(), gomock.Any(), `
|
||||
SELECT DISTINCT role AS attribute
|
||||
FROM networks
|
||||
WHERE positionCaseInsensitive(role, $1) >= 1
|
||||
ORDER BY role
|
||||
LIMIT 20`, "c").
|
||||
SetArg(1, []struct {
|
||||
Attribute string `ch:"attribute"`
|
||||
}{{"customer"}}).
|
||||
Return(nil)
|
||||
mockConn.EXPECT().
|
||||
Select(gomock.Any(), gomock.Any(), `
|
||||
SELECT label, detail FROM (
|
||||
SELECT concat('AS', toString(DstAS)) AS label, dictGet('asns', 'name', DstAS) AS detail, 1 AS rank
|
||||
FROM flows
|
||||
@@ -109,6 +120,10 @@ UNION DISTINCT
|
||||
{"label": "DstAddr", "detail": "column name", "quoted": false},
|
||||
{"label": "DstCountry", "detail": "column name", "quoted": false},
|
||||
{"label": "DstNetName", "detail": "column name", "quoted": false},
|
||||
{"label": "DstNetRegion", "detail": "column name", "quoted": false},
|
||||
{"label": "DstNetRole", "detail": "column name", "quoted": false},
|
||||
{"label": "DstNetSite", "detail": "column name", "quoted": false},
|
||||
{"label": "DstNetTenant", "detail": "column name", "quoted": false},
|
||||
{"label": "DstPort", "detail": "column name", "quoted": false},
|
||||
}},
|
||||
}, {
|
||||
@@ -189,6 +204,13 @@ UNION DISTINCT
|
||||
{"label": "customer-2", "detail": "network name", "quoted": true},
|
||||
{"label": "customer-3", "detail": "network name", "quoted": true},
|
||||
}},
|
||||
}, {
|
||||
URL: "/api/v0/console/filter/complete",
|
||||
StatusCode: 200,
|
||||
JSONInput: gin.H{"what": "value", "column": "dstnetRole", "prefix": "c"},
|
||||
JSONOutput: gin.H{"completions": []gin.H{
|
||||
{"label": "customer", "detail": "network name", "quoted": true},
|
||||
}},
|
||||
}, {
|
||||
Description: "list, no filters",
|
||||
URL: "/api/v0/console/filter/saved",
|
||||
|
||||
@@ -24,6 +24,10 @@ const (
|
||||
queryColumnExporterTenant
|
||||
queryColumnSrcAS
|
||||
queryColumnSrcNetName
|
||||
queryColumnSrcNetRole
|
||||
queryColumnSrcNetSite
|
||||
queryColumnSrcNetRegion
|
||||
queryColumnSrcNetTenant
|
||||
queryColumnSrcCountry
|
||||
queryColumnInIfName
|
||||
queryColumnInIfDescription
|
||||
@@ -37,6 +41,10 @@ const (
|
||||
queryColumnSrcAddr
|
||||
queryColumnDstAS
|
||||
queryColumnDstNetName
|
||||
queryColumnDstNetRole
|
||||
queryColumnDstNetSite
|
||||
queryColumnDstNetRegion
|
||||
queryColumnDstNetTenant
|
||||
queryColumnDstCountry
|
||||
queryColumnOutIfName
|
||||
queryColumnOutIfDescription
|
||||
@@ -64,6 +72,14 @@ var queryColumnMap = helpers.NewBimap(map[queryColumn]string{
|
||||
queryColumnDstAS: "DstAS",
|
||||
queryColumnSrcNetName: "SrcNetName",
|
||||
queryColumnDstNetName: "DstNetName",
|
||||
queryColumnSrcNetRole: "SrcNetRole",
|
||||
queryColumnDstNetRole: "DstNetRole",
|
||||
queryColumnSrcNetSite: "SrcNetSite",
|
||||
queryColumnDstNetSite: "DstNetSite",
|
||||
queryColumnSrcNetRegion: "SrcNetRegion",
|
||||
queryColumnDstNetRegion: "DstNetRegion",
|
||||
queryColumnSrcNetTenant: "SrcNetTenant",
|
||||
queryColumnDstNetTenant: "DstNetTenant",
|
||||
queryColumnSrcCountry: "SrcCountry",
|
||||
queryColumnDstCountry: "DstCountry",
|
||||
queryColumnInIfName: "InIfName",
|
||||
|
||||
@@ -4,7 +4,6 @@
|
||||
package clickhouse
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"reflect"
|
||||
@@ -30,9 +29,9 @@ type Configuration struct {
|
||||
// ASNs is a mapping from AS numbers to names. It replaces or
|
||||
// extends the builtin list of AS numbers.
|
||||
ASNs map[uint32]string
|
||||
// Networks is a mapping from IP networks to names. It is used
|
||||
// to instantiate the SrcNetName and DstNetName columns.
|
||||
Networks NetworkNames
|
||||
// Networks is a mapping from IP networks to attributes. It is used
|
||||
// to instantiate the SrcNet* and DstNet* columns.
|
||||
Networks NetworkMap
|
||||
// OrchestratorURL allows one to override URL to reach
|
||||
// orchestrator from Clickhouse
|
||||
OrchestratorURL string `validate:"isdefault|url"`
|
||||
@@ -73,35 +72,67 @@ func DefaultConfiguration() Configuration {
|
||||
}
|
||||
}
|
||||
|
||||
// NetworkNames is a mapping from a network to a name.
|
||||
type NetworkNames map[string]string
|
||||
// NetworkMap is a mapping from a network to a attributes.
|
||||
type NetworkMap map[string]NetworkAttributes
|
||||
|
||||
// NetworkNamesUnmarshalerHook decodes NetworkNames mapping and notably check that valid networks are provided as key.
|
||||
func NetworkNamesUnmarshalerHook() mapstructure.DecodeHookFunc {
|
||||
// NetworkAttributes is a set of attributes attached to a network
|
||||
type NetworkAttributes struct {
|
||||
// Name is a name attached to the network. May be unique or not.
|
||||
Name string
|
||||
// Role is a role attached to the network (server, customer).
|
||||
Role string
|
||||
// Site is the site of the network (paris, berlin).
|
||||
Site string
|
||||
// Region is the region of the network (france, italy).
|
||||
Region string
|
||||
// Tenant is a tenant for the network.
|
||||
Tenant string
|
||||
}
|
||||
|
||||
// NetworkMapUnmarshalerHook decodes NetworkMap mapping and notably
|
||||
// check that valid networks are provided as key. It also accepts a
|
||||
// string instead of attributes for backward compatibility.
|
||||
func NetworkMapUnmarshalerHook() mapstructure.DecodeHookFunc {
|
||||
return func(from, to reflect.Type, data interface{}) (interface{}, error) {
|
||||
if from != reflect.TypeOf(map[string]string{}) || to != reflect.TypeOf(NetworkNames{}) {
|
||||
if from.Kind() != reflect.Map || to != reflect.TypeOf(NetworkMap{}) {
|
||||
return data, nil
|
||||
}
|
||||
input := data.(map[string]string)
|
||||
output := NetworkNames{}
|
||||
if input == nil {
|
||||
input = map[string]string{}
|
||||
output := map[string]interface{}{}
|
||||
iter := reflect.ValueOf(data).MapRange()
|
||||
for i := 0; iter.Next(); i++ {
|
||||
k := iter.Key()
|
||||
v := iter.Value()
|
||||
if k.Kind() == reflect.Interface {
|
||||
k = k.Elem()
|
||||
}
|
||||
for k, v := range input {
|
||||
// Parse
|
||||
_, ipNet, err := net.ParseCIDR(k)
|
||||
if k.Kind() != reflect.String {
|
||||
return nil, fmt.Errorf("key %d is not a string (%s)", i, k.Kind())
|
||||
}
|
||||
// Parse key
|
||||
_, ipNet, err := net.ParseCIDR(k.String())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Convert to IPv6
|
||||
// Convert key to IPv6
|
||||
ones, bits := ipNet.Mask.Size()
|
||||
if bits != 32 && bits != 128 {
|
||||
return nil, errors.New("invalid netmask")
|
||||
return nil, fmt.Errorf("key %d has an invalid netmask", i)
|
||||
}
|
||||
var key string
|
||||
if bits == 32 {
|
||||
output[fmt.Sprintf("::ffff:%s/%d", ipNet.IP.String(), ones+96)] = v
|
||||
key = fmt.Sprintf("::ffff:%s/%d", ipNet.IP.String(), ones+96)
|
||||
} else {
|
||||
output[ipNet.String()] = v
|
||||
key = ipNet.String()
|
||||
}
|
||||
|
||||
// Parse value (partially)
|
||||
if v.Kind() == reflect.Interface {
|
||||
v = v.Elem()
|
||||
}
|
||||
if v.Kind() == reflect.String {
|
||||
output[key] = NetworkAttributes{Name: v.String()}
|
||||
} else {
|
||||
output[key] = v.Interface()
|
||||
}
|
||||
}
|
||||
return output, nil
|
||||
|
||||
@@ -6,6 +6,7 @@ package clickhouse
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/mitchellh/mapstructure"
|
||||
|
||||
"akvorado/common/helpers"
|
||||
@@ -14,44 +15,68 @@ import (
|
||||
func TestNetworkNamesUnmarshalHook(t *testing.T) {
|
||||
cases := []struct {
|
||||
Description string
|
||||
Input map[string]string
|
||||
Output NetworkNames
|
||||
Input map[string]interface{}
|
||||
Output NetworkMap
|
||||
}{
|
||||
{
|
||||
Description: "nil",
|
||||
Input: nil,
|
||||
Output: NetworkNames{},
|
||||
Output: NetworkMap{},
|
||||
}, {
|
||||
Description: "empty",
|
||||
Input: map[string]string{},
|
||||
Output: NetworkNames{},
|
||||
Input: gin.H{},
|
||||
Output: NetworkMap{},
|
||||
}, {
|
||||
Description: "IPv4 subnet",
|
||||
Input: map[string]string{"203.0.113.0/24": "customer"},
|
||||
Output: NetworkNames{"::ffff:203.0.113.0/120": "customer"},
|
||||
Description: "IPv4",
|
||||
Input: gin.H{"203.0.113.0/24": gin.H{"name": "customer"}},
|
||||
Output: NetworkMap{"::ffff:203.0.113.0/120": NetworkAttributes{Name: "customer"}},
|
||||
}, {
|
||||
Description: "IPv6 subnet",
|
||||
Input: map[string]string{"2001:db8:1::/64": "customer"},
|
||||
Output: NetworkNames{"2001:db8:1::/64": "customer"},
|
||||
Description: "IPv6",
|
||||
Input: gin.H{"2001:db8:1::/64": gin.H{"name": "customer"}},
|
||||
Output: NetworkMap{"2001:db8:1::/64": NetworkAttributes{Name: "customer"}},
|
||||
}, {
|
||||
Description: "IPv4 subnet (compatibility)",
|
||||
Input: gin.H{"203.0.113.0/24": "customer"},
|
||||
Output: NetworkMap{"::ffff:203.0.113.0/120": NetworkAttributes{Name: "customer"}},
|
||||
}, {
|
||||
Description: "IPv6 subnet (compatibility)",
|
||||
Input: gin.H{"2001:db8:1::/64": "customer"},
|
||||
Output: NetworkMap{"2001:db8:1::/64": NetworkAttributes{Name: "customer"}},
|
||||
}, {
|
||||
Description: "all attributes",
|
||||
Input: gin.H{"203.0.113.0/24": gin.H{
|
||||
"name": "customer1",
|
||||
"role": "customer",
|
||||
"site": "paris",
|
||||
"region": "france",
|
||||
"tenant": "mobile",
|
||||
}},
|
||||
Output: NetworkMap{"::ffff:203.0.113.0/120": NetworkAttributes{
|
||||
Name: "customer1",
|
||||
Role: "customer",
|
||||
Site: "paris",
|
||||
Region: "france",
|
||||
Tenant: "mobile",
|
||||
}},
|
||||
}, {
|
||||
Description: "Invalid subnet (1)",
|
||||
Input: map[string]string{"192.0.2.1/38": "customer"},
|
||||
Input: gin.H{"192.0.2.1/38": "customer"},
|
||||
Output: nil,
|
||||
}, {
|
||||
Description: "Invalid subnet (2)",
|
||||
Input: map[string]string{"192.0.2.1/255.0.255.0": "customer"},
|
||||
Input: gin.H{"192.0.2.1/255.0.255.0": "customer"},
|
||||
Output: nil,
|
||||
},
|
||||
}
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.Description, func(t *testing.T) {
|
||||
var got NetworkNames
|
||||
var got NetworkMap
|
||||
decoder, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
|
||||
Result: &got,
|
||||
ErrorUnused: true,
|
||||
Metadata: nil,
|
||||
DecodeHook: mapstructure.ComposeDecodeHookFunc(
|
||||
NetworkNamesUnmarshalerHook(),
|
||||
NetworkMapUnmarshalerHook(),
|
||||
),
|
||||
})
|
||||
if err != nil {
|
||||
|
||||
@@ -58,10 +58,10 @@ func (c *Component) registerHTTPHandlers() error {
|
||||
w.Header().Set("Content-Type", "text/csv; charset=utf-8")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
wr := csv.NewWriter(w)
|
||||
wr.Write([]string{"network", "name"})
|
||||
wr.Write([]string{"network", "name", "role", "site", "region", "tenant"})
|
||||
if c.config.Networks != nil {
|
||||
for k, v := range c.config.Networks {
|
||||
wr.Write([]string{k, v})
|
||||
wr.Write([]string{k, v.Name, v.Role, v.Site, v.Region, v.Tenant})
|
||||
}
|
||||
}
|
||||
wr.Flush()
|
||||
|
||||
@@ -15,8 +15,8 @@ import (
|
||||
func TestHTTPEndpoints(t *testing.T) {
|
||||
r := reporter.NewMock(t)
|
||||
config := DefaultConfiguration()
|
||||
config.Networks = NetworkNames{
|
||||
"::ffff:192.0.2.0/24": "infra",
|
||||
config.Networks = NetworkMap{
|
||||
"::ffff:192.0.2.0/24": NetworkAttributes{Name: "infra"},
|
||||
}
|
||||
c, err := New(r, config, Dependencies{
|
||||
Daemon: daemon.NewMock(t),
|
||||
@@ -46,8 +46,8 @@ func TestHTTPEndpoints(t *testing.T) {
|
||||
URL: "/api/v0/orchestrator/clickhouse/networks.csv",
|
||||
ContentType: "text/csv; charset=utf-8",
|
||||
FirstLines: []string{
|
||||
`network,name`,
|
||||
`::ffff:192.0.2.0/24,infra`,
|
||||
`network,name,role,site,region,tenant`,
|
||||
`::ffff:192.0.2.0/24,infra,,,,`,
|
||||
},
|
||||
}, {
|
||||
URL: "/api/v0/orchestrator/clickhouse/init.sh",
|
||||
|
||||
@@ -74,6 +74,9 @@ func (c *Component) migrateDatabase() error {
|
||||
}, {
|
||||
fmt.Sprintf("add SrcNetName/DstNetName to flows table with resolution %s", resolution.Interval),
|
||||
c.migrationStepAddSrcNetNameDstNetNameColumns(resolution),
|
||||
}, {
|
||||
fmt.Sprintf("add SrcNet*/DstNet* to flows table with resolution %s", resolution.Interval),
|
||||
c.migrationStepAddSrcNetNameDstNetOthersColumns(resolution),
|
||||
}, {
|
||||
fmt.Sprintf("add Exporter* to flows table with resolution %s", resolution.Interval),
|
||||
c.migrationStepAddExporterColumns(resolution),
|
||||
|
||||
@@ -239,6 +239,7 @@ WHERE database=currentDatabase() AND table NOT LIKE '.%'`)
|
||||
|
||||
if !t.Failed() {
|
||||
// One more time
|
||||
t.Run("idempotency", func(t *testing.T) {
|
||||
r := reporter.NewMock(t)
|
||||
configuration := DefaultConfiguration()
|
||||
configuration.OrchestratorURL = "http://something"
|
||||
@@ -267,5 +268,6 @@ WHERE database=currentDatabase() AND table NOT LIKE '.%'`)
|
||||
if diff := helpers.Diff(gotMetrics, expectedMetrics); diff != "" {
|
||||
t.Fatalf("Metrics (-got, +want):\n%s", diff)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -33,6 +33,14 @@ const (
|
||||
DstAS UInt32,
|
||||
SrcNetName LowCardinality(String),
|
||||
DstNetName LowCardinality(String),
|
||||
SrcNetRole LowCardinality(String),
|
||||
DstNetRole LowCardinality(String),
|
||||
SrcNetSite LowCardinality(String),
|
||||
DstNetSite LowCardinality(String),
|
||||
SrcNetRegion LowCardinality(String),
|
||||
DstNetRegion LowCardinality(String),
|
||||
SrcNetTenant LowCardinality(String),
|
||||
DstNetTenant LowCardinality(String),
|
||||
SrcCountry FixedString(2),
|
||||
DstCountry FixedString(2),
|
||||
InIfName LowCardinality(String),
|
||||
@@ -151,7 +159,11 @@ ORDER BY (TimeReceived,
|
||||
InIfName, SrcAS, ForwardingStatus,
|
||||
OutIfName, DstAS,
|
||||
SamplingRate,
|
||||
SrcNetName, DstNetName)`,
|
||||
SrcNetName, DstNetName,
|
||||
SrcNetRole, DstNetRole,
|
||||
SrcNetSite, DstNetSite,
|
||||
SrcNetRegion, DstNetRegion,
|
||||
SrcNetTenant, DstNetTenant)`,
|
||||
tableName,
|
||||
partialSchema("SrcAddr", "DstAddr", "SrcPort", "DstPort"),
|
||||
partitionInterval))
|
||||
@@ -215,6 +227,48 @@ WHERE table = $1 AND database = currentDatabase() AND name = $2`,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Component) migrationStepAddSrcNetNameDstNetOthersColumns(resolution ResolutionConfiguration) migrationStepFunc {
|
||||
return func(ctx context.Context, l reporter.Logger, conn clickhouse.Conn) migrationStep {
|
||||
var tableName string
|
||||
if resolution.Interval == 0 {
|
||||
tableName = "flows"
|
||||
} else {
|
||||
tableName = fmt.Sprintf("flows_%s", resolution.Interval)
|
||||
}
|
||||
return migrationStep{
|
||||
CheckQuery: `
|
||||
SELECT 1 FROM system.columns
|
||||
WHERE table = $1 AND database = currentDatabase() AND name = $2`,
|
||||
Args: []interface{}{tableName, "DstNetRole"},
|
||||
Do: func() error {
|
||||
modifications := []string{
|
||||
`ADD COLUMN SrcNetRole LowCardinality(String) AFTER DstNetName`,
|
||||
`ADD COLUMN DstNetRole LowCardinality(String) AFTER SrcNetRole`,
|
||||
`ADD COLUMN SrcNetSite LowCardinality(String) AFTER DstNetRole`,
|
||||
`ADD COLUMN DstNetSite LowCardinality(String) AFTER SrcNetSite`,
|
||||
`ADD COLUMN SrcNetRegion LowCardinality(String) AFTER DstNetSite`,
|
||||
`ADD COLUMN DstNetRegion LowCardinality(String) AFTER SrcNetRegion`,
|
||||
`ADD COLUMN SrcNetTenant LowCardinality(String) AFTER DstNetRegion`,
|
||||
`ADD COLUMN DstNetTenant LowCardinality(String) AFTER SrcNetTenant`,
|
||||
}
|
||||
if tableName != "flows" {
|
||||
modifications = append(modifications,
|
||||
`MODIFY ORDER BY (TimeReceived, ExporterAddress, EType, Proto,
|
||||
InIfName, SrcAS, ForwardingStatus,
|
||||
OutIfName, DstAS, SamplingRate,
|
||||
SrcNetName, DstNetName,
|
||||
SrcNetRole, DstNetRole,
|
||||
SrcNetSite, DstNetSite,
|
||||
SrcNetRegion, DstNetRegion,
|
||||
SrcNetTenant, DstNetTenant)`)
|
||||
}
|
||||
return conn.Exec(ctx, fmt.Sprintf(`ALTER TABLE %s %s`,
|
||||
tableName, strings.Join(modifications, ", ")))
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Component) migrationStepAddExporterColumns(resolution ResolutionConfiguration) migrationStepFunc {
|
||||
return func(ctx context.Context, l reporter.Logger, conn clickhouse.Conn) migrationStep {
|
||||
var tableName string
|
||||
@@ -251,7 +305,7 @@ func (c *Component) migrationsStepCreateFlowsConsumerTable(resolution Resolution
|
||||
tableName := fmt.Sprintf("flows_%s", resolution.Interval)
|
||||
viewName := fmt.Sprintf("%s_consumer", tableName)
|
||||
return migrationStep{
|
||||
CheckQuery: queryTableHash(8417690430320478031, ""),
|
||||
CheckQuery: queryTableHash(7356168458686845598, ""),
|
||||
Args: []interface{}{viewName},
|
||||
// No GROUP BY, the SummingMergeTree will take care of that
|
||||
Do: func() error {
|
||||
@@ -404,15 +458,17 @@ func (c *Component) migrationStepCreateNetworksDictionary(ctx context.Context, l
|
||||
settings := `SETTINGS(format_csv_allow_single_quotes = 0)`
|
||||
sourceLike := fmt.Sprintf("%% %s%% %s%%", source, settings)
|
||||
return migrationStep{
|
||||
CheckQuery: `
|
||||
SELECT 1 FROM system.tables
|
||||
WHERE name = $1 AND database = currentDatabase() AND create_table_query LIKE $2`,
|
||||
CheckQuery: queryTableHash(5246378884861475308, "AND create_table_query LIKE $2"),
|
||||
Args: []interface{}{"networks", sourceLike},
|
||||
Do: func() error {
|
||||
return conn.Exec(ctx, fmt.Sprintf(`
|
||||
CREATE OR REPLACE DICTIONARY networks (
|
||||
network String,
|
||||
name String
|
||||
name String,
|
||||
role String,
|
||||
site String,
|
||||
region String,
|
||||
tenant String
|
||||
)
|
||||
|
||||
PRIMARY KEY network
|
||||
@@ -461,7 +517,13 @@ CREATE TABLE %s
|
||||
(
|
||||
%s
|
||||
)
|
||||
ENGINE = %s`, tableName, partialSchema("SrcNetName", "DstNetName"), kafkaEngine))
|
||||
ENGINE = %s`, tableName, partialSchema(
|
||||
"SrcNetName", "DstNetName",
|
||||
"SrcNetRole", "DstNetRole",
|
||||
"SrcNetSite", "DstNetSite",
|
||||
"SrcNetRegion", "DstNetRegion",
|
||||
"SrcNetTenant", "DstNetTenant",
|
||||
), kafkaEngine))
|
||||
},
|
||||
}
|
||||
}
|
||||
@@ -470,7 +532,7 @@ func (c *Component) migrationStepCreateRawFlowsConsumerView(ctx context.Context,
|
||||
tableName := fmt.Sprintf("flows_%d_raw", flow.CurrentSchemaVersion)
|
||||
viewName := fmt.Sprintf("%s_consumer", tableName)
|
||||
return migrationStep{
|
||||
CheckQuery: queryTableHash(16363620252697412587, ""),
|
||||
CheckQuery: queryTableHash(17295069153939039375, ""),
|
||||
Args: []interface{}{viewName},
|
||||
Do: func() error {
|
||||
l.Debug().Msg("drop consumer table")
|
||||
@@ -484,7 +546,15 @@ CREATE MATERIALIZED VIEW %s TO flows
|
||||
AS SELECT
|
||||
*,
|
||||
dictGetOrDefault('networks', 'name', SrcAddr, '') AS SrcNetName,
|
||||
dictGetOrDefault('networks', 'name', DstAddr, '') AS DstNetName
|
||||
dictGetOrDefault('networks', 'name', DstAddr, '') AS DstNetName,
|
||||
dictGetOrDefault('networks', 'role', SrcAddr, '') AS SrcNetRole,
|
||||
dictGetOrDefault('networks', 'role', DstAddr, '') AS DstNetRole,
|
||||
dictGetOrDefault('networks', 'site', SrcAddr, '') AS SrcNetSite,
|
||||
dictGetOrDefault('networks', 'site', DstAddr, '') AS DstNetSite,
|
||||
dictGetOrDefault('networks', 'region', SrcAddr, '') AS SrcNetRegion,
|
||||
dictGetOrDefault('networks', 'region', DstAddr, '') AS DstNetRegion,
|
||||
dictGetOrDefault('networks', 'tenant', SrcAddr, '') AS SrcNetTenant,
|
||||
dictGetOrDefault('networks', 'tenant', DstAddr, '') AS DstNetTenant
|
||||
FROM %s`, viewName, tableName))
|
||||
},
|
||||
}
|
||||
|
||||
13
orchestrator/clickhouse/testdata/states/005.csv
vendored
Normal file
13
orchestrator/clickhouse/testdata/states/005.csv
vendored
Normal file
@@ -0,0 +1,13 @@
|
||||
"asns","CREATE DICTIONARY default.asns (`asn` UInt32 INJECTIVE, `name` String) PRIMARY KEY asn SOURCE(HTTP(URL 'http://something/api/v0/orchestrator/clickhouse/asns.csv' FORMAT 'CSVWithNames')) LIFETIME(MIN 0 MAX 3600) LAYOUT(HASHED()) SETTINGS(format_csv_allow_single_quotes = 0)"
|
||||
"exporters","CREATE MATERIALIZED VIEW default.exporters (`TimeReceived` DateTime, `ExporterAddress` LowCardinality(IPv6), `ExporterName` LowCardinality(String), `ExporterGroup` LowCardinality(String), `ExporterRole` LowCardinality(String), `ExporterSite` LowCardinality(String), `ExporterRegion` LowCardinality(String), `ExporterTenant` LowCardinality(String), `IfName` String, `IfDescription` String, `IfSpeed` UInt32, `IfConnectivity` String, `IfProvider` String, `IfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2)) ENGINE = ReplacingMergeTree(TimeReceived) ORDER BY (ExporterAddress, IfName) SETTINGS index_granularity = 8192 AS SELECT DISTINCT TimeReceived, ExporterAddress, ExporterName, ExporterGroup, ExporterRole, ExporterSite, ExporterRegion, ExporterTenant, [InIfName, OutIfName][num] AS IfName, [InIfDescription, OutIfDescription][num] AS IfDescription, [InIfSpeed, OutIfSpeed][num] AS IfSpeed, [InIfConnectivity, OutIfConnectivity][num] AS IfConnectivity, [InIfProvider, OutIfProvider][num] AS IfProvider, [InIfBoundary, OutIfBoundary][num] AS IfBoundary FROM default.flows ARRAY JOIN arrayEnumerate([1, 2]) AS num"
|
||||
"flows","CREATE TABLE default.flows (`TimeReceived` DateTime CODEC(DoubleDelta, LZ4), `SamplingRate` UInt64, `ExporterAddress` LowCardinality(IPv6), `ExporterName` LowCardinality(String), `ExporterGroup` LowCardinality(String), `ExporterRole` LowCardinality(String), `ExporterSite` LowCardinality(String), `ExporterRegion` LowCardinality(String), `ExporterTenant` LowCardinality(String), `SrcAddr` IPv6, `DstAddr` IPv6, `SrcAS` UInt32, `DstAS` UInt32, `SrcNetName` LowCardinality(String), `DstNetName` LowCardinality(String), `SrcNetRole` LowCardinality(String), `DstNetRole` LowCardinality(String), `SrcNetSite` LowCardinality(String), `DstNetSite` LowCardinality(String), `SrcNetRegion` LowCardinality(String), `DstNetRegion` LowCardinality(String), `SrcNetTenant` LowCardinality(String), `DstNetTenant` LowCardinality(String), `SrcCountry` FixedString(2), `DstCountry` FixedString(2), `InIfName` LowCardinality(String), `OutIfName` LowCardinality(String), `InIfDescription` String, `OutIfDescription` String, `InIfSpeed` UInt32, `OutIfSpeed` UInt32, `InIfConnectivity` LowCardinality(String), `OutIfConnectivity` LowCardinality(String), `InIfProvider` LowCardinality(String), `OutIfProvider` LowCardinality(String), `InIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `OutIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `EType` UInt32, `Proto` UInt32, `SrcPort` UInt32, `DstPort` UInt32, `Bytes` UInt64, `Packets` UInt64, `PacketSize` UInt64 ALIAS intDiv(Bytes, Packets), `PacketSizeBucket` LowCardinality(String) ALIAS multiIf(PacketSize < 64, '0-63', PacketSize < 128, '64-127', PacketSize < 256, '128-255', PacketSize < 512, '256-511', PacketSize < 768, '512-767', PacketSize < 1024, '768-1023', PacketSize < 1280, '1024-1279', PacketSize < 1501, '1280-1500', PacketSize < 2048, '1501-2047', PacketSize < 3072, '2048-3071', PacketSize < 4096, '3072-4095', PacketSize < 8192, '4096-8191', PacketSize < 10240, '8192-10239', PacketSize < 16384, '10240-16383', PacketSize < 32768, '16384-32767', PacketSize < 65536, '32768-65535', '65536-Inf'), `ForwardingStatus` UInt32) ENGINE = MergeTree PARTITION BY toYYYYMMDDhhmmss(toStartOfInterval(TimeReceived, toIntervalSecond(25920))) ORDER BY (TimeReceived, ExporterAddress, InIfName, OutIfName) TTL TimeReceived + toIntervalSecond(1296000) SETTINGS index_granularity = 8192"
|
||||
"flows_1h0m0s","CREATE TABLE default.flows_1h0m0s (`TimeReceived` DateTime CODEC(DoubleDelta, LZ4), `SamplingRate` UInt64, `ExporterAddress` LowCardinality(IPv6), `ExporterName` LowCardinality(String), `ExporterGroup` LowCardinality(String), `ExporterRole` LowCardinality(String), `ExporterSite` LowCardinality(String), `ExporterRegion` LowCardinality(String), `ExporterTenant` LowCardinality(String), `SrcAS` UInt32, `DstAS` UInt32, `SrcNetName` LowCardinality(String), `DstNetName` LowCardinality(String), `SrcNetRole` LowCardinality(String), `DstNetRole` LowCardinality(String), `SrcNetSite` LowCardinality(String), `DstNetSite` LowCardinality(String), `SrcNetRegion` LowCardinality(String), `DstNetRegion` LowCardinality(String), `SrcNetTenant` LowCardinality(String), `DstNetTenant` LowCardinality(String), `SrcCountry` FixedString(2), `DstCountry` FixedString(2), `InIfName` LowCardinality(String), `OutIfName` LowCardinality(String), `InIfDescription` String, `OutIfDescription` String, `InIfSpeed` UInt32, `OutIfSpeed` UInt32, `InIfConnectivity` LowCardinality(String), `OutIfConnectivity` LowCardinality(String), `InIfProvider` LowCardinality(String), `OutIfProvider` LowCardinality(String), `InIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `OutIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `EType` UInt32, `Proto` UInt32, `Bytes` UInt64, `Packets` UInt64, `PacketSize` UInt64 ALIAS intDiv(Bytes, Packets), `PacketSizeBucket` LowCardinality(String) ALIAS multiIf(PacketSize < 64, '0-63', PacketSize < 128, '64-127', PacketSize < 256, '128-255', PacketSize < 512, '256-511', PacketSize < 768, '512-767', PacketSize < 1024, '768-1023', PacketSize < 1280, '1024-1279', PacketSize < 1501, '1280-1500', PacketSize < 2048, '1501-2047', PacketSize < 3072, '2048-3071', PacketSize < 4096, '3072-4095', PacketSize < 8192, '4096-8191', PacketSize < 10240, '8192-10239', PacketSize < 16384, '10240-16383', PacketSize < 32768, '16384-32767', PacketSize < 65536, '32768-65535', '65536-Inf'), `ForwardingStatus` UInt32) ENGINE = SummingMergeTree((Bytes, Packets)) PARTITION BY toYYYYMMDDhhmmss(toStartOfInterval(TimeReceived, toIntervalSecond(622080))) PRIMARY KEY (TimeReceived, ExporterAddress, EType, Proto, InIfName, SrcAS, ForwardingStatus, OutIfName, DstAS, SamplingRate) ORDER BY (TimeReceived, ExporterAddress, EType, Proto, InIfName, SrcAS, ForwardingStatus, OutIfName, DstAS, SamplingRate, SrcNetName, DstNetName, SrcNetRole, DstNetRole, SrcNetSite, DstNetSite, SrcNetRegion, DstNetRegion, SrcNetTenant, DstNetTenant) TTL TimeReceived + toIntervalSecond(31104000) SETTINGS index_granularity = 8192"
|
||||
"flows_1h0m0s_consumer","CREATE MATERIALIZED VIEW default.flows_1h0m0s_consumer TO default.flows_1h0m0s (`TimeReceived` DateTime, `SamplingRate` UInt64, `ExporterAddress` LowCardinality(IPv6), `ExporterName` LowCardinality(String), `ExporterGroup` LowCardinality(String), `ExporterRole` LowCardinality(String), `ExporterSite` LowCardinality(String), `ExporterRegion` LowCardinality(String), `ExporterTenant` LowCardinality(String), `SrcAS` UInt32, `DstAS` UInt32, `SrcNetName` LowCardinality(String), `DstNetName` LowCardinality(String), `SrcNetRole` LowCardinality(String), `DstNetRole` LowCardinality(String), `SrcNetSite` LowCardinality(String), `DstNetSite` LowCardinality(String), `SrcNetRegion` LowCardinality(String), `DstNetRegion` LowCardinality(String), `SrcNetTenant` LowCardinality(String), `DstNetTenant` LowCardinality(String), `SrcCountry` FixedString(2), `DstCountry` FixedString(2), `InIfName` LowCardinality(String), `OutIfName` LowCardinality(String), `InIfDescription` String, `OutIfDescription` String, `InIfSpeed` UInt32, `OutIfSpeed` UInt32, `InIfConnectivity` LowCardinality(String), `OutIfConnectivity` LowCardinality(String), `InIfProvider` LowCardinality(String), `OutIfProvider` LowCardinality(String), `InIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `OutIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `EType` UInt32, `Proto` UInt32, `Bytes` UInt64, `Packets` UInt64, `ForwardingStatus` UInt32) AS SELECT * EXCEPT (SrcAddr, DstAddr, SrcPort, DstPort) REPLACE toStartOfInterval(TimeReceived, toIntervalSecond(3600)) AS TimeReceived FROM default.flows"
|
||||
"flows_1m0s","CREATE TABLE default.flows_1m0s (`TimeReceived` DateTime CODEC(DoubleDelta, LZ4), `SamplingRate` UInt64, `ExporterAddress` LowCardinality(IPv6), `ExporterName` LowCardinality(String), `ExporterGroup` LowCardinality(String), `ExporterRole` LowCardinality(String), `ExporterSite` LowCardinality(String), `ExporterRegion` LowCardinality(String), `ExporterTenant` LowCardinality(String), `SrcAS` UInt32, `DstAS` UInt32, `SrcNetName` LowCardinality(String), `DstNetName` LowCardinality(String), `SrcNetRole` LowCardinality(String), `DstNetRole` LowCardinality(String), `SrcNetSite` LowCardinality(String), `DstNetSite` LowCardinality(String), `SrcNetRegion` LowCardinality(String), `DstNetRegion` LowCardinality(String), `SrcNetTenant` LowCardinality(String), `DstNetTenant` LowCardinality(String), `SrcCountry` FixedString(2), `DstCountry` FixedString(2), `InIfName` LowCardinality(String), `OutIfName` LowCardinality(String), `InIfDescription` String, `OutIfDescription` String, `InIfSpeed` UInt32, `OutIfSpeed` UInt32, `InIfConnectivity` LowCardinality(String), `OutIfConnectivity` LowCardinality(String), `InIfProvider` LowCardinality(String), `OutIfProvider` LowCardinality(String), `InIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `OutIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `EType` UInt32, `Proto` UInt32, `Bytes` UInt64, `Packets` UInt64, `PacketSize` UInt64 ALIAS intDiv(Bytes, Packets), `PacketSizeBucket` LowCardinality(String) ALIAS multiIf(PacketSize < 64, '0-63', PacketSize < 128, '64-127', PacketSize < 256, '128-255', PacketSize < 512, '256-511', PacketSize < 768, '512-767', PacketSize < 1024, '768-1023', PacketSize < 1280, '1024-1279', PacketSize < 1501, '1280-1500', PacketSize < 2048, '1501-2047', PacketSize < 3072, '2048-3071', PacketSize < 4096, '3072-4095', PacketSize < 8192, '4096-8191', PacketSize < 10240, '8192-10239', PacketSize < 16384, '10240-16383', PacketSize < 32768, '16384-32767', PacketSize < 65536, '32768-65535', '65536-Inf'), `ForwardingStatus` UInt32) ENGINE = SummingMergeTree((Bytes, Packets)) PARTITION BY toYYYYMMDDhhmmss(toStartOfInterval(TimeReceived, toIntervalSecond(12096))) PRIMARY KEY (TimeReceived, ExporterAddress, EType, Proto, InIfName, SrcAS, ForwardingStatus, OutIfName, DstAS, SamplingRate) ORDER BY (TimeReceived, ExporterAddress, EType, Proto, InIfName, SrcAS, ForwardingStatus, OutIfName, DstAS, SamplingRate, SrcNetName, DstNetName, SrcNetRole, DstNetRole, SrcNetSite, DstNetSite, SrcNetRegion, DstNetRegion, SrcNetTenant, DstNetTenant) TTL TimeReceived + toIntervalSecond(604800) SETTINGS index_granularity = 8192"
|
||||
"flows_1m0s_consumer","CREATE MATERIALIZED VIEW default.flows_1m0s_consumer TO default.flows_1m0s (`TimeReceived` DateTime, `SamplingRate` UInt64, `ExporterAddress` LowCardinality(IPv6), `ExporterName` LowCardinality(String), `ExporterGroup` LowCardinality(String), `ExporterRole` LowCardinality(String), `ExporterSite` LowCardinality(String), `ExporterRegion` LowCardinality(String), `ExporterTenant` LowCardinality(String), `SrcAS` UInt32, `DstAS` UInt32, `SrcNetName` LowCardinality(String), `DstNetName` LowCardinality(String), `SrcNetRole` LowCardinality(String), `DstNetRole` LowCardinality(String), `SrcNetSite` LowCardinality(String), `DstNetSite` LowCardinality(String), `SrcNetRegion` LowCardinality(String), `DstNetRegion` LowCardinality(String), `SrcNetTenant` LowCardinality(String), `DstNetTenant` LowCardinality(String), `SrcCountry` FixedString(2), `DstCountry` FixedString(2), `InIfName` LowCardinality(String), `OutIfName` LowCardinality(String), `InIfDescription` String, `OutIfDescription` String, `InIfSpeed` UInt32, `OutIfSpeed` UInt32, `InIfConnectivity` LowCardinality(String), `OutIfConnectivity` LowCardinality(String), `InIfProvider` LowCardinality(String), `OutIfProvider` LowCardinality(String), `InIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `OutIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `EType` UInt32, `Proto` UInt32, `Bytes` UInt64, `Packets` UInt64, `ForwardingStatus` UInt32) AS SELECT * EXCEPT (SrcAddr, DstAddr, SrcPort, DstPort) REPLACE toStartOfInterval(TimeReceived, toIntervalSecond(60)) AS TimeReceived FROM default.flows"
|
||||
"flows_2_raw","CREATE TABLE default.flows_2_raw (`TimeReceived` DateTime CODEC(DoubleDelta, LZ4), `SamplingRate` UInt64, `ExporterAddress` LowCardinality(IPv6), `ExporterName` LowCardinality(String), `ExporterGroup` LowCardinality(String), `ExporterRole` LowCardinality(String), `ExporterSite` LowCardinality(String), `ExporterRegion` LowCardinality(String), `ExporterTenant` LowCardinality(String), `SrcAddr` IPv6, `DstAddr` IPv6, `SrcAS` UInt32, `DstAS` UInt32, `SrcCountry` FixedString(2), `DstCountry` FixedString(2), `InIfName` LowCardinality(String), `OutIfName` LowCardinality(String), `InIfDescription` String, `OutIfDescription` String, `InIfSpeed` UInt32, `OutIfSpeed` UInt32, `InIfConnectivity` LowCardinality(String), `OutIfConnectivity` LowCardinality(String), `InIfProvider` LowCardinality(String), `OutIfProvider` LowCardinality(String), `InIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `OutIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `EType` UInt32, `Proto` UInt32, `SrcPort` UInt32, `DstPort` UInt32, `Bytes` UInt64, `Packets` UInt64, `ForwardingStatus` UInt32) ENGINE = Kafka SETTINGS kafka_broker_list = '127.0.0.1:9092', kafka_topic_list = 'flows-v2', kafka_group_name = 'clickhouse', kafka_format = 'Protobuf', kafka_schema = 'flow-2.proto:FlowMessage', kafka_num_consumers = 1, kafka_thread_per_consumer = 1"
|
||||
"flows_2_raw_consumer","CREATE MATERIALIZED VIEW default.flows_2_raw_consumer TO default.flows (`TimeReceived` DateTime, `SamplingRate` UInt64, `ExporterAddress` LowCardinality(IPv6), `ExporterName` LowCardinality(String), `ExporterGroup` LowCardinality(String), `ExporterRole` LowCardinality(String), `ExporterSite` LowCardinality(String), `ExporterRegion` LowCardinality(String), `ExporterTenant` LowCardinality(String), `SrcAddr` IPv6, `DstAddr` IPv6, `SrcAS` UInt32, `DstAS` UInt32, `SrcCountry` FixedString(2), `DstCountry` FixedString(2), `InIfName` LowCardinality(String), `OutIfName` LowCardinality(String), `InIfDescription` String, `OutIfDescription` String, `InIfSpeed` UInt32, `OutIfSpeed` UInt32, `InIfConnectivity` LowCardinality(String), `OutIfConnectivity` LowCardinality(String), `InIfProvider` LowCardinality(String), `OutIfProvider` LowCardinality(String), `InIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `OutIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `EType` UInt32, `Proto` UInt32, `SrcPort` UInt32, `DstPort` UInt32, `Bytes` UInt64, `Packets` UInt64, `ForwardingStatus` UInt32, `SrcNetName` String, `DstNetName` String, `SrcNetRole` String, `DstNetRole` String, `SrcNetSite` String, `DstNetSite` String, `SrcNetRegion` String, `DstNetRegion` String, `SrcNetTenant` String, `DstNetTenant` String) AS SELECT *, dictGetOrDefault('default.networks', 'name', SrcAddr, '') AS SrcNetName, dictGetOrDefault('default.networks', 'name', DstAddr, '') AS DstNetName, dictGetOrDefault('default.networks', 'role', SrcAddr, '') AS SrcNetRole, dictGetOrDefault('default.networks', 'role', DstAddr, '') AS DstNetRole, dictGetOrDefault('default.networks', 'site', SrcAddr, '') AS SrcNetSite, dictGetOrDefault('default.networks', 'site', DstAddr, '') AS DstNetSite, dictGetOrDefault('default.networks', 'region', SrcAddr, '') AS SrcNetRegion, dictGetOrDefault('default.networks', 'region', DstAddr, '') AS DstNetRegion, dictGetOrDefault('default.networks', 'tenant', SrcAddr, '') AS SrcNetTenant, dictGetOrDefault('default.networks', 'tenant', DstAddr, '') AS DstNetTenant FROM default.flows_2_raw"
|
||||
"flows_5m0s","CREATE TABLE default.flows_5m0s (`TimeReceived` DateTime CODEC(DoubleDelta, LZ4), `SamplingRate` UInt64, `ExporterAddress` LowCardinality(IPv6), `ExporterName` LowCardinality(String), `ExporterGroup` LowCardinality(String), `ExporterRole` LowCardinality(String), `ExporterSite` LowCardinality(String), `ExporterRegion` LowCardinality(String), `ExporterTenant` LowCardinality(String), `SrcAS` UInt32, `DstAS` UInt32, `SrcNetName` LowCardinality(String), `DstNetName` LowCardinality(String), `SrcNetRole` LowCardinality(String), `DstNetRole` LowCardinality(String), `SrcNetSite` LowCardinality(String), `DstNetSite` LowCardinality(String), `SrcNetRegion` LowCardinality(String), `DstNetRegion` LowCardinality(String), `SrcNetTenant` LowCardinality(String), `DstNetTenant` LowCardinality(String), `SrcCountry` FixedString(2), `DstCountry` FixedString(2), `InIfName` LowCardinality(String), `OutIfName` LowCardinality(String), `InIfDescription` String, `OutIfDescription` String, `InIfSpeed` UInt32, `OutIfSpeed` UInt32, `InIfConnectivity` LowCardinality(String), `OutIfConnectivity` LowCardinality(String), `InIfProvider` LowCardinality(String), `OutIfProvider` LowCardinality(String), `InIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `OutIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `EType` UInt32, `Proto` UInt32, `Bytes` UInt64, `Packets` UInt64, `PacketSize` UInt64 ALIAS intDiv(Bytes, Packets), `PacketSizeBucket` LowCardinality(String) ALIAS multiIf(PacketSize < 64, '0-63', PacketSize < 128, '64-127', PacketSize < 256, '128-255', PacketSize < 512, '256-511', PacketSize < 768, '512-767', PacketSize < 1024, '768-1023', PacketSize < 1280, '1024-1279', PacketSize < 1501, '1280-1500', PacketSize < 2048, '1501-2047', PacketSize < 3072, '2048-3071', PacketSize < 4096, '3072-4095', PacketSize < 8192, '4096-8191', PacketSize < 10240, '8192-10239', PacketSize < 16384, '10240-16383', PacketSize < 32768, '16384-32767', PacketSize < 65536, '32768-65535', '65536-Inf'), `ForwardingStatus` UInt32) ENGINE = SummingMergeTree((Bytes, Packets)) PARTITION BY toYYYYMMDDhhmmss(toStartOfInterval(TimeReceived, toIntervalSecond(155520))) PRIMARY KEY (TimeReceived, ExporterAddress, EType, Proto, InIfName, SrcAS, ForwardingStatus, OutIfName, DstAS, SamplingRate) ORDER BY (TimeReceived, ExporterAddress, EType, Proto, InIfName, SrcAS, ForwardingStatus, OutIfName, DstAS, SamplingRate, SrcNetName, DstNetName, SrcNetRole, DstNetRole, SrcNetSite, DstNetSite, SrcNetRegion, DstNetRegion, SrcNetTenant, DstNetTenant) TTL TimeReceived + toIntervalSecond(7776000) SETTINGS index_granularity = 8192"
|
||||
"flows_5m0s_consumer","CREATE MATERIALIZED VIEW default.flows_5m0s_consumer TO default.flows_5m0s (`TimeReceived` DateTime, `SamplingRate` UInt64, `ExporterAddress` LowCardinality(IPv6), `ExporterName` LowCardinality(String), `ExporterGroup` LowCardinality(String), `ExporterRole` LowCardinality(String), `ExporterSite` LowCardinality(String), `ExporterRegion` LowCardinality(String), `ExporterTenant` LowCardinality(String), `SrcAS` UInt32, `DstAS` UInt32, `SrcNetName` LowCardinality(String), `DstNetName` LowCardinality(String), `SrcNetRole` LowCardinality(String), `DstNetRole` LowCardinality(String), `SrcNetSite` LowCardinality(String), `DstNetSite` LowCardinality(String), `SrcNetRegion` LowCardinality(String), `DstNetRegion` LowCardinality(String), `SrcNetTenant` LowCardinality(String), `DstNetTenant` LowCardinality(String), `SrcCountry` FixedString(2), `DstCountry` FixedString(2), `InIfName` LowCardinality(String), `OutIfName` LowCardinality(String), `InIfDescription` String, `OutIfDescription` String, `InIfSpeed` UInt32, `OutIfSpeed` UInt32, `InIfConnectivity` LowCardinality(String), `OutIfConnectivity` LowCardinality(String), `InIfProvider` LowCardinality(String), `OutIfProvider` LowCardinality(String), `InIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `OutIfBoundary` Enum8('undefined' = 0, 'external' = 1, 'internal' = 2), `EType` UInt32, `Proto` UInt32, `Bytes` UInt64, `Packets` UInt64, `ForwardingStatus` UInt32) AS SELECT * EXCEPT (SrcAddr, DstAddr, SrcPort, DstPort) REPLACE toStartOfInterval(TimeReceived, toIntervalSecond(300)) AS TimeReceived FROM default.flows"
|
||||
"networks","CREATE DICTIONARY default.networks (`network` String, `name` String, `role` String, `site` String, `region` String, `tenant` String) PRIMARY KEY network SOURCE(HTTP(URL 'http://something/api/v0/orchestrator/clickhouse/networks.csv' FORMAT 'CSVWithNames')) LIFETIME(MIN 0 MAX 3600) LAYOUT(IP_TRIE()) SETTINGS(format_csv_allow_single_quotes = 0)"
|
||||
"protocols","CREATE DICTIONARY default.protocols (`proto` UInt8 INJECTIVE, `name` String, `description` String) PRIMARY KEY proto SOURCE(HTTP(URL 'http://something/api/v0/orchestrator/clickhouse/protocols.csv' FORMAT 'CSVWithNames')) LIFETIME(MIN 0 MAX 3600) LAYOUT(HASHED()) SETTINGS(format_csv_allow_single_quotes = 0)"
|
||||
|
Reference in New Issue
Block a user