mirror of
https://github.com/akvorado/akvorado.git
synced 2025-12-12 06:24:10 +01:00
clickhouse: add a new Clickhouse component to help setting Clickhouse
This commit is contained in:
14
Makefile
14
Makefile
@@ -58,6 +58,18 @@ web/data: mkdocs.yml $(wildcard docs/*.md docs/assets/*) ; $(info $(M) build doc
|
|||||||
-v $(CURDIR)/web/data:/output:rw \
|
-v $(CURDIR)/web/data:/output:rw \
|
||||||
squidfunk/mkdocs-material:8.2.5 build --strict --site-dir /output
|
squidfunk/mkdocs-material:8.2.5 build --strict --site-dir /output
|
||||||
|
|
||||||
|
# These files are versioned in Git, but we may want to update them.
|
||||||
|
clickhouse/data/protocols.csv:
|
||||||
|
$ curl -sL http://www.iana.org/assignments/protocol-numbers/protocol-numbers-1.csv \
|
||||||
|
| sed -nE -e "1 s/.*/proto,name,description/p" -e "2,$ s/^([0-9]+,[^ ,]+,[^\",]+),.*/\1/p" \
|
||||||
|
> $@
|
||||||
|
clickhouse/data/asns.csv: # Need to pipe MaxMind ASN database in CSV format
|
||||||
|
$Q sed -ne 's/^[^,]*,//p' \
|
||||||
|
| LC_ALL=C sort -n \
|
||||||
|
| uniq \
|
||||||
|
| grep -v '^[0-9,]*$' \
|
||||||
|
| sed -e '1casn,name' > $@
|
||||||
|
|
||||||
# Tests
|
# Tests
|
||||||
|
|
||||||
TEST_TARGETS := test-bench test-short test-verbose test-race
|
TEST_TARGETS := test-bench test-short test-verbose test-race
|
||||||
@@ -84,7 +96,7 @@ test-coverage: | $(GOCOV) $(GOCOVXML) $(GOTESTSUM) ; $(info $(M) running coverag
|
|||||||
$Q $(GO) tool cover -html=test/profile.out -o test/coverage.html
|
$Q $(GO) tool cover -html=test/profile.out -o test/coverage.html
|
||||||
$Q $(GOCOV) convert test/profile.out | $(GOCOVXML) > test/coverage.xml
|
$Q $(GOCOV) convert test/profile.out | $(GOCOVXML) > test/coverage.xml
|
||||||
@echo -n "Code coverage: "; \
|
@echo -n "Code coverage: "; \
|
||||||
echo "$$(sed -En 's/^<coverage line-rate="([0-9.]+)".*/\1/p' test/coverage.xml) * 100 / 1" | bc -q
|
echo "scale=1;$$(sed -En 's/^<coverage line-rate="([0-9.]+)".*/\1/p' test/coverage.xml) * 100 / 1" | bc -q
|
||||||
|
|
||||||
.PHONY: lint
|
.PHONY: lint
|
||||||
lint: | $(REVIVE) ; $(info $(M) running golint…) @ ## Run golint
|
lint: | $(REVIVE) ; $(info $(M) running golint…) @ ## Run golint
|
||||||
|
|||||||
7
clickhouse/config.go
Normal file
7
clickhouse/config.go
Normal file
@@ -0,0 +1,7 @@
|
|||||||
|
package clickhouse
|
||||||
|
|
||||||
|
// Configuration describes the configuration for the Clickhouse component.
|
||||||
|
type Configuration struct{}
|
||||||
|
|
||||||
|
// DefaultConfiguration represents the default configuration for the Clickhouse component.
|
||||||
|
var DefaultConfiguration = Configuration{}
|
||||||
78022
clickhouse/data/asns.csv
Normal file
78022
clickhouse/data/asns.csv
Normal file
File diff suppressed because it is too large
Load Diff
127
clickhouse/data/protocols.csv
Normal file
127
clickhouse/data/protocols.csv
Normal file
@@ -0,0 +1,127 @@
|
|||||||
|
proto,name,description
|
||||||
|
0,HOPOPT,IPv6 Hop-by-Hop Option
|
||||||
|
1,ICMP,Internet Control Message
|
||||||
|
2,IGMP,Internet Group Management
|
||||||
|
3,GGP,Gateway-to-Gateway
|
||||||
|
4,IPv4,IPv4 encapsulation
|
||||||
|
5,ST,Stream
|
||||||
|
6,TCP,Transmission Control
|
||||||
|
7,CBT,CBT
|
||||||
|
8,EGP,Exterior Gateway Protocol
|
||||||
|
10,BBN-RCC-MON,BBN RCC Monitoring
|
||||||
|
11,NVP-II,Network Voice Protocol
|
||||||
|
12,PUP,PUP
|
||||||
|
14,EMCON,EMCON
|
||||||
|
15,XNET,Cross Net Debugger
|
||||||
|
16,CHAOS,Chaos
|
||||||
|
17,UDP,User Datagram
|
||||||
|
18,MUX,Multiplexing
|
||||||
|
19,DCN-MEAS,DCN Measurement Subsystems
|
||||||
|
20,HMP,Host Monitoring
|
||||||
|
21,PRM,Packet Radio Measurement
|
||||||
|
22,XNS-IDP,XEROX NS IDP
|
||||||
|
23,TRUNK-1,Trunk-1
|
||||||
|
24,TRUNK-2,Trunk-2
|
||||||
|
25,LEAF-1,Leaf-1
|
||||||
|
26,LEAF-2,Leaf-2
|
||||||
|
27,RDP,Reliable Data Protocol
|
||||||
|
28,IRTP,Internet Reliable Transaction
|
||||||
|
29,ISO-TP4,ISO Transport Protocol Class 4
|
||||||
|
30,NETBLT,Bulk Data Transfer Protocol
|
||||||
|
31,MFE-NSP,MFE Network Services Protocol
|
||||||
|
32,MERIT-INP,MERIT Internodal Protocol
|
||||||
|
33,DCCP,Datagram Congestion Control Protocol
|
||||||
|
34,3PC,Third Party Connect Protocol
|
||||||
|
35,IDPR,Inter-Domain Policy Routing Protocol
|
||||||
|
36,XTP,XTP
|
||||||
|
37,DDP,Datagram Delivery Protocol
|
||||||
|
38,IDPR-CMTP,IDPR Control Message Transport Proto
|
||||||
|
39,TP++,TP++ Transport Protocol
|
||||||
|
40,IL,IL Transport Protocol
|
||||||
|
41,IPv6,IPv6 encapsulation
|
||||||
|
42,SDRP,Source Demand Routing Protocol
|
||||||
|
43,IPv6-Route,Routing Header for IPv6
|
||||||
|
44,IPv6-Frag,Fragment Header for IPv6
|
||||||
|
45,IDRP,Inter-Domain Routing Protocol
|
||||||
|
46,RSVP,Reservation Protocol
|
||||||
|
47,GRE,Generic Routing Encapsulation
|
||||||
|
48,DSR,Dynamic Source Routing Protocol
|
||||||
|
49,BNA,BNA
|
||||||
|
50,ESP,Encap Security Payload
|
||||||
|
51,AH,Authentication Header
|
||||||
|
52,I-NLSP,Integrated Net Layer Security TUBA
|
||||||
|
54,NARP,NBMA Address Resolution Protocol
|
||||||
|
55,MOBILE,IP Mobility
|
||||||
|
57,SKIP,SKIP
|
||||||
|
58,IPv6-ICMP,ICMP for IPv6
|
||||||
|
59,IPv6-NoNxt,No Next Header for IPv6
|
||||||
|
60,IPv6-Opts,Destination Options for IPv6
|
||||||
|
62,CFTP,CFTP
|
||||||
|
64,SAT-EXPAK,SATNET and Backroom EXPAK
|
||||||
|
65,KRYPTOLAN,Kryptolan
|
||||||
|
66,RVD,MIT Remote Virtual Disk Protocol
|
||||||
|
67,IPPC,Internet Pluribus Packet Core
|
||||||
|
69,SAT-MON,SATNET Monitoring
|
||||||
|
70,VISA,VISA Protocol
|
||||||
|
71,IPCV,Internet Packet Core Utility
|
||||||
|
72,CPNX,Computer Protocol Network Executive
|
||||||
|
73,CPHB,Computer Protocol Heart Beat
|
||||||
|
74,WSN,Wang Span Network
|
||||||
|
75,PVP,Packet Video Protocol
|
||||||
|
76,BR-SAT-MON,Backroom SATNET Monitoring
|
||||||
|
77,SUN-ND,SUN ND PROTOCOL-Temporary
|
||||||
|
78,WB-MON,WIDEBAND Monitoring
|
||||||
|
79,WB-EXPAK,WIDEBAND EXPAK
|
||||||
|
80,ISO-IP,ISO Internet Protocol
|
||||||
|
81,VMTP,VMTP
|
||||||
|
82,SECURE-VMTP,SECURE-VMTP
|
||||||
|
83,VINES,VINES
|
||||||
|
84,TTP,Transaction Transport Protocol
|
||||||
|
84,IPTM,Internet Protocol Traffic Manager
|
||||||
|
85,NSFNET-IGP,NSFNET-IGP
|
||||||
|
86,DGP,Dissimilar Gateway Protocol
|
||||||
|
87,TCF,TCF
|
||||||
|
88,EIGRP,EIGRP
|
||||||
|
89,OSPFIGP,OSPFIGP
|
||||||
|
90,Sprite-RPC,Sprite RPC Protocol
|
||||||
|
91,LARP,Locus Address Resolution Protocol
|
||||||
|
92,MTP,Multicast Transport Protocol
|
||||||
|
93,AX.25,AX.25 Frames
|
||||||
|
94,IPIP,IP-within-IP Encapsulation Protocol
|
||||||
|
96,SCC-SP,Semaphore Communications Sec. Pro.
|
||||||
|
97,ETHERIP,Ethernet-within-IP Encapsulation
|
||||||
|
98,ENCAP,Encapsulation Header
|
||||||
|
100,GMTP,GMTP
|
||||||
|
101,IFMP,Ipsilon Flow Management Protocol
|
||||||
|
102,PNNI,PNNI over IP
|
||||||
|
103,PIM,Protocol Independent Multicast
|
||||||
|
104,ARIS,ARIS
|
||||||
|
105,SCPS,SCPS
|
||||||
|
106,QNX,QNX
|
||||||
|
107,A/N,Active Networks
|
||||||
|
108,IPComp,IP Payload Compression Protocol
|
||||||
|
109,SNP,Sitara Networks Protocol
|
||||||
|
110,Compaq-Peer,Compaq Peer Protocol
|
||||||
|
111,IPX-in-IP,IPX in IP
|
||||||
|
112,VRRP,Virtual Router Redundancy Protocol
|
||||||
|
113,PGM,PGM Reliable Transport Protocol
|
||||||
|
115,L2TP,Layer Two Tunneling Protocol
|
||||||
|
116,DDX,D-II Data Exchange (DDX)
|
||||||
|
117,IATP,Interactive Agent Transfer Protocol
|
||||||
|
118,STP,Schedule Transfer Protocol
|
||||||
|
119,SRP,SpectraLink Radio Protocol
|
||||||
|
120,UTI,UTI
|
||||||
|
121,SMP,Simple Message Protocol
|
||||||
|
123,PTP,Performance Transparency Protocol
|
||||||
|
126,CRTP,Combat Radio Transport Protocol
|
||||||
|
127,CRUDP,Combat Radio User Datagram
|
||||||
|
130,SPS,Secure Packet Shield
|
||||||
|
131,PIPE,Private IP Encapsulation within IP
|
||||||
|
132,SCTP,Stream Control Transmission Protocol
|
||||||
|
133,FC,Fibre Channel
|
||||||
|
138,manet,MANET Protocols
|
||||||
|
139,HIP,Host Identity Protocol
|
||||||
|
140,Shim6,Shim6 Protocol
|
||||||
|
141,WESP,Wrapped Encapsulating Security Payload
|
||||||
|
142,ROHC,Robust Header Compression
|
||||||
|
143,Ethernet,Ethernet
|
||||||
|
56
clickhouse/http.go
Normal file
56
clickhouse/http.go
Normal file
@@ -0,0 +1,56 @@
|
|||||||
|
package clickhouse
|
||||||
|
|
||||||
|
import (
|
||||||
|
"embed"
|
||||||
|
"fmt"
|
||||||
|
"net/http"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"akvorado/flow"
|
||||||
|
)
|
||||||
|
|
||||||
|
//go:embed data/protocols.csv
|
||||||
|
//go:embed data/asns.csv
|
||||||
|
var data embed.FS
|
||||||
|
|
||||||
|
func (c *Component) addHandlerEmbedded(url string, path string) {
|
||||||
|
c.d.HTTP.AddHandler(url,
|
||||||
|
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
f, err := http.FS(data).Open(path)
|
||||||
|
if err != nil {
|
||||||
|
c.r.Err(err).Msgf("unable to open %s", path)
|
||||||
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
http.ServeContent(w, r, path, time.Time{}, f)
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
// registerHTTPHandler register some handlers that will be useful for
|
||||||
|
// Clickhouse
|
||||||
|
func (c *Component) registerHTTPHandlers() error {
|
||||||
|
c.d.HTTP.AddHandler("/api/v0/clickhouse/flow.proto", flow.FlowProtoHandler)
|
||||||
|
c.d.HTTP.AddHandler("/api/v0/clickhouse/init.sh",
|
||||||
|
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.Header().Set("Content-Type", "text/x-shellscript")
|
||||||
|
w.Write([]byte(`#!/bin/sh
|
||||||
|
cat > /var/lib/clickhouse/format_schemas/flow.proto <<'EOF'
|
||||||
|
`))
|
||||||
|
flow.FlowProtoHandler.ServeHTTP(w, r)
|
||||||
|
w.Write([]byte(`EOF`))
|
||||||
|
}))
|
||||||
|
|
||||||
|
entries, err := data.ReadDir("data")
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("unable to read data directory: %w", err)
|
||||||
|
}
|
||||||
|
for _, entry := range entries {
|
||||||
|
if entry.IsDir() {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
url := fmt.Sprintf("/api/v0/clickhouse/%s", entry.Name())
|
||||||
|
path := fmt.Sprintf("data/%s", entry.Name())
|
||||||
|
c.addHandlerEmbedded(url, path)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
88
clickhouse/http_test.go
Normal file
88
clickhouse/http_test.go
Normal file
@@ -0,0 +1,88 @@
|
|||||||
|
package clickhouse
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bufio"
|
||||||
|
"fmt"
|
||||||
|
netHTTP "net/http"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"akvorado/helpers"
|
||||||
|
"akvorado/http"
|
||||||
|
"akvorado/reporter"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestHTTPEndpoints(t *testing.T) {
|
||||||
|
r := reporter.NewMock(t)
|
||||||
|
c, err := New(r, DefaultConfiguration, Dependencies{
|
||||||
|
HTTP: http.NewMock(t, r),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("New() error:\n%+v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
cases := []struct {
|
||||||
|
URL string
|
||||||
|
ContentType string
|
||||||
|
FirstLines []string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
URL: "/api/v0/clickhouse/protocols.csv",
|
||||||
|
ContentType: "text/csv; charset=utf-8",
|
||||||
|
FirstLines: []string{
|
||||||
|
`proto,name,description`,
|
||||||
|
`0,HOPOPT,IPv6 Hop-by-Hop Option`,
|
||||||
|
`1,ICMP,Internet Control Message`,
|
||||||
|
},
|
||||||
|
}, {
|
||||||
|
URL: "/api/v0/clickhouse/asns.csv",
|
||||||
|
ContentType: "text/csv; charset=utf-8",
|
||||||
|
FirstLines: []string{
|
||||||
|
"asn,name",
|
||||||
|
"1,LVLT-1",
|
||||||
|
},
|
||||||
|
}, {
|
||||||
|
URL: "/api/v0/clickhouse/flow.proto",
|
||||||
|
ContentType: "text/plain",
|
||||||
|
FirstLines: []string{
|
||||||
|
`syntax = "proto3";`,
|
||||||
|
`package flow;`,
|
||||||
|
},
|
||||||
|
}, {
|
||||||
|
URL: "/api/v0/clickhouse/init.sh",
|
||||||
|
ContentType: "text/x-shellscript",
|
||||||
|
FirstLines: []string{
|
||||||
|
`#!/bin/sh`,
|
||||||
|
`cat > /var/lib/clickhouse/format_schemas/flow.proto <<'EOF'`,
|
||||||
|
`syntax = "proto3";`,
|
||||||
|
`package flow;`,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range cases {
|
||||||
|
t.Run(tc.URL, func(t *testing.T) {
|
||||||
|
resp, err := netHTTP.Get(fmt.Sprintf("http://%s%s", c.d.HTTP.Address, tc.URL))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("GET %s:\n%+v", tc.URL, err)
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
if resp.StatusCode != 200 {
|
||||||
|
t.Fatalf("GET %s: got status code %d, not 200", tc.URL, resp.StatusCode)
|
||||||
|
}
|
||||||
|
gotContentType := resp.Header.Get("Content-Type")
|
||||||
|
if gotContentType != tc.ContentType {
|
||||||
|
t.Errorf("GET %s Content-Type (-got, +want):\n-%s\n+%s",
|
||||||
|
tc.URL, gotContentType, tc.ContentType)
|
||||||
|
}
|
||||||
|
reader := bufio.NewScanner(resp.Body)
|
||||||
|
got := []string{}
|
||||||
|
for reader.Scan() && len(got) < len(tc.FirstLines) {
|
||||||
|
got = append(got, reader.Text())
|
||||||
|
}
|
||||||
|
if diff := helpers.Diff(got, tc.FirstLines); diff != "" {
|
||||||
|
t.Errorf("GET %s (-got, +want):\n%s", tc.URL, diff)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
32
clickhouse/root.go
Normal file
32
clickhouse/root.go
Normal file
@@ -0,0 +1,32 @@
|
|||||||
|
// Package clickhouse handles housekeeping for the Clickhouse database.
|
||||||
|
package clickhouse
|
||||||
|
|
||||||
|
import (
|
||||||
|
"akvorado/http"
|
||||||
|
"akvorado/reporter"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Component represents the Kafka exporter.
|
||||||
|
type Component struct {
|
||||||
|
r *reporter.Reporter
|
||||||
|
d *Dependencies
|
||||||
|
config Configuration
|
||||||
|
}
|
||||||
|
|
||||||
|
// Dependencies define the dependencies of the Kafka exporter.
|
||||||
|
type Dependencies struct {
|
||||||
|
HTTP *http.Component
|
||||||
|
}
|
||||||
|
|
||||||
|
// New creates a new Clickhouse component.
|
||||||
|
func New(reporter *reporter.Reporter, configuration Configuration, dependencies Dependencies) (*Component, error) {
|
||||||
|
c := Component{
|
||||||
|
r: reporter,
|
||||||
|
d: &dependencies,
|
||||||
|
config: configuration,
|
||||||
|
}
|
||||||
|
if err := c.registerHTTPHandlers(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &c, nil
|
||||||
|
}
|
||||||
42
cmd/serve.go
42
cmd/serve.go
@@ -12,6 +12,7 @@ import (
|
|||||||
"github.com/spf13/viper"
|
"github.com/spf13/viper"
|
||||||
"gopkg.in/yaml.v2"
|
"gopkg.in/yaml.v2"
|
||||||
|
|
||||||
|
"akvorado/clickhouse"
|
||||||
"akvorado/core"
|
"akvorado/core"
|
||||||
"akvorado/daemon"
|
"akvorado/daemon"
|
||||||
"akvorado/flow"
|
"akvorado/flow"
|
||||||
@@ -25,26 +26,28 @@ import (
|
|||||||
|
|
||||||
// ServeConfiguration represents the configuration file for the serve command.
|
// ServeConfiguration represents the configuration file for the serve command.
|
||||||
type ServeConfiguration struct {
|
type ServeConfiguration struct {
|
||||||
Reporting reporter.Configuration
|
Reporting reporter.Configuration
|
||||||
HTTP http.Configuration
|
HTTP http.Configuration
|
||||||
Flow flow.Configuration
|
Flow flow.Configuration
|
||||||
SNMP snmp.Configuration
|
SNMP snmp.Configuration
|
||||||
GeoIP geoip.Configuration
|
GeoIP geoip.Configuration
|
||||||
Kafka kafka.Configuration
|
Kafka kafka.Configuration
|
||||||
Core core.Configuration
|
Core core.Configuration
|
||||||
Web web.Configuration
|
Web web.Configuration
|
||||||
|
Clickhouse clickhouse.Configuration
|
||||||
}
|
}
|
||||||
|
|
||||||
// DefaultServeConfiguration is the default configuration for the serve command.
|
// DefaultServeConfiguration is the default configuration for the serve command.
|
||||||
var DefaultServeConfiguration = ServeConfiguration{
|
var DefaultServeConfiguration = ServeConfiguration{
|
||||||
Reporting: reporter.DefaultConfiguration,
|
Reporting: reporter.DefaultConfiguration,
|
||||||
HTTP: http.DefaultConfiguration,
|
HTTP: http.DefaultConfiguration,
|
||||||
Flow: flow.DefaultConfiguration,
|
Flow: flow.DefaultConfiguration,
|
||||||
SNMP: snmp.DefaultConfiguration,
|
SNMP: snmp.DefaultConfiguration,
|
||||||
GeoIP: geoip.DefaultConfiguration,
|
GeoIP: geoip.DefaultConfiguration,
|
||||||
Kafka: kafka.DefaultConfiguration,
|
Kafka: kafka.DefaultConfiguration,
|
||||||
Core: core.DefaultConfiguration,
|
Core: core.DefaultConfiguration,
|
||||||
Web: web.DefaultConfiguration,
|
Web: web.DefaultConfiguration,
|
||||||
|
Clickhouse: clickhouse.DefaultConfiguration,
|
||||||
}
|
}
|
||||||
|
|
||||||
type serveOptions struct {
|
type serveOptions struct {
|
||||||
@@ -153,6 +156,12 @@ func daemonStart(r *reporter.Reporter, config ServeConfiguration, checkOnly bool
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("unable to initialize Kafka component: %w", err)
|
return fmt.Errorf("unable to initialize Kafka component: %w", err)
|
||||||
}
|
}
|
||||||
|
clickhouseComponent, err := clickhouse.New(r, config.Clickhouse, clickhouse.Dependencies{
|
||||||
|
HTTP: httpComponent,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("unable to initialize Clickhouse component: %w", err)
|
||||||
|
}
|
||||||
coreComponent, err := core.New(r, config.Core, core.Dependencies{
|
coreComponent, err := core.New(r, config.Core, core.Dependencies{
|
||||||
Daemon: daemonComponent,
|
Daemon: daemonComponent,
|
||||||
Flow: flowComponent,
|
Flow: flowComponent,
|
||||||
@@ -206,6 +215,7 @@ func daemonStart(r *reporter.Reporter, config ServeConfiguration, checkOnly bool
|
|||||||
snmpComponent,
|
snmpComponent,
|
||||||
geoipComponent,
|
geoipComponent,
|
||||||
kafkaComponent,
|
kafkaComponent,
|
||||||
|
clickhouseComponent,
|
||||||
coreComponent,
|
coreComponent,
|
||||||
webComponent,
|
webComponent,
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -28,3 +28,6 @@ services:
|
|||||||
- kafka
|
- kafka
|
||||||
ports:
|
ports:
|
||||||
- 8123:8123/tcp
|
- 8123:8123/tcp
|
||||||
|
- 9000:9000/tcp
|
||||||
|
volumes:
|
||||||
|
- ./flow/flow.proto:/var/lib/clickhouse/format_schemas/flow.proto:ro
|
||||||
|
|||||||
@@ -10,6 +10,7 @@ configured through a different section:
|
|||||||
- `snmp`: [SNMP poller](#snmp)
|
- `snmp`: [SNMP poller](#snmp)
|
||||||
- `geoip`: [GeoIP database](#geoip)
|
- `geoip`: [GeoIP database](#geoip)
|
||||||
- `kafka`: [Kafka broker](#kafka)
|
- `kafka`: [Kafka broker](#kafka)
|
||||||
|
- `clickhouse`: [Clickhouse helper](#clickhouse)
|
||||||
- `core`: [Core](#core)
|
- `core`: [Core](#core)
|
||||||
|
|
||||||
You can get the default configuration with `./akvorado --dump --check`.
|
You can get the default configuration with `./akvorado --dump --check`.
|
||||||
@@ -154,6 +155,11 @@ kafka:
|
|||||||
cleanup.policy: delete
|
cleanup.policy: delete
|
||||||
```
|
```
|
||||||
|
|
||||||
|
## Clickhouse
|
||||||
|
|
||||||
|
The Clickhouse component exposes some useful HTTP endpoints to
|
||||||
|
configure a Clickhouse database. It takes no configuration.
|
||||||
|
|
||||||
## Core
|
## Core
|
||||||
|
|
||||||
The core orchestrates the remaining components. It receives the flows
|
The core orchestrates the remaining components. It receives the flows
|
||||||
|
|||||||
33
docs/integration.md
Normal file
33
docs/integration.md
Normal file
@@ -0,0 +1,33 @@
|
|||||||
|
# Integrations
|
||||||
|
|
||||||
|
*Akvorado* needs some integration with external components to be
|
||||||
|
useful. The most important one is Kafka, but it can also integrate
|
||||||
|
with Clickhouse and Grafana.
|
||||||
|
|
||||||
|
## Kafka
|
||||||
|
|
||||||
|
The Kafka component sends flows to Kafka. Its
|
||||||
|
[configuration](configuration.md#kafka) mostly needs a topic name and a list
|
||||||
|
of brokers. It is possible to let *Akvorado* manage the topic with the
|
||||||
|
appropriate settings (number of partitions, replication factor and
|
||||||
|
additional configuration entries). If the topic exists, *Akvorado*
|
||||||
|
won't update the number of partitions and the replication factor but
|
||||||
|
other settings will be updated.
|
||||||
|
|
||||||
|
## Clickhouse
|
||||||
|
|
||||||
|
Clickhouse can collect the data from Kafka. To help its configuration,
|
||||||
|
*Akvorado* exposes a few HTTP endpoint:
|
||||||
|
|
||||||
|
- `/api/v0/clickhouse/flow.proto` contains the schema
|
||||||
|
- `/api/v0/clickhouse/init.sh` contains the schema in the form of a
|
||||||
|
script to execute during initialization
|
||||||
|
- `/api/v0/clickhouse/protocols.csv` contains a CSV with the mapping
|
||||||
|
between protocol numbers and names
|
||||||
|
- `/api/v0/clickhouse/asns.csv` contains a CSV with the mapping
|
||||||
|
between AS numbers and organization names
|
||||||
|
|
||||||
|
## Grafana
|
||||||
|
|
||||||
|
No integration is currently done for Grafana, except a reverse proxy
|
||||||
|
configured in the [web section](configuration.md#web).
|
||||||
@@ -27,12 +27,11 @@ with `--check` if you don't want *Akvorado* to start.
|
|||||||
### Exposed HTTP endpoints
|
### Exposed HTTP endpoints
|
||||||
|
|
||||||
The embedded HTTP server contains the endpoints listed on the [home
|
The embedded HTTP server contains the endpoints listed on the [home
|
||||||
page](/). The [`/api/v0/flows`](/api/v0/flows?limit=1) continously
|
page](index.md). The [`/api/v0/flows`](/api/v0/flows?limit=1)
|
||||||
printed flows sent to Kafka (using [ndjson]()). It also accepts a
|
continously printed flows sent to Kafka (using [ndjson]()). It also
|
||||||
`limit` argument to stops after emitting the specified number of
|
accepts a `limit` argument to stops after emitting the specified
|
||||||
flows. This endpoint should not be used for anything else other than
|
number of flows. This endpoint should not be used for anything else
|
||||||
debug: it can skips some flows and if there are several users, flows
|
other than debug: it can skips some flows and if there are several
|
||||||
will be dispatched between them.
|
users, flows will be dispatched between them.
|
||||||
|
|
||||||
[ndjson]: http://ndjson.org/
|
[ndjson]: http://ndjson.org/
|
||||||
|
|
||||||
|
|||||||
19
flow/root.go
19
flow/root.go
@@ -50,8 +50,15 @@ type Dependencies struct {
|
|||||||
HTTP *http.Component
|
HTTP *http.Component
|
||||||
}
|
}
|
||||||
|
|
||||||
//go:embed flow.proto
|
var (
|
||||||
var flowProtoContent []byte
|
//go:embed flow.proto
|
||||||
|
flowProto []byte
|
||||||
|
// FlowProtoHandler is an HTTP handler serving flow.proto
|
||||||
|
FlowProtoHandler = netHTTP.HandlerFunc(func(w netHTTP.ResponseWriter, r *netHTTP.Request) {
|
||||||
|
w.Header().Set("Content-Type", "text/plain")
|
||||||
|
w.Write(flowProto)
|
||||||
|
})
|
||||||
|
)
|
||||||
|
|
||||||
// New creates a new flow component.
|
// New creates a new flow component.
|
||||||
func New(r *reporter.Reporter, configuration Configuration, dependencies Dependencies) (*Component, error) {
|
func New(r *reporter.Reporter, configuration Configuration, dependencies Dependencies) (*Component, error) {
|
||||||
@@ -62,14 +69,8 @@ func New(r *reporter.Reporter, configuration Configuration, dependencies Depende
|
|||||||
incomingFlows: make(chan *FlowMessage, configuration.BufferLength),
|
incomingFlows: make(chan *FlowMessage, configuration.BufferLength),
|
||||||
}
|
}
|
||||||
c.d.Daemon.Track(&c.t, "flow")
|
c.d.Daemon.Track(&c.t, "flow")
|
||||||
|
c.d.HTTP.AddHandler("/api/v0/flow.proto", FlowProtoHandler)
|
||||||
c.initMetrics()
|
c.initMetrics()
|
||||||
if c.d.HTTP != nil {
|
|
||||||
c.d.HTTP.AddHandler("/api/v0/flow.proto",
|
|
||||||
netHTTP.HandlerFunc(func(w netHTTP.ResponseWriter, r *netHTTP.Request) {
|
|
||||||
w.Header().Set("Content-Type", "text/plain")
|
|
||||||
w.Write(flowProtoContent)
|
|
||||||
}))
|
|
||||||
}
|
|
||||||
return &c, nil
|
return &c, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -60,3 +60,5 @@ nav:
|
|||||||
Installation: install.md
|
Installation: install.md
|
||||||
Configuration: configuration.md
|
Configuration: configuration.md
|
||||||
Usage: usage.md
|
Usage: usage.md
|
||||||
|
Integration: integration.md
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user