diff --git a/console/clickhouse.go b/console/clickhouse.go index 2099bc5b..af39f2ab 100644 --- a/console/clickhouse.go +++ b/console/clickhouse.go @@ -74,12 +74,14 @@ func (c *Component) queryFlowsTable(query string, start, end time.Time, targetRe // Build timefilter to match the resolution start = start.Truncate(resolution) end = end.Truncate(resolution) - timeFilter := fmt.Sprintf(`TimeReceived BETWEEN toDateTime('%s', 'UTC') AND toDateTime('%s', 'UTC')`, - start.UTC().Format("2006-01-02 15:04:05"), - end.UTC().Format("2006-01-02 15:04:05")) + timeFilterStart := fmt.Sprintf(`toDateTime('%s', 'UTC')`, start.UTC().Format("2006-01-02 15:04:05")) + timeFilterStop := fmt.Sprintf(`toDateTime('%s', 'UTC')`, end.UTC().Format("2006-01-02 15:04:05")) + timeFilter := fmt.Sprintf(`TimeReceived BETWEEN %s AND %s`, timeFilterStart, timeFilterStop) c.metrics.clickhouseQueries.WithLabelValues(table).Inc() query = strings.ReplaceAll(query, "{timefilter}", timeFilter) + query = strings.ReplaceAll(query, "{timefilter.Start}", timeFilterStart) + query = strings.ReplaceAll(query, "{timefilter.Stop}", timeFilterStop) query = strings.ReplaceAll(query, "{table}", table) query = strings.ReplaceAll(query, "{resolution}", strconv.Itoa(int(resolution.Seconds()))) return query diff --git a/console/graph.go b/console/graph.go index bf79939a..b00cbc92 100644 --- a/console/graph.go +++ b/console/graph.go @@ -40,6 +40,7 @@ type graphHandlerOutput struct { // graphHandlerInputToSQL converts a graph input to an SQL request func (input graphHandlerInput) toSQL() (string, error) { interval := int64((input.End.Sub(input.Start).Seconds())) / int64(input.Points) + slot := fmt.Sprintf(`(intDiv(%d, {resolution})*{resolution})`, interval) // Filter where := input.Filter.filter @@ -51,15 +52,15 @@ func (input graphHandlerInput) toSQL() (string, error) { // Select fields := []string{ - `toStartOfInterval(TimeReceived, INTERVAL slot second) AS time`, + fmt.Sprintf(`toStartOfInterval(TimeReceived, INTERVAL %s second) AS time`, slot), } switch input.Units { case "pps": - fields = append(fields, `SUM(Packets*SamplingRate/slot) AS xps`) + fields = append(fields, fmt.Sprintf(`SUM(Packets*SamplingRate/%s) AS xps`, slot)) case "l3bps": - fields = append(fields, `SUM(Bytes*SamplingRate*8/slot) AS xps`) + fields = append(fields, fmt.Sprintf(`SUM(Bytes*SamplingRate*8/%s) AS xps`, slot)) case "l2bps": - fields = append(fields, `SUM((Bytes+18*Packets)*SamplingRate*8/slot) AS xps`) + fields = append(fields, fmt.Sprintf(`SUM((Bytes+18*Packets)*SamplingRate*8/%s) AS xps`, slot)) } selectFields := []string{} dimensions := []string{} @@ -80,7 +81,7 @@ func (input graphHandlerInput) toSQL() (string, error) { } // With - with := []string{fmt.Sprintf(`intDiv(%d, {resolution})*{resolution} AS slot`, interval)} + with := []string{} if len(dimensions) > 0 { with = append(with, fmt.Sprintf( "rows AS (SELECT %s FROM {table} WHERE %s GROUP BY %s ORDER BY SUM(Bytes) DESC LIMIT %d)", @@ -89,16 +90,22 @@ func (input graphHandlerInput) toSQL() (string, error) { strings.Join(dimensions, ", "), input.Limit)) } + withStr := "" + if len(with) > 0 { + withStr = fmt.Sprintf("WITH\n %s", strings.Join(with, ",\n ")) + } sqlQuery := fmt.Sprintf(` -WITH - %s +%s SELECT %s FROM {table} WHERE %s GROUP BY time, dimensions -ORDER BY time`, strings.Join(with, ",\n "), strings.Join(fields, ",\n "), where) +ORDER BY time WITH FILL + FROM toStartOfInterval({timefilter.Start}, INTERVAL %s second) + TO {timefilter.Stop} + STEP %s`, withStr, strings.Join(fields, ",\n "), where, slot, slot) return sqlQuery, nil } @@ -134,6 +141,21 @@ func (c *Component) graphHandlerFunc(gc *gin.Context) { return } + // When filling 0 value, we may get an empty dimensions. + // From ClickHouse 22.4, it is possible to do interpolation database-side + // (INTERPOLATE (['Other', 'Other'] AS Dimensions)) + if len(input.Dimensions) > 0 { + zeroDimensions := make([]string, len(input.Dimensions)) + for idx := range zeroDimensions { + zeroDimensions[idx] = "Other" + } + for idx := range results { + if len(results[idx].Dimensions) == 0 { + results[idx].Dimensions = zeroDimensions + } + } + } + // We want to sort rows depending on how much data they gather each output := graphHandlerOutput{ Time: []time.Time{}, diff --git a/console/graph_test.go b/console/graph_test.go index 886bea3d..9c5008ed 100644 --- a/console/graph_test.go +++ b/console/graph_test.go @@ -31,16 +31,17 @@ func TestGraphQuerySQL(t *testing.T) { Units: "l3bps", }, Expected: ` -WITH - intDiv(864, {resolution})*{resolution} AS slot SELECT - toStartOfInterval(TimeReceived, INTERVAL slot second) AS time, - SUM(Bytes*SamplingRate*8/slot) AS xps, + toStartOfInterval(TimeReceived, INTERVAL (intDiv(864, {resolution})*{resolution}) second) AS time, + SUM(Bytes*SamplingRate*8/(intDiv(864, {resolution})*{resolution})) AS xps, emptyArrayString() AS dimensions FROM {table} WHERE {timefilter} GROUP BY time, dimensions -ORDER BY time`, +ORDER BY time WITH FILL + FROM toStartOfInterval({timefilter.Start}, INTERVAL (intDiv(864, {resolution})*{resolution}) second) + TO {timefilter.Stop} + STEP (intDiv(864, {resolution})*{resolution})`, }, { Description: "no dimensions, no filters, l2 bps", Input: graphHandlerInput{ @@ -52,16 +53,17 @@ ORDER BY time`, Units: "l2bps", }, Expected: ` -WITH - intDiv(864, {resolution})*{resolution} AS slot SELECT - toStartOfInterval(TimeReceived, INTERVAL slot second) AS time, - SUM((Bytes+18*Packets)*SamplingRate*8/slot) AS xps, + toStartOfInterval(TimeReceived, INTERVAL (intDiv(864, {resolution})*{resolution}) second) AS time, + SUM((Bytes+18*Packets)*SamplingRate*8/(intDiv(864, {resolution})*{resolution})) AS xps, emptyArrayString() AS dimensions FROM {table} WHERE {timefilter} GROUP BY time, dimensions -ORDER BY time`, +ORDER BY time WITH FILL + FROM toStartOfInterval({timefilter.Start}, INTERVAL (intDiv(864, {resolution})*{resolution}) second) + TO {timefilter.Stop} + STEP (intDiv(864, {resolution})*{resolution})`, }, { Description: "no dimensions, no filters, pps", Input: graphHandlerInput{ @@ -73,16 +75,17 @@ ORDER BY time`, Units: "pps", }, Expected: ` -WITH - intDiv(864, {resolution})*{resolution} AS slot SELECT - toStartOfInterval(TimeReceived, INTERVAL slot second) AS time, - SUM(Packets*SamplingRate/slot) AS xps, + toStartOfInterval(TimeReceived, INTERVAL (intDiv(864, {resolution})*{resolution}) second) AS time, + SUM(Packets*SamplingRate/(intDiv(864, {resolution})*{resolution})) AS xps, emptyArrayString() AS dimensions FROM {table} WHERE {timefilter} GROUP BY time, dimensions -ORDER BY time`, +ORDER BY time WITH FILL + FROM toStartOfInterval({timefilter.Start}, INTERVAL (intDiv(864, {resolution})*{resolution}) second) + TO {timefilter.Stop} + STEP (intDiv(864, {resolution})*{resolution})`, }, { Description: "no dimensions", Input: graphHandlerInput{ @@ -94,16 +97,17 @@ ORDER BY time`, Units: "l3bps", }, Expected: ` -WITH - intDiv(864, {resolution})*{resolution} AS slot SELECT - toStartOfInterval(TimeReceived, INTERVAL slot second) AS time, - SUM(Bytes*SamplingRate*8/slot) AS xps, + toStartOfInterval(TimeReceived, INTERVAL (intDiv(864, {resolution})*{resolution}) second) AS time, + SUM(Bytes*SamplingRate*8/(intDiv(864, {resolution})*{resolution})) AS xps, emptyArrayString() AS dimensions FROM {table} WHERE {timefilter} AND (DstCountry = 'FR' AND SrcCountry = 'US') GROUP BY time, dimensions -ORDER BY time`, +ORDER BY time WITH FILL + FROM toStartOfInterval({timefilter.Start}, INTERVAL (intDiv(864, {resolution})*{resolution}) second) + TO {timefilter.Stop} + STEP (intDiv(864, {resolution})*{resolution})`, }, { Description: "no filters", Input: graphHandlerInput{ @@ -120,22 +124,25 @@ ORDER BY time`, }, Expected: ` WITH - intDiv(864, {resolution})*{resolution} AS slot, rows AS (SELECT ExporterName, InIfProvider FROM {table} WHERE {timefilter} GROUP BY ExporterName, InIfProvider ORDER BY SUM(Bytes) DESC LIMIT 20) SELECT - toStartOfInterval(TimeReceived, INTERVAL slot second) AS time, - SUM(Bytes*SamplingRate*8/slot) AS xps, + toStartOfInterval(TimeReceived, INTERVAL (intDiv(864, {resolution})*{resolution}) second) AS time, + SUM(Bytes*SamplingRate*8/(intDiv(864, {resolution})*{resolution})) AS xps, if((ExporterName, InIfProvider) IN rows, [ExporterName, InIfProvider], ['Other', 'Other']) AS dimensions FROM {table} WHERE {timefilter} GROUP BY time, dimensions -ORDER BY time`, +ORDER BY time WITH FILL + FROM toStartOfInterval({timefilter.Start}, INTERVAL (intDiv(864, {resolution})*{resolution}) second) + TO {timefilter.Stop} + STEP (intDiv(864, {resolution})*{resolution})`, }, } for _, tc := range cases { t.Run(tc.Description, func(t *testing.T) { got, _ := tc.Input.toSQL() - if diff := helpers.Diff(strings.Split(got, "\n"), strings.Split(tc.Expected, "\n")); diff != "" { + if diff := helpers.Diff(strings.Split(strings.TrimSpace(got), "\n"), + strings.Split(strings.TrimSpace(tc.Expected), "\n")); diff != "" { t.Errorf("toSQL (-got, +want):\n%s", diff) } }) diff --git a/console/widgets.go b/console/widgets.go index 71cbacfc..eaa05c09 100644 --- a/console/widgets.go +++ b/console/widgets.go @@ -189,18 +189,20 @@ func (c *Component) widgetGraphHandlerFunc(gc *gin.Context) { params.Points = 200 } interval := int64((24 * time.Hour).Seconds()) / int64(params.Points) + slot := fmt.Sprintf(`(intDiv(%d, {resolution})*{resolution})`, interval) now := c.d.Clock.Now() query := c.queryFlowsTable(fmt.Sprintf(` -WITH - intDiv(%d, {resolution})*{resolution} AS slot SELECT - toStartOfInterval(TimeReceived, INTERVAL slot second) AS Time, - SUM(Bytes*SamplingRate*8/slot)/1000/1000/1000 AS Gbps + toStartOfInterval(TimeReceived, INTERVAL %s second) AS Time, + SUM(Bytes*SamplingRate*8/%s)/1000/1000/1000 AS Gbps FROM {table} WHERE {timefilter} AND InIfBoundary = 'external' GROUP BY Time -ORDER BY Time`, interval), now.Add(-24*time.Hour), now, time.Duration(interval)*time.Second) +ORDER BY Time WITH FILL + FROM toStartOfInterval({timefilter.Start}, INTERVAL %s second) + TO {timefilter.Stop} + STEP %s`, slot, slot, slot, slot), now.Add(-24*time.Hour), now, time.Duration(interval)*time.Second) gc.Header("X-SQL-Query", query) results := []struct { diff --git a/console/widgets_test.go b/console/widgets_test.go index cef4a9ef..695519b7 100644 --- a/console/widgets_test.go +++ b/console/widgets_test.go @@ -225,21 +225,22 @@ func TestWidgetGraph(t *testing.T) { {base.Add(time.Minute), 27.8}, {base.Add(2 * time.Minute), 26.4}, {base.Add(3 * time.Minute), 29.2}, - {base.Add(4 * time.Minute), 21.3}, + {base.Add(4 * time.Minute), 0}, {base.Add(5 * time.Minute), 24.7}, } mockConn.EXPECT(). Select(gomock.Any(), gomock.Any(), ` -WITH - intDiv(864, 1)*1 AS slot SELECT - toStartOfInterval(TimeReceived, INTERVAL slot second) AS Time, - SUM(Bytes*SamplingRate*8/slot)/1000/1000/1000 AS Gbps + toStartOfInterval(TimeReceived, INTERVAL (intDiv(864, 1)*1) second) AS Time, + SUM(Bytes*SamplingRate*8/(intDiv(864, 1)*1))/1000/1000/1000 AS Gbps FROM flows WHERE TimeReceived BETWEEN toDateTime('2009-11-10 23:00:00', 'UTC') AND toDateTime('2009-11-11 23:00:00', 'UTC') AND InIfBoundary = 'external' GROUP BY Time -ORDER BY Time`). +ORDER BY Time WITH FILL + FROM toStartOfInterval(toDateTime('2009-11-10 23:00:00', 'UTC'), INTERVAL (intDiv(864, 1)*1) second) + TO toDateTime('2009-11-11 23:00:00', 'UTC') + STEP (intDiv(864, 1)*1)`). SetArg(1, expected). Return(nil) @@ -252,7 +253,7 @@ ORDER BY Time`). {"t": "2009-11-10T23:01:00Z", "gbps": 27.8}, {"t": "2009-11-10T23:02:00Z", "gbps": 26.4}, {"t": "2009-11-10T23:03:00Z", "gbps": 29.2}, - {"t": "2009-11-10T23:04:00Z", "gbps": 21.3}, + {"t": "2009-11-10T23:04:00Z", "gbps": 0}, {"t": "2009-11-10T23:05:00Z", "gbps": 24.7}}, }, },