// SPDX-FileCopyrightText: 2022 Free Mobile
// SPDX-License-Identifier: AGPL-3.0-only

package console

import (
	"bytes"
	"errors"
	"fmt"
	"sort"
	"strings"
	"text/template"
	"time"

	"akvorado/console/query"
)

// flowsTable describes a consolidated or unconsolidated flows table.
type flowsTable struct {
	Name       string
	Resolution time.Duration
	Oldest     time.Time
}

// refreshFlowsTables refreshes the information we have about flows
// tables (live one and consolidated ones). This information includes
// the consolidation interval and the oldest available data.
func (c *Component) refreshFlowsTables() error {
	ctx := c.t.Context(nil)
	var tables []struct {
		Name string `ch:"name"`
	}
	err := c.d.ClickHouseDB.Select(ctx, &tables, `
SELECT name
FROM system.tables
WHERE database=currentDatabase()
AND table LIKE 'flows%'
AND table NOT LIKE '%_local'
AND table != 'flows_raw_errors'
AND (engine LIKE '%MergeTree' OR engine = 'Distributed')
`)
	if err != nil {
		return fmt.Errorf("cannot query flows table metadata: %w", err)
	}

	newFlowsTables := []flowsTable{}
	for _, table := range tables {
		// Parse resolution
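		// Consolidated tables carry their resolution as a Go duration
		// suffix in their name (e.g. "flows_1h0m0s" for a 1-hour
		// resolution); the main "flows" table keeps a zero resolution.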
		resolution := time.Duration(0)
		if strings.HasPrefix(table.Name, "flows_") {
			var err error
			resolution, err = time.ParseDuration(strings.TrimPrefix(table.Name, "flows_"))
			if err != nil {
				c.r.Err(err).Msgf("cannot parse duration for table %s", table.Name)
				continue
			}
		}
		// Get oldest timestamp
		var oldest []struct {
			T time.Time `ch:"t"`
		}
		err := c.d.ClickHouseDB.Conn.Select(ctx, &oldest,
			fmt.Sprintf(`SELECT MIN(TimeReceived) AS t FROM %s`, table.Name))
		if err != nil {
			return fmt.Errorf("cannot query table %s for oldest timestamp: %w", table.Name, err)
		}

		newFlowsTables = append(newFlowsTables, flowsTable{
			Name:       table.Name,
			Resolution: resolution,
			Oldest:     oldest[0].T,
		})
	}
	if len(newFlowsTables) == 0 {
		return errors.New("no flows table present (yet?)")
	}

	c.flowsTablesLock.Lock()
	c.flowsTables = newFlowsTables
	c.flowsTablesLock.Unlock()
	return nil
}

// inputContext is the intermediate context provided by the input handler.
type inputContext struct {
	Start                  time.Time
	End                    time.Time
	StartForTableSelection *time.Time
	MainTableRequired      bool
	Points                 uint
	Units                  string
}

// context is the context to finalize the template.
type context struct {
	Table             string
	Timefilter        string
	TimefilterStart   string
	TimefilterEnd     string
	Units             string
	Interval          uint64
	ToStartOfInterval func(string) string
}

// templateQuery holds a template string and its associated input context.
type templateQuery struct {
	Template string
	Context  inputContext
}

// templateEscape escapes `{{` and `}}` from a string. In fact, only
// the opening tag needs to be escaped.
func templateEscape(input string) string {
	return strings.ReplaceAll(input, `{{`, `{{"{{"}}`)
}

// templateWhere transforms a filter to a WHERE clause.
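// For example, a filter like `InIfBoundary = external` would yield
// `{{ .Timefilter }} AND (InIfBoundary = external)`.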
func templateWhere(qf query.Filter) string {
	if qf.Direct() == "" {
		return `{{ .Timefilter }}`
	}
	return fmt.Sprintf(`{{ .Timefilter }} AND (%s)`, templateEscape(qf.Direct()))
}

// finalizeTemplateQueries builds the finalized queries from a list of templateQuery.
// Each template is processed with its associated context and combined with UNION ALL.
func (c *Component) finalizeTemplateQueries(queries []templateQuery) string {
	parts := make([]string, len(queries))
	for i, q := range queries {
		parts[i] = c.finalizeTemplateQuery(q)
	}
	return strings.Join(parts, "\nUNION ALL\n")
}

// finalizeTemplateQuery builds the finalized query for a single templateQuery.
func (c *Component) finalizeTemplateQuery(query templateQuery) string {
	input := query.Context
	table, computedInterval, targetInterval := c.computeTableAndInterval(query.Context)

	// Make start/end match the computed interval (currently equal to the table resolution)
	start := input.Start.Truncate(computedInterval)
	end := input.End.Truncate(computedInterval)
	// Adapt the computed interval to match the target one more closely
	if targetInterval > computedInterval {
		computedInterval = targetInterval.Truncate(computedInterval)
	}
	// Adapt end to ensure we get a full interval
	end = start.Add(end.Sub(start).Truncate(computedInterval))
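	// For example, with start 10:00:00, end 10:59:00 and a 10-minute
	// interval, end is pulled back to 10:50:00 so the query only covers
	// complete intervals.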
	// Now, toStartOfInterval would provide an incorrect value: it aligns
	// intervals on the Unix epoch rather than on the start computed above.
	// We compute a correction offset to realign the buckets on start.
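	// For example, with start = 00:07:00 UTC and a 10-minute interval,
	// start sits 7 minutes past an epoch-aligned boundary, so diffOffset is
	// 3 minutes: shifting the bucketed field forward by 3 minutes before
	// toStartOfInterval and back afterwards makes the buckets start at
	// 00:07:00, 00:17:00, ... instead of 00:00:00, 00:10:00, ...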
	computedIntervalOffset := start.UTC().Sub(
		time.Unix(start.UTC().Unix()/
			int64(computedInterval.Seconds())*
			int64(computedInterval.Seconds()), 0))
	diffOffset := uint64(computedInterval.Seconds()) - uint64(computedIntervalOffset.Seconds())

	// Compute all strings
	timefilterStart := fmt.Sprintf(`toDateTime('%s', 'UTC')`, start.UTC().Format("2006-01-02 15:04:05"))
	timefilterEnd := fmt.Sprintf(`toDateTime('%s', 'UTC')`, end.UTC().Format("2006-01-02 15:04:05"))
	timefilter := fmt.Sprintf(`TimeReceived BETWEEN %s AND %s`, timefilterStart, timefilterEnd)
	var units string
	switch input.Units {
	case "pps":
		units = `SUM(Packets*SamplingRate)`
	case "l3bps":
		units = `SUM(Bytes*SamplingRate*8)`
	case "l2bps":
		// For each packet, we add the Ethernet header (14 bytes), the FCS (4
		// bytes), the preamble and start frame delimiter (8 bytes) and the IPG
		// (~ 12 bytes). We don't include the VLAN header (4 bytes) as it is
		// often not used with external entities. Both sFlow and IPFIX may have
		// a better view of that, but we don't collect it yet.
		units = `SUM((Bytes+38*Packets)*SamplingRate*8)`
	case "inl2%":
		// That's like l2bps, but this time we use the interface speed to get a
		// percent value
		units = `ifNotFinite(SUM((Bytes+38*Packets)*SamplingRate*8*100/(InIfSpeed*1000000))/COUNT(DISTINCT ExporterAddress, InIfName),0)`
	case "outl2%":
		// Same but using output interface as reference
		units = `ifNotFinite(SUM((Bytes+38*Packets)*SamplingRate*8*100/(OutIfSpeed*1000000))/COUNT(DISTINCT ExporterAddress, OutIfName),0)`
	}

	c.metrics.clickhouseQueries.WithLabelValues(table).Inc()

	context := context{
		Table:           table,
		Timefilter:      timefilter,
		TimefilterStart: timefilterStart,
		TimefilterEnd:   timefilterEnd,
		Units:           units,
		Interval:        uint64(computedInterval.Seconds()),
		ToStartOfInterval: func(field string) string {
			return fmt.Sprintf(
				`toStartOfInterval(%s + INTERVAL %d second, INTERVAL %d second) - INTERVAL %d second`,
				field,
				diffOffset,
				uint64(computedInterval.Seconds()),
				diffOffset)
		},
	}

	t := template.Must(template.New("query").
		Option("missingkey=error").
		Parse(strings.TrimSpace(query.Template)))
	buf := bytes.NewBufferString("")
	if err := t.Execute(buf, context); err != nil {
		c.r.Err(err).Str("query", query.Template).Msg("invalid query")
		panic(err)
	}
	return buf.String()
}
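
// computeTableAndInterval selects the flows table to query for the given
// input and returns its name, the interval to use with that table, and the
// target interval derived from the requested time range and number of points.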
func (c *Component) computeTableAndInterval(input inputContext) (string, time.Duration, time.Duration) {
	targetInterval := time.Duration(uint64(input.End.Sub(input.Start)) / uint64(input.Points))
	targetInterval = max(targetInterval, time.Second)

	// Select table
	targetIntervalForTableSelection := targetInterval
	if input.MainTableRequired {
		return "flows", time.Second, targetInterval
	}
	startForTableSelection := input.Start
	if input.StartForTableSelection != nil {
		startForTableSelection = *input.StartForTableSelection
	}
	table, computedInterval := c.getBestTable(startForTableSelection, targetIntervalForTableSelection)
	return table, computedInterval, targetInterval
}

// Get the best table starting at the specified time.
func (c *Component) getBestTable(start time.Time, targetInterval time.Duration) (string, time.Duration) {
	c.flowsTablesLock.RLock()
	defer c.flowsTablesLock.RUnlock()

	table := "flows"
	computedInterval := time.Second
	if len(c.flowsTables) > 0 {
		// We can use the consolidated data. The first criterion is to
		// find the tables covering the requested time range.
		candidates := []int{}
		for idx, table := range c.flowsTables {
			if start.After(table.Oldest.Add(table.Resolution)) {
				candidates = append(candidates, idx)
			}
		}
		if len(candidates) == 0 {
			// No candidate, fall back to the one with the oldest data
			best := 0
			for idx, table := range c.flowsTables {
				if c.flowsTables[best].Oldest.After(table.Oldest.Add(table.Resolution)) {
					best = idx
				}
			}
			candidates = []int{best}
			// Add other candidates that are not far off in terms of oldest data
			for idx, table := range c.flowsTables {
				if idx == best {
					continue
				}
				if c.flowsTables[best].Oldest.After(table.Oldest) {
					candidates = append(candidates, idx)
				}
			}
		}
		sort.Slice(candidates, func(i, j int) bool {
			return c.flowsTables[candidates[i]].Resolution < c.flowsTables[candidates[j]].Resolution
		})
		// If possible, use the coarsest resolution not exceeding the target interval
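		// For example, with candidate resolutions of 1s, 1m and 5m and a
		// target interval of 2m, the loop below keeps the 1m table.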
		for len(candidates) > 1 {
			if c.flowsTables[candidates[1]].Resolution <= targetInterval {
				candidates = candidates[1:]
			} else {
				break
			}
		}
		table = c.flowsTables[candidates[0]].Name
		computedInterval = c.flowsTables[candidates[0]].Resolution
	}
	if computedInterval < time.Second {
		computedInterval = time.Second
	}
	return table, computedInterval
}