console: add an API to get the last received flow

This commit is contained in:
Vincent Bernat
2022-04-14 10:06:01 +02:00
parent 2bbeacec84
commit 4be33c70ba
12 changed files with 260 additions and 32 deletions

View File

@@ -60,8 +60,9 @@ $(BIN)/protoc-gen-go: PACKAGE=google.golang.org/protobuf/cmd/protoc-gen-go@v1.28
inlet/flow/decoder/%.pb.go: inlet/flow/data/schemas/%.proto | $(PROTOC_GEN_GO) ; $(info $(M) compiling protocol buffers definition) inlet/flow/decoder/%.pb.go: inlet/flow/data/schemas/%.proto | $(PROTOC_GEN_GO) ; $(info $(M) compiling protocol buffers definition)
$Q $(PROTOC) -I=. --plugin=$(PROTOC_GEN_GO) --go_out=. --go_opt=module=$(MODULE) $< $Q $(PROTOC) -I=. --plugin=$(PROTOC_GEN_GO) --go_out=. --go_opt=module=$(MODULE) $<
common/clickhousedb/mocks/mock_driver.go: | $(MOCKGEN) ; $(info $(M) generate mocks for ClickHouse driver) common/clickhousedb/mocks/mock_driver.go: Makefile | $(MOCKGEN) ; $(info $(M) generate mocks for ClickHouse driver)
$Q $(MOCKGEN) -destination $@ -package mocks github.com/ClickHouse/clickhouse-go/v2/lib/driver Conn,Row,Rows $Q $(MOCKGEN) -destination $@ -package mocks \
github.com/ClickHouse/clickhouse-go/v2/lib/driver Conn,Row,Rows,ColumnType
$Q sed -i'' -e '1i //go:build !release' $@ $Q sed -i'' -e '1i //go:build !release' $@
console/frontend/node_modules: console/frontend/package.json console/frontend/yarn.lock console/frontend/node_modules: console/frontend/package.json console/frontend/yarn.lock

View File

