mirror of
https://github.com/akvorado/akvorado.git
synced 2025-12-11 22:14:02 +01:00
common/kafka: use enumer for SASLMechanism
This commit is contained in:
5
Makefile
5
Makefile
@@ -28,7 +28,8 @@ GENERATED_GO = \
|
||||
console/filter/parser.go \
|
||||
inlet/core/asnprovider_enumer.go \
|
||||
inlet/core/netprovider_enumer.go \
|
||||
inlet/flow/decoder/timestampsource_enumer.go
|
||||
inlet/flow/decoder/timestampsource_enumer.go \
|
||||
common/kafka/saslmechanism_enumer.go
|
||||
GENERATED_TEST_GO = \
|
||||
common/clickhousedb/mocks/mock_driver.go \
|
||||
conntrackfixer/mocks/mock_conntrackfixer.go
|
||||
@@ -86,6 +87,8 @@ inlet/core/netprovider_enumer.go: go.mod inlet/core/config.go | $(ENUMER) ; $(in
|
||||
$Q $(ENUMER) -type=NetProvider -text -transform=kebab -trimprefix=NetProvider inlet/core/config.go
|
||||
inlet/flow/decoder/timestampsource_enumer.go: go.mod inlet/flow/decoder/config.go | $(ENUMER) ; $(info $(M) generate enums for TimestampSource…)
|
||||
$Q $(ENUMER) -type=TimestampSource -text -transform=kebab -trimprefix=TimestampSource inlet/flow/decoder/config.go
|
||||
common/kafka/saslmechanism_enumer.go: go.mod common/kafka/config.go | $(ENUMER) ; $(info $(M) generate enums for SASLMechanism…)
|
||||
$Q $(ENUMER) -type=SASLMechanism -text -transform=kebab -trimprefix=SASL common/kafka/config.go
|
||||
|
||||
common/schema/definition_gen.go: common/schema/definition.go common/schema/definition_gen.sh ; $(info $(M) generate column definitions…)
|
||||
$Q ./common/schema/definition_gen.sh > $@
|
||||
|
||||
@@ -8,11 +8,9 @@ package kafka
|
||||
import (
|
||||
"crypto/sha256"
|
||||
"crypto/sha512"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"akvorado/common/helpers"
|
||||
"akvorado/common/helpers/bimap"
|
||||
|
||||
"github.com/IBM/sarama"
|
||||
)
|
||||
@@ -84,50 +82,14 @@ type SASLMechanism int
|
||||
const (
|
||||
// SASLNone means no user authentication
|
||||
SASLNone SASLMechanism = iota
|
||||
// SASLPlainText means user/password in plain text
|
||||
SASLPlainText
|
||||
// SASLSCRAMSHA256 enables SCRAM challenge with SHA256
|
||||
SASLSCRAMSHA256
|
||||
// SASLSCRAMSHA512 enables SCRAM challenge with SHA512
|
||||
SASLSCRAMSHA512
|
||||
// SASLPlain means user/password in plain text
|
||||
SASLPlain
|
||||
// SASLScramSHA256 enables SCRAM challenge with SHA256
|
||||
SASLScramSHA256
|
||||
// SASLScramSHA512 enables SCRAM challenge with SHA512
|
||||
SASLScramSHA512
|
||||
)
|
||||
|
||||
var saslAlgorithmMap = bimap.New(map[SASLMechanism]string{
|
||||
SASLNone: "none",
|
||||
SASLPlainText: "plain",
|
||||
SASLSCRAMSHA256: "scram-sha256",
|
||||
SASLSCRAMSHA512: "scram-sha512",
|
||||
})
|
||||
|
||||
// MarshalText turns a SASL algorithm to text
|
||||
func (sa SASLMechanism) MarshalText() ([]byte, error) {
|
||||
got, ok := saslAlgorithmMap.LoadValue(sa)
|
||||
if ok {
|
||||
return []byte(got), nil
|
||||
}
|
||||
return nil, errors.New("unknown SASL algorithm")
|
||||
}
|
||||
|
||||
// String turns a SASL algorithm to string
|
||||
func (sa SASLMechanism) String() string {
|
||||
got, _ := saslAlgorithmMap.LoadValue(sa)
|
||||
return got
|
||||
}
|
||||
|
||||
// UnmarshalText provides a SASL algorithm from text
|
||||
func (sa *SASLMechanism) UnmarshalText(input []byte) error {
|
||||
if len(input) == 0 {
|
||||
*sa = SASLNone
|
||||
return nil
|
||||
}
|
||||
got, ok := saslAlgorithmMap.LoadKey(string(input))
|
||||
if ok {
|
||||
*sa = got
|
||||
return nil
|
||||
}
|
||||
return errors.New("unknown provider")
|
||||
}
|
||||
|
||||
// NewConfig returns a Sarama Kafka configuration ready to use.
|
||||
func NewConfig(config Configuration) (*sarama.Config, error) {
|
||||
kafkaConfig := sarama.NewConfig()
|
||||
@@ -146,14 +108,14 @@ func NewConfig(config Configuration) (*sarama.Config, error) {
|
||||
kafkaConfig.Net.SASL.User = config.TLS.SASLUsername
|
||||
kafkaConfig.Net.SASL.Password = config.TLS.SASLPassword
|
||||
kafkaConfig.Net.SASL.Mechanism = sarama.SASLTypePlaintext
|
||||
if config.TLS.SASLMechanism == SASLSCRAMSHA256 {
|
||||
if config.TLS.SASLMechanism == SASLScramSHA256 {
|
||||
kafkaConfig.Net.SASL.Handshake = true
|
||||
kafkaConfig.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA256
|
||||
kafkaConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
|
||||
return &xdgSCRAMClient{HashGeneratorFcn: sha256.New}
|
||||
}
|
||||
}
|
||||
if config.TLS.SASLMechanism == SASLSCRAMSHA512 {
|
||||
if config.TLS.SASLMechanism == SASLScramSHA512 {
|
||||
kafkaConfig.Net.SASL.Handshake = true
|
||||
kafkaConfig.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
|
||||
kafkaConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
|
||||
|
||||
@@ -47,7 +47,7 @@ func TestKafkaNewConfig(t *testing.T) {
|
||||
},
|
||||
SASLUsername: "hello",
|
||||
SASLPassword: "password",
|
||||
SASLMechanism: SASLSCRAMSHA256,
|
||||
SASLMechanism: SASLScramSHA256,
|
||||
},
|
||||
},
|
||||
}, {
|
||||
@@ -59,7 +59,7 @@ func TestKafkaNewConfig(t *testing.T) {
|
||||
},
|
||||
SASLUsername: "hello",
|
||||
SASLPassword: "password",
|
||||
SASLMechanism: SASLSCRAMSHA512,
|
||||
SASLMechanism: SASLScramSHA512,
|
||||
},
|
||||
},
|
||||
},
|
||||
@@ -130,7 +130,7 @@ func TestTLSConfiguration(t *testing.T) {
|
||||
},
|
||||
SASLUsername: "hello",
|
||||
SASLPassword: "bye",
|
||||
SASLMechanism: SASLPlainText,
|
||||
SASLMechanism: SASLPlain,
|
||||
},
|
||||
},
|
||||
}, {
|
||||
@@ -158,13 +158,9 @@ func TestTLSConfiguration(t *testing.T) {
|
||||
},
|
||||
SASLUsername: "hello",
|
||||
SASLPassword: "bye",
|
||||
SASLMechanism: SASLSCRAMSHA256,
|
||||
SASLMechanism: SASLScramSHA256,
|
||||
},
|
||||
},
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
func TestMarshalUnmarshal(t *testing.T) {
|
||||
saslAlgorithmMap.TestMarshalUnmarshal(t)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user