mirror of
https://github.com/akvorado/akvorado.git
synced 2025-12-11 22:14:02 +01:00
common/remotedatasource: accept specific TLS configuration
Some checks failed
CI / 🤖 Check dependabot status (push) Has been cancelled
CI / 🐧 Test on Linux (${{ github.ref_type == 'tag' }}, misc) (push) Has been cancelled
CI / 🐧 Test on Linux (coverage) (push) Has been cancelled
CI / 🐧 Test on Linux (regular) (push) Has been cancelled
CI / ❄️ Build on Nix (push) Has been cancelled
CI / 🍏 Build and test on macOS (push) Has been cancelled
CI / 🧪 End-to-end testing (push) Has been cancelled
CI / 🔍 Upload code coverage (push) Has been cancelled
CI / 🔬 Test only Go (push) Has been cancelled
CI / 🔬 Test only JS (${{ needs.dependabot.outputs.package-ecosystem }}, 20) (push) Has been cancelled
CI / 🔬 Test only JS (${{ needs.dependabot.outputs.package-ecosystem }}, 22) (push) Has been cancelled
CI / 🔬 Test only JS (${{ needs.dependabot.outputs.package-ecosystem }}, 24) (push) Has been cancelled
CI / ⚖️ Check licenses (push) Has been cancelled
CI / 🐋 Build Docker images (push) Has been cancelled
CI / 🐋 Tag Docker images (push) Has been cancelled
CI / 🚀 Publish release (push) Has been cancelled
Update Nix dependency hashes / Update dependency hashes (push) Has been cancelled
Some checks failed
CI / 🤖 Check dependabot status (push) Has been cancelled
CI / 🐧 Test on Linux (${{ github.ref_type == 'tag' }}, misc) (push) Has been cancelled
CI / 🐧 Test on Linux (coverage) (push) Has been cancelled
CI / 🐧 Test on Linux (regular) (push) Has been cancelled
CI / ❄️ Build on Nix (push) Has been cancelled
CI / 🍏 Build and test on macOS (push) Has been cancelled
CI / 🧪 End-to-end testing (push) Has been cancelled
CI / 🔍 Upload code coverage (push) Has been cancelled
CI / 🔬 Test only Go (push) Has been cancelled
CI / 🔬 Test only JS (${{ needs.dependabot.outputs.package-ecosystem }}, 20) (push) Has been cancelled
CI / 🔬 Test only JS (${{ needs.dependabot.outputs.package-ecosystem }}, 22) (push) Has been cancelled
CI / 🔬 Test only JS (${{ needs.dependabot.outputs.package-ecosystem }}, 24) (push) Has been cancelled
CI / ⚖️ Check licenses (push) Has been cancelled
CI / 🐋 Build Docker images (push) Has been cancelled
CI / 🐋 Tag Docker images (push) Has been cancelled
CI / 🚀 Publish release (push) Has been cancelled
Update Nix dependency hashes / Update dependency hashes (push) Has been cancelled
This commit is contained in:
@@ -2,6 +2,12 @@
|
||||
paths:
|
||||
clickhouse.networksources.amazon:
|
||||
url: https://ip-ranges.amazonaws.com/ip-ranges.json
|
||||
tls:
|
||||
enable: false
|
||||
verify: true
|
||||
cafile: ""
|
||||
certfile: ""
|
||||
keyfile: ""
|
||||
method: GET
|
||||
headers: {}
|
||||
proxy: true
|
||||
|
||||
@@ -13,7 +13,7 @@ import (
|
||||
|
||||
// TLSConfiguration defines TLS configuration.
|
||||
type TLSConfiguration struct {
|
||||
// Enable says if TLS should be used to connect to brokers
|
||||
// Enable says if TLS should be used to connect to remote servers.
|
||||
Enable bool `validate:"required_with=CAFile CertFile KeyFile"`
|
||||
// Verify says if we need to check remote certificates
|
||||
Verify bool
|
||||
|
||||
@@ -31,6 +31,8 @@ type Source struct {
|
||||
Transform TransformQuery
|
||||
// Interval tells how much time to wait before updating the source.
|
||||
Interval time.Duration `validate:"min=1m"`
|
||||
// TLS defines the TLS configuration if the URL needs it.
|
||||
TLS helpers.TLSConfiguration
|
||||
}
|
||||
|
||||
// TransformQuery represents a jq query to transform data.
|
||||
@@ -66,6 +68,10 @@ func DefaultSourceConfiguration() Source {
|
||||
return Source{
|
||||
Method: "GET",
|
||||
Timeout: time.Minute,
|
||||
TLS: helpers.TLSConfiguration{
|
||||
Enable: false,
|
||||
Verify: true,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -28,6 +28,9 @@ func TestSourceDecode(t *testing.T) {
|
||||
Method: "GET",
|
||||
Timeout: time.Minute,
|
||||
Interval: 10 * time.Minute,
|
||||
TLS: helpers.TLSConfiguration{
|
||||
Verify: true,
|
||||
},
|
||||
},
|
||||
}, {
|
||||
Description: "Simple transform",
|
||||
@@ -45,6 +48,9 @@ func TestSourceDecode(t *testing.T) {
|
||||
Timeout: time.Minute,
|
||||
Interval: 10 * time.Minute,
|
||||
Transform: MustParseTransformQuery(".[]"),
|
||||
TLS: helpers.TLSConfiguration{
|
||||
Verify: true,
|
||||
},
|
||||
},
|
||||
}, {
|
||||
Description: "Use POST",
|
||||
@@ -64,6 +70,33 @@ func TestSourceDecode(t *testing.T) {
|
||||
Timeout: 2 * time.Minute,
|
||||
Interval: 10 * time.Minute,
|
||||
Transform: MustParseTransformQuery(".[]"),
|
||||
TLS: helpers.TLSConfiguration{
|
||||
Verify: true,
|
||||
},
|
||||
},
|
||||
}, {
|
||||
Description: "With TLS configuration",
|
||||
Initial: func() any { return Source{} },
|
||||
Configuration: func() any {
|
||||
return gin.H{
|
||||
"url": "https://example.net",
|
||||
"interval": "10m",
|
||||
"tls": gin.H{
|
||||
"enable": true,
|
||||
"ca-file": "something.crt",
|
||||
},
|
||||
}
|
||||
},
|
||||
Expected: Source{
|
||||
URL: "https://example.net",
|
||||
Method: "GET",
|
||||
Timeout: time.Minute,
|
||||
Interval: 10 * time.Minute,
|
||||
TLS: helpers.TLSConfiguration{
|
||||
Enable: true,
|
||||
Verify: false, // TODO this should be fixed
|
||||
CAFile: "something.crt",
|
||||
},
|
||||
},
|
||||
}, {
|
||||
Description: "Complex transform",
|
||||
@@ -85,6 +118,9 @@ func TestSourceDecode(t *testing.T) {
|
||||
Transform: MustParseTransformQuery(`
|
||||
.prefixes[] | {prefix: .ip_prefix, tenant: "amazon", region: .region, role: .service|ascii_downcase}
|
||||
`),
|
||||
TLS: helpers.TLSConfiguration{
|
||||
Verify: true,
|
||||
},
|
||||
},
|
||||
}, {
|
||||
Description: "Incorrect transform",
|
||||
|
||||
@@ -73,6 +73,9 @@ func New[T any](r *reporter.Reporter, provider ProviderFunc, dataType string, da
|
||||
source.Transform.Query, _ = gojq.Parse(".")
|
||||
c.dataSources[k] = source
|
||||
}
|
||||
if _, err := source.TLS.MakeTLSConfig(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
c.initMetrics()
|
||||
@@ -88,8 +91,10 @@ func (c *Component[T]) Fetch(ctx context.Context, name string, source Source) ([
|
||||
l := c.r.With().Str("name", name).Str("url", source.URL).Logger()
|
||||
l.Info().Msg("update data source")
|
||||
|
||||
tlsConfig, _ := source.TLS.MakeTLSConfig()
|
||||
client := &http.Client{Transport: &http.Transport{
|
||||
Proxy: http.ProxyFromEnvironment,
|
||||
Proxy: http.ProxyFromEnvironment,
|
||||
TLSClientConfig: tlsConfig,
|
||||
}}
|
||||
req, err := http.NewRequestWithContext(ctx, source.Method, source.URL, nil)
|
||||
if err != nil {
|
||||
|
||||
@@ -5,7 +5,14 @@ package remotedatasource
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/ecdsa"
|
||||
"crypto/elliptic"
|
||||
"crypto/rand"
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"crypto/x509/pkix"
|
||||
"fmt"
|
||||
"math/big"
|
||||
"net"
|
||||
"net/http"
|
||||
"strconv"
|
||||
@@ -211,3 +218,128 @@ func TestSource(t *testing.T) {
|
||||
t.Fatalf("Metrics (-got, +want):\n%s", diff)
|
||||
}
|
||||
}
|
||||
|
||||
// generateSelfSignedCert generates a self-signed certificate for testing
|
||||
func generateSelfSignedCert(t *testing.T) tls.Certificate {
|
||||
t.Helper()
|
||||
|
||||
// Generate a private key
|
||||
privateKey, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
|
||||
if err != nil {
|
||||
t.Fatalf("ecdsa.GenerateKey() error:\n%+v", err)
|
||||
}
|
||||
|
||||
// Create a certificate template
|
||||
template := x509.Certificate{
|
||||
SerialNumber: big.NewInt(1),
|
||||
Subject: pkix.Name{
|
||||
Organization: []string{"Test Organization"},
|
||||
},
|
||||
NotBefore: time.Now(),
|
||||
NotAfter: time.Now().Add(time.Hour),
|
||||
KeyUsage: x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature,
|
||||
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth},
|
||||
BasicConstraintsValid: true,
|
||||
IPAddresses: []net.IP{net.ParseIP("127.0.0.1")},
|
||||
}
|
||||
|
||||
// Create the certificate
|
||||
certDER, err := x509.CreateCertificate(rand.Reader, &template, &template, &privateKey.PublicKey, privateKey)
|
||||
if err != nil {
|
||||
t.Fatalf("x509.CreateCertificate() error:\n%+v", err)
|
||||
}
|
||||
|
||||
return tls.Certificate{
|
||||
Certificate: [][]byte{certDER},
|
||||
PrivateKey: privateKey,
|
||||
}
|
||||
}
|
||||
|
||||
func TestSourceWithTLS(t *testing.T) {
|
||||
cert := generateSelfSignedCert(t)
|
||||
|
||||
// Setup TLS server
|
||||
mux := http.NewServeMux()
|
||||
mux.Handle("/data.json", http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
|
||||
w.Header().Add("Content-Type", "application/json")
|
||||
w.WriteHeader(200)
|
||||
w.Write([]byte(`{"results": [{"name": "secure", "description": "tls test"}]}`))
|
||||
}))
|
||||
|
||||
listener, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
if err != nil {
|
||||
t.Fatalf("Listen() error:\n%+v", err)
|
||||
}
|
||||
|
||||
server := &http.Server{
|
||||
Handler: mux,
|
||||
TLSConfig: &tls.Config{
|
||||
Certificates: []tls.Certificate{cert},
|
||||
},
|
||||
}
|
||||
address := listener.Addr().String()
|
||||
go server.ServeTLS(listener, "", "")
|
||||
defer server.Shutdown(context.Background())
|
||||
|
||||
t.Run("WithoutTLSConfig", func(t *testing.T) {
|
||||
r := reporter.NewMock(t)
|
||||
config := map[string]Source{
|
||||
"secure": {
|
||||
URL: fmt.Sprintf("https://%s/data.json", address),
|
||||
Method: "GET",
|
||||
Timeout: 1 * time.Second,
|
||||
Interval: 1 * time.Minute,
|
||||
Transform: MustParseTransformQuery(".results[]"),
|
||||
},
|
||||
}
|
||||
handler := remoteDataHandler{
|
||||
data: []remoteData{},
|
||||
}
|
||||
handler.fetcher, _ = New[remoteData](r, handler.UpdateData, "test", config)
|
||||
|
||||
ctx, cancel := context.WithTimeout(t.Context(), time.Second)
|
||||
defer cancel()
|
||||
_, err := handler.fetcher.Fetch(ctx, "secure", config["secure"])
|
||||
if err == nil {
|
||||
t.Fatal("Fetch() should have errored with certificate error")
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("WithTLSSkipVerify", func(t *testing.T) {
|
||||
r := reporter.NewMock(t)
|
||||
config := map[string]Source{
|
||||
"secure": {
|
||||
URL: fmt.Sprintf("https://%s/data.json", address),
|
||||
Method: "GET",
|
||||
Timeout: 1 * time.Second,
|
||||
Interval: 1 * time.Minute,
|
||||
TLS: helpers.TLSConfiguration{
|
||||
Enable: true,
|
||||
Verify: false,
|
||||
},
|
||||
Transform: MustParseTransformQuery(".results[]"),
|
||||
},
|
||||
}
|
||||
handler := remoteDataHandler{
|
||||
data: []remoteData{},
|
||||
}
|
||||
handler.fetcher, _ = New[remoteData](r, handler.UpdateData, "test", config)
|
||||
|
||||
ctx, cancel := context.WithTimeout(t.Context(), time.Second)
|
||||
defer cancel()
|
||||
results, err := handler.fetcher.Fetch(ctx, "secure", config["secure"])
|
||||
if err != nil {
|
||||
t.Fatalf("Fetch() error:\n%+v", err)
|
||||
}
|
||||
|
||||
expected := []remoteData{
|
||||
{
|
||||
Name: "secure",
|
||||
Description: "tls test",
|
||||
},
|
||||
}
|
||||
if diff := helpers.Diff(results, expected); diff != "" {
|
||||
t.Fatalf("Fetch() (-got, +want):\n%s", diff)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
@@ -452,6 +452,8 @@ but the definition is fetched through HTTP. It accepts a map from source names t
|
||||
sources. Each source accepts these attributes:
|
||||
|
||||
- `url` is the URL to fetch.
|
||||
- `tls` defines the TLS configuration to connect to the source (it uses the same
|
||||
configuration as for [Kafka](#kafka-2), be sure to set `enable` to `true`)
|
||||
- `method` is the method to use (`GET` or `POST`).
|
||||
- `headers` is a map of header names to values to add to the request.
|
||||
- `proxy` defines if a proxy should be used (defined with environment variables
|
||||
@@ -760,7 +762,7 @@ flows. It accepts the following keys:
|
||||
- `tls` defines the TLS configuration to connect to the cluster
|
||||
- `sasl` defines the SASL configuration to connect to the cluster
|
||||
- `topic` defines the base topic name
|
||||
- `topic-configuration` describes how the topic should be configured
|
||||
- `topic-cofniguration` describes how the topic should be configured
|
||||
|
||||
The following keys are accepted for the TLS configuration:
|
||||
|
||||
@@ -828,6 +830,7 @@ ClickHouse database. The following keys should be provided inside
|
||||
- `password` is the password to use for authentication
|
||||
- `database` defines the database to use to create tables
|
||||
- `cluster` defines the cluster for replicated and distributed tables, see the next section for more information
|
||||
- `tls` defines the TLS configuration to connect to the database (it uses the same configuration as for [Kafka](#kafka-2))
|
||||
|
||||
### ClickHouse
|
||||
|
||||
@@ -848,6 +851,9 @@ provided inside `clickhouse`:
|
||||
map from source names to sources. Each source accepts the following
|
||||
attributes:
|
||||
- `url` is the URL to fetch
|
||||
- `tls` defines the TLS configuration to connect to the source (it uses the
|
||||
same configuration as for [Kafka](#kafka-2), be sure to set `enable` to
|
||||
`true`)
|
||||
- `method` is the method to use (`GET` or `POST`)
|
||||
- `headers` is a map from header names to values to add to the request
|
||||
- `proxy` says if we should use a proxy (defined through environment variables like `http_proxy`)
|
||||
|
||||
@@ -10,6 +10,10 @@ identified with a specific icon:
|
||||
- 🩹: bug fix
|
||||
- 🌱: miscellaneous change
|
||||
|
||||
## Unreleased
|
||||
|
||||
- 🌱 *common*: remote data sources accept a specific TLS configuration
|
||||
|
||||
## 2.0.2 - 2025-10-29
|
||||
|
||||
The modification of the default value of `inlet`→`kafka`→`queue-size` should
|
||||
|
||||
Reference in New Issue
Block a user