@@ -5,6 +5,7 @@ import (
"github.com/spf13/cobra" "github.com/spf13/cobra"
"akvorado/common/clickhousedb"
"akvorado/common/daemon" "akvorado/common/daemon"
"akvorado/common/http" "akvorado/common/http"
"akvorado/common/reporter" "akvorado/common/reporter"
@@ -13,17 +14,19 @@ import (
// ConsoleConfiguration represents the configuration file for the console command. // ConsoleConfiguration represents the configuration file for the console command.
type ConsoleConfiguration struct { type ConsoleConfiguration struct {
Reporting reporter.Configuration Reporting reporter.Configuration
HTTP http.Configuration HTTP http.Configuration
Console console.Configuration Console console.Configuration
ClickHouse clickhousedb.Configuration
} }
// DefaultConsoleConfiguration is the default configuration for the console command. // DefaultConsoleConfiguration is the default configuration for the console command.
func DefaultConsoleConfiguration() ConsoleConfiguration { func DefaultConsoleConfiguration() ConsoleConfiguration {
return ConsoleConfiguration{ return ConsoleConfiguration{
HTTP: http.DefaultConfiguration(), HTTP: http.DefaultConfiguration(),
Reporting: reporter.DefaultConfiguration(), Reporting: reporter.DefaultConfiguration(),
Console: console.DefaultConfiguration(), Console: console.DefaultConfiguration(),
ClickHouse: clickhousedb.DefaultConfiguration(),
} }
} }
@@ -76,8 +79,16 @@ func consoleStart(r *reporter.Reporter, config ConsoleConfiguration, checkOnly b
if err != nil { if err != nil {
return fmt.Errorf("unable to initialize HTTP component: %w", err) return fmt.Errorf("unable to initialize HTTP component: %w", err)
} }
clickhouseComponent, err := clickhousedb.New(r, config.ClickHouse, clickhousedb.Dependencies{
Daemon: daemonComponent,
})
if err != nil {
return fmt.Errorf("unable to initialize ClickHouse component: %w", err)
}
consoleComponent, err := console.New(r, config.Console, console.Dependencies{ consoleComponent, err := console.New(r, config.Console, console.Dependencies{
HTTP: httpComponent, Daemon: daemonComponent,
HTTP: httpComponent,
ClickHouseDB: clickhouseComponent,
}) })
if err != nil { if err != nil {
return fmt.Errorf("unable to initialize console component: %w", err) return fmt.Errorf("unable to initialize console component: %w", err)
@@ -95,6 +106,7 @@ func consoleStart(r *reporter.Reporter, config ConsoleConfiguration, checkOnly b
// Start all the components. // Start all the components.
components := []interface{}{ components := []interface{}{
httpComponent, httpComponent,
clickhouseComponent,
consoleComponent, consoleComponent,
} }
return StartStopComponents(r, daemonComponent, components) return StartStopComponents(r, daemonComponent, components)

View File

@@ -65,6 +65,7 @@ components and centralizes configuration of the various other components.`,
config.ClickHouseDB = config.ClickHouse.Configuration config.ClickHouseDB = config.ClickHouse.Configuration
config.ClickHouse.Kafka.Configuration = config.Kafka.Configuration config.ClickHouse.Kafka.Configuration = config.Kafka.Configuration
config.Inlet.Kafka.Configuration = config.Kafka.Configuration config.Inlet.Kafka.Configuration = config.Kafka.Configuration
config.Console.ClickHouse = config.ClickHouse.Configuration
} }
if err := OrchestratorOptions.Parse(cmd.OutOrStdout(), "orchestrator", &config); err != nil { if err := OrchestratorOptions.Parse(cmd.OutOrStdout(), "orchestrator", &config); err != nil {
return err return err

View File

@@ -7,6 +7,7 @@ import (
"github.com/golang/mock/gomock" "github.com/golang/mock/gomock"
"akvorado/common/clickhousedb/mocks"
"akvorado/common/helpers" "akvorado/common/helpers"
"akvorado/common/reporter" "akvorado/common/reporter"
) )
@@ -46,6 +47,40 @@ func TestMock(t *testing.T) {
} }
}) })
t.Run("scan", func(t *testing.T) {
ctrl := gomock.NewController(t)
mockRows := mocks.NewMockRows(ctrl)
mock.EXPECT().Query(gomock.Any(),
`SELECT 10, 12`).
Return(mockRows, nil)
mockRows.EXPECT().Next().Return(true)
mockRows.EXPECT().Close()
mockRows.EXPECT().Scan(gomock.Any()).DoAndReturn(func(args ...interface{}) interface{} {
arg0 := args[0].(*uint64)
*arg0 = uint64(10)
arg1 := args[1].(*uint64)
*arg1 = uint64(12)
return nil
})
rows, err := chComponent.Query(context.Background(),
`SELECT 10, 12`)
if err != nil {
t.Fatalf("SELECT error:\n%+v", err)
}
if !rows.Next() {
t.Fatal("Next() should return true")
}
defer rows.Close()
var n, m uint64
if err := rows.Scan(&n, &m); err != nil {
t.Fatalf("Scan() error:\n%+v", err)
}
if n != 10 || m != 12 {
t.Errorf("Scan() should return 10, 12, not %d, %d", n, m)
}
})
// Check healthcheck // Check healthcheck
t.Run("healthcheck", func(t *testing.T) { t.Run("healthcheck", func(t *testing.T) {
firstCall := mock.EXPECT(). firstCall := mock.EXPECT().

View File

@@ -1,9 +0,0 @@
//go:build release
package http
import "github.com/gin-gonic/gin"
func init() {
gin.SetMode(gin.ReleaseMode)
}

View File

@@ -6,7 +6,7 @@ import (
"fmt" "fmt"
"net" "net"
"net/http" "net/http"
"net/http/pprof" // profiler "net/http/pprof"
"time" "time"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
@@ -201,4 +201,5 @@ func init() {
// Disable proxy for client // Disable proxy for client
http.DefaultTransport.(*http.Transport).Proxy = nil http.DefaultTransport.(*http.Transport).Proxy = nil
http.DefaultClient.Timeout = 30 * time.Second http.DefaultClient.Timeout = 30 * time.Second
gin.SetMode(gin.ReleaseMode)
} }

43
console/api.go Normal file
View File

@@ -0,0 +1,43 @@
package console
import (
"net/http"
"reflect"
"github.com/gin-gonic/gin"
)
func (c *Component) lastFlowHandlerFunc(gc *gin.Context) {
ctx := c.t.Context(gc.Request.Context())
rows, err := c.d.ClickHouseDB.Conn.Query(ctx,
`SELECT * FROM flows WHERE TimeReceived = (SELECT MAX(TimeReceived) FROM flows) LIMIT 1`)
if err != nil {
c.r.Err(err).Msg("unable to query database")
gc.JSON(http.StatusInternalServerError, gin.H{"message": "Unable to query database."})
return
}
if !rows.Next() {
gc.JSON(http.StatusNotFound, gin.H{"message": "no flow currently in database."})
return
}
defer rows.Close()
var (
response = gin.H{}
columnTypes = rows.ColumnTypes()
vars = make([]interface{}, len(columnTypes))
)
for i := range columnTypes {
vars[i] = reflect.New(columnTypes[i].ScanType()).Interface()
}
if err := rows.Scan(vars...); err != nil {
c.r.Err(err).Msg("unable to parse flow")
gc.JSON(http.StatusInternalServerError, gin.H{"message": "Unable to parse flow."})
return
}
for index, column := range rows.Columns() {
response[column] = vars[index]
}
gc.IndentedJSON(http.StatusOK, response)
}

107
console/api_test.go Normal file
View File

@@ -0,0 +1,107 @@
package console
import (
"net"
"reflect"
"testing"
"time"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"github.com/golang/mock/gomock"
"akvorado/common/clickhousedb"
"akvorado/common/clickhousedb/mocks"
"akvorado/common/daemon"
"akvorado/common/helpers"
"akvorado/common/http"
"akvorado/common/reporter"
)
func TestLastFlow(t *testing.T) {
r := reporter.NewMock(t)
ch, mockConn := clickhousedb.NewMock(t, r)
h := http.NewMock(t, r)
c, err := New(r, Configuration{}, Dependencies{
Daemon: daemon.NewMock(t),
HTTP: h,
ClickHouseDB: ch,
})
if err != nil {
t.Fatalf("New() error:\n%+v", err)
}
helpers.StartStop(t, c)
ctrl := gomock.NewController(t)
mockRows := mocks.NewMockRows(ctrl)
mockConn.EXPECT().Query(gomock.Any(),
`SELECT * FROM flows WHERE TimeReceived = (SELECT MAX(TimeReceived) FROM flows) LIMIT 1`).
Return(mockRows, nil)
mockRows.EXPECT().Next().Return(true)
mockRows.EXPECT().Close()
mockRows.EXPECT().Columns().Return([]string{
"TimeReceived", "SamplingRate",
"SrcAddr", "SrcCountry",
"InIfName", "InIfBoundary", "InIfSpeed",
}).AnyTimes()
colTimeReceived := mocks.NewMockColumnType(ctrl)
colSamplingRate := mocks.NewMockColumnType(ctrl)
colSrcAddr := mocks.NewMockColumnType(ctrl)
colSrcCountry := mocks.NewMockColumnType(ctrl)
colInIfName := mocks.NewMockColumnType(ctrl)
colInIfBoundary := mocks.NewMockColumnType(ctrl)
colInIfSpeed := mocks.NewMockColumnType(ctrl)
colTimeReceived.EXPECT().ScanType().Return(reflect.TypeOf(time.Time{}))
colSamplingRate.EXPECT().ScanType().Return(reflect.TypeOf(uint64(0)))
colSrcAddr.EXPECT().ScanType().Return(reflect.TypeOf(net.IP{}))
colSrcCountry.EXPECT().ScanType().Return(reflect.TypeOf(""))
colInIfName.EXPECT().ScanType().Return(reflect.TypeOf(""))
colInIfBoundary.EXPECT().ScanType().Return(reflect.TypeOf(""))
colInIfSpeed.EXPECT().ScanType().Return(reflect.TypeOf(uint32(0)))
mockRows.EXPECT().ColumnTypes().Return([]driver.ColumnType{
colTimeReceived,
colSamplingRate,
colSrcAddr,
colSrcCountry,
colInIfName,
colInIfBoundary,
colInIfSpeed,
}).AnyTimes()
mockRows.EXPECT().Scan(gomock.Any()).
DoAndReturn(func(args ...interface{}) interface{} {
arg0 := args[0].(*time.Time)
*arg0 = time.Date(2022, 4, 4, 8, 36, 11, 10, time.UTC)
arg1 := args[1].(*uint64)
*arg1 = uint64(10000)
arg2 := args[2].(*net.IP)
*arg2 = net.ParseIP("2001:db8::22")
arg3 := args[3].(*string)
*arg3 = "FR"
arg4 := args[4].(*string)
*arg4 = "Hu0/0/1/10"
arg5 := args[5].(*string)
*arg5 = "external"
arg6 := args[6].(*uint32)
*arg6 = uint32(100000)
return nil
})
helpers.TestHTTPEndpoints(t, h.Address, helpers.HTTPEndpointCases{
{
URL: "/api/v0/console/last-flow",
ContentType: "application/json; charset=utf-8",
FirstLines: []string{
`{`,
` "InIfBoundary": "external",`,
` "InIfName": "Hu0/0/1/10",`,
` "InIfSpeed": 100000,`,
` "SamplingRate": 10000,`,
` "SrcAddr": "2001:db8::22",`,
` "SrcCountry": "FR",`,
` "TimeReceived": "2022-04-04T08:36:11.00000001Z"`,
`}`,
},
},
})
}

View File

@@ -3,6 +3,7 @@ package console
import ( import (
"testing" "testing"
"akvorado/common/daemon"
"akvorado/common/helpers" "akvorado/common/helpers"
"akvorado/common/http" "akvorado/common/http"
"akvorado/common/reporter" "akvorado/common/reporter"
@@ -20,14 +21,16 @@ func TestServeAssets(t *testing.T) {
t.Run(name, func(t *testing.T) { t.Run(name, func(t *testing.T) {
r := reporter.NewMock(t) r := reporter.NewMock(t)
h := http.NewMock(t, r) h := http.NewMock(t, r)
_, err := New(r, Configuration{ c, err := New(r, Configuration{
ServeLiveFS: live, ServeLiveFS: live,
}, Dependencies{ }, Dependencies{
HTTP: h, Daemon: daemon.NewMock(t),
HTTP: h,
}) })
if err != nil { if err != nil {
t.Fatalf("New() error:\n%+v", err) t.Fatalf("New() error:\n%+v", err)
} }
helpers.StartStop(t, c)
helpers.TestHTTPEndpoints(t, h.Address, helpers.HTTPEndpointCases{ helpers.TestHTTPEndpoints(t, h.Address, helpers.HTTPEndpointCases{
{ {

View File

@@ -7,6 +7,8 @@ import (
"strings" "strings"
"testing" "testing"
"akvorado/common/daemon"
"akvorado/common/helpers"
"akvorado/common/http" "akvorado/common/http"
"akvorado/common/reporter" "akvorado/common/reporter"
) )
@@ -28,29 +30,31 @@ func TestServeDocs(t *testing.T) {
t.Run(fmt.Sprintf("%s-%s", name, tc.Path), func(t *testing.T) { t.Run(fmt.Sprintf("%s-%s", name, tc.Path), func(t *testing.T) {
r := reporter.NewMock(t) r := reporter.NewMock(t)
h := http.NewMock(t, r) h := http.NewMock(t, r)
_, err := New(r, Configuration{ c, err := New(r, Configuration{
ServeLiveFS: live, ServeLiveFS: live,
}, Dependencies{ }, Dependencies{
HTTP: h, Daemon: daemon.NewMock(t),
HTTP: h,
}) })
if err != nil { if err != nil {
t.Fatalf("New() error:\n%+v", err) t.Fatalf("New() error:\n%+v", err)
} }
helpers.StartStop(t, c)
resp, err := netHTTP.Get(fmt.Sprintf("http://%s/api/v0/docs/%s", resp, err := netHTTP.Get(fmt.Sprintf("http://%s/api/v0/console/docs/%s",
h.Address, tc.Path)) h.Address, tc.Path))
if err != nil { if err != nil {
t.Fatalf("GET /api/v0/docs/%s:\n%+v", tc.Path, err) t.Fatalf("GET /api/v0/console/docs/%s:\n%+v", tc.Path, err)
} }
defer resp.Body.Close() defer resp.Body.Close()
if resp.StatusCode != 200 { if resp.StatusCode != 200 {
t.Errorf("GET /api/v0/docs/%s: got status code %d, not 200", t.Errorf("GET /api/v0/console/docs/%s: got status code %d, not 200",
tc.Path, resp.StatusCode) tc.Path, resp.StatusCode)
} }
body, _ := ioutil.ReadAll(resp.Body) body, _ := ioutil.ReadAll(resp.Body)
if !strings.Contains(string(body), tc.Expect) { if !strings.Contains(string(body), tc.Expect) {
t.Logf("Body:\n%s", string(body)) t.Logf("Body:\n%s", string(body))
t.Errorf("GET /api/v0/docs/%s: does not contain %q", t.Errorf("GET /api/v0/console/docs/%s: does not contain %q",
tc.Path, tc.Expect) tc.Path, tc.Expect)
} }
}) })

View File

@@ -85,7 +85,7 @@ watch(
if (to.id !== from?.id) { if (to.id !== from?.id) {
const id = to.id; const id = to.id;
try { try {
const response = await fetch(`/api/v0/docs/${id}`); const response = await fetch(`/api/v0/console/docs/${id}`);
if (!response.ok) { if (!response.ok) {
throw `got a ${response.status} error`; throw `got a ${response.status} error`;
} }

View File

@@ -11,14 +11,19 @@ import (
"runtime" "runtime"
"sync" "sync"
"akvorado/common/clickhousedb"
"akvorado/common/daemon"
"akvorado/common/http" "akvorado/common/http"
"akvorado/common/reporter" "akvorado/common/reporter"
"gopkg.in/tomb.v2"
) )
// Component represents the console component. // Component represents the console component.
type Component struct { type Component struct {
r *reporter.Reporter r *reporter.Reporter
d *Dependencies d *Dependencies
t tomb.Tomb
config Configuration config Configuration
templates map[string]*template.Template templates map[string]*template.Template
@@ -27,7 +32,9 @@ type Component struct {
// Dependencies define the dependencies of the console component. // Dependencies define the dependencies of the console component.
type Dependencies struct { type Dependencies struct {
HTTP *http.Component Daemon daemon.Component
HTTP *http.Component
ClickHouseDB *clickhousedb.Component
} }
// New creates a new console component. // New creates a new console component.
@@ -38,12 +45,35 @@ func New(reporter *reporter.Reporter, config Configuration, dependencies Depende
config: config, config: config,
} }
c.d.HTTP.AddHandler("/", netHTTP.HandlerFunc(c.assetsHandlerFunc)) c.d.Daemon.Track(&c.t, "console")
c.d.HTTP.GinRouter.GET("/api/v0/docs/:name", c.docsHandlerFunc)
return &c, nil return &c, nil
} }
// Start starts the console component.
func (c *Component) Start() error {
c.r.Info().Msg("starting console component")
c.d.HTTP.AddHandler("/", netHTTP.HandlerFunc(c.assetsHandlerFunc))
c.d.HTTP.GinRouter.GET("/api/v0/console/docs/:name", c.docsHandlerFunc)
c.d.HTTP.GinRouter.GET("/api/v0/console/last-flow", c.lastFlowHandlerFunc)
c.t.Go(func() error {
select {
case <-c.t.Dying():
}
return nil
})
return nil
}
// Stop stops the console component.
func (c *Component) Stop() error {
defer c.r.Info().Msg("console component stopped")
c.r.Info().Msg("stopping console component")
c.t.Kill(nil)
return c.t.Wait()
}
// embedOrLiveFS returns a subset of the provided embedded filesystem, // embedOrLiveFS returns a subset of the provided embedded filesystem,
// except if the component is configured to use the live filesystem. // except if the component is configured to use the live filesystem.
// Then, it returns the provided tree. // Then, it returns the provided tree.