mirror of
https://github.com/akvorado/akvorado.git
synced 2025-12-12 06:24:10 +01:00
Add TLS support for ClickhouseDB
This commit is contained in:
@@ -5,6 +5,8 @@ package clickhousedb
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"akvorado/common/helpers"
|
||||
)
|
||||
|
||||
// Configuration defines how we connect to a ClickHouse database
|
||||
@@ -21,6 +23,8 @@ type Configuration struct {
|
||||
MaxOpenConns int `validate:"min=1"`
|
||||
// DialTimeout tells how much time to wait when connecting to ClickHouse
|
||||
DialTimeout time.Duration `validate:"min=100ms"`
|
||||
// TLS defines TLS connection parameters, if empty, plain TCP will be used.
|
||||
TLS helpers.TLSConfiguration
|
||||
}
|
||||
|
||||
// DefaultConfiguration represents the default configuration for connecting to ClickHouse
|
||||
@@ -31,5 +35,9 @@ func DefaultConfiguration() Configuration {
|
||||
Username: "default",
|
||||
MaxOpenConns: 10,
|
||||
DialTimeout: 5 * time.Second,
|
||||
TLS: helpers.TLSConfiguration{
|
||||
Enable: false,
|
||||
Verify: true,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -36,6 +36,11 @@ type Dependencies struct {
|
||||
|
||||
// New creates a new ClickHouse wrapper
|
||||
func New(r *reporter.Reporter, config Configuration, dependencies Dependencies) (*Component, error) {
|
||||
tlsConfig, err := config.TLS.MakeTLSConfig()
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
conn, err := clickhouse.Open(&clickhouse.Options{
|
||||
Addr: config.Servers,
|
||||
Auth: clickhouse.Auth{
|
||||
@@ -43,6 +48,8 @@ func New(r *reporter.Reporter, config Configuration, dependencies Dependencies)
|
||||
Username: config.Username,
|
||||
Password: config.Password,
|
||||
},
|
||||
// nil TLS means no tls for clickhouse
|
||||
TLS: tlsConfig,
|
||||
Compression: &clickhouse.Compression{Method: clickhouse.CompressionLZ4},
|
||||
DialTimeout: config.DialTimeout,
|
||||
MaxOpenConns: config.MaxOpenConns,
|
||||
|
||||
59
common/helpers/tls.go
Normal file
59
common/helpers/tls.go
Normal file
@@ -0,0 +1,59 @@
|
||||
package helpers
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
)
|
||||
|
||||
// TLSConfiguration defines TLS configuration.
|
||||
type TLSConfiguration struct {
|
||||
// Enable says if TLS should be used to connect to brokers
|
||||
Enable bool `validate:"required_with=CAFile CertFile KeyFile Username Password SASLAlgorithm"`
|
||||
// Verify says if we need to check remote certificates
|
||||
Verify bool
|
||||
// CAFile tells the location of the CA certificate to check broker
|
||||
// certificate. If empty, the system CA certificates are used instead.
|
||||
CAFile string // no validation as the orchestrator may not have the file
|
||||
// CertFile tells the location of the user certificate if any.
|
||||
CertFile string `validate:"required_with=KeyFile"`
|
||||
// KeyFile tells the location of the user key if any.
|
||||
KeyFile string
|
||||
}
|
||||
|
||||
// MakeTLSConfig Create and *tls.Config from a TLSConfiguration.
|
||||
// Loading of certificates, key and Certificate authority is done here as well.
|
||||
func (config TLSConfiguration) MakeTLSConfig() (*tls.Config, error) {
|
||||
if !config.Enable {
|
||||
return nil, nil
|
||||
}
|
||||
tlsConfig := &tls.Config{
|
||||
InsecureSkipVerify: !config.Verify,
|
||||
}
|
||||
// Read CA certificate if provided
|
||||
if config.CAFile != "" {
|
||||
caCert, err := os.ReadFile(config.CAFile)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot read CA certificate for Kafka: %w", err)
|
||||
}
|
||||
caCertPool := x509.NewCertPool()
|
||||
if ok := caCertPool.AppendCertsFromPEM(caCert); !ok {
|
||||
return nil, errors.New("cannot parse CA certificate for Kafka")
|
||||
}
|
||||
tlsConfig.RootCAs = caCertPool
|
||||
}
|
||||
// Read user certificate if provided
|
||||
if config.CertFile != "" {
|
||||
if config.KeyFile == "" {
|
||||
config.KeyFile = config.CertFile
|
||||
}
|
||||
cert, err := tls.LoadX509KeyPair(config.CertFile, config.KeyFile)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot read user certificate: %w", err)
|
||||
}
|
||||
tlsConfig.Certificates = []tls.Certificate{cert}
|
||||
}
|
||||
return tlsConfig, nil
|
||||
}
|
||||
@@ -8,12 +8,9 @@ package kafka
|
||||
import (
|
||||
"crypto/sha256"
|
||||
"crypto/sha512"
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
"akvorado/common/helpers"
|
||||
"akvorado/common/helpers/bimap"
|
||||
|
||||
"github.com/IBM/sarama"
|
||||
@@ -28,22 +25,12 @@ type Configuration struct {
|
||||
// Version is the version of Kafka we assume to work
|
||||
Version Version
|
||||
// TLS defines TLS configuration
|
||||
TLS TLSConfiguration
|
||||
TLS TLSAndSASLConfiguration
|
||||
}
|
||||
|
||||
// TLSConfiguration defines TLS configuration.
|
||||
type TLSConfiguration struct {
|
||||
// Enable says if TLS should be used to connect to brokers
|
||||
Enable bool `validate:"required_with=CAFile CertFile KeyFile Username Password SASLAlgorithm"`
|
||||
// Verify says if we need to check remote certificates
|
||||
Verify bool
|
||||
// CAFile tells the location of the CA certificate to check broker
|
||||
// certificate. If empty, the system CA certificates are used instead.
|
||||
CAFile string // no validation as the orchestrator may not have the file
|
||||
// CertFile tells the location of the user certificate if any.
|
||||
CertFile string `validate:"required_with=KeyFile"`
|
||||
// KeyFile tells the location of the user key if any.
|
||||
KeyFile string
|
||||
// TLSAndSASLConfiguration defines TLS configuration.
|
||||
type TLSAndSASLConfiguration struct {
|
||||
helpers.TLSConfiguration `mapstructure:",squash" yaml:",inline"`
|
||||
// SASLUsername tells the SASL username
|
||||
SASLUsername string `validate:"required_with=SASLAlgorithm"`
|
||||
// SASLPassword tells the SASL password
|
||||
@@ -58,9 +45,11 @@ func DefaultConfiguration() Configuration {
|
||||
Topic: "flows",
|
||||
Brokers: []string{"127.0.0.1:9092"},
|
||||
Version: Version(sarama.V2_8_1_0),
|
||||
TLS: TLSConfiguration{
|
||||
Enable: false,
|
||||
Verify: true,
|
||||
TLS: TLSAndSASLConfiguration{
|
||||
TLSConfiguration: helpers.TLSConfiguration{
|
||||
Enable: false,
|
||||
Verify: true,
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
@@ -138,34 +127,13 @@ func (sa *SASLMechanism) UnmarshalText(input []byte) error {
|
||||
func NewConfig(config Configuration) (*sarama.Config, error) {
|
||||
kafkaConfig := sarama.NewConfig()
|
||||
kafkaConfig.Version = sarama.KafkaVersion(config.Version)
|
||||
if config.TLS.Enable {
|
||||
tlsConfig, err := config.TLS.TLSConfiguration.MakeTLSConfig()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if tlsConfig != nil {
|
||||
kafkaConfig.Net.TLS.Enable = true
|
||||
kafkaConfig.Net.TLS.Config = &tls.Config{
|
||||
InsecureSkipVerify: !config.TLS.Verify,
|
||||
}
|
||||
// Read CA certificate if provided
|
||||
if config.TLS.CAFile != "" {
|
||||
caCert, err := os.ReadFile(config.TLS.CAFile)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot read CA certificate for Kafka: %w", err)
|
||||
}
|
||||
caCertPool := x509.NewCertPool()
|
||||
if ok := caCertPool.AppendCertsFromPEM(caCert); !ok {
|
||||
return nil, errors.New("cannot parse CA certificate for Kafka")
|
||||
}
|
||||
kafkaConfig.Net.TLS.Config.RootCAs = caCertPool
|
||||
}
|
||||
// Read user certificate if provided
|
||||
if config.TLS.CertFile != "" {
|
||||
if config.TLS.KeyFile == "" {
|
||||
config.TLS.KeyFile = config.TLS.CertFile
|
||||
}
|
||||
cert, err := tls.LoadX509KeyPair(config.TLS.CertFile, config.TLS.KeyFile)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot read user certificate: %w", err)
|
||||
}
|
||||
kafkaConfig.Net.TLS.Config.Certificates = []tls.Certificate{cert}
|
||||
}
|
||||
kafkaConfig.Net.TLS.Config = tlsConfig
|
||||
// SASL
|
||||
if config.TLS.SASLUsername != "" {
|
||||
kafkaConfig.Net.SASL.Enable = true
|
||||
|
||||
@@ -30,8 +30,10 @@ func TestKafkaNewConfig(t *testing.T) {
|
||||
}, {
|
||||
description: "SASL plain",
|
||||
config: Configuration{
|
||||
TLS: TLSConfiguration{
|
||||
Enable: true,
|
||||
TLS: TLSAndSASLConfiguration{
|
||||
TLSConfiguration: helpers.TLSConfiguration{
|
||||
Enable: true,
|
||||
},
|
||||
SASLUsername: "hello",
|
||||
SASLPassword: "password",
|
||||
},
|
||||
@@ -39,8 +41,10 @@ func TestKafkaNewConfig(t *testing.T) {
|
||||
}, {
|
||||
description: "SASL SCRAM SHA256",
|
||||
config: Configuration{
|
||||
TLS: TLSConfiguration{
|
||||
Enable: true,
|
||||
TLS: TLSAndSASLConfiguration{
|
||||
TLSConfiguration: helpers.TLSConfiguration{
|
||||
Enable: true,
|
||||
},
|
||||
SASLUsername: "hello",
|
||||
SASLPassword: "password",
|
||||
SASLMechanism: SASLSCRAMSHA256,
|
||||
@@ -49,8 +53,10 @@ func TestKafkaNewConfig(t *testing.T) {
|
||||
}, {
|
||||
description: "SASL SCRAM SHA512",
|
||||
config: Configuration{
|
||||
TLS: TLSConfiguration{
|
||||
Enable: true,
|
||||
TLS: TLSAndSASLConfiguration{
|
||||
TLSConfiguration: helpers.TLSConfiguration{
|
||||
Enable: true,
|
||||
},
|
||||
SASLUsername: "hello",
|
||||
SASLPassword: "password",
|
||||
SASLMechanism: SASLSCRAMSHA512,
|
||||
@@ -92,9 +98,11 @@ func TestTLSConfiguration(t *testing.T) {
|
||||
Topic: "flows",
|
||||
Brokers: []string{"127.0.0.1:9092"},
|
||||
Version: Version(sarama.V2_8_1_0),
|
||||
TLS: TLSConfiguration{
|
||||
Enable: true,
|
||||
Verify: true,
|
||||
TLS: TLSAndSASLConfiguration{
|
||||
TLSConfiguration: helpers.TLSConfiguration{
|
||||
Enable: true,
|
||||
Verify: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
}, {
|
||||
@@ -115,9 +123,11 @@ func TestTLSConfiguration(t *testing.T) {
|
||||
Topic: "flows",
|
||||
Brokers: []string{"127.0.0.1:9092"},
|
||||
Version: Version(sarama.V2_8_1_0),
|
||||
TLS: TLSConfiguration{
|
||||
Enable: true,
|
||||
Verify: false,
|
||||
TLS: TLSAndSASLConfiguration{
|
||||
TLSConfiguration: helpers.TLSConfiguration{
|
||||
Enable: true,
|
||||
Verify: false,
|
||||
},
|
||||
SASLUsername: "hello",
|
||||
SASLPassword: "bye",
|
||||
SASLMechanism: SASLPlainText,
|
||||
@@ -140,9 +150,12 @@ func TestTLSConfiguration(t *testing.T) {
|
||||
Topic: "flows",
|
||||
Brokers: []string{"127.0.0.1:9092"},
|
||||
Version: Version(sarama.V2_8_1_0),
|
||||
TLS: TLSConfiguration{
|
||||
Enable: true,
|
||||
Verify: true,
|
||||
TLS: TLSAndSASLConfiguration{
|
||||
TLSConfiguration: helpers.TLSConfiguration{
|
||||
Enable: true,
|
||||
// Value from DefaultConfig is true
|
||||
Verify: true,
|
||||
},
|
||||
SASLUsername: "hello",
|
||||
SASLPassword: "bye",
|
||||
SASLMechanism: SASLSCRAMSHA256,
|
||||
|
||||
2
go.mod
2
go.mod
@@ -32,7 +32,6 @@ require (
|
||||
github.com/mattn/go-isatty v0.0.20
|
||||
github.com/mitchellh/mapstructure v1.5.0
|
||||
github.com/netsampler/goflow2/v2 v2.1.2-0.20240106090313-c38607daf962
|
||||
github.com/opencontainers/image-spec v1.1.0-rc5
|
||||
github.com/oschwald/maxminddb-golang v1.12.0
|
||||
github.com/osrg/gobgp/v3 v3.21.0
|
||||
github.com/prometheus/client_golang v1.18.0
|
||||
@@ -115,6 +114,7 @@ require (
|
||||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
|
||||
github.com/modern-go/reflect2 v1.0.2 // indirect
|
||||
github.com/opencontainers/go-digest v1.0.0 // indirect
|
||||
github.com/opencontainers/image-spec v1.1.0-rc5 // indirect
|
||||
github.com/paulmach/orb v0.10.0 // indirect
|
||||
github.com/pelletier/go-toml/v2 v2.0.8 // indirect
|
||||
github.com/pierrec/lz4/v4 v4.1.18 // indirect
|
||||
|
||||
Reference in New Issue
Block a user