diff --git a/Makefile b/Makefile index aaf885c0..30d67e59 100644 --- a/Makefile +++ b/Makefile @@ -58,6 +58,18 @@ web/data: mkdocs.yml $(wildcard docs/*.md docs/assets/*) ; $(info $(M) build doc -v $(CURDIR)/web/data:/output:rw \ squidfunk/mkdocs-material:8.2.5 build --strict --site-dir /output +# These files are versioned in Git, but we may want to update them. +clickhouse/data/protocols.csv: + $Q curl -sL http://www.iana.org/assignments/protocol-numbers/protocol-numbers-1.csv \ + | sed -nE -e "1 s/.*/proto,name,description/p" -e "2,$$ s/^([0-9]+,[^ ,]+,[^\",]+),.*/\1/p" \ + > $@ +clickhouse/data/asns.csv: # Need to pipe MaxMind ASN database in CSV format + $Q sed -ne 's/^[^,]*,//p' \ + | LC_ALL=C sort -n \ + | uniq \ + | grep -v '^[0-9,]*$$' \ + | sed -e '1casn,name' > $@ + # Tests TEST_TARGETS := test-bench test-short test-verbose test-race @@ -84,7 +96,7 @@ test-coverage: | $(GOCOV) $(GOCOVXML) $(GOTESTSUM) ; $(info $(M) running coverag $Q $(GO) tool cover -html=test/profile.out -o test/coverage.html $Q $(GOCOV) convert test/profile.out | $(GOCOVXML) > test/coverage.xml @echo -n "Code coverage: "; \ - echo "$$(sed -En 's/^ /var/lib/clickhouse/format_schemas/flow.proto <<'EOF' +`)) + flow.FlowProtoHandler.ServeHTTP(w, r) + w.Write([]byte(`EOF`)) + })) + + entries, err := data.ReadDir("data") + if err != nil { + return fmt.Errorf("unable to read data directory: %w", err) + } + for _, entry := range entries { + if entry.IsDir() { + continue + } + url := fmt.Sprintf("/api/v0/clickhouse/%s", entry.Name()) + path := fmt.Sprintf("data/%s", entry.Name()) + c.addHandlerEmbedded(url, path) + } + return nil +} diff --git a/clickhouse/http_test.go b/clickhouse/http_test.go new file mode 100644 index 00000000..a1859e0e --- /dev/null +++ b/clickhouse/http_test.go @@ -0,0 +1,88 @@ +package clickhouse + +import ( + "bufio" + "fmt" + netHTTP "net/http" + "testing" + + "akvorado/helpers" + "akvorado/http" + "akvorado/reporter" +) + +func TestHTTPEndpoints(t *testing.T) { + r := 
reporter.NewMock(t) + c, err := New(r, DefaultConfiguration, Dependencies{ + HTTP: http.NewMock(t, r), + }) + if err != nil { + t.Fatalf("New() error:\n%+v", err) + } + + cases := []struct { + URL string + ContentType string + FirstLines []string + }{ + { + URL: "/api/v0/clickhouse/protocols.csv", + ContentType: "text/csv; charset=utf-8", + FirstLines: []string{ + `proto,name,description`, + `0,HOPOPT,IPv6 Hop-by-Hop Option`, + `1,ICMP,Internet Control Message`, + }, + }, { + URL: "/api/v0/clickhouse/asns.csv", + ContentType: "text/csv; charset=utf-8", + FirstLines: []string{ + "asn,name", + "1,LVLT-1", + }, + }, { + URL: "/api/v0/clickhouse/flow.proto", + ContentType: "text/plain", + FirstLines: []string{ + `syntax = "proto3";`, + `package flow;`, + }, + }, { + URL: "/api/v0/clickhouse/init.sh", + ContentType: "text/x-shellscript", + FirstLines: []string{ + `#!/bin/sh`, + `cat > /var/lib/clickhouse/format_schemas/flow.proto <<'EOF'`, + `syntax = "proto3";`, + `package flow;`, + }, + }, + } + + for _, tc := range cases { + t.Run(tc.URL, func(t *testing.T) { + resp, err := netHTTP.Get(fmt.Sprintf("http://%s%s", c.d.HTTP.Address, tc.URL)) + if err != nil { + t.Fatalf("GET %s:\n%+v", tc.URL, err) + } + defer resp.Body.Close() + if resp.StatusCode != 200 { + t.Fatalf("GET %s: got status code %d, not 200", tc.URL, resp.StatusCode) + } + gotContentType := resp.Header.Get("Content-Type") + if gotContentType != tc.ContentType { + t.Errorf("GET %s Content-Type (-got, +want):\n-%s\n+%s", + tc.URL, gotContentType, tc.ContentType) + } + reader := bufio.NewScanner(resp.Body) + got := []string{} + for reader.Scan() && len(got) < len(tc.FirstLines) { + got = append(got, reader.Text()) + } + if diff := helpers.Diff(got, tc.FirstLines); diff != "" { + t.Errorf("GET %s (-got, +want):\n%s", tc.URL, diff) + } + }) + } + +} diff --git a/clickhouse/root.go b/clickhouse/root.go new file mode 100644 index 00000000..789abd10 --- /dev/null +++ b/clickhouse/root.go @@ -0,0 +1,32 @@ +// 
Package clickhouse handles housekeeping for the Clickhouse database. +package clickhouse + +import ( + "akvorado/http" + "akvorado/reporter" +) + +// Component represents the Clickhouse component. +type Component struct { + r *reporter.Reporter + d *Dependencies + config Configuration +} + +// Dependencies define the dependencies of the Clickhouse component. +type Dependencies struct { + HTTP *http.Component +} + +// New creates a new Clickhouse component. +func New(reporter *reporter.Reporter, configuration Configuration, dependencies Dependencies) (*Component, error) { + c := Component{ + r: reporter, + d: &dependencies, + config: configuration, + } + if err := c.registerHTTPHandlers(); err != nil { + return nil, err + } + return &c, nil +} diff --git a/cmd/serve.go b/cmd/serve.go index 639b153c..2f28ea3b 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -12,6 +12,7 @@ import ( "github.com/spf13/viper" "gopkg.in/yaml.v2" + "akvorado/clickhouse" "akvorado/core" "akvorado/daemon" "akvorado/flow" @@ -25,26 +26,28 @@ import ( // ServeConfiguration represents the configuration file for the serve command. type ServeConfiguration struct { - Reporting reporter.Configuration - HTTP http.Configuration - Flow flow.Configuration - SNMP snmp.Configuration - GeoIP geoip.Configuration - Kafka kafka.Configuration - Core core.Configuration - Web web.Configuration + Reporting reporter.Configuration + HTTP http.Configuration + Flow flow.Configuration + SNMP snmp.Configuration + GeoIP geoip.Configuration + Kafka kafka.Configuration + Core core.Configuration + Web web.Configuration + Clickhouse clickhouse.Configuration } // DefaultServeConfiguration is the default configuration for the serve command. 
var DefaultServeConfiguration = ServeConfiguration{ - Reporting: reporter.DefaultConfiguration, - HTTP: http.DefaultConfiguration, - Flow: flow.DefaultConfiguration, - SNMP: snmp.DefaultConfiguration, - GeoIP: geoip.DefaultConfiguration, - Kafka: kafka.DefaultConfiguration, - Core: core.DefaultConfiguration, - Web: web.DefaultConfiguration, + Reporting: reporter.DefaultConfiguration, + HTTP: http.DefaultConfiguration, + Flow: flow.DefaultConfiguration, + SNMP: snmp.DefaultConfiguration, + GeoIP: geoip.DefaultConfiguration, + Kafka: kafka.DefaultConfiguration, + Core: core.DefaultConfiguration, + Web: web.DefaultConfiguration, + Clickhouse: clickhouse.DefaultConfiguration, } type serveOptions struct { @@ -153,6 +156,12 @@ func daemonStart(r *reporter.Reporter, config ServeConfiguration, checkOnly bool if err != nil { return fmt.Errorf("unable to initialize Kafka component: %w", err) } + clickhouseComponent, err := clickhouse.New(r, config.Clickhouse, clickhouse.Dependencies{ + HTTP: httpComponent, + }) + if err != nil { + return fmt.Errorf("unable to initialize Clickhouse component: %w", err) + } coreComponent, err := core.New(r, config.Core, core.Dependencies{ Daemon: daemonComponent, Flow: flowComponent, @@ -206,6 +215,7 @@ func daemonStart(r *reporter.Reporter, config ServeConfiguration, checkOnly bool snmpComponent, geoipComponent, kafkaComponent, + clickhouseComponent, coreComponent, webComponent, } diff --git a/docker-compose-dev.yml b/docker-compose-dev.yml index a6b02cec..725c3c38 100644 --- a/docker-compose-dev.yml +++ b/docker-compose-dev.yml @@ -28,3 +28,6 @@ services: - kafka ports: - 8123:8123/tcp + - 9000:9000/tcp + volumes: + - ./flow/flow.proto:/var/lib/clickhouse/format_schemas/flow.proto:ro diff --git a/docs/configuration.md b/docs/configuration.md index 86c0962c..9dc6e580 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -10,6 +10,7 @@ configured through a different section: - `snmp`: [SNMP poller](#snmp) - `geoip`: [GeoIP 
database](#geoip) - `kafka`: [Kafka broker](#kafka) +- `clickhouse`: [Clickhouse helper](#clickhouse) - `core`: [Core](#core) You can get the default configuration with `./akvorado --dump --check`. @@ -154,6 +155,11 @@ kafka: cleanup.policy: delete ``` +## Clickhouse + +The Clickhouse component exposes some useful HTTP endpoints to +configure a Clickhouse database. It takes no configuration. + ## Core The core orchestrates the remaining components. It receives the flows diff --git a/docs/integration.md b/docs/integration.md new file mode 100644 index 00000000..28823251 --- /dev/null +++ b/docs/integration.md @@ -0,0 +1,33 @@ +# Integrations + +*Akvorado* needs some integration with external components to be +useful. The most important one is Kafka, but it can also integrate +with Clickhouse and Grafana. + +## Kafka + +The Kafka component sends flows to Kafka. Its +[configuration](configuration.md#kafka) mostly needs a topic name and a list +of brokers. It is possible to let *Akvorado* manage the topic with the +appropriate settings (number of partitions, replication factor and +additional configuration entries). If the topic exists, *Akvorado* +won't update the number of partitions and the replication factor but +other settings will be updated. + +## Clickhouse + +Clickhouse can collect the data from Kafka. To help its configuration, +*Akvorado* exposes a few HTTP endpoints: + +- `/api/v0/clickhouse/flow.proto` contains the schema +- `/api/v0/clickhouse/init.sh` contains the schema in the form of a + script to execute during initialization +- `/api/v0/clickhouse/protocols.csv` contains a CSV with the mapping + between protocol numbers and names +- `/api/v0/clickhouse/asns.csv` contains a CSV with the mapping + between AS numbers and organization names + +## Grafana + +No integration is currently done for Grafana, except a reverse proxy +configured in the [web section](configuration.md#web). 
diff --git a/docs/usage.md b/docs/usage.md index 7a47597a..ae9f4d61 100644 --- a/docs/usage.md +++ b/docs/usage.md @@ -27,12 +27,11 @@ with `--check` if you don't want *Akvorado* to start. ### Exposed HTTP endpoints The embedded HTTP server contains the endpoints listed on the [home -page](/). The [`/api/v0/flows`](/api/v0/flows?limit=1) continously -printed flows sent to Kafka (using [ndjson]()). It also accepts a -`limit` argument to stops after emitting the specified number of -flows. This endpoint should not be used for anything else other than -debug: it can skips some flows and if there are several users, flows -will be dispatched between them. +page](index.md). The [`/api/v0/flows`](/api/v0/flows?limit=1) +continuously prints flows sent to Kafka (using [ndjson]()). It also +accepts a `limit` argument to stop after emitting the specified +number of flows. This endpoint should not be used for anything else +other than debug: it can skip some flows and if there are several +users, flows will be dispatched between them. [ndjson]: http://ndjson.org/ - diff --git a/flow/root.go b/flow/root.go index 10d5df1f..d4adc8ae 100644 --- a/flow/root.go +++ b/flow/root.go @@ -50,8 +50,15 @@ type Dependencies struct { HTTP *http.Component } -//go:embed flow.proto -var flowProtoContent []byte +var ( + //go:embed flow.proto + flowProto []byte + // FlowProtoHandler is an HTTP handler serving flow.proto + FlowProtoHandler = netHTTP.HandlerFunc(func(w netHTTP.ResponseWriter, r *netHTTP.Request) { + w.Header().Set("Content-Type", "text/plain") + w.Write(flowProto) + }) +) // New creates a new flow component. 
func New(r *reporter.Reporter, configuration Configuration, dependencies Dependencies) (*Component, error) { @@ -62,14 +69,8 @@ func New(r *reporter.Reporter, configuration Configuration, dependencies Depende incomingFlows: make(chan *FlowMessage, configuration.BufferLength), } c.d.Daemon.Track(&c.t, "flow") + c.d.HTTP.AddHandler("/api/v0/flow.proto", FlowProtoHandler) c.initMetrics() - if c.d.HTTP != nil { - c.d.HTTP.AddHandler("/api/v0/flow.proto", - netHTTP.HandlerFunc(func(w netHTTP.ResponseWriter, r *netHTTP.Request) { - w.Header().Set("Content-Type", "text/plain") - w.Write(flowProtoContent) - })) - } return &c, nil } diff --git a/mkdocs.yml b/mkdocs.yml index 0e4acfeb..367e4794 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -60,3 +60,5 @@ nav: Installation: install.md Configuration: configuration.md Usage: usage.md + Integration: integration.md +