mirror of
https://github.com/akvorado/akvorado.git
synced 2025-12-12 06:24:10 +01:00
This requires usage of `reflect.MethodByName`, which prevents dead code elimination (DCE). This does not really matter, as DCE is not smart enough to detect that we don't use it, but the day we have an alternative that does not support function calls, we will be ready. Or maybe there is an alternative. Or we could use `strings.ReplaceAll()` instead.
284 lines
9.1 KiB
Go
284 lines
9.1 KiB
Go
// SPDX-FileCopyrightText: 2022 Free Mobile
|
|
// SPDX-License-Identifier: AGPL-3.0-only
|
|
|
|
package console
|
|
|
|
import (
|
|
"bytes"
|
|
"errors"
|
|
"fmt"
|
|
"sort"
|
|
"strings"
|
|
"text/template"
|
|
"time"
|
|
|
|
"akvorado/console/query"
|
|
)
|
|
|
|
// flowsTable describes a flows table, either consolidated or
// unconsolidated.
type flowsTable struct {
	// Name is the name of the table, as listed in system.tables.
	Name string
	// Resolution is the duration parsed from the part of the name following
	// the "flows_" prefix. It is zero when the name has no such prefix.
	Resolution time.Duration
	// Oldest is the smallest TimeReceived value found in the table.
	Oldest time.Time
}
|
|
|
|
// refreshFlowsTables refreshes the information we have about flows
|
|
// tables (live one and consolidated ones). This information includes
|
|
// the consolidation interval and the oldest available data.
|
|
func (c *Component) refreshFlowsTables() error {
|
|
ctx := c.t.Context(nil)
|
|
var tables []struct {
|
|
Name string `ch:"name"`
|
|
}
|
|
err := c.d.ClickHouseDB.Select(ctx, &tables, `
|
|
SELECT name
|
|
FROM system.tables
|
|
WHERE database=currentDatabase()
|
|
AND table LIKE 'flows%'
|
|
AND table NOT LIKE '%_local'
|
|
AND table != 'flows_raw_errors'
|
|
AND (engine LIKE '%MergeTree' OR engine = 'Distributed')
|
|
`)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot query flows table metadata: %w", err)
|
|
}
|
|
|
|
newFlowsTables := []flowsTable{}
|
|
for _, table := range tables {
|
|
// Parse resolution
|
|
resolution := time.Duration(0)
|
|
if strings.HasPrefix(table.Name, "flows_") {
|
|
var err error
|
|
resolution, err = time.ParseDuration(strings.TrimPrefix(table.Name, "flows_"))
|
|
if err != nil {
|
|
c.r.Err(err).Msgf("cannot parse duration for table %s", table.Name)
|
|
continue
|
|
}
|
|
}
|
|
// Get oldest timestamp
|
|
var oldest []struct {
|
|
T time.Time `ch:"t"`
|
|
}
|
|
err := c.d.ClickHouseDB.Conn.Select(ctx, &oldest,
|
|
fmt.Sprintf(`SELECT MIN(TimeReceived) AS t FROM %s`, table.Name))
|
|
if err != nil {
|
|
return fmt.Errorf("cannot query table %s for oldest timestamp: %w", table.Name, err)
|
|
}
|
|
|
|
newFlowsTables = append(newFlowsTables, flowsTable{
|
|
Name: table.Name,
|
|
Resolution: resolution,
|
|
Oldest: oldest[0].T,
|
|
})
|
|
}
|
|
if len(newFlowsTables) == 0 {
|
|
return errors.New("no flows table present (yet?)")
|
|
}
|
|
|
|
c.flowsTablesLock.Lock()
|
|
c.flowsTables = newFlowsTables
|
|
c.flowsTablesLock.Unlock()
|
|
return nil
|
|
}
|
|
|
|
// inputContext is the intermediate context provided by the input handler.
type inputContext struct {
	// Start and End delimit the requested time range.
	Start time.Time
	End   time.Time
	// StartForTableSelection, when not nil, replaces Start when selecting
	// the table to query.
	StartForTableSelection *time.Time
	// MainTableRequired forces the use of the "flows" table.
	MainTableRequired bool
	// Points is the number of points requested over the time range. The
	// time range is divided by it to get the target interval.
	Points uint
	// Units selects the aggregation expression: "pps", "l3bps", "l2bps",
	// "inl2%" or "outl2%".
	Units string
}
|
|
|
|
// context is the data made available to a query template when it is
// finalized.
type context struct {
	// Table is the name of the table selected for the query.
	Table string
	// Timefilter is a "TimeReceived BETWEEN start AND end" condition built
	// from TimefilterStart and TimefilterEnd.
	Timefilter string
	// TimefilterStart and TimefilterEnd are toDateTime() expressions, in
	// UTC, for the bounds of the time range.
	TimefilterStart string
	TimefilterEnd   string
	// Units is the aggregation expression matching the requested units.
	Units string
	// Interval is the aggregation interval, in seconds.
	Interval uint64
	// ToStartOfInterval is an expression rounding TimeReceived to the
	// start of its interval.
	ToStartOfInterval string
}
|
|
|
|
// templateQuery holds a template string and its associated input context.
|
|
type templateQuery struct {
|
|
Template string
|
|
Context inputContext
|
|
}
|
|
|
|
// templateEscape makes a string safe to embed in a template by escaping
// `{{`. The closing `}}` is left as is: only the opening tag needs to be
// escaped.
func templateEscape(input string) string {
	const (
		openingTag = `{{`
		// escapedTag is a template action printing a literal opening tag.
		escapedTag = `{{"{{"}}`
	)
	return strings.ReplaceAll(input, openingTag, escapedTag)
}
|
|
|
|
// templateWhere transforms a filter to a WHERE clause
|
|
func templateWhere(qf query.Filter) string {
|
|
if qf.Direct() == "" {
|
|
return `{{ .Timefilter }}`
|
|
}
|
|
return fmt.Sprintf(`{{ .Timefilter }} AND (%s)`, templateEscape(qf.Direct()))
|
|
}
|
|
|
|
// finalizeTemplateQueries builds the finalized queries from a list of templateQuery.
|
|
// Each template is processed with its associated context and combined with UNION ALL.
|
|
func (c *Component) finalizeTemplateQueries(queries []templateQuery) string {
|
|
parts := make([]string, len(queries))
|
|
for i, q := range queries {
|
|
parts[i] = c.finalizeTemplateQuery(q)
|
|
}
|
|
return strings.Join(parts, "\nUNION ALL\n")
|
|
}
|
|
|
|
// finalizeTemplateQuery builds the finalized query for a single templateQuery
|
|
func (c *Component) finalizeTemplateQuery(query templateQuery) string {
|
|
input := query.Context
|
|
table, computedInterval, targetInterval := c.computeTableAndInterval(query.Context)
|
|
|
|
// Make start/end match the computed interval (currently equal to the table resolution)
|
|
start := input.Start.Truncate(computedInterval)
|
|
end := input.End.Truncate(computedInterval)
|
|
// Adapt the computed interval to match the target one more closely
|
|
if targetInterval > computedInterval {
|
|
computedInterval = targetInterval.Truncate(computedInterval)
|
|
}
|
|
// Adapt end to ensure we get a full interval
|
|
end = start.Add(end.Sub(start).Truncate(computedInterval))
|
|
// Now, toStartOfInterval will provide an incorrect value. We
|
|
// compute a correction offset. Go's truncate seems to
|
|
// be different from what we expect.
|
|
computedIntervalOffset := start.UTC().Sub(
|
|
time.Unix(start.UTC().Unix()/
|
|
int64(computedInterval.Seconds())*
|
|
int64(computedInterval.Seconds()), 0))
|
|
diffOffset := uint64(computedInterval.Seconds()) - uint64(computedIntervalOffset.Seconds())
|
|
|
|
// Compute all strings
|
|
timefilterStart := fmt.Sprintf(`toDateTime('%s', 'UTC')`, start.UTC().Format("2006-01-02 15:04:05"))
|
|
timefilterEnd := fmt.Sprintf(`toDateTime('%s', 'UTC')`, end.UTC().Format("2006-01-02 15:04:05"))
|
|
timefilter := fmt.Sprintf(`TimeReceived BETWEEN %s AND %s`, timefilterStart, timefilterEnd)
|
|
var units string
|
|
switch input.Units {
|
|
case "pps":
|
|
units = `SUM(Packets*SamplingRate)`
|
|
case "l3bps":
|
|
units = `SUM(Bytes*SamplingRate*8)`
|
|
case "l2bps":
|
|
// For each packet, we add the Ethernet header (14 bytes), the FCS (4
|
|
// bytes), the preamble and start frame delimiter (8 bytes) and the IPG
|
|
// (~ 12 bytes). We don't include the VLAN header (4 bytes) as it is
|
|
// often not used with external entities. Both sFlow and IPFIX may have
|
|
// a better view of that, but we don't collect it yet.
|
|
units = `SUM((Bytes+38*Packets)*SamplingRate*8)`
|
|
case "inl2%":
|
|
// That's like l2bps, but this time we use the interface speed to get a
|
|
// percent value
|
|
units = `ifNotFinite(SUM((Bytes+38*Packets)*SamplingRate*8*100/(InIfSpeed*1000000))/COUNT(DISTINCT ExporterAddress, InIfName),0)`
|
|
case "outl2%":
|
|
// Same but using output interface as reference
|
|
units = `ifNotFinite(SUM((Bytes+38*Packets)*SamplingRate*8*100/(OutIfSpeed*1000000))/COUNT(DISTINCT ExporterAddress, OutIfName),0)`
|
|
}
|
|
|
|
c.metrics.clickhouseQueries.WithLabelValues(table).Inc()
|
|
|
|
context := context{
|
|
Table: table,
|
|
Timefilter: timefilter,
|
|
TimefilterStart: timefilterStart,
|
|
TimefilterEnd: timefilterEnd,
|
|
Units: units,
|
|
Interval: uint64(computedInterval.Seconds()),
|
|
ToStartOfInterval: fmt.Sprintf(
|
|
`toStartOfInterval(%s + INTERVAL %d second, INTERVAL %d second) - INTERVAL %d second`,
|
|
"TimeReceived",
|
|
diffOffset,
|
|
uint64(computedInterval.Seconds()),
|
|
diffOffset),
|
|
}
|
|
|
|
t := template.Must(template.New("query").
|
|
Option("missingkey=error").
|
|
Parse(strings.TrimSpace(query.Template)))
|
|
buf := bytes.NewBufferString("")
|
|
if err := t.Execute(buf, context); err != nil {
|
|
c.r.Err(err).Str("query", query.Template).Msg("invalid query")
|
|
panic(err)
|
|
}
|
|
return buf.String()
|
|
}
|
|
|
|
func (c *Component) computeTableAndInterval(input inputContext) (string, time.Duration, time.Duration) {
|
|
targetInterval := time.Duration(uint64(input.End.Sub(input.Start)) / uint64(input.Points))
|
|
targetInterval = max(targetInterval, time.Second)
|
|
|
|
// Select table
|
|
targetIntervalForTableSelection := targetInterval
|
|
if input.MainTableRequired {
|
|
return "flows", time.Second, targetInterval
|
|
}
|
|
startForTableSelection := input.Start
|
|
if input.StartForTableSelection != nil {
|
|
startForTableSelection = *input.StartForTableSelection
|
|
}
|
|
table, computedInterval := c.getBestTable(startForTableSelection, targetIntervalForTableSelection)
|
|
return table, computedInterval, targetInterval
|
|
}
|
|
|
|
// Get the best table starting at the specified time.
|
|
func (c *Component) getBestTable(start time.Time, targetInterval time.Duration) (string, time.Duration) {
|
|
c.flowsTablesLock.RLock()
|
|
defer c.flowsTablesLock.RUnlock()
|
|
|
|
table := "flows"
|
|
computedInterval := time.Second
|
|
if len(c.flowsTables) > 0 {
|
|
// We can use the consolidated data. The first
|
|
// criteria is to find the tables matching the time
|
|
// criteria.
|
|
candidates := []int{}
|
|
for idx, table := range c.flowsTables {
|
|
if start.After(table.Oldest.Add(table.Resolution)) {
|
|
candidates = append(candidates, idx)
|
|
}
|
|
}
|
|
if len(candidates) == 0 {
|
|
// No candidate, fallback to the one with oldest data
|
|
best := 0
|
|
for idx, table := range c.flowsTables {
|
|
if c.flowsTables[best].Oldest.After(table.Oldest.Add(table.Resolution)) {
|
|
best = idx
|
|
}
|
|
}
|
|
candidates = []int{best}
|
|
// Add other candidates that are not far off in term of oldest data
|
|
for idx, table := range c.flowsTables {
|
|
if idx == best {
|
|
continue
|
|
}
|
|
if c.flowsTables[best].Oldest.After(table.Oldest) {
|
|
candidates = append(candidates, idx)
|
|
}
|
|
}
|
|
}
|
|
sort.Slice(candidates, func(i, j int) bool {
|
|
return c.flowsTables[candidates[i]].Resolution < c.flowsTables[candidates[j]].Resolution
|
|
})
|
|
// If possible, use the first resolution before the target interval
|
|
for len(candidates) > 1 {
|
|
if c.flowsTables[candidates[1]].Resolution <= targetInterval {
|
|
candidates = candidates[1:]
|
|
} else {
|
|
break
|
|
}
|
|
}
|
|
table = c.flowsTables[candidates[0]].Name
|
|
computedInterval = c.flowsTables[candidates[0]].Resolution
|
|
}
|
|
if computedInterval < time.Second {
|
|
computedInterval = time.Second
|
|
}
|
|
return table, computedInterval
|
|
}
|