diff --git a/console/graph.go b/console/graph.go
index 0ed58605..f627ad0a 100644
--- a/console/graph.go
+++ b/console/graph.go
@@ -18,7 +18,7 @@ type graphQuery struct {
 	End        time.Time     `json:"end" binding:"required,gtfield=Start"`
 	Points     int           `json:"points" binding:"required,min=5,max=2000"` // minimum number of points
 	Dimensions []queryColumn `json:"dimensions"`                               // group by ...
-	Limit      int           `json:"limit" binding:"isdefault|min=1,max=50"`   // limit product of dimensions
+	Limit      int           `json:"limit" binding:"min=1,max=50"`             // limit product of dimensions
 	Filter     queryFilter   `json:"filter"`                                   // where ...
 }
 
diff --git a/console/root.go b/console/root.go
index 02e516c6..c402135c 100644
--- a/console/root.go
+++ b/console/root.go
@@ -82,6 +82,7 @@ func (c *Component) Start() error {
 	c.d.HTTP.GinRouter.GET("/api/v0/console/widget/top/:name", c.widgetTopHandlerFunc)
 	c.d.HTTP.GinRouter.GET("/api/v0/console/widget/graph", c.widgetGraphHandlerFunc)
 	c.d.HTTP.GinRouter.POST("/api/v0/console/graph", c.graphHandlerFunc)
+	c.d.HTTP.GinRouter.POST("/api/v0/console/sankey", c.sankeyHandlerFunc)
 
 	c.t.Go(func() error {
 		ticker := time.NewTicker(10 * time.Second)
diff --git a/console/sankey.go b/console/sankey.go
new file mode 100644
index 00000000..f0263f49
--- /dev/null
+++ b/console/sankey.go
@@ -0,0 +1,188 @@
+package console
+
+import (
+	"fmt"
+	"net/http"
+	"sort"
+	"strings"
+	"time"
+
+	"github.com/gin-gonic/gin"
+
+	"akvorado/common/helpers"
+)
+
+// sankeyQuery describes the input for the /sankey endpoint.
+type sankeyQuery struct {
+	Start      time.Time     `json:"start" binding:"required"`
+	End        time.Time     `json:"end" binding:"required,gtfield=Start"`
+	Dimensions []queryColumn `json:"dimensions" binding:"required,min=2"` // group by ...
+	Limit      int           `json:"limit" binding:"min=1,max=50"`        // limit product of dimensions
+	Filter     queryFilter   `json:"filter"`                              // where ...
+}
+
+// toSQL converts a sankey query to an SQL request. The {table} and
+// {timefilter} placeholders are left in the result for the caller to
+// expand. The returned error is currently always nil.
+func (query sankeyQuery) toSQL() (string, error) {
+	// Filter
+	// NOTE(review): the filter is inserted verbatim into the request;
+	// presumably queryFilter sanitizes it when decoded — confirm.
+	where := query.Filter.filter
+	if where == "" {
+		where = "{timefilter}"
+	} else {
+		where = fmt.Sprintf("{timefilter} AND (%s)", where)
+	}
+
+	// Select: values outside the top rows are folded into 'Other'
+	arrayFields := make([]string, 0, len(query.Dimensions))
+	dimensions := make([]string, 0, len(query.Dimensions))
+	for _, column := range query.Dimensions {
+		arrayFields = append(arrayFields, fmt.Sprintf(`if(%s IN (SELECT %s FROM rows), %s, 'Other')`,
+			column.String(),
+			column.String(),
+			column.toSQLSelect()))
+		dimensions = append(dimensions, column.String())
+	}
+	fields := []string{
+		`SUM(Bytes*SamplingRate*8/range) AS bps`,
+		fmt.Sprintf("[%s] AS dimensions", strings.Join(arrayFields, ",\n ")),
+	}
+
+	// With: time range covered by the data and top rows by volume
+	with := []string{
+		fmt.Sprintf(`(SELECT MAX(TimeReceived) - MIN(TimeReceived) FROM {table} WHERE %s) AS range`, where),
+		fmt.Sprintf(
+			"rows AS (SELECT %s FROM {table} WHERE %s GROUP BY %s ORDER BY SUM(Bytes) DESC LIMIT %d)",
+			strings.Join(dimensions, ", "),
+			where,
+			strings.Join(dimensions, ", "),
+			query.Limit),
+	}
+
+	sqlQuery := fmt.Sprintf(`
+WITH
+ %s
+SELECT
+ %s
+FROM {table}
+WHERE %s
+GROUP BY dimensions
+ORDER BY bps DESC`, strings.Join(with, ",\n "), strings.Join(fields, ",\n "), where)
+	return sqlQuery, nil
+}
+
+// sankeyHandlerOutput describes the output for the /sankey endpoint.
+type sankeyHandlerOutput struct {
+	// Unprocessed data for table view
+	Rows [][]string `json:"rows"`
+	Bps  []int      `json:"bps"` // row → bps
+	// Processed data for sankey graph
+	Nodes []string     `json:"nodes"`
+	Links []sankeyLink `json:"links"`
+}
+
+// sankeyLink is a link between two nodes of the sankey graph.
+type sankeyLink struct {
+	Source string `json:"source"`
+	Target string `json:"target"`
+	Bps    int    `json:"bps"`
+}
+
+// sankeyHandlerFunc handles POST requests on the /sankey endpoint. It
+// queries the database and returns both the raw rows and the nodes and
+// links needed to draw a sankey graph.
+func (c *Component) sankeyHandlerFunc(gc *gin.Context) {
+	ctx := c.t.Context(gc.Request.Context())
+	var query sankeyQuery
+	if err := gc.ShouldBindJSON(&query); err != nil {
+		gc.JSON(http.StatusBadRequest, gin.H{"message": helpers.Capitalize(err.Error())})
+		return
+	}
+
+	sqlQuery, err := query.toSQL()
+	if err != nil {
+		gc.JSON(http.StatusBadRequest, gin.H{"message": helpers.Capitalize(err.Error())})
+		return
+	}
+
+	// We need to select a resolution allowing us to have a somewhat accurate timespan
+	resolution := query.End.Sub(query.Start) / 20
+	if resolution < time.Second {
+		resolution = time.Second
+	}
+
+	// Prepare and execute query
+	sqlQuery = c.queryFlowsTable(sqlQuery,
+		query.Start, query.End, resolution)
+	gc.Header("X-SQL-Query", sqlQuery)
+	results := []struct {
+		Bps        float64  `ch:"bps"`
+		Dimensions []string `ch:"dimensions"`
+	}{}
+	if err := c.d.ClickHouseDB.Conn.Select(ctx, &results, sqlQuery); err != nil {
+		c.r.Err(err).Msg("unable to query database")
+		gc.JSON(http.StatusInternalServerError, gin.H{"message": "Unable to query database."})
+		return
+	}
+
+	// Prepare output
+	output := sankeyHandlerOutput{
+		Rows:  make([][]string, 0, len(results)),
+		Bps:   make([]int, 0, len(results)),
+		Nodes: make([]string, 0),
+		Links: make([]sankeyLink, 0),
+	}
+	// "Other" is qualified with the dimension name to get one node per dimension
+	completeName := func(name string, index int) string {
+		if name != "Other" {
+			return name
+		}
+		return fmt.Sprintf("Other %s", query.Dimensions[index].String())
+	}
+	addedNodes := map[string]bool{}
+	addNode := func(name string) {
+		if _, ok := addedNodes[name]; !ok {
+			addedNodes[name] = true
+			output.Nodes = append(output.Nodes, name)
+		}
+	}
+	// Position of each (source, target) link in output.Links
+	linkIndexes := map[[2]string]int{}
+	addLink := func(source, target string, bps int) {
+		key := [2]string{source, target}
+		if idx, ok := linkIndexes[key]; ok {
+			output.Links[idx].Bps += bps
+			return
+		}
+		linkIndexes[key] = len(output.Links)
+		output.Links = append(output.Links, sankeyLink{Source: source, Target: target, Bps: bps})
+	}
+	for _, result := range results {
+		output.Rows = append(output.Rows, result.Dimensions)
+		output.Bps = append(output.Bps, int(result.Bps))
+		// Consider each pair of successive dimensions
+		for i := 0; i < len(query.Dimensions)-1; i++ {
+			dimension1 := completeName(result.Dimensions[i], i)
+			dimension2 := completeName(result.Dimensions[i+1], i+1)
+			addNode(dimension1)
+			addNode(dimension2)
+			addLink(dimension1, dimension2, int(result.Bps))
+		}
+	}
+	// Sort links by decreasing bps; ties are broken on source, then target,
+	// to get a deterministic order.
+	sort.Slice(output.Links, func(i, j int) bool {
+		left, right := output.Links[i], output.Links[j]
+		if left.Bps != right.Bps {
+			return left.Bps > right.Bps
+		}
+		if left.Source != right.Source {
+			return left.Source < right.Source
+		}
+		return left.Target < right.Target
+	})
+
+	gc.JSON(http.StatusOK, output)
+}
diff --git a/console/sankey_test.go b/console/sankey_test.go
new file mode 100644
index 00000000..18d6b9dd
--- /dev/null
+++ b/console/sankey_test.go
@@ -0,0 +1,260 @@
+package console
+
+import (
+	"bytes"
+	"encoding/json"
+	"fmt"
+	netHTTP "net/http"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/gin-gonic/gin"
+	"github.com/golang/mock/gomock"
+
+	"akvorado/common/clickhousedb"
+	"akvorado/common/daemon"
+	"akvorado/common/helpers"
+	"akvorado/common/http"
+	"akvorado/common/reporter"
+)
+
+// TestSankeyQuerySQL checks the SQL request built from a sankey query.
+func TestSankeyQuerySQL(t *testing.T) {
+	cases := []struct {
+		Description string
+		Input       sankeyQuery
+		Expected    string
+	}{
+		{
+			Description: "two dimensions, no filters",
+			Input: sankeyQuery{
+				Start:      time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC),
+				End:        time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC),
+				Dimensions: []queryColumn{queryColumnSrcAS, queryColumnExporterName},
+				Limit:      5,
+				Filter:     queryFilter{},
+			},
+			Expected: `
+WITH
+ (SELECT MAX(TimeReceived) - MIN(TimeReceived) FROM {table} WHERE {timefilter}) AS range,
+ rows AS (SELECT SrcAS, ExporterName FROM {table} WHERE {timefilter} GROUP BY SrcAS, ExporterName ORDER BY SUM(Bytes) DESC LIMIT 5)
+SELECT
+ SUM(Bytes*SamplingRate*8/range) AS bps,
+ [if(SrcAS IN (SELECT SrcAS FROM rows), concat(toString(SrcAS), ': ', dictGetOrDefault('asns', 'name', SrcAS, '???')), 'Other'),
+ if(ExporterName IN (SELECT ExporterName FROM rows), ExporterName, 'Other')] AS dimensions
+FROM {table}
+WHERE {timefilter}
+GROUP BY dimensions
+ORDER BY bps DESC`,
+		}, {
+			Description: "two dimensions, with filter",
+			Input: sankeyQuery{
+				Start:      time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC),
+				End:        time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC),
+				Dimensions: []queryColumn{queryColumnSrcAS, queryColumnExporterName},
+				Limit:      10,
+				Filter:     queryFilter{"DstCountry = 'FR'"},
+			},
+			Expected: `
+WITH
+ (SELECT MAX(TimeReceived) - MIN(TimeReceived) FROM {table} WHERE {timefilter} AND (DstCountry = 'FR')) AS range,
+ rows AS (SELECT SrcAS, ExporterName FROM {table} WHERE {timefilter} AND (DstCountry = 'FR') GROUP BY SrcAS, ExporterName ORDER BY SUM(Bytes) DESC LIMIT 10)
+SELECT
+ SUM(Bytes*SamplingRate*8/range) AS bps,
+ [if(SrcAS IN (SELECT SrcAS FROM rows), concat(toString(SrcAS), ': ', dictGetOrDefault('asns', 'name', SrcAS, '???')), 'Other'),
+ if(ExporterName IN (SELECT ExporterName FROM rows), ExporterName, 'Other')] AS dimensions
+FROM {table}
+WHERE {timefilter} AND (DstCountry = 'FR')
+GROUP BY dimensions
+ORDER BY bps DESC`,
+		},
+	}
+	for _, tc := range cases {
+		t.Run(tc.Description, func(t *testing.T) {
+			got, err := tc.Input.toSQL()
+			if err != nil {
+				t.Fatalf("toSQL() error:\n%+v", err)
+			}
+			if diff := helpers.Diff(strings.Split(got, "\n"), strings.Split(tc.Expected, "\n")); diff != "" {
+				t.Errorf("toSQL (-got, +want):\n%s", diff)
+			}
+		})
+	}
+}
+
+// TestSankeyHandler checks the answer of the /sankey endpoint against a mocked database.
+func TestSankeyHandler(t *testing.T) {
+	r := reporter.NewMock(t)
+	ch, mockConn := clickhousedb.NewMock(t, r)
+	h := http.NewMock(t, r)
+	c, err := New(r, Configuration{}, Dependencies{
+		Daemon:       daemon.NewMock(t),
+		HTTP:         h,
+		ClickHouseDB: ch,
+	})
+	if err != nil {
+		t.Fatalf("New() error:\n%+v", err)
+	}
+	helpers.StartStop(t, c)
+
+	expectedSQL := []struct {
+		Bps        float64  `ch:"bps"`
+		Dimensions []string `ch:"dimensions"`
+	}{
+		// [(random.randrange(100, 10000), x)
+		//  for x in set([(random.choice(asn),
+		//                 random.choice(providers),
+		//                 random.choice(routers)) for x in range(30)])]
+		{9677, []string{"AS100", "Other", "router1"}},
+		{9472, []string{"AS300", "provider1", "Other"}},
+		{7593, []string{"AS300", "provider2", "router1"}},
+		{7234, []string{"AS200", "provider1", "Other"}},
+		{6006, []string{"AS100", "provider1", "Other"}},
+		{5988, []string{"Other", "provider1", "Other"}},
+		{4675, []string{"AS200", "provider3", "Other"}},
+		{4348, []string{"AS200", "Other", "router2"}},
+		{3999, []string{"AS100", "provider3", "Other"}},
+		{3978, []string{"AS100", "provider3", "router2"}},
+		{3623, []string{"Other", "Other", "router1"}},
+		{3080, []string{"AS300", "provider3", "router2"}},
+		{2915, []string{"AS300", "Other", "router1"}},
+		{2623, []string{"AS100", "provider1", "router1"}},
+		{2482, []string{"AS200", "provider2", "router2"}},
+		{2234, []string{"AS100", "provider2", "Other"}},
+		{1360, []string{"AS200", "Other", "router1"}},
+		{975, []string{"AS300", "Other", "Other"}},
+		{717, []string{"AS200", "provider3", "router2"}},
+		{621, []string{"Other", "Other", "Other"}},
+		{159, []string{"Other", "provider1", "router1"}},
+	}
+	expected := gin.H{
+		// Raw data
+		"rows": [][]string{
+			{"AS100", "Other", "router1"},
+			{"AS300", "provider1", "Other"},
+			{"AS300", "provider2", "router1"},
+			{"AS200", "provider1", "Other"},
+			{"AS100", "provider1", "Other"},
+			{"Other", "provider1", "Other"},
+			{"AS200", "provider3", "Other"},
+			{"AS200", "Other", "router2"},
+			{"AS100", "provider3", "Other"},
+			{"AS100", "provider3", "router2"},
+			{"Other", "Other", "router1"},
+			{"AS300", "provider3", "router2"},
+			{"AS300", "Other", "router1"},
+			{"AS100", "provider1", "router1"},
+			{"AS200", "provider2", "router2"},
+			{"AS100", "provider2", "Other"},
+			{"AS200", "Other", "router1"},
+			{"AS300", "Other", "Other"},
+			{"AS200", "provider3", "router2"},
+			{"Other", "Other", "Other"},
+			{"Other", "provider1", "router1"},
+		},
+		"bps": []int{
+			9677,
+			9472,
+			7593,
+			7234,
+			6006,
+			5988,
+			4675,
+			4348,
+			3999,
+			3978,
+			3623,
+			3080,
+			2915,
+			2623,
+			2482,
+			2234,
+			1360,
+			975,
+			717,
+			621,
+			159,
+		},
+		// For graph
+		"nodes": []string{
+			"AS100",
+			"Other InIfProvider",
+			"router1",
+			"AS300",
+			"provider1",
+			"Other ExporterName",
+			"provider2",
+			"AS200",
+			"Other SrcAS",
+			"provider3",
+			"router2",
+		},
+		"links": []gin.H{
+			{"source": "provider1", "target": "Other ExporterName", "bps": 9472 + 7234 + 6006 + 5988},
+			{"source": "Other InIfProvider", "target": "router1", "bps": 9677 + 3623 + 2915 + 1360},
+			{"source": "AS100", "target": "Other InIfProvider", "bps": 9677},
+			{"source": "AS300", "target": "provider1", "bps": 9472},
+			{"source": "provider3", "target": "Other ExporterName", "bps": 4675 + 3999},
+			{"source": "AS100", "target": "provider1", "bps": 6006 + 2623},
+			{"source": "AS100", "target": "provider3", "bps": 3999 + 3978},
+			{"source": "provider3", "target": "router2", "bps": 3978 + 3080 + 717},
+			{"source": "AS300", "target": "provider2", "bps": 7593},
+			{"source": "provider2", "target": "router1", "bps": 7593},
+			{"source": "AS200", "target": "provider1", "bps": 7234},
+			{"source": "Other SrcAS", "target": "provider1", "bps": 5988 + 159},
+			{"source": "AS200", "target": "Other InIfProvider", "bps": 4348 + 1360},
+			{"source": "AS200", "target": "provider3", "bps": 4675 + 717},
+			{"source": "Other InIfProvider", "target": "router2", "bps": 4348},
+			{"source": "Other SrcAS", "target": "Other InIfProvider", "bps": 3623 + 621},
+			{"source": "AS300", "target": "Other InIfProvider", "bps": 2915 + 975},
+			{"source": "AS300", "target": "provider3", "bps": 3080},
+			{"source": "provider1", "target": "router1", "bps": 2623 + 159},
+			{"source": "AS200", "target": "provider2", "bps": 2482},
+			{"source": "provider2", "target": "router2", "bps": 2482},
+			{"source": "AS100", "target": "provider2", "bps": 2234},
+			{"source": "provider2", "target": "Other ExporterName", "bps": 2234},
+			{"source": "Other InIfProvider", "target": "Other ExporterName", "bps": 975 + 621},
+		},
+	}
+	mockConn.EXPECT().
+		Select(gomock.Any(), gomock.Any(), gomock.Any()).
+		SetArg(1, expectedSQL).
+		Return(nil)
+
+	input := sankeyQuery{
+		Start:      time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC),
+		End:        time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC),
+		Dimensions: []queryColumn{queryColumnSrcAS, queryColumnInIfProvider, queryColumnExporterName},
+		Limit:      10,
+		Filter:     queryFilter{"DstCountry = 'FR'"},
+	}
+	payload := new(bytes.Buffer)
+	err = json.NewEncoder(payload).Encode(input)
+	if err != nil {
+		t.Fatalf("Encode() error:\n%+v", err)
+	}
+	resp, err := netHTTP.Post(fmt.Sprintf("http://%s/api/v0/console/sankey", h.Address),
+		"application/json", payload)
+	if err != nil {
+		t.Fatalf("POST /api/v0/console/sankey:\n%+v", err)
+	}
+	defer resp.Body.Close()
+	if resp.StatusCode != netHTTP.StatusOK {
+		t.Errorf("POST /api/v0/console/sankey: got status code %d, not 200", resp.StatusCode)
+	}
+	gotContentType := resp.Header.Get("Content-Type")
+	if gotContentType != "application/json; charset=utf-8" {
+		t.Errorf("POST /api/v0/console/sankey Content-Type (-got, +want):\n-%s\n+%s",
+			gotContentType, "application/json; charset=utf-8")
+	}
+	decoder := json.NewDecoder(resp.Body)
+	var got gin.H
+	if err := decoder.Decode(&got); err != nil {
+		t.Fatalf("POST /api/v0/console/sankey error:\n%+v", err)
+	}
+
+	if diff := helpers.Diff(got, expected); diff != "" {
+		t.Fatalf("POST /api/v0/console/sankey (-got, +want):\n%s", diff)
+	}
+}