common/kafka: add support for OAuth2

The support is still pretty basic. Notably, scopes are not
configurable (waiting for someone to request them) and maybe there
client ID and secrets should not be provided as username/password.

Fix #1714
This commit is contained in:
Vincent Bernat
2025-05-01 16:08:33 +02:00
parent edf37390d4
commit f672ac98d9
8 changed files with 289 additions and 12 deletions

View File

@@ -6,6 +6,7 @@
package kafka
import (
"context"
"crypto/sha256"
"crypto/sha512"
"fmt"
@@ -40,6 +41,8 @@ type SASLConfiguration struct {
Password string `validate:"required_with=SASLAlgorithm SASLUsername"`
// Mechanism tells the SASL algorithm
Mechanism SASLMechanism `validate:"required_with=SASLUsername"`
// OAuthTokenURL tells which URL to use to get an OAuthToken
OAuthTokenURL string `validate:"required_if=Mechanism 4,excluded_unless=Mechanism 4,omitempty,url"`
}
// DefaultConfiguration represents the default configuration for connecting to Kafka.
@@ -90,6 +93,8 @@ const (
SASLScramSHA256
// SASLScramSHA512 enables SCRAM challenge with SHA512
SASLScramSHA512
// SASLOauth enables OAuth authentication
SASLOauth
)
// NewConfig returns a Sarama Kafka configuration ready to use.
@@ -110,20 +115,28 @@ func NewConfig(config Configuration) (*sarama.Config, error) {
kafkaConfig.Net.SASL.Enable = true
kafkaConfig.Net.SASL.User = config.SASL.Username
kafkaConfig.Net.SASL.Password = config.SASL.Password
switch config.SASL.Mechanism {
case SASLPlain:
kafkaConfig.Net.SASL.Mechanism = sarama.SASLTypePlaintext
if config.SASL.Mechanism == SASLScramSHA256 {
case SASLScramSHA256:
kafkaConfig.Net.SASL.Handshake = true
kafkaConfig.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA256
kafkaConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &xdgSCRAMClient{HashGeneratorFcn: sha256.New}
}
}
if config.SASL.Mechanism == SASLScramSHA512 {
case SASLScramSHA512:
kafkaConfig.Net.SASL.Handshake = true
kafkaConfig.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
kafkaConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &xdgSCRAMClient{HashGeneratorFcn: sha512.New}
}
case SASLOauth:
kafkaConfig.Net.SASL.Mechanism = sarama.SASLTypeOAuth
kafkaConfig.Net.SASL.TokenProvider = newOAuthTokenProvider(
context.Background(), // TODO should be bound to the component lifecycle, but no component here
tlsConfig,
config.SASL.Username, config.SASL.Password,
config.SASL.OAuthTokenURL)
}
}
return kafkaConfig, nil

View File

@@ -62,6 +62,19 @@ func TestKafkaNewConfig(t *testing.T) {
Mechanism: SASLScramSHA512,
},
},
}, {
description: "SASL OAuth2",
config: Configuration{
TLS: helpers.TLSConfiguration{
Enable: true,
},
SASL: SASLConfiguration{
Username: "hello",
Password: "password",
Mechanism: SASLOauth,
OAuthTokenURL: "http://example.com/token",
},
},
},
}
for _, tc := range cases {
@@ -187,6 +200,65 @@ func TestTLSConfiguration(t *testing.T) {
Mechanism: SASLScramSHA256,
},
},
}, {
Description: "TLS SASL OAuth",
Initial: func() interface{} { return DefaultConfiguration() },
Configuration: func() interface{} {
return gin.H{
"tls": gin.H{
"enable": true,
},
"sasl": gin.H{
"username": "hello",
"password": "bye",
"mechanism": "oauth",
"oauth-token-url": "http://example.com/token",
},
}
},
Expected: Configuration{
Topic: "flows",
Brokers: []string{"127.0.0.1:9092"},
Version: Version(sarama.V2_8_1_0),
TLS: helpers.TLSConfiguration{
Enable: true,
// Value from DefaultConfig is true
Verify: true,
},
SASL: SASLConfiguration{
Username: "hello",
Password: "bye",
Mechanism: SASLOauth,
OAuthTokenURL: "http://example.com/token",
},
},
}, {
Description: "OAuth requires a token URL",
Initial: func() interface{} { return DefaultConfiguration() },
Configuration: func() interface{} {
return gin.H{
"sasl": gin.H{
"username": "hello",
"password": "bye",
"mechanism": "oauth",
},
}
},
Error: true,
}, {
Description: "OAuth token URL only with OAuth",
Initial: func() interface{} { return DefaultConfiguration() },
Configuration: func() interface{} {
return gin.H{
"sasl": gin.H{
"username": "hello",
"password": "bye",
"mechanism": "plain",
"oauth-token-url": "http://example.com/token",
},
}
},
Error: true,
},
})
}

46
common/kafka/oauth.go Normal file
View File

