console: use templates to build SQL query

This is needed if we want to be able to mix use of several tables
inside a single query (for example, flows_1m0s for a part of the query
and flows_5m0s for another part to overlay historical data).

Also, the way we handle time buckets is now cleaner. The previous way
had two stages of rounding and was incorrect. We were discarding the
first and last values for this reason. The new way only has one stage
of rounding and is correct. It tries hard to align the buckets at the
specified start time. We don't need to discard the first value anymore.
We still discard the last one because it could be incomplete (when end
is "now").
This commit is contained in:
Vincent Bernat
2022-08-09 11:40:51 +02:00
parent 61b3baa406
commit 67703cc61e
11 changed files with 535 additions and 344 deletions

View File

@@ -4,16 +4,15 @@
package console
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"regexp"
"strconv"
"strings"
"text/template"
"time"
)
// resolutionRegexp matches `{resolution->N}` placeholders in a query
// and captures the decimal number N.
var resolutionRegexp = regexp.MustCompile(`{resolution->(\d+)}`)
// flowsTable describes a consolidated or unconsolidated flows table.
type flowsTable struct {
Name string
@@ -21,95 +20,6 @@ type flowsTable struct {
Oldest time.Time
}
// queryFlowsTable builds a query against the flows table or one of its
// consolidated versions, depending on the information needed. The
// following placeholders are substituted in the provided query:
//
//   - `{table}`: name of the selected flows table
//   - `{timefilter}`: a `TimeReceived BETWEEN start AND end` expression
//   - `{timefilter.Start}` and `{timefilter.Stop}`: bounds of the time filter
//   - `{resolution}`: resolution of the selected table, in seconds
//   - `{resolution->N}`: N rounded down to a multiple of the resolution
//     (but at least 1)
//
// When mainTableRequired is true, or when no flows table is known, the
// "flows" table is used with a resolution of one second. Otherwise, the
// table is chosen among c.flowsTables using start and targetResolution.
// start and end are truncated to the resolution of the selected table
// before being formatted as UTC dates.
//
// The function panics if the number in a `{resolution->N}` placeholder
// cannot be converted to an int.
func (c *Component) queryFlowsTable(query string, mainTableRequired bool, start, end time.Time, targetResolution time.Duration) string {
	c.flowsTablesLock.RLock()
	defer c.flowsTablesLock.RUnlock()

	// Select table
	table := "flows"
	resolution := time.Second
	// Without any known table, there is nothing to select from: keep
	// the defaults instead of indexing an empty slice.
	if !mainTableRequired && len(c.flowsTables) > 0 {
		// We can use the consolidated data. The first
		// criteria is to find the tables matching the time
		// criteria.
		candidates := []int{}
		for idx, table := range c.flowsTables {
			if start.After(table.Oldest.Add(table.Resolution)) {
				candidates = append(candidates, idx)
			}
		}
		if len(candidates) == 0 {
			// No candidate, fallback to the one with oldest data
			best := 0
			for idx, table := range c.flowsTables {
				if c.flowsTables[best].Oldest.After(table.Oldest.Add(table.Resolution)) {
					best = idx
				}
			}
			candidates = []int{best}
			// Add other candidates that are not far off in term of oldest data
			for idx, table := range c.flowsTables {
				if idx == best {
					continue
				}
				if c.flowsTables[best].Oldest.After(table.Oldest) {
					candidates = append(candidates, idx)
				}
			}
		}
		if len(candidates) > 1 {
			// Use resolution to find the best one: the coarsest
			// table not exceeding the target resolution.
			// NOTE(review): best starts as index 0 of c.flowsTables,
			// which is not necessarily one of the candidates —
			// confirm this is intended.
			best := 0
			for _, idx := range candidates {
				if c.flowsTables[idx].Resolution > targetResolution {
					continue
				}
				if c.flowsTables[idx].Resolution > c.flowsTables[best].Resolution {
					best = idx
				}
			}
			candidates = []int{best}
		}
		table = c.flowsTables[candidates[0]].Name
		resolution = c.flowsTables[candidates[0]].Resolution
	}
	// The resolution is used below as a whole number of seconds, notably
	// as a divisor: anything below one second has to be clamped.
	if resolution < time.Second {
		resolution = time.Second
	}
	resolutionSeconds := int(resolution.Seconds())

	// Build timefilter to match the resolution
	start = start.Truncate(resolution)
	end = end.Truncate(resolution)
	timeFilterStart := fmt.Sprintf(`toDateTime('%s', 'UTC')`, start.UTC().Format("2006-01-02 15:04:05"))
	timeFilterStop := fmt.Sprintf(`toDateTime('%s', 'UTC')`, end.UTC().Format("2006-01-02 15:04:05"))
	timeFilter := fmt.Sprintf(`TimeReceived BETWEEN %s AND %s`, timeFilterStart, timeFilterStop)

	c.metrics.clickhouseQueries.WithLabelValues(table).Inc()
	query = strings.ReplaceAll(query, "{timefilter}", timeFilter)
	query = strings.ReplaceAll(query, "{timefilter.Start}", timeFilterStart)
	query = strings.ReplaceAll(query, "{timefilter.Stop}", timeFilterStop)
	query = strings.ReplaceAll(query, "{table}", table)
	query = strings.ReplaceAll(query, "{resolution}", strconv.Itoa(resolutionSeconds))
	query = resolutionRegexp.ReplaceAllStringFunc(query, func(in string) string {
		matches := resolutionRegexp.FindStringSubmatch(in)
		target, err := strconv.Atoi(matches[1])
		if err != nil {
			panic(err)
		}
		// Round down to a multiple of the resolution.
		target = target / resolutionSeconds * resolutionSeconds
		if target < 1 {
			target = 1
		}
		return strconv.Itoa(target)
	})
	return query
}
// refreshFlowsTables refreshes the information we have about flows
// tables (live one and consolidated ones). This information includes
// the consolidation interval and the oldest available data.
@@ -166,3 +76,184 @@ AND engine LIKE '%MergeTree'
c.flowsTablesLock.Unlock()
return nil
}
// finalizeQuery renders the provided query as a text template and
// returns the result. Surrounding whitespace is trimmed from the query
// before parsing. The only function exposed to the template is
// "context", bound to c.contextFunc, which returns a `context` struct
// with all the information needed. A parsing error triggers a panic;
// an execution error is logged, then triggers a panic too.
func (c *Component) finalizeQuery(query string) string {
	tmpl := template.New("query")
	tmpl.Funcs(template.FuncMap{"context": c.contextFunc})
	tmpl.Option("missingkey=error")
	tmpl = template.Must(tmpl.Parse(strings.TrimSpace(query)))

	var rendered bytes.Buffer
	err := tmpl.Execute(&rendered, nil)
	if err != nil {
		c.r.Err(err).Str("query", query).Msg("invalid query")
		panic(err)
	}
	return rendered.String()
}
// inputContext is the JSON-serializable input of the "context"
// template function: templateContext encodes it and contextFunc
// decodes it.
type inputContext struct {
// Start and End delimit the requested time range.
Start time.Time `json:"start"`
End time.Time `json:"end"`
// MainTableRequired forces the use of the "flows" table.
MainTableRequired bool `json:"main-table-required,omitempty"`
// Points is the number of points wanted over the time range. The
// target interval is the duration of the range divided by Points.
Points uint `json:"points"`
// Units selects the aggregation expression. contextFunc recognizes
// "pps", "l3bps" and "l2bps".
Units string `json:"units,omitempty"`
}
// context is the value returned by the "context" template function.
// Its fields are meant to be referenced from query templates.
type context struct {
// Table is the name of the selected flows table.
Table string
// Timefilter is a `TimeReceived BETWEEN ... AND ...` SQL expression.
Timefilter string
// TimefilterStart and TimefilterEnd are the bounds used in Timefilter,
// as `toDateTime('...', 'UTC')` SQL expressions.
TimefilterStart string
TimefilterEnd string
// Units is the SQL aggregation expression matching the requested
// units. It is empty when the requested units are not recognized.
Units string
// Interval is the computed interval, in seconds.
Interval uint64
// ToStartOfInterval wraps the provided field expression into a
// toStartOfInterval() SQL expression using the computed interval and
// an offset derived from the start of the time range.
ToStartOfInterval func(string) string
}
// templateEscape neutralizes template actions in a string: each `{{`
// is replaced by an action emitting a literal `{{`. The closing `}}`
// is left untouched as only the opening tag needs to be escaped.
func templateEscape(input string) string {
	const (
		opening = `{{`
		escaped = `{{"{{"}}`
	)
	pieces := strings.Split(input, opening)
	return strings.Join(pieces, escaped)
}
// templateDate converts a date to UTC and formats it as a
// "YYYY-MM-DD hh:mm:ss" string compatible with ClickHouse.
func templateDate(input time.Time) string {
	const layout = "2006-01-02 15:04:05"
	utc := input.In(time.UTC)
	return utc.Format(layout)
}
// templateWhere turns a filter into the condition of a WHERE clause.
// The time filter is always present. A non-empty filter is escaped for
// the template engine and added as a second, parenthesized condition.
func templateWhere(qf queryFilter) string {
	if filter := qf.Filter; filter != "" {
		return fmt.Sprintf(`{{ .Timefilter }} AND (%s)`, templateEscape(filter))
	}
	return `{{ .Timefilter }}`
}
// templateContext builds the template directive calling the "context"
// function with the provided input, encoded as JSON and enclosed in
// backquotes. It panics if the input cannot be encoded.
func templateContext(context inputContext) string {
	payload, err := json.Marshal(context)
	if err != nil {
		panic(err)
	}
	return fmt.Sprintf("context `%s`", payload)
}
// contextFunc implements the "context" template function. It decodes
// the JSON-encoded inputContext received as inputStr, selects the flows
// table to query and returns a context with everything needed to build
// the query: table name, time filter, aggregation expression for the
// requested units, interval in seconds and a helper to compute the
// start of the interval for a field.
//
// The target interval is the duration of the requested time range
// divided by the number of requested points, with a minimum of one
// second. Unless the main table is required or no table is known, the
// table is selected among c.flowsTables using the start of the time
// range and the target interval.
//
// It panics if inputStr cannot be decoded or if the number of points
// is 0.
func (c *Component) contextFunc(inputStr string) context {
	var input inputContext
	if err := json.Unmarshal([]byte(inputStr), &input); err != nil {
		panic(err)
	}
	// Without this check, the division below would fail with an
	// anonymous "integer divide by zero" runtime error.
	if input.Points == 0 {
		panic(fmt.Errorf("invalid context %q: points should be greater than 0", inputStr))
	}

	targetInterval := time.Duration(uint64(input.End.Sub(input.Start)) / uint64(input.Points))
	if targetInterval < time.Second {
		targetInterval = time.Second
	}

	c.flowsTablesLock.RLock()
	defer c.flowsTablesLock.RUnlock()

	// Select table
	table := "flows"
	computedInterval := time.Second
	if !input.MainTableRequired && len(c.flowsTables) > 0 {
		// We can use the consolidated data. The first
		// criteria is to find the tables matching the time
		// criteria.
		candidates := []int{}
		for idx, table := range c.flowsTables {
			if input.Start.After(table.Oldest.Add(table.Resolution)) {
				candidates = append(candidates, idx)
			}
		}
		if len(candidates) == 0 {
			// No candidate, fallback to the one with oldest data
			best := 0
			for idx, table := range c.flowsTables {
				if c.flowsTables[best].Oldest.After(table.Oldest.Add(table.Resolution)) {
					best = idx
				}
			}
			candidates = []int{best}
			// Add other candidates that are not far off in term of oldest data
			for idx, table := range c.flowsTables {
				if idx == best {
					continue
				}
				if c.flowsTables[best].Oldest.After(table.Oldest) {
					candidates = append(candidates, idx)
				}
			}
		}
		if len(candidates) > 1 {
			// Use interval to find the best one: the coarsest table
			// not exceeding the target interval.
			// NOTE(review): best starts as index 0 of c.flowsTables,
			// which is not necessarily one of the candidates —
			// confirm this is intended.
			best := 0
			for _, idx := range candidates {
				if c.flowsTables[idx].Resolution > targetInterval {
					continue
				}
				if c.flowsTables[idx].Resolution > c.flowsTables[best].Resolution {
					best = idx
				}
			}
			candidates = []int{best}
		}
		table = c.flowsTables[candidates[0]].Name
		computedInterval = c.flowsTables[candidates[0]].Resolution
	}
	// The interval is used below as a whole number of seconds, notably
	// as a divisor.
	if computedInterval < time.Second {
		computedInterval = time.Second
	}

	// Make start/end match the computed interval (currently equal to the table resolution)
	start := input.Start.Truncate(computedInterval)
	end := input.End.Truncate(computedInterval)
	// Adapt the computed interval to match the target one more closely
	// while keeping it a multiple of the table resolution
	if targetInterval > computedInterval {
		computedInterval = targetInterval.Truncate(computedInterval)
	}
	// Adapt end to ensure we get a full interval
	end = start.Add(end.Sub(start).Truncate(computedInterval))
	// Now, toStartOfInterval will provide an incorrect value as start
	// is not necessarily a multiple of the computed interval. We
	// compute a correction offset: the distance between start and the
	// previous multiple of the computed interval, counted in seconds
	// from the Unix epoch. Go's truncate cannot be used for that as it
	// seems to be different from what we expect.
	computedIntervalOffset := start.UTC().Sub(
		time.Unix(start.UTC().Unix()/
			int64(computedInterval.Seconds())*
			int64(computedInterval.Seconds()), 0))
	// Shifting a date by diffOffset before rounding it, then shifting
	// the result back, makes the interval boundaries land on start.
	diffOffset := uint64(computedInterval.Seconds()) - uint64(computedIntervalOffset.Seconds())

	// Compute all strings
	timefilterStart := fmt.Sprintf(`toDateTime('%s', 'UTC')`, start.UTC().Format("2006-01-02 15:04:05"))
	timefilterEnd := fmt.Sprintf(`toDateTime('%s', 'UTC')`, end.UTC().Format("2006-01-02 15:04:05"))
	timefilter := fmt.Sprintf(`TimeReceived BETWEEN %s AND %s`, timefilterStart, timefilterEnd)
	// Unknown units leave the expression empty.
	var units string
	switch input.Units {
	case "pps":
		units = `SUM(Packets*SamplingRate)`
	case "l3bps":
		units = `SUM(Bytes*SamplingRate*8)`
	case "l2bps":
		units = `SUM((Bytes+18*Packets)*SamplingRate*8)`
	}

	c.metrics.clickhouseQueries.WithLabelValues(table).Inc()
	return context{
		Table:           table,
		Timefilter:      timefilter,
		TimefilterStart: timefilterStart,
		TimefilterEnd:   timefilterEnd,
		Units:           units,
		Interval:        uint64(computedInterval.Seconds()),
		ToStartOfInterval: func(field string) string {
			return fmt.Sprintf(
				`toStartOfInterval(%s + INTERVAL %d second, INTERVAL %d second) - INTERVAL %d second`,
				field,
				diffOffset,
				uint64(computedInterval.Seconds()),
				diffOffset)
		},
	}
}