mirror of
https://github.com/akvorado/akvorado.git
synced 2025-12-12 06:24:10 +01:00
console: endpoint for sankey graph
This commit is contained in:
@@ -18,7 +18,7 @@ type graphQuery struct {
|
||||
End time.Time `json:"end" binding:"required,gtfield=Start"`
|
||||
Points int `json:"points" binding:"required,min=5,max=2000"` // minimum number of points
|
||||
Dimensions []queryColumn `json:"dimensions"` // group by ...
|
||||
Limit int `json:"limit" binding:"isdefault|min=1,max=50"` // limit product of dimensions
|
||||
Limit int `json:"limit" binding:"min=1,max=50"` // limit product of dimensions
|
||||
Filter queryFilter `json:"filter"` // where ...
|
||||
}
|
||||
|
||||
|
||||
@@ -82,6 +82,7 @@ func (c *Component) Start() error {
|
||||
c.d.HTTP.GinRouter.GET("/api/v0/console/widget/top/:name", c.widgetTopHandlerFunc)
|
||||
c.d.HTTP.GinRouter.GET("/api/v0/console/widget/graph", c.widgetGraphHandlerFunc)
|
||||
c.d.HTTP.GinRouter.POST("/api/v0/console/graph", c.graphHandlerFunc)
|
||||
c.d.HTTP.GinRouter.POST("/api/v0/console/sankey", c.sankeyHandlerFunc)
|
||||
|
||||
c.t.Go(func() error {
|
||||
ticker := time.NewTicker(10 * time.Second)
|
||||
|
||||
169
console/sankey.go
Normal file
169
console/sankey.go
Normal file
@@ -0,0 +1,169 @@
|
||||
package console
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
|
||||
"akvorado/common/helpers"
|
||||
)
|
||||
|
||||
// sankeyQuery describes the input for the /sankey endpoint.
|
||||
type sankeyQuery struct {
|
||||
Start time.Time `json:"start" binding:"required"`
|
||||
End time.Time `json:"end" binding:"required,gtfield=Start"`
|
||||
Dimensions []queryColumn `json:"dimensions" binding:"required,min=2"` // group by ...
|
||||
Limit int `json:"limit" binding:"min=1,max=50"` // limit product of dimensions
|
||||
Filter queryFilter `json:"filter"` // where ...
|
||||
}
|
||||
|
||||
// sankeyQueryToSQL converts a sankey query to an SQL request
|
||||
func (query sankeyQuery) toSQL() (string, error) {
|
||||
// Filter
|
||||
where := query.Filter.filter
|
||||
if where == "" {
|
||||
where = "{timefilter}"
|
||||
} else {
|
||||
where = fmt.Sprintf("{timefilter} AND (%s)", where)
|
||||
}
|
||||
|
||||
// Select
|
||||
arrayFields := []string{}
|
||||
dimensions := []string{}
|
||||
for _, column := range query.Dimensions {
|
||||
arrayFields = append(arrayFields, fmt.Sprintf(`if(%s IN (SELECT %s FROM rows), %s, 'Other')`,
|
||||
column.String(),
|
||||
column.String(),
|
||||
column.toSQLSelect()))
|
||||
dimensions = append(dimensions, column.String())
|
||||
}
|
||||
fields := []string{
|
||||
`SUM(Bytes*SamplingRate*8/range) AS bps`,
|
||||
fmt.Sprintf("[%s] AS dimensions", strings.Join(arrayFields, ",\n ")),
|
||||
}
|
||||
|
||||
// With
|
||||
with := []string{
|
||||
fmt.Sprintf(`(SELECT MAX(TimeReceived) - MIN(TimeReceived) FROM {table} WHERE %s) AS range`, where),
|
||||
fmt.Sprintf(
|
||||
"rows AS (SELECT %s FROM {table} WHERE %s GROUP BY %s ORDER BY SUM(Bytes) DESC LIMIT %d)",
|
||||
strings.Join(dimensions, ", "),
|
||||
where,
|
||||
strings.Join(dimensions, ", "),
|
||||
query.Limit),
|
||||
}
|
||||
|
||||
sqlQuery := fmt.Sprintf(`
|
||||
WITH
|
||||
%s
|
||||
SELECT
|
||||
%s
|
||||
FROM {table}
|
||||
WHERE %s
|
||||
GROUP BY dimensions
|
||||
ORDER BY bps DESC`, strings.Join(with, ",\n "), strings.Join(fields, ",\n "), where)
|
||||
return sqlQuery, nil
|
||||
}
|
||||
|
||||
type sankeyHandlerOutput struct {
|
||||
// Unprocessed data for table view
|
||||
Rows [][]string `json:"rows"`
|
||||
Bps []int `json:"bps"` // row → bps
|
||||
// Processed data for sankey graph
|
||||
Nodes []string `json:"nodes"`
|
||||
Links []sankeyLink `json:"links"`
|
||||
}
|
||||
type sankeyLink struct {
|
||||
Source string `json:"source"`
|
||||
Target string `json:"target"`
|
||||
Bps int `json:"bps"`
|
||||
}
|
||||
|
||||
func (c *Component) sankeyHandlerFunc(gc *gin.Context) {
|
||||
ctx := c.t.Context(gc.Request.Context())
|
||||
var query sankeyQuery
|
||||
if err := gc.ShouldBindJSON(&query); err != nil {
|
||||
gc.JSON(http.StatusBadRequest, gin.H{"message": helpers.Capitalize(err.Error())})
|
||||
return
|
||||
}
|
||||
|
||||
sqlQuery, err := query.toSQL()
|
||||
if err != nil {
|
||||
gc.JSON(http.StatusBadRequest, gin.H{"message": helpers.Capitalize(err.Error())})
|
||||
return
|
||||
}
|
||||
|
||||
// We need to select a resolution allowing us to have a somewhat accurate timespan
|
||||
resolution := time.Duration(int64(query.End.Sub(query.Start).Nanoseconds()) / 20)
|
||||
if resolution < time.Second {
|
||||
resolution = time.Second
|
||||
}
|
||||
|
||||
// Prepare and execute query
|
||||
sqlQuery = c.queryFlowsTable(sqlQuery,
|
||||
query.Start, query.End, resolution)
|
||||
gc.Header("X-SQL-Query", sqlQuery)
|
||||
results := []struct {
|
||||
Bps float64 `ch:"bps"`
|
||||
Dimensions []string `ch:"dimensions"`
|
||||
}{}
|
||||
if err := c.d.ClickHouseDB.Conn.Select(ctx, &results, sqlQuery); err != nil {
|
||||
c.r.Err(err).Msg("unable to query database")
|
||||
gc.JSON(http.StatusInternalServerError, gin.H{"message": "Unable to query database."})
|
||||
return
|
||||
}
|
||||
|
||||
// Prepare output
|
||||
output := sankeyHandlerOutput{
|
||||
Rows: make([][]string, 0, len(results)),
|
||||
Bps: make([]int, 0, len(results)),
|
||||
Nodes: make([]string, 0),
|
||||
Links: make([]sankeyLink, 0),
|
||||
}
|
||||
completeName := func(name string, index int) string {
|
||||
if name != "Other" {
|
||||
return name
|
||||
}
|
||||
return fmt.Sprintf("Other %s", query.Dimensions[index].String())
|
||||
}
|
||||
addedNodes := map[string]bool{}
|
||||
addNode := func(name string) {
|
||||
if _, ok := addedNodes[name]; !ok {
|
||||
addedNodes[name] = true
|
||||
output.Nodes = append(output.Nodes, name)
|
||||
}
|
||||
}
|
||||
addLink := func(source, target string, bps int) {
|
||||
for idx, link := range output.Links {
|
||||
if link.Source == source && link.Target == target {
|
||||
output.Links[idx].Bps += bps
|
||||
return
|
||||
}
|
||||
}
|
||||
output.Links = append(output.Links, sankeyLink{source, target, bps})
|
||||
}
|
||||
for _, result := range results {
|
||||
output.Rows = append(output.Rows, result.Dimensions)
|
||||
output.Bps = append(output.Bps, int(result.Bps))
|
||||
// Consider each pair of successive dimensions
|
||||
for i := 0; i < len(query.Dimensions)-1; i++ {
|
||||
dimension1 := completeName(result.Dimensions[i], i)
|
||||
dimension2 := completeName(result.Dimensions[i+1], i+1)
|
||||
addNode(dimension1)
|
||||
addNode(dimension2)
|
||||
addLink(dimension1, dimension2, int(result.Bps))
|
||||
}
|
||||
}
|
||||
sort.Slice(output.Links, func(i, j int) bool {
|
||||
if output.Links[i].Bps == output.Links[j].Bps {
|
||||
return output.Links[i].Source < output.Links[j].Source
|
||||
}
|
||||
return output.Links[i].Bps > output.Links[j].Bps
|
||||
})
|
||||
|
||||
gc.JSON(http.StatusOK, output)
|
||||
}
|
||||
255
console/sankey_test.go
Normal file
255
console/sankey_test.go
Normal file
@@ -0,0 +1,255 @@
|
||||
package console
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
netHTTP "net/http"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/golang/mock/gomock"
|
||||
|
||||
"akvorado/common/clickhousedb"
|
||||
"akvorado/common/daemon"
|
||||
"akvorado/common/helpers"
|
||||
"akvorado/common/http"
|
||||
"akvorado/common/reporter"
|
||||
)
|
||||
|
||||
func TestSankeyQuerySQL(t *testing.T) {
|
||||
cases := []struct {
|
||||
Description string
|
||||
Input sankeyQuery
|
||||
Expected string
|
||||
}{
|
||||
{
|
||||
Description: "two dimensions, no filters",
|
||||
Input: sankeyQuery{
|
||||
Start: time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC),
|
||||
End: time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC),
|
||||
Dimensions: []queryColumn{queryColumnSrcAS, queryColumnExporterName},
|
||||
Limit: 5,
|
||||
Filter: queryFilter{},
|
||||
},
|
||||
Expected: `
|
||||
WITH
|
||||
(SELECT MAX(TimeReceived) - MIN(TimeReceived) FROM {table} WHERE {timefilter}) AS range,
|
||||
rows AS (SELECT SrcAS, ExporterName FROM {table} WHERE {timefilter} GROUP BY SrcAS, ExporterName ORDER BY SUM(Bytes) DESC LIMIT 5)
|
||||
SELECT
|
||||
SUM(Bytes*SamplingRate*8/range) AS bps,
|
||||
[if(SrcAS IN (SELECT SrcAS FROM rows), concat(toString(SrcAS), ': ', dictGetOrDefault('asns', 'name', SrcAS, '???')), 'Other'),
|
||||
if(ExporterName IN (SELECT ExporterName FROM rows), ExporterName, 'Other')] AS dimensions
|
||||
FROM {table}
|
||||
WHERE {timefilter}
|
||||
GROUP BY dimensions
|
||||
ORDER BY bps DESC`,
|
||||
}, {
|
||||
Description: "two dimensions, with filter",
|
||||
Input: sankeyQuery{
|
||||
Start: time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC),
|
||||
End: time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC),
|
||||
Dimensions: []queryColumn{queryColumnSrcAS, queryColumnExporterName},
|
||||
Limit: 10,
|
||||
Filter: queryFilter{"DstCountry = 'FR'"},
|
||||
},
|
||||
Expected: `
|
||||
WITH
|
||||
(SELECT MAX(TimeReceived) - MIN(TimeReceived) FROM {table} WHERE {timefilter} AND (DstCountry = 'FR')) AS range,
|
||||
rows AS (SELECT SrcAS, ExporterName FROM {table} WHERE {timefilter} AND (DstCountry = 'FR') GROUP BY SrcAS, ExporterName ORDER BY SUM(Bytes) DESC LIMIT 10)
|
||||
SELECT
|
||||
SUM(Bytes*SamplingRate*8/range) AS bps,
|
||||
[if(SrcAS IN (SELECT SrcAS FROM rows), concat(toString(SrcAS), ': ', dictGetOrDefault('asns', 'name', SrcAS, '???')), 'Other'),
|
||||
if(ExporterName IN (SELECT ExporterName FROM rows), ExporterName, 'Other')] AS dimensions
|
||||
FROM {table}
|
||||
WHERE {timefilter} AND (DstCountry = 'FR')
|
||||
GROUP BY dimensions
|
||||
ORDER BY bps DESC`,
|
||||
},
|
||||
}
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.Description, func(t *testing.T) {
|
||||
got, _ := tc.Input.toSQL()
|
||||
if diff := helpers.Diff(strings.Split(got, "\n"), strings.Split(tc.Expected, "\n")); diff != "" {
|
||||
t.Errorf("toSQL (-got, +want):\n%s", diff)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestSankeyHandler(t *testing.T) {
|
||||
r := reporter.NewMock(t)
|
||||
ch, mockConn := clickhousedb.NewMock(t, r)
|
||||
h := http.NewMock(t, r)
|
||||
c, err := New(r, Configuration{}, Dependencies{
|
||||
Daemon: daemon.NewMock(t),
|
||||
HTTP: h,
|
||||
ClickHouseDB: ch,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("New() error:\n%+v", err)
|
||||
}
|
||||
helpers.StartStop(t, c)
|
||||
|
||||
expectedSQL := []struct {
|
||||
Bps float64 `ch:"bps"`
|
||||
Dimensions []string `ch:"dimensions"`
|
||||
}{
|
||||
// [(random.randrange(100, 10000), x)
|
||||
// for x in set([(random.choice(asn),
|
||||
// random.choice(providers),
|
||||
// random.choice(routers)) for x in range(30)])]
|
||||
{9677, []string{"AS100", "Other", "router1"}},
|
||||
{9472, []string{"AS300", "provider1", "Other"}},
|
||||
{7593, []string{"AS300", "provider2", "router1"}},
|
||||
{7234, []string{"AS200", "provider1", "Other"}},
|
||||
{6006, []string{"AS100", "provider1", "Other"}},
|
||||
{5988, []string{"Other", "provider1", "Other"}},
|
||||
{4675, []string{"AS200", "provider3", "Other"}},
|
||||
{4348, []string{"AS200", "Other", "router2"}},
|
||||
{3999, []string{"AS100", "provider3", "Other"}},
|
||||
{3978, []string{"AS100", "provider3", "router2"}},
|
||||
{3623, []string{"Other", "Other", "router1"}},
|
||||
{3080, []string{"AS300", "provider3", "router2"}},
|
||||
{2915, []string{"AS300", "Other", "router1"}},
|
||||
{2623, []string{"AS100", "provider1", "router1"}},
|
||||
{2482, []string{"AS200", "provider2", "router2"}},
|
||||
{2234, []string{"AS100", "provider2", "Other"}},
|
||||
{1360, []string{"AS200", "Other", "router1"}},
|
||||
{975, []string{"AS300", "Other", "Other"}},
|
||||
{717, []string{"AS200", "provider3", "router2"}},
|
||||
{621, []string{"Other", "Other", "Other"}},
|
||||
{159, []string{"Other", "provider1", "router1"}},
|
||||
}
|
||||
expected := gin.H{
|
||||
// Raw data
|
||||
"rows": [][]string{
|
||||
{"AS100", "Other", "router1"},
|
||||
{"AS300", "provider1", "Other"},
|
||||
{"AS300", "provider2", "router1"},
|
||||
{"AS200", "provider1", "Other"},
|
||||
{"AS100", "provider1", "Other"},
|
||||
{"Other", "provider1", "Other"},
|
||||
{"AS200", "provider3", "Other"},
|
||||
{"AS200", "Other", "router2"},
|
||||
{"AS100", "provider3", "Other"},
|
||||
{"AS100", "provider3", "router2"},
|
||||
{"Other", "Other", "router1"},
|
||||
{"AS300", "provider3", "router2"},
|
||||
{"AS300", "Other", "router1"},
|
||||
{"AS100", "provider1", "router1"},
|
||||
{"AS200", "provider2", "router2"},
|
||||
{"AS100", "provider2", "Other"},
|
||||
{"AS200", "Other", "router1"},
|
||||
{"AS300", "Other", "Other"},
|
||||
{"AS200", "provider3", "router2"},
|
||||
{"Other", "Other", "Other"},
|
||||
{"Other", "provider1", "router1"},
|
||||
},
|
||||
"bps": []int{
|
||||
9677,
|
||||
9472,
|
||||
7593,
|
||||
7234,
|
||||
6006,
|
||||
5988,
|
||||
4675,
|
||||
4348,
|
||||
3999,
|
||||
3978,
|
||||
3623,
|
||||
3080,
|
||||
2915,
|
||||
2623,
|
||||
2482,
|
||||
2234,
|
||||
1360,
|
||||
975,
|
||||
717,
|
||||
621,
|
||||
159,
|
||||
},
|
||||
// For graph
|
||||
"nodes": []string{
|
||||
"AS100",
|
||||
"Other InIfProvider",
|
||||
"router1",
|
||||
"AS300",
|
||||
"provider1",
|
||||
"Other ExporterName",
|
||||
"provider2",
|
||||
"AS200",
|
||||
"Other SrcAS",
|
||||
"provider3",
|
||||
"router2",
|
||||
},
|
||||
"links": []gin.H{
|
||||
{"source": "provider1", "target": "Other ExporterName", "bps": 9472 + 7234 + 6006 + 5988},
|
||||
{"source": "Other InIfProvider", "target": "router1", "bps": 9677 + 3623 + 2915 + 1360},
|
||||
{"source": "AS100", "target": "Other InIfProvider", "bps": 9677},
|
||||
{"source": "AS300", "target": "provider1", "bps": 9472},
|
||||
{"source": "provider3", "target": "Other ExporterName", "bps": 4675 + 3999},
|
||||
{"source": "AS100", "target": "provider1", "bps": 6006 + 2623},
|
||||
{"source": "AS100", "target": "provider3", "bps": 3999 + 3978},
|
||||
{"source": "provider3", "target": "router2", "bps": 3978 + 3080 + 717},
|
||||
{"source": "AS300", "target": "provider2", "bps": 7593},
|
||||
{"source": "provider2", "target": "router1", "bps": 7593},
|
||||
{"source": "AS200", "target": "provider1", "bps": 7234},
|
||||
{"source": "Other SrcAS", "target": "provider1", "bps": 5988 + 159},
|
||||
{"source": "AS200", "target": "Other InIfProvider", "bps": 4348 + 1360},
|
||||
{"source": "AS200", "target": "provider3", "bps": 4675 + 717},
|
||||
{"source": "Other InIfProvider", "target": "router2", "bps": 4348},
|
||||
{"source": "Other SrcAS", "target": "Other InIfProvider", "bps": 3623 + 621},
|
||||
{"source": "AS300", "target": "Other InIfProvider", "bps": 2915 + 975},
|
||||
{"source": "AS300", "target": "provider3", "bps": 3080},
|
||||
{"source": "provider1", "target": "router1", "bps": 2623 + 159},
|
||||
{"source": "AS200", "target": "provider2", "bps": 2482},
|
||||
{"source": "provider2", "target": "router2", "bps": 2482},
|
||||
{"source": "AS100", "target": "provider2", "bps": 2234},
|
||||
{"source": "provider2", "target": "Other ExporterName", "bps": 2234},
|
||||
{"source": "Other InIfProvider", "target": "Other ExporterName", "bps": 975 + 621},
|
||||
},
|
||||
}
|
||||
mockConn.EXPECT().
|
||||
Select(gomock.Any(), gomock.Any(), gomock.Any()).
|
||||
SetArg(1, expectedSQL).
|
||||
Return(nil)
|
||||
|
||||
input := sankeyQuery{
|
||||
Start: time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC),
|
||||
End: time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC),
|
||||
Dimensions: []queryColumn{queryColumnSrcAS, queryColumnInIfProvider, queryColumnExporterName},
|
||||
Limit: 10,
|
||||
Filter: queryFilter{"DstCountry = 'FR'"},
|
||||
}
|
||||
payload := new(bytes.Buffer)
|
||||
err = json.NewEncoder(payload).Encode(input)
|
||||
if err != nil {
|
||||
t.Fatalf("Encode() error:\n%+v", err)
|
||||
}
|
||||
resp, err := netHTTP.Post(fmt.Sprintf("http://%s/api/v0/console/sankey", h.Address),
|
||||
"application/json", payload)
|
||||
if err != nil {
|
||||
t.Fatalf("POST /api/v0/console/sankey:\n%+v", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode != 200 {
|
||||
t.Errorf("POST /api/v0/console/sankey: got status code %d, not 200", resp.StatusCode)
|
||||
}
|
||||
gotContentType := resp.Header.Get("Content-Type")
|
||||
if gotContentType != "application/json; charset=utf-8" {
|
||||
t.Errorf("POST /api/v0/console/sankey Content-Type (-got, +want):\n-%s\n+%s",
|
||||
gotContentType, "application/json; charset=utf-8")
|
||||
}
|
||||
decoder := json.NewDecoder(resp.Body)
|
||||
var got gin.H
|
||||
if err := decoder.Decode(&got); err != nil {
|
||||
t.Fatalf("POST /api/v0/console/sankey error:\n%+v", err)
|
||||
}
|
||||
|
||||
if diff := helpers.Diff(got, expected); diff != "" {
|
||||
t.Fatalf("POST /api/v0/console/sankey (-got, +want):\n%s", diff)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user