flow: introduce versioned flows

We need to version flow schemas; otherwise, schema evolution won't be
manageable. Confluent is pushing for a schema registry, but the
ecosystem still seems too young. Let's version on our side instead,
with a dedicated topic for each schema version.
Vincent Bernat
2022-03-20 21:50:48 +01:00
parent be0d510b12
commit 41131fca96
20 changed files with 222 additions and 112 deletions
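To illustrate the approach before the diff: the producer derives the
Kafka topic name from the configured base name and the current schema
version. A minimal sketch of the naming rule (the constant and the
format string match the code in this commit; the rest is illustrative):

```go
package main

import "fmt"

// CurrentSchemaVersion mirrors flow.CurrentSchemaVersion introduced below.
const CurrentSchemaVersion = 0

func main() {
	configuredTopic := "flows"
	// Suffix the configured topic with the schema version, e.g. "flows-v0".
	kafkaTopic := fmt.Sprintf("%s-v%d", configuredTopic, CurrentSchemaVersion)
	fmt.Println(kafkaTopic)
}
```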

.gitignore

@@ -1,4 +1,4 @@
/bin/
/test/
/flow/flow.pb.go
/flow/flow*.pb.go
/web/data


@@ -13,7 +13,7 @@ M = $(shell printf "\033[34;1m▶\033[0m")
export GO111MODULE=on
GENERATED = flow/flow.pb.go web/data
GENERATED = flow/flow-0.pb.go web/data
.PHONY: all
all: fmt lint $(GENERATED) | $(BIN) ; $(info $(M) building executable) @ ## Build program binary


@@ -4,14 +4,24 @@ import (
"embed"
"fmt"
"net/http"
"text/template"
"time"
"akvorado/flow"
)
//go:embed data/protocols.csv
//go:embed data/asns.csv
var data embed.FS
var (
//go:embed data/protocols.csv
//go:embed data/asns.csv
data embed.FS
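// initShTemplate renders a shell script that writes each versioned
// schema into ClickHouse's format_schemas directory.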
initShTemplate = template.Must(template.New("initsh").Parse(`#!/bin/sh
{{ range $version, $schema := . }}
cat > /var/lib/clickhouse/format_schemas/flow-{{ $version }}.proto <<'EOPROTO'
{{ $schema }}
EOPROTO
{{ end }}
`))
)
func (c *Component) addHandlerEmbedded(url string, path string) {
c.d.HTTP.AddHandler(url,
@@ -29,15 +39,10 @@ func (c *Component) addHandlerEmbedded(url string, path string) {
// registerHTTPHandlers registers some handlers that will be useful for
// Clickhouse
func (c *Component) registerHTTPHandlers() error {
c.d.HTTP.AddHandler("/api/v0/clickhouse/flow.proto", flow.FlowProtoHandler)
c.d.HTTP.AddHandler("/api/v0/clickhouse/init.sh",
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/x-shellscript")
w.Write([]byte(`#!/bin/sh
cat > /var/lib/clickhouse/format_schemas/flow.proto <<'EOF'
`))
flow.FlowProtoHandler.ServeHTTP(w, r)
w.Write([]byte(`EOF`))
initShTemplate.Execute(w, flow.VersionedSchemas)
}))
entries, err := data.ReadDir("data")
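To make the template above concrete, here is a standalone sketch
rendering it with a single hypothetical version-0 schema; the output
approximates what `/api/v0/clickhouse/init.sh` serves (the template
text is copied from this commit, the sample schema body is an
assumption):

```go
package main

import (
	"os"
	"text/template"
)

func main() {
	// Same template as in the handler above: one heredoc per schema version.
	tpl := template.Must(template.New("initsh").Parse(`#!/bin/sh
{{ range $version, $schema := . }}
cat > /var/lib/clickhouse/format_schemas/flow-{{ $version }}.proto <<'EOPROTO'
{{ $schema }}
EOPROTO
{{ end }}
`))
	// Hypothetical schema body; the real content is embedded from flow-*.proto.
	schemas := map[int]string{0: "syntax = \"proto3\";\npackage flow;"}
	if err := tpl.Execute(os.Stdout, schemas); err != nil {
		panic(err)
	}
}
```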


@@ -1,9 +1,6 @@
package clickhouse
import (
"bufio"
"fmt"
netHTTP "net/http"
"testing"
"akvorado/helpers"
@@ -20,11 +17,7 @@ func TestHTTPEndpoints(t *testing.T) {
t.Fatalf("New() error:\n%+v", err)
}
cases := []struct {
URL string
ContentType string
FirstLines []string
}{
cases := helpers.HTTPEndpointCases{
{
URL: "/api/v0/clickhouse/protocols.csv",
ContentType: "text/csv; charset=utf-8",
@@ -40,49 +33,18 @@ func TestHTTPEndpoints(t *testing.T) {
"asn,name",
"1,LVLT-1",
},
}, {
URL: "/api/v0/clickhouse/flow.proto",
ContentType: "text/plain",
FirstLines: []string{
`syntax = "proto3";`,
`package flow;`,
},
}, {
URL: "/api/v0/clickhouse/init.sh",
ContentType: "text/x-shellscript",
FirstLines: []string{
`#!/bin/sh`,
`cat > /var/lib/clickhouse/format_schemas/flow.proto <<'EOF'`,
``,
`cat > /var/lib/clickhouse/format_schemas/flow-0.proto <<'EOPROTO'`,
`syntax = "proto3";`,
`package flow;`,
},
},
}
for _, tc := range cases {
t.Run(tc.URL, func(t *testing.T) {
resp, err := netHTTP.Get(fmt.Sprintf("http://%s%s", c.d.HTTP.Address, tc.URL))
if err != nil {
t.Fatalf("GET %s:\n%+v", tc.URL, err)
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
t.Fatalf("GET %s: got status code %d, not 200", tc.URL, resp.StatusCode)
}
gotContentType := resp.Header.Get("Content-Type")
if gotContentType != tc.ContentType {
t.Errorf("GET %s Content-Type (-got, +want):\n-%s\n+%s",
tc.URL, gotContentType, tc.ContentType)
}
reader := bufio.NewScanner(resp.Body)
got := []string{}
for reader.Scan() && len(got) < len(tc.FirstLines) {
got = append(got, reader.Text())
}
if diff := helpers.Diff(got, tc.FirstLines); diff != "" {
t.Errorf("GET %s (-got, +want):\n%s", tc.URL, diff)
}
})
}
helpers.TestHTTPEndpoints(t, c.d.HTTP.Address, cases)
}


@@ -71,7 +71,6 @@ func (c *Component) FlowsHTTPHandler() http.Handler {
defer atomic.AddUint32(&c.httpFlowClients, ^uint32(0))
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
encoder := json.NewEncoder(w)
if limit == 1 {
encoder.SetIndent("", " ")


@@ -129,8 +129,9 @@ func TestCore(t *testing.T) {
received := make(chan bool)
kafkaProducer.ExpectInputWithMessageCheckerFunctionAndSucceed(func(msg *sarama.ProducerMessage) error {
defer close(received)
if msg.Topic != "flows" {
t.Errorf("Kafka message topic (-got, +want):\n-%s\n+%s", msg.Topic, "flows")
expectedTopic := fmt.Sprintf("flows-v%d", flow.CurrentSchemaVersion)
if msg.Topic != expectedTopic {
t.Errorf("Kafka message topic (-got, +want):\n-%s\n+%s", msg.Topic, expectedTopic)
}
if msg.Key != sarama.StringEncoder("192.0.2.142") {
t.Errorf("Kafka message key (-got, +want):\n-%s\n+%s", msg.Key, "192.0.2.142")


@@ -30,4 +30,4 @@ services:
- 8123:8123/tcp
- 9000:9000/tcp
volumes:
- ./flow/flow.proto:/var/lib/clickhouse/format_schemas/flow.proto:ro
- ./flow/flow-v0.proto:/var/lib/clickhouse/format_schemas/flow-v0.proto:ro


@@ -106,10 +106,8 @@ automatically refreshed.
## Kafka
Received flows are exported to a Kafka topic using the [protocol
buffers format][]. The definition file is `flow/flow.proto`. It is
also available through the [`/api/v0/flow.proto`](/api/v0/flow.proto)
HTTP endpoint. Each flow is written in the [length-delimited
format][].
buffers format][]. The definition files are `flow/flow-*.proto`. Each
flow is written in the [length-delimited format][].
[protocol buffers format]: https://developers.google.com/protocol-buffers
[length-delimited format]: https://cwiki.apache.org/confluence/display/GEODE/Delimiting+Protobuf+Messages
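As a sketch of what the length-delimited format means here (assuming
an already-marshaled protobuf payload; the helper name is ours, not
Akvorado's): each message is prefixed with its byte length encoded as
a varint.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// writeDelimited frames one marshaled protobuf message:
// a varint length prefix followed by the message bytes.
func writeDelimited(buf *bytes.Buffer, msg []byte) {
	var prefix [binary.MaxVarintLen64]byte
	n := binary.PutUvarint(prefix[:], uint64(len(msg)))
	buf.Write(prefix[:n])
	buf.Write(msg)
}

func main() {
	var buf bytes.Buffer
	writeDelimited(&buf, []byte("marshaled-flow-message"))
	fmt.Printf("% x\n", buf.Bytes())
}
```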
@@ -131,6 +129,10 @@ The following keys are accepted:
- `compression-codec` defines the compression codec to use to compress
messages (`none`, `gzip`, `snappy`, `lz4` and `zstd`)
The topic name is suffixed with the schema version. For example, if
the configured topic is `flows` and the current schema version is 0,
received flows are sent to the `flows-v0` topic.
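A condensed sketch of the create-or-update behavior described in the
next paragraph, using the same sarama calls as the component (broker
address and topic settings are assumptions):

```go
package main

import (
	"fmt"

	"github.com/Shopify/sarama"
)

func main() {
	admin, err := sarama.NewClusterAdmin([]string{"127.0.0.1:9092"}, sarama.NewConfig())
	if err != nil {
		panic(err)
	}
	defer admin.Close()

	kafkaTopic := "flows-v0" // configured topic + schema version suffix
	topics, err := admin.ListTopics()
	if err != nil {
		panic(err)
	}
	if _, ok := topics[kafkaTopic]; !ok {
		// Topic is missing: create it with the configured settings.
		err = admin.CreateTopic(kafkaTopic, &sarama.TopicDetail{
			NumPartitions:     1,
			ReplicationFactor: 1,
		}, false)
	} else {
		// Topic exists: only its dynamic configuration is updated.
		err = admin.AlterConfig(sarama.TopicResource, kafkaTopic, nil, false)
	}
	if err != nil {
		panic(err)
	}
	fmt.Println("topic ready:", kafkaTopic)
}
```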
If no topic configuration is provided, the topic should already exist
in Kafka. If a configuration is provided, the topic is created if it
does not exist or updated if it does. Currently, updating the number


@@ -33,11 +33,14 @@ The remaining steps are outside of *Akvorado* control:
## Flow representation
The flow representation is encoded in the
[`flow.proto`](/api/v0/flow.proto) file. Any information that could
change with time is embedded in the flow. This includes for example
interface names and speeds, as well. This ensures that older data are
not processed using incorrect mappings.
The flow representation is encoded in a versioned `flow-*.proto` file.
Any information that could change with time is embedded in the flow.
This includes, for example, interface names and speeds. This ensures
that older data are not processed using incorrect mappings.
Each time the schema changes, we ship a new `flow-*.proto` file, bump
the schema version, and use a new Kafka topic. This ensures we do not
mix different schemas in a single topic.
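Since consumers need to know which topic and schema to use, they can
query the `/api/v0/flow/schemas.json` endpoint added by this commit. A
hedged sketch of such a consumer-side lookup (the JSON structure
matches the handler below; the server address and base topic name are
assumptions):

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

// schemasAnswer mirrors the JSON document served at /api/v0/flow/schemas.json.
type schemasAnswer struct {
	CurrentVersion int            `json:"current_version"`
	Versions       map[int]string `json:"versions"`
}

func main() {
	// Assumed Akvorado address.
	resp, err := http.Get("http://akvorado:8080/api/v0/flow/schemas.json")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	var answer schemasAnswer
	if err := json.NewDecoder(resp.Body).Decode(&answer); err != nil {
		panic(err)
	}
	// Consume from the topic matching the current schema version.
	fmt.Printf("consume from flows-v%d using schema %s\n",
		answer.CurrentVersion, answer.Versions[answer.CurrentVersion])
}
```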
## Programming design


@@ -22,7 +22,6 @@ The embedded HTTP server serves the following endpoints:
- [`/api/v0/version`](/api/v0/version){ target=http }: *Akvorado* version
- [`/api/v0/healthcheck`](/api/v0/healthcheck){ target=http }: are we alive?
- [`/api/v0/flows`](/api/v0/flows?limit=1){ target=http }: next available flow
- [`/api/v0/flow.proto`](/api/v0/flow.proto){ target=http }: protocol buffers definition
- [`/api/v0/grafana`](/api/v0/grafana): Grafana web interface (if configured)
<iframe name="http" style="width: 100%; height: 200px; border: 0; background-color: #1111"></iframe>


@@ -14,13 +14,16 @@ additional configuration entries). If the topic exists, *Akvorado*
won't update the number of partitions or the replication factor, but
other settings will be updated.
Each time a new flow schema is needed, a different topic is used.
*Akvorado* suffixes the topic name with the version to ensure this
property.
## Clickhouse
Clickhouse can collect the data from Kafka. To help its configuration,
*Akvorado* exposes a few HTTP endpoints:
- `/api/v0/clickhouse/flow.proto` contains the schema
- `/api/v0/clickhouse/init.sh` contains the schema in the form of a
- `/api/v0/clickhouse/init.sh` contains the schemas in the form of a
script to execute during initialization
- `/api/v0/clickhouse/protocols.csv` contains a CSV with the mapping
between protocol numbers and names


@@ -6,7 +6,6 @@ import (
"errors"
"fmt"
"net"
netHTTP "net/http"
"strconv"
"sync"
"time"
@@ -50,16 +49,6 @@ type Dependencies struct {
HTTP *http.Component
}
var (
//go:embed flow.proto
flowProto []byte
// FlowProtoHandler is an HTTP handler serving flow.proto
FlowProtoHandler = netHTTP.HandlerFunc(func(w netHTTP.ResponseWriter, r *netHTTP.Request) {
w.Header().Set("Content-Type", "text/plain")
w.Write(flowProto)
})
)
// New creates a new flow component.
func New(r *reporter.Reporter, configuration Configuration, dependencies Dependencies) (*Component, error) {
c := Component{
@@ -69,7 +58,7 @@ func New(r *reporter.Reporter, configuration Configuration, dependencies Depende
incomingFlows: make(chan *FlowMessage, configuration.BufferLength),
}
c.d.Daemon.Track(&c.t, "flow")
c.d.HTTP.AddHandler("/api/v0/flow.proto", FlowProtoHandler)
c.initHTTP()
c.initMetrics()
return &c, nil
}


@@ -5,7 +5,6 @@ import (
"io/ioutil"
"math/rand"
"net"
netHTTP "net/http"
"path/filepath"
"testing"
"time"
@@ -255,18 +254,3 @@ out4:
t.Fatalf("Metrics after data (-got, +want):\n%s", diff)
}
}
func TestServeProtoFile(t *testing.T) {
r := reporter.NewMock(t)
c := NewMock(t, r, DefaultConfiguration)
// Check the HTTP server is running and answering metrics
resp, err := netHTTP.Get(fmt.Sprintf("http://%s/api/v0/flow.proto", c.d.HTTP.Address))
if err != nil {
t.Fatalf("GET /api/v0/flow.proto:\n%+v", err)
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
t.Fatalf("GET /api/v0/flow.proto: got status code %d, not 200", resp.StatusCode)
}
}

flow/schema.go (new file)

@@ -0,0 +1,74 @@
package flow
import (
"embed"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"strconv"
"strings"
)
// CurrentSchemaVersion is the version of the protobuf definition
const CurrentSchemaVersion = 0
var (
// VersionedSchemas is a mapping from schema version to protobuf definitions
VersionedSchemas map[int]string
//go:embed flow*.proto
schemas embed.FS
)
func init() {
VersionedSchemas = make(map[int]string)
entries, err := schemas.ReadDir(".")
if err != nil {
panic(err)
}
for _, entry := range entries {
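// Embedded files are named flow-<version>.proto: extract the
// numeric version from the file name.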
version, err := strconv.Atoi(
strings.TrimPrefix(
strings.TrimSuffix(entry.Name(), ".proto"),
"flow-"))
if err != nil {
panic(err)
}
f, err := schemas.Open(entry.Name())
if err != nil {
panic(err)
}
schema, err := ioutil.ReadAll(f)
if err != nil {
panic(err)
}
VersionedSchemas[version] = string(schema)
}
}
func (c *Component) initHTTP() {
for version, schema := range VersionedSchemas {
version, schema := version, schema // copy the loop variables: each handler closure must keep its own schema (Go < 1.22 reuses them)
c.d.HTTP.AddHandler(fmt.Sprintf("/api/v0/flow/schema-%d.proto", version),
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/plain")
w.Write([]byte(schema))
}))
}
c.d.HTTP.AddHandler("/api/v0/flow/schemas.json", http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
answer := struct {
CurrentVersion int `json:"current_version"`
Versions map[int]string `json:"versions"`
}{
CurrentVersion: CurrentSchemaVersion,
Versions: map[int]string{},
}
for version := range VersionedSchemas {
answer.Versions[version] = fmt.Sprintf("/api/v0/flow/schema-%d.proto", version)
}
encoder := json.NewEncoder(w)
encoder.SetIndent("", " ")
encoder.Encode(answer)
}))
}

flow/schema_test.go (new file)

@@ -0,0 +1,37 @@
package flow
import (
"testing"
"akvorado/helpers"
"akvorado/reporter"
)
func TestHTTPEndpoints(t *testing.T) {
r := reporter.NewMock(t)
c := NewMock(t, r, DefaultConfiguration)
cases := helpers.HTTPEndpointCases{
{
URL: "/api/v0/flow/schema-0.proto",
ContentType: "text/plain",
FirstLines: []string{
`syntax = "proto3";`,
`package flow;`,
},
}, {
URL: "/api/v0/flow/schemas.json",
ContentType: "application/json",
FirstLines: []string{
`{`,
` "current_version": 0,`,
` "versions": {`,
` "0": "/api/v0/flow/schema-0.proto"`,
` }`,
`}`,
},
},
}
helpers.TestHTTPEndpoints(t, c.d.HTTP.Address, cases)
}


@@ -3,9 +3,12 @@
package helpers
import (
"bufio"
"fmt"
"net"
"net/http"
"reflect"
"testing"
"github.com/kylelemons/godebug/pretty"
)
@@ -25,3 +28,39 @@ var prettyC = pretty.Config{
func Diff(a, b interface{}) string {
return prettyC.Compare(a, b)
}
// HTTPEndpointCases describes the cases for TestHTTPEndpoints
type HTTPEndpointCases []struct {
URL string
ContentType string
FirstLines []string
}
// TestHTTPEndpoints tests a few HTTP endpoints
func TestHTTPEndpoints(t *testing.T, serverAddr net.Addr, cases HTTPEndpointCases) {
for _, tc := range cases {
t.Run(tc.URL, func(t *testing.T) {
resp, err := http.Get(fmt.Sprintf("http://%s%s", serverAddr, tc.URL))
if err != nil {
t.Fatalf("GET %s:\n%+v", tc.URL, err)
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
t.Fatalf("GET %s: got status code %d, not 200", tc.URL, resp.StatusCode)
}
gotContentType := resp.Header.Get("Content-Type")
if gotContentType != tc.ContentType {
t.Errorf("GET %s Content-Type (-got, +want):\n-%s\n+%s",
tc.URL, gotContentType, tc.ContentType)
}
reader := bufio.NewScanner(resp.Body)
got := []string{}
for reader.Scan() && len(got) < len(tc.FirstLines) {
got = append(got, reader.Text())
}
if diff := Diff(got, tc.FirstLines); diff != "" {
t.Errorf("GET %s (-got, +want):\n%s", tc.URL, diff)
}
})
}
}


@@ -13,6 +13,7 @@ import (
"github.com/Shopify/sarama"
"akvorado/daemon"
"akvorado/flow"
"akvorado/helpers"
"akvorado/reporter"
)
@@ -106,6 +107,7 @@ func TestRealKafka(t *testing.T) {
configuration.Brokers = brokers
configuration.Version = Version(sarama.V2_8_1_0)
configuration.FlushInterval = 100 * time.Millisecond
expectedTopicName := fmt.Sprintf("%s-v%d", topicName, flow.CurrentSchemaVersion)
r := reporter.NewMock(t)
c, err := New(r, configuration, Dependencies{Daemon: daemon.NewMock(t)})
if err != nil {
@@ -141,7 +143,7 @@ func TestRealKafka(t *testing.T) {
defer consumer.Close()
var partitions []int32
for {
partitions, err = consumer.Partitions(topicName)
partitions, err = consumer.Partitions(expectedTopicName)
if err != nil {
if errors.Is(err, sarama.ErrUnknownTopicOrPartition) {
// Wait for topic to be available
@@ -151,7 +153,7 @@ func TestRealKafka(t *testing.T) {
}
break
}
partitionConsumer, err := consumer.ConsumePartition(topicName, partitions[0], sarama.OffsetOldest)
partitionConsumer, err := consumer.ConsumePartition(expectedTopicName, partitions[0], sarama.OffsetOldest)
if err != nil {
t.Fatalf("ConsumePartitions() error:\n%+v", err)
}
@@ -182,6 +184,7 @@ func TestTopicCreation(t *testing.T) {
rand.Seed(time.Now().UnixMicro())
topicName := fmt.Sprintf("test-topic-%d", rand.Int())
expectedTopicName := fmt.Sprintf("%s-v%d", topicName, flow.CurrentSchemaVersion)
retentionMs := "76548"
segmentBytes := "107374184"
segmentBytes2 := "10737184"
@@ -243,7 +246,7 @@ func TestTopicCreation(t *testing.T) {
if err != nil {
t.Fatalf("ListTopics() error:\n%+v", err)
}
topic, ok := topics[topicName]
topic, ok := topics[expectedTopicName]
if !ok {
t.Fatal("ListTopics() did not find the topic")
}


@@ -13,6 +13,7 @@ import (
"gopkg.in/tomb.v2"
"akvorado/daemon"
"akvorado/flow"
"akvorado/reporter"
)
@@ -23,6 +24,7 @@ type Component struct {
t tomb.Tomb
config Configuration
kafkaTopic string
kafkaConfig *sarama.Config
kafkaProducer sarama.AsyncProducer
createKafkaProducer func() (sarama.AsyncProducer, error)
@@ -36,8 +38,8 @@ type Dependencies struct {
// New creates a new HTTP component.
func New(reporter *reporter.Reporter, configuration Configuration, dependencies Dependencies) (*Component, error) {
sarama.Logger = &kafkaLogger{reporter}
// Build Kafka configuration
sarama.Logger = &kafkaLogger{reporter}
kafkaConfig := sarama.NewConfig()
kafkaConfig.Version = sarama.KafkaVersion(configuration.Version)
kafkaConfig.Metadata.AllowAutoTopicCreation = false
@@ -66,6 +68,7 @@ func New(reporter *reporter.Reporter, configuration Configuration, dependencies
config: configuration,
kafkaConfig: kafkaConfig,
kafkaTopic: fmt.Sprintf("%s-v%d", configuration.Topic, flow.CurrentSchemaVersion),
}
c.initMetrics()
c.createKafkaProducer = func() (sarama.AsyncProducer, error) {
@@ -102,22 +105,22 @@ func (c *Component) Start() error {
defer client.Close()
l := c.r.With().
Str("brokers", strings.Join(c.config.Brokers, ",")).
Str("topic", c.config.Topic).
Str("topic", c.kafkaTopic).
Logger()
topics, err := client.ListTopics()
if err != nil {
l.Err(err).Msg("unable to get metadata for topics")
return fmt.Errorf("unable to get metadata for topics: %w", err)
}
if topic, ok := topics[c.config.Topic]; !ok {
if err := client.CreateTopic(c.config.Topic,
if topic, ok := topics[c.kafkaTopic]; !ok {
if err := client.CreateTopic(c.kafkaTopic,
&sarama.TopicDetail{
NumPartitions: c.config.TopicConfiguration.NumPartitions,
ReplicationFactor: c.config.TopicConfiguration.ReplicationFactor,
ConfigEntries: c.config.TopicConfiguration.ConfigEntries,
}, false); err != nil {
l.Err(err).Msg("unable to create topic")
return fmt.Errorf("unable to create topic %q: %w", c.config.Topic, err)
return fmt.Errorf("unable to create topic %q: %w", c.kafkaTopic, err)
}
l.Info().Msg("topic created")
} else {
@@ -129,10 +132,10 @@ func (c *Component) Start() error {
l.Warn().Msgf("mismatch for replication factor: got %d, want %d",
topic.ReplicationFactor, c.config.TopicConfiguration.ReplicationFactor)
}
if err := client.AlterConfig(sarama.TopicResource, c.config.Topic, c.config.TopicConfiguration.ConfigEntries, false); err != nil {
if err := client.AlterConfig(sarama.TopicResource, c.kafkaTopic, c.config.TopicConfiguration.ConfigEntries, false); err != nil {
l.Err(err).Msg("unable to set topic configuration")
return fmt.Errorf("unable to set topic configuration for %q: %w",
c.config.Topic, err)
c.kafkaTopic, err)
}
l.Info().Msg("topic updated")
}
@@ -176,7 +179,7 @@ func (c *Component) Send(sampler string, payload []byte) {
c.metrics.bytesSent.WithLabelValues(sampler).Add(float64(len(payload)))
c.metrics.messagesSent.WithLabelValues(sampler).Inc()
c.kafkaProducer.Input() <- &sarama.ProducerMessage{
Topic: c.config.Topic,
Topic: c.kafkaTopic,
Key: sarama.StringEncoder(sampler),
Value: sarama.ByteEncoder(payload),
}


@@ -18,9 +18,11 @@ func TestKafka(t *testing.T) {
c, mockProducer := NewMock(t, r, DefaultConfiguration)
// Send one message
received := make(chan bool)
mockProducer.ExpectInputWithMessageCheckerFunctionAndSucceed(func(got *sarama.ProducerMessage) error {
defer close(received)
expected := sarama.ProducerMessage{
Topic: "flows",
Topic: "flows-v0",
Key: sarama.StringEncoder("127.0.0.1"),
Value: sarama.ByteEncoder("hello world!"),
Partition: 30,
@@ -31,6 +33,11 @@ func TestKafka(t *testing.T) {
return nil
})
c.Send("127.0.0.1", []byte("hello world!"))
select {
case <-received:
case <-time.After(1 * time.Second):
t.Fatal("Kafka message not received")
}
// Another but with a fail
mockProducer.ExpectInputAndFail(errors.New("noooo"))
@@ -39,9 +46,9 @@ func TestKafka(t *testing.T) {
time.Sleep(10 * time.Millisecond)
gotMetrics := r.GetMetrics("akvorado_kafka_")
expectedMetrics := map[string]string{
`sent_bytes_total{sampler="127.0.0.1"}`: "26",
`errors_total{error="kafka: Failed to produce message to topic flows: noooo"}`: "1",
`sent_messages_total{sampler="127.0.0.1"}`: "2",
`sent_bytes_total{sampler="127.0.0.1"}`: "26",
`errors_total{error="kafka: Failed to produce message to topic flows-v0: noooo"}`: "1",
`sent_messages_total{sampler="127.0.0.1"}`: "2",
}
if diff := helpers.Diff(gotMetrics, expectedMetrics); diff != "" {
t.Fatalf("Metrics (-got, +want):\n%s", diff)