console: add a bidirectional mode for graphs

It allows one to also display flows in the opposite direction.
This commit is contained in:
Vincent Bernat
2022-08-07 23:15:18 +02:00
parent e89a188d6d
commit 50614cef5b
15 changed files with 978 additions and 425 deletions

View File

@@ -4,6 +4,7 @@
package console
import (
"fmt"
"strings"
"testing"
"time"
@@ -14,6 +15,46 @@ import (
"akvorado/common/helpers"
)
func TestGraphInputReverseDirection(t *testing.T) {
input := graphHandlerInput{
Start: time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC),
End: time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC),
Points: 100,
Dimensions: []queryColumn{
queryColumnExporterName,
queryColumnInIfProvider,
},
Filter: queryFilter{
filter: "DstCountry = 'FR' AND SrcCountry = 'US'",
reverseFilter: "SrcCountry = 'FR' AND DstCountry = 'US'",
},
Units: "l3bps",
}
original1 := fmt.Sprintf("%+v", input)
expected := graphHandlerInput{
Start: time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC),
End: time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC),
Points: 100,
Dimensions: []queryColumn{
queryColumnExporterName,
queryColumnOutIfProvider,
},
Filter: queryFilter{
reverseFilter: "DstCountry = 'FR' AND SrcCountry = 'US'",
filter: "SrcCountry = 'FR' ANd DstCountry = 'US'",
},
Units: "l3bps",
}
got := input.reverseDirection()
original2 := fmt.Sprintf("%+v", input)
if diff := helpers.Diff(got, expected); diff != "" {
t.Fatalf("reverseDirection() (-got, +want):\n%s", diff)
}
if original1 != original2 {
t.Fatalf("reverseDirection() modified original to:\n-%s\n+%s", original1, original2)
}
}
func TestGraphQuerySQL(t *testing.T) {
cases := []struct {
Description string
@@ -31,6 +72,7 @@ func TestGraphQuerySQL(t *testing.T) {
Units: "l3bps",
},
Expected: `
SELECT 1 AS axis, * FROM (
SELECT
toStartOfInterval(TimeReceived, INTERVAL {resolution->864} second) AS time,
SUM(Bytes*SamplingRate*8/{resolution->864}) AS xps,
@@ -41,7 +83,7 @@ GROUP BY time, dimensions
ORDER BY time WITH FILL
FROM toStartOfInterval({timefilter.Start}, INTERVAL {resolution->864} second)
TO {timefilter.Stop}
STEP {resolution->864}`,
STEP {resolution->864})`,
}, {
Description: "no dimensions, no filters, l2 bps",
Input: graphHandlerInput{
@@ -53,6 +95,7 @@ ORDER BY time WITH FILL
Units: "l2bps",
},
Expected: `
SELECT 1 AS axis, * FROM (
SELECT
toStartOfInterval(TimeReceived, INTERVAL {resolution->864} second) AS time,
SUM((Bytes+18*Packets)*SamplingRate*8/{resolution->864}) AS xps,
@@ -63,7 +106,7 @@ GROUP BY time, dimensions
ORDER BY time WITH FILL
FROM toStartOfInterval({timefilter.Start}, INTERVAL {resolution->864} second)
TO {timefilter.Stop}
STEP {resolution->864}`,
STEP {resolution->864})`,
}, {
Description: "no dimensions, no filters, pps",
Input: graphHandlerInput{
@@ -75,6 +118,7 @@ ORDER BY time WITH FILL
Units: "pps",
},
Expected: `
SELECT 1 AS axis, * FROM (
SELECT
toStartOfInterval(TimeReceived, INTERVAL {resolution->864} second) AS time,
SUM(Packets*SamplingRate/{resolution->864}) AS xps,
@@ -85,7 +129,7 @@ GROUP BY time, dimensions
ORDER BY time WITH FILL
FROM toStartOfInterval({timefilter.Start}, INTERVAL {resolution->864} second)
TO {timefilter.Stop}
STEP {resolution->864}`,
STEP {resolution->864})`,
}, {
Description: "no dimensions",
Input: graphHandlerInput{
@@ -97,6 +141,7 @@ ORDER BY time WITH FILL
Units: "l3bps",
},
Expected: `
SELECT 1 AS axis, * FROM (
SELECT
toStartOfInterval(TimeReceived, INTERVAL {resolution->864} second) AS time,
SUM(Bytes*SamplingRate*8/{resolution->864}) AS xps,
@@ -107,7 +152,47 @@ GROUP BY time, dimensions
ORDER BY time WITH FILL
FROM toStartOfInterval({timefilter.Start}, INTERVAL {resolution->864} second)
TO {timefilter.Stop}
STEP {resolution->864}`,
STEP {resolution->864})`,
}, {
Description: "no dimensions, reverse direction",
Input: graphHandlerInput{
Start: time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC),
End: time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC),
Points: 100,
Dimensions: []queryColumn{},
Filter: queryFilter{
filter: "DstCountry = 'FR' AND SrcCountry = 'US'",
reverseFilter: "SrcCountry = 'FR' AND DstCountry = 'US'",
},
Units: "l3bps",
Bidirectional: true,
},
Expected: `
SELECT 1 AS axis, * FROM (
SELECT
toStartOfInterval(TimeReceived, INTERVAL {resolution->864} second) AS time,
SUM(Bytes*SamplingRate*8/{resolution->864}) AS xps,
emptyArrayString() AS dimensions
FROM {table}
WHERE {timefilter} AND (DstCountry = 'FR' AND SrcCountry = 'US')
GROUP BY time, dimensions
ORDER BY time WITH FILL
FROM toStartOfInterval({timefilter.Start}, INTERVAL {resolution->864} second)
TO {timefilter.Stop}
STEP {resolution->864})
UNION ALL
SELECT 2 AS axis, * FROM (
SELECT
toStartOfInterval(TimeReceived, INTERVAL {resolution->864} second) AS time,
SUM(Bytes*SamplingRate*8/{resolution->864}) AS xps,
emptyArrayString() AS dimensions
FROM {table}
WHERE {timefilter} AND (SrcCountry = 'FR' AND DstCountry = 'US')
GROUP BY time, dimensions
ORDER BY time WITH FILL
FROM toStartOfInterval({timefilter.Start}, INTERVAL {resolution->864} second)
TO {timefilter.Stop}
STEP {resolution->864})`,
}, {
Description: "no filters",
Input: graphHandlerInput{
@@ -125,6 +210,7 @@ ORDER BY time WITH FILL
Expected: `
WITH
rows AS (SELECT ExporterName, InIfProvider FROM {table} WHERE {timefilter} GROUP BY ExporterName, InIfProvider ORDER BY SUM(Bytes) DESC LIMIT 20)
SELECT 1 AS axis, * FROM (
SELECT
toStartOfInterval(TimeReceived, INTERVAL {resolution->864} second) AS time,
SUM(Bytes*SamplingRate*8/{resolution->864}) AS xps,
@@ -135,12 +221,55 @@ GROUP BY time, dimensions
ORDER BY time WITH FILL
FROM toStartOfInterval({timefilter.Start}, INTERVAL {resolution->864} second)
TO {timefilter.Stop}
STEP {resolution->864}`,
STEP {resolution->864})`,
}, {
Description: "no filters, reverse",
Input: graphHandlerInput{
Start: time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC),
End: time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC),
Points: 100,
Limit: 20,
Dimensions: []queryColumn{
queryColumnExporterName,
queryColumnInIfProvider,
},
Filter: queryFilter{},
Units: "l3bps",
Bidirectional: true,
},
Expected: `
WITH
rows AS (SELECT ExporterName, InIfProvider FROM {table} WHERE {timefilter} GROUP BY ExporterName, InIfProvider ORDER BY SUM(Bytes) DESC LIMIT 20)
SELECT 1 AS axis, * FROM (
SELECT
toStartOfInterval(TimeReceived, INTERVAL {resolution->864} second) AS time,
SUM(Bytes*SamplingRate*8/{resolution->864}) AS xps,
if((ExporterName, InIfProvider) IN rows, [ExporterName, InIfProvider], ['Other', 'Other']) AS dimensions
FROM {table}
WHERE {timefilter}
GROUP BY time, dimensions
ORDER BY time WITH FILL
FROM toStartOfInterval({timefilter.Start}, INTERVAL {resolution->864} second)
TO {timefilter.Stop}
STEP {resolution->864})
UNION ALL
SELECT 2 AS axis, * FROM (
SELECT
toStartOfInterval(TimeReceived, INTERVAL {resolution->864} second) AS time,
SUM(Bytes*SamplingRate*8/{resolution->864}) AS xps,
if((ExporterName, OutIfProvider) IN rows, [ExporterName, OutIfProvider], ['Other', 'Other']) AS dimensions
FROM {table}
WHERE {timefilter}
GROUP BY time, dimensions
ORDER BY time WITH FILL
FROM toStartOfInterval({timefilter.Start}, INTERVAL {resolution->864} second)
TO {timefilter.Stop}
STEP {resolution->864})`,
},
}
for _, tc := range cases {
t.Run(tc.Description, func(t *testing.T) {
got, _ := tc.Input.toSQL()
got := tc.Input.toSQL()
if diff := helpers.Diff(strings.Split(strings.TrimSpace(got), "\n"),
strings.Split(strings.TrimSpace(tc.Expected), "\n")); diff != "" {
t.Errorf("toSQL (-got, +want):\n%s", diff)
@@ -151,26 +280,75 @@ ORDER BY time WITH FILL
func TestGraphHandler(t *testing.T) {
_, h, mockConn, _ := NewMock(t, DefaultConfiguration())
base := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)
// Single direction
expectedSQL := []struct {
Axis uint8 `ch:"axis"`
Time time.Time `ch:"time"`
Xps float64 `ch:"xps"`
Dimensions []string `ch:"dimensions"`
}{
{base, 1000, []string{"router1", "provider1"}},
{base, 2000, []string{"router1", "provider2"}},
{base, 1200, []string{"router2", "provider2"}},
{base, 1100, []string{"router2", "provider3"}},
{base, 1900, []string{"Other", "Other"}},
{base.Add(time.Minute), 500, []string{"router1", "provider1"}},
{base.Add(time.Minute), 5000, []string{"router1", "provider2"}},
{base.Add(time.Minute), 900, []string{"router2", "provider4"}},
{base.Add(time.Minute), 100, []string{"Other", "Other"}},
{base.Add(2 * time.Minute), 100, []string{"router1", "provider1"}},
{base.Add(2 * time.Minute), 3000, []string{"router1", "provider2"}},
{base.Add(2 * time.Minute), 100, []string{"router2", "provider4"}},
{base.Add(2 * time.Minute), 100, []string{"Other", "Other"}},
{1, base, 1000, []string{"router1", "provider1"}},
{1, base, 2000, []string{"router1", "provider2"}},
{1, base, 1200, []string{"router2", "provider2"}},
{1, base, 1100, []string{"router2", "provider3"}},
{1, base, 1900, []string{"Other", "Other"}},
{1, base.Add(time.Minute), 500, []string{"router1", "provider1"}},
{1, base.Add(time.Minute), 5000, []string{"router1", "provider2"}},
{1, base.Add(time.Minute), 900, []string{"router2", "provider4"}},
{1, base.Add(time.Minute), 100, []string{"Other", "Other"}},
{1, base.Add(2 * time.Minute), 100, []string{"router1", "provider1"}},
{1, base.Add(2 * time.Minute), 3000, []string{"router1", "provider2"}},
{1, base.Add(2 * time.Minute), 100, []string{"router2", "provider4"}},
{1, base.Add(2 * time.Minute), 100, []string{"Other", "Other"}},
}
mockConn.EXPECT().
Select(gomock.Any(), gomock.Any(), gomock.Any()).
SetArg(1, expectedSQL).
Return(nil)
// Bidirectional
expectedSQL = []struct {
Axis uint8 `ch:"axis"`
Time time.Time `ch:"time"`
Xps float64 `ch:"xps"`
Dimensions []string `ch:"dimensions"`
}{
{1, base, 1000, []string{"router1", "provider1"}},
{1, base, 2000, []string{"router1", "provider2"}},
{1, base, 1200, []string{"router2", "provider2"}},
{1, base, 1100, []string{"router2", "provider3"}},
{1, base, 1900, []string{"Other", "Other"}},
{1, base.Add(time.Minute), 500, []string{"router1", "provider1"}},
{1, base.Add(time.Minute), 5000, []string{"router1", "provider2"}},
{1, base.Add(time.Minute), 900, []string{"router2", "provider4"}},
// Axes can be mixed. In reality, it seems they cannot
// be interleaved, but ClickHouse documentation does
// not say it is not possible.
{2, base, 100, []string{"router1", "provider1"}},
{2, base, 200, []string{"router1", "provider2"}},
{2, base, 120, []string{"router2", "provider2"}},
{1, base.Add(time.Minute), 100, []string{"Other", "Other"}},
{1, base.Add(2 * time.Minute), 100, []string{"router1", "provider1"}},
{2, base, 110, []string{"router2", "provider3"}},
{2, base, 190, []string{"Other", "Other"}},
{2, base.Add(time.Minute), 50, []string{"router1", "provider1"}},
{2, base.Add(time.Minute), 500, []string{"router1", "provider2"}},
{1, base.Add(2 * time.Minute), 3000, []string{"router1", "provider2"}},
{1, base.Add(2 * time.Minute), 100, []string{"router2", "provider4"}},
{1, base.Add(2 * time.Minute), 100, []string{"Other", "Other"}},
{2, base.Add(time.Minute), 90, []string{"router2", "provider4"}},
{2, base.Add(time.Minute), 10, []string{"Other", "Other"}},
{2, base.Add(2 * time.Minute), 10, []string{"router1", "provider1"}},
{2, base.Add(2 * time.Minute), 300, []string{"router1", "provider2"}},
{2, base.Add(2 * time.Minute), 10, []string{"router2", "provider4"}},
{2, base.Add(2 * time.Minute), 10, []string{"Other", "Other"}},
}
mockConn.EXPECT().
Select(gomock.Any(), gomock.Any(), gomock.Any()).
@@ -179,15 +357,17 @@ func TestGraphHandler(t *testing.T) {
helpers.TestHTTPEndpoints(t, h.Address, helpers.HTTPEndpointCases{
{
URL: "/api/v0/console/graph",
Description: "single direction",
URL: "/api/v0/console/graph",
JSONInput: gin.H{
"start": time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC),
"end": time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC),
"points": 100,
"limit": 20,
"dimensions": []string{"ExporterName", "InIfProvider"},
"filter": "DstCountry = 'FR' AND SrcCountry = 'US'",
"units": "l3bps",
"start": time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC),
"end": time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC),
"points": 100,
"limit": 20,
"dimensions": []string{"ExporterName", "InIfProvider"},
"filter": "DstCountry = 'FR' AND SrcCountry = 'US'",
"units": "l3bps",
"bidirectional": false,
},
JSONOutput: gin.H{
// Sorted by sum of bps
@@ -244,6 +424,124 @@ func TestGraphHandler(t *testing.T) {
500,
1000,
},
"axis": []int{
1, 1, 1, 1, 1, 1,
},
},
}, {
Description: "bidirectional",
URL: "/api/v0/console/graph",
JSONInput: gin.H{
"start": time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC),
"end": time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC),
"points": 100,
"limit": 20,
"dimensions": []string{"ExporterName", "InIfProvider"},
"filter": "DstCountry = 'FR' AND SrcCountry = 'US'",
"units": "l3bps",
"bidirectional": true,
},
JSONOutput: gin.H{
// Sorted by sum of bps
"rows": [][]string{
{"router1", "provider2"}, // 10000
{"router1", "provider1"}, // 1600
{"router2", "provider2"}, // 1200
{"router2", "provider3"}, // 1100
{"router2", "provider4"}, // 1000
{"Other", "Other"}, // 2100
{"router1", "provider2"}, // 1000
{"router1", "provider1"}, // 160
{"router2", "provider2"}, // 120
{"router2", "provider3"}, // 110
{"router2", "provider4"}, // 100
{"Other", "Other"}, // 210
},
"t": []string{
"2009-11-10T23:00:00Z",
"2009-11-10T23:01:00Z",
"2009-11-10T23:02:00Z",
},
"points": [][]int{
{2000, 5000, 3000},
{1000, 500, 100},
{1200, 0, 0},
{1100, 0, 0},
{0, 900, 100},
{1900, 100, 100},
{200, 500, 300},
{100, 50, 10},
{120, 0, 0},
{110, 0, 0},
{0, 90, 10},
{190, 10, 10},
},
"min": []int{
2000,
100,
1200,
1100,
100,
100,
200,
10,
120,
110,
10,
10,
},
"max": []int{
5000,
1000,
1200,
1100,
900,
1900,
500,
100,
120,
110,
90,
190,
},
"average": []int{
3333,
533,
400,
366,
333,
700,
333,
53,
40,
36,
33,
70,
},
"95th": []int{
4000,
750,
600,
550,
500,
1000,
400,
75,
60,
55,
50,
100,
},
"axis": []int{
1, 1, 1, 1, 1, 1,
2, 2, 2, 2, 2, 2,
},
},
},
})