@@ -0,0 +1,46 @@
// SPDX-FileCopyrightText: 2025 Free Mobile
// SPDX-License-Identifier: AGPL-3.0-only
package kafka
import (
"context"
"crypto/tls"
"net/http"
"github.com/IBM/sarama"
"golang.org/x/oauth2"
"golang.org/x/oauth2/clientcredentials"
)
// tokenProvider implements sarama.AccessTokenProvider.
type tokenProvider struct {
tokenSource oauth2.TokenSource
}
// newOAuthTokenProvider returns a sarama.AccessTokenProvider using OAuth credentials.
func newOAuthTokenProvider(ctx context.Context, tlsConfig *tls.Config, clientID, clientSecret, tokenURL string) sarama.AccessTokenProvider {
cfg := clientcredentials.Config{
ClientID: clientID,
ClientSecret: clientSecret,
TokenURL: tokenURL,
}
httpClient := &http.Client{Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
TLSClientConfig: tlsConfig,
}}
ctx = context.WithValue(ctx, oauth2.HTTPClient, httpClient)
return &tokenProvider{
tokenSource: cfg.TokenSource(context.Background()),
}
}
// Token returns a new *sarama.AccessToken or an error as appropriate.
func (t *tokenProvider) Token() (*sarama.AccessToken, error) {
token, err := t.tokenSource.Token()
if err != nil {
return nil, err
}
return &sarama.AccessToken{Token: token.AccessToken}, nil
}

114
common/kafka/oauth_test.go Normal file
View File

@@ -0,0 +1,114 @@
// SPDX-FileCopyrightText: 2025 Free Mobile
// SPDX-License-Identifier: AGPL-3.0-only
package kafka
import (
"akvorado/common/helpers"
"akvorado/common/reporter"
"context"
"fmt"
"testing"
"time"
"github.com/IBM/sarama"
"golang.org/x/oauth2"
"golang.org/x/oauth2/clientcredentials"
)
func TestOAuth2ServerPassword(t *testing.T) {
oauthServer := helpers.CheckExternalService(t, "mock-auth2-server",
[]string{"mock-oauth2-server:8080", "127.0.0.1:5556"})
ctx := context.Background()
conf := &oauth2.Config{
ClientID: "kafka-client",
ClientSecret: "kafka-client-secret",
Endpoint: oauth2.Endpoint{
TokenURL: fmt.Sprintf("http://%s/default/token", oauthServer),
},
Scopes: []string{"openid"},
}
token, err := conf.PasswordCredentialsToken(ctx, "akvorado@example.com", "password")
if err != nil {
t.Fatalf("PasswordCredentialsToken() error:\n%+v", err)
}
t.Logf("Access token: %s", token.AccessToken)
t.Logf("Token type: %s", token.TokenType)
t.Logf("Expiry: %s", token.Expiry.Format(time.RFC3339))
}
func TestOAuth2ServerClientCredentials(t *testing.T) {
oauthServer := helpers.CheckExternalService(t, "mock-oauth2-server",
[]string{"mock-oauth2-server:8080", "127.0.0.1:5556"})
ctx := context.Background()
// Use clientcredentials.Config instead of oauth2.Config
config := clientcredentials.Config{
ClientID: "kafka-client",
ClientSecret: "kafka-client-secret",
TokenURL: fmt.Sprintf("http://%s/default/token", oauthServer),
Scopes: []string{"openid"},
}
// Get token directly from the client credentials config
token, err := config.Token(ctx)
if err != nil {
t.Fatalf("ClientCredentials Token() error:\n%+v", err)
}
t.Logf("Access token: %s", token.AccessToken)
t.Logf("Token type: %s", token.TokenType)
t.Logf("Expiry: %s", token.Expiry.Format(time.RFC3339))
}
// Example with kcat:
// kcat -b 127.0.0.1:9093 \
// -X security.protocol=SASL_PLAINTEXT \
// -X sasl.mechanisms=OAUTHBEARER \
// -X sasl.oauthbearer.method=OIDC \ // -X sasl.oauthbearer.client.id=kafka-client \
// -X sasl.oauthbearer.client.secret=kafka-client-secret \
// -X sasl.oauthbearer.token.endpoint.url=http://127.0.0.1:5556/default/token \
// -t my-topic -C -d all
func TestOAuth2Broker(t *testing.T) {
r := reporter.NewMock(t)
GlobalKafkaLogger.Register(r)
defer GlobalKafkaLogger.Unregister()
// Ensure broker is ready.
SetupKafkaBroker(t)
// Then try again with OAuth2.
oauthServer := helpers.CheckExternalService(t, "mock-auth2-server",
[]string{"mock-oauth2-server:8080", "127.0.0.1:5556"})
broker := helpers.CheckExternalService(t, "Kafka",
[]string{"kafka:9093", "127.0.0.1:9093"})
config := DefaultConfiguration()
config.SASL = SASLConfiguration{
Username: "kafka-client",
Password: "kafka-client-secret",
Mechanism: SASLOauth,
OAuthTokenURL: fmt.Sprintf("http://%s/default/token", oauthServer),
}
kafkaConfig, err := NewConfig(config)
if err != nil {
t.Fatalf("NewConfig() error:\n%+v", err)
}
if err := kafkaConfig.Validate(); err != nil {
t.Fatalf("Validate() error:\n%+v", err)
}
client, err := sarama.NewClient([]string{broker}, kafkaConfig)
if err != nil {
t.Fatalf("sarama.NewClient() error:\n%+v", err)
}
if err := client.RefreshMetadata(); err != nil {
t.Fatalf("client.RefreshMetadata() error:\n%+v", err)
}
}

View File

@@ -748,8 +748,10 @@ The following keys are accepted for SASL configuration:
- `username` and `password` enables SASL authentication with the
provided user and password.
- `algorithm` tells which SASL mechanism to use for authentication. This
can be `none`, `plain`, `scram-sha256`, or `scram-sha512`. This should not be
can be `none`, `plain`, `scram-sha256`, `scram-sha512`, or `oauth`. This should not be
set to none when SASL is used.
- `oauth-token-url` defines the URL to query to get a valid OAuth token (in this
case, `username` and `password` are used as client credentials).
The following keys are accepted for the topic configuration:

View File

@@ -15,6 +15,7 @@ identified with a specific icon:
- 🩹 *inlet*: don't override flow-provided VLANs with VLAN from Ethernet header
- 🌱 *orchestrator*: put SASL parameters in their own section in Kafka configuration
- 🌱 *orchestrator*: add OAuth support to Kafka client
## 1.11.4 - 2025-04-26

View File

@@ -7,21 +7,48 @@ services:
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
mock-oauth2-server:
extends:
file: versions.yml
service: mock-oauth2-server
ports:
- 127.0.0.1:5556:8080/tcp
environment:
LOG_LEVEL: debug
kafka:
extends:
file: versions.yml
service: kafka
environment:
- KAFKA_CFG_BROKER_ID=1
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
- KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://localhost:9092
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT
KAFKA_CFG_BROKER_ID: 1
KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
# We have two sets of listeners: INTERNAL that is used from inside the docker
# compose network and listens on "kafka" and EXTERNAL that is mapped to
# the host network and listens on "localhost".
#
# Then, in each set, we have a plain text one and an OAuth-enabled one.
KAFKA_CFG_LISTENERS: INTERNAL://:9092,OINTERNAL://:9093,EXTERNAL://:9094,OEXTERNAL://:9095
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,OINTERNAL:SASL_PLAINTEXT,EXTERNAL:PLAINTEXT,OEXTERNAL:SASL_PLAINTEXT
KAFKA_CFG_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,OINTERNAL://kafka:9093,EXTERNAL://localhost:9092,OEXTERNAL://localhost:9093
KAFKA_CFG_INTER_BROKER_LISTENER_NAME: INTERNAL
# OAuth2 configuration
KAFKA_CFG_LISTENER_NAME_OEXTERNAL_SASL_ENABLED_MECHANISMS: OAUTHBEARER
KAFKA_CFG_LISTENER_NAME_OEXTERNAL_SASL_OAUTHBEARER_JWKS_ENDPOINT_URL: http://mock-oauth2-server:8080/default/jwks
KAFKA_CFG_LISTENER_NAME_OEXTERNAL_SASL_OAUTHBEARER_EXPECTED_AUDIENCE: default
KAFKA_CFG_LISTENER_NAME_OEXTERNAL_OAUTHBEARER_SASL_JAAS_CONFIG: org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId=kafka-client clientSecret=kafka-client-secret unsecuredLoginStringClaim_sub="sub";
KAFKA_CFG_LISTENER_NAME_OEXTERNAL_OAUTHBEARER_SASL_SERVER_CALLBACK_HANDLER_CLASS: org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerValidatorCallbackHandler
KAFKA_CFG_LISTENER_NAME_OINTERNAL_SASL_ENABLED_MECHANISMS: OAUTHBEARER
KAFKA_CFG_LISTENER_NAME_OINTERNAL_SASL_OAUTHBEARER_JWKS_ENDPOINT_URL: http://mock-oauth2-server:8080/default/jwks
KAFKA_CFG_LISTENER_NAME_OINTERNAL_SASL_OAUTHBEARER_EXPECTED_AUDIENCE: default
KAFKA_CFG_LISTENER_NAME_OINTERNAL_OAUTHBEARER_SASL_JAAS_CONFIG: org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId=kafka-client clientSecret=kafka-client-secret unsecuredLoginStringClaim_sub="sub";
KAFKA_CFG_LISTENER_NAME_OINTERNAL_OAUTHBEARER_SASL_SERVER_CALLBACK_HANDLER_CLASS: org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerValidatorCallbackHandler
depends_on:
- zookeeper
- mock-oauth2-server
ports:
- 127.0.0.1:9092:9093/tcp
- 127.0.0.1:9092:9094/tcp
- 127.0.0.1:9093:9095/tcp
redis:
extends:

View File

@@ -40,3 +40,5 @@ services:
image: postgres:16 # \d+
mysql:
image: mariadb:11.4 # \d+\.\d+
mock-oauth2-server:
image: ghcr.io/navikt/mock-oauth2-server:2.1.10 # \d+\.\d+\.\d+