Files
akvorado/console/clickhouse_test.go
Vincent Bernat 28783ff4f3 orchestrator/clickhouse: add support for distributed/replicated tables
Fix #605

All MergeTree tables are now replicated.

For some tables, a `_local` variant is added and the non-`_local`
variant is now distributed. The distributed tables are the `flows`
table, the `flows_DDDD` tables (where `DDDD` is a duration), as well as
the `flows_raw_errors` table. The `exporters` table is not distributed
and stays local.

Data flows through the tables according to this schema:

- data comes from the `flows_HHHH_raw` table, which uses the Kafka engine

- the `flows_HHHH_raw_consumer` reads data from `flows_HHHH_raw` (local)
  and sends it to `flows` (distributed) when there is no error

- the `flows_raw_errors_consumer` reads data from
  `flows_HHHH_raw` (local) and sends it to
  `flows_raw_errors` (distributed)

- the `flows_DDDD_consumer` reads data from `flows_local` (local) and
  sends it to `flows_DDDD_local` (local)

- the `exporters_consumer` reads data from `flows` (distributed) and
  sends it to `exporters` (local)

The reason for `flows_HHHH_raw_consumer` to send data to the distributed
`flows` table, and not the local one, is to ensure flows are
balanced (for example, if there are not enough Kafka partitions). But
sending it to `flows_local` would have been possible.

On the other hand, it is important for `flows_DDDD_consumer` to read
from the local table to avoid duplication. It could have sent its output
to the distributed table, but the data is already balanced correctly at
this point, so we send it to the local table instead for better
performance.

The `exporters_consumer` is allowed to read from the distributed `flows`
table because it writes the result to the local `exporters` table.
2024-04-04 22:03:12 +02:00

383 lines
16 KiB
Go

// SPDX-FileCopyrightText: 2022 Free Mobile
// SPDX-License-Identifier: AGPL-3.0-only
package console
import (
"fmt"
"testing"
"time"
"akvorado/common/helpers"
"go.uber.org/mock/gomock"
)
// TestRefreshFlowsTables checks that refreshFlowsTables lists the candidate
// flows tables from system.tables, asks each of them for its oldest
// TimeReceived, and stores name, resolution and oldest timestamp in
// c.flowsTables.
func TestRefreshFlowsTables(t *testing.T) {
	c, _, mockConn, _ := NewMock(t, DefaultConfiguration())

	// Each entry is a table returned by the mocked system.tables query, the
	// resolution the component is expected to record for it, and the value
	// returned by the mocked MIN(TimeReceived) query on that table.
	known := []struct {
		name       string
		resolution time.Duration
		oldest     time.Time
	}{
		{"flows", 0, time.Date(2022, 4, 10, 15, 45, 10, 0, time.UTC)},
		{"flows_1h0m0s", time.Hour, time.Date(2022, 1, 10, 15, 45, 10, 0, time.UTC)},
		{"flows_1m0s", time.Minute, time.Date(2022, 4, 20, 15, 45, 10, 0, time.UTC)},
		{"flows_5m0s", 5 * time.Minute, time.Date(2022, 2, 10, 15, 45, 10, 0, time.UTC)},
	}

	// Rows handed back for the table-listing query. The element type must
	// match the destination used by the component exactly, tag included.
	names := make([]struct {
		Name string `ch:"name"`
	}, len(known))
	for i, tbl := range known {
		names[i].Name = tbl.name
	}
	mockConn.EXPECT().
		Select(gomock.Any(), gomock.Any(), `
SELECT name
FROM system.tables
WHERE database=currentDatabase()
AND table LIKE 'flows%'
AND table NOT LIKE '%_local'
AND table != 'flows_raw_errors'
AND (engine LIKE '%MergeTree' OR engine = 'Distributed')
`).
		Return(nil).
		SetArg(1, names)

	// One MIN(TimeReceived) expectation per table, collecting the expected
	// component state along the way.
	expected := make([]flowsTable, 0, len(known))
	for _, tbl := range known {
		mockConn.EXPECT().
			Select(gomock.Any(), gomock.Any(), "SELECT MIN(TimeReceived) AS t FROM "+tbl.name).
			Return(nil).
			SetArg(1, []struct {
				T time.Time `ch:"t"`
			}{{tbl.oldest}})
		expected = append(expected, flowsTable{tbl.name, tbl.resolution, tbl.oldest})
	}

	if err := c.refreshFlowsTables(); err != nil {
		t.Fatalf("refreshFlowsTables() error:\n%+v", err)
	}
	if diff := helpers.Diff(c.flowsTables, expected); diff != "" {
		t.Fatalf("refreshFlowsTables() diff:\n%s", diff)
	}
}
// TestFinalizeQuery checks the rendering of query templates by
// finalizeQuery. Each case installs a set of flows tables on the component,
// wraps the query in a `{{ with ... }}` action bound to the rendered input
// context, and compares the resulting SQL with the expected string.
func TestFinalizeQuery(t *testing.T) {
	cases := []struct {
		Description string
		Tables      []flowsTable
		Query       string
		Context     inputContext
		Expected    string
	}{
		{
			Description: "simple query without additional tables",
			Query:       "SELECT 1 FROM {{ .Table }} WHERE {{ .Timefilter }}",
			Context: inputContext{
				Start:  time.Date(2022, 4, 10, 15, 45, 10, 0, time.UTC),
				End:    time.Date(2022, 4, 11, 15, 45, 10, 0, time.UTC),
				Points: 86400,
			},
			Expected: "SELECT 1 FROM flows WHERE TimeReceived BETWEEN toDateTime('2022-04-10 15:45:10', 'UTC') AND toDateTime('2022-04-11 15:45:10', 'UTC')",
		}, {
			Description: "query with source port",
			Query:       "SELECT TimeReceived, SrcPort FROM {{ .Table }} WHERE {{ .Timefilter }}",
			Context: inputContext{
				Start:             time.Date(2022, 4, 10, 15, 45, 10, 0, time.UTC),
				End:               time.Date(2022, 4, 11, 15, 45, 10, 0, time.UTC),
				MainTableRequired: true,
				Points:            86400,
			},
			Expected: "SELECT TimeReceived, SrcPort FROM flows WHERE TimeReceived BETWEEN toDateTime('2022-04-10 15:45:10', 'UTC') AND toDateTime('2022-04-11 15:45:10', 'UTC')",
		}, {
			Description: "only flows table available",
			Tables:      []flowsTable{{"flows", 0, time.Date(2022, 3, 10, 15, 45, 10, 0, time.UTC)}},
			Query:       "SELECT 1 FROM {{ .Table }} WHERE {{ .Timefilter }}",
			Context: inputContext{
				Start:  time.Date(2022, 4, 10, 15, 45, 10, 0, time.UTC),
				End:    time.Date(2022, 4, 11, 15, 45, 10, 0, time.UTC),
				Points: 86400,
			},
			Expected: "SELECT 1 FROM flows WHERE TimeReceived BETWEEN toDateTime('2022-04-10 15:45:10', 'UTC') AND toDateTime('2022-04-11 15:45:10', 'UTC')",
		}, {
			// Named after the template fields actually exercised by the query.
			Description: "TimefilterStart and TimefilterEnd",
			Tables:      []flowsTable{{"flows", 0, time.Date(2022, 3, 10, 15, 45, 10, 0, time.UTC)}},
			Query:       "SELECT {{ .TimefilterStart }}, {{ .TimefilterEnd }}",
			Context: inputContext{
				Start:  time.Date(2022, 4, 10, 15, 45, 10, 0, time.UTC),
				End:    time.Date(2022, 4, 11, 15, 45, 10, 0, time.UTC),
				Points: 86400,
			},
			Expected: "SELECT toDateTime('2022-04-10 15:45:10', 'UTC'), toDateTime('2022-04-11 15:45:10', 'UTC')",
		}, {
			Description: "only flows table and out of range request",
			Tables:      []flowsTable{{"flows", 0, time.Date(2022, 4, 10, 22, 45, 10, 0, time.UTC)}},
			Query:       "SELECT 1 FROM {{ .Table }} WHERE {{ .Timefilter }}",
			Context: inputContext{
				Start:  time.Date(2022, 4, 10, 15, 45, 10, 0, time.UTC),
				End:    time.Date(2022, 4, 11, 15, 45, 10, 0, time.UTC),
				Points: 86400,
			},
			Expected: "SELECT 1 FROM flows WHERE TimeReceived BETWEEN toDateTime('2022-04-10 15:45:10', 'UTC') AND toDateTime('2022-04-11 15:45:10', 'UTC')",
		}, {
			Description: "select consolidated table",
			Tables: []flowsTable{
				{"flows", 0, time.Date(2022, 3, 10, 22, 45, 10, 0, time.UTC)},
				{"flows_1m0s", time.Minute, time.Date(2022, 4, 2, 22, 45, 10, 0, time.UTC)},
			},
			Query: "SELECT 1 FROM {{ .Table }} WHERE {{ .Timefilter }} // {{ .Interval }}",
			Context: inputContext{
				Start:  time.Date(2022, 4, 10, 15, 45, 10, 0, time.UTC),
				End:    time.Date(2022, 4, 11, 15, 45, 10, 0, time.UTC),
				Points: 720, // 2-minute resolution
			},
			Expected: "SELECT 1 FROM flows_1m0s WHERE TimeReceived BETWEEN toDateTime('2022-04-10 15:45:00', 'UTC') AND toDateTime('2022-04-11 15:45:00', 'UTC') // 120",
		}, {
			Description: "select consolidated table out of range",
			Tables: []flowsTable{
				{"flows", 0, time.Date(2022, 4, 10, 22, 45, 10, 0, time.UTC)},
				{"flows_1m0s", time.Minute, time.Date(2022, 4, 10, 17, 45, 10, 0, time.UTC)},
			},
			Query: "SELECT 1 FROM {{ .Table }} WHERE {{ .Timefilter }}",
			Context: inputContext{
				Start:  time.Date(2022, 4, 10, 15, 45, 10, 0, time.UTC),
				End:    time.Date(2022, 4, 11, 15, 45, 10, 0, time.UTC),
				Points: 720, // 2-minute resolution
			},
			Expected: "SELECT 1 FROM flows_1m0s WHERE TimeReceived BETWEEN toDateTime('2022-04-10 15:45:00', 'UTC') AND toDateTime('2022-04-11 15:45:00', 'UTC')",
		}, {
			Description: "select flows table out of range",
			Tables: []flowsTable{
				{"flows", 0, time.Date(2022, 4, 10, 16, 45, 10, 0, time.UTC)},
				{"flows_1m0s", time.Minute, time.Date(2022, 4, 10, 17, 45, 10, 0, time.UTC)},
			},
			Query: "SELECT 1 FROM {{ .Table }} WHERE {{ .Timefilter }}",
			Context: inputContext{
				Start:  time.Date(2022, 4, 10, 15, 45, 10, 0, time.UTC),
				End:    time.Date(2022, 4, 11, 15, 45, 10, 0, time.UTC),
				Points: 720, // 2-minute resolution
			},
			Expected: "SELECT 1 FROM flows WHERE TimeReceived BETWEEN toDateTime('2022-04-10 15:45:10', 'UTC') AND toDateTime('2022-04-11 15:45:10', 'UTC')",
		}, {
			Description: "use flows table for resolution (control for next case)",
			Tables: []flowsTable{
				{"flows", 0, time.Date(2022, 4, 10, 10, 45, 10, 0, time.UTC)},
				{"flows_1m0s", time.Minute, time.Date(2022, 3, 10, 10, 45, 10, 0, time.UTC)},
			},
			Query: "SELECT 1 FROM {{ .Table }} WHERE {{ .Timefilter }} // {{ .Interval }}",
			Context: inputContext{
				Start:  time.Date(2022, 4, 10, 15, 45, 10, 0, time.UTC),
				End:    time.Date(2022, 4, 11, 15, 45, 10, 0, time.UTC),
				Points: 2880, // 30-second resolution
			},
			Expected: "SELECT 1 FROM flows WHERE TimeReceived BETWEEN toDateTime('2022-04-10 15:45:10', 'UTC') AND toDateTime('2022-04-11 15:45:10', 'UTC') // 30",
		}, {
			Description: "use flows table for resolution (but flows_1m0s for data)",
			Tables: []flowsTable{
				{"flows", 0, time.Date(2022, 4, 10, 10, 45, 10, 0, time.UTC)},
				{"flows_1m0s", time.Minute, time.Date(2022, 3, 10, 10, 45, 10, 0, time.UTC)},
			},
			Query: "SELECT 1 FROM {{ .Table }} WHERE {{ .Timefilter }} // {{ .Interval }}",
			Context: inputContext{
				Start: time.Date(2022, 3, 10, 15, 45, 10, 0, time.UTC),
				End:   time.Date(2022, 3, 11, 15, 45, 10, 0, time.UTC),
				StartForInterval: func() *time.Time {
					t := time.Date(2022, 4, 10, 15, 45, 10, 0, time.UTC)
					return &t
				}(),
				Points: 2880, // 30-second resolution
			},
			Expected: "SELECT 1 FROM flows_1m0s WHERE TimeReceived BETWEEN toDateTime('2022-03-10 15:45:10', 'UTC') AND toDateTime('2022-03-11 15:45:10', 'UTC') // 30",
		}, {
			Description: "select flows table with better resolution",
			Tables: []flowsTable{
				{"flows", 0, time.Date(2022, 3, 10, 16, 45, 10, 0, time.UTC)},
				{"flows_1m0s", time.Minute, time.Date(2022, 3, 10, 17, 45, 10, 0, time.UTC)},
			},
			Query: "SELECT 1 FROM {{ .Table }} WHERE {{ .Timefilter }} // {{ .Interval }}",
			Context: inputContext{
				Start:  time.Date(2022, 4, 10, 15, 45, 10, 0, time.UTC),
				End:    time.Date(2022, 4, 11, 15, 45, 10, 0, time.UTC),
				Points: 2880, // 30-second resolution
			},
			Expected: "SELECT 1 FROM flows WHERE TimeReceived BETWEEN toDateTime('2022-04-10 15:45:10', 'UTC') AND toDateTime('2022-04-11 15:45:10', 'UTC') // 30",
		}, {
			Description: "select consolidated table with better resolution",
			Tables: []flowsTable{
				{"flows", 0, time.Date(2022, 3, 10, 22, 45, 10, 0, time.UTC)},
				{"flows_5m0s", 5 * time.Minute, time.Date(2022, 4, 2, 22, 45, 10, 0, time.UTC)},
				{"flows_1m0s", time.Minute, time.Date(2022, 4, 2, 22, 45, 10, 0, time.UTC)},
			},
			Query: "SELECT 1 FROM {{ .Table }} WHERE {{ .Timefilter }} // {{ .Interval }}",
			Context: inputContext{
				Start:  time.Date(2022, 4, 10, 15, 45, 10, 0, time.UTC),
				End:    time.Date(2022, 4, 11, 15, 45, 10, 0, time.UTC),
				Points: 720, // 2-minute resolution
			},
			Expected: "SELECT 1 FROM flows_1m0s WHERE TimeReceived BETWEEN toDateTime('2022-04-10 15:45:00', 'UTC') AND toDateTime('2022-04-11 15:45:00', 'UTC') // 120",
		}, {
			Description: "select consolidated table with better range",
			Tables: []flowsTable{
				{"flows", 0, time.Date(2022, 4, 10, 22, 45, 10, 0, time.UTC)},
				{"flows_5m0s", 5 * time.Minute, time.Date(2022, 4, 2, 22, 45, 10, 0, time.UTC)},
				{"flows_1m0s", time.Minute, time.Date(2022, 4, 10, 22, 45, 10, 0, time.UTC)},
			},
			Query: "SELECT 1 FROM {{ .Table }} WHERE {{ .Timefilter }}",
			Context: inputContext{
				Start:  time.Date(2022, 4, 10, 15, 46, 10, 0, time.UTC),
				End:    time.Date(2022, 4, 11, 15, 46, 10, 0, time.UTC),
				Points: 720, // 2-minute resolution
			},
			Expected: "SELECT 1 FROM flows_5m0s WHERE TimeReceived BETWEEN toDateTime('2022-04-10 15:45:00', 'UTC') AND toDateTime('2022-04-11 15:45:00', 'UTC')",
		}, {
			Description: "select best resolution when equality for oldest data",
			Tables: []flowsTable{
				{"flows", 0, time.Date(2022, 4, 10, 22, 40, 55, 0, time.UTC)},
				{"flows_1m0s", time.Minute, time.Date(2022, 4, 10, 22, 40, 0, 0, time.UTC)},
				{"flows_1h0m0s", time.Hour, time.Date(2022, 4, 10, 22, 0, 10, 0, time.UTC)},
			},
			Query: "SELECT 1 FROM {{ .Table }} WHERE {{ .Timefilter }}",
			Context: inputContext{
				Start:  time.Date(2022, 4, 10, 15, 46, 10, 0, time.UTC),
				End:    time.Date(2022, 4, 11, 15, 46, 10, 0, time.UTC),
				Points: 720, // 2-minute resolution
			},
			Expected: "SELECT 1 FROM flows_1m0s WHERE TimeReceived BETWEEN toDateTime('2022-04-10 15:46:00', 'UTC') AND toDateTime('2022-04-11 15:46:00', 'UTC')",
		}, {
			Description: "query with escaped template",
			Query:       `SELECT TimeReceived, SrcPort WHERE InIfDescription = '{{"{{"}} hello }}'`,
			Context: inputContext{
				Start:  time.Date(2022, 4, 10, 15, 45, 10, 0, time.UTC),
				End:    time.Date(2022, 4, 11, 15, 45, 10, 0, time.UTC),
				Points: 86400,
			},
			Expected: `SELECT TimeReceived, SrcPort WHERE InIfDescription = '{{ hello }}'`,
		}, {
			Description: "use of ToStartOfInterval",
			Query:       `{{ call .ToStartOfInterval "TimeReceived" }}`,
			Context: inputContext{
				Start:  time.Date(2022, 4, 10, 15, 45, 10, 0, time.UTC),
				End:    time.Date(2022, 4, 11, 15, 45, 10, 0, time.UTC),
				Points: 720,
			},
			Expected: `toStartOfInterval(TimeReceived + INTERVAL 50 second, INTERVAL 120 second) - INTERVAL 50 second`,
		}, {
			Description: "Small interval outside main table expiration",
			Query:       "SELECT InIfProvider FROM {{ .Table }}",
			Tables: []flowsTable{
				{"flows", time.Duration(0), time.Date(2022, 11, 6, 12, 0, 0, 0, time.UTC)},
				{"flows_1h0m0s", time.Hour, time.Date(2022, 4, 25, 18, 0, 0, 0, time.UTC)},
				{"flows_1m0s", time.Minute, time.Date(2022, 11, 14, 12, 0, 0, 0, time.UTC)},
				{"flows_5m0s", 5 * time.Minute, time.Date(2022, 8, 23, 12, 0, 0, 0, time.UTC)},
			},
			Context: inputContext{
				Start:  time.Date(2022, 10, 30, 1, 0, 0, 0, time.UTC),
				End:    time.Date(2022, 10, 30, 12, 0, 0, 0, time.UTC),
				Points: 200,
			},
			Expected: "SELECT InIfProvider FROM flows_5m0s",
		},
	}
	// A single component is shared by all subtests. t.Run without t.Parallel
	// waits for each subtest to finish, so overwriting c.flowsTables per case
	// is safe.
	c, _, _, _ := NewMock(t, DefaultConfiguration())
	for _, tc := range cases {
		t.Run(tc.Description, func(t *testing.T) {
			c.flowsTables = tc.Tables
			// The `with` action makes the template's dot the rendered input
			// context, so fields such as .Table or .Timefilter resolve on it.
			got := c.finalizeQuery(
				fmt.Sprintf(`{{ with %s }}%s{{ end }}`, templateContext(tc.Context), tc.Query))
			if diff := helpers.Diff(got, tc.Expected); diff != "" {
				t.Fatalf("finalizeQuery(): (-got, +want):\n%s", diff)
			}
		})
	}
}
// TestComputeBestTableAndInterval checks which flows table
// computeTableAndInterval selects for a given input context. It also checks
// the second value it returns, truncated to whole seconds. Judging by the
// expected values (1 for flows, 60 for flows_1m0s), that value looks like the
// chosen table's resolution — NOTE(review): confirm against
// computeTableAndInterval.
func TestComputeBestTableAndInterval(t *testing.T) {
	cases := []struct {
		Description string
		Tables      []flowsTable
		Context     inputContext
		Expected    tableIntervalOutput
	}{
		{
			Description: "simple query without additional tables",
			Context: inputContext{
				Start:  time.Date(2022, 4, 10, 15, 45, 10, 0, time.UTC),
				End:    time.Date(2022, 4, 11, 15, 45, 10, 0, time.UTC),
				Points: 86400,
			},
			Expected: tableIntervalOutput{Table: "flows", Interval: 1},
		}, {
			Description: "query with main table",
			Context: inputContext{
				Start:             time.Date(2022, 4, 10, 15, 45, 10, 0, time.UTC),
				End:               time.Date(2022, 4, 11, 15, 45, 10, 0, time.UTC),
				MainTableRequired: true,
				Points:            86400,
			},
			Expected: tableIntervalOutput{Table: "flows", Interval: 1},
		}, {
			Description: "only flows table available",
			Tables:      []flowsTable{{"flows", 0, time.Date(2022, 3, 10, 15, 45, 10, 0, time.UTC)}},
			Context: inputContext{
				Start:  time.Date(2022, 4, 10, 15, 45, 10, 0, time.UTC),
				End:    time.Date(2022, 4, 11, 15, 45, 10, 0, time.UTC),
				Points: 86400,
			},
			Expected: tableIntervalOutput{Table: "flows", Interval: 1},
		}, {
			Description: "select flows table out of range",
			Tables: []flowsTable{
				{"flows", 0, time.Date(2022, 4, 10, 16, 45, 10, 0, time.UTC)},
				{"flows_1m0s", time.Minute, time.Date(2022, 4, 10, 17, 45, 10, 0, time.UTC)},
			},
			Context: inputContext{
				Start:  time.Date(2022, 4, 10, 15, 45, 10, 0, time.UTC),
				End:    time.Date(2022, 4, 11, 15, 45, 10, 0, time.UTC),
				Points: 720, // 2-minute resolution
			},
			Expected: tableIntervalOutput{Table: "flows", Interval: 1},
		}, {
			Description: "select consolidated table with better resolution",
			Tables: []flowsTable{
				{"flows", 0, time.Date(2022, 3, 10, 22, 45, 10, 0, time.UTC)},
				{"flows_5m0s", 5 * time.Minute, time.Date(2022, 4, 2, 22, 45, 10, 0, time.UTC)},
				{"flows_1m0s", time.Minute, time.Date(2022, 4, 2, 22, 45, 10, 0, time.UTC)},
			},
			Context: inputContext{
				Start:  time.Date(2022, 4, 10, 15, 45, 10, 0, time.UTC),
				End:    time.Date(2022, 4, 11, 15, 45, 10, 0, time.UTC),
				Points: 720, // 2-minute resolution
			},
			Expected: tableIntervalOutput{Table: "flows_1m0s", Interval: 60},
		},
	}
	// Subtests run sequentially (no t.Parallel), so sharing one component and
	// overwriting its flowsTables for each case is safe.
	c, _, _, _ := NewMock(t, DefaultConfiguration())
	for _, tc := range cases {
		t.Run(tc.Description, func(t *testing.T) {
			c.flowsTables = tc.Tables
			// The third return value is not checked by this test.
			table, interval, _ := c.computeTableAndInterval(tc.Context)
			got := tableIntervalOutput{
				Table:    table,
				Interval: uint64(interval.Seconds()),
			}
			if diff := helpers.Diff(got, tc.Expected); diff != "" {
				// Report the name of the function actually under test.
				t.Fatalf("computeTableAndInterval(): (-got, +want):\n%s", diff)
			}
		})
	}
}