From fa7e4b6ac79ecbc9b74de9966f3457d93540d8f3 Mon Sep 17 00:00:00 2001 From: Vincent Bernat Date: Fri, 25 Nov 2022 00:41:15 +0100 Subject: [PATCH] inlet/kafka: add TLS support to Kafka Fix #277 --- common/kafka/config.go | 144 +++++++++++++++++++++++++- common/kafka/config_test.go | 138 ++++++++++++++++++++++++ common/kafka/scram.go | 32 ++++++ console/data/docs/02-configuration.md | 22 +++- console/data/docs/99-changelog.md | 1 + go.mod | 3 + go.sum | 3 + inlet/kafka/root.go | 6 +- orchestrator/kafka/root.go | 6 +- 9 files changed, 348 insertions(+), 7 deletions(-) create mode 100644 common/kafka/scram.go diff --git a/common/kafka/config.go b/common/kafka/config.go index f70c0243..90257d4b 100644 --- a/common/kafka/config.go +++ b/common/kafka/config.go @@ -5,7 +5,19 @@ // configuration struture. package kafka -import "github.com/Shopify/sarama" +import ( + "crypto/sha256" + "crypto/sha512" + "crypto/tls" + "crypto/x509" + "errors" + "fmt" + "os" + + "github.com/Shopify/sarama" + + "akvorado/common/helpers" +) // Configuration defines how we connect to a Kafka cluster. type Configuration struct { @@ -15,6 +27,29 @@ type Configuration struct { Brokers []string `min=1,dive,validate:"listen"` // Version is the version of Kafka we assume to work Version Version + // TLS defines TLS configuration + TLS TLSConfiguration +} + +// TLSConfiguration defines TLS configuration. +type TLSConfiguration struct { + // Enable says if TLS should be used to connect to brokers + Enable bool `validate:"required_with=CAFile CertFile KeyFile Username Password SASLAlgorithm"` + // Verify says if we need to check remote certificates + Verify bool + // CAFile tells the location of the CA certificate to check broker + // certificate. If empty, the system CA certificates are used instead. + CAFile string // no validation as the orchestrator may not have the file + // CertFile tells the location of the user certificate if any. + CertFile string `validate:"required_with=KeyFile"` + // KeyFile tells the location of the user key if any. + KeyFile string + // SASLUsername tells the SASL username + SASLUsername string `validate:"required_with=SASLAlgorithm"` + // SASLPassword tells the SASL password + SASLPassword string `validate:"required_with=SASLAlgorithm SASLUsername"` + // SASLMechanism tells the SASL algorithm + SASLMechanism SASLMechanism `validate:"required_with=SASLUsername"` } // DefaultConfiguration represents the default configuration for connecting to Kafka. @@ -23,6 +58,10 @@ func DefaultConfiguration() Configuration { Topic: "flows", Brokers: []string{"127.0.0.1:9092"}, Version: Version(sarama.V2_8_1_0), + TLS: TLSConfiguration{ + Enable: false, + Verify: true, + }, } } @@ -48,3 +87,106 @@ func (v Version) String() string { func (v Version) MarshalText() ([]byte, error) { return []byte(v.String()), nil } + +// SASLMechanism defines an SASL algorithm +type SASLMechanism int + +const ( + SASLNone SASLMechanism = iota // SASLNone means no user authentication + SASLPlainText // SASLPlainText means user/password in plain text + SASLSCRAMSHA256 // SASLSCRAMSHA256 enables SCRAM challenge with SHA256 + SASLSCRAMSHA512 // SASLSCRAMSHA512 enables SCRAM challenge with SHA512 +) + +var saslAlgorithmMap = helpers.NewBimap(map[SASLMechanism]string{ + SASLNone: "none", + SASLPlainText: "plain", + SASLSCRAMSHA256: "scram-sha256", + SASLSCRAMSHA512: "scram-sha512", +}) + +// MarshalText turns a SASL algorithm to text +func (sa SASLMechanism) MarshalText() ([]byte, error) { + got, ok := saslAlgorithmMap.LoadValue(sa) + if ok { + return []byte(got), nil + } + return nil, errors.New("unknown SASL algorithm") +} + +// String turns a SASL algorithm to string +func (sa SASLMechanism) String() string { + got, _ := saslAlgorithmMap.LoadValue(sa) + return got +} + +// UnmarshalText provides a SASL algorithm from text +func (sa *SASLMechanism) UnmarshalText(input []byte) error { + if len(input) == 0 { + *sa = SASLNone + return nil + } + got, ok := saslAlgorithmMap.LoadKey(string(input)) + if ok { + *sa = got + return nil + } + return errors.New("unknown provider") +} + +// NewConfig returns a Sarama Kafka configuration ready to use. +func NewConfig(config Configuration) (*sarama.Config, error) { + kafkaConfig := sarama.NewConfig() + kafkaConfig.Version = sarama.KafkaVersion(config.Version) + if config.TLS.Enable { + kafkaConfig.Net.TLS.Enable = true + kafkaConfig.Net.TLS.Config = &tls.Config{ + InsecureSkipVerify: !config.TLS.Verify, + } + // Read CA certificate if provided + if config.TLS.CAFile != "" { + caCert, err := os.ReadFile(config.TLS.CAFile) + if err != nil { + return nil, fmt.Errorf("cannot read CA certificate for Kafka: %w", err) + } + caCertPool := x509.NewCertPool() + if ok := caCertPool.AppendCertsFromPEM(caCert); !ok { + return nil, errors.New("cannot parse CA certificate for Kafka") + } + kafkaConfig.Net.TLS.Config.RootCAs = caCertPool + } + // Read user certificate if provided + if config.TLS.CertFile != "" { + if config.TLS.KeyFile == "" { + config.TLS.KeyFile = config.TLS.CertFile + } + cert, err := tls.LoadX509KeyPair(config.TLS.CertFile, config.TLS.KeyFile) + if err != nil { + return nil, fmt.Errorf("cannot read user certificate: %w", err) + } + kafkaConfig.Net.TLS.Config.Certificates = []tls.Certificate{cert} + } + // SASL + if config.TLS.SASLUsername != "" { + kafkaConfig.Net.SASL.Enable = true + kafkaConfig.Net.SASL.User = config.TLS.SASLUsername + kafkaConfig.Net.SASL.Password = config.TLS.SASLPassword + kafkaConfig.Net.SASL.Mechanism = sarama.SASLTypePlaintext + if config.TLS.SASLMechanism == SASLSCRAMSHA256 { + kafkaConfig.Net.SASL.Handshake = true + kafkaConfig.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA256 + kafkaConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { + return &xdgSCRAMClient{HashGeneratorFcn: sha256.New} + } + } + if config.TLS.SASLMechanism == SASLSCRAMSHA512 { + kafkaConfig.Net.SASL.Handshake = true + kafkaConfig.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512 + kafkaConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { + return &xdgSCRAMClient{HashGeneratorFcn: sha512.New} + } + } + } + } + return kafkaConfig, nil +} diff --git a/common/kafka/config_test.go b/common/kafka/config_test.go index df7bef30..1056595c 100644 --- a/common/kafka/config_test.go +++ b/common/kafka/config_test.go @@ -7,6 +7,9 @@ import ( "testing" "akvorado/common/helpers" + + "github.com/Shopify/sarama" + "github.com/gin-gonic/gin" ) func TestDefaultConfiguration(t *testing.T) { @@ -14,3 +17,138 @@ func TestDefaultConfiguration(t *testing.T) { t.Fatalf("validate.Struct() error:\n%+v", err) } } + +func TestKafkaNewConfig(t *testing.T) { + // It is a bit a pain to test the result, just check we don't have an error + cases := []struct { + description string + config Configuration + }{ + { + description: "No TLS", + config: DefaultConfiguration(), + }, { + description: "SASL plain", + config: Configuration{ + TLS: TLSConfiguration{ + Enable: true, + SASLUsername: "hello", + SASLPassword: "password", + }, + }, + }, { + description: "SASL SCRAM SHA256", + config: Configuration{ + TLS: TLSConfiguration{ + Enable: true, + SASLUsername: "hello", + SASLPassword: "password", + SASLMechanism: SASLSCRAMSHA256, + }, + }, + }, { + description: "SASL SCRAM SHA512", + config: Configuration{ + TLS: TLSConfiguration{ + Enable: true, + SASLUsername: "hello", + SASLPassword: "password", + SASLMechanism: SASLSCRAMSHA512, + }, + }, + }, + } + for _, tc := range cases { + t.Run(tc.description, func(t *testing.T) { + kafkaConfig, err := NewConfig(tc.config) + if err != nil { + t.Fatalf("NewConfig() error:\n%+v", err) + } + if err := kafkaConfig.Validate(); err != nil { + t.Fatalf("Validate() error:\n%+v", err) + } + }) + } +} + +func TestTLSConfiguration(t *testing.T) { + helpers.TestConfigurationDecode(t, helpers.ConfigurationDecodeCases{ + { + Description: "no TLS", + Initial: func() interface{} { return DefaultConfiguration() }, + Configuration: func() interface{} { return nil }, + Expected: DefaultConfiguration(), + }, { + Description: "TLS without auth", + Initial: func() interface{} { return DefaultConfiguration() }, + Configuration: func() interface{} { + return gin.H{ + "tls": gin.H{ + "enable": true, + }, + } + }, + Expected: Configuration{ + Topic: "flows", + Brokers: []string{"127.0.0.1:9092"}, + Version: Version(sarama.V2_8_1_0), + TLS: TLSConfiguration{ + Enable: true, + Verify: true, + }, + }, + }, { + Description: "TLS SASL plain, skip cert verification", + Initial: func() interface{} { return DefaultConfiguration() }, + Configuration: func() interface{} { + return gin.H{ + "tls": gin.H{ + "enable": true, + "verify": false, + "sasl-username": "hello", + "sasl-password": "bye", + "sasl-mechanism": "plain", + }, + } + }, + Expected: Configuration{ + Topic: "flows", + Brokers: []string{"127.0.0.1:9092"}, + Version: Version(sarama.V2_8_1_0), + TLS: TLSConfiguration{ + Enable: true, + Verify: false, + SASLUsername: "hello", + SASLPassword: "bye", + SASLMechanism: SASLPlainText, + }, + }, + }, { + Description: "TLS SASL SCRAM 256", + Initial: func() interface{} { return DefaultConfiguration() }, + Configuration: func() interface{} { + return gin.H{ + "tls": gin.H{ + "enable": true, + "sasl-username": "hello", + "sasl-password": "bye", + "sasl-mechanism": "scram-sha256", + }, + } + }, + Expected: Configuration{ + Topic: "flows", + Brokers: []string{"127.0.0.1:9092"}, + Version: Version(sarama.V2_8_1_0), + TLS: TLSConfiguration{ + Enable: true, + Verify: true, + SASLUsername: "hello", + SASLPassword: "bye", + SASLMechanism: SASLSCRAMSHA256, + }, + }, + }, + }) + +} diff --git a/common/kafka/scram.go b/common/kafka/scram.go new file mode 100644 index 00000000..f4ec2483 --- /dev/null +++ b/common/kafka/scram.go @@ -0,0 +1,32 @@ +// SPDX-FileCopyrightText: 2013 Shopify +// SPDX-License-Identifier: MIT + +// From https://github.com/Shopify/sarama/blob/main/examples/sasl_scram_client/scram_client.go + +package kafka + +import "github.com/xdg-go/scram" + +type xdgSCRAMClient struct { + *scram.Client + *scram.ClientConversation + scram.HashGeneratorFcn +} + +func (x *xdgSCRAMClient) Begin(userName, password, authzID string) (err error) { + x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID) + if err != nil { + return err + } + x.ClientConversation = x.Client.NewConversation() + return nil +} + +func (x *xdgSCRAMClient) Step(challenge string) (response string, err error) { + response, err = x.ClientConversation.Step(challenge) + return +} + +func (x *xdgSCRAMClient) Done() bool { + return x.ClientConversation.Done() +} diff --git a/console/data/docs/02-configuration.md b/console/data/docs/02-configuration.md index da0e6994..6ee77bca 100644 --- a/console/data/docs/02-configuration.md +++ b/console/data/docs/02-configuration.md @@ -157,8 +157,8 @@ flow is written in the [length-delimited format][]. The following keys are accepted: -- `topic`, `brokers` and `version` keys are described in the - configuration for the [inlet service](#kafka) (the values of these +- `topic`, `brokers`, `tls`, and `version` keys are described in the + configuration for the [orchestrator service](#kafka-1) (the values of these keys come from the orchestrator configuration) - `flush-interval` defines the maximum flush interval to send received flows to Kafka @@ -387,10 +387,28 @@ flows. It accepts the following keys: - `brokers` specifies the list of brokers to use to bootstrap the connection to the Kafka cluster +- `tls` defines the TLS configuration to connect to the cluster - `version` tells which minimal version of Kafka to expect - `topic` defines the base topic name - `topic-configuration` describes how the topic should be configured +The following keys are accepted for the TLS configuration: + +- `enable` should be set to `true` to enable TLS. +- `verify` can be set to `false` to skip checking server certificate (not recommended). +- `ca-file` gives the location of the file containing the CA certificate in PEM + format to check the server certificate. If not provided, the system + certificates are used instead. +- `cert-file` and `key-file` defines the location of the client certificate pair + in PEM format to authenticate to the broker. If the first one is empty, no + client certificate is used. If the second one is empty, the key is expected to + be in the certificate file. +- `sasl-username` and `sasl-password` enables SASL authentication with the + provided user and password. +- `sasl-algorithm` tells which SASL mechanism to use for authentication. This + can be `none`, `plain`, `scram-sha256`, or `scram-sha512`. This should not be + set to none when SASL is used. + The following keys are accepted for the topic configuration: - `num-partitions` for the number of partitions diff --git a/console/data/docs/99-changelog.md b/console/data/docs/99-changelog.md index ff4104a8..02a05086 100644 --- a/console/data/docs/99-changelog.md +++ b/console/data/docs/99-changelog.md @@ -26,6 +26,7 @@ details. - 🩹 *console*: use configured dimensions limit for “Visualize” tab - 🌱 *inlet*: optimize BMP collector (see above) - 🌱 *inlet*: replace LRU cache for classifiers by a time-based cache +- 🌱 *inlet*: add TLS support for Kafka transport - 🌱 *console*: Ctrl-Enter or Cmd-Enter when editing a filter now applies the changes - 🌱 *console*: switch to TypeScript for the frontend code diff --git a/go.mod b/go.mod index 81ec7048..4ce5703b 100644 --- a/go.mod +++ b/go.mod @@ -109,6 +109,9 @@ require ( github.com/tklauser/go-sysconf v0.3.10 // indirect github.com/tklauser/numcpus v0.4.0 // indirect github.com/ugorji/go/codec v1.2.7 // indirect + github.com/xdg-go/pbkdf2 v1.0.0 // indirect + github.com/xdg-go/scram v1.1.1 // indirect + github.com/xdg-go/stringprep v1.0.3 // indirect github.com/yusufpapurcu/wmi v1.2.2 // indirect go.opentelemetry.io/otel v1.11.1 // indirect go.opentelemetry.io/otel/trace v1.11.1 // indirect diff --git a/go.sum b/go.sum index 086bb5d9..71f24675 100644 --- a/go.sum +++ b/go.sum @@ -445,8 +445,11 @@ github.com/vincentbernat/patricia v0.0.0-20220923091046-b376a1167a94 h1:T7+yyM63 github.com/vincentbernat/patricia v0.0.0-20220923091046-b376a1167a94/go.mod h1:6jY40ESetsbfi04/S12iJlsiS6DYL2B2W+WAcqoDHtw= github.com/vishvananda/netns v0.0.0-20180720170159-13995c7128cc/go.mod h1:ZjcWmFBXmLKZu9Nxj3WKYEafiSqer2rnvPr0en9UNpI= github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae h1:4hwBBUfQCFe3Cym0ZtKyq7L16eZUtYKs+BaHDN6mAns= +github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= +github.com/xdg-go/scram v1.1.1 h1:VOMT+81stJgXW3CpHyqHN3AXDYIMsx56mEFrB37Mb/E= github.com/xdg-go/scram v1.1.1/go.mod h1:RaEWvsqvNKKvBPvcKeFjrG2cJqOkHTiyTpzz23ni57g= +github.com/xdg-go/stringprep v1.0.3 h1:kdwGpVNwPFtjs98xCGkHjQtGKh86rDcRZN17QEMCOIs= github.com/xdg-go/stringprep v1.0.3/go.mod h1:W3f5j4i+9rC0kuIEJL0ky1VpHXQU3ocBgklLGvcBnW8= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= diff --git a/inlet/kafka/root.go b/inlet/kafka/root.go index b857f447..94888a6b 100644 --- a/inlet/kafka/root.go +++ b/inlet/kafka/root.go @@ -42,8 +42,10 @@ type Dependencies struct { // New creates a new HTTP component. func New(reporter *reporter.Reporter, configuration Configuration, dependencies Dependencies) (*Component, error) { // Build Kafka configuration - kafkaConfig := sarama.NewConfig() - kafkaConfig.Version = sarama.KafkaVersion(configuration.Version) + kafkaConfig, err := kafka.NewConfig(configuration.Configuration) + if err != nil { + return nil, err + } kafkaConfig.Metadata.AllowAutoTopicCreation = true kafkaConfig.Producer.MaxMessageBytes = configuration.MaxMessageBytes kafkaConfig.Producer.Compression = sarama.CompressionCodec(configuration.CompressionCodec) diff --git a/orchestrator/kafka/root.go b/orchestrator/kafka/root.go index 026408d7..c41fe0e6 100644 --- a/orchestrator/kafka/root.go +++ b/orchestrator/kafka/root.go @@ -26,8 +26,10 @@ type Component struct { // New creates a new Kafka configurator. func New(r *reporter.Reporter, config Configuration) (*Component, error) { - kafkaConfig := sarama.NewConfig() - kafkaConfig.Version = sarama.KafkaVersion(config.Version) + kafkaConfig, err := kafka.NewConfig(config.Configuration) + if err != nil { + return nil, err + } if err := kafkaConfig.Validate(); err != nil { return nil, fmt.Errorf("cannot validate Kafka configuration: %w", err) }