diff --git a/cmd/testdata/configurations/clickhouse-network-sources/expected.yaml b/cmd/testdata/configurations/clickhouse-network-sources/expected.yaml index ed364487..b0d914b1 100644 --- a/cmd/testdata/configurations/clickhouse-network-sources/expected.yaml +++ b/cmd/testdata/configurations/clickhouse-network-sources/expected.yaml @@ -2,6 +2,12 @@ paths: clickhouse.networksources.amazon: url: https://ip-ranges.amazonaws.com/ip-ranges.json + tls: + enable: false + verify: true + cafile: "" + certfile: "" + keyfile: "" method: GET headers: {} proxy: true diff --git a/common/helpers/tls.go b/common/helpers/tls.go index 068d9bbb..a0a374ae 100644 --- a/common/helpers/tls.go +++ b/common/helpers/tls.go @@ -13,7 +13,7 @@ import ( // TLSConfiguration defines TLS configuration. type TLSConfiguration struct { - // Enable says if TLS should be used to connect to brokers + // Enable says if TLS should be used to connect to remote servers. Enable bool `validate:"required_with=CAFile CertFile KeyFile"` // Verify says if we need to check remote certificates Verify bool diff --git a/common/remotedatasource/config.go b/common/remotedatasource/config.go index 3887fc8a..c06ff240 100644 --- a/common/remotedatasource/config.go +++ b/common/remotedatasource/config.go @@ -31,6 +31,8 @@ type Source struct { Transform TransformQuery // Interval tells how much time to wait before updating the source. Interval time.Duration `validate:"min=1m"` + // TLS defines the TLS configuration if the URL needs it. + TLS helpers.TLSConfiguration } // TransformQuery represents a jq query to transform data. 
@@ -66,6 +68,10 @@ func DefaultSourceConfiguration() Source { return Source{ Method: "GET", Timeout: time.Minute, + TLS: helpers.TLSConfiguration{ + Enable: false, + Verify: true, + }, } } diff --git a/common/remotedatasource/config_test.go b/common/remotedatasource/config_test.go index 142ea611..7d865154 100644 --- a/common/remotedatasource/config_test.go +++ b/common/remotedatasource/config_test.go @@ -28,6 +28,9 @@ func TestSourceDecode(t *testing.T) { Method: "GET", Timeout: time.Minute, Interval: 10 * time.Minute, + TLS: helpers.TLSConfiguration{ + Verify: true, + }, }, }, { Description: "Simple transform", @@ -45,6 +48,9 @@ func TestSourceDecode(t *testing.T) { Timeout: time.Minute, Interval: 10 * time.Minute, Transform: MustParseTransformQuery(".[]"), + TLS: helpers.TLSConfiguration{ + Verify: true, + }, }, }, { Description: "Use POST", @@ -64,6 +70,33 @@ func TestSourceDecode(t *testing.T) { Timeout: 2 * time.Minute, Interval: 10 * time.Minute, Transform: MustParseTransformQuery(".[]"), + TLS: helpers.TLSConfiguration{ + Verify: true, + }, + }, + }, { + Description: "With TLS configuration", + Initial: func() any { return Source{} }, + Configuration: func() any { + return gin.H{ + "url": "https://example.net", + "interval": "10m", + "tls": gin.H{ + "enable": true, + "ca-file": "something.crt", + }, + } + }, + Expected: Source{ + URL: "https://example.net", + Method: "GET", + Timeout: time.Minute, + Interval: 10 * time.Minute, + TLS: helpers.TLSConfiguration{ + Enable: true, + Verify: false, // TODO this should be fixed + CAFile: "something.crt", + }, }, }, { Description: "Complex transform", @@ -85,6 +118,9 @@ func TestSourceDecode(t *testing.T) { Transform: MustParseTransformQuery(` .prefixes[] | {prefix: .ip_prefix, tenant: "amazon", region: .region, role: .service|ascii_downcase} `), + TLS: helpers.TLSConfiguration{ + Verify: true, + }, }, }, { Description: "Incorrect transform", diff --git a/common/remotedatasource/root.go 
b/common/remotedatasource/root.go index 31dc9cae..4dc60f99 100644 --- a/common/remotedatasource/root.go +++ b/common/remotedatasource/root.go @@ -73,6 +73,9 @@ func New[T any](r *reporter.Reporter, provider ProviderFunc, dataType string, da source.Transform.Query, _ = gojq.Parse(".") c.dataSources[k] = source } + if _, err := source.TLS.MakeTLSConfig(); err != nil { + return nil, err + } } c.initMetrics() @@ -88,8 +91,10 @@ func (c *Component[T]) Fetch(ctx context.Context, name string, source Source) ([ l := c.r.With().Str("name", name).Str("url", source.URL).Logger() l.Info().Msg("update data source") + tlsConfig, _ := source.TLS.MakeTLSConfig() client := &http.Client{Transport: &http.Transport{ - Proxy: http.ProxyFromEnvironment, + Proxy: http.ProxyFromEnvironment, + TLSClientConfig: tlsConfig, }} req, err := http.NewRequestWithContext(ctx, source.Method, source.URL, nil) if err != nil { diff --git a/common/remotedatasource/root_test.go b/common/remotedatasource/root_test.go index 37436b35..4d16f7c0 100644 --- a/common/remotedatasource/root_test.go +++ b/common/remotedatasource/root_test.go @@ -5,7 +5,14 @@ package remotedatasource import ( "context" + "crypto/ecdsa" + "crypto/elliptic" + "crypto/rand" + "crypto/tls" + "crypto/x509" + "crypto/x509/pkix" "fmt" + "math/big" "net" "net/http" "strconv" @@ -211,3 +218,128 @@ func TestSource(t *testing.T) { t.Fatalf("Metrics (-got, +want):\n%s", diff) } } + +// generateSelfSignedCert generates a self-signed certificate for testing +func generateSelfSignedCert(t *testing.T) tls.Certificate { + t.Helper() + + // Generate a private key + privateKey, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader) + if err != nil { + t.Fatalf("ecdsa.GenerateKey() error:\n%+v", err) + } + + // Create a certificate template + template := x509.Certificate{ + SerialNumber: big.NewInt(1), + Subject: pkix.Name{ + Organization: []string{"Test Organization"}, + }, + NotBefore: time.Now(), + NotAfter: time.Now().Add(time.Hour), + KeyUsage: 
x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature, + ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth}, + BasicConstraintsValid: true, + IPAddresses: []net.IP{net.ParseIP("127.0.0.1")}, + } + + // Create the certificate + certDER, err := x509.CreateCertificate(rand.Reader, &template, &template, &privateKey.PublicKey, privateKey) + if err != nil { + t.Fatalf("x509.CreateCertificate() error:\n%+v", err) + } + + return tls.Certificate{ + Certificate: [][]byte{certDER}, + PrivateKey: privateKey, + } +} + +func TestSourceWithTLS(t *testing.T) { + cert := generateSelfSignedCert(t) + + // Setup TLS server + mux := http.NewServeMux() + mux.Handle("/data.json", http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.Header().Add("Content-Type", "application/json") + w.WriteHeader(200) + w.Write([]byte(`{"results": [{"name": "secure", "description": "tls test"}]}`)) + })) + + listener, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("Listen() error:\n%+v", err) + } + + server := &http.Server{ + Handler: mux, + TLSConfig: &tls.Config{ + Certificates: []tls.Certificate{cert}, + }, + } + address := listener.Addr().String() + go server.ServeTLS(listener, "", "") + defer server.Shutdown(context.Background()) + + t.Run("WithoutTLSConfig", func(t *testing.T) { + r := reporter.NewMock(t) + config := map[string]Source{ + "secure": { + URL: fmt.Sprintf("https://%s/data.json", address), + Method: "GET", + Timeout: 1 * time.Second, + Interval: 1 * time.Minute, + Transform: MustParseTransformQuery(".results[]"), + }, + } + handler := remoteDataHandler{ + data: []remoteData{}, + } + handler.fetcher, _ = New[remoteData](r, handler.UpdateData, "test", config) + + ctx, cancel := context.WithTimeout(t.Context(), time.Second) + defer cancel() + _, err := handler.fetcher.Fetch(ctx, "secure", config["secure"]) + if err == nil { + t.Fatal("Fetch() should have errored with certificate error") + } + }) + + t.Run("WithTLSSkipVerify", func(t 
*testing.T) { + r := reporter.NewMock(t) + config := map[string]Source{ + "secure": { + URL: fmt.Sprintf("https://%s/data.json", address), + Method: "GET", + Timeout: 1 * time.Second, + Interval: 1 * time.Minute, + TLS: helpers.TLSConfiguration{ + Enable: true, + Verify: false, + }, + Transform: MustParseTransformQuery(".results[]"), + }, + } + handler := remoteDataHandler{ + data: []remoteData{}, + } + handler.fetcher, _ = New[remoteData](r, handler.UpdateData, "test", config) + + ctx, cancel := context.WithTimeout(t.Context(), time.Second) + defer cancel() + results, err := handler.fetcher.Fetch(ctx, "secure", config["secure"]) + if err != nil { + t.Fatalf("Fetch() error:\n%+v", err) + } + + expected := []remoteData{ + { + Name: "secure", + Description: "tls test", + }, + } + if diff := helpers.Diff(results, expected); diff != "" { + t.Fatalf("Fetch() (-got, +want):\n%s", diff) + } + }) +} diff --git a/console/data/docs/02-configuration.md b/console/data/docs/02-configuration.md index 94ac030a..c87a17b5 100644 --- a/console/data/docs/02-configuration.md +++ b/console/data/docs/02-configuration.md @@ -452,6 +452,8 @@ but the definition is fetched through HTTP. It accepts a map from source names t sources. Each source accepts these attributes: - `url` is the URL to fetch. +- `tls` defines the TLS configuration to connect to the source (it uses the same + configuration as for [Kafka](#kafka-2), be sure to set `enable` to `true`) - `method` is the method to use (`GET` or `POST`). - `headers` is a map of header names to values to add to the request. - `proxy` defines if a proxy should be used (defined with environment variables @@ -760,7 +762,7 @@ flows. 
It accepts the following keys: - `tls` defines the TLS configuration to connect to the cluster - `sasl` defines the SASL configuration to connect to the cluster - `topic` defines the base topic name -- `topic-configuration` describes how the topic should be configured +- `topic-configuration` describes how the topic should be configured The following keys are accepted for the TLS configuration: @@ -828,6 +830,7 @@ ClickHouse database. The following keys should be provided inside - `password` is the password to use for authentication - `database` defines the database to use to create tables - `cluster` defines the cluster for replicated and distributed tables, see the next section for more information +- `tls` defines the TLS configuration to connect to the database (it uses the same configuration as for [Kafka](#kafka-2)) ### ClickHouse @@ -848,6 +851,9 @@ provided inside `clickhouse`: map from source names to sources. Each source accepts the following attributes: - `url` is the URL to fetch + - `tls` defines the TLS configuration to connect to the source (it uses the + same configuration as for [Kafka](#kafka-2), be sure to set `enable` to + `true`) - `method` is the method to use (`GET` or `POST`) - `headers` is a map from header names to values to add to the request - `proxy` says if we should use a proxy (defined through environment variables like `http_proxy`) diff --git a/console/data/docs/99-changelog.md b/console/data/docs/99-changelog.md index 7d4eb158..00eaa277 100644 --- a/console/data/docs/99-changelog.md +++ b/console/data/docs/99-changelog.md @@ -10,6 +10,10 @@ identified with a specific icon: - 🩹: bug fix - 🌱: miscellaneous change +## Unreleased + +- 🌱 *common*: remote data sources accept a specific TLS configuration + ## 2.0.2 - 2025-10-29 The modification of the default value of `inlet`→`kafka`→`queue-size` should