diff --git a/.gitignore b/.gitignore index 3df778c0..dfb4d723 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,7 @@ /bin/ /test/ /inlet/flow/decoder/flow*.pb.go +/common/clickhousedb/mocks/ /console/frontend/node_modules/ /console/data/frontend/ diff --git a/Makefile b/Makefile index c04626a9..5631f511 100644 --- a/Makefile +++ b/Makefile @@ -14,7 +14,10 @@ M = $(shell if [ "$$(tput colors 2> /dev/null || echo 0)" -ge 8 ]; then printf " export GO111MODULE=on -GENERATED = inlet/flow/decoder/flow-1.pb.go console/data/frontend console/frontend/node_modules +GENERATED = \ + inlet/flow/decoder/flow-1.pb.go \ + common/clickhousedb/mocks/mock_driver.go \ + console/data/frontend console/frontend/node_modules .PHONY: all all: fmt lint $(GENERATED) | $(BIN) ; $(info $(M) building executable…) @ ## Build program binary @@ -28,32 +31,39 @@ all: fmt lint $(GENERATED) | $(BIN) ; $(info $(M) building executable…) @ ## B $(BIN): @mkdir -p $@ $(BIN)/%: | $(BIN) ; $(info $(M) building $(PACKAGE)…) - $Q env GOBIN=$(abspath $(BIN)) $(GO) install $(PACKAGE)@latest + $Q env GOBIN=$(abspath $(BIN)) $(GO) install $(PACKAGE) GOIMPORTS = $(BIN)/goimports -$(BIN)/goimports: PACKAGE=golang.org/x/tools/cmd/goimports +$(BIN)/goimports: PACKAGE=golang.org/x/tools/cmd/goimports@latest REVIVE = $(BIN)/revive -$(BIN)/revive: PACKAGE=github.com/mgechev/revive +$(BIN)/revive: PACKAGE=github.com/mgechev/revive@latest GOCOV = $(BIN)/gocov -$(BIN)/gocov: PACKAGE=github.com/axw/gocov/... +$(BIN)/gocov: PACKAGE=github.com/axw/gocov/gocov@latest GOCOVXML = $(BIN)/gocov-xml -$(BIN)/gocov-xml: PACKAGE=github.com/AlekSi/gocov-xml +$(BIN)/gocov-xml: PACKAGE=github.com/AlekSi/gocov-xml@latest GOTESTSUM = $(BIN)/gotestsum -$(BIN)/gotestsum: PACKAGE=gotest.tools/gotestsum +$(BIN)/gotestsum: PACKAGE=gotest.tools/gotestsum@latest + +MOCKGEN = $(BIN)/mockgen +$(BIN)/mockgen: PACKAGE=github.com/golang/mock/mockgen@v1.6.0 PROTOC = protoc PROTOC_GEN_GO = $(BIN)/protoc-gen-go -$(BIN)/protoc-gen-go: PACKAGE=google.golang.org/protobuf/cmd/protoc-gen-go +$(BIN)/protoc-gen-go: PACKAGE=google.golang.org/protobuf/cmd/protoc-gen-go@v1.28.0 # Generated files inlet/flow/decoder/%.pb.go: inlet/flow/data/schemas/%.proto | $(PROTOC_GEN_GO) ; $(info $(M) compiling protocol buffers definition…) $Q $(PROTOC) -I=. --plugin=$(PROTOC_GEN_GO) --go_out=. --go_opt=module=$(MODULE) $< +common/clickhousedb/mocks/mock_driver.go: | $(MOCKGEN) ; $(info $(M) generate mocks for ClickHouse driver…) + $Q $(MOCKGEN) -destination $@ -package mocks github.com/ClickHouse/clickhouse-go/v2/lib/driver Conn,Row,Rows + $Q sed -i'' -e '1i //go:build !release' $@ + console/frontend/node_modules: console/frontend/package.json console/frontend/yarn.lock console/frontend/node_modules: ; $(info $(M) fetching node modules…) $Q yarn install --silent --frozen-lockfile --cwd console/frontend && touch $@ diff --git a/cmd/orchestrator.go b/cmd/orchestrator.go index d3260779..bd91a1a6 100644 --- a/cmd/orchestrator.go +++ b/cmd/orchestrator.go @@ -5,6 +5,7 @@ import ( "github.com/spf13/cobra" + "akvorado/common/clickhousedb" "akvorado/common/daemon" "akvorado/common/http" "akvorado/common/reporter" @@ -15,11 +16,12 @@ import ( // OrchestratorConfiguration represents the configuration file for the orchestrator command. type OrchestratorConfiguration struct { - Reporting reporter.Configuration - HTTP http.Configuration - ClickHouse clickhouse.Configuration - Kafka kafka.Configuration - Broker broker.Configuration + Reporting reporter.Configuration + HTTP http.Configuration + ClickHouseDB clickhousedb.Configuration `yaml:"-"` + ClickHouse clickhouse.Configuration + Kafka kafka.Configuration + Broker broker.Configuration // Other service configurations Inlet InletConfiguration Console ConsoleConfiguration @@ -28,13 +30,15 @@ type OrchestratorConfiguration struct { // DefaultOrchestratorConfiguration is the default configuration for the orchestrator command. func DefaultOrchestratorConfiguration() OrchestratorConfiguration { return OrchestratorConfiguration{ - Reporting: reporter.DefaultConfiguration(), - HTTP: http.DefaultConfiguration(), - ClickHouse: clickhouse.DefaultConfiguration(), - Kafka: kafka.DefaultConfiguration(), - Broker: broker.DefaultConfiguration(), - Inlet: DefaultInletConfiguration(), - Console: DefaultConsoleConfiguration(), + Reporting: reporter.DefaultConfiguration(), + HTTP: http.DefaultConfiguration(), + ClickHouseDB: clickhousedb.DefaultConfiguration(), + ClickHouse: clickhouse.DefaultConfiguration(), + Kafka: kafka.DefaultConfiguration(), + Broker: broker.DefaultConfiguration(), + // Other service configurations + Inlet: DefaultInletConfiguration(), + Console: DefaultConsoleConfiguration(), } } @@ -58,6 +62,7 @@ components and centralizes configuration of the various other components.`, OrchestratorOptions.Path = args[0] OrchestratorOptions.BeforeDump = func() { // Override some parts of the configuration + config.ClickHouseDB = config.ClickHouse.Configuration config.ClickHouse.Kafka.Configuration = config.Kafka.Configuration config.Inlet.Kafka.Configuration = config.Kafka.Configuration } @@ -96,9 +101,16 @@ func orchestratorStart(r *reporter.Reporter, config OrchestratorConfiguration, c if err != nil { return fmt.Errorf("unable to initialize kafka component: %w", err) } - clickhouseComponent, err := clickhouse.New(r, config.ClickHouse, clickhouse.Dependencies{ + clickhouseDBComponent, err := clickhousedb.New(r, config.ClickHouseDB, clickhousedb.Dependencies{ Daemon: daemonComponent, - HTTP: httpComponent, + }) + if err != nil { + return fmt.Errorf("unable to initialize ClickHouse component: %w", err) + } + clickhouseComponent, err := clickhouse.New(r, config.ClickHouse, clickhouse.Dependencies{ + Daemon: daemonComponent, + HTTP: httpComponent, + ClickHouse: clickhouseDBComponent, }) if err != nil { return fmt.Errorf("unable to initialize clickhouse component: %w", err) @@ -124,6 +136,7 @@ func orchestratorStart(r *reporter.Reporter, config OrchestratorConfiguration, c // Start all the components. components := []interface{}{ httpComponent, + clickhouseDBComponent, clickhouseComponent, kafkaComponent, } diff --git a/common/clickhouse/tests.go b/common/clickhouse/tests.go deleted file mode 100644 index 924f6303..00000000 --- a/common/clickhouse/tests.go +++ /dev/null @@ -1,33 +0,0 @@ -//go:build !release - -package clickhouse - -import ( - "context" - "testing" - "time" - - "github.com/ClickHouse/clickhouse-go/v2" - - "akvorado/common/helpers" -) - -// SetupClickHouse configures a client to use for testing. -func SetupClickHouse(t *testing.T) (clickhouse.Conn, []string) { - t.Helper() - chServer := helpers.CheckExternalService(t, "ClickHouse", []string{"clickhouse", "localhost"}, "9000") - config := DefaultConfiguration() - config.Servers = []string{chServer} - config.DialTimeout = 100 * time.Millisecond - - conn, err := config.Open(context.Background()) - if err != nil { - t.Fatalf("Open() error:\n%+v", err) - } - t.Cleanup(func() { - if err := conn.Close(); err != nil { - t.Errorf("Close() error:\n%+v", err) - } - }) - return conn, []string{chServer} -} diff --git a/common/clickhouse/config.go b/common/clickhousedb/config.go similarity index 56% rename from common/clickhouse/config.go rename to common/clickhousedb/config.go index 6c2902ef..bf0f36c3 100644 --- a/common/clickhouse/config.go +++ b/common/clickhousedb/config.go @@ -1,10 +1,7 @@ -package clickhouse +package clickhousedb import ( - "context" "time" - - "github.com/ClickHouse/clickhouse-go/v2" ) // Configuration defines how we connect to a Clickhouse database @@ -33,27 +30,3 @@ func DefaultConfiguration() Configuration { DialTimeout: 5 * time.Second, } } - -// Open create a new -func (config Configuration) Open(ctx context.Context) (clickhouse.Conn, error) { - conn, err := clickhouse.Open(&clickhouse.Options{ - Addr: config.Servers, - Auth: clickhouse.Auth{ - Database: config.Database, - Username: config.Username, - Password: config.Password, - }, - Compression: &clickhouse.Compression{clickhouse.CompressionLZ4}, - DialTimeout: config.DialTimeout, - MaxOpenConns: config.MaxOpenConns, - MaxIdleConns: config.MaxOpenConns/2 + 1, - ConnMaxLifetime: time.Hour, - }) - if err != nil { - return nil, err - } - if err := conn.Ping(ctx); err != nil { - return nil, err - } - return conn, nil -} diff --git a/common/clickhousedb/root.go b/common/clickhousedb/root.go new file mode 100644 index 00000000..471dc717 --- /dev/null +++ b/common/clickhousedb/root.go @@ -0,0 +1,104 @@ +// Package clickhousedb encapsulates a connection to a ClickHouse database. +package clickhousedb + +import ( + "context" + "time" + + "github.com/ClickHouse/clickhouse-go/v2" + "gopkg.in/tomb.v2" + + "akvorado/common/daemon" + "akvorado/common/reporter" +) + +// Component represents the ClickHouse wrapper +type Component struct { + r *reporter.Reporter + t tomb.Tomb + d *Dependencies + config Configuration + + healthy chan reporter.ChannelHealthcheckFunc + clickhouse.Conn +} + +// Dependencies define the dependencies of the ClickHouse wrapper +type Dependencies struct { + Daemon daemon.Component +} + +// New creates a new ClickHouse wrapper +func New(r *reporter.Reporter, config Configuration, dependencies Dependencies) (*Component, error) { + conn, err := clickhouse.Open(&clickhouse.Options{ + Addr: config.Servers, + Auth: clickhouse.Auth{ + Database: config.Database, + Username: config.Username, + Password: config.Password, + }, + Compression: &clickhouse.Compression{Method: clickhouse.CompressionLZ4}, + DialTimeout: config.DialTimeout, + MaxOpenConns: config.MaxOpenConns, + MaxIdleConns: config.MaxOpenConns/2 + 1, + ConnMaxLifetime: time.Hour, + }) + if err != nil { + return nil, err + } + + c := Component{ + r: r, + d: &dependencies, + config: config, + + healthy: make(chan reporter.ChannelHealthcheckFunc), + Conn: conn, + } + c.d.Daemon.Track(&c.t, "common/clickhousedb") + return &c, nil +} + +// Start initializes the connection to ClickHouse +func (c *Component) Start() error { + c.r.Info().Msg("starting ClickHouse component") + if err := c.Ping(c.t.Context(nil)); err != nil { + return err + } + + c.r.RegisterHealthcheck("clickhousedb", c.channelHealthcheck()) + c.t.Go(func() error { + for { + select { + case <-c.t.Dying(): + return nil + case cb := <-c.healthy: + if cb != nil { + ctx, cancel := context.WithTimeout(c.t.Context(nil), time.Second) + if _, err := c.Query(ctx, "SELECT 1"); err == nil { + cb(reporter.HealthcheckOK, "database available") + } else { + cb(reporter.HealthcheckWarning, "database unavailable") + } + cancel() + } + } + } + }) + return nil +} + +// Stop thethers the connection to ClickHouse +func (c *Component) Stop() error { + c.r.Info().Msg("stopping ClickHouse component") + defer func() { + c.Close() + c.r.Info().Msg("ClickHouse component stopped") + }() + c.t.Kill(nil) + return c.t.Wait() +} + +func (c *Component) channelHealthcheck() reporter.HealthcheckFunc { + return reporter.ChannelHealthcheck(c.t.Context(nil), c.healthy) +} diff --git a/common/clickhousedb/root_test.go b/common/clickhousedb/root_test.go new file mode 100644 index 00000000..bca29857 --- /dev/null +++ b/common/clickhousedb/root_test.go @@ -0,0 +1,116 @@ +package clickhousedb + +import ( + "context" + "errors" + "testing" + + "github.com/golang/mock/gomock" + + "akvorado/common/helpers" + "akvorado/common/reporter" +) + +func TestMock(t *testing.T) { + r := reporter.NewMock(t) + chComponent, mock := NewMock(t, r) + + // Check a select query (this is a bit dumb, but it shows how gomock works) + t.Run("select", func(t *testing.T) { + var got []struct { + N uint64 `ch:"n"` + M uint64 `ch:"m"` + } + expected := []struct { + N uint64 `ch:"n"` + M uint64 `ch:"m"` + }{ + {0, 1}, + {1, 2}, + {2, 3}, + {3, 4}, + {4, 5}, + } + mock.EXPECT(). + Select(gomock.Any(), gomock.Any(), "SELECT number as n, number + 1 as m FROM numbers(5)"). + SetArg(1, expected). + Return(nil) + + err := chComponent.Select(context.Background(), &got, "SELECT number as n, number + 1 as m FROM numbers(5)") + if err != nil { + t.Fatalf("SELECT error:\n%+v", err) + } + + if diff := helpers.Diff(got, expected); diff != "" { + t.Fatalf("SELECT (-got, +want):\n%s", diff) + } + }) + + // Check healthcheck + t.Run("healthcheck", func(t *testing.T) { + firstCall := mock.EXPECT(). + Query(gomock.Any(), "SELECT 1"). + Return(nil, nil) + got := r.RunHealthchecks(context.Background()) + if diff := helpers.Diff(got.Details["clickhousedb"], reporter.HealthcheckResult{ + Status: reporter.HealthcheckOK, + Reason: "database available", + }); diff != "" { + t.Fatalf("runHealthcheck() (-got, +want):\n%s", diff) + } + + mock.EXPECT(). + Query(gomock.Any(), "SELECT 1"). + Return(nil, errors.New("not available")). + After(firstCall) + got = r.RunHealthchecks(context.Background()) + if diff := helpers.Diff(got.Details["clickhousedb"], reporter.HealthcheckResult{ + Status: reporter.HealthcheckWarning, + Reason: "database unavailable", + }); diff != "" { + t.Fatalf("runHealthcheck() (-got, +want):\n%s", diff) + } + }) +} + +func TestRealClickHouse(t *testing.T) { + r := reporter.NewMock(t) + chComponent := SetupClickHouse(t, r) + + // Check a select query + t.Run("select", func(t *testing.T) { + var got []struct { + N uint64 `ch:"n"` + M uint64 `ch:"m"` + } + err := chComponent.Select(context.Background(), &got, "SELECT number as n, number + 1 as m FROM numbers(5)") + if err != nil { + t.Fatalf("SELECT error:\n%+v", err) + } + + expected := []struct { + N uint64 + M uint64 + }{ + {0, 1}, + {1, 2}, + {2, 3}, + {3, 4}, + {4, 5}, + } + if diff := helpers.Diff(got, expected); diff != "" { + t.Fatalf("SELECT (-got, +want):\n%s", diff) + } + }) + + // Check healthcheck + t.Run("healthcheck", func(t *testing.T) { + got := r.RunHealthchecks(context.Background()) + if diff := helpers.Diff(got.Details["clickhousedb"], reporter.HealthcheckResult{ + Status: reporter.HealthcheckOK, + Reason: "database available", + }); diff != "" { + t.Fatalf("runHealthcheck() (-got, +want):\n%s", diff) + } + }) +} diff --git a/common/clickhousedb/tests.go b/common/clickhousedb/tests.go new file mode 100644 index 00000000..19875249 --- /dev/null +++ b/common/clickhousedb/tests.go @@ -0,0 +1,72 @@ +//go:build !release + +package clickhousedb + +import ( + "testing" + "time" + + "github.com/golang/mock/gomock" + + "akvorado/common/clickhousedb/mocks" + "akvorado/common/daemon" + "akvorado/common/helpers" + "akvorado/common/reporter" +) + +// SetupClickHouse configures a client to use for testing. +func SetupClickHouse(t *testing.T, r *reporter.Reporter) *Component { + t.Helper() + chServer := helpers.CheckExternalService(t, "ClickHouse", []string{"clickhouse", "localhost"}, "9000") + config := DefaultConfiguration() + config.Servers = []string{chServer} + config.DialTimeout = 100 * time.Millisecond + c, err := New(r, config, Dependencies{Daemon: daemon.NewMock(t)}) + if err != nil { + t.Fatalf("New() error:\n%+v", err) + } + if err := c.Start(); err != nil { + t.Fatalf("Start() error:\n%+v", err) + } + t.Cleanup(func() { + if err := c.Stop(); err != nil { + t.Errorf("Stop() error:\n%+v", err) + } + }) + return c +} + +// NewMock creates a new component using a mock driver. It returns +// both the component and the mock driver. +func NewMock(t *testing.T, r *reporter.Reporter) (*Component, *mocks.MockConn) { + t.Helper() + c, err := New(r, DefaultConfiguration(), Dependencies{ + Daemon: daemon.NewMock(t), + }) + if err != nil { + t.Fatalf("New() error:\n%+v", err) + } + + ctrl := gomock.NewController(t) + mock := mocks.NewMockConn(ctrl) + c.Conn = mock + + mock.EXPECT(). + Ping(gomock.Any()). + Return(nil). + MinTimes(1) + mock.EXPECT(). + Close(). + Return(nil) + + if err := c.Start(); err != nil { + t.Fatalf("Start() error:\n%+v", err) + } + t.Cleanup(func() { + if err := c.Stop(); err != nil { + t.Errorf("Stop() error:\n%+v", err) + } + }) + + return c, mock +} diff --git a/common/http/root.go b/common/http/root.go index c2a13d5b..b5ba1a5d 100644 --- a/common/http/root.go +++ b/common/http/root.go @@ -56,7 +56,7 @@ func New(r *reporter.Reporter, configuration Configuration, dependencies Depende mux: http.NewServeMux(), GinRouter: gin.New(), } - c.d.Daemon.Track(&c.t, "http") + c.d.Daemon.Track(&c.t, "common/http") c.metrics.inflights = c.r.Gauge( reporter.GaugeOpts{ diff --git a/console/data/docs/06-internals.md b/console/data/docs/06-internals.md index 64bb6ab9..7f014544 100644 --- a/console/data/docs/06-internals.md +++ b/console/data/docs/06-internals.md @@ -1,7 +1,7 @@ # Internal design *Akvorado* is written in Go. Each service has its code in a distinct -directory (`inlet/`, `configure/` and `console/`). The `common/` +directory (`inlet/`, `orchestrator/` and `console/`). The `common/` directory contains components common to several services. The `cmd/` directory contains the main entry points. diff --git a/go.mod b/go.mod index aecf7c44..f4b2047b 100644 --- a/go.mod +++ b/go.mod @@ -4,13 +4,14 @@ go 1.17 require ( github.com/ClickHouse/clickhouse-go/v2 v2.0.12 - github.com/Masterminds/sprig v2.22.0+incompatible github.com/Shopify/sarama v1.32.1-0.20220321223103-27b8f1b5973b github.com/antonmedv/expr v1.9.0 github.com/benbjohnson/clock v1.3.0 github.com/dgraph-io/ristretto v0.1.0 github.com/eapache/go-resiliency v1.2.0 github.com/fsnotify/fsnotify v1.5.1 + github.com/gin-gonic/gin v1.7.7 + github.com/golang/mock v1.6.0 github.com/golang/protobuf v1.5.2 github.com/gosnmp/gosnmp v1.34.0 github.com/kylelemons/godebug v1.1.0 @@ -33,8 +34,6 @@ require ( ) require ( - github.com/Masterminds/goutils v1.1.1 // indirect - github.com/Masterminds/semver v1.5.0 // indirect github.com/StackExchange/wmi v1.2.1 // indirect github.com/alecthomas/chroma v0.10.0 // indirect github.com/beorn7/perks v1.0.1 // indirect @@ -45,7 +44,6 @@ require ( github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect github.com/eapache/queue v1.1.0 // indirect github.com/gin-contrib/sse v0.1.0 // indirect - github.com/gin-gonic/gin v1.7.7 // indirect github.com/go-ole/go-ole v1.2.5 // indirect github.com/go-playground/locales v0.13.0 // indirect github.com/go-playground/universal-translator v0.17.0 // indirect @@ -56,8 +54,6 @@ require ( github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/go-uuid v1.0.2 // indirect - github.com/huandu/xstrings v1.3.2 // indirect - github.com/imdario/mergo v0.3.12 // indirect github.com/inconshreveable/mousetrap v1.0.0 // indirect github.com/jcmturner/aescts/v2 v2.0.0 // indirect github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect @@ -68,8 +64,6 @@ require ( github.com/klauspost/compress v1.15.1 // indirect github.com/leodido/go-urn v1.2.0 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect - github.com/mitchellh/copystructure v1.2.0 // indirect - github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/oschwald/maxminddb-golang v1.8.0 // indirect diff --git a/go.sum b/go.sum index 59bcd321..69d48c7e 100644 --- a/go.sum +++ b/go.sum @@ -54,12 +54,6 @@ github.com/ClickHouse/clickhouse-go/v2 v2.0.12 h1:Nbl/NZwoM6LGJm7smNBgvtdr/rxjlI github.com/ClickHouse/clickhouse-go/v2 v2.0.12/go.mod h1:u4RoNQLLM2W6hNSPYrIESLJqaWSInZVmfM+MlaAhXcg= github.com/DATA-DOG/go-sqlmock v1.3.3/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM= github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= -github.com/Masterminds/goutils v1.1.1 h1:5nUrii3FMTL5diU80unEVvNevw1nH4+ZV4DSLVJLSYI= -github.com/Masterminds/goutils v1.1.1/go.mod h1:8cTjp+g8YejhMuvIA5y2vz3BpJxksy863GQaJW2MFNU= -github.com/Masterminds/semver v1.5.0 h1:H65muMkzWKEuNDnfl9d70GUjFniHKHRbFPGBuZ3QEww= -github.com/Masterminds/semver v1.5.0/go.mod h1:MB6lktGJrhw8PrUyiEoblNEGEQ+RzHPF078ddwwvV3Y= -github.com/Masterminds/sprig v2.22.0+incompatible h1:z4yfnGrZ7netVz+0EDJ0Wi+5VZCSYp4Z0m2dk6cEM60= -github.com/Masterminds/sprig v2.22.0+incompatible/go.mod h1:y6hNFY5UBTIWBxnzTeuNhlNS5hqE0NB0E6fgfo2Br3o= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/Shopify/sarama v1.30.1/go.mod h1:hGgx05L/DiW8XYBXeJdKIN6V2QUy2H6JqME5VT1NLRw= github.com/Shopify/sarama v1.32.1-0.20220321223103-27b8f1b5973b h1:Migey8dJIiByMK+ZNhgX0UOVhI4e4H2eoDDcrTDWDxw= @@ -182,6 +176,7 @@ github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM= github.com/go-ole/go-ole v1.2.5 h1:t4MGB5xEDZvXI+0rMjjsfBsD7yAgp/s9ZDkL1JndXwY= github.com/go-ole/go-ole v1.2.5/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= +github.com/go-playground/assert/v2 v2.0.1 h1:MsBgLAaY856+nPRTKrp3/OZK38U/wa0CcBYNjji3q3A= github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= github.com/go-playground/locales v0.13.0 h1:HyWk6mgj5qFqCT5fjGBuRArbVDfE4hi8+e8ceBS/t7Q= github.com/go-playground/locales v0.13.0/go.mod h1:taPMhCMXrRLJO55olJkUXHZBHCxTMfnGwq/HNwmWNS8= @@ -208,6 +203,7 @@ github.com/golang/mock v1.4.1/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt github.com/golang/mock v1.4.3/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw= github.com/golang/mock v1.4.4/go.mod h1:l3mdAwkq5BuhzHwde/uurv3sEJeZMXNpwsxVWU71h+4= github.com/golang/mock v1.5.0/go.mod h1:CWnOUgYIOo4TcNZ0wHX3YZCqsaM1I1Jvs6v3mP3KVu8= +github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc= github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -317,13 +313,9 @@ github.com/hashicorp/memberlist v0.2.2/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOn github.com/hashicorp/memberlist v0.3.0/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE= github.com/hashicorp/serf v0.9.5/go.mod h1:UWDWwZeL5cuWDJdl0C6wrvrUwEqtQ4ZKBKKENpqIUyk= github.com/hashicorp/serf v0.9.6/go.mod h1:TXZNMjZQijwlDvp+r0b63xZ45H7JmCmgg4gpTwn9UV4= -github.com/huandu/xstrings v1.3.2 h1:L18LIDzqlW6xN2rEkpdV8+oL/IXWJ1APd+vsdYy4Wdw= -github.com/huandu/xstrings v1.3.2/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE= github.com/iancoleman/strcase v0.2.0/go.mod h1:iwCmte+B7n89clKwxIoIXy/HfoL7AsD47ZCWhYzw7ho= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= -github.com/imdario/mergo v0.3.12 h1:b6R2BslTbIEToALKP7LxUvijTsNI9TAe80pLWN2g/HU= -github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA= github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8= @@ -401,16 +393,12 @@ github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3N github.com/miekg/dns v1.1.26/go.mod h1:bPDLeHnStXmXAq1m/Ch/hvfNHr14JKNPMBo3VZKjuso= github.com/miekg/dns v1.1.41/go.mod h1:p6aan82bvRIyn+zDIv9xYNUpwa73JcSh9BKwknJysuI= github.com/mitchellh/cli v1.1.0/go.mod h1:xcISNoH86gajksDmfB23e/pu+B+GeFRMYmoHXxx3xhI= -github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw= -github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mitchellh/go-testing-interface v1.0.0/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI= github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/mitchellh/mapstructure v1.4.3 h1:OVowDSCllw/YjdLkam3/sm7wEtOy59d8ndGgCcyj8cs= github.com/mitchellh/mapstructure v1.4.3/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= -github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ= -github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw= github.com/mkevac/debugcharts v0.0.0-20191222103121-ae1c48aa8615/go.mod h1:Ad7oeElCZqA1Ufj0U9/liOF4BtVepxRcTvr2ey7zTvM= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= diff --git a/orchestrator/clickhouse/config.go b/orchestrator/clickhouse/config.go index 882d43bc..d370a911 100644 --- a/orchestrator/clickhouse/config.go +++ b/orchestrator/clickhouse/config.go @@ -1,13 +1,13 @@ package clickhouse import ( - "akvorado/common/clickhouse" + "akvorado/common/clickhousedb" "akvorado/common/kafka" ) // Configuration describes the configuration for the ClickHouse configurator. type Configuration struct { - clickhouse.Configuration `mapstructure:",squash" yaml:"-,inline"` + clickhousedb.Configuration `mapstructure:",squash" yaml:"-,inline"` // Kafka describes Kafka-specific configuration Kafka KafkaConfiguration // OrchestratorURL allows one to override URL to reach orchestrator from Clickhouse @@ -24,7 +24,7 @@ type KafkaConfiguration struct { // DefaultConfiguration represents the default configuration for the ClickHouse configurator. func DefaultConfiguration() Configuration { return Configuration{ - Configuration: clickhouse.DefaultConfiguration(), + Configuration: clickhousedb.DefaultConfiguration(), Kafka: KafkaConfiguration{ Consumers: 1, }, diff --git a/orchestrator/clickhouse/functional_test.go b/orchestrator/clickhouse/functional_test.go index c3a24f8e..b4b1e513 100644 --- a/orchestrator/clickhouse/functional_test.go +++ b/orchestrator/clickhouse/functional_test.go @@ -6,7 +6,7 @@ import ( "testing" "time" - "akvorado/common/clickhouse" + "akvorado/common/clickhousedb" "akvorado/common/daemon" "akvorado/common/helpers" "akvorado/common/http" @@ -14,14 +14,14 @@ import ( ) func TestRealClickHouse(t *testing.T) { - conn, chServers := clickhouse.SetupClickHouse(t) + r := reporter.NewMock(t) + chComponent := clickhousedb.SetupClickHouse(t, r) configuration := DefaultConfiguration() - configuration.Servers = chServers - r := reporter.NewMock(t) ch, err := New(r, configuration, Dependencies{ - Daemon: daemon.NewMock(t), - HTTP: http.NewMock(t, r), + Daemon: daemon.NewMock(t), + HTTP: http.NewMock(t, r), + ClickHouse: chComponent, }) if err != nil { t.Fatalf("New() error:\n%+v", err) @@ -36,7 +36,7 @@ func TestRealClickHouse(t *testing.T) { } // Check with the ClickHouse client we have our tables - rows, err := conn.Query(context.Background(), "SHOW TABLES") + rows, err := chComponent.Query(context.Background(), "SHOW TABLES") if err != nil { t.Fatalf("Query() error:\n%+v", err) } @@ -67,8 +67,9 @@ func TestRealClickHouse(t *testing.T) { // Check we can run a second time ch, err = New(r, configuration, Dependencies{ - Daemon: daemon.NewMock(t), - HTTP: http.NewMock(t, r), + Daemon: daemon.NewMock(t), + HTTP: http.NewMock(t, r), + ClickHouse: chComponent, }) if err != nil { t.Fatalf("New() error:\n%+v", err) diff --git a/orchestrator/clickhouse/migration-00.go b/orchestrator/clickhouse/migration-00.go index 2ea96952..75734c0e 100644 --- a/orchestrator/clickhouse/migration-00.go +++ b/orchestrator/clickhouse/migration-00.go @@ -14,7 +14,7 @@ import ( func (c *Component) migrateStepCreateFlowsTable(ctx context.Context, l reporter.Logger, conn clickhouse.Conn) migrationStep { return migrationStep{ CheckQuery: `SELECT 1 FROM system.tables WHERE name = $1 AND database = $2`, - Args: []interface{}{"flows", c.config.Database}, + Args: []interface{}{"flows", c.config.Configuration.Database}, Do: func() error { return conn.Exec(ctx, ` CREATE TABLE flows ( @@ -60,7 +60,7 @@ ORDER BY TimeReceived`) func (c *Component) migrateStepCreateExportersView(ctx context.Context, l reporter.Logger, conn clickhouse.Conn) migrationStep { return migrationStep{ CheckQuery: `SELECT 1 FROM system.tables WHERE name = $1 AND database = $2`, - Args: []interface{}{"exporters", c.config.Database}, + Args: []interface{}{"exporters", c.config.Configuration.Database}, Do: func() error { return conn.Exec(ctx, ` CREATE MATERIALIZED VIEW exporters @@ -89,7 +89,7 @@ func (c *Component) migrateStepCreateProtocolsDictionary(ctx context.Context, l protocolsURL := fmt.Sprintf("%s/api/v0/orchestrator/clickhouse/protocols.csv", c.config.OrchestratorURL) return migrationStep{ CheckQuery: `SELECT 1 FROM system.dictionaries WHERE name = $1 AND database = $2 AND source = $3`, - Args: []interface{}{"protocols", c.config.Database, protocolsURL}, + Args: []interface{}{"protocols", c.config.Configuration.Database, protocolsURL}, Do: func() error { return conn.Exec(ctx, ` CREATE OR REPLACE DICTIONARY protocols ( @@ -110,7 +110,7 @@ func (c *Component) migrateStepCreateASNsDictionary(ctx context.Context, l repor asnsURL := fmt.Sprintf("%s/api/v0/orchestrator/clickhouse/asns.csv", c.config.OrchestratorURL) return migrationStep{ CheckQuery: `SELECT 1 FROM system.dictionaries WHERE name = $1 AND database = $2 AND source = $3`, - Args: []interface{}{"asns", c.config.Database, asnsURL}, + Args: []interface{}{"asns", c.config.Configuration.Database, asnsURL}, Do: func() error { return conn.Exec(ctx, ` CREATE OR REPLACE DICTIONARY asns ( @@ -144,7 +144,7 @@ func (c *Component) migrateStepCreateRawFlowsTable(ctx context.Context, l report }, " ") return migrationStep{ CheckQuery: `SELECT 1 FROM system.tables WHERE name = $1 AND database = $2 AND engine_full = $3`, - Args: []interface{}{tableName, c.config.Database, kafkaEngine}, + Args: []interface{}{tableName, c.config.Configuration.Database, kafkaEngine}, Do: func() error { l.Debug().Msg("drop raw consumer table") err := conn.Exec(ctx, fmt.Sprintf(`DROP TABLE IF EXISTS %s_consumer`, tableName)) @@ -204,7 +204,7 @@ func (c *Component) migrateStepCreateRawFlowsConsumerView(ctx context.Context, l viewName := fmt.Sprintf("%s_consumer", tableName) return migrationStep{ CheckQuery: `SELECT 1 FROM system.tables WHERE name = $1 AND database = $2`, - Args: []interface{}{viewName, c.config.Database}, + Args: []interface{}{viewName, c.config.Configuration.Database}, Do: func() error { return conn.Exec(ctx, fmt.Sprintf(` CREATE MATERIALIZED VIEW %s TO flows @@ -219,7 +219,7 @@ FROM %s`, viewName, tableName)) func (c *Component) migrateStepDropSchemaMigrationsTable(ctx context.Context, l reporter.Logger, conn clickhouse.Conn) migrationStep { return migrationStep{ CheckQuery: `SELECT COUNT(*) == 0 FROM system.tables WHERE name = $1 AND database = $2`, - Args: []interface{}{"schema_migrations", c.config.Database}, + Args: []interface{}{"schema_migrations", c.config.Configuration.Database}, Do: func() error { return conn.Exec(ctx, "DROP TABLE schema_migrations") }, diff --git a/orchestrator/clickhouse/migrations.go b/orchestrator/clickhouse/migrations.go index 4857ad28..f7b23e75 100644 --- a/orchestrator/clickhouse/migrations.go +++ b/orchestrator/clickhouse/migrations.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "net" - "strings" "github.com/ClickHouse/clickhouse-go/v2" @@ -23,25 +22,14 @@ type migrationStep struct { // migrateDatabase execute database migration func (c *Component) migrateDatabase() error { if c.config.OrchestratorURL == "" { - baseURL, err := c.getHTTPBaseURL(c.config.Servers[0]) + baseURL, err := c.getHTTPBaseURL("1.1.1.1:80") if err != nil { return err } c.config.OrchestratorURL = baseURL } - l := c.r.With(). - Str("server", strings.Join(c.config.Servers, ",")). - Str("database", c.config.Database). - Str("username", c.config.Username). - Logger() - ctx := c.t.Context(context.Background()) - conn, err := c.config.Configuration.Open(ctx) - if err != nil { - l.Err(err).Msg("unable to connect to ClickHouse") - return fmt.Errorf("unable to connect to ClickHouse: %w", err) - } - + ctx := c.t.Context(nil) steps := []struct { Description string Step func(context.Context, reporter.Logger, clickhouse.Conn) migrationStep @@ -59,10 +47,10 @@ func (c *Component) migrateDatabase() error { total := 0 for _, step := range steps { total++ - l := l.With().Str("step", step.Description).Logger() + l := c.r.Logger.With().Str("step", step.Description).Logger() l.Debug().Msg("checking migration step") - step := step.Step(ctx, l, conn) - rows, err := conn.Query(ctx, step.CheckQuery, step.Args...) + step := step.Step(ctx, l, c.d.ClickHouse) + rows, err := c.d.ClickHouse.Query(ctx, step.CheckQuery, step.Args...) if err != nil { l.Err(err).Msg("cannot execute check") return fmt.Errorf("cannot execute check: %w", err) @@ -90,9 +78,9 @@ func (c *Component) migrateDatabase() error { } if count == 0 { - l.Debug().Msg("no migration needed") + c.r.Debug().Msg("no migration needed") } else { - l.Info().Msg("migrations done") + c.r.Info().Msg("migrations done") } close(c.migrationsDone) c.metrics.migrationsRunning.Set(0) diff --git a/orchestrator/clickhouse/root.go b/orchestrator/clickhouse/root.go index a87aaf3c..b2add590 100644 --- a/orchestrator/clickhouse/root.go +++ b/orchestrator/clickhouse/root.go @@ -6,6 +6,7 @@ import ( "gopkg.in/tomb.v2" + "akvorado/common/clickhousedb" "akvorado/common/daemon" "akvorado/common/http" "akvorado/common/reporter" @@ -28,8 +29,9 @@ type Component struct { // Dependencies define the dependencies of the ClickHouse configurator. type Dependencies struct { - Daemon daemon.Component - HTTP *http.Component + Daemon daemon.Component + HTTP *http.Component + ClickHouse *clickhousedb.Component } // New creates a new ClickHouse component. @@ -43,7 +45,7 @@ func New(r *reporter.Reporter, configuration Configuration, dependencies Depende if err := c.registerHTTPHandlers(); err != nil { return nil, err } - c.d.Daemon.Track(&c.t, "configure/clickhouse") + c.d.Daemon.Track(&c.t, "orchestrator/clickhouse") c.metrics.migrationsRunning = c.r.Gauge( reporter.GaugeOpts{ @@ -66,7 +68,7 @@ func (c *Component) Start() error { c.r.Info().Msg("starting ClickHouse component") c.metrics.migrationsRunning.Set(1) if err := c.migrateDatabase(); err != nil { - c.r.Warn().Msg("database migration failed, continue in the background") + c.r.Warn().Msgf("database migration failed %s, continue in the background", err.Error()) c.t.Go(func() error { for { select {