From 67703cc61e42f2d40e9157c3bedf3ba33b06059d Mon Sep 17 00:00:00 2001 From: Vincent Bernat Date: Tue, 9 Aug 2022 11:40:51 +0200 Subject: [PATCH] console: use templates to build SQL query This is needed if we want to be able to mix use of several tables inside a single query (for example, flows_1m0s for a part of the query and flows_5m0s for another part to overlay historical data). Also, the way we handle time buckets is now cleaner. The previous way had two stages of rounding and was incorrect. We were discarding the first and last value for this reason. The new way only has one stage of rounding and is correct. It tries hard to align the buckets at the specified start time. We don't need to discard these values anymore. We still discard the last one because it could be incomplete (when end is "now"). --- console/clickhouse.go | 277 ++++++++++++------ console/clickhouse_test.go | 186 +++++++----- .../src/views/HomePage/WidgetGraph.vue | 2 +- .../VisualizePage/DataGraphTimeSeries.vue | 2 +- console/graph.go | 65 ++-- console/graph_test.go | 175 +++++++---- console/root.go | 2 +- console/sankey.go | 49 ++-- console/sankey_test.go | 61 ++-- console/widgets.go | 51 ++-- console/widgets_test.go | 9 +- 11 files changed, 535 insertions(+), 344 deletions(-) diff --git a/console/clickhouse.go b/console/clickhouse.go index ee42e8ce..f859da58 100644 --- a/console/clickhouse.go +++ b/console/clickhouse.go @@ -4,16 +4,15 @@ package console import ( + "bytes" + "encoding/json" "errors" "fmt" - "regexp" - "strconv" "strings" + "text/template" "time" ) -var resolutionRegexp = regexp.MustCompile(`{resolution->(\d+)}`) - // flowsTable describe a consolidated or unconsolidated flows table. type flowsTable struct { Name string @@ -21,95 +20,6 @@ type flowsTable struct { Oldest time.Time } -// Build a query against the flows table or one of the consolidated -// version depending on the information needed. The provided query -// should contain `{table}` which will be replaced by the appropriate -// flows table and {timefilter} which will be replaced by the -// appropriate time filter. -func (c *Component) queryFlowsTable(query string, mainTableRequired bool, start, end time.Time, targetResolution time.Duration) string { - c.flowsTablesLock.RLock() - defer c.flowsTablesLock.RUnlock() - - // Select table - table := "flows" - resolution := time.Second - if !mainTableRequired { - // We can use the consolidated data. The first - // criteria is to find the tables matching the time - // criteria. - candidates := []int{} - for idx, table := range c.flowsTables { - if start.After(table.Oldest.Add(table.Resolution)) { - candidates = append(candidates, idx) - } - } - if len(candidates) == 0 { - // No candidate, fallback to the one with oldest data - best := 0 - for idx, table := range c.flowsTables { - if c.flowsTables[best].Oldest.After(table.Oldest.Add(table.Resolution)) { - best = idx - } - } - candidates = []int{best} - // Add other candidates that are not far off in term of oldest data - for idx, table := range c.flowsTables { - if idx == best { - continue - } - if c.flowsTables[best].Oldest.After(table.Oldest) { - candidates = append(candidates, idx) - } - } - } - if len(candidates) > 1 { - // Use resolution to find the best one - best := 0 - for _, idx := range candidates { - if c.flowsTables[idx].Resolution > targetResolution { - continue - } - if c.flowsTables[idx].Resolution > c.flowsTables[best].Resolution { - best = idx - } - } - candidates = []int{best} - } - table = c.flowsTables[candidates[0]].Name - resolution = c.flowsTables[candidates[0]].Resolution - } - if resolution == 0 { - resolution = time.Second - } - - // Build timefilter to match the resolution - start = start.Truncate(resolution) - end = end.Truncate(resolution) - timeFilterStart := fmt.Sprintf(`toDateTime('%s', 'UTC')`, start.UTC().Format("2006-01-02 15:04:05")) - timeFilterStop := fmt.Sprintf(`toDateTime('%s', 'UTC')`, end.UTC().Format("2006-01-02 15:04:05")) - timeFilter := fmt.Sprintf(`TimeReceived BETWEEN %s AND %s`, timeFilterStart, timeFilterStop) - - c.metrics.clickhouseQueries.WithLabelValues(table).Inc() - query = strings.ReplaceAll(query, "{timefilter}", timeFilter) - query = strings.ReplaceAll(query, "{timefilter.Start}", timeFilterStart) - query = strings.ReplaceAll(query, "{timefilter.Stop}", timeFilterStop) - query = strings.ReplaceAll(query, "{table}", table) - query = strings.ReplaceAll(query, "{resolution}", strconv.Itoa(int(resolution.Seconds()))) - query = resolutionRegexp.ReplaceAllStringFunc(query, func(in string) string { - matches := resolutionRegexp.FindStringSubmatch(in) - target, err := strconv.Atoi(matches[1]) - if err != nil { - panic(err) - } - target = target / int(resolution.Seconds()) * int(resolution.Seconds()) - if target < 1 { - target = 1 - } - return strconv.Itoa(target) - }) - return query -} - // refreshFlowsTables refreshes the information we have about flows // tables (live one and consolidated ones). This information includes // the consolidation interval and the oldest available data. @@ -166,3 +76,184 @@ AND engine LIKE '%MergeTree' c.flowsTablesLock.Unlock() return nil } + +// finalizeQuery builds the finalized query. A single "context" +// function is provided to return a `Context` struct with all the +// information needed. +func (c *Component) finalizeQuery(query string) string { + t := template.Must(template.New("query"). + Funcs(template.FuncMap{ + "context": c.contextFunc, + }). + Option("missingkey=error"). + Parse(strings.TrimSpace(query))) + buf := bytes.NewBufferString("") + if err := t.Execute(buf, nil); err != nil { + c.r.Err(err).Str("query", query).Msg("invalid query") + panic(err) + } + return buf.String() +} + +type inputContext struct { + Start time.Time `json:"start"` + End time.Time `json:"end"` + MainTableRequired bool `json:"main-table-required,omitempty"` + Points uint `json:"points"` + Units string `json:"units,omitempty"` +} + +type context struct { + Table string + Timefilter string + TimefilterStart string + TimefilterEnd string + Units string + Interval uint64 + ToStartOfInterval func(string) string +} + +// templateEscape escapes `{{` and `}}` from a string. In fact, only +// the opening tag needs to be escaped. +func templateEscape(input string) string { + return strings.ReplaceAll(input, `{{`, `{{"{{"}}`) +} + +// templateDate turns a date into an UTC string compatible with ClickHouse. +func templateDate(input time.Time) string { + return input.UTC().Format("2006-01-02 15:04:05") +} + +// templateWhere transforms a filter to a WHERE clause +func templateWhere(qf queryFilter) string { + if qf.Filter == "" { + return `{{ .Timefilter }}` + } + return fmt.Sprintf(`{{ .Timefilter }} AND (%s)`, templateEscape(qf.Filter)) +} + +// templateTable builds a template directive to select the right table +func templateContext(context inputContext) string { + encoded, err := json.Marshal(context) + if err != nil { + panic(err) + } + return fmt.Sprintf("context `%s`", string(encoded)) +} + +func (c *Component) contextFunc(inputStr string) context { + var input inputContext + if err := json.Unmarshal([]byte(inputStr), &input); err != nil { + panic(err) + } + + targetInterval := time.Duration(uint64(input.End.Sub(input.Start)) / uint64(input.Points)) + if targetInterval < time.Second { + targetInterval = time.Second + } + + c.flowsTablesLock.RLock() + defer c.flowsTablesLock.RUnlock() + + // Select table + table := "flows" + computedInterval := time.Second + if !input.MainTableRequired && len(c.flowsTables) > 0 { + // We can use the consolidated data. The first + // criteria is to find the tables matching the time + // criteria. + candidates := []int{} + for idx, table := range c.flowsTables { + if input.Start.After(table.Oldest.Add(table.Resolution)) { + candidates = append(candidates, idx) + } + } + if len(candidates) == 0 { + // No candidate, fallback to the one with oldest data + best := 0 + for idx, table := range c.flowsTables { + if c.flowsTables[best].Oldest.After(table.Oldest.Add(table.Resolution)) { + best = idx + } + } + candidates = []int{best} + // Add other candidates that are not far off in term of oldest data + for idx, table := range c.flowsTables { + if idx == best { + continue + } + if c.flowsTables[best].Oldest.After(table.Oldest) { + candidates = append(candidates, idx) + } + } + } + if len(candidates) > 1 { + // Use interval to find the best one + best := 0 + for _, idx := range candidates { + if c.flowsTables[idx].Resolution > targetInterval { + continue + } + if c.flowsTables[idx].Resolution > c.flowsTables[best].Resolution { + best = idx + } + } + candidates = []int{best} + } + table = c.flowsTables[candidates[0]].Name + computedInterval = c.flowsTables[candidates[0]].Resolution + } + if computedInterval < time.Second { + computedInterval = time.Second + } + + // Make start/end match the computed interval (currently equal to the table resolution) + start := input.Start.Truncate(computedInterval) + end := input.End.Truncate(computedInterval) + // Adapt the computed interval to match the target one more closely + if targetInterval > computedInterval { + computedInterval = targetInterval.Truncate(computedInterval) + } + // Adapt end to ensure we get a full interval + end = start.Add(end.Sub(start).Truncate(computedInterval)) + // Now, toStartOfInterval will provide an incorrect value. We + // compute a correction offset. Go's truncate seems to + // be different from what we expect. + computedIntervalOffset := start.UTC().Sub( + time.Unix(start.UTC().Unix()/ + int64(computedInterval.Seconds())* + int64(computedInterval.Seconds()), 0)) + diffOffset := uint64(computedInterval.Seconds()) - uint64(computedIntervalOffset.Seconds()) + + // Compute all strings + timefilterStart := fmt.Sprintf(`toDateTime('%s', 'UTC')`, start.UTC().Format("2006-01-02 15:04:05")) + timefilterEnd := fmt.Sprintf(`toDateTime('%s', 'UTC')`, end.UTC().Format("2006-01-02 15:04:05")) + timefilter := fmt.Sprintf(`TimeReceived BETWEEN %s AND %s`, timefilterStart, timefilterEnd) + var units string + switch input.Units { + case "pps": + units = `SUM(Packets*SamplingRate)` + case "l3bps": + units = `SUM(Bytes*SamplingRate*8)` + case "l2bps": + units = `SUM((Bytes+18*Packets)*SamplingRate*8)` + } + + c.metrics.clickhouseQueries.WithLabelValues(table).Inc() + return context{ + Table: table, + Timefilter: timefilter, + TimefilterStart: timefilterStart, + TimefilterEnd: timefilterEnd, + Units: units, + Interval: uint64(computedInterval.Seconds()), + ToStartOfInterval: func(field string) string { + return fmt.Sprintf( + `toStartOfInterval(%s + INTERVAL %d second, INTERVAL %d second) - INTERVAL %d second`, + field, + diffOffset, + uint64(computedInterval.Seconds()), + diffOffset) + }, + } +} diff --git a/console/clickhouse_test.go b/console/clickhouse_test.go index 2095fd83..7a5e094e 100644 --- a/console/clickhouse_test.go +++ b/console/clickhouse_test.go @@ -4,6 +4,7 @@ package console import ( + "fmt" "testing" "time" @@ -70,113 +71,143 @@ AND engine LIKE '%MergeTree' } } -func TestQueryFlowsTables(t *testing.T) { +func TestFinalizeQuery(t *testing.T) { cases := []struct { - Description string - Tables []flowsTable - Query string - MainTableRequired bool - Start time.Time - End time.Time - Resolution time.Duration - Expected string + Description string + Tables []flowsTable + Query string + Context inputContext + Expected string }{ { - Description: "query with source port", - Query: "SELECT TimeReceived, SrcPort FROM {table} WHERE {timefilter}", - MainTableRequired: true, - Start: time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC), - End: time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC), - Expected: "SELECT TimeReceived, SrcPort FROM flows WHERE TimeReceived BETWEEN toDateTime('2022-04-10 15:45:10', 'UTC') AND toDateTime('2022-04-11 15:45:10', 'UTC')", + Description: "simple query without additional tables", + Query: "SELECT 1 FROM {{ .Table }} WHERE {{ .Timefilter }}", + Context: inputContext{ + Start: time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC), + End: time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC), + Points: 86400, + }, + Expected: "SELECT 1 FROM flows WHERE TimeReceived BETWEEN toDateTime('2022-04-10 15:45:10', 'UTC') AND toDateTime('2022-04-11 15:45:10', 'UTC')", + }, { + Description: "query with source port", + Query: "SELECT TimeReceived, SrcPort FROM {{ .Table }} WHERE {{ .Timefilter }}", + Context: inputContext{ + Start: time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC), + End: time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC), + MainTableRequired: true, + Points: 86400, + }, + Expected: "SELECT TimeReceived, SrcPort FROM flows WHERE TimeReceived BETWEEN toDateTime('2022-04-10 15:45:10', 'UTC') AND toDateTime('2022-04-11 15:45:10', 'UTC')", }, { Description: "only flows table available", Tables: []flowsTable{{"flows", 0, time.Date(2022, 03, 10, 15, 45, 10, 0, time.UTC)}}, - Query: "SELECT 1 FROM {table} WHERE {timefilter}", - Start: time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC), - End: time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC), - Expected: "SELECT 1 FROM flows WHERE TimeReceived BETWEEN toDateTime('2022-04-10 15:45:10', 'UTC') AND toDateTime('2022-04-11 15:45:10', 'UTC')", + Query: "SELECT 1 FROM {{ .Table }} WHERE {{ .Timefilter }}", + Context: inputContext{ + Start: time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC), + End: time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC), + Points: 86400, + }, + Expected: "SELECT 1 FROM flows WHERE TimeReceived BETWEEN toDateTime('2022-04-10 15:45:10', 'UTC') AND toDateTime('2022-04-11 15:45:10', 'UTC')", }, { Description: "timefilter.Start and timefilter.Stop", Tables: []flowsTable{{"flows", 0, time.Date(2022, 03, 10, 15, 45, 10, 0, time.UTC)}}, - Query: "SELECT {timefilter.Start}, {timefilter.Stop}", - Start: time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC), - End: time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC), - Expected: "SELECT toDateTime('2022-04-10 15:45:10', 'UTC'), toDateTime('2022-04-11 15:45:10', 'UTC')", + Query: "SELECT {{ .TimefilterStart }}, {{ .TimefilterEnd }}", + Context: inputContext{ + Start: time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC), + End: time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC), + Points: 86400, + }, + Expected: "SELECT toDateTime('2022-04-10 15:45:10', 'UTC'), toDateTime('2022-04-11 15:45:10', 'UTC')", }, { Description: "only flows table and out of range request", Tables: []flowsTable{{"flows", 0, time.Date(2022, 04, 10, 22, 45, 10, 0, time.UTC)}}, - Query: "SELECT 1 FROM {table} WHERE {timefilter}", - Start: time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC), - End: time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC), - Expected: "SELECT 1 FROM flows WHERE TimeReceived BETWEEN toDateTime('2022-04-10 15:45:10', 'UTC') AND toDateTime('2022-04-11 15:45:10', 'UTC')", + Query: "SELECT 1 FROM {{ .Table }} WHERE {{ .Timefilter }}", + Context: inputContext{ + Start: time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC), + End: time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC), + Points: 86400, + }, + Expected: "SELECT 1 FROM flows WHERE TimeReceived BETWEEN toDateTime('2022-04-10 15:45:10', 'UTC') AND toDateTime('2022-04-11 15:45:10', 'UTC')", }, { Description: "select consolidated table", Tables: []flowsTable{ {"flows", 0, time.Date(2022, 03, 10, 22, 45, 10, 0, time.UTC)}, {"flows_1m0s", time.Minute, time.Date(2022, 04, 2, 22, 45, 10, 0, time.UTC)}, }, - Query: "SELECT 1 FROM {table} WHERE {timefilter}", - Start: time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC), - End: time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC), - Resolution: 2 * time.Minute, - Expected: "SELECT 1 FROM flows_1m0s WHERE TimeReceived BETWEEN toDateTime('2022-04-10 15:45:00', 'UTC') AND toDateTime('2022-04-11 15:45:00', 'UTC')", + Query: "SELECT 1 FROM {{ .Table }} WHERE {{ .Timefilter }} // {{ .Interval }}", + Context: inputContext{ + Start: time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC), + End: time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC), + Points: 720, // 2-minute resolution + }, + Expected: "SELECT 1 FROM flows_1m0s WHERE TimeReceived BETWEEN toDateTime('2022-04-10 15:45:00', 'UTC') AND toDateTime('2022-04-11 15:45:00', 'UTC') // 120", }, { Description: "select consolidated table out of range", Tables: []flowsTable{ {"flows", 0, time.Date(2022, 04, 10, 22, 45, 10, 0, time.UTC)}, {"flows_1m0s", time.Minute, time.Date(2022, 04, 10, 17, 45, 10, 0, time.UTC)}, }, - Query: "SELECT 1 FROM {table} WHERE {timefilter}", - Start: time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC), - End: time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC), - Resolution: 2 * time.Minute, - Expected: "SELECT 1 FROM flows_1m0s WHERE TimeReceived BETWEEN toDateTime('2022-04-10 15:45:00', 'UTC') AND toDateTime('2022-04-11 15:45:00', 'UTC')", + Query: "SELECT 1 FROM {{ .Table }} WHERE {{ .Timefilter }}", + Context: inputContext{ + Start: time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC), + End: time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC), + Points: 720, // 2-minute resolution, + }, + Expected: "SELECT 1 FROM flows_1m0s WHERE TimeReceived BETWEEN toDateTime('2022-04-10 15:45:00', 'UTC') AND toDateTime('2022-04-11 15:45:00', 'UTC')", }, { Description: "select flows table out of range", Tables: []flowsTable{ {"flows", 0, time.Date(2022, 04, 10, 16, 45, 10, 0, time.UTC)}, {"flows_1m0s", time.Minute, time.Date(2022, 04, 10, 17, 45, 10, 0, time.UTC)}, }, - Query: "SELECT 1 FROM {table} WHERE {timefilter}", - Start: time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC), - End: time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC), - Resolution: 2 * time.Minute, - Expected: "SELECT 1 FROM flows WHERE TimeReceived BETWEEN toDateTime('2022-04-10 15:45:10', 'UTC') AND toDateTime('2022-04-11 15:45:10', 'UTC')", + Query: "SELECT 1 FROM {{ .Table }} WHERE {{ .Timefilter }}", + Context: inputContext{ + Start: time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC), + End: time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC), + Points: 720, // 2-minute resolution, + }, + Expected: "SELECT 1 FROM flows WHERE TimeReceived BETWEEN toDateTime('2022-04-10 15:45:10', 'UTC') AND toDateTime('2022-04-11 15:45:10', 'UTC')", }, { - Description: "select flows table better resolution", + Description: "select flows table with better resolution", Tables: []flowsTable{ {"flows", 0, time.Date(2022, 03, 10, 16, 45, 10, 0, time.UTC)}, {"flows_1m0s", time.Minute, time.Date(2022, 03, 10, 17, 45, 10, 0, time.UTC)}, }, - Query: "SELECT 1 FROM {table} WHERE {timefilter} // {resolution} // {resolution->864}", - Start: time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC), - End: time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC), - Resolution: 30 * time.Second, - Expected: "SELECT 1 FROM flows WHERE TimeReceived BETWEEN toDateTime('2022-04-10 15:45:10', 'UTC') AND toDateTime('2022-04-11 15:45:10', 'UTC') // 1 // 864", + Query: "SELECT 1 FROM {{ .Table }} WHERE {{ .Timefilter }} // {{ .Interval }}", + Context: inputContext{ + Start: time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC), + End: time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC), + Points: 2880, + }, + Expected: "SELECT 1 FROM flows WHERE TimeReceived BETWEEN toDateTime('2022-04-10 15:45:10', 'UTC') AND toDateTime('2022-04-11 15:45:10', 'UTC') // 30", }, { - Description: "select consolidated table better resolution", + Description: "select consolidated table with better resolution", Tables: []flowsTable{ {"flows", 0, time.Date(2022, 03, 10, 22, 45, 10, 0, time.UTC)}, {"flows_5m0s", 5 * time.Minute, time.Date(2022, 04, 2, 22, 45, 10, 0, time.UTC)}, {"flows_1m0s", time.Minute, time.Date(2022, 04, 2, 22, 45, 10, 0, time.UTC)}, }, - Query: "SELECT 1 FROM {table} WHERE {timefilter} // {resolution} // {resolution->864}", - Start: time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC), - End: time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC), - Resolution: 2 * time.Minute, - Expected: "SELECT 1 FROM flows_1m0s WHERE TimeReceived BETWEEN toDateTime('2022-04-10 15:45:00', 'UTC') AND toDateTime('2022-04-11 15:45:00', 'UTC') // 60 // 840", + Query: "SELECT 1 FROM {{ .Table }} WHERE {{ .Timefilter }} // {{ .Interval }}", + Context: inputContext{ + Start: time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC), + End: time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC), + Points: 720, // 2-minute resolution, + }, + Expected: "SELECT 1 FROM flows_1m0s WHERE TimeReceived BETWEEN toDateTime('2022-04-10 15:45:00', 'UTC') AND toDateTime('2022-04-11 15:45:00', 'UTC') // 120", }, { - Description: "select consolidated table better range", + Description: "select consolidated table with better range", Tables: []flowsTable{ {"flows", 0, time.Date(2022, 04, 10, 22, 45, 10, 0, time.UTC)}, {"flows_5m0s", 5 * time.Minute, time.Date(2022, 04, 2, 22, 45, 10, 0, time.UTC)}, {"flows_1m0s", time.Minute, time.Date(2022, 04, 10, 22, 45, 10, 0, time.UTC)}, }, - Query: "SELECT 1 FROM {table} WHERE {timefilter}", - Start: time.Date(2022, 04, 10, 15, 46, 10, 0, time.UTC), - End: time.Date(2022, 04, 11, 15, 46, 10, 0, time.UTC), - Resolution: 2 * time.Minute, - Expected: "SELECT 1 FROM flows_5m0s WHERE TimeReceived BETWEEN toDateTime('2022-04-10 15:45:00', 'UTC') AND toDateTime('2022-04-11 15:45:00', 'UTC')", + Query: "SELECT 1 FROM {{ .Table }} WHERE {{ .Timefilter }}", + Context: inputContext{ + Start: time.Date(2022, 04, 10, 15, 46, 10, 0, time.UTC), + End: time.Date(2022, 04, 11, 15, 46, 10, 0, time.UTC), + Points: 720, // 2-minute resolution, + }, + Expected: "SELECT 1 FROM flows_5m0s WHERE TimeReceived BETWEEN toDateTime('2022-04-10 15:45:00', 'UTC') AND toDateTime('2022-04-11 15:45:00', 'UTC')", }, { Description: "select best resolution when equality for oldest data", Tables: []flowsTable{ @@ -184,11 +215,31 @@ func TestQueryFlowsTables(t *testing.T) { {"flows_1m0s", time.Minute, time.Date(2022, 04, 10, 22, 40, 00, 0, time.UTC)}, {"flows_1h0m0s", time.Hour, time.Date(2022, 04, 10, 22, 00, 10, 0, time.UTC)}, }, - Query: "SELECT 1 FROM {table} WHERE {timefilter}", - Start: time.Date(2022, 04, 10, 15, 46, 10, 0, time.UTC), - End: time.Date(2022, 04, 11, 15, 46, 10, 0, time.UTC), - Resolution: 2 * time.Minute, - Expected: "SELECT 1 FROM flows_1m0s WHERE TimeReceived BETWEEN toDateTime('2022-04-10 15:46:00', 'UTC') AND toDateTime('2022-04-11 15:46:00', 'UTC')", + Query: "SELECT 1 FROM {{ .Table }} WHERE {{ .Timefilter }}", + Context: inputContext{ + Start: time.Date(2022, 04, 10, 15, 46, 10, 0, time.UTC), + End: time.Date(2022, 04, 11, 15, 46, 10, 0, time.UTC), + Points: 720, // 2-minute resolution, + }, + Expected: "SELECT 1 FROM flows_1m0s WHERE TimeReceived BETWEEN toDateTime('2022-04-10 15:46:00', 'UTC') AND toDateTime('2022-04-11 15:46:00', 'UTC')", + }, { + Description: "query with escaped template", + Query: `SELECT TimeReceived, SrcPort WHERE InIfDescription = '{{"{{"}} hello }}'`, + Context: inputContext{ + Start: time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC), + End: time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC), + Points: 86400, + }, + Expected: `SELECT TimeReceived, SrcPort WHERE InIfDescription = '{{ hello }}'`, + }, { + Description: "use of ToStartOfInterval", + Query: `{{ call .ToStartOfInterval "TimeReceived" }}`, + Context: inputContext{ + Start: time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC), + End: time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC), + Points: 720, + }, + Expected: `toStartOfInterval(TimeReceived + INTERVAL 50 second, INTERVAL 120 second) - INTERVAL 50 second`, }, } @@ -196,9 +247,10 @@ func TestQueryFlowsTables(t *testing.T) { for _, tc := range cases { t.Run(tc.Description, func(t *testing.T) { c.flowsTables = tc.Tables - got := c.queryFlowsTable(tc.Query, tc.MainTableRequired, tc.Start, tc.End, tc.Resolution) + got := c.finalizeQuery( + fmt.Sprintf(`{{ with %s }}%s{{ end }}`, templateContext(tc.Context), tc.Query)) if diff := helpers.Diff(got, tc.Expected); diff != "" { - t.Fatalf("queryFlowsTable(): (-got, +want):\n%s", diff) + t.Fatalf("finalizeQuery(): (-got, +want):\n%s", diff) } }) } diff --git a/console/frontend/src/views/HomePage/WidgetGraph.vue b/console/frontend/src/views/HomePage/WidgetGraph.vue index 432b18a2..6c9f7706 100644 --- a/console/frontend/src/views/HomePage/WidgetGraph.vue +++ b/console/frontend/src/views/HomePage/WidgetGraph.vue @@ -73,7 +73,7 @@ const options = computed(() => ({ }, data: (data.value?.data || []) .map(({ t, gbps }) => [t, gbps]) - .slice(1, -1), + .slice(0, -1), }, ], })); diff --git a/console/frontend/src/views/VisualizePage/DataGraphTimeSeries.vue b/console/frontend/src/views/VisualizePage/DataGraphTimeSeries.vue index 86016fbc..b2c1554a 100644 --- a/console/frontend/src/views/VisualizePage/DataGraphTimeSeries.vue +++ b/console/frontend/src/views/VisualizePage/DataGraphTimeSeries.vue @@ -126,7 +126,7 @@ const graph = computed(() => { (row, rowIdx) => row[timeIdx] * (data.axis[rowIdx] == 1 ? 1 : -1) ), ]) - .slice(1, -1), + .slice(0, -1), ], }, xAxis = { diff --git a/console/graph.go b/console/graph.go index a9ad9c22..57d9daca 100644 --- a/console/graph.go +++ b/console/graph.go @@ -19,7 +19,7 @@ import ( type graphHandlerInput struct { Start time.Time `json:"start" binding:"required"` End time.Time `json:"end" binding:"required,gtfield=Start"` - Points int `json:"points" binding:"required,min=5,max=2000"` // minimum number of points + Points uint `json:"points" binding:"required,min=5,max=2000"` // minimum number of points Dimensions []queryColumn `json:"dimensions"` // group by ... Limit int `json:"limit" binding:"min=1,max=50"` // limit product of dimensions Filter queryFilter `json:"filter"` // where ... @@ -54,28 +54,12 @@ func (input graphHandlerInput) reverseDirection() graphHandlerInput { } func (input graphHandlerInput) toSQL1(axis int, skipWith bool) string { - interval := int64((input.End.Sub(input.Start).Seconds())) / int64(input.Points) - slot := fmt.Sprintf(`{resolution->%d}`, interval) - - // Filter - where := input.Filter.Filter - if where == "" { - where = "{timefilter}" - } else { - where = fmt.Sprintf("{timefilter} AND (%s)", where) - } + where := templateWhere(input.Filter) // Select fields := []string{ - fmt.Sprintf(`toStartOfInterval(TimeReceived, INTERVAL %s second) AS time`, slot), - } - switch input.Units { - case "pps": - fields = append(fields, fmt.Sprintf(`SUM(Packets*SamplingRate/%s) AS xps`, slot)) - case "l3bps": - fields = append(fields, fmt.Sprintf(`SUM(Bytes*SamplingRate*8/%s) AS xps`, slot)) - case "l2bps": - fields = append(fields, fmt.Sprintf(`SUM((Bytes+18*Packets)*SamplingRate*8/%s) AS xps`, slot)) + `{{ call .ToStartOfInterval "TimeReceived" }} AS time`, + `{{ .Units }}/{{ .Interval }} AS xps`, } selectFields := []string{} dimensions := []string{} @@ -99,7 +83,7 @@ func (input graphHandlerInput) toSQL1(axis int, skipWith bool) string { with := []string{} if len(dimensions) > 0 && !skipWith { with = append(with, fmt.Sprintf( - "rows AS (SELECT %s FROM {table} WHERE %s GROUP BY %s ORDER BY SUM(Bytes) DESC LIMIT %d)", + "rows AS (SELECT %s FROM {{ .Table }} WHERE %s GROUP BY %s ORDER BY SUM(Bytes) DESC LIMIT %d)", strings.Join(dimensions, ", "), where, strings.Join(dimensions, ", "), @@ -107,34 +91,40 @@ func (input graphHandlerInput) toSQL1(axis int, skipWith bool) string { } withStr := "" if len(with) > 0 { - withStr = fmt.Sprintf("WITH\n %s", strings.Join(with, ",\n ")) + withStr = fmt.Sprintf("\nWITH\n %s", strings.Join(with, ",\n ")) } sqlQuery := fmt.Sprintf(` -%s +{{ with %s }}%s SELECT %d AS axis, * FROM ( SELECT %s -FROM {table} +FROM {{ .Table }} WHERE %s GROUP BY time, dimensions ORDER BY time WITH FILL - FROM toStartOfInterval({timefilter.Start}, INTERVAL %s second) - TO {timefilter.Stop} - STEP %s)`, withStr, axis, strings.Join(fields, ",\n "), where, slot, slot) - return sqlQuery + FROM {{ .TimefilterStart }} + TO {{ .TimefilterEnd }} + STEP {{ .Interval }}) +{{ end }}`, + templateContext(inputContext{ + Start: input.Start, + End: input.End, + MainTableRequired: requireMainTable(input.Dimensions, input.Filter), + Points: input.Points, + Units: input.Units, + }), + withStr, axis, strings.Join(fields, ",\n "), where) + return strings.TrimSpace(sqlQuery) } // graphHandlerInputToSQL converts a graph input to an SQL request func (input graphHandlerInput) toSQL() string { - result := input.toSQL1(1, false) + parts := []string{input.toSQL1(1, false)} if input.Bidirectional { - part2 := input.reverseDirection().toSQL1(2, true) - result = fmt.Sprintf(`%s -UNION ALL -%s`, result, strings.TrimSpace(part2)) + parts = append(parts, input.reverseDirection().toSQL1(2, true)) } - return strings.TrimSpace(result) + return strings.TrimSpace(strings.Join(parts, "\nUNION ALL\n")) } func (c *Component) graphHandlerFunc(gc *gin.Context) { @@ -146,12 +136,7 @@ func (c *Component) graphHandlerFunc(gc *gin.Context) { } sqlQuery := input.toSQL() - resolution := time.Duration(int64(input.End.Sub(input.Start).Nanoseconds()) / int64(input.Points)) - if resolution < time.Second { - resolution = time.Second - } - sqlQuery = c.queryFlowsTable(sqlQuery, requireMainTable(input.Dimensions, input.Filter), - input.Start, input.End, resolution) + sqlQuery = c.finalizeQuery(sqlQuery) gc.Header("X-SQL-Query", strings.ReplaceAll(sqlQuery, "\n", " ")) results := []struct { diff --git a/console/graph_test.go b/console/graph_test.go index 12e74f0a..27606aae 100644 --- a/console/graph_test.go +++ b/console/graph_test.go @@ -72,18 +72,20 @@ func TestGraphQuerySQL(t *testing.T) { Units: "l3bps", }, Expected: ` +{{ with context @@{"start":"2022-04-10T15:45:10Z","end":"2022-04-11T15:45:10Z","points":100,"units":"l3bps"}@@ }} SELECT 1 AS axis, * FROM ( SELECT - toStartOfInterval(TimeReceived, INTERVAL {resolution->864} second) AS time, - SUM(Bytes*SamplingRate*8/{resolution->864}) AS xps, + {{ call .ToStartOfInterval "TimeReceived" }} AS time, + {{ .Units }}/{{ .Interval }} AS xps, emptyArrayString() AS dimensions -FROM {table} -WHERE {timefilter} +FROM {{ .Table }} +WHERE {{ .Timefilter }} GROUP BY time, dimensions ORDER BY time WITH FILL - FROM toStartOfInterval({timefilter.Start}, INTERVAL {resolution->864} second) - TO {timefilter.Stop} - STEP {resolution->864})`, + FROM {{ .TimefilterStart }} + TO {{ .TimefilterEnd }} + STEP {{ .Interval }}) +{{ end }}`, }, { Description: "no dimensions, no filters, l2 bps", Input: graphHandlerInput{ @@ -95,18 +97,21 @@ ORDER BY time WITH FILL Units: "l2bps", }, Expected: ` +{{ with context @@{"start":"2022-04-10T15:45:10Z","end":"2022-04-11T15:45:10Z","points":100,"units":"l2bps"}@@ }} SELECT 1 AS axis, * FROM ( SELECT - toStartOfInterval(TimeReceived, INTERVAL {resolution->864} second) AS time, - SUM((Bytes+18*Packets)*SamplingRate*8/{resolution->864}) AS xps, + {{ call .ToStartOfInterval "TimeReceived" }} AS time, + {{ .Units }}/{{ .Interval }} AS xps, emptyArrayString() AS dimensions -FROM {table} -WHERE {timefilter} +FROM {{ .Table }} +WHERE {{ .Timefilter }} GROUP BY time, dimensions ORDER BY time WITH FILL - FROM toStartOfInterval({timefilter.Start}, INTERVAL {resolution->864} second) - TO {timefilter.Stop} - STEP {resolution->864})`, + FROM {{ .TimefilterStart }} + TO {{ .TimefilterEnd }} + STEP {{ .Interval }}) +{{ end }} +`, }, { Description: "no dimensions, no filters, pps", Input: graphHandlerInput{ @@ -118,18 +123,20 @@ ORDER BY time WITH FILL Units: "pps", }, Expected: ` +{{ with context @@{"start":"2022-04-10T15:45:10Z","end":"2022-04-11T15:45:10Z","points":100,"units":"pps"}@@ }} SELECT 1 AS axis, * FROM ( SELECT - toStartOfInterval(TimeReceived, INTERVAL {resolution->864} second) AS time, - SUM(Packets*SamplingRate/{resolution->864}) AS xps, + {{ call .ToStartOfInterval "TimeReceived" }} AS time, + {{ .Units }}/{{ .Interval }} AS xps, emptyArrayString() AS dimensions -FROM {table} -WHERE {timefilter} +FROM {{ .Table }} +WHERE {{ .Timefilter }} GROUP BY time, dimensions ORDER BY time WITH FILL - FROM toStartOfInterval({timefilter.Start}, INTERVAL {resolution->864} second) - TO {timefilter.Stop} - STEP {resolution->864})`, + FROM {{ .TimefilterStart }} + TO {{ .TimefilterEnd }} + STEP {{ .Interval }}) +{{ end }}`, }, { Description: "no dimensions", Input: graphHandlerInput{ @@ -141,18 +148,45 @@ ORDER BY time WITH FILL Units: "l3bps", }, Expected: ` +{{ with context @@{"start":"2022-04-10T15:45:10Z","end":"2022-04-11T15:45:10Z","points":100,"units":"l3bps"}@@ }} SELECT 1 AS axis, * FROM ( SELECT - toStartOfInterval(TimeReceived, INTERVAL {resolution->864} second) AS time, - SUM(Bytes*SamplingRate*8/{resolution->864}) AS xps, + {{ call .ToStartOfInterval "TimeReceived" }} AS time, + {{ .Units }}/{{ .Interval }} AS xps, emptyArrayString() AS dimensions -FROM {table} -WHERE {timefilter} AND (DstCountry = 'FR' AND SrcCountry = 'US') +FROM {{ .Table }} +WHERE {{ .Timefilter }} AND (DstCountry = 'FR' AND SrcCountry = 'US') GROUP BY time, dimensions ORDER BY time WITH FILL - FROM toStartOfInterval({timefilter.Start}, INTERVAL {resolution->864} second) - TO {timefilter.Stop} - STEP {resolution->864})`, + FROM {{ .TimefilterStart }} + TO {{ .TimefilterEnd }} + STEP {{ .Interval }}) +{{ end }}`, + }, { + Description: "no dimensions, escaped filter", + Input: graphHandlerInput{ + Start: time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC), + End: time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC), + Points: 100, + Dimensions: []queryColumn{}, + Filter: queryFilter{Filter: "InIfDescription = '{{ hello }}' AND SrcCountry = 'US'"}, + Units: "l3bps", + }, + Expected: ` +{{ with context @@{"start":"2022-04-10T15:45:10Z","end":"2022-04-11T15:45:10Z","points":100,"units":"l3bps"}@@ }} +SELECT 1 AS axis, * FROM ( +SELECT + {{ call .ToStartOfInterval "TimeReceived" }} AS time, + {{ .Units }}/{{ .Interval }} AS xps, + emptyArrayString() AS dimensions +FROM {{ .Table }} +WHERE {{ .Timefilter }} AND (InIfDescription = '{{"{{"}} hello }}' AND SrcCountry = 'US') +GROUP BY time, dimensions +ORDER BY time WITH FILL + FROM {{ .TimefilterStart }} + TO {{ .TimefilterEnd }} + STEP {{ .Interval }}) +{{ end }}`, }, { Description: "no dimensions, reverse direction", Input: graphHandlerInput{ @@ -168,31 +202,35 @@ ORDER BY time WITH FILL Bidirectional: true, }, Expected: ` +{{ with context @@{"start":"2022-04-10T15:45:10Z","end":"2022-04-11T15:45:10Z","points":100,"units":"l3bps"}@@ }} SELECT 1 AS axis, * FROM ( SELECT - toStartOfInterval(TimeReceived, INTERVAL {resolution->864} second) AS time, - SUM(Bytes*SamplingRate*8/{resolution->864}) AS xps, + {{ call .ToStartOfInterval "TimeReceived" }} AS time, + {{ .Units }}/{{ .Interval }} AS xps, emptyArrayString() AS dimensions -FROM {table} -WHERE {timefilter} AND (DstCountry = 'FR' AND SrcCountry = 'US') +FROM {{ .Table }} +WHERE {{ .Timefilter }} AND (DstCountry = 'FR' AND SrcCountry = 'US') GROUP BY time, dimensions ORDER BY time WITH FILL - FROM toStartOfInterval({timefilter.Start}, INTERVAL {resolution->864} second) - TO {timefilter.Stop} - STEP {resolution->864}) + FROM {{ .TimefilterStart }} + TO {{ .TimefilterEnd }} + STEP {{ .Interval }}) +{{ end }} UNION ALL +{{ with context @@{"start":"2022-04-10T15:45:10Z","end":"2022-04-11T15:45:10Z","points":100,"units":"l3bps"}@@ }} SELECT 2 AS axis, * FROM ( SELECT - toStartOfInterval(TimeReceived, INTERVAL {resolution->864} second) AS time, - SUM(Bytes*SamplingRate*8/{resolution->864}) AS xps, + {{ call .ToStartOfInterval "TimeReceived" }} AS time, + {{ .Units }}/{{ .Interval }} AS xps, emptyArrayString() AS dimensions -FROM {table} -WHERE {timefilter} AND (SrcCountry = 'FR' AND DstCountry = 'US') +FROM {{ .Table }} +WHERE {{ .Timefilter }} AND (SrcCountry = 'FR' AND DstCountry = 'US') GROUP BY time, dimensions ORDER BY time WITH FILL - FROM toStartOfInterval({timefilter.Start}, INTERVAL {resolution->864} second) - TO {timefilter.Stop} - STEP {resolution->864})`, + FROM {{ .TimefilterStart }} + TO {{ .TimefilterEnd }} + STEP {{ .Interval }}) +{{ end }}`, }, { Description: "no filters", Input: graphHandlerInput{ @@ -208,20 +246,22 @@ ORDER BY time WITH FILL Units: "l3bps", }, Expected: ` +{{ with context @@{"start":"2022-04-10T15:45:10Z","end":"2022-04-11T15:45:10Z","points":100,"units":"l3bps"}@@ }} WITH - rows AS (SELECT ExporterName, InIfProvider FROM {table} WHERE {timefilter} GROUP BY ExporterName, InIfProvider ORDER BY SUM(Bytes) DESC LIMIT 20) + rows AS (SELECT ExporterName, InIfProvider FROM {{ .Table }} WHERE {{ .Timefilter }} GROUP BY ExporterName, InIfProvider ORDER BY SUM(Bytes) DESC LIMIT 20) SELECT 1 AS axis, * FROM ( SELECT - toStartOfInterval(TimeReceived, INTERVAL {resolution->864} second) AS time, - SUM(Bytes*SamplingRate*8/{resolution->864}) AS xps, + {{ call .ToStartOfInterval "TimeReceived" }} AS time, + {{ .Units }}/{{ .Interval }} AS xps, if((ExporterName, InIfProvider) IN rows, [ExporterName, InIfProvider], ['Other', 'Other']) AS dimensions -FROM {table} -WHERE {timefilter} +FROM {{ .Table }} +WHERE {{ .Timefilter }} GROUP BY time, dimensions ORDER BY time WITH FILL - FROM toStartOfInterval({timefilter.Start}, INTERVAL {resolution->864} second) - TO {timefilter.Stop} - STEP {resolution->864})`, + FROM {{ .TimefilterStart }} + TO {{ .TimefilterEnd }} + STEP {{ .Interval }}) +{{ end }}`, }, { Description: "no filters, reverse", Input: graphHandlerInput{ @@ -238,36 +278,41 @@ ORDER BY time WITH FILL Bidirectional: true, }, Expected: ` +{{ with context @@{"start":"2022-04-10T15:45:10Z","end":"2022-04-11T15:45:10Z","points":100,"units":"l3bps"}@@ }} WITH - rows AS (SELECT ExporterName, InIfProvider FROM {table} WHERE {timefilter} GROUP BY ExporterName, InIfProvider ORDER BY SUM(Bytes) DESC LIMIT 20) + rows AS (SELECT ExporterName, InIfProvider FROM {{ .Table }} WHERE {{ .Timefilter }} GROUP BY ExporterName, InIfProvider ORDER BY SUM(Bytes) DESC LIMIT 20) SELECT 1 AS axis, * FROM ( SELECT - toStartOfInterval(TimeReceived, INTERVAL {resolution->864} second) AS time, - SUM(Bytes*SamplingRate*8/{resolution->864}) AS xps, + {{ call .ToStartOfInterval "TimeReceived" }} AS time, + {{ .Units }}/{{ .Interval }} AS xps, if((ExporterName, InIfProvider) IN rows, [ExporterName, InIfProvider], ['Other', 'Other']) AS dimensions -FROM {table} -WHERE {timefilter} +FROM {{ .Table }} +WHERE {{ .Timefilter }} GROUP BY time, dimensions ORDER BY time WITH FILL - FROM toStartOfInterval({timefilter.Start}, INTERVAL {resolution->864} second) - TO {timefilter.Stop} - STEP {resolution->864}) + FROM {{ .TimefilterStart }} + TO {{ .TimefilterEnd }} + STEP {{ .Interval }}) +{{ end }} UNION ALL +{{ with context @@{"start":"2022-04-10T15:45:10Z","end":"2022-04-11T15:45:10Z","points":100,"units":"l3bps"}@@ }} SELECT 2 AS axis, * FROM ( SELECT - toStartOfInterval(TimeReceived, INTERVAL {resolution->864} second) AS time, - SUM(Bytes*SamplingRate*8/{resolution->864}) AS xps, + {{ call .ToStartOfInterval "TimeReceived" }} AS time, + {{ .Units }}/{{ .Interval }} AS xps, if((ExporterName, OutIfProvider) IN rows, [ExporterName, OutIfProvider], ['Other', 'Other']) AS dimensions -FROM {table} -WHERE {timefilter} +FROM {{ .Table }} +WHERE {{ .Timefilter }} GROUP BY time, dimensions ORDER BY time WITH FILL - FROM toStartOfInterval({timefilter.Start}, INTERVAL {resolution->864} second) - TO {timefilter.Stop} - STEP {resolution->864})`, + FROM {{ .TimefilterStart }} + TO {{ .TimefilterEnd }} + STEP {{ .Interval }}) +{{ end }}`, }, } for _, tc := range cases { + tc.Expected = strings.ReplaceAll(tc.Expected, "@@", "`") t.Run(tc.Description, func(t *testing.T) { got := tc.Input.toSQL() if diff := helpers.Diff(strings.Split(strings.TrimSpace(got), "\n"), diff --git a/console/root.go b/console/root.go index 29f77c5b..a5657b7f 100644 --- a/console/root.go +++ b/console/root.go @@ -5,7 +5,6 @@ package console import ( - "html/template" "io/fs" netHTTP "net/http" "os" @@ -13,6 +12,7 @@ import ( "path/filepath" "runtime" "sync" + "text/template" "time" "github.com/benbjohnson/clock" diff --git a/console/sankey.go b/console/sankey.go index e7da082c..d654300b 100644 --- a/console/sankey.go +++ b/console/sankey.go @@ -42,13 +42,7 @@ type sankeyLink struct { // sankeyHandlerInputToSQL converts a sankey query to an SQL request func (input sankeyHandlerInput) toSQL() (string, error) { - // Filter - where := input.Filter.Filter - if where == "" { - where = "{timefilter}" - } else { - where = fmt.Sprintf("{timefilter} AND (%s)", where) - } + where := templateWhere(input.Filter) // Select arrayFields := []string{} @@ -60,22 +54,16 @@ func (input sankeyHandlerInput) toSQL() (string, error) { column.toSQLSelect())) dimensions = append(dimensions, column.String()) } - fields := []string{} - switch input.Units { - case "pps": - fields = append(fields, `SUM(Packets*SamplingRate/range) AS xps`) - case "l3bps": - fields = append(fields, `SUM(Bytes*SamplingRate*8/range) AS xps`) - case "l2bps": - fields = append(fields, `SUM((Bytes+18*Packets)*SamplingRate*8/range) AS xps`) + fields := []string{ + `{{ .Units }}/range AS xps`, + fmt.Sprintf("[%s] AS dimensions", strings.Join(arrayFields, ",\n ")), } - fields = append(fields, fmt.Sprintf("[%s] AS dimensions", strings.Join(arrayFields, ",\n "))) // With with := []string{ - fmt.Sprintf(`(SELECT MAX(TimeReceived) - MIN(TimeReceived) FROM {table} WHERE %s) AS range`, where), + fmt.Sprintf(`(SELECT MAX(TimeReceived) - MIN(TimeReceived) FROM {{ .Table }} WHERE %s) AS range`, where), fmt.Sprintf( - "rows AS (SELECT %s FROM {table} WHERE %s GROUP BY %s ORDER BY SUM(Bytes) DESC LIMIT %d)", + "rows AS (SELECT %s FROM {{ .Table }} WHERE %s GROUP BY %s ORDER BY SUM(Bytes) DESC LIMIT %d)", strings.Join(dimensions, ", "), where, strings.Join(dimensions, ", "), @@ -83,15 +71,25 @@ func (input sankeyHandlerInput) toSQL() (string, error) { } sqlQuery := fmt.Sprintf(` +{{ with %s }} WITH %s SELECT %s -FROM {table} +FROM {{ .Table }} WHERE %s GROUP BY dimensions -ORDER BY xps DESC`, strings.Join(with, ",\n "), strings.Join(fields, ",\n "), where) - return sqlQuery, nil +ORDER BY xps DESC +{{ end }}`, + templateContext(inputContext{ + Start: input.Start, + End: input.End, + MainTableRequired: requireMainTable(input.Dimensions, input.Filter), + Points: 20, + Units: input.Units, + }), + strings.Join(with, ",\n "), strings.Join(fields, ",\n "), where) + return strings.TrimSpace(sqlQuery), nil } func (c *Component) sankeyHandlerFunc(gc *gin.Context) { @@ -108,15 +106,8 @@ func (c *Component) sankeyHandlerFunc(gc *gin.Context) { return } - // We need to select a resolution allowing us to have a somewhat accurate timespan - resolution := time.Duration(int64(input.End.Sub(input.Start).Nanoseconds()) / 20) - if resolution < time.Second { - resolution = time.Second - } - // Prepare and execute query - sqlQuery = c.queryFlowsTable(sqlQuery, requireMainTable(input.Dimensions, input.Filter), - input.Start, input.End, resolution) + sqlQuery = c.finalizeQuery(sqlQuery) gc.Header("X-SQL-Query", strings.ReplaceAll(sqlQuery, "\n", " ")) results := []struct { Xps float64 `ch:"xps"` diff --git a/console/sankey_test.go b/console/sankey_test.go index 4d0ae8e7..7bdfa4b6 100644 --- a/console/sankey_test.go +++ b/console/sankey_test.go @@ -31,17 +31,19 @@ func TestSankeyQuerySQL(t *testing.T) { Units: "l3bps", }, Expected: ` +{{ with context @@{"start":"2022-04-10T15:45:10Z","end":"2022-04-11T15:45:10Z","points":20,"units":"l3bps"}@@ }} WITH - (SELECT MAX(TimeReceived) - MIN(TimeReceived) FROM {table} WHERE {timefilter}) AS range, - rows AS (SELECT SrcAS, ExporterName FROM {table} WHERE {timefilter} GROUP BY SrcAS, ExporterName ORDER BY SUM(Bytes) DESC LIMIT 5) + (SELECT MAX(TimeReceived) - MIN(TimeReceived) FROM {{ .Table }} WHERE {{ .Timefilter }}) AS range, + rows AS (SELECT SrcAS, ExporterName FROM {{ .Table }} WHERE {{ .Timefilter }} GROUP BY SrcAS, ExporterName ORDER BY SUM(Bytes) DESC LIMIT 5) SELECT - SUM(Bytes*SamplingRate*8/range) AS xps, + {{ .Units }}/range AS xps, [if(SrcAS IN (SELECT SrcAS FROM rows), concat(toString(SrcAS), ': ', dictGetOrDefault('asns', 'name', SrcAS, '???')), 'Other'), if(ExporterName IN (SELECT ExporterName FROM rows), ExporterName, 'Other')] AS dimensions -FROM {table} -WHERE {timefilter} +FROM {{ .Table }} +WHERE {{ .Timefilter }} GROUP BY dimensions -ORDER BY xps DESC`, +ORDER BY xps DESC +{{ end }}`, }, { Description: "two dimensions, no filters, l2 bps", Input: sankeyHandlerInput{ @@ -53,17 +55,20 @@ ORDER BY xps DESC`, Units: "l2bps", }, Expected: ` +{{ with context @@{"start":"2022-04-10T15:45:10Z","end":"2022-04-11T15:45:10Z","points":20,"units":"l2bps"}@@ }} WITH - (SELECT MAX(TimeReceived) - MIN(TimeReceived) FROM {table} WHERE {timefilter}) AS range, - rows AS (SELECT SrcAS, ExporterName FROM {table} WHERE {timefilter} GROUP BY SrcAS, ExporterName ORDER BY SUM(Bytes) DESC LIMIT 5) + (SELECT MAX(TimeReceived) - MIN(TimeReceived) FROM {{ .Table }} WHERE {{ .Timefilter }}) AS range, + rows AS (SELECT SrcAS, ExporterName FROM {{ .Table }} WHERE {{ .Timefilter }} GROUP BY SrcAS, ExporterName ORDER BY SUM(Bytes) DESC LIMIT 5) SELECT - SUM((Bytes+18*Packets)*SamplingRate*8/range) AS xps, + {{ .Units }}/range AS xps, [if(SrcAS IN (SELECT SrcAS FROM rows), concat(toString(SrcAS), ': ', dictGetOrDefault('asns', 'name', SrcAS, '???')), 'Other'), if(ExporterName IN (SELECT ExporterName FROM rows), ExporterName, 'Other')] AS dimensions -FROM {table} -WHERE {timefilter} +FROM {{ .Table }} +WHERE {{ .Timefilter }} GROUP BY dimensions -ORDER BY xps DESC`, +ORDER BY xps DESC +{{ end }} +`, }, { Description: "two dimensions, no filters, pps", Input: sankeyHandlerInput{ @@ -75,17 +80,19 @@ ORDER BY xps DESC`, Units: "pps", }, Expected: ` +{{ with context @@{"start":"2022-04-10T15:45:10Z","end":"2022-04-11T15:45:10Z","points":20,"units":"pps"}@@ }} WITH - (SELECT MAX(TimeReceived) - MIN(TimeReceived) FROM {table} WHERE {timefilter}) AS range, - rows AS (SELECT SrcAS, ExporterName FROM {table} WHERE {timefilter} GROUP BY SrcAS, ExporterName ORDER BY SUM(Bytes) DESC LIMIT 5) + (SELECT MAX(TimeReceived) - MIN(TimeReceived) FROM {{ .Table }} WHERE {{ .Timefilter }}) AS range, + rows AS (SELECT SrcAS, ExporterName FROM {{ .Table }} WHERE {{ .Timefilter }} GROUP BY SrcAS, ExporterName ORDER BY SUM(Bytes) DESC LIMIT 5) SELECT - SUM(Packets*SamplingRate/range) AS xps, + {{ .Units }}/range AS xps, [if(SrcAS IN (SELECT SrcAS FROM rows), concat(toString(SrcAS), ': ', dictGetOrDefault('asns', 'name', SrcAS, '???')), 'Other'), if(ExporterName IN (SELECT ExporterName FROM rows), ExporterName, 'Other')] AS dimensions -FROM {table} -WHERE {timefilter} +FROM {{ .Table }} +WHERE {{ .Timefilter }} GROUP BY dimensions -ORDER BY xps DESC`, +ORDER BY xps DESC +{{ end }}`, }, { Description: "two dimensions, with filter", Input: sankeyHandlerInput{ @@ -97,23 +104,27 @@ ORDER BY xps DESC`, Units: "l3bps", }, Expected: ` +{{ with context @@{"start":"2022-04-10T15:45:10Z","end":"2022-04-11T15:45:10Z","points":20,"units":"l3bps"}@@ }} WITH - (SELECT MAX(TimeReceived) - MIN(TimeReceived) FROM {table} WHERE {timefilter} AND (DstCountry = 'FR')) AS range, - rows AS (SELECT SrcAS, ExporterName FROM {table} WHERE {timefilter} AND (DstCountry = 'FR') GROUP BY SrcAS, ExporterName ORDER BY SUM(Bytes) DESC LIMIT 10) + (SELECT MAX(TimeReceived) - MIN(TimeReceived) FROM {{ .Table }} WHERE {{ .Timefilter }} AND (DstCountry = 'FR')) AS range, + rows AS (SELECT SrcAS, ExporterName FROM {{ .Table }} WHERE {{ .Timefilter }} AND (DstCountry = 'FR') GROUP BY SrcAS, ExporterName ORDER BY SUM(Bytes) DESC LIMIT 10) SELECT - SUM(Bytes*SamplingRate*8/range) AS xps, + {{ .Units }}/range AS xps, [if(SrcAS IN (SELECT SrcAS FROM rows), concat(toString(SrcAS), ': ', dictGetOrDefault('asns', 'name', SrcAS, '???')), 'Other'), if(ExporterName IN (SELECT ExporterName FROM rows), ExporterName, 'Other')] AS dimensions -FROM {table} -WHERE {timefilter} AND (DstCountry = 'FR') +FROM {{ .Table }} +WHERE {{ .Timefilter }} AND (DstCountry = 'FR') GROUP BY dimensions -ORDER BY xps DESC`, +ORDER BY xps DESC +{{ end }}`, }, } for _, tc := range cases { + tc.Expected = strings.ReplaceAll(tc.Expected, "@@", "`") t.Run(tc.Description, func(t *testing.T) { got, _ := tc.Input.toSQL() - if diff := helpers.Diff(strings.Split(got, "\n"), strings.Split(tc.Expected, "\n")); diff != "" { + if diff := helpers.Diff(strings.Split(strings.TrimSpace(got), "\n"), + strings.Split(strings.TrimSpace(tc.Expected), "\n")); diff != "" { t.Errorf("toSQL (-got, +want):\n%s", diff) } }) diff --git a/console/widgets.go b/console/widgets.go index 8b03a2ee..1e74c744 100644 --- a/console/widgets.go +++ b/console/widgets.go @@ -7,6 +7,7 @@ import ( "fmt" "net/http" "reflect" + "strings" "time" "akvorado/common/helpers" @@ -151,23 +152,31 @@ func (c *Component) widgetTopHandlerFunc(gc *gin.Context) { } now := c.d.Clock.Now() - query := c.queryFlowsTable(fmt.Sprintf(` + query := c.finalizeQuery(fmt.Sprintf(` +{{ with %s }} WITH - (SELECT SUM(Bytes*SamplingRate) FROM {table} WHERE {timefilter} %s) AS Total + (SELECT SUM(Bytes*SamplingRate) FROM {{ .Table }} WHERE {{ .Timefilter }} %s) AS Total SELECT if(empty(%s),'Unknown',%s) AS Name, SUM(Bytes*SamplingRate) / Total * 100 AS Percent -FROM {table} -WHERE {timefilter} +FROM {{ .Table }} +WHERE {{ .Timefilter }} %s GROUP BY %s ORDER BY Percent DESC LIMIT 5 -`, filter, selector, selector, filter, groupby), mainTableRequired, now.Add(-5*time.Minute), now, time.Minute) +{{ end }}`, + templateContext(inputContext{ + Start: now.Add(-5 * time.Minute), + End: now, + MainTableRequired: mainTableRequired, + Points: 5, + }), + filter, selector, selector, filter, groupby)) gc.Header("X-SQL-Query", query) results := []topResult{} - err := c.d.ClickHouseDB.Conn.Select(ctx, &results, query) + err := c.d.ClickHouseDB.Conn.Select(ctx, &results, strings.TrimSpace(query)) if err != nil { c.r.Err(err).Msg("unable to query database") gc.JSON(http.StatusInternalServerError, gin.H{"message": "Unable to query database."}) @@ -177,7 +186,7 @@ LIMIT 5 } type widgetParameters struct { - Points uint64 `form:"points" binding:"isdefault|min=5,max=1000"` + Points uint `form:"points" binding:"isdefault|min=5,max=1000"` } func (c *Component) widgetGraphHandlerFunc(gc *gin.Context) { @@ -191,28 +200,34 @@ func (c *Component) widgetGraphHandlerFunc(gc *gin.Context) { if params.Points == 0 { params.Points = 200 } - interval := int64((24 * time.Hour).Seconds()) / int64(params.Points) - slot := fmt.Sprintf(`{resolution->%d}`, interval) now := c.d.Clock.Now() - query := c.queryFlowsTable(fmt.Sprintf(` + query := c.finalizeQuery(fmt.Sprintf(` +{{ with %s }} SELECT - toStartOfInterval(TimeReceived, INTERVAL %s second) AS Time, - SUM(Bytes*SamplingRate*8/%s)/1000/1000/1000 AS Gbps -FROM {table} -WHERE {timefilter} + {{ call .ToStartOfInterval "TimeReceived" }} AS Time, + SUM(Bytes*SamplingRate*8/{{ .Interval }})/1000/1000/1000 AS Gbps +FROM {{ .Table }} +WHERE {{ .Timefilter }} AND InIfBoundary = 'external' GROUP BY Time ORDER BY Time WITH FILL - FROM toStartOfInterval({timefilter.Start}, INTERVAL %s second) - TO {timefilter.Stop} - STEP %s`, slot, slot, slot, slot), false, now.Add(-24*time.Hour), now, time.Duration(interval)*time.Second) + FROM {{ .TimefilterStart }} + TO {{ .TimefilterEnd }} + STEP {{ .Interval }} +{{ end }}`, + templateContext(inputContext{ + Start: now.Add(-24 * time.Hour), + End: now, + MainTableRequired: false, + Points: params.Points, + }))) gc.Header("X-SQL-Query", query) results := []struct { Time time.Time `json:"t"` Gbps float64 `json:"gbps"` }{} - err := c.d.ClickHouseDB.Conn.Select(ctx, &results, query) + err := c.d.ClickHouseDB.Conn.Select(ctx, &results, strings.TrimSpace(query)) if err != nil { c.r.Err(err).Msg("unable to query database") gc.JSON(http.StatusInternalServerError, gin.H{"message": "Unable to query database."}) diff --git a/console/widgets_test.go b/console/widgets_test.go index 1a380675..77d6dc4a 100644 --- a/console/widgets_test.go +++ b/console/widgets_test.go @@ -6,6 +6,7 @@ package console import ( "net" "reflect" + "strings" "testing" "time" @@ -229,18 +230,18 @@ func TestWidgetGraph(t *testing.T) { {base.Add(5 * time.Minute), 24.7}, } mockConn.EXPECT(). - Select(gomock.Any(), gomock.Any(), ` + Select(gomock.Any(), gomock.Any(), strings.TrimSpace(` SELECT - toStartOfInterval(TimeReceived, INTERVAL 864 second) AS Time, + toStartOfInterval(TimeReceived + INTERVAL 144 second, INTERVAL 864 second) - INTERVAL 144 second AS Time, SUM(Bytes*SamplingRate*8/864)/1000/1000/1000 AS Gbps FROM flows WHERE TimeReceived BETWEEN toDateTime('2009-11-10 23:00:00', 'UTC') AND toDateTime('2009-11-11 23:00:00', 'UTC') AND InIfBoundary = 'external' GROUP BY Time ORDER BY Time WITH FILL - FROM toStartOfInterval(toDateTime('2009-11-10 23:00:00', 'UTC'), INTERVAL 864 second) + FROM toDateTime('2009-11-10 23:00:00', 'UTC') TO toDateTime('2009-11-11 23:00:00', 'UTC') - STEP 864`). + STEP 864`)). SetArg(1, expected). Return(nil)