From f672ac98d91eb120483412cc23187ea4d09d6177 Mon Sep 17 00:00:00 2001
From: Vincent Bernat
Date: Thu, 1 May 2025 16:08:33 +0200
Subject: [PATCH] common/kafka: add support for OAuth2

The support is still pretty basic. Notably, scopes are not configurable
(waiting for someone to request them), and maybe the client ID and secret
should not be provided as username/password.

Fix #1714
---
 common/kafka/config.go                |  21 ++++-
 common/kafka/config_test.go           |  72 ++++++++++++++++
 common/kafka/oauth.go                 |  46 +++++++++++
 common/kafka/oauth_test.go            | 114 ++++++++++++++++++++++++++
 console/data/docs/02-configuration.md |   4 +-
 console/data/docs/99-changelog.md     |   1 +
 docker/docker-compose-dev.yml         |  41 +++++++--
 docker/versions.yml                   |   2 +
 8 files changed, 289 insertions(+), 12 deletions(-)
 create mode 100644 common/kafka/oauth.go
 create mode 100644 common/kafka/oauth_test.go

diff --git a/common/kafka/config.go b/common/kafka/config.go
index 28a384ab..1b541073 100644
--- a/common/kafka/config.go
+++ b/common/kafka/config.go
@@ -6,6 +6,7 @@ package kafka
 
 import (
+	"context"
 	"crypto/sha256"
 	"crypto/sha512"
 	"fmt"
@@ -40,6 +41,8 @@ type SASLConfiguration struct {
 	Password string `validate:"required_with=SASLAlgorithm SASLUsername"`
 	// Mechanism tells the SASL algorithm
 	Mechanism SASLMechanism `validate:"required_with=SASLUsername"`
+	// OAuthTokenURL tells which URL to use to get an OAuth token
+	OAuthTokenURL string `validate:"required_if=Mechanism 4,excluded_unless=Mechanism 4,omitempty,url"`
 }
 
 // DefaultConfiguration represents the default configuration for connecting to Kafka.
@@ -90,6 +93,8 @@ const (
 	SASLScramSHA256
 	// SASLScramSHA512 enables SCRAM challenge with SHA512
 	SASLScramSHA512
+	// SASLOauth enables OAuth authentication
+	SASLOauth
 )
 
 // NewConfig returns a Sarama Kafka configuration ready to use.
@@ -110,20 +115,28 @@ func NewConfig(config Configuration) (*sarama.Config, error) {
 		kafkaConfig.Net.SASL.Enable = true
 		kafkaConfig.Net.SASL.User = config.SASL.Username
 		kafkaConfig.Net.SASL.Password = config.SASL.Password
-		kafkaConfig.Net.SASL.Mechanism = sarama.SASLTypePlaintext
-		if config.SASL.Mechanism == SASLScramSHA256 {
+		switch config.SASL.Mechanism {
+		case SASLPlain:
+			kafkaConfig.Net.SASL.Mechanism = sarama.SASLTypePlaintext
+		case SASLScramSHA256:
 			kafkaConfig.Net.SASL.Handshake = true
 			kafkaConfig.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA256
 			kafkaConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
 				return &xdgSCRAMClient{HashGeneratorFcn: sha256.New}
 			}
-		}
-		if config.SASL.Mechanism == SASLScramSHA512 {
+		case SASLScramSHA512:
 			kafkaConfig.Net.SASL.Handshake = true
 			kafkaConfig.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
 			kafkaConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
 				return &xdgSCRAMClient{HashGeneratorFcn: sha512.New}
 			}
+		case SASLOauth:
+			kafkaConfig.Net.SASL.Mechanism = sarama.SASLTypeOAuth
+			kafkaConfig.Net.SASL.TokenProvider = newOAuthTokenProvider(
+				context.Background(), // TODO should be bound to the component lifecycle, but no component here
+				tlsConfig,
+				config.SASL.Username, config.SASL.Password,
+				config.SASL.OAuthTokenURL)
 		}
 	}
 	return kafkaConfig, nil
diff --git a/common/kafka/config_test.go b/common/kafka/config_test.go
index d77e971f..5eaac1d5 100644
--- a/common/kafka/config_test.go
+++ b/common/kafka/config_test.go
@@ -62,6 +62,19 @@ func TestKafkaNewConfig(t *testing.T) {
 					Mechanism: SASLScramSHA512,
 				},
 			},
+		}, {
+			description: "SASL OAuth2",
+			config: Configuration{
+				TLS: helpers.TLSConfiguration{
+					Enable: true,
+				},
+				SASL: SASLConfiguration{
+					Username:      "hello",
+					Password:      "password",
+					Mechanism:     SASLOauth,
+					OAuthTokenURL: "http://example.com/token",
+				},
+			},
 		},
 	}
 	for _, tc := range cases {
@@ -187,6 +200,65 @@ func TestTLSConfiguration(t *testing.T) {
 					Mechanism: SASLScramSHA256,
 				},
 			},
+		}, {
+			Description: "TLS SASL OAuth",
+			Initial:     func() interface{} { return DefaultConfiguration() },
+			Configuration: func() interface{} {
+				return gin.H{
+					"tls": gin.H{
+						"enable": true,
+					},
+					"sasl": gin.H{
+						"username":        "hello",
+						"password":        "bye",
+						"mechanism":       "oauth",
+						"oauth-token-url": "http://example.com/token",
+					},
+				}
+			},
+			Expected: Configuration{
+				Topic:   "flows",
+				Brokers: []string{"127.0.0.1:9092"},
+				Version: Version(sarama.V2_8_1_0),
+				TLS: helpers.TLSConfiguration{
+					Enable: true,
+					// Value from DefaultConfig is true
+					Verify: true,
+				},
+				SASL: SASLConfiguration{
+					Username:      "hello",
+					Password:      "bye",
+					Mechanism:     SASLOauth,
+					OAuthTokenURL: "http://example.com/token",
+				},
+			},
+		}, {
+			Description: "OAuth requires a token URL",
+			Initial:     func() interface{} { return DefaultConfiguration() },
+			Configuration: func() interface{} {
+				return gin.H{
+					"sasl": gin.H{
+						"username":  "hello",
+						"password":  "bye",
+						"mechanism": "oauth",
+					},
+				}
+			},
+			Error: true,
+		}, {
+			Description: "OAuth token URL only with OAuth",
+			Initial:     func() interface{} { return DefaultConfiguration() },
+			Configuration: func() interface{} {
+				return gin.H{
+					"sasl": gin.H{
+						"username":        "hello",
+						"password":        "bye",
+						"mechanism":       "plain",
+						"oauth-token-url": "http://example.com/token",
+					},
+				}
+			},
+			Error: true,
 		},
 	})
 }
diff --git a/common/kafka/oauth.go b/common/kafka/oauth.go
new file mode 100644
index 00000000..1ce847c6
--- /dev/null
+++ b/common/kafka/oauth.go
@@ -0,0 +1,46 @@
+// SPDX-FileCopyrightText: 2025 Free Mobile
+// SPDX-License-Identifier: AGPL-3.0-only
+
+package kafka
+
+import (
+	"context"
+	"crypto/tls"
+	"net/http"
+
+	"github.com/IBM/sarama"
+	"golang.org/x/oauth2"
+	"golang.org/x/oauth2/clientcredentials"
+)
+
+// tokenProvider implements sarama.AccessTokenProvider.
+type tokenProvider struct {
+	tokenSource oauth2.TokenSource
+}
+
+// newOAuthTokenProvider returns a sarama.AccessTokenProvider using OAuth client credentials.
+func newOAuthTokenProvider(ctx context.Context, tlsConfig *tls.Config, clientID, clientSecret, tokenURL string) sarama.AccessTokenProvider {
+	cfg := clientcredentials.Config{
+		ClientID:     clientID,
+		ClientSecret: clientSecret,
+		TokenURL:     tokenURL,
+	}
+	httpClient := &http.Client{Transport: &http.Transport{
+		Proxy:           http.ProxyFromEnvironment,
+		TLSClientConfig: tlsConfig,
+	}}
+	ctx = context.WithValue(ctx, oauth2.HTTPClient, httpClient)
+
+	return &tokenProvider{
+		tokenSource: cfg.TokenSource(ctx),
+	}
+}
+
+// Token returns a new *sarama.AccessToken or an error as appropriate.
+func (t *tokenProvider) Token() (*sarama.AccessToken, error) {
+	token, err := t.tokenSource.Token()
+	if err != nil {
+		return nil, err
+	}
+	return &sarama.AccessToken{Token: token.AccessToken}, nil
+}
diff --git a/common/kafka/oauth_test.go b/common/kafka/oauth_test.go
new file mode 100644
index 00000000..e3d90ced
--- /dev/null
+++ b/common/kafka/oauth_test.go
@@ -0,0 +1,114 @@
+// SPDX-FileCopyrightText: 2025 Free Mobile
+// SPDX-License-Identifier: AGPL-3.0-only
+
+package kafka
+
+import (
+	"akvorado/common/helpers"
+	"akvorado/common/reporter"
+
+	"context"
+	"fmt"
+	"testing"
+	"time"
+
+	"github.com/IBM/sarama"
+	"golang.org/x/oauth2"
+	"golang.org/x/oauth2/clientcredentials"
+)
+
+func TestOAuth2ServerPassword(t *testing.T) {
+	oauthServer := helpers.CheckExternalService(t, "mock-oauth2-server",
+		[]string{"mock-oauth2-server:8080", "127.0.0.1:5556"})
+
+	ctx := context.Background()
+	conf := &oauth2.Config{
+		ClientID:     "kafka-client",
+		ClientSecret: "kafka-client-secret",
+		Endpoint: oauth2.Endpoint{
+			TokenURL: fmt.Sprintf("http://%s/default/token", oauthServer),
+		},
+		Scopes: []string{"openid"},
+	}
+
+	token, err := conf.PasswordCredentialsToken(ctx, "akvorado@example.com", "password")
+	if err != nil {
+		t.Fatalf("PasswordCredentialsToken() error:\n%+v", err)
+	}
+
+	t.Logf("Access token: %s", token.AccessToken)
+	t.Logf("Token type: %s", token.TokenType)
+	t.Logf("Expiry: %s", token.Expiry.Format(time.RFC3339))
+}
+
+func TestOAuth2ServerClientCredentials(t *testing.T) {
+	oauthServer := helpers.CheckExternalService(t, "mock-oauth2-server",
+		[]string{"mock-oauth2-server:8080", "127.0.0.1:5556"})
+
+	ctx := context.Background()
+
+	// Use clientcredentials.Config instead of oauth2.Config
+	config := clientcredentials.Config{
+		ClientID:     "kafka-client",
+		ClientSecret: "kafka-client-secret",
+		TokenURL:     fmt.Sprintf("http://%s/default/token", oauthServer),
+		Scopes:       []string{"openid"},
+	}
+
+	// Get token directly from the client credentials config
+	token, err := config.Token(ctx)
+	if err != nil {
+		t.Fatalf("ClientCredentials Token() error:\n%+v", err)
+	}
+
+	t.Logf("Access token: %s", token.AccessToken)
+	t.Logf("Token type: %s", token.TokenType)
+	t.Logf("Expiry: %s", token.Expiry.Format(time.RFC3339))
+}
+
+// Example with kcat:
+// kcat -b 127.0.0.1:9093 \
+//   -X security.protocol=SASL_PLAINTEXT \
+//   -X sasl.mechanisms=OAUTHBEARER \
+//   -X sasl.oauthbearer.method=OIDC \
+//   -X sasl.oauthbearer.client.id=kafka-client \
+//   -X sasl.oauthbearer.client.secret=kafka-client-secret \
+//   -X sasl.oauthbearer.token.endpoint.url=http://127.0.0.1:5556/default/token \
+//   -t my-topic -C -d all
+
+func TestOAuth2Broker(t *testing.T) {
+	r := reporter.NewMock(t)
+	GlobalKafkaLogger.Register(r)
+	defer GlobalKafkaLogger.Unregister()
+
+	// Ensure broker is ready.
+	SetupKafkaBroker(t)
+
+	// Then try again with OAuth2.
+	oauthServer := helpers.CheckExternalService(t, "mock-oauth2-server",
+		[]string{"mock-oauth2-server:8080", "127.0.0.1:5556"})
+	broker := helpers.CheckExternalService(t, "Kafka",
+		[]string{"kafka:9093", "127.0.0.1:9093"})
+
+	config := DefaultConfiguration()
+	config.SASL = SASLConfiguration{
+		Username:      "kafka-client",
+		Password:      "kafka-client-secret",
+		Mechanism:     SASLOauth,
+		OAuthTokenURL: fmt.Sprintf("http://%s/default/token", oauthServer),
+	}
+	kafkaConfig, err := NewConfig(config)
+	if err != nil {
+		t.Fatalf("NewConfig() error:\n%+v", err)
+	}
+	if err := kafkaConfig.Validate(); err != nil {
+		t.Fatalf("Validate() error:\n%+v", err)
+	}
+
+	client, err := sarama.NewClient([]string{broker}, kafkaConfig)
+	if err != nil {
+		t.Fatalf("sarama.NewClient() error:\n%+v", err)
+	}
+	if err := client.RefreshMetadata(); err != nil {
+		t.Fatalf("client.RefreshMetadata() error:\n%+v", err)
+	}
+}
diff --git a/console/data/docs/02-configuration.md b/console/data/docs/02-configuration.md
index 4a633ddd..407cd2ae 100644
--- a/console/data/docs/02-configuration.md
+++ b/console/data/docs/02-configuration.md
@@ -748,8 +748,10 @@ The following keys are accepted for SASL configuration:
 
 - `username` and `password` enables SASL authentication with the provided
   user and password.
 - `algorithm` tells which SASL mechanism to use for authentication. This
-  can be `none`, `plain`, `scram-sha256`, or `scram-sha512`. This should not be
+  can be `none`, `plain`, `scram-sha256`, `scram-sha512`, or `oauth`. This should not be
   set to none when SASL is used.
+- `oauth-token-url` defines the URL to query to get a valid OAuth token (in this
+  case, `username` and `password` are used as client credentials).
 
 The following keys are accepted for the topic configuration:
diff --git a/console/data/docs/99-changelog.md b/console/data/docs/99-changelog.md
index 0af70fcb..1436928f 100644
--- a/console/data/docs/99-changelog.md
+++ b/console/data/docs/99-changelog.md
@@ -15,6 +15,7 @@ identified with a specific icon:
 - 🩹 *inlet*: don't override flow-provided VLANs with VLAN from Ethernet header
 - 🌱 *orchestrator*: put SASL parameters in their own section in Kafka
   configuration
+- 🌱 *orchestrator*: add OAuth support to Kafka client
 
 ## 1.11.4 - 2025-04-26
diff --git a/docker/docker-compose-dev.yml b/docker/docker-compose-dev.yml
index 2fb5caff..7ce7dd07 100644
--- a/docker/docker-compose-dev.yml
+++ b/docker/docker-compose-dev.yml
@@ -7,21 +7,48 @@ services:
     environment:
       - ALLOW_ANONYMOUS_LOGIN=yes
 
+  mock-oauth2-server:
+    extends:
+      file: versions.yml
+      service: mock-oauth2-server
+    ports:
+      - 127.0.0.1:5556:8080/tcp
+    environment:
+      LOG_LEVEL: debug
+
   kafka:
     extends:
      file: versions.yml
      service: kafka
     environment:
-      - KAFKA_CFG_BROKER_ID=1
-      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
-      - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
-      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
-      - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://localhost:9092
-      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT
+      KAFKA_CFG_BROKER_ID: 1
+      KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
+      # We have two sets of listeners: INTERNAL, which is used from inside the
+      # Docker Compose network and listens on "kafka", and EXTERNAL, which is
+      # mapped to the host network and listens on "localhost".
+      #
+      # Then, in each set, we have a plain-text one and an OAuth-enabled one.
+      KAFKA_CFG_LISTENERS: INTERNAL://:9092,OINTERNAL://:9093,EXTERNAL://:9094,OEXTERNAL://:9095
+      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,OINTERNAL:SASL_PLAINTEXT,EXTERNAL:PLAINTEXT,OEXTERNAL:SASL_PLAINTEXT
+      KAFKA_CFG_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,OINTERNAL://kafka:9093,EXTERNAL://localhost:9092,OEXTERNAL://localhost:9093
+      KAFKA_CFG_INTER_BROKER_LISTENER_NAME: INTERNAL
+      # OAuth2 configuration
+      KAFKA_CFG_LISTENER_NAME_OEXTERNAL_SASL_ENABLED_MECHANISMS: OAUTHBEARER
+      KAFKA_CFG_LISTENER_NAME_OEXTERNAL_SASL_OAUTHBEARER_JWKS_ENDPOINT_URL: http://mock-oauth2-server:8080/default/jwks
+      KAFKA_CFG_LISTENER_NAME_OEXTERNAL_SASL_OAUTHBEARER_EXPECTED_AUDIENCE: default
+      KAFKA_CFG_LISTENER_NAME_OEXTERNAL_OAUTHBEARER_SASL_JAAS_CONFIG: org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId=kafka-client clientSecret=kafka-client-secret unsecuredLoginStringClaim_sub="sub";
+      KAFKA_CFG_LISTENER_NAME_OEXTERNAL_OAUTHBEARER_SASL_SERVER_CALLBACK_HANDLER_CLASS: org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerValidatorCallbackHandler
+      KAFKA_CFG_LISTENER_NAME_OINTERNAL_SASL_ENABLED_MECHANISMS: OAUTHBEARER
+      KAFKA_CFG_LISTENER_NAME_OINTERNAL_SASL_OAUTHBEARER_JWKS_ENDPOINT_URL: http://mock-oauth2-server:8080/default/jwks
+      KAFKA_CFG_LISTENER_NAME_OINTERNAL_SASL_OAUTHBEARER_EXPECTED_AUDIENCE: default
+      KAFKA_CFG_LISTENER_NAME_OINTERNAL_OAUTHBEARER_SASL_JAAS_CONFIG: org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId=kafka-client clientSecret=kafka-client-secret unsecuredLoginStringClaim_sub="sub";
+      KAFKA_CFG_LISTENER_NAME_OINTERNAL_OAUTHBEARER_SASL_SERVER_CALLBACK_HANDLER_CLASS: org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerValidatorCallbackHandler
     depends_on:
       - zookeeper
+      - mock-oauth2-server
     ports:
-      - 127.0.0.1:9092:9093/tcp
+      - 127.0.0.1:9092:9094/tcp
+      - 127.0.0.1:9093:9095/tcp
 
   redis:
     extends:
diff --git a/docker/versions.yml b/docker/versions.yml
index 28f0426e..cb7f0cdc 100644
--- a/docker/versions.yml
+++ b/docker/versions.yml
@@ -40,3 +40,5 @@ services:
     image: postgres:16 # \d+
   mysql:
     image: mariadb:11.4 # \d+\.\d+
+  mock-oauth2-server:
+    image: ghcr.io/navikt/mock-oauth2-server:2.1.10 # \d+\.\d+\.\d+
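
Note: for reference, below is a minimal sketch of what a Kafka configuration using the new mechanism could look like. It only relies on the keys documented in the 02-configuration.md hunk above (`mechanism: oauth` plus `oauth-token-url`, with `username`/`password` carrying the OAuth2 client ID and secret). The broker address, topic, and token endpoint URL are placeholders, and the exact nesting of the `kafka` section should be checked against the akvorado configuration documentation rather than taken from this sketch.

kafka:
  topic: flows
  brokers:
    - kafka-broker-1:9093          # placeholder broker address
  tls:
    enable: true                   # TLS enabled, as in the "SASL OAuth2" test case
  sasl:
    # With mechanism "oauth", username/password are used as the OAuth2
    # client ID and client secret (client credentials grant).
    username: kafka-client
    password: kafka-client-secret
    mechanism: oauth
    oauth-token-url: https://idp.example.com/default/token   # placeholder IdP token endpoint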