diff --git a/console/clickhouse.go b/console/clickhouse.go index ee42e8ce..f859da58 100644 --- a/console/clickhouse.go +++ b/console/clickhouse.go @@ -4,16 +4,15 @@ package console import ( + "bytes" + "encoding/json" "errors" "fmt" - "regexp" - "strconv" "strings" + "text/template" "time" ) -var resolutionRegexp = regexp.MustCompile(`{resolution->(\d+)}`) - // flowsTable describe a consolidated or unconsolidated flows table. type flowsTable struct { Name string @@ -21,95 +20,6 @@ type flowsTable struct { Oldest time.Time } -// Build a query against the flows table or one of the consolidated -// version depending on the information needed. The provided query -// should contain `{table}` which will be replaced by the appropriate -// flows table and {timefilter} which will be replaced by the -// appropriate time filter. -func (c *Component) queryFlowsTable(query string, mainTableRequired bool, start, end time.Time, targetResolution time.Duration) string { - c.flowsTablesLock.RLock() - defer c.flowsTablesLock.RUnlock() - - // Select table - table := "flows" - resolution := time.Second - if !mainTableRequired { - // We can use the consolidated data. The first - // criteria is to find the tables matching the time - // criteria. - candidates := []int{} - for idx, table := range c.flowsTables { - if start.After(table.Oldest.Add(table.Resolution)) { - candidates = append(candidates, idx) - } - } - if len(candidates) == 0 { - // No candidate, fallback to the one with oldest data - best := 0 - for idx, table := range c.flowsTables { - if c.flowsTables[best].Oldest.After(table.Oldest.Add(table.Resolution)) { - best = idx - } - } - candidates = []int{best} - // Add other candidates that are not far off in term of oldest data - for idx, table := range c.flowsTables { - if idx == best { - continue - } - if c.flowsTables[best].Oldest.After(table.Oldest) { - candidates = append(candidates, idx) - } - } - } - if len(candidates) > 1 { - // Use resolution to find the best one - best := 0 - for _, idx := range candidates { - if c.flowsTables[idx].Resolution > targetResolution { - continue - } - if c.flowsTables[idx].Resolution > c.flowsTables[best].Resolution { - best = idx - } - } - candidates = []int{best} - } - table = c.flowsTables[candidates[0]].Name - resolution = c.flowsTables[candidates[0]].Resolution - } - if resolution == 0 { - resolution = time.Second - } - - // Build timefilter to match the resolution - start = start.Truncate(resolution) - end = end.Truncate(resolution) - timeFilterStart := fmt.Sprintf(`toDateTime('%s', 'UTC')`, start.UTC().Format("2006-01-02 15:04:05")) - timeFilterStop := fmt.Sprintf(`toDateTime('%s', 'UTC')`, end.UTC().Format("2006-01-02 15:04:05")) - timeFilter := fmt.Sprintf(`TimeReceived BETWEEN %s AND %s`, timeFilterStart, timeFilterStop) - - c.metrics.clickhouseQueries.WithLabelValues(table).Inc() - query = strings.ReplaceAll(query, "{timefilter}", timeFilter) - query = strings.ReplaceAll(query, "{timefilter.Start}", timeFilterStart) - query = strings.ReplaceAll(query, "{timefilter.Stop}", timeFilterStop) - query = strings.ReplaceAll(query, "{table}", table) - query = strings.ReplaceAll(query, "{resolution}", strconv.Itoa(int(resolution.Seconds()))) - query = resolutionRegexp.ReplaceAllStringFunc(query, func(in string) string { - matches := resolutionRegexp.FindStringSubmatch(in) - target, err := strconv.Atoi(matches[1]) - if err != nil { - panic(err) - } - target = target / int(resolution.Seconds()) * int(resolution.Seconds()) - if target < 1 { - target = 1 - } - return strconv.Itoa(target) - }) - return query -} - // refreshFlowsTables refreshes the information we have about flows // tables (live one and consolidated ones). This information includes // the consolidation interval and the oldest available data. @@ -166,3 +76,184 @@ AND engine LIKE '%MergeTree' c.flowsTablesLock.Unlock() return nil } + +// finalizeQuery builds the finalized query. A single "context" +// function is provided to return a `Context` struct with all the +// information needed. +func (c *Component) finalizeQuery(query string) string { + t := template.Must(template.New("query"). + Funcs(template.FuncMap{ + "context": c.contextFunc, + }). + Option("missingkey=error"). + Parse(strings.TrimSpace(query))) + buf := bytes.NewBufferString("") + if err := t.Execute(buf, nil); err != nil { + c.r.Err(err).Str("query", query).Msg("invalid query") + panic(err) + } + return buf.String() +} + +type inputContext struct { + Start time.Time `json:"start"` + End time.Time `json:"end"` + MainTableRequired bool `json:"main-table-required,omitempty"` + Points uint `json:"points"` + Units string `json:"units,omitempty"` +} + +type context struct { + Table string + Timefilter string + TimefilterStart string + TimefilterEnd string + Units string + Interval uint64 + ToStartOfInterval func(string) string +} + +// templateEscape escapes `{{` and `}}` from a string. In fact, only +// the opening tag needs to be escaped. +func templateEscape(input string) string { + return strings.ReplaceAll(input, `{{`, `{{"{{"}}`) +} + +// templateDate turns a date into an UTC string compatible with ClickHouse. +func templateDate(input time.Time) string { + return input.UTC().Format("2006-01-02 15:04:05") +} + +// templateWhere transforms a filter to a WHERE clause +func templateWhere(qf queryFilter) string { + if qf.Filter == "" { + return `{{ .Timefilter }}` + } + return fmt.Sprintf(`{{ .Timefilter }} AND (%s)`, templateEscape(qf.Filter)) +} + +// templateTable builds a template directive to select the right table +func templateContext(context inputContext) string { + encoded, err := json.Marshal(context) + if err != nil { + panic(err) + } + return fmt.Sprintf("context `%s`", string(encoded)) +} + +func (c *Component) contextFunc(inputStr string) context { + var input inputContext + if err := json.Unmarshal([]byte(inputStr), &input); err != nil { + panic(err) + } + + targetInterval := time.Duration(uint64(input.End.Sub(input.Start)) / uint64(input.Points)) + if targetInterval < time.Second { + targetInterval = time.Second + } + + c.flowsTablesLock.RLock() + defer c.flowsTablesLock.RUnlock() + + // Select table + table := "flows" + computedInterval := time.Second + if !input.MainTableRequired && len(c.flowsTables) > 0 { + // We can use the consolidated data. The first + // criteria is to find the tables matching the time + // criteria. + candidates := []int{} + for idx, table := range c.flowsTables { + if input.Start.After(table.Oldest.Add(table.Resolution)) { + candidates = append(candidates, idx) + } + } + if len(candidates) == 0 { + // No candidate, fallback to the one with oldest data + best := 0 + for idx, table := range c.flowsTables { + if c.flowsTables[best].Oldest.After(table.Oldest.Add(table.Resolution)) { + best = idx + } + } + candidates = []int{best} + // Add other candidates that are not far off in term of oldest data + for idx, table := range c.flowsTables { + if idx == best { + continue + } + if c.flowsTables[best].Oldest.After(table.Oldest) { + candidates = append(candidates, idx) + } + } + } + if len(candidates) > 1 { + // Use interval to find the best one + best := 0 + for _, idx := range candidates { + if c.flowsTables[idx].Resolution > targetInterval { + continue + } + if c.flowsTables[idx].Resolution > c.flowsTables[best].Resolution { + best = idx + } + } + candidates = []int{best} + } + table = c.flowsTables[candidates[0]].Name + computedInterval = c.flowsTables[candidates[0]].Resolution + } + if computedInterval < time.Second { + computedInterval = time.Second + } + + // Make start/end match the computed interval (currently equal to the table resolution) + start := input.Start.Truncate(computedInterval) + end := input.End.Truncate(computedInterval) + // Adapt the computed interval to match the target one more closely + if targetInterval > computedInterval { + computedInterval = targetInterval.Truncate(computedInterval) + } + // Adapt end to ensure we get a full interval + end = start.Add(end.Sub(start).Truncate(computedInterval)) + // Now, toStartOfInterval will provide an incorrect value. We + // compute a correction offset. Go's truncate seems to + // be different from what we expect. + computedIntervalOffset := start.UTC().Sub( + time.Unix(start.UTC().Unix()/ + int64(computedInterval.Seconds())* + int64(computedInterval.Seconds()), 0)) + diffOffset := uint64(computedInterval.Seconds()) - uint64(computedIntervalOffset.Seconds()) + + // Compute all strings + timefilterStart := fmt.Sprintf(`toDateTime('%s', 'UTC')`, start.UTC().Format("2006-01-02 15:04:05")) + timefilterEnd := fmt.Sprintf(`toDateTime('%s', 'UTC')`, end.UTC().Format("2006-01-02 15:04:05")) + timefilter := fmt.Sprintf(`TimeReceived BETWEEN %s AND %s`, timefilterStart, timefilterEnd) + var units string + switch input.Units { + case "pps": + units = `SUM(Packets*SamplingRate)` + case "l3bps": + units = `SUM(Bytes*SamplingRate*8)` + case "l2bps": + units = `SUM((Bytes+18*Packets)*SamplingRate*8)` + } + + c.metrics.clickhouseQueries.WithLabelValues(table).Inc() + return context{ + Table: table, + Timefilter: timefilter, + TimefilterStart: timefilterStart, + TimefilterEnd: timefilterEnd, + Units: units, + Interval: uint64(computedInterval.Seconds()), + ToStartOfInterval: func(field string) string { + return fmt.Sprintf( + `toStartOfInterval(%s + INTERVAL %d second, INTERVAL %d second) - INTERVAL %d second`, + field, + diffOffset, + uint64(computedInterval.Seconds()), + diffOffset) + }, + } +} diff --git a/console/clickhouse_test.go b/console/clickhouse_test.go index 2095fd83..7a5e094e 100644 --- a/console/clickhouse_test.go +++ b/console/clickhouse_test.go @@ -4,6 +4,7 @@ package console import ( + "fmt" "testing" "time" @@ -70,113 +71,143 @@ AND engine LIKE '%MergeTree' } } -func TestQueryFlowsTables(t *testing.T) { +func TestFinalizeQuery(t *testing.T) { cases := []struct { - Description string - Tables []flowsTable - Query string - MainTableRequired bool - Start time.Time - End time.Time - Resolution time.Duration - Expected string + Description string + Tables []flowsTable + Query string + Context inputContext + Expected string }{ { - Description: "query with source port", - Query: "SELECT TimeReceived, SrcPort FROM {table} WHERE {timefilter}", - MainTableRequired: true, - Start: time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC), - End: time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC), - Expected: "SELECT TimeReceived, SrcPort FROM flows WHERE TimeReceived BETWEEN toDateTime('2022-04-10 15:45:10', 'UTC') AND toDateTime('2022-04-11 15:45:10', 'UTC')", + Description: "simple query without additional tables", + Query: "SELECT 1 FROM {{ .Table }} WHERE {{ .Timefilter }}", + Context: inputContext{ + Start: time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC), + End: time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC), + Points: 86400, + }, + Expected: "SELECT 1 FROM flows WHERE TimeReceived BETWEEN toDateTime('2022-04-10 15:45:10', 'UTC') AND toDateTime('2022-04-11 15:45:10', 'UTC')", + }, { + Description: "query with source port", + Query: "SELECT TimeReceived, SrcPort FROM {{ .Table }} WHERE {{ .Timefilter }}", + Context: inputContext{ + Start: time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC), + End: time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC), + MainTableRequired: true, + Points: 86400, + }, + Expected: "SELECT TimeReceived, SrcPort FROM flows WHERE TimeReceived BETWEEN toDateTime('2022-04-10 15:45:10', 'UTC') AND toDateTime('2022-04-11 15:45:10', 'UTC')", }, { Description: "only flows table available", Tables: []flowsTable{{"flows", 0, time.Date(2022, 03, 10, 15, 45, 10, 0, time.UTC)}}, - Query: "SELECT 1 FROM {table} WHERE {timefilter}", - Start: time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC), - End: time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC), - Expected: "SELECT 1 FROM flows WHERE TimeReceived BETWEEN toDateTime('2022-04-10 15:45:10', 'UTC') AND toDateTime('2022-04-11 15:45:10', 'UTC')", + Query: "SELECT 1 FROM {{ .Table }} WHERE {{ .Timefilter }}", + Context: inputContext{ + Start: time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC), + End: time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC), + Points: 86400, + }, + Expected: "SELECT 1 FROM flows WHERE TimeReceived BETWEEN toDateTime('2022-04-10 15:45:10', 'UTC') AND toDateTime('2022-04-11 15:45:10', 'UTC')", }, { Description: "timefilter.Start and timefilter.Stop", Tables: []flowsTable{{"flows", 0, time.Date(2022, 03, 10, 15, 45, 10, 0, time.UTC)}}, - Query: "SELECT {timefilter.Start}, {timefilter.Stop}", - Start: time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC), - End: time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC), - Expected: "SELECT toDateTime('2022-04-10 15:45:10', 'UTC'), toDateTime('2022-04-11 15:45:10', 'UTC')", + Query: "SELECT {{ .TimefilterStart }}, {{ .TimefilterEnd }}", + Context: inputContext{ + Start: time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC), + End: time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC), + Points: 86400, + }, + Expected: "SELECT toDateTime('2022-04-10 15:45:10', 'UTC'), toDateTime('2022-04-11 15:45:10', 'UTC')", }, { Description: "only flows table and out of range request", Tables: []flowsTable{{"flows", 0, time.Date(2022, 04, 10, 22, 45, 10, 0, time.UTC)}}, - Query: "SELECT 1 FROM {table} WHERE {timefilter}", - Start: time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC), - End: time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC), - Expected: "SELECT 1 FROM flows WHERE TimeReceived BETWEEN toDateTime('2022-04-10 15:45:10', 'UTC') AND toDateTime('2022-04-11 15:45:10', 'UTC')", + Query: "SELECT 1 FROM {{ .Table }} WHERE {{ .Timefilter }}", + Context: inputContext{ + Start: time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC), + End: time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC), + Points: 86400, + }, + Expected: "SELECT 1 FROM flows WHERE TimeReceived BETWEEN toDateTime('2022-04-10 15:45:10', 'UTC') AND toDateTime('2022-04-11 15:45:10', 'UTC')", }, { Description: "select consolidated table", Tables: []flowsTable{ {"flows", 0, time.Date(2022, 03, 10, 22, 45, 10, 0, time.UTC)}, {"flows_1m0s", time.Minute, time.Date(2022, 04, 2, 22, 45, 10, 0, time.UTC)}, }, - Query: "SELECT 1 FROM {table} WHERE {timefilter}", - Start: time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC), - End: time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC), - Resolution: 2 * time.Minute, - Expected: "SELECT 1 FROM flows_1m0s WHERE TimeReceived BETWEEN toDateTime('2022-04-10 15:45:00', 'UTC') AND toDateTime('2022-04-11 15:45:00', 'UTC')", + Query: "SELECT 1 FROM {{ .Table }} WHERE {{ .Timefilter }} // {{ .Interval }}", + Context: inputContext{ + Start: time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC), + End: time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC), + Points: 720, // 2-minute resolution + }, + Expected: "SELECT 1 FROM flows_1m0s WHERE TimeReceived BETWEEN toDateTime('2022-04-10 15:45:00', 'UTC') AND toDateTime('2022-04-11 15:45:00', 'UTC') // 120", }, { Description: "select consolidated table out of range", Tables: []flowsTable{ {"flows", 0, time.Date(2022, 04, 10, 22, 45, 10, 0, time.UTC)}, {"flows_1m0s", time.Minute, time.Date(2022, 04, 10, 17, 45, 10, 0, time.UTC)}, }, - Query: "SELECT 1 FROM {table} WHERE {timefilter}", - Start: time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC), - End: time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC), - Resolution: 2 * time.Minute, - Expected: "SELECT 1 FROM flows_1m0s WHERE TimeReceived BETWEEN toDateTime('2022-04-10 15:45:00', 'UTC') AND toDateTime('2022-04-11 15:45:00', 'UTC')", + Query: "SELECT 1 FROM {{ .Table }} WHERE {{ .Timefilter }}", + Context: inputContext{ + Start: time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC), + End: time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC), + Points: 720, // 2-minute resolution, + }, + Expected: "SELECT 1 FROM flows_1m0s WHERE TimeReceived BETWEEN toDateTime('2022-04-10 15:45:00', 'UTC') AND toDateTime('2022-04-11 15:45:00', 'UTC')", }, { Description: "select flows table out of range", Tables: []flowsTable{ {"flows", 0, time.Date(2022, 04, 10, 16, 45, 10, 0, time.UTC)}, {"flows_1m0s", time.Minute, time.Date(2022, 04, 10, 17, 45, 10, 0, time.UTC)}, }, - Query: "SELECT 1 FROM {table} WHERE {timefilter}", - Start: time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC), - End: time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC), - Resolution: 2 * time.Minute, - Expected: "SELECT 1 FROM flows WHERE TimeReceived BETWEEN toDateTime('2022-04-10 15:45:10', 'UTC') AND toDateTime('2022-04-11 15:45:10', 'UTC')", + Query: "SELECT 1 FROM {{ .Table }} WHERE {{ .Timefilter }}", + Context: inputContext{ + Start: time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC), + End: time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC), + Points: 720, // 2-minute resolution, + }, + Expected: "SELECT 1 FROM flows WHERE TimeReceived BETWEEN toDateTime('2022-04-10 15:45:10', 'UTC') AND toDateTime('2022-04-11 15:45:10', 'UTC')", }, { - Description: "select flows table better resolution", + Description: "select flows table with better resolution", Tables: []flowsTable{ {"flows", 0, time.Date(2022, 03, 10, 16, 45, 10, 0, time.UTC)}, {"flows_1m0s", time.Minute, time.Date(2022, 03, 10, 17, 45, 10, 0, time.UTC)}, }, - Query: "SELECT 1 FROM {table} WHERE {timefilter} // {resolution} // {resolution->864}", - Start: time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC), - End: time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC), - Resolution: 30 * time.Second, - Expected: "SELECT 1 FROM flows WHERE TimeReceived BETWEEN toDateTime('2022-04-10 15:45:10', 'UTC') AND toDateTime('2022-04-11 15:45:10', 'UTC') // 1 // 864", + Query: "SELECT 1 FROM {{ .Table }} WHERE {{ .Timefilter }} // {{ .Interval }}", + Context: inputContext{ + Start: time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC), + End: time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC), + Points: 2880, + }, + Expected: "SELECT 1 FROM flows WHERE TimeReceived BETWEEN toDateTime('2022-04-10 15:45:10', 'UTC') AND toDateTime('2022-04-11 15:45:10', 'UTC') // 30", }, { - Description: "select consolidated table better resolution", + Description: "select consolidated table with better resolution", Tables: []flowsTable{ {"flows", 0, time.Date(2022, 03, 10, 22, 45, 10, 0, time.UTC)}, {"flows_5m0s", 5 * time.Minute, time.Date(2022, 04, 2, 22, 45, 10, 0, time.UTC)}, {"flows_1m0s", time.Minute, time.Date(2022, 04, 2, 22, 45, 10, 0, time.UTC)}, }, - Query: "SELECT 1 FROM {table} WHERE {timefilter} // {resolution} // {resolution->864}", - Start: time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC), - End: time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC), - Resolution: 2 * time.Minute, - Expected: "SELECT 1 FROM flows_1m0s WHERE TimeReceived BETWEEN toDateTime('2022-04-10 15:45:00', 'UTC') AND toDateTime('2022-04-11 15:45:00', 'UTC') // 60 // 840", + Query: "SELECT 1 FROM {{ .Table }} WHERE {{ .Timefilter }} // {{ .Interval }}", + Context: inputContext{ + Start: time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC), + End: time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC), + Points: 720, // 2-minute resolution, + }, + Expected: "SELECT 1 FROM flows_1m0s WHERE TimeReceived BETWEEN toDateTime('2022-04-10 15:45:00', 'UTC') AND toDateTime('2022-04-11 15:45:00', 'UTC') // 120", }, { - Description: "select consolidated table better range", + Description: "select consolidated table with better range", Tables: []flowsTable{ {"flows", 0, time.Date(2022, 04, 10, 22, 45, 10, 0, time.UTC)}, {"flows_5m0s", 5 * time.Minute, time.Date(2022, 04, 2, 22, 45, 10, 0, time.UTC)}, {"flows_1m0s", time.Minute, time.Date(2022, 04, 10, 22, 45, 10, 0, time.UTC)}, }, - Query: "SELECT 1 FROM {table} WHERE {timefilter}", - Start: time.Date(2022, 04, 10, 15, 46, 10, 0, time.UTC), - End: time.Date(2022, 04, 11, 15, 46, 10, 0, time.UTC), - Resolution: 2 * time.Minute, - Expected: "SELECT 1 FROM flows_5m0s WHERE TimeReceived BETWEEN toDateTime('2022-04-10 15:45:00', 'UTC') AND toDateTime('2022-04-11 15:45:00', 'UTC')", + Query: "SELECT 1 FROM {{ .Table }} WHERE {{ .Timefilter }}", + Context: inputContext{ + Start: time.Date(2022, 04, 10, 15, 46, 10, 0, time.UTC), + End: time.Date(2022, 04, 11, 15, 46, 10, 0, time.UTC), + Points: 720, // 2-minute resolution, + }, + Expected: "SELECT 1 FROM flows_5m0s WHERE TimeReceived BETWEEN toDateTime('2022-04-10 15:45:00', 'UTC') AND toDateTime('2022-04-11 15:45:00', 'UTC')", }, { Description: "select best resolution when equality for oldest data", Tables: []flowsTable{ @@ -184,11 +215,31 @@ func TestQueryFlowsTables(t *testing.T) { {"flows_1m0s", time.Minute, time.Date(2022, 04, 10, 22, 40, 00, 0, time.UTC)}, {"flows_1h0m0s", time.Hour, time.Date(2022, 04, 10, 22, 00, 10, 0, time.UTC)}, }, - Query: "SELECT 1 FROM {table} WHERE {timefilter}", - Start: time.Date(2022, 04, 10, 15, 46, 10, 0, time.UTC), - End: time.Date(2022, 04, 11, 15, 46, 10, 0, time.UTC), - Resolution: 2 * time.Minute, - Expected: "SELECT 1 FROM flows_1m0s WHERE TimeReceived BETWEEN toDateTime('2022-04-10 15:46:00', 'UTC') AND toDateTime('2022-04-11 15:46:00', 'UTC')", + Query: "SELECT 1 FROM {{ .Table }} WHERE {{ .Timefilter }}", + Context: inputContext{ + Start: time.Date(2022, 04, 10, 15, 46, 10, 0, time.UTC), + End: time.Date(2022, 04, 11, 15, 46, 10, 0, time.UTC), + Points: 720, // 2-minute resolution, + }, + Expected: "SELECT 1 FROM flows_1m0s WHERE TimeReceived BETWEEN toDateTime('2022-04-10 15:46:00', 'UTC') AND toDateTime('2022-04-11 15:46:00', 'UTC')", + }, { + Description: "query with escaped template", + Query: `SELECT TimeReceived, SrcPort WHERE InIfDescription = '{{"{{"}} hello }}'`, + Context: inputContext{ + Start: time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC), + End: time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC), + Points: 86400, + }, + Expected: `SELECT TimeReceived, SrcPort WHERE InIfDescription = '{{ hello }}'`, + }, { + Description: "use of ToStartOfInterval", + Query: `{{ call .ToStartOfInterval "TimeReceived" }}`, + Context: inputContext{ + Start: time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC), + End: time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC), + Points: 720, + }, + Expected: `toStartOfInterval(TimeReceived + INTERVAL 50 second, INTERVAL 120 second) - INTERVAL 50 second`, }, } @@ -196,9 +247,10 @@ func TestQueryFlowsTables(t *testing.T) { for _, tc := range cases { t.Run(tc.Description, func(t *testing.T) { c.flowsTables = tc.Tables - got := c.queryFlowsTable(tc.Query, tc.MainTableRequired, tc.Start, tc.End, tc.Resolution) + got := c.finalizeQuery( + fmt.Sprintf(`{{ with %s }}%s{{ end }}`, templateContext(tc.Context), tc.Query)) if diff := helpers.Diff(got, tc.Expected); diff != "" { - t.Fatalf("queryFlowsTable(): (-got, +want):\n%s", diff) + t.Fatalf("finalizeQuery(): (-got, +want):\n%s", diff) } }) } diff --git a/console/frontend/src/views/HomePage/WidgetGraph.vue b/console/frontend/src/views/HomePage/WidgetGraph.vue index 432b18a2..6c9f7706 100644 --- a/console/frontend/src/views/HomePage/WidgetGraph.vue +++ b/console/frontend/src/views/HomePage/WidgetGraph.vue @@ -73,7 +73,7 @@ const options = computed(() => ({ }, data: (data.value?.data || []) .map(({ t, gbps }) => [t, gbps]) - .slice(1, -1), + .slice(0, -1), }, ], })); diff --git a/console/frontend/src/views/VisualizePage/DataGraphTimeSeries.vue b/console/frontend/src/views/VisualizePage/DataGraphTimeSeries.vue index 86016fbc..b2c1554a 100644 --- a/console/frontend/src/views/VisualizePage/DataGraphTimeSeries.vue +++ b/console/frontend/src/views/VisualizePage/DataGraphTimeSeries.vue @@ -126,7 +126,7 @@ const graph = computed(() => { (row, rowIdx) => row[timeIdx] * (data.axis[rowIdx] == 1 ? 1 : -1) ), ]) - .slice(1, -1), + .slice(0, -1), ], }, xAxis = { diff --git a/console/graph.go b/console/graph.go index a9ad9c22..57d9daca 100644 --- a/console/graph.go +++ b/console/graph.go @@ -19,7 +19,7 @@ import ( type graphHandlerInput struct { Start time.Time `json:"start" binding:"required"` End time.Time `json:"end" binding:"required,gtfield=Start"` - Points int `json:"points" binding:"required,min=5,max=2000"` // minimum number of points + Points uint `json:"points" binding:"required,min=5,max=2000"` // minimum number of points Dimensions []queryColumn `json:"dimensions"` // group by ... Limit int `json:"limit" binding:"min=1,max=50"` // limit product of dimensions Filter queryFilter `json:"filter"` // where ... @@ -54,28 +54,12 @@ func (input graphHandlerInput) reverseDirection() graphHandlerInput { } func (input graphHandlerInput) toSQL1(axis int, skipWith bool) string { - interval := int64((input.End.Sub(input.Start).Seconds())) / int64(input.Points) - slot := fmt.Sprintf(`{resolution->%d}`, interval) - - // Filter - where := input.Filter.Filter - if where == "" { - where = "{timefilter}" - } else { - where = fmt.Sprintf("{timefilter} AND (%s)", where) - } + where := templateWhere(input.Filter) // Select fields := []string{ - fmt.Sprintf(`toStartOfInterval(TimeReceived, INTERVAL %s second) AS time`, slot), - } - switch input.Units { - case "pps": - fields = append(fields, fmt.Sprintf(`SUM(Packets*SamplingRate/%s) AS xps`, slot)) - case "l3bps": - fields = append(fields, fmt.Sprintf(`SUM(Bytes*SamplingRate*8/%s) AS xps`, slot)) - case "l2bps": - fields = append(fields, fmt.Sprintf(`SUM((Bytes+18*Packets)*SamplingRate*8/%s) AS xps`, slot)) + `{{ call .ToStartOfInterval "TimeReceived" }} AS time`, + `{{ .Units }}/{{ .Interval }} AS xps`, } selectFields := []string{} dimensions := []string{} @@ -99,7 +83,7 @@ func (input graphHandlerInput) toSQL1(axis int, skipWith bool) string { with := []string{} if len(dimensions) > 0 && !skipWith { with = append(with, fmt.Sprintf( - "rows AS (SELECT %s FROM {table} WHERE %s GROUP BY %s ORDER BY SUM(Bytes) DESC LIMIT %d)", + "rows AS (SELECT %s FROM {{ .Table }} WHERE %s GROUP BY %s ORDER BY SUM(Bytes) DESC LIMIT %d)", strings.Join(dimensions, ", "), where, strings.Join(dimensions, ", "), @@ -107,34 +91,40 @@ func (input graphHandlerInput) toSQL1(axis int, skipWith bool) string { } withStr := "" if len(with) > 0 { - withStr = fmt.Sprintf("WITH\n %s", strings.Join(with, ",\n ")) + withStr = fmt.Sprintf("\nWITH\n %s", strings.Join(with, ",\n ")) } sqlQuery := fmt.Sprintf(` -%s +{{ with %s }}%s SELECT %d AS axis, * FROM ( SELECT %s -FROM {table} +FROM {{ .Table }} WHERE %s GROUP BY time, dimensions ORDER BY time WITH FILL - FROM toStartOfInterval({timefilter.Start}, INTERVAL %s second) - TO {timefilter.Stop} - STEP %s)`, withStr, axis, strings.Join(fields, ",\n "), where, slot, slot) - return sqlQuery + FROM {{ .TimefilterStart }} + TO {{ .TimefilterEnd }} + STEP {{ .Interval }}) +{{ end }}`, + templateContext(inputContext{ + Start: input.Start, + End: input.End, + MainTableRequired: requireMainTable(input.Dimensions, input.Filter), + Points: input.Points, + Units: input.Units, + }), + withStr, axis, strings.Join(fields, ",\n "), where) + return strings.TrimSpace(sqlQuery) } // graphHandlerInputToSQL converts a graph input to an SQL request func (input graphHandlerInput) toSQL() string { - result := input.toSQL1(1, false) + parts := []string{input.toSQL1(1, false)} if input.Bidirectional { - part2 := input.reverseDirection().toSQL1(2, true) - result = fmt.Sprintf(`%s -UNION ALL -%s`, result, strings.TrimSpace(part2)) + parts = append(parts, input.reverseDirection().toSQL1(2, true)) } - return strings.TrimSpace(result) + return strings.TrimSpace(strings.Join(parts, "\nUNION ALL\n")) } func (c *Component) graphHandlerFunc(gc *gin.Context) { @@ -146,12 +136,7 @@ func (c *Component) graphHandlerFunc(gc *gin.Context) { } sqlQuery := input.toSQL() - resolution := time.Duration(int64(input.End.Sub(input.Start).Nanoseconds()) / int64(input.Points)) - if resolution < time.Second { - resolution = time.Second - } - sqlQuery = c.queryFlowsTable(sqlQuery, requireMainTable(input.Dimensions, input.Filter), - input.Start, input.End, resolution) + sqlQuery = c.finalizeQuery(sqlQuery) gc.Header("X-SQL-Query", strings.ReplaceAll(sqlQuery, "\n", " ")) results := []struct { diff --git a/console/graph_test.go b/console/graph_test.go index 12e74f0a..27606aae 100644 --- a/console/graph_test.go +++ b/console/graph_test.go @@ -72,18 +72,20 @@ func TestGraphQuerySQL(t *testing.T) { Units: "l3bps", }, Expected: ` +{{ with context @@{"start":"2022-04-10T15:45:10Z","end":"2022-04-11T15:45:10Z","points":100,"units":"l3bps"}@@ }} SELECT 1 AS axis, * FROM ( SELECT - toStartOfInterval(TimeReceived, INTERVAL {resolution->864} second) AS time, - SUM(Bytes*SamplingRate*8/{resolution->864}) AS xps, + {{ call .ToStartOfInterval "TimeReceived" }} AS time, + {{ .Units }}/{{ .Interval }} AS xps, emptyArrayString() AS dimensions -FROM {table} -WHERE {timefilter} +FROM {{ .Table }} +WHERE {{ .Timefilter }} GROUP BY time, dimensions ORDER BY time WITH FILL - FROM toStartOfInterval({timefilter.Start}, INTERVAL {resolution->864} second) - TO {timefilter.Stop} - STEP {resolution->864})`, + FROM {{ .TimefilterStart }} + TO {{ .TimefilterEnd }} + STEP {{ .Interval }}) +{{ end }}`, }, { Description: "no dimensions, no filters, l2 bps", Input: graphHandlerInput{ @@ -95,18 +97,21 @@ ORDER BY time WITH FILL Units: "l2bps", }, Expected: ` +{{ with context @@{"start":"2022-04-10T15:45:10Z","end":"2022-04-11T15:45:10Z","points":100,"units":"l2bps"}@@ }} SELECT 1 AS axis, * FROM ( SELECT - toStartOfInterval(TimeReceived, INTERVAL {resolution->864} second) AS time, - SUM((Bytes+18*Packets)*SamplingRate*8/{resolution->864}) AS xps, + {{ call .ToStartOfInterval "TimeReceived" }} AS time, + {{ .Units }}/{{ .Interval }} AS xps, emptyArrayString() AS dimensions -FROM {table} -WHERE {timefilter} +FROM {{ .Table }} +WHERE {{ .Timefilter }} GROUP BY time, dimensions ORDER BY time WITH FILL - FROM toStartOfInterval({timefilter.Start}, INTERVAL {resolution->864} second) - TO {timefilter.Stop} - STEP {resolution->864})`, + FROM {{ .TimefilterStart }} + TO {{ .TimefilterEnd }} + STEP {{ .Interval }}) +{{ end }} +`, }, { Description: "no dimensions, no filters, pps", Input: graphHandlerInput{ @@ -118,18 +123,20 @@ ORDER BY time WITH FILL Units: "pps", }, Expected: ` +{{ with context @@{"start":"2022-04-10T15:45:10Z","end":"2022-04-11T15:45:10Z","points":100,"units":"pps"}@@ }} SELECT 1 AS axis, * FROM ( SELECT - toStartOfInterval(TimeReceived, INTERVAL {resolution->864} second) AS time, - SUM(Packets*SamplingRate/{resolution->864}) AS xps, + {{ call .ToStartOfInterval "TimeReceived" }} AS time, + {{ .Units }}/{{ .Interval }} AS xps, emptyArrayString() AS dimensions -FROM {table} -WHERE {timefilter} +FROM {{ .Table }} +WHERE {{ .Timefilter }} GROUP BY time, dimensions ORDER BY time WITH FILL - FROM toStartOfInterval({timefilter.Start}, INTERVAL {resolution->864} second) - TO {timefilter.Stop} - STEP {resolution->864})`, + FROM {{ .TimefilterStart }} + TO {{ .TimefilterEnd }} + STEP {{ .Interval }}) +{{ end }}`, }, { Description: "no dimensions", Input: graphHandlerInput{ @@ -141,18 +148,45 @@ ORDER BY time WITH FILL Units: "l3bps", }, Expected: ` +{{ with context @@{"start":"2022-04-10T15:45:10Z","end":"2022-04-11T15:45:10Z","points":100,"units":"l3bps"}@@ }} SELECT 1 AS axis, * FROM ( SELECT - toStartOfInterval(TimeReceived, INTERVAL {resolution->864} second) AS time, - SUM(Bytes*SamplingRate*8/{resolution->864}) AS xps, + {{ call .ToStartOfInterval "TimeReceived" }} AS time, + {{ .Units }}/{{ .Interval }} AS xps, emptyArrayString() AS dimensions -FROM {table} -WHERE {timefilter} AND (DstCountry = 'FR' AND SrcCountry = 'US') +FROM {{ .Table }} +WHERE {{ .Timefilter }} AND (DstCountry = 'FR' AND SrcCountry = 'US') GROUP BY time, dimensions ORDER BY time WITH FILL - FROM toStartOfInterval({timefilter.Start}, INTERVAL {resolution->864} second) - TO {timefilter.Stop} - STEP {resolution->864})`, + FROM {{ .TimefilterStart }} + TO {{ .TimefilterEnd }} + STEP {{ .Interval }}) +{{ end }}`, + }, { + Description: "no dimensions, escaped filter", + Input: graphHandlerInput{ + Start: time.Date(2022, 04, 10, 15, 45, 10, 0, time.UTC), + End: time.Date(2022, 04, 11, 15, 45, 10, 0, time.UTC), + Points: 100, + Dimensions: []queryColumn{}, + Filter: queryFilter{Filter: "InIfDescription = '{{ hello }}' AND SrcCountry = 'US'"}, + Units: "l3bps", + }, + Expected: ` +{{ with context @@{"start":"2022-04-10T15:45:10Z","end":"2022-04-11T15:45:10Z","points":100,"units":"l3bps"}@@ }} +SELECT 1 AS axis, * FROM ( +SELECT + {{ call .ToStartOfInterval "TimeReceived" }} AS time, + {{ .Units }}/{{ .Interval }} AS xps, + emptyArrayString() AS dimensions +FROM {{ .Table }} +WHERE {{ .Timefilter }} AND (InIfDescription = '{{"{{"}} hello }}' AND SrcCountry = 'US') +GROUP BY time, dimensions +ORDER BY time WITH FILL + FROM {{ .TimefilterStart }} + TO {{ .TimefilterEnd }} + STEP {{ .Interval }}) +{{ end }}`, }, { Description: "no dimensions, reverse direction", Input: graphHandlerInput{ @@ -168,31 +202,35 @@ ORDER BY time WITH FILL Bidirectional: true, }, Expected: ` +{{ with context @@{"start":"2022-04-10T15:45:10Z","end":"2022-04-11T15:45:10Z","points":100,"units":"l3bps"}@@ }} SELECT 1 AS axis, * FROM ( SELECT - toStartOfInterval(TimeReceived, INTERVAL {resolution->864} second) AS time, - SUM(Bytes*SamplingRate*8/{resolution->864}) AS xps, + {{ call .ToStartOfInterval "TimeReceived" }} AS time, + {{ .Units }}/{{ .Interval }} AS xps, emptyArrayString() AS dimensions -FROM {table} -WHERE {timefilter} AND (DstCountry = 'FR' AND SrcCountry = 'US') +FROM {{ .Table }} +WHERE {{ .Timefilter }} AND (DstCountry = 'FR' AND SrcCountry = 'US') GROUP BY time, dimensions ORDER BY time WITH FILL - FROM toStartOfInterval({timefilter.Start}, INTERVAL {resolution->864} second) - TO {timefilter.Stop} - STEP {resolution->864}) + FROM {{ .TimefilterStart }} + TO {{ .TimefilterEnd }} + STEP {{ .Interval }}) +{{ end }} UNION ALL +{{ with context @@{"start":"2022-04-10T15:45:10Z","end":"2022-04-11T15:45:10Z","points":100,"units":"l3bps"}@@ }} SELECT 2 AS axis, * FROM ( SELECT - toStartOfInterval(TimeReceived, INTERVAL {resolution->864} second) AS time, - SUM(Bytes*SamplingRate*8/{resolution->864}) AS xps, + {{ call .ToStartOfInterval "TimeReceived" }} AS time, + {{ .Units }}/{{ .Interval }} AS xps, emptyArrayString() AS dimensions -FROM {table} -WHERE {timefilter} AND (SrcCountry = 'FR' AND DstCountry = 'US') +FROM {{ .Table }} +WHERE {{ .Timefilter }} AND (SrcCountry = 'FR' AND DstCountry = 'US') GROUP BY time, dimensions ORDER BY time WITH FILL - FROM toStartOfInterval({timefilter.Start}, INTERVAL {resolution->864} second) - TO {timefilter.Stop} - STEP {resolution->864})`, + FROM {{ .TimefilterStart }} + TO {{ .TimefilterEnd }} + STEP {{ .Interval }}) +{{ end }}`, }, { Description: "no filters", Input: graphHandlerInput{ @@ -208,20 +246,22 @@ ORDER BY time WITH FILL Units: "l3bps", }, Expected: ` +{{ with context @@{"start":"2022-04-10T15:45:10Z","end":"2022-04-11T15:45:10Z","points":100,"units":"l3bps"}@@ }} WITH - rows AS (SELECT ExporterName, InIfProvider FROM {table} WHERE {timefilter} GROUP BY ExporterName, InIfProvider ORDER BY SUM(Bytes) DESC LIMIT 20) + rows AS (SELECT ExporterName, InIfProvider FROM {{ .Table }} WHERE {{ .Timefilter }} GROUP BY ExporterName, InIfProvider ORDER BY SUM(Bytes) DESC LIMIT 20) SELECT 1 AS axis, * FROM ( SELECT - toStartOfInterval(TimeReceived, INTERVAL {resolution->864} second) AS time, - SUM(Bytes*SamplingRate*8/{resolution->864}) AS xps, + {{ call .ToStartOfInterval "TimeReceived" }} AS time, + {{ .Units }}/{{ .Interval }} AS xps, if((ExporterName, InIfProvider) IN rows, [ExporterName, InIfProvider], ['Other', 'Other']) AS dimensions -FROM {table} -WHERE {timefilter} +FROM {{ .Table }} +WHERE {{ .Timefilter }} GROUP BY time, dimensions ORDER BY time WITH FILL - FROM toStartOfInterval({timefilter.Start}, INTERVAL {resolution->864} second) - TO {timefilter.Stop} - STEP {resolution->864})`, + FROM {{ .TimefilterStart }} + TO {{ .TimefilterEnd }} + STEP {{ .Interval }}) +{{ end }}`, }, { Description: "no filters, reverse", Input: graphHandlerInput{ @@ -238,36 +278,41 @@ ORDER BY time WITH FILL Bidirectional: true, }, Expected: ` +{{ with context @@{"start":"2022-04-10T15:45:10Z","end":"2022-04-11T15:45:10Z","points":100,"units":"l3bps"}@@ }} WITH - rows AS (SELECT ExporterName, InIfProvider FROM {table} WHERE {timefilter} GROUP BY ExporterName, InIfProvider ORDER BY SUM(Bytes) DESC LIMIT 20) + rows AS (SELECT ExporterName, InIfProvider FROM {{ .Table }} WHERE {{ .Timefilter }} GROUP BY ExporterName, InIfProvider ORDER BY SUM(Bytes) DESC LIMIT 20) SELECT 1 AS axis, * FROM ( SELECT - toStartOfInterval(TimeReceived, INTERVAL {resolution->864} second) AS time, - SUM(Bytes*SamplingRate*8/{resolution->864}) AS xps, + {{ call .ToStartOfInterval "TimeReceived" }} AS time, + {{ .Units }}/{{ .Interval }} AS xps, if((ExporterName, InIfProvider) IN rows, [ExporterName, InIfProvider], ['Other', 'Other']) AS dimensions -FROM {table} -WHERE {timefilter} +FROM {{ .Table }} +WHERE {{ .Timefilter }} GROUP BY time, dimensions ORDER BY time WITH FILL - FROM toStartOfInterval({timefilter.Start}, INTERVAL {resolution->864} second) - TO {timefilter.Stop} - STEP {resolution->864}) + FROM {{ .TimefilterStart }} + TO {{ .TimefilterEnd }} + STEP {{ .Interval }}) +{{ end }} UNION ALL +{{ with context @@{"start":"2022-04-10T15:45:10Z","end":"2022-04-11T15:45:10Z","points":100,"units":"l3bps"}@@ }} SELECT 2 AS axis, * FROM ( SELECT - toStartOfInterval(TimeReceived, INTERVAL {resolution->864} second) AS time, - SUM(Bytes*SamplingRate*8/{resolution->864}) AS xps, + {{ call .ToStartOfInterval "TimeReceived" }} AS time, + {{ .Units }}/{{ .Interval }} AS xps, if((ExporterName, OutIfProvider) IN rows, [ExporterName, OutIfProvider], ['Other', 'Other']) AS dimensions -FROM {table} -WHERE {timefilter} +FROM {{ .Table }} +WHERE {{ .Timefilter }} GROUP BY time, dimensions ORDER BY time WITH FILL - FROM toStartOfInterval({timefilter.Start}, INTERVAL {resolution->864} second) - TO {timefilter.Stop} - STEP {resolution->864})`, + FROM {{ .TimefilterStart }} + TO {{ .TimefilterEnd }} + STEP {{ .Interval }}) +{{ end }}`, }, } for _, tc := range cases { + tc.Expected = strings.ReplaceAll(tc.Expected, "@@", "`") t.Run(tc.Description, func(t *testing.T) { got := tc.Input.toSQL() if diff := helpers.Diff(strings.Split(strings.TrimSpace(got), "\n"), diff --git a/console/root.go b/console/root.go index 29f77c5b..a5657b7f 100644 --- a/console/root.go +++ b/console/root.go @@ -5,7 +5,6 @@ package console import ( - "html/template" "io/fs" netHTTP "net/http" "os" @@ -13,6 +12,7 @@ import ( "path/filepath" "runtime" "sync" + "text/template" "time" "github.com/benbjohnson/clock" diff --git a/console/sankey.go b/console/sankey.go index e7da082c..d654300b 100644 --- a/console/sankey.go +++ b/console/sankey.go @@ -42,13 +42,7 @@ type sankeyLink struct { // sankeyHandlerInputToSQL converts a sankey query to an SQL request func (input sankeyHandlerInput) toSQL() (string, error) { - // Filter - where := input.Filter.Filter - if where == "" { - where = "{timefilter}" - } else { - where = fmt.Sprintf("{timefilter} AND (%s)", where) - } + where := templateWhere(input.Filter) // Select arrayFields := []string{} @@ -60,22 +54,16 @@ func (input sankeyHandlerInput) toSQL() (string, error) { column.toSQLSelect())) dimensions = append(dimensions, column.String()) } - fields := []string{} - switch input.Units { - case "pps": - fields = append(fields, `SUM(Packets*SamplingRate/range) AS xps`) - case "l3bps": - fields = append(fields, `SUM(Bytes*SamplingRate*8/range) AS xps`) - case "l2bps": - fields = append(fields, `SUM((Bytes+18*Packets)*SamplingRate*8/range) AS xps`) + fields := []string{ + `{{ .Units }}/range AS xps`, + fmt.Sprintf("[%s] AS dimensions", strings.Join(arrayFields, ",\n ")), } - fields = append(fields, fmt.Sprintf("[%s] AS dimensions", strings.Join(arrayFields, ",\n "))) // With with := []string{ - fmt.Sprintf(`(SELECT MAX(TimeReceived) - MIN(TimeReceived) FROM {table} WHERE %s) AS range`, where), + fmt.Sprintf(`(SELECT MAX(TimeReceived) - MIN(TimeReceived) FROM {{ .Table }} WHERE %s) AS range`, where), fmt.Sprintf( - "rows AS (SELECT %s FROM {table} WHERE %s GROUP BY %s ORDER BY SUM(Bytes) DESC LIMIT %d)", + "rows AS (SELECT %s FROM {{ .Table }} WHERE %s GROUP BY %s ORDER BY SUM(Bytes) DESC LIMIT %d)", strings.Join(dimensions, ", "), where, strings.Join(dimensions, ", "), @@ -83,15 +71,25 @@ func (input sankeyHandlerInput) toSQL() (string, error) { } sqlQuery := fmt.Sprintf(` +{{ with %s }} WITH %s SELECT %s -FROM {table} +FROM {{ .Table }} WHERE %s GROUP BY dimensions -ORDER BY xps DESC`, strings.Join(with, ",\n "), strings.Join(fields, ",\n "), where) - return sqlQuery, nil +ORDER BY xps DESC +{{ end }}`, + templateContext(inputContext{ + Start: input.Start, + End: input.End, + MainTableRequired: requireMainTable(input.Dimensions, input.Filter), + Points: 20, + Units: input.Units, + }), + strings.Join(with, ",\n "), strings.Join(fields, ",\n "), where) + return strings.TrimSpace(sqlQuery), nil } func (c *Component) sankeyHandlerFunc(gc *gin.Context) { @@ -108,15 +106,8 @@ func (c *Component) sankeyHandlerFunc(gc *gin.Context) { return } - // We need to select a resolution allowing us to have a somewhat accurate timespan - resolution := time.Duration(int64(input.End.Sub(input.Start).Nanoseconds()) / 20) - if resolution < time.Second { - resolution = time.Second - } - // Prepare and execute query - sqlQuery = c.queryFlowsTable(sqlQuery, requireMainTable(input.Dimensions, input.Filter), - input.Start, input.End, resolution) + sqlQuery = c.finalizeQuery(sqlQuery) gc.Header("X-SQL-Query", strings.ReplaceAll(sqlQuery, "\n", " ")) results := []struct { Xps float64 `ch:"xps"` diff --git a/console/sankey_test.go b/console/sankey_test.go index 4d0ae8e7..7bdfa4b6 100644 --- a/console/sankey_test.go +++ b/console/sankey_test.go @@ -31,17 +31,19 @@ func TestSankeyQuerySQL(t *testing.T) { Units: "l3bps", }, Expected: ` +{{ with context @@{"start":"2022-04-10T15:45:10Z","end":"2022-04-11T15:45:10Z","points":20,"units":"l3bps"}@@ }} WITH - (SELECT MAX(TimeReceived) - MIN(TimeReceived) FROM {table} WHERE {timefilter}) AS range, - rows AS (SELECT SrcAS, ExporterName FROM {table} WHERE {timefilter} GROUP BY SrcAS, ExporterName ORDER BY SUM(Bytes) DESC LIMIT 5) + (SELECT MAX(TimeReceived) - MIN(TimeReceived) FROM {{ .Table }} WHERE {{ .Timefilter }}) AS range, + rows AS (SELECT SrcAS, ExporterName FROM {{ .Table }} WHERE {{ .Timefilter }} GROUP BY SrcAS, ExporterName ORDER BY SUM(Bytes) DESC LIMIT 5) SELECT - SUM(Bytes*SamplingRate*8/range) AS xps, + {{ .Units }}/range AS xps, [if(SrcAS IN (SELECT SrcAS FROM rows), concat(toString(SrcAS), ': ', dictGetOrDefault('asns', 'name', SrcAS, '???')), 'Other'), if(ExporterName IN (SELECT ExporterName FROM rows), ExporterName, 'Other')] AS dimensions -FROM {table} -WHERE {timefilter} +FROM {{ .Table }} +WHERE {{ .Timefilter }} GROUP BY dimensions -ORDER BY xps DESC`, +ORDER BY xps DESC +{{ end }}`, }, { Description: "two dimensions, no filters, l2 bps", Input: sankeyHandlerInput{ @@ -53,17 +55,20 @@ ORDER BY xps DESC`, Units: "l2bps", }, Expected: ` +{{ with context @@{"start":"2022-04-10T15:45:10Z","end":"2022-04-11T15:45:10Z","points":20,"units":"l2bps"}@@ }} WITH - (SELECT MAX(TimeReceived) - MIN(TimeReceived) FROM {table} WHERE {timefilter}) AS range, - rows AS (SELECT SrcAS, ExporterName FROM {table} WHERE {timefilter} GROUP BY SrcAS, ExporterName ORDER BY SUM(Bytes) DESC LIMIT 5) + (SELECT MAX(TimeReceived) - MIN(TimeReceived) FROM {{ .Table }} WHERE {{ .Timefilter }}) AS range, + rows AS (SELECT SrcAS, ExporterName FROM {{ .Table }} WHERE {{ .Timefilter }} GROUP BY SrcAS, ExporterName ORDER BY SUM(Bytes) DESC LIMIT 5) SELECT - SUM((Bytes+18*Packets)*SamplingRate*8/range) AS xps, + {{ .Units }}/range AS xps, [if(SrcAS IN (SELECT SrcAS FROM rows), concat(toString(SrcAS), ': ', dictGetOrDefault('asns', 'name', SrcAS, '???')), 'Other'), if(ExporterName IN (SELECT ExporterName FROM rows), ExporterName, 'Other')] AS dimensions -FROM {table} -WHERE {timefilter} +FROM {{ .Table }} +WHERE {{ .Timefilter }} GROUP BY dimensions -ORDER BY xps DESC`, +ORDER BY xps DESC +{{ end }} +`, }, { Description: "two dimensions, no filters, pps", Input: sankeyHandlerInput{ @@ -75,17 +80,19 @@ ORDER BY xps DESC`, Units: "pps", }, Expected: ` +{{ with context @@{"start":"2022-04-10T15:45:10Z","end":"2022-04-11T15:45:10Z","points":20,"units":"pps"}@@ }} WITH - (SELECT MAX(TimeReceived) - MIN(TimeReceived) FROM {table} WHERE {timefilter}) AS range, - rows AS (SELECT SrcAS, ExporterName FROM {table} WHERE {timefilter} GROUP BY SrcAS, ExporterName ORDER BY SUM(Bytes) DESC LIMIT 5) + (SELECT MAX(TimeReceived) - MIN(TimeReceived) FROM {{ .Table }} WHERE {{ .Timefilter }}) AS range, + rows AS (SELECT SrcAS, ExporterName FROM {{ .Table }} WHERE {{ .Timefilter }} GROUP BY SrcAS, ExporterName ORDER BY SUM(Bytes) DESC LIMIT 5) SELECT - SUM(Packets*SamplingRate/range) AS xps, + {{ .Units }}/range AS xps, [if(SrcAS IN (SELECT SrcAS FROM rows), concat(toString(SrcAS), ': ', dictGetOrDefault('asns', 'name', SrcAS, '???')), 'Other'), if(ExporterName IN (SELECT ExporterName FROM rows), ExporterName, 'Other')] AS dimensions -FROM {table} -WHERE {timefilter} +FROM {{ .Table }} +WHERE {{ .Timefilter }} GROUP BY dimensions -ORDER BY xps DESC`, +ORDER BY xps DESC +{{ end }}`, }, { Description: "two dimensions, with filter", Input: sankeyHandlerInput{ @@ -97,23 +104,27 @@ ORDER BY xps DESC`, Units: "l3bps", }, Expected: ` +{{ with context @@{"start":"2022-04-10T15:45:10Z","end":"2022-04-11T15:45:10Z","points":20,"units":"l3bps"}@@ }} WITH - (SELECT MAX(TimeReceived) - MIN(TimeReceived) FROM {table} WHERE {timefilter} AND (DstCountry = 'FR')) AS range, - rows AS (SELECT SrcAS, ExporterName FROM {table} WHERE {timefilter} AND (DstCountry = 'FR') GROUP BY SrcAS, ExporterName ORDER BY SUM(Bytes) DESC LIMIT 10) + (SELECT MAX(TimeReceived) - MIN(TimeReceived) FROM {{ .Table }} WHERE {{ .Timefilter }} AND (DstCountry = 'FR')) AS range, + rows AS (SELECT SrcAS, ExporterName FROM {{ .Table }} WHERE {{ .Timefilter }} AND (DstCountry = 'FR') GROUP BY SrcAS, ExporterName ORDER BY SUM(Bytes) DESC LIMIT 10) SELECT - SUM(Bytes*SamplingRate*8/range) AS xps, + {{ .Units }}/range AS xps, [if(SrcAS IN (SELECT SrcAS FROM rows), concat(toString(SrcAS), ': ', dictGetOrDefault('asns', 'name', SrcAS, '???')), 'Other'), if(ExporterName IN (SELECT ExporterName FROM rows), ExporterName, 'Other')] AS dimensions -FROM {table} -WHERE {timefilter} AND (DstCountry = 'FR') +FROM {{ .Table }} +WHERE {{ .Timefilter }} AND (DstCountry = 'FR') GROUP BY dimensions -ORDER BY xps DESC`, +ORDER BY xps DESC +{{ end }}`, }, } for _, tc := range cases { + tc.Expected = strings.ReplaceAll(tc.Expected, "@@", "`") t.Run(tc.Description, func(t *testing.T) { got, _ := tc.Input.toSQL() - if diff := helpers.Diff(strings.Split(got, "\n"), strings.Split(tc.Expected, "\n")); diff != "" { + if diff := helpers.Diff(strings.Split(strings.TrimSpace(got), "\n"), + strings.Split(strings.TrimSpace(tc.Expected), "\n")); diff != "" { t.Errorf("toSQL (-got, +want):\n%s", diff) } }) diff --git a/console/widgets.go b/console/widgets.go index 8b03a2ee..1e74c744 100644 --- a/console/widgets.go +++ b/console/widgets.go @@ -7,6 +7,7 @@ import ( "fmt" "net/http" "reflect" + "strings" "time" "akvorado/common/helpers" @@ -151,23 +152,31 @@ func (c *Component) widgetTopHandlerFunc(gc *gin.Context) { } now := c.d.Clock.Now() - query := c.queryFlowsTable(fmt.Sprintf(` + query := c.finalizeQuery(fmt.Sprintf(` +{{ with %s }} WITH - (SELECT SUM(Bytes*SamplingRate) FROM {table} WHERE {timefilter} %s) AS Total + (SELECT SUM(Bytes*SamplingRate) FROM {{ .Table }} WHERE {{ .Timefilter }} %s) AS Total SELECT if(empty(%s),'Unknown',%s) AS Name, SUM(Bytes*SamplingRate) / Total * 100 AS Percent -FROM {table} -WHERE {timefilter} +FROM {{ .Table }} +WHERE {{ .Timefilter }} %s GROUP BY %s ORDER BY Percent DESC LIMIT 5 -`, filter, selector, selector, filter, groupby), mainTableRequired, now.Add(-5*time.Minute), now, time.Minute) +{{ end }}`, + templateContext(inputContext{ + Start: now.Add(-5 * time.Minute), + End: now, + MainTableRequired: mainTableRequired, + Points: 5, + }), + filter, selector, selector, filter, groupby)) gc.Header("X-SQL-Query", query) results := []topResult{} - err := c.d.ClickHouseDB.Conn.Select(ctx, &results, query) + err := c.d.ClickHouseDB.Conn.Select(ctx, &results, strings.TrimSpace(query)) if err != nil { c.r.Err(err).Msg("unable to query database") gc.JSON(http.StatusInternalServerError, gin.H{"message": "Unable to query database."}) @@ -177,7 +186,7 @@ LIMIT 5 } type widgetParameters struct { - Points uint64 `form:"points" binding:"isdefault|min=5,max=1000"` + Points uint `form:"points" binding:"isdefault|min=5,max=1000"` } func (c *Component) widgetGraphHandlerFunc(gc *gin.Context) { @@ -191,28 +200,34 @@ func (c *Component) widgetGraphHandlerFunc(gc *gin.Context) { if params.Points == 0 { params.Points = 200 } - interval := int64((24 * time.Hour).Seconds()) / int64(params.Points) - slot := fmt.Sprintf(`{resolution->%d}`, interval) now := c.d.Clock.Now() - query := c.queryFlowsTable(fmt.Sprintf(` + query := c.finalizeQuery(fmt.Sprintf(` +{{ with %s }} SELECT - toStartOfInterval(TimeReceived, INTERVAL %s second) AS Time, - SUM(Bytes*SamplingRate*8/%s)/1000/1000/1000 AS Gbps -FROM {table} -WHERE {timefilter} + {{ call .ToStartOfInterval "TimeReceived" }} AS Time, + SUM(Bytes*SamplingRate*8/{{ .Interval }})/1000/1000/1000 AS Gbps +FROM {{ .Table }} +WHERE {{ .Timefilter }} AND InIfBoundary = 'external' GROUP BY Time ORDER BY Time WITH FILL - FROM toStartOfInterval({timefilter.Start}, INTERVAL %s second) - TO {timefilter.Stop} - STEP %s`, slot, slot, slot, slot), false, now.Add(-24*time.Hour), now, time.Duration(interval)*time.Second) + FROM {{ .TimefilterStart }} + TO {{ .TimefilterEnd }} + STEP {{ .Interval }} +{{ end }}`, + templateContext(inputContext{ + Start: now.Add(-24 * time.Hour), + End: now, + MainTableRequired: false, + Points: params.Points, + }))) gc.Header("X-SQL-Query", query) results := []struct { Time time.Time `json:"t"` Gbps float64 `json:"gbps"` }{} - err := c.d.ClickHouseDB.Conn.Select(ctx, &results, query) + err := c.d.ClickHouseDB.Conn.Select(ctx, &results, strings.TrimSpace(query)) if err != nil { c.r.Err(err).Msg("unable to query database") gc.JSON(http.StatusInternalServerError, gin.H{"message": "Unable to query database."}) diff --git a/console/widgets_test.go b/console/widgets_test.go index 1a380675..77d6dc4a 100644 --- a/console/widgets_test.go +++ b/console/widgets_test.go @@ -6,6 +6,7 @@ package console import ( "net" "reflect" + "strings" "testing" "time" @@ -229,18 +230,18 @@ func TestWidgetGraph(t *testing.T) { {base.Add(5 * time.Minute), 24.7}, } mockConn.EXPECT(). - Select(gomock.Any(), gomock.Any(), ` + Select(gomock.Any(), gomock.Any(), strings.TrimSpace(` SELECT - toStartOfInterval(TimeReceived, INTERVAL 864 second) AS Time, + toStartOfInterval(TimeReceived + INTERVAL 144 second, INTERVAL 864 second) - INTERVAL 144 second AS Time, SUM(Bytes*SamplingRate*8/864)/1000/1000/1000 AS Gbps FROM flows WHERE TimeReceived BETWEEN toDateTime('2009-11-10 23:00:00', 'UTC') AND toDateTime('2009-11-11 23:00:00', 'UTC') AND InIfBoundary = 'external' GROUP BY Time ORDER BY Time WITH FILL - FROM toStartOfInterval(toDateTime('2009-11-10 23:00:00', 'UTC'), INTERVAL 864 second) + FROM toDateTime('2009-11-10 23:00:00', 'UTC') TO toDateTime('2009-11-11 23:00:00', 'UTC') - STEP 864`). + STEP 864`)). SetArg(1, expected). Return(nil)