Files
akvorado/console/clickhouse.go
Vincent Bernat 28783ff4f3 orchestrator/clickhouse: add support for distributed/replicated tables
Fix #605

All MergeTree tables are now replicated.

For some tables, a `_local` variant is added and the non-`_local`
variant is now distributed. The distributed tables are the `flows`
table, the `flows_DDDD` tables (where `DDDD` is a duration), as well as
the `flows_raw_errors` table. The `exporters` table is not distributed
and stays local.

The data is following this schema:

- data is coming from `flows_HHHH_raw` table, using the Kafka engine

- the `flows_HHHH_raw_consumer` reads data from `flows_HHHH_raw` (local)
  and sends it to `flows` (distributed) when there is no error

- the `flows_raw_errors_consumer` reads data from
  `flows_HHHH_raw` (local) and sends it to
  `flows_raw_errors` (distributed)

- the `flows_DDDD_consumer` reads fata from `flows_local` (local) and
  sends it to `flow_DDDD_local` (local)

- the `exporters_consumer` reads data from `flows` (distributed) and
  sends it to `exporters` (local)

The reason for `flows_HHHH_raw_consumer` to send data to the distributed
`flows` table, and not the local one is to ensure flows are
balanced (for example, if there is not enough Kafka partitions). But
sending it to `flows_local` would have been possible.

On the other hand, it is important for `flows_DDDD_consumer` to read
from local to avoid duplication. It could have sent to distributed, but
the data is now balanced correctly and we just send it to local instead
for better performance.

The `exporters_consumer` is allowed to read from the distributed `flows`
table because it writes the result to the local `exporters` table.
2024-04-04 22:03:12 +02:00

289 lines
9.1 KiB
Go

// SPDX-FileCopyrightText: 2022 Free Mobile
// SPDX-License-Identifier: AGPL-3.0-only
package console
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"sort"
"strings"
"text/template"
"time"
"akvorado/console/query"
)
// flowsTable describe a consolidated or unconsolidated flows table.
type flowsTable struct {
Name string
Resolution time.Duration
Oldest time.Time
}
// refreshFlowsTables refreshes the information we have about flows
// tables (live one and consolidated ones). This information includes
// the consolidation interval and the oldest available data.
func (c *Component) refreshFlowsTables() error {
ctx := c.t.Context(nil)
var tables []struct {
Name string `ch:"name"`
}
err := c.d.ClickHouseDB.Select(ctx, &tables, `
SELECT name
FROM system.tables
WHERE database=currentDatabase()
AND table LIKE 'flows%'
AND table NOT LIKE '%_local'
AND table != 'flows_raw_errors'
AND (engine LIKE '%MergeTree' OR engine = 'Distributed')
`)
if err != nil {
return fmt.Errorf("cannot query flows table metadata: %w", err)
}
newFlowsTables := []flowsTable{}
for _, table := range tables {
// Parse resolution
resolution := time.Duration(0)
if strings.HasPrefix(table.Name, "flows_") {
var err error
resolution, err = time.ParseDuration(strings.TrimPrefix(table.Name, "flows_"))
if err != nil {
c.r.Err(err).Msgf("cannot parse duration for table %s", table.Name)
continue
}
}
// Get oldest timestamp
var oldest []struct {
T time.Time `ch:"t"`
}
err := c.d.ClickHouseDB.Conn.Select(ctx, &oldest,
fmt.Sprintf(`SELECT MIN(TimeReceived) AS t FROM %s`, table.Name))
if err != nil {
return fmt.Errorf("cannot query table %s for oldest timestamp: %w", table.Name, err)
}
newFlowsTables = append(newFlowsTables, flowsTable{
Name: table.Name,
Resolution: resolution,
Oldest: oldest[0].T,
})
}
if len(newFlowsTables) == 0 {
return errors.New("no flows table present (yet?)")
}
c.flowsTablesLock.Lock()
c.flowsTables = newFlowsTables
c.flowsTablesLock.Unlock()
return nil
}
// finalizeQuery builds the finalized query. A single "context"
// function is provided to return a `Context` struct with all the
// information needed.
func (c *Component) finalizeQuery(query string) string {
t := template.Must(template.New("query").
Funcs(template.FuncMap{
"context": c.contextFunc,
}).
Option("missingkey=error").
Parse(strings.TrimSpace(query)))
buf := bytes.NewBufferString("")
if err := t.Execute(buf, nil); err != nil {
c.r.Err(err).Str("query", query).Msg("invalid query")
panic(err)
}
return buf.String()
}
type inputContext struct {
Start time.Time `json:"start"`
End time.Time `json:"end"`
StartForInterval *time.Time `json:"start-for-interval,omitempty"`
MainTableRequired bool `json:"main-table-required,omitempty"`
Points uint `json:"points"`
Units string `json:"units,omitempty"`
}
type context struct {
Table string
Timefilter string
TimefilterStart string
TimefilterEnd string
Units string
Interval uint64
ToStartOfInterval func(string) string
}
// templateEscape escapes `{{` and `}}` from a string. In fact, only
// the opening tag needs to be escaped.
func templateEscape(input string) string {
return strings.ReplaceAll(input, `{{`, `{{"{{"}}`)
}
// templateWhere transforms a filter to a WHERE clause
func templateWhere(qf query.Filter) string {
if qf.Direct() == "" {
return `{{ .Timefilter }}`
}
return fmt.Sprintf(`{{ .Timefilter }} AND (%s)`, templateEscape(qf.Direct()))
}
// templateTable builds a template directive to select the right table
func templateContext(context inputContext) string {
encoded, err := json.Marshal(context)
if err != nil {
panic(err)
}
return fmt.Sprintf("context `%s`", string(encoded))
}
func (c *Component) contextFunc(inputStr string) context {
var input inputContext
if err := json.Unmarshal([]byte(inputStr), &input); err != nil {
panic(err)
}
table, computedInterval, targetInterval := c.computeTableAndInterval(input)
// Make start/end match the computed interval (currently equal to the table resolution)
start := input.Start.Truncate(computedInterval)
end := input.End.Truncate(computedInterval)
// Adapt the computed interval to match the target one more closely
if targetInterval > computedInterval {
computedInterval = targetInterval.Truncate(computedInterval)
}
// Adapt end to ensure we get a full interval
end = start.Add(end.Sub(start).Truncate(computedInterval))
// Now, toStartOfInterval will provide an incorrect value. We
// compute a correction offset. Go's truncate seems to
// be different from what we expect.
computedIntervalOffset := start.UTC().Sub(
time.Unix(start.UTC().Unix()/
int64(computedInterval.Seconds())*
int64(computedInterval.Seconds()), 0))
diffOffset := uint64(computedInterval.Seconds()) - uint64(computedIntervalOffset.Seconds())
// Compute all strings
timefilterStart := fmt.Sprintf(`toDateTime('%s', 'UTC')`, start.UTC().Format("2006-01-02 15:04:05"))
timefilterEnd := fmt.Sprintf(`toDateTime('%s', 'UTC')`, end.UTC().Format("2006-01-02 15:04:05"))
timefilter := fmt.Sprintf(`TimeReceived BETWEEN %s AND %s`, timefilterStart, timefilterEnd)
var units string
switch input.Units {
case "pps":
units = `SUM(Packets*SamplingRate)`
case "l3bps":
units = `SUM(Bytes*SamplingRate*8)`
case "l2bps":
// For each packet, we add the Ethernet header (14 bytes), the FCS (4
// bytes), the preamble and start frame delimiter (8 bytes) and the IPG
// (~ 12 bytes). We don't include the VLAN header (4 bytes) as it is
// often not used with external entities. Both sFlow and IPFIX may have
// a better view of that, but we don't collect it yet.
units = `SUM((Bytes+38*Packets)*SamplingRate*8)`
case "inl2%":
// That's like l2bps, but this time we use the interface speed to get a
// percent value
units = `ifNotFinite(SUM((Bytes+38*Packets)*SamplingRate*8*100/(InIfSpeed*1000000))/COUNT(DISTINCT ExporterAddress, InIfName),0)`
case "outl2%":
// Same but using output interface as reference
units = `ifNotFinite(SUM((Bytes+38*Packets)*SamplingRate*8*100/(OutIfSpeed*1000000))/COUNT(DISTINCT ExporterAddress, OutIfName),0)`
}
c.metrics.clickhouseQueries.WithLabelValues(table).Inc()
return context{
Table: table,
Timefilter: timefilter,
TimefilterStart: timefilterStart,
TimefilterEnd: timefilterEnd,
Units: units,
Interval: uint64(computedInterval.Seconds()),
ToStartOfInterval: func(field string) string {
return fmt.Sprintf(
`toStartOfInterval(%s + INTERVAL %d second, INTERVAL %d second) - INTERVAL %d second`,
field,
diffOffset,
uint64(computedInterval.Seconds()),
diffOffset)
},
}
}
func (c *Component) computeTableAndInterval(input inputContext) (string, time.Duration, time.Duration) {
targetInterval := time.Duration(uint64(input.End.Sub(input.Start)) / uint64(input.Points))
if targetInterval < time.Second {
targetInterval = time.Second
}
// Select table
targetIntervalForTableSelection := targetInterval
if input.MainTableRequired {
targetIntervalForTableSelection = time.Second
}
table, computedInterval := c.getBestTable(input.Start, targetIntervalForTableSelection)
if input.StartForInterval != nil {
_, computedInterval = c.getBestTable(*input.StartForInterval, targetIntervalForTableSelection)
}
return table, computedInterval, targetInterval
}
// Get the best table starting at the specified time.
func (c *Component) getBestTable(start time.Time, targetInterval time.Duration) (string, time.Duration) {
c.flowsTablesLock.RLock()
defer c.flowsTablesLock.RUnlock()
table := "flows"
computedInterval := time.Second
if len(c.flowsTables) > 0 {
// We can use the consolidated data. The first
// criteria is to find the tables matching the time
// criteria.
candidates := []int{}
for idx, table := range c.flowsTables {
if start.After(table.Oldest.Add(table.Resolution)) {
candidates = append(candidates, idx)
}
}
if len(candidates) == 0 {
// No candidate, fallback to the one with oldest data
best := 0
for idx, table := range c.flowsTables {
if c.flowsTables[best].Oldest.After(table.Oldest.Add(table.Resolution)) {
best = idx
}
}
candidates = []int{best}
// Add other candidates that are not far off in term of oldest data
for idx, table := range c.flowsTables {
if idx == best {
continue
}
if c.flowsTables[best].Oldest.After(table.Oldest) {
candidates = append(candidates, idx)
}
}
}
sort.Slice(candidates, func(i, j int) bool {
return c.flowsTables[candidates[i]].Resolution < c.flowsTables[candidates[j]].Resolution
})
// If possible, use the first resolution before the target interval
for len(candidates) > 1 {
if c.flowsTables[candidates[1]].Resolution < targetInterval {
candidates = candidates[1:]
} else {
break
}
}
table = c.flowsTables[candidates[0]].Name
computedInterval = c.flowsTables[candidates[0]].Resolution
}
if computedInterval < time.Second {
computedInterval = time.Second
}
return table, computedInterval
}