orchestrator/clickhouse: ability to fetch network attributes with HTTP

This commit is contained in:
Vincent Bernat
2022-10-13 18:33:29 +02:00
parent 1a0f00bbc4
commit 95482c9201
14 changed files with 578 additions and 49 deletions

View File

@@ -34,6 +34,18 @@ clickhouse:
2a01:db8:cafe:2::/64: 2a01:db8:cafe:2::/64:
name: ipv6-servers name: ipv6-servers
role: servers role: servers
network-sources: []
# Amazon AWS
# - url: https://ip-ranges.amazonaws.com/ip-ranges.json
# interval: 6h
# transform: |
# (.prefixes + .ipv6_prefixes)[] |
# { prefix: (.ip_prefix // .ipv6_prefix), tenant: "amazon", region: .region, role: .service|ascii_downcase }
# Google Cloud
# - url: https://www.gstatic.com/ipranges/cloud.json
# interval: 6h
# .prefixes[] |
# { prefix: (.ipv4Prefix // .ipv6Prefix), tenant: "google-cloud", region: .scope }
inlet: inlet:
kafka: kafka:

View File

@@ -129,10 +129,7 @@ func (c *Component) Start() error {
if c.config.Listen == "" { if c.config.Listen == "" {
return nil return nil
} }
server := &http.Server{ server := &http.Server{Handler: c.mux}
Addr: c.config.Listen,
Handler: c.mux,
}
// Most of the time, if we have an error, it's here! // Most of the time, if we have an error, it's here!
c.r.Info().Str("listen", c.config.Listen).Msg("starting HTTP server") c.r.Info().Str("listen", c.config.Listen).Msg("starting HTTP server")
@@ -141,6 +138,7 @@ func (c *Component) Start() error {
return fmt.Errorf("unable to listen to %v: %w", c.config.Listen, err) return fmt.Errorf("unable to listen to %v: %w", c.config.Listen, err)
} }
c.address = listener.Addr() c.address = listener.Addr()
server.Addr = listener.Addr().String()
// Start serving requests // Start serving requests
c.t.Go(func() error { c.t.Go(func() error {

View File

@@ -434,6 +434,20 @@ provided:
- `networks` maps subnets to attributes. Attributes are `name`, - `networks` maps subnets to attributes. Attributes are `name`,
`role`, `site`, `region`, and `tenant`. They are exposed as `role`, `site`, `region`, and `tenant`. They are exposed as
`SrcNetName`, `DstNetName`, `SrcNetRole`, `DstNetRole`, etc. `SrcNetName`, `DstNetName`, `SrcNetRole`, `DstNetRole`, etc.
- `network-sources` fetch a remote source mapping subnets to
attributes. This is similar to `networks` but the definition is
fetched through HTTP. It accepts a map from source names to sources.
Each source accepts the following attributes:
- `url` is the URL to fetch
- `proxy` says if we should use a proxy (defined through environment variables like `http_proxy`)
- `timeout` defines the timeout for fetching and parsing
- `interval` is the interval at which the source should be refreshed
- `transform` is a [jq](https://stedolan.github.io/jq/manual/)
expression to transform the received JSON into a set of network
attributes represented as objects. Each object must have a
`prefix` attribute and, optionally, `name`, `role`, `site`,
`region`, and `tenant`. See the example provided in the shipped
`akvorado.yaml` configuration file.
- `asns` maps AS number to names (overriding the builtin ones) - `asns` maps AS number to names (overriding the builtin ones)
- `orchestrator-url` defines the URL of the orchestrator to be used - `orchestrator-url` defines the URL of the orchestrator to be used
by Clickhouse (autodetection when not specified) by Clickhouse (autodetection when not specified)

View File

@@ -11,6 +11,10 @@ identified with a specific icon:
- 🩹: bug fix - 🩹: bug fix
- 🌱: miscellaneous change - 🌱: miscellaneous change
## Unreleased
-*orchestrator*: add `orchestrator.network-sources` to fetch network attributes with HTTP
## 1.6.1 - 2022-10-11 ## 1.6.1 - 2022-10-11
- 🩹 *inlet*: fix SrcAS when receiving flows with sFlow - 🩹 *inlet*: fix SrcAS when receiving flows with sFlow

4
go.mod
View File

@@ -38,7 +38,7 @@ require (
github.com/ti-mo/conntrack v0.4.0 github.com/ti-mo/conntrack v0.4.0
github.com/yuin/goldmark v1.5.2 github.com/yuin/goldmark v1.5.2
github.com/yuin/goldmark-highlighting v0.0.0-20220208100518-594be1970594 github.com/yuin/goldmark-highlighting v0.0.0-20220208100518-594be1970594
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab golang.org/x/sys v0.0.0-20220829200755-d48e67d00261
golang.org/x/time v0.0.0-20220722155302-e5dcc9cfc0b9 golang.org/x/time v0.0.0-20220722155302-e5dcc9cfc0b9
google.golang.org/protobuf v1.28.1 google.golang.org/protobuf v1.28.1
gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637 gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637
@@ -75,6 +75,8 @@ require (
github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-uuid v1.0.2 // indirect github.com/hashicorp/go-uuid v1.0.2 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/itchyny/gojq v0.12.9 // indirect
github.com/itchyny/timefmt-go v0.1.4 // indirect
github.com/jcmturner/aescts/v2 v2.0.0 // indirect github.com/jcmturner/aescts/v2 v2.0.0 // indirect
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
github.com/jcmturner/gofork v1.0.0 // indirect github.com/jcmturner/gofork v1.0.0 // indirect

6
go.sum
View File

@@ -240,6 +240,10 @@ github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM= github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/itchyny/gojq v0.12.9 h1:biKpbKwMxVYhCU1d6mR7qMr3f0Hn9F5k5YykCVb3gmM=
github.com/itchyny/gojq v0.12.9/go.mod h1:T4Ip7AETUXeGpD+436m+UEl3m3tokRgajd5pRfsR5oE=
github.com/itchyny/timefmt-go v0.1.4 h1:hFEfWVdwsEi+CY8xY2FtgWHGQaBaC3JeHd+cve0ynVM=
github.com/itchyny/timefmt-go v0.1.4/go.mod h1:nEP7L+2YmAbT2kZ2HfSs1d8Xtw9LY8D2stDBckWakZ8=
github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8= github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8=
github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs= github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs=
github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo= github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo=
@@ -637,6 +641,8 @@ golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab h1:2QkjZIsXupsJbJIdSjjUOgWK3aEtzyuh2mPt3l/CkeU= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab h1:2QkjZIsXupsJbJIdSjjUOgWK3aEtzyuh2mPt3l/CkeU=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220829200755-d48e67d00261 h1:v6hYoSR9T5oet+pMXwUWkbiVqx/63mlHjefrHmxwfeY=
golang.org/x/sys v0.0.0-20220829200755-d48e67d00261/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=

View File

@@ -11,12 +11,15 @@ import (
"akvorado/common/helpers" "akvorado/common/helpers"
"akvorado/common/kafka" "akvorado/common/kafka"
"github.com/itchyny/gojq"
"github.com/mitchellh/mapstructure" "github.com/mitchellh/mapstructure"
) )
// Configuration describes the configuration for the ClickHouse configurator. // Configuration describes the configuration for the ClickHouse configurator.
type Configuration struct { type Configuration struct {
clickhousedb.Configuration `mapstructure:",squash" yaml:"-,inline"` clickhousedb.Configuration `mapstructure:",squash" yaml:"-,inline"`
// SkipMigrations tell if we should skip migrations.
SkipMigrations bool
// Kafka describes Kafka-specific configuration // Kafka describes Kafka-specific configuration
Kafka KafkaConfiguration Kafka KafkaConfiguration
// Resolutions describe the various resolutions to use to // Resolutions describe the various resolutions to use to
@@ -31,6 +34,14 @@ type Configuration struct {
// Networks is a mapping from IP networks to attributes. It is used // Networks is a mapping from IP networks to attributes. It is used
// to instantiate the SrcNet* and DstNet* columns. // to instantiate the SrcNet* and DstNet* columns.
Networks *helpers.SubnetMap[NetworkAttributes] `validate:"omitempty,dive"` Networks *helpers.SubnetMap[NetworkAttributes] `validate:"omitempty,dive"`
// NetworkSources defines a set of remote network
// definitions to map IP networks to attributes. It is used to
// instantiate the SrcNet* and DstNet* columns. The results
// are overridden by the content of Networks.
NetworkSources map[string]NetworkSource `validate:"dive"`
// NetworkSourceTimeout tells how long to wait for network
// sources to be ready. 503 is returned when not.
NetworkSourcesTimeout time.Duration `validate:"min=0"`
// OrchestratorURL allows one to override URL to reach // OrchestratorURL allows one to override URL to reach
// orchestrator from Clickhouse // orchestrator from Clickhouse
OrchestratorURL string `validate:"isdefault|url"` OrchestratorURL string `validate:"isdefault|url"`
@@ -67,7 +78,8 @@ func DefaultConfiguration() Configuration {
{5 * time.Minute, 3 * 30 * 24 * time.Hour}, // 90 days {5 * time.Minute, 3 * 30 * 24 * time.Hour}, // 90 days
{time.Hour, 12 * 30 * 24 * time.Hour}, // 1 year {time.Hour, 12 * 30 * 24 * time.Hour}, // 1 year
}, },
MaxPartitions: 50, MaxPartitions: 50,
NetworkSourcesTimeout: 10 * time.Second,
} }
} }
@@ -102,6 +114,50 @@ func NetworkAttributesUnmarshallerHook() mapstructure.DecodeHookFunc {
} }
} }
// NetworkSource defines a remote network definition.
type NetworkSource struct {
// URL is the URL to fetch to get remote network definition.
// It should provide a JSON file.
URL string `validate:"url"`
// Proxy is set to true if a proxy should be used.
Proxy bool
// Timeout tells the maximum time the remote request should take
Timeout time.Duration `validate:"isdefault|min=1s"`
// Transform is a jq string to transform the received JSON
// data into a list of network attributes.
Transform TransformQuery
// Interval tells how much time to wait before updating the source.
Interval time.Duration `validate:"min=1m"`
}
// TransformQuery represents a jq query to transform data.
type TransformQuery struct {
*gojq.Query
}
// UnmarshalText parses a jq query.
func (jq *TransformQuery) UnmarshalText(text []byte) error {
q, err := gojq.Parse(string(text))
if err != nil {
return err
}
*jq = TransformQuery{q}
return nil
}
// String turns a jq query into a string.
func (jq TransformQuery) String() string {
if jq.Query != nil {
return jq.Query.String()
}
return ".[]"
}
// MarshalText turns a jq query into a string.
func (jq TransformQuery) MarshalText() ([]byte, error) {
return []byte(jq.String()), nil
}
func init() { func init() {
helpers.RegisterMapstructureUnmarshallerHook(helpers.SubnetMapUnmarshallerHook[NetworkAttributes]()) helpers.RegisterMapstructureUnmarshallerHook(helpers.SubnetMapUnmarshallerHook[NetworkAttributes]())
helpers.RegisterMapstructureUnmarshallerHook(NetworkAttributesUnmarshallerHook()) helpers.RegisterMapstructureUnmarshallerHook(NetworkAttributesUnmarshallerHook())

View File

@@ -7,6 +7,7 @@ import (
"fmt" "fmt"
"reflect" "reflect"
"testing" "testing"
"time"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
@@ -86,6 +87,70 @@ func TestNetworkNamesUnmarshalHook(t *testing.T) {
}, helpers.DiffFormatter(reflect.TypeOf(helpers.SubnetMap[NetworkAttributes]{}), fmt.Sprint)) }, helpers.DiffFormatter(reflect.TypeOf(helpers.SubnetMap[NetworkAttributes]{}), fmt.Sprint))
} }
func TestNetworkSourceDecode(t *testing.T) {
helpers.TestConfigurationDecode(t, helpers.ConfigurationDecodeCases{
{
Description: "Empty",
Initial: func() interface{} { return NetworkSource{} },
Configuration: func() interface{} {
return gin.H{
"url": "https://example.net",
"interval": "10m",
}
},
Expected: NetworkSource{
URL: "https://example.net",
Interval: 10 * time.Minute,
},
}, {
Description: "Simple transform",
Initial: func() interface{} { return NetworkSource{} },
Configuration: func() interface{} {
return gin.H{
"url": "https://example.net",
"interval": "10m",
"transform": ".[]",
}
},
Expected: NetworkSource{
URL: "https://example.net",
Interval: 10 * time.Minute,
Transform: MustParseTransformQuery(".[]"),
},
}, {
Description: "Complex transform",
Initial: func() interface{} { return NetworkSource{} },
Configuration: func() interface{} {
return gin.H{
"url": "https://example.net",
"interval": "10m",
"transform": `
.prefixes[] | {prefix: .ip_prefix, tenant: "amazon", region: .region, role: .service|ascii_downcase}
`,
}
},
Expected: NetworkSource{
URL: "https://example.net",
Interval: 10 * time.Minute,
Transform: MustParseTransformQuery(`
.prefixes[] | {prefix: .ip_prefix, tenant: "amazon", region: .region, role: .service|ascii_downcase}
`),
},
}, {
Description: "Incorrect transform",
Initial: func() interface{} { return NetworkSource{} },
Configuration: func() interface{} {
return gin.H{
"url": "https://example.net",
"interval": "10m",
"transform": "878778&&",
}
},
Error: true,
},
}, helpers.DiffFormatter(reflect.TypeOf(TransformQuery{}), fmt.Sprint), helpers.DiffZero)
}
func TestDefaultConfiguration(t *testing.T) { func TestDefaultConfiguration(t *testing.T) {
config := DefaultConfiguration() config := DefaultConfiguration()
config.Kafka.Topic = "flow" config.Kafka.Topic = "flow"

View File

@@ -55,10 +55,24 @@ func (c *Component) registerHTTPHandlers() error {
// networks.csv // networks.csv
c.d.HTTP.AddHandler("/api/v0/orchestrator/clickhouse/networks.csv", c.d.HTTP.AddHandler("/api/v0/orchestrator/clickhouse/networks.csv",
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
select {
case <-c.networkSourcesReady:
case <-time.After(c.config.NetworkSourcesTimeout):
w.WriteHeader(http.StatusServiceUnavailable)
return
}
w.Header().Set("Content-Type", "text/csv; charset=utf-8") w.Header().Set("Content-Type", "text/csv; charset=utf-8")
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
wr := csv.NewWriter(w) wr := csv.NewWriter(w)
wr.Write([]string{"network", "name", "role", "site", "region", "tenant"}) wr.Write([]string{"network", "name", "role", "site", "region", "tenant"})
c.networkSourcesLock.RLock()
defer c.networkSourcesLock.RUnlock()
for _, ss := range c.networkSources {
for _, v := range ss {
wr.Write([]string{v.Prefix.String(),
v.Name, v.Role, v.Site, v.Region, v.Tenant})
}
}
if c.config.Networks != nil { if c.config.Networks != nil {
for k, v := range c.config.Networks.ToMap() { for k, v := range c.config.Networks.ToMap() {
wr.Write([]string{k, v.Name, v.Role, v.Site, v.Region, v.Tenant}) wr.Write([]string{k, v.Name, v.Role, v.Site, v.Region, v.Tenant})

View File

@@ -4,7 +4,12 @@
package clickhouse package clickhouse
import ( import (
"context"
"fmt"
"net"
netHTTP "net/http"
"testing" "testing"
"time"
"akvorado/common/daemon" "akvorado/common/daemon"
"akvorado/common/helpers" "akvorado/common/helpers"
@@ -15,6 +20,7 @@ import (
func TestHTTPEndpoints(t *testing.T) { func TestHTTPEndpoints(t *testing.T) {
r := reporter.NewMock(t) r := reporter.NewMock(t)
config := DefaultConfiguration() config := DefaultConfiguration()
config.SkipMigrations = true
config.Networks = helpers.MustNewSubnetMap(map[string]NetworkAttributes{ config.Networks = helpers.MustNewSubnetMap(map[string]NetworkAttributes{
"::ffff:192.0.2.0/120": {Name: "infra"}, "::ffff:192.0.2.0/120": {Name: "infra"},
}) })
@@ -25,6 +31,7 @@ func TestHTTPEndpoints(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("New() error:\n%+v", err) t.Fatalf("New() error:\n%+v", err)
} }
helpers.StartStop(t, c)
cases := helpers.HTTPEndpointCases{ cases := helpers.HTTPEndpointCases{
{ {
@@ -93,3 +100,116 @@ func TestAdditionalASNs(t *testing.T) {
helpers.TestHTTPEndpoints(t, c.d.HTTP.LocalAddr(), cases) helpers.TestHTTPEndpoints(t, c.d.HTTP.LocalAddr(), cases)
} }
func TestNetworkSources(t *testing.T) {
// Mux to answer requests
ready := make(chan bool)
mux := netHTTP.NewServeMux()
mux.Handle("/amazon.json", netHTTP.HandlerFunc(func(w netHTTP.ResponseWriter, r *netHTTP.Request) {
select {
case <-ready:
default:
w.WriteHeader(404)
return
}
w.Header().Add("Content-Type", "application/json")
w.WriteHeader(200)
w.Write([]byte(`
{
"syncToken": "1665609189",
"createDate": "2022-10-12-21-13-09",
"prefixes": [
{
"ip_prefix": "3.2.34.0/26",
"region": "af-south-1",
"service": "AMAZON",
"network_border_group": "af-south-1"
}
],
"ipv6_prefixes": [
{
"ipv6_prefix": "2600:1ff2:4000::/40",
"region": "us-west-2",
"service": "AMAZON",
"network_border_group": "us-west-2"
},
{
"ipv6_prefix": "2600:1f14:fff:f800::/56",
"region": "us-west-2",
"service": "ROUTE53_HEALTHCHECKS",
"network_border_group": "us-west-2"
}
]
}
`))
}))
// Setup an HTTP server to serve the JSON
listener, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatalf("Listen() error:\n%+v", err)
}
server := &netHTTP.Server{
Addr: listener.Addr().String(),
Handler: mux,
}
address := listener.Addr()
go server.Serve(listener)
defer server.Shutdown(context.Background())
r := reporter.NewMock(t)
config := DefaultConfiguration()
config.SkipMigrations = true
config.NetworkSourcesTimeout = 10 * time.Millisecond
config.NetworkSources = map[string]NetworkSource{
"amazon": {
URL: fmt.Sprintf("http://%s/amazon.json", address),
Interval: 100 * time.Millisecond,
Transform: MustParseTransformQuery(`
(.prefixes + .ipv6_prefixes)[] |
{ prefix: (.ip_prefix // .ipv6_prefix), tenant: "amazon", region: .region, role: .service|ascii_downcase }
`),
},
}
c, err := New(r, config, Dependencies{
Daemon: daemon.NewMock(t),
HTTP: http.NewMock(t, r),
})
if err != nil {
t.Fatalf("New() error:\n%+v", err)
}
helpers.StartStop(t, c)
// When not ready, we get a 503
helpers.TestHTTPEndpoints(t, c.d.HTTP.LocalAddr(), helpers.HTTPEndpointCases{
{
Description: "try when not ready",
URL: "/api/v0/orchestrator/clickhouse/networks.csv",
StatusCode: 503,
},
})
close(ready)
time.Sleep(50 * time.Millisecond)
helpers.TestHTTPEndpoints(t, c.d.HTTP.LocalAddr(), helpers.HTTPEndpointCases{
{
Description: "try when ready",
URL: "/api/v0/orchestrator/clickhouse/networks.csv",
ContentType: "text/csv; charset=utf-8",
FirstLines: []string{
`network,name,role,site,region,tenant`,
`3.2.34.0/26,,amazon,,af-south-1,amazon`,
`2600:1ff2:4000::/40,,amazon,,us-west-2,amazon`,
`2600:1f14:fff:f800::/56,,route53_healthchecks,,us-west-2,amazon`,
},
},
})
gotMetrics := r.GetMetrics("akvorado_orchestrator_clickhouse_network_source_networks_")
expectedMetrics := map[string]string{
`total{source="amazon"}`: "3",
}
if diff := helpers.Diff(gotMetrics, expectedMetrics); diff != "" {
t.Fatalf("Metrics (-got, +want):\n%s", diff)
}
}

View File

@@ -0,0 +1,66 @@
// SPDX-FileCopyrightText: 2022 Free Mobile
// SPDX-License-Identifier: AGPL-3.0-only
package clickhouse
import "akvorado/common/reporter"
type metrics struct {
migrationsRunning reporter.Gauge
migrationsApplied reporter.Counter
migrationsNotApplied reporter.Counter
migrationsVersion reporter.Gauge
networkSourceUpdates *reporter.CounterVec
networkSourceErrors *reporter.CounterVec
networkSourceCount *reporter.GaugeVec
}
func (c *Component) initMetrics() {
c.metrics.migrationsRunning = c.r.Gauge(
reporter.GaugeOpts{
Name: "migrations_running",
Help: "Database migrations in progress.",
},
)
c.metrics.migrationsApplied = c.r.Counter(
reporter.CounterOpts{
Name: "migrations_applied_steps",
Help: "Number of migration steps applied",
},
)
c.metrics.migrationsNotApplied = c.r.Counter(
reporter.CounterOpts{
Name: "migrations_notapplied_steps",
Help: "Number of migration steps not applied",
},
)
c.metrics.migrationsVersion = c.r.Gauge(
reporter.GaugeOpts{
Name: "migrations_version",
Help: "Current version for migrations.",
},
)
c.metrics.networkSourceUpdates = c.r.CounterVec(
reporter.CounterOpts{
Name: "network_source_updates_total",
Help: "Number of successful updates for a network source",
},
[]string{"source"},
)
c.metrics.networkSourceErrors = c.r.CounterVec(
reporter.CounterOpts{
Name: "network_source_errors_total",
Help: "Number of failed updates for a network source",
},
[]string{"source", "error"},
)
c.metrics.networkSourceCount = c.r.GaugeVec(
reporter.GaugeOpts{
Name: "network_source_networks_total",
Help: "Number of networks imported from a given source",
},
[]string{"source"},
)
}

View File

@@ -5,9 +5,13 @@
package clickhouse package clickhouse
import ( import (
"context"
"sort" "sort"
"sync"
"time"
"github.com/cenkalti/backoff/v4" "github.com/cenkalti/backoff/v4"
"github.com/itchyny/gojq"
"gopkg.in/tomb.v2" "gopkg.in/tomb.v2"
"akvorado/common/clickhousedb" "akvorado/common/clickhousedb"
@@ -18,19 +22,16 @@ import (
// Component represents the ClickHouse configurator. // Component represents the ClickHouse configurator.
type Component struct { type Component struct {
r *reporter.Reporter r *reporter.Reporter
d *Dependencies d *Dependencies
t tomb.Tomb t tomb.Tomb
config Configuration config Configuration
metrics metrics
metrics struct { migrationsDone chan bool // closed when migrations are done
migrationsRunning reporter.Gauge networkSourcesReady chan bool // closed when all network sources are ready
migrationsApplied reporter.Counter networkSourcesLock sync.RWMutex
migrationsNotApplied reporter.Counter networkSources map[string][]externalNetworkAttributes
migrationsVersion reporter.Gauge
}
migrationsDone chan bool
} }
// Dependencies define the dependencies of the ClickHouse configurator. // Dependencies define the dependencies of the ClickHouse configurator.
@@ -43,38 +44,17 @@ type Dependencies struct {
// New creates a new ClickHouse component. // New creates a new ClickHouse component.
func New(r *reporter.Reporter, configuration Configuration, dependencies Dependencies) (*Component, error) { func New(r *reporter.Reporter, configuration Configuration, dependencies Dependencies) (*Component, error) {
c := Component{ c := Component{
r: r, r: r,
d: &dependencies, d: &dependencies,
config: configuration, config: configuration,
migrationsDone: make(chan bool), migrationsDone: make(chan bool),
networkSourcesReady: make(chan bool),
networkSources: make(map[string][]externalNetworkAttributes),
} }
c.initMetrics()
if err := c.registerHTTPHandlers(); err != nil { if err := c.registerHTTPHandlers(); err != nil {
return nil, err return nil, err
} }
c.metrics.migrationsRunning = c.r.Gauge(
reporter.GaugeOpts{
Name: "migrations_running",
Help: "Database migrations in progress.",
},
)
c.metrics.migrationsApplied = c.r.Counter(
reporter.CounterOpts{
Name: "migrations_applied_steps",
Help: "Number of migration steps applied",
},
)
c.metrics.migrationsNotApplied = c.r.Counter(
reporter.CounterOpts{
Name: "migrations_notapplied_steps",
Help: "Number of migration steps not applied",
},
)
c.metrics.migrationsVersion = c.r.Gauge(
reporter.GaugeOpts{
Name: "migrations_version",
Help: "Current version for migrations.",
},
)
// Ensure resolutions are sorted and we have a 0-interval resolution first. // Ensure resolutions are sorted and we have a 0-interval resolution first.
sort.Slice(c.config.Resolutions, func(i, j int) bool { sort.Slice(c.config.Resolutions, func(i, j int) bool {
@@ -90,6 +70,8 @@ func New(r *reporter.Reporter, configuration Configuration, dependencies Depende
// Start the ClickHouse component. // Start the ClickHouse component.
func (c *Component) Start() error { func (c *Component) Start() error {
c.r.Info().Msg("starting ClickHouse component") c.r.Info().Msg("starting ClickHouse component")
// Database migration
c.metrics.migrationsRunning.Set(1) c.metrics.migrationsRunning.Set(1)
c.t.Go(func() error { c.t.Go(func() error {
customBackoff := backoff.NewExponentialBackOff() customBackoff := backoff.NewExponentialBackOff()
@@ -97,9 +79,11 @@ func (c *Component) Start() error {
ticker := backoff.NewTicker(customBackoff) ticker := backoff.NewTicker(customBackoff)
defer ticker.Stop() defer ticker.Stop()
for { for {
c.r.Info().Msg("attempting database migration") if !c.config.SkipMigrations {
if err := c.migrateDatabase(); err == nil { c.r.Info().Msg("attempting database migration")
return nil if err := c.migrateDatabase(); err == nil {
return nil
}
} }
select { select {
case <-c.t.Dying(): case <-c.t.Dying():
@@ -108,6 +92,85 @@ func (c *Component) Start() error {
} }
} }
}) })
// Network sources update
var notReadySources sync.WaitGroup
notReadySources.Add(len(c.config.NetworkSources))
go func() {
notReadySources.Wait()
close(c.networkSourcesReady)
}()
for name, source := range c.config.NetworkSources {
if source.Transform.Query == nil {
source.Transform.Query, _ = gojq.Parse(".")
}
if source.Timeout == 0 {
source.Timeout = time.Minute
}
c.t.Go(func() error {
c.metrics.networkSourceCount.WithLabelValues(name).Set(0)
newRetryTicker := func() *backoff.Ticker {
customBackoff := backoff.NewExponentialBackOff()
customBackoff.MaxElapsedTime = 0
customBackoff.MaxInterval = source.Interval
customBackoff.InitialInterval = source.Interval / 10
if customBackoff.InitialInterval > time.Second {
customBackoff.InitialInterval = time.Second
}
return backoff.NewTicker(customBackoff)
}
newRegularTicker := func() *time.Ticker {
return time.NewTicker(source.Interval)
}
retryTicker := newRetryTicker()
regularTicker := newRegularTicker()
regularTicker.Stop()
success := false
ready := false
defer func() {
if !success {
retryTicker.Stop()
} else {
regularTicker.Stop()
}
if !ready {
notReadySources.Done()
}
}()
for {
ctx, cancel := context.WithTimeout(c.t.Context(nil), source.Timeout)
count, err := c.updateNetworkSource(ctx, name, source)
cancel()
if err == nil {
c.metrics.networkSourceUpdates.WithLabelValues(name).Inc()
c.metrics.networkSourceCount.WithLabelValues(name).Set(float64(count))
} else {
c.metrics.networkSourceErrors.WithLabelValues(name, err.Error()).Inc()
}
if err == nil && !ready {
ready = true
notReadySources.Done()
}
if err == nil && !success {
// On success, change the timer to a regular timer interval
retryTicker.Stop()
regularTicker = newRegularTicker()
success = true
} else if err != nil && success {
// On failure, switch to the retry ticker
regularTicker.Stop()
retryTicker = newRetryTicker()
success = false
}
select {
case <-c.t.Dying():
return nil
case <-retryTicker.C:
case <-regularTicker.C:
}
}
})
}
return nil return nil
} }

View File

@@ -0,0 +1,92 @@
// SPDX-FileCopyrightText: 2022 Free Mobile
// SPDX-License-Identifier: AGPL-3.0-only
package clickhouse
import (
"bufio"
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"net/netip"
"github.com/mitchellh/mapstructure"
)
type externalNetworkAttributes struct {
Prefix netip.Prefix
NetworkAttributes `mapstructure:",squash"`
}
// updateNetworkSource updates a remote network source. It returns the
// number of networks retrieved.
func (c *Component) updateNetworkSource(ctx context.Context, name string, source NetworkSource) (int, error) {
l := c.r.With().Str("name", name).Str("url", source.URL).Logger()
l.Info().Msg("update network source")
client := &http.Client{Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
}}
req, err := http.NewRequestWithContext(ctx, "GET", source.URL, nil)
req.Header.Set("accept", "application/json")
if err != nil {
l.Err(err).Msg("unable to build new request")
return 0, fmt.Errorf("unable to build new request: %w", err)
}
resp, err := client.Do(req)
if err != nil {
l.Err(err).Msg("unable to fetch network source")
return 0, fmt.Errorf("unable to fetch network source: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
err := fmt.Errorf("unexpected status code %d: %s", resp.StatusCode, resp.Status)
l.Error().Msg(err.Error())
return 0, err
}
reader := bufio.NewReader(resp.Body)
decoder := json.NewDecoder(reader)
var got interface{}
if err := decoder.Decode(&got); err != nil {
l.Err(err).Msg("cannot decode JSON output")
return 0, fmt.Errorf("cannot decode JSON output: %w", err)
}
results := []externalNetworkAttributes{}
iter := source.Transform.Query.RunWithContext(ctx, got)
for {
v, ok := iter.Next()
if !ok {
break
}
if err, ok := v.(error); ok {
l.Err(err).Msg("cannot execute jq filter")
return 0, fmt.Errorf("cannot execute jq filter: %w", err)
}
var result externalNetworkAttributes
config := &mapstructure.DecoderConfig{
Metadata: nil,
Result: &result,
DecodeHook: mapstructure.TextUnmarshallerHookFunc(),
}
decoder, err := mapstructure.NewDecoder(config)
if err != nil {
panic(err)
}
if err := decoder.Decode(v); err != nil {
l.Err(err).Msgf("cannot map returned value for %#v", v)
return 0, fmt.Errorf("cannot map returned value: %w", err)
}
results = append(results, result)
}
if len(results) == 0 {
err := errors.New("empty results")
l.Error().Msg(err.Error())
return 0, err
}
c.networkSourcesLock.Lock()
c.networkSources[name] = results
c.networkSourcesLock.Unlock()
return len(results), nil
}

View File

@@ -0,0 +1,17 @@
// SPDX-FileCopyrightText: 2022 Free Mobile
// SPDX-License-Identifier: AGPL-3.0-only
//go:build !release
package clickhouse
import "github.com/itchyny/gojq"
// MustParseTransformQuery parses a transform query or panic.
func MustParseTransformQuery(src string) TransformQuery {
q, err := gojq.Parse(src)
if err != nil {
panic(err)
}
return TransformQuery{q}
}