Files
akvorado/common/kafka/config.go
Vincent Bernat e68b2de72c
Some checks failed
CI / 🤖 Check dependabot status (push) Has been cancelled
CI / 🐧 Test on Linux (${{ github.ref_type == 'tag' }}, misc) (push) Has been cancelled
CI / 🐧 Test on Linux (coverage) (push) Has been cancelled
CI / 🐧 Test on Linux (regular) (push) Has been cancelled
CI / ❄️ Build on Nix (push) Has been cancelled
CI / 🍏 Build and test on macOS (push) Has been cancelled
CI / 🧪 End-to-end testing (push) Has been cancelled
CI / 🔍 Upload code coverage (push) Has been cancelled
CI / 🔬 Test only Go (push) Has been cancelled
CI / 🔬 Test only JS (${{ needs.dependabot.outputs.package-ecosystem }}, 20) (push) Has been cancelled
CI / 🔬 Test only JS (${{ needs.dependabot.outputs.package-ecosystem }}, 22) (push) Has been cancelled
CI / 🔬 Test only JS (${{ needs.dependabot.outputs.package-ecosystem }}, 24) (push) Has been cancelled
CI / ⚖️ Check licenses (push) Has been cancelled
CI / 🐋 Build Docker images (push) Has been cancelled
CI / 🐋 Tag Docker images (push) Has been cancelled
CI / 🚀 Publish release (push) Has been cancelled
common/helpers: migrate from verify to skip-verify in TLS config
Otherwise, the default is "false" for verify. This is a breaking change.

Fix #2055.
2025-10-30 08:31:27 +01:00

187 lines
5.7 KiB
Go

// SPDX-FileCopyrightText: 2022 Free Mobile
// SPDX-License-Identifier: AGPL-3.0-only
// Package kafka exposes some common helpers for Kafka, including the
// configuration struture.
package kafka
import (
"fmt"
"reflect"
"akvorado/common/helpers"
"akvorado/common/reporter"
"github.com/gin-gonic/gin"
"github.com/go-viper/mapstructure/v2"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/sasl"
"github.com/twmb/franz-go/pkg/sasl/oauth"
"github.com/twmb/franz-go/pkg/sasl/plain"
"github.com/twmb/franz-go/pkg/sasl/scram"
"golang.org/x/oauth2/clientcredentials"
)
// Configuration defines how we connect to a Kafka cluster.
type Configuration struct {
// Topic defines the topic to write flows to.
Topic string `validate:"required"`
// Brokers is the list of brokers to connect to.
Brokers []string `min=1,dive,validate:"listen"`
// TLS defines TLS configuration
TLS helpers.TLSConfiguration
// SASL defines SASL configuration
SASL SASLConfiguration
}
// SASLConfiguration defines SASL configuration.
type SASLConfiguration struct {
// Username tells the SASL username
Username string `validate:"required_with=SASLMechanism"`
// Password tells the SASL password
Password string `validate:"required_with=SASLMechanism"`
// Mechanism tells the SASL algorithm
Mechanism SASLMechanism `validate:"required_with=SASLUsername"`
// OAuthTokenURL tells which URL to use to get an OAuthToken
OAuthTokenURL string `validate:"required_if=Mechanism 4,excluded_unless=Mechanism 4,omitempty,url"`
// OAuthScopes defines the scopes to send for OAuth mechanism
OAuthScopes []string
}
// DefaultConfiguration represents the default configuration for connecting to Kafka.
func DefaultConfiguration() Configuration {
return Configuration{
Topic: "flows",
Brokers: []string{"127.0.0.1:9092"},
}
}
// SASLMechanism defines an SASL algorithm
type SASLMechanism int
const (
// SASLNone means no user authentication
SASLNone SASLMechanism = iota
// SASLPlain means user/password in plain text
SASLPlain
// SASLScramSHA256 enables SCRAM challenge with SHA256
SASLScramSHA256
// SASLScramSHA512 enables SCRAM challenge with SHA512
SASLScramSHA512
// SASLOauth enables OAuth authentication
SASLOauth
)
// NewConfig returns a slice of kgo.Opt configurations ready to use.
func NewConfig(r *reporter.Reporter, config Configuration) ([]kgo.Opt, error) {
opts := []kgo.Opt{
kgo.SeedBrokers(config.Brokers...),
kgo.ClientID(fmt.Sprintf("akvorado-%s", helpers.AkvoradoVersion)),
kgo.WithLogger(NewLogger(r)),
}
// TLS configuration
tlsConfig, err := config.TLS.MakeTLSConfig()
if err != nil {
return nil, err
}
if tlsConfig != nil {
opts = append(opts, kgo.DialTLSConfig(tlsConfig))
}
// SASL configuration
if config.SASL.Mechanism != SASLNone {
var mechanism sasl.Mechanism
switch config.SASL.Mechanism {
case SASLPlain:
mechanism = plain.Auth{
User: config.SASL.Username,
Pass: config.SASL.Password,
}.AsMechanism()
case SASLScramSHA256:
mechanism = scram.Auth{
User: config.SASL.Username,
Pass: config.SASL.Password,
}.AsSha256Mechanism()
case SASLScramSHA512:
mechanism = scram.Auth{
User: config.SASL.Username,
Pass: config.SASL.Password,
}.AsSha512Mechanism()
case SASLOauth:
mechanism = oauth.Oauth(
newOAuthTokenProvider(
tlsConfig,
clientcredentials.Config{
ClientID: config.SASL.Username,
ClientSecret: config.SASL.Password,
TokenURL: config.SASL.OAuthTokenURL,
Scopes: config.SASL.OAuthScopes,
}),
)
default:
return nil, fmt.Errorf("unknown SASL mechanism: %s", config.SASL.Mechanism)
}
opts = append(opts, kgo.SASL(mechanism))
}
return opts, nil
}
// ConfigurationUnmarshallerHook normalize Kafka configuration:
// - move SASL related parameters from TLS section to SASL section
func ConfigurationUnmarshallerHook() mapstructure.DecodeHookFunc {
return func(from, to reflect.Value) (any, error) {
if from.Kind() != reflect.Map || from.IsNil() || !helpers.SameTypeOrSuperset(to.Type(), reflect.TypeOf(Configuration{})) {
return from.Interface(), nil
}
var tlsKey, saslKey *reflect.Value
fromMap := from.MapKeys()
for i, k := range fromMap {
k = helpers.ElemOrIdentity(k)
if k.Kind() != reflect.String {
return from.Interface(), nil
}
if helpers.MapStructureMatchName(k.String(), "TLS") {
tlsKey = &fromMap[i]
} else if helpers.MapStructureMatchName(k.String(), "SASL") {
saslKey = &fromMap[i]
}
}
var sasl reflect.Value
if saslKey != nil {
sasl = helpers.ElemOrIdentity(from.MapIndex(*saslKey))
} else {
sasl = reflect.ValueOf(gin.H{})
from.SetMapIndex(reflect.ValueOf("sasl"), sasl)
}
if tlsKey != nil {
tls := helpers.ElemOrIdentity(from.MapIndex(*tlsKey))
tlsMap := tls.MapKeys()
for _, k := range tlsMap {
k = helpers.ElemOrIdentity(k)
if k.Kind() != reflect.String {
return from.Interface(), nil
}
if helpers.MapStructureMatchName(k.String(), "SASLUsername") {
sasl.SetMapIndex(reflect.ValueOf("username"), helpers.ElemOrIdentity(tls.MapIndex(k)))
tls.SetMapIndex(k, reflect.Value{})
} else if helpers.MapStructureMatchName(k.String(), "SASLPassword") {
sasl.SetMapIndex(reflect.ValueOf("password"), helpers.ElemOrIdentity(tls.MapIndex(k)))
tls.SetMapIndex(k, reflect.Value{})
} else if helpers.MapStructureMatchName(k.String(), "SASLMechanism") {
sasl.SetMapIndex(reflect.ValueOf("mechanism"), helpers.ElemOrIdentity(tls.MapIndex(k)))
tls.SetMapIndex(k, reflect.Value{})
}
}
}
return from.Interface(), nil
}
}
func init() {
helpers.RegisterMapstructureDeprecatedFields[Configuration]("Version")
helpers.RegisterMapstructureUnmarshallerHook(ConfigurationUnmarshallerHook())
}