diff --git a/common/kafka/config.go b/common/kafka/config.go index 28a384ab..1b541073 100644 --- a/common/kafka/config.go +++ b/common/kafka/config.go @@ -6,6 +6,7 @@ package kafka import ( + "context" "crypto/sha256" "crypto/sha512" "fmt" @@ -40,6 +41,8 @@ type SASLConfiguration struct { Password string `validate:"required_with=SASLAlgorithm SASLUsername"` // Mechanism tells the SASL algorithm Mechanism SASLMechanism `validate:"required_with=SASLUsername"` + // OAuthTokenURL tells which URL to use to get an OAuthToken + OAuthTokenURL string `validate:"required_if=Mechanism 4,excluded_unless=Mechanism 4,omitempty,url"` } // DefaultConfiguration represents the default configuration for connecting to Kafka. @@ -90,6 +93,8 @@ const ( SASLScramSHA256 // SASLScramSHA512 enables SCRAM challenge with SHA512 SASLScramSHA512 + // SASLOauth enables OAuth authentication + SASLOauth ) // NewConfig returns a Sarama Kafka configuration ready to use. @@ -110,20 +115,28 @@ func NewConfig(config Configuration) (*sarama.Config, error) { kafkaConfig.Net.SASL.Enable = true kafkaConfig.Net.SASL.User = config.SASL.Username kafkaConfig.Net.SASL.Password = config.SASL.Password - kafkaConfig.Net.SASL.Mechanism = sarama.SASLTypePlaintext - if config.SASL.Mechanism == SASLScramSHA256 { + switch config.SASL.Mechanism { + case SASLPlain: + kafkaConfig.Net.SASL.Mechanism = sarama.SASLTypePlaintext + case SASLScramSHA256: kafkaConfig.Net.SASL.Handshake = true kafkaConfig.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA256 kafkaConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &xdgSCRAMClient{HashGeneratorFcn: sha256.New} } - } - if config.SASL.Mechanism == SASLScramSHA512 { + case SASLScramSHA512: kafkaConfig.Net.SASL.Handshake = true kafkaConfig.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512 kafkaConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &xdgSCRAMClient{HashGeneratorFcn: sha512.New} } + case SASLOauth: + kafkaConfig.Net.SASL.Mechanism = sarama.SASLTypeOAuth + kafkaConfig.Net.SASL.TokenProvider = newOAuthTokenProvider( + context.Background(), // TODO should be bound to the component lifecycle, but no component here + tlsConfig, + config.SASL.Username, config.SASL.Password, + config.SASL.OAuthTokenURL) } } return kafkaConfig, nil diff --git a/common/kafka/config_test.go b/common/kafka/config_test.go index d77e971f..5eaac1d5 100644 --- a/common/kafka/config_test.go +++ b/common/kafka/config_test.go @@ -62,6 +62,19 @@ func TestKafkaNewConfig(t *testing.T) { Mechanism: SASLScramSHA512, }, }, + }, { + description: "SASL OAuth2", + config: Configuration{ + TLS: helpers.TLSConfiguration{ + Enable: true, + }, + SASL: SASLConfiguration{ + Username: "hello", + Password: "password", + Mechanism: SASLOauth, + OAuthTokenURL: "http://example.com/token", + }, + }, }, } for _, tc := range cases { @@ -187,6 +200,65 @@ func TestTLSConfiguration(t *testing.T) { Mechanism: SASLScramSHA256, }, }, + }, { + Description: "TLS SASL OAuth", + Initial: func() interface{} { return DefaultConfiguration() }, + Configuration: func() interface{} { + return gin.H{ + "tls": gin.H{ + "enable": true, + }, + "sasl": gin.H{ + "username": "hello", + "password": "bye", + "mechanism": "oauth", + "oauth-token-url": "http://example.com/token", + }, + } + }, + Expected: Configuration{ + Topic: "flows", + Brokers: []string{"127.0.0.1:9092"}, + Version: Version(sarama.V2_8_1_0), + TLS: helpers.TLSConfiguration{ + Enable: true, + // Value from DefaultConfig is true + Verify: true, + }, + SASL: SASLConfiguration{ + Username: "hello", + Password: "bye", + Mechanism: SASLOauth, + OAuthTokenURL: "http://example.com/token", + }, + }, + }, { + Description: "OAuth requires a token URL", + Initial: func() interface{} { return DefaultConfiguration() }, + Configuration: func() interface{} { + return gin.H{ + "sasl": gin.H{ + "username": "hello", + "password": "bye", + "mechanism": "oauth", + }, + } + }, + Error: true, + }, { + Description: "OAuth token URL only with OAuth", + Initial: func() interface{} { return DefaultConfiguration() }, + Configuration: func() interface{} { + return gin.H{ + "sasl": gin.H{ + "username": "hello", + "password": "bye", + "mechanism": "plain", + "oauth-token-url": "http://example.com/token", + }, + } + }, + Error: true, }, }) } diff --git a/common/kafka/oauth.go b/common/kafka/oauth.go new file mode 100644 index 00000000..1ce847c6 --- /dev/null +++ b/common/kafka/oauth.go @@ -0,0 +1,46 @@ +// SPDX-FileCopyrightText: 2025 Free Mobile +// SPDX-License-Identifier: AGPL-3.0-only + +package kafka + +import ( + "context" + "crypto/tls" + "net/http" + + "github.com/IBM/sarama" + "golang.org/x/oauth2" + "golang.org/x/oauth2/clientcredentials" +) + +// tokenProvider implements sarama.AccessTokenProvider. +type tokenProvider struct { + tokenSource oauth2.TokenSource +} + +// newOAuthTokenProvider returns a sarama.AccessTokenProvider using OAuth credentials. +func newOAuthTokenProvider(ctx context.Context, tlsConfig *tls.Config, clientID, clientSecret, tokenURL string) sarama.AccessTokenProvider { + cfg := clientcredentials.Config{ + ClientID: clientID, + ClientSecret: clientSecret, + TokenURL: tokenURL, + } + httpClient := &http.Client{Transport: &http.Transport{ + Proxy: http.ProxyFromEnvironment, + TLSClientConfig: tlsConfig, + }} + ctx = context.WithValue(ctx, oauth2.HTTPClient, httpClient) + + return &tokenProvider{ + tokenSource: cfg.TokenSource(context.Background()), + } +} + +// Token returns a new *sarama.AccessToken or an error as appropriate. +func (t *tokenProvider) Token() (*sarama.AccessToken, error) { + token, err := t.tokenSource.Token() + if err != nil { + return nil, err + } + return &sarama.AccessToken{Token: token.AccessToken}, nil +} diff --git a/common/kafka/oauth_test.go b/common/kafka/oauth_test.go new file mode 100644 index 00000000..e3d90ced --- /dev/null +++ b/common/kafka/oauth_test.go @@ -0,0 +1,114 @@ +// SPDX-FileCopyrightText: 2025 Free Mobile +// SPDX-License-Identifier: AGPL-3.0-only + +package kafka + +import ( + "akvorado/common/helpers" + "akvorado/common/reporter" + + "context" + "fmt" + "testing" + "time" + + "github.com/IBM/sarama" + "golang.org/x/oauth2" + "golang.org/x/oauth2/clientcredentials" +) + +func TestOAuth2ServerPassword(t *testing.T) { + oauthServer := helpers.CheckExternalService(t, "mock-auth2-server", + []string{"mock-oauth2-server:8080", "127.0.0.1:5556"}) + + ctx := context.Background() + conf := &oauth2.Config{ + ClientID: "kafka-client", + ClientSecret: "kafka-client-secret", + Endpoint: oauth2.Endpoint{ + TokenURL: fmt.Sprintf("http://%s/default/token", oauthServer), + }, + Scopes: []string{"openid"}, + } + + token, err := conf.PasswordCredentialsToken(ctx, "akvorado@example.com", "password") + if err != nil { + t.Fatalf("PasswordCredentialsToken() error:\n%+v", err) + } + + t.Logf("Access token: %s", token.AccessToken) + t.Logf("Token type: %s", token.TokenType) + t.Logf("Expiry: %s", token.Expiry.Format(time.RFC3339)) +} + +func TestOAuth2ServerClientCredentials(t *testing.T) { + oauthServer := helpers.CheckExternalService(t, "mock-oauth2-server", + []string{"mock-oauth2-server:8080", "127.0.0.1:5556"}) + + ctx := context.Background() + + // Use clientcredentials.Config instead of oauth2.Config + config := clientcredentials.Config{ + ClientID: "kafka-client", + ClientSecret: "kafka-client-secret", + TokenURL: fmt.Sprintf("http://%s/default/token", oauthServer), + Scopes: []string{"openid"}, + } + + // Get token directly from the client credentials config + token, err := config.Token(ctx) + if err != nil { + t.Fatalf("ClientCredentials Token() error:\n%+v", err) + } + + t.Logf("Access token: %s", token.AccessToken) + t.Logf("Token type: %s", token.TokenType) + t.Logf("Expiry: %s", token.Expiry.Format(time.RFC3339)) +} + +// Example with kcat: +// kcat -b 127.0.0.1:9093 \ +// -X security.protocol=SASL_PLAINTEXT \ +// -X sasl.mechanisms=OAUTHBEARER \ +// -X sasl.oauthbearer.method=OIDC \ // -X sasl.oauthbearer.client.id=kafka-client \ +// -X sasl.oauthbearer.client.secret=kafka-client-secret \ +// -X sasl.oauthbearer.token.endpoint.url=http://127.0.0.1:5556/default/token \ +// -t my-topic -C -d all + +func TestOAuth2Broker(t *testing.T) { + r := reporter.NewMock(t) + GlobalKafkaLogger.Register(r) + defer GlobalKafkaLogger.Unregister() + + // Ensure broker is ready. + SetupKafkaBroker(t) + + // Then try again with OAuth2. + oauthServer := helpers.CheckExternalService(t, "mock-auth2-server", + []string{"mock-oauth2-server:8080", "127.0.0.1:5556"}) + broker := helpers.CheckExternalService(t, "Kafka", + []string{"kafka:9093", "127.0.0.1:9093"}) + + config := DefaultConfiguration() + config.SASL = SASLConfiguration{ + Username: "kafka-client", + Password: "kafka-client-secret", + Mechanism: SASLOauth, + OAuthTokenURL: fmt.Sprintf("http://%s/default/token", oauthServer), + } + kafkaConfig, err := NewConfig(config) + if err != nil { + t.Fatalf("NewConfig() error:\n%+v", err) + } + if err := kafkaConfig.Validate(); err != nil { + t.Fatalf("Validate() error:\n%+v", err) + } + + client, err := sarama.NewClient([]string{broker}, kafkaConfig) + if err != nil { + t.Fatalf("sarama.NewClient() error:\n%+v", err) + } + if err := client.RefreshMetadata(); err != nil { + t.Fatalf("client.RefreshMetadata() error:\n%+v", err) + } +} diff --git a/console/data/docs/02-configuration.md b/console/data/docs/02-configuration.md index 4a633ddd..407cd2ae 100644 --- a/console/data/docs/02-configuration.md +++ b/console/data/docs/02-configuration.md @@ -748,8 +748,10 @@ The following keys are accepted for SASL configuration: - `username` and `password` enables SASL authentication with the provided user and password. - `algorithm` tells which SASL mechanism to use for authentication. This - can be `none`, `plain`, `scram-sha256`, or `scram-sha512`. This should not be + can be `none`, `plain`, `scram-sha256`, `scram-sha512`, or `oauth`. This should not be set to none when SASL is used. +- `oauth-token-url` defines the URL to query to get a valid OAuth token (in this + case, `username` and `password` are used as client credentials). The following keys are accepted for the topic configuration: diff --git a/console/data/docs/99-changelog.md b/console/data/docs/99-changelog.md index 0af70fcb..1436928f 100644 --- a/console/data/docs/99-changelog.md +++ b/console/data/docs/99-changelog.md @@ -15,6 +15,7 @@ identified with a specific icon: - 🩹 *inlet*: don't override flow-provided VLANs with VLAN from Ethernet header - 🌱 *orchestrator*: put SASL parameters in their own section in Kafka configuration +- 🌱 *orchestrator*: add OAuth support to Kafka client ## 1.11.4 - 2025-04-26 diff --git a/docker/docker-compose-dev.yml b/docker/docker-compose-dev.yml index 2fb5caff..7ce7dd07 100644 --- a/docker/docker-compose-dev.yml +++ b/docker/docker-compose-dev.yml @@ -7,21 +7,48 @@ services: environment: - ALLOW_ANONYMOUS_LOGIN=yes + mock-oauth2-server: + extends: + file: versions.yml + service: mock-oauth2-server + ports: + - 127.0.0.1:5556:8080/tcp + environment: + LOG_LEVEL: debug + kafka: extends: file: versions.yml service: kafka environment: - - KAFKA_CFG_BROKER_ID=1 - - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 - - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093 - - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT - - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://localhost:9092 - - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT + KAFKA_CFG_BROKER_ID: 1 + KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181 + # We have two sets of listeners: INTERNAL that is used from inside the docker + # compose network and listens on "kafka" and EXTERNAL that is mapped to + # the host network and listens on "localhost". + # + # Then, in each set, we have a plain text one and an OAuth-enabled one. + KAFKA_CFG_LISTENERS: INTERNAL://:9092,OINTERNAL://:9093,EXTERNAL://:9094,OEXTERNAL://:9095 + KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,OINTERNAL:SASL_PLAINTEXT,EXTERNAL:PLAINTEXT,OEXTERNAL:SASL_PLAINTEXT + KAFKA_CFG_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,OINTERNAL://kafka:9093,EXTERNAL://localhost:9092,OEXTERNAL://localhost:9093 + KAFKA_CFG_INTER_BROKER_LISTENER_NAME: INTERNAL + # OAuth2 configuration + KAFKA_CFG_LISTENER_NAME_OEXTERNAL_SASL_ENABLED_MECHANISMS: OAUTHBEARER + KAFKA_CFG_LISTENER_NAME_OEXTERNAL_SASL_OAUTHBEARER_JWKS_ENDPOINT_URL: http://mock-oauth2-server:8080/default/jwks + KAFKA_CFG_LISTENER_NAME_OEXTERNAL_SASL_OAUTHBEARER_EXPECTED_AUDIENCE: default + KAFKA_CFG_LISTENER_NAME_OEXTERNAL_OAUTHBEARER_SASL_JAAS_CONFIG: org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId=kafka-client clientSecret=kafka-client-secret unsecuredLoginStringClaim_sub="sub"; + KAFKA_CFG_LISTENER_NAME_OEXTERNAL_OAUTHBEARER_SASL_SERVER_CALLBACK_HANDLER_CLASS: org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerValidatorCallbackHandler + KAFKA_CFG_LISTENER_NAME_OINTERNAL_SASL_ENABLED_MECHANISMS: OAUTHBEARER + KAFKA_CFG_LISTENER_NAME_OINTERNAL_SASL_OAUTHBEARER_JWKS_ENDPOINT_URL: http://mock-oauth2-server:8080/default/jwks + KAFKA_CFG_LISTENER_NAME_OINTERNAL_SASL_OAUTHBEARER_EXPECTED_AUDIENCE: default + KAFKA_CFG_LISTENER_NAME_OINTERNAL_OAUTHBEARER_SASL_JAAS_CONFIG: org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId=kafka-client clientSecret=kafka-client-secret unsecuredLoginStringClaim_sub="sub"; + KAFKA_CFG_LISTENER_NAME_OINTERNAL_OAUTHBEARER_SASL_SERVER_CALLBACK_HANDLER_CLASS: org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerValidatorCallbackHandler depends_on: - zookeeper + - mock-oauth2-server ports: - - 127.0.0.1:9092:9093/tcp + - 127.0.0.1:9092:9094/tcp + - 127.0.0.1:9093:9095/tcp redis: extends: diff --git a/docker/versions.yml b/docker/versions.yml index 28f0426e..cb7f0cdc 100644 --- a/docker/versions.yml +++ b/docker/versions.yml @@ -40,3 +40,5 @@ services: image: postgres:16 # \d+ mysql: image: mariadb:11.4 # \d+\.\d+ + mock-oauth2-server: + image: ghcr.io/navikt/mock-oauth2-server:2.1.10 # \d+\.\d+\.\d+