diff --git a/common/kafka/config.go b/common/kafka/config.go index 1b541073..7d68c3e4 100644 --- a/common/kafka/config.go +++ b/common/kafka/config.go @@ -36,9 +36,9 @@ type Configuration struct { // SASLConfiguration defines SASL configuration. type SASLConfiguration struct { // Username tells the SASL username - Username string `validate:"required_with=SASLAlgorithm"` + Username string `validate:"required_with=SASLMechanism"` // Password tells the SASL password - Password string `validate:"required_with=SASLAlgorithm SASLUsername"` + Password string `validate:"required_with=SASLMechanism"` // Mechanism tells the SASL algorithm Mechanism SASLMechanism `validate:"required_with=SASLUsername"` // OAuthTokenURL tells which URL to use to get an OAuthToken @@ -111,7 +111,7 @@ func NewConfig(config Configuration) (*sarama.Config, error) { kafkaConfig.Net.TLS.Config = tlsConfig } // SASL - if config.SASL.Username != "" { + if config.SASL.Mechanism != SASLNone { kafkaConfig.Net.SASL.Enable = true kafkaConfig.Net.SASL.User = config.SASL.Username kafkaConfig.Net.SASL.Password = config.SASL.Password @@ -137,6 +137,8 @@ func NewConfig(config Configuration) (*sarama.Config, error) { tlsConfig, config.SASL.Username, config.SASL.Password, config.SASL.OAuthTokenURL) + default: + return nil, fmt.Errorf("unknown SASL mechanism: %s", config.SASL.Mechanism) } } return kafkaConfig, nil diff --git a/common/kafka/config_test.go b/common/kafka/config_test.go index 5eaac1d5..85db5a51 100644 --- a/common/kafka/config_test.go +++ b/common/kafka/config_test.go @@ -34,8 +34,9 @@ func TestKafkaNewConfig(t *testing.T) { Enable: true, }, SASL: SASLConfiguration{ - Username: "hello", - Password: "password", + Username: "hello", + Password: "password", + Mechanism: SASLPlain, }, }, }, {