mirror of
https://github.com/akvorado/akvorado.git
synced 2025-12-12 06:24:10 +01:00
console: fill missing values with 0 for timeseries graphs
This commit is contained in:
@@ -74,12 +74,14 @@ func (c *Component) queryFlowsTable(query string, start, end time.Time, targetRe
|
|||||||
// Build timefilter to match the resolution
|
// Build timefilter to match the resolution
|
||||||
start = start.Truncate(resolution)
|
start = start.Truncate(resolution)
|
||||||
end = end.Truncate(resolution)
|
end = end.Truncate(resolution)
|
||||||
timeFilter := fmt.Sprintf(`TimeReceived BETWEEN toDateTime('%s', 'UTC') AND toDateTime('%s', 'UTC')`,
|
timeFilterStart := fmt.Sprintf(`toDateTime('%s', 'UTC')`, start.UTC().Format("2006-01-02 15:04:05"))
|
||||||
start.UTC().Format("2006-01-02 15:04:05"),
|
timeFilterStop := fmt.Sprintf(`toDateTime('%s', 'UTC')`, end.UTC().Format("2006-01-02 15:04:05"))
|
||||||
end.UTC().Format("2006-01-02 15:04:05"))
|
timeFilter := fmt.Sprintf(`TimeReceived BETWEEN %s AND %s`, timeFilterStart, timeFilterStop)
|
||||||
|
|
||||||
c.metrics.clickhouseQueries.WithLabelValues(table).Inc()
|
c.metrics.clickhouseQueries.WithLabelValues(table).Inc()
|
||||||
query = strings.ReplaceAll(query, "{timefilter}", timeFilter)
|
query = strings.ReplaceAll(query, "{timefilter}", timeFilter)
|
||||||
|
query = strings.ReplaceAll(query, "{timefilter.Start}", timeFilterStart)
|
||||||
|
query = strings.ReplaceAll(query, "{timefilter.Stop}", timeFilterStop)
|
||||||
query = strings.ReplaceAll(query, "{table}", table)
|
query = strings.ReplaceAll(query, "{table}", table)
|
||||||
query = strings.ReplaceAll(query, "{resolution}", strconv.Itoa(int(resolution.Seconds())))
|
query = strings.ReplaceAll(query, "{resolution}", strconv.Itoa(int(resolution.Seconds())))
|
||||||
return query
|
return query
|
||||||
|
|||||||
@@ -40,6 +40,7 @@ type graphHandlerOutput struct {
|
|||||||
// graphHandlerInputToSQL converts a graph input to an SQL request
|
// graphHandlerInputToSQL converts a graph input to an SQL request
|
||||||
func (input graphHandlerInput) toSQL() (string, error) {
|
func (input graphHandlerInput) toSQL() (string, error) {
|
||||||
interval := int64((input.End.Sub(input.Start).Seconds())) / int64(input.Points)
|
interval := int64((input.End.Sub(input.Start).Seconds())) / int64(input.Points)
|
||||||
|
slot := fmt.Sprintf(`(intDiv(%d, {resolution})*{resolution})`, interval)
|
||||||
|
|
||||||
// Filter
|
// Filter
|
||||||
where := input.Filter.filter
|
where := input.Filter.filter
|
||||||
@@ -51,15 +52,15 @@ func (input graphHandlerInput) toSQL() (string, error) {
|
|||||||
|
|
||||||
// Select
|
// Select
|
||||||
fields := []string{
|
fields := []string{
|
||||||
`toStartOfInterval(TimeReceived, INTERVAL slot second) AS time`,
|
fmt.Sprintf(`toStartOfInterval(TimeReceived, INTERVAL %s second) AS time`, slot),
|
||||||
}
|
}
|
||||||
switch input.Units {
|
switch input.Units {
|
||||||
case "pps":
|
case "pps":
|
||||||
fields = append(fields, `SUM(Packets*SamplingRate/slot) AS xps`)
|
fields = append(fields, fmt.Sprintf(`SUM(Packets*SamplingRate/%s) AS xps`, slot))
|
||||||
case "l3bps":
|
case "l3bps":
|
||||||
fields = append(fields, `SUM(Bytes*SamplingRate*8/slot) AS xps`)
|
fields = append(fields, fmt.Sprintf(`SUM(Bytes*SamplingRate*8/%s) AS xps`, slot))
|
||||||
case "l2bps":
|
case "l2bps":
|
||||||
fields = append(fields, `SUM((Bytes+18*Packets)*SamplingRate*8/slot) AS xps`)
|
fields = append(fields, fmt.Sprintf(`SUM((Bytes+18*Packets)*SamplingRate*8/%s) AS xps`, slot))
|
||||||
}
|
}
|
||||||
selectFields := []string{}
|
selectFields := []string{}
|
||||||
dimensions := []string{}
|
dimensions := []string{}
|
||||||
@@ -80,7 +81,7 @@ func (input graphHandlerInput) toSQL() (string, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// With
|
// With
|
||||||
with := []string{fmt.Sprintf(`intDiv(%d, {resolution})*{resolution} AS slot`, interval)}
|
with := []string{}
|
||||||
if len(dimensions) > 0 {
|
if len(dimensions) > 0 {
|
||||||
with = append(with, fmt.Sprintf(
|
with = append(with, fmt.Sprintf(
|
||||||
"rows AS (SELECT %s FROM {table} WHERE %s GROUP BY %s ORDER BY SUM(Bytes) DESC LIMIT %d)",
|
"rows AS (SELECT %s FROM {table} WHERE %s GROUP BY %s ORDER BY SUM(Bytes) DESC LIMIT %d)",
|
||||||
@@ -89,16 +90,22 @@ func (input graphHandlerInput) toSQL() (string, error) {
|
|||||||
strings.Join(dimensions, ", "),
|
strings.Join(dimensions, ", "),
|
||||||
input.Limit))
|
input.Limit))
|
||||||
}
|
}
|
||||||
|
withStr := ""
|
||||||
|
if len(with) > 0 {
|
||||||
|
withStr = fmt.Sprintf("WITH\n %s", strings.Join(with, ",\n "))
|
||||||
|
}
|
||||||
|
|
||||||
sqlQuery := fmt.Sprintf(`
|
sqlQuery := fmt.Sprintf(`
|
||||||
WITH
|
%s
|
||||||
%s
|
|
||||||
SELECT
|
SELECT
|
||||||
%s
|
%s
|
||||||
FROM {table}
|
FROM {table}
|
||||||
WHERE %s
|
WHERE %s
|
||||||
GROUP BY time, dimensions
|
GROUP BY time, dimensions
|
||||||
ORDER BY time`, strings.Join(with, ",\n "), strings.Join(fields, ",\n "), where)
|
ORDER BY time WITH FILL
|
||||||
|
FROM toStartOfInterval({timefilter.Start}, INTERVAL %s second)
|
||||||
|
TO {timefilter.Stop}
|
||||||
|
STEP %s`, withStr, strings.Join(fields, ",\n "), where, slot, slot)
|
||||||
return sqlQuery, nil
|
return sqlQuery, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -134,6 +141,21 @@ func (c *Component) graphHandlerFunc(gc *gin.Context) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// When filling 0 value, we may get an empty dimensions.
|
||||||
|
// From ClickHouse 22.4, it is possible to do interpolation database-side
|
||||||
|
// (INTERPOLATE (['Other', 'Other'] AS Dimensions))
|
||||||
|
if len(input.Dimensions) > 0 {
|
||||||
|
zeroDimensions := make([]string, len(input.Dimensions))
|
||||||
|
for idx := range zeroDimensions {
|
||||||
|
zeroDimensions[idx] = "Other"
|
||||||
|
}
|
||||||
|
for idx := range results {
|
||||||
|
if len(results[idx].Dimensions) == 0 {
|
||||||
|
results[idx].Dimensions = zeroDimensions
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// We want to sort rows depending on how much data they gather each
|
// We want to sort rows depending on how much data they gather each
|
||||||
output := graphHandlerOutput{
|
output := graphHandlerOutput{
|
||||||
Time: []time.Time{},
|
Time: []time.Time{},
|
||||||
|
|||||||
@@ -31,16 +31,17 @@ func TestGraphQuerySQL(t *testing.T) {
|
|||||||
Units: "l3bps",
|
Units: "l3bps",
|
||||||
},
|
},
|
||||||
Expected: `
|
Expected: `
|
||||||
WITH
|
|
||||||
intDiv(864, {resolution})*{resolution} AS slot
|
|
||||||
SELECT
|
SELECT
|
||||||
toStartOfInterval(TimeReceived, INTERVAL slot second) AS time,
|
toStartOfInterval(TimeReceived, INTERVAL (intDiv(864, {resolution})*{resolution}) second) AS time,
|
||||||
SUM(Bytes*SamplingRate*8/slot) AS xps,
|
SUM(Bytes*SamplingRate*8/(intDiv(864, {resolution})*{resolution})) AS xps,
|
||||||
emptyArrayString() AS dimensions
|
emptyArrayString() AS dimensions
|
||||||
FROM {table}
|
FROM {table}
|
||||||
WHERE {timefilter}
|
WHERE {timefilter}
|
||||||
GROUP BY time, dimensions
|
GROUP BY time, dimensions
|
||||||
ORDER BY time`,
|
ORDER BY time WITH FILL
|
||||||
|
FROM toStartOfInterval({timefilter.Start}, INTERVAL (intDiv(864, {resolution})*{resolution}) second)
|
||||||
|
TO {timefilter.Stop}
|
||||||
|
STEP (intDiv(864, {resolution})*{resolution})`,
|
||||||
}, {
|
}, {
|
||||||
Description: "no dimensions, no filters, l2 bps",
|
Description: "no dimensions, no filters, l2 bps",
|
||||||
Input: graphHandlerInput{
|
Input: graphHandlerInput{
|
||||||
@@ -52,16 +53,17 @@ ORDER BY time`,
|
|||||||
Units: "l2bps",
|
Units: "l2bps",
|
||||||
},
|
},
|
||||||
Expected: `
|
Expected: `
|
||||||
WITH
|
|
||||||
intDiv(864, {resolution})*{resolution} AS slot
|
|
||||||
SELECT
|
SELECT
|
||||||
toStartOfInterval(TimeReceived, INTERVAL slot second) AS time,
|
toStartOfInterval(TimeReceived, INTERVAL (intDiv(864, {resolution})*{resolution}) second) AS time,
|
||||||
SUM((Bytes+18*Packets)*SamplingRate*8/slot) AS xps,
|
SUM((Bytes+18*Packets)*SamplingRate*8/(intDiv(864, {resolution})*{resolution})) AS xps,
|
||||||
emptyArrayString() AS dimensions
|
emptyArrayString() AS dimensions
|
||||||
FROM {table}
|
FROM {table}
|
||||||
WHERE {timefilter}
|
WHERE {timefilter}
|
||||||
GROUP BY time, dimensions
|
GROUP BY time, dimensions
|
||||||
ORDER BY time`,
|
ORDER BY time WITH FILL
|
||||||
|
FROM toStartOfInterval({timefilter.Start}, INTERVAL (intDiv(864, {resolution})*{resolution}) second)
|
||||||
|
TO {timefilter.Stop}
|
||||||
|
STEP (intDiv(864, {resolution})*{resolution})`,
|
||||||
}, {
|
}, {
|
||||||
Description: "no dimensions, no filters, pps",
|
Description: "no dimensions, no filters, pps",
|
||||||
Input: graphHandlerInput{
|
Input: graphHandlerInput{
|
||||||
@@ -73,16 +75,17 @@ ORDER BY time`,
|
|||||||
Units: "pps",
|
Units: "pps",
|
||||||
},
|
},
|
||||||
Expected: `
|
Expected: `
|
||||||
WITH
|
|
||||||
intDiv(864, {resolution})*{resolution} AS slot
|
|
||||||
SELECT
|
SELECT
|
||||||
toStartOfInterval(TimeReceived, INTERVAL slot second) AS time,
|
toStartOfInterval(TimeReceived, INTERVAL (intDiv(864, {resolution})*{resolution}) second) AS time,
|
||||||
SUM(Packets*SamplingRate/slot) AS xps,
|
SUM(Packets*SamplingRate/(intDiv(864, {resolution})*{resolution})) AS xps,
|
||||||
emptyArrayString() AS dimensions
|
emptyArrayString() AS dimensions
|
||||||
FROM {table}
|
FROM {table}
|
||||||
WHERE {timefilter}
|
WHERE {timefilter}
|
||||||
GROUP BY time, dimensions
|
GROUP BY time, dimensions
|
||||||
ORDER BY time`,
|
ORDER BY time WITH FILL
|
||||||
|
FROM toStartOfInterval({timefilter.Start}, INTERVAL (intDiv(864, {resolution})*{resolution}) second)
|
||||||
|
TO {timefilter.Stop}
|
||||||
|
STEP (intDiv(864, {resolution})*{resolution})`,
|
||||||
}, {
|
}, {
|
||||||
Description: "no dimensions",
|
Description: "no dimensions",
|
||||||
Input: graphHandlerInput{
|
Input: graphHandlerInput{
|
||||||
@@ -94,16 +97,17 @@ ORDER BY time`,
|
|||||||
Units: "l3bps",
|
Units: "l3bps",
|
||||||
},
|
},
|
||||||
Expected: `
|
Expected: `
|
||||||
WITH
|
|
||||||
intDiv(864, {resolution})*{resolution} AS slot
|
|
||||||
SELECT
|
SELECT
|
||||||
toStartOfInterval(TimeReceived, INTERVAL slot second) AS time,
|
toStartOfInterval(TimeReceived, INTERVAL (intDiv(864, {resolution})*{resolution}) second) AS time,
|
||||||
SUM(Bytes*SamplingRate*8/slot) AS xps,
|
SUM(Bytes*SamplingRate*8/(intDiv(864, {resolution})*{resolution})) AS xps,
|
||||||
emptyArrayString() AS dimensions
|
emptyArrayString() AS dimensions
|
||||||
FROM {table}
|
FROM {table}
|
||||||
WHERE {timefilter} AND (DstCountry = 'FR' AND SrcCountry = 'US')
|
WHERE {timefilter} AND (DstCountry = 'FR' AND SrcCountry = 'US')
|
||||||
GROUP BY time, dimensions
|
GROUP BY time, dimensions
|
||||||
ORDER BY time`,
|
ORDER BY time WITH FILL
|
||||||
|
FROM toStartOfInterval({timefilter.Start}, INTERVAL (intDiv(864, {resolution})*{resolution}) second)
|
||||||
|
TO {timefilter.Stop}
|
||||||
|
STEP (intDiv(864, {resolution})*{resolution})`,
|
||||||
}, {
|
}, {
|
||||||
Description: "no filters",
|
Description: "no filters",
|
||||||
Input: graphHandlerInput{
|
Input: graphHandlerInput{
|
||||||
@@ -120,22 +124,25 @@ ORDER BY time`,
|
|||||||
},
|
},
|
||||||
Expected: `
|
Expected: `
|
||||||
WITH
|
WITH
|
||||||
intDiv(864, {resolution})*{resolution} AS slot,
|
|
||||||
rows AS (SELECT ExporterName, InIfProvider FROM {table} WHERE {timefilter} GROUP BY ExporterName, InIfProvider ORDER BY SUM(Bytes) DESC LIMIT 20)
|
rows AS (SELECT ExporterName, InIfProvider FROM {table} WHERE {timefilter} GROUP BY ExporterName, InIfProvider ORDER BY SUM(Bytes) DESC LIMIT 20)
|
||||||
SELECT
|
SELECT
|
||||||
toStartOfInterval(TimeReceived, INTERVAL slot second) AS time,
|
toStartOfInterval(TimeReceived, INTERVAL (intDiv(864, {resolution})*{resolution}) second) AS time,
|
||||||
SUM(Bytes*SamplingRate*8/slot) AS xps,
|
SUM(Bytes*SamplingRate*8/(intDiv(864, {resolution})*{resolution})) AS xps,
|
||||||
if((ExporterName, InIfProvider) IN rows, [ExporterName, InIfProvider], ['Other', 'Other']) AS dimensions
|
if((ExporterName, InIfProvider) IN rows, [ExporterName, InIfProvider], ['Other', 'Other']) AS dimensions
|
||||||
FROM {table}
|
FROM {table}
|
||||||
WHERE {timefilter}
|
WHERE {timefilter}
|
||||||
GROUP BY time, dimensions
|
GROUP BY time, dimensions
|
||||||
ORDER BY time`,
|
ORDER BY time WITH FILL
|
||||||
|
FROM toStartOfInterval({timefilter.Start}, INTERVAL (intDiv(864, {resolution})*{resolution}) second)
|
||||||
|
TO {timefilter.Stop}
|
||||||
|
STEP (intDiv(864, {resolution})*{resolution})`,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
for _, tc := range cases {
|
for _, tc := range cases {
|
||||||
t.Run(tc.Description, func(t *testing.T) {
|
t.Run(tc.Description, func(t *testing.T) {
|
||||||
got, _ := tc.Input.toSQL()
|
got, _ := tc.Input.toSQL()
|
||||||
if diff := helpers.Diff(strings.Split(got, "\n"), strings.Split(tc.Expected, "\n")); diff != "" {
|
if diff := helpers.Diff(strings.Split(strings.TrimSpace(got), "\n"),
|
||||||
|
strings.Split(strings.TrimSpace(tc.Expected), "\n")); diff != "" {
|
||||||
t.Errorf("toSQL (-got, +want):\n%s", diff)
|
t.Errorf("toSQL (-got, +want):\n%s", diff)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|||||||
@@ -189,18 +189,20 @@ func (c *Component) widgetGraphHandlerFunc(gc *gin.Context) {
|
|||||||
params.Points = 200
|
params.Points = 200
|
||||||
}
|
}
|
||||||
interval := int64((24 * time.Hour).Seconds()) / int64(params.Points)
|
interval := int64((24 * time.Hour).Seconds()) / int64(params.Points)
|
||||||
|
slot := fmt.Sprintf(`(intDiv(%d, {resolution})*{resolution})`, interval)
|
||||||
now := c.d.Clock.Now()
|
now := c.d.Clock.Now()
|
||||||
query := c.queryFlowsTable(fmt.Sprintf(`
|
query := c.queryFlowsTable(fmt.Sprintf(`
|
||||||
WITH
|
|
||||||
intDiv(%d, {resolution})*{resolution} AS slot
|
|
||||||
SELECT
|
SELECT
|
||||||
toStartOfInterval(TimeReceived, INTERVAL slot second) AS Time,
|
toStartOfInterval(TimeReceived, INTERVAL %s second) AS Time,
|
||||||
SUM(Bytes*SamplingRate*8/slot)/1000/1000/1000 AS Gbps
|
SUM(Bytes*SamplingRate*8/%s)/1000/1000/1000 AS Gbps
|
||||||
FROM {table}
|
FROM {table}
|
||||||
WHERE {timefilter}
|
WHERE {timefilter}
|
||||||
AND InIfBoundary = 'external'
|
AND InIfBoundary = 'external'
|
||||||
GROUP BY Time
|
GROUP BY Time
|
||||||
ORDER BY Time`, interval), now.Add(-24*time.Hour), now, time.Duration(interval)*time.Second)
|
ORDER BY Time WITH FILL
|
||||||
|
FROM toStartOfInterval({timefilter.Start}, INTERVAL %s second)
|
||||||
|
TO {timefilter.Stop}
|
||||||
|
STEP %s`, slot, slot, slot, slot), now.Add(-24*time.Hour), now, time.Duration(interval)*time.Second)
|
||||||
gc.Header("X-SQL-Query", query)
|
gc.Header("X-SQL-Query", query)
|
||||||
|
|
||||||
results := []struct {
|
results := []struct {
|
||||||
|
|||||||
@@ -225,21 +225,22 @@ func TestWidgetGraph(t *testing.T) {
|
|||||||
{base.Add(time.Minute), 27.8},
|
{base.Add(time.Minute), 27.8},
|
||||||
{base.Add(2 * time.Minute), 26.4},
|
{base.Add(2 * time.Minute), 26.4},
|
||||||
{base.Add(3 * time.Minute), 29.2},
|
{base.Add(3 * time.Minute), 29.2},
|
||||||
{base.Add(4 * time.Minute), 21.3},
|
{base.Add(4 * time.Minute), 0},
|
||||||
{base.Add(5 * time.Minute), 24.7},
|
{base.Add(5 * time.Minute), 24.7},
|
||||||
}
|
}
|
||||||
mockConn.EXPECT().
|
mockConn.EXPECT().
|
||||||
Select(gomock.Any(), gomock.Any(), `
|
Select(gomock.Any(), gomock.Any(), `
|
||||||
WITH
|
|
||||||
intDiv(864, 1)*1 AS slot
|
|
||||||
SELECT
|
SELECT
|
||||||
toStartOfInterval(TimeReceived, INTERVAL slot second) AS Time,
|
toStartOfInterval(TimeReceived, INTERVAL (intDiv(864, 1)*1) second) AS Time,
|
||||||
SUM(Bytes*SamplingRate*8/slot)/1000/1000/1000 AS Gbps
|
SUM(Bytes*SamplingRate*8/(intDiv(864, 1)*1))/1000/1000/1000 AS Gbps
|
||||||
FROM flows
|
FROM flows
|
||||||
WHERE TimeReceived BETWEEN toDateTime('2009-11-10 23:00:00', 'UTC') AND toDateTime('2009-11-11 23:00:00', 'UTC')
|
WHERE TimeReceived BETWEEN toDateTime('2009-11-10 23:00:00', 'UTC') AND toDateTime('2009-11-11 23:00:00', 'UTC')
|
||||||
AND InIfBoundary = 'external'
|
AND InIfBoundary = 'external'
|
||||||
GROUP BY Time
|
GROUP BY Time
|
||||||
ORDER BY Time`).
|
ORDER BY Time WITH FILL
|
||||||
|
FROM toStartOfInterval(toDateTime('2009-11-10 23:00:00', 'UTC'), INTERVAL (intDiv(864, 1)*1) second)
|
||||||
|
TO toDateTime('2009-11-11 23:00:00', 'UTC')
|
||||||
|
STEP (intDiv(864, 1)*1)`).
|
||||||
SetArg(1, expected).
|
SetArg(1, expected).
|
||||||
Return(nil)
|
Return(nil)
|
||||||
|
|
||||||
@@ -252,7 +253,7 @@ ORDER BY Time`).
|
|||||||
{"t": "2009-11-10T23:01:00Z", "gbps": 27.8},
|
{"t": "2009-11-10T23:01:00Z", "gbps": 27.8},
|
||||||
{"t": "2009-11-10T23:02:00Z", "gbps": 26.4},
|
{"t": "2009-11-10T23:02:00Z", "gbps": 26.4},
|
||||||
{"t": "2009-11-10T23:03:00Z", "gbps": 29.2},
|
{"t": "2009-11-10T23:03:00Z", "gbps": 29.2},
|
||||||
{"t": "2009-11-10T23:04:00Z", "gbps": 21.3},
|
{"t": "2009-11-10T23:04:00Z", "gbps": 0},
|
||||||
{"t": "2009-11-10T23:05:00Z", "gbps": 24.7}},
|
{"t": "2009-11-10T23:05:00Z", "gbps": 24.7}},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
|||||||
Reference in New Issue
Block a user