outlet/metadata: fix gNMI metadata provider exiting too early

We have to manage two contexts:

- one associated to a query because the queries are now synchronous
- one associated to the component because some providers need a
goroutine (like gNMI)

Fix #1991.
This commit is contained in:
Vincent Bernat
2025-09-29 18:44:34 +02:00
parent adeb080713
commit bf3aff81c4
13 changed files with 26 additions and 22 deletions

View File

@@ -13,6 +13,7 @@ identified with a specific icon:
## Unreleased
- 🩹 *inlet*: disable kernel timestamping on Linux kernel older than 5.1
- 🩹 *outlet*: fix gNMI metadata provider exiting too early
- 🩹 *doc*: fix documentation for SNMPv3 configuration
- 🌱 *inlet*: add support for RFC 5103 (bidirectional flows)

View File

@@ -135,7 +135,7 @@ outer2:
}
// startCollector starts a new gNMI collector with the given state. It should not be used with taking the lock.
func (p *Provider) startCollector(ctx context.Context, exporterIP netip.Addr, state *exporterState) {
func (p *Provider) startCollector(exporterIP netip.Addr, state *exporterState) {
exporterStr := exporterIP.Unmap().String()
l := p.r.With().Str("exporter", exporterStr).Logger()
p.metrics.ready.WithLabelValues(exporterStr).Set(0)
@@ -173,7 +173,7 @@ retryConnect:
waitBeforeRetry := func() bool {
next := time.NewTimer(retryInitBackoff.NextBackOff())
select {
case <-ctx.Done():
case <-p.ctx.Done():
next.Stop()
return false
case <-next.C:
@@ -193,7 +193,7 @@ retryConnect:
goto retryConnect
}
err = tg.CreateGNMIClient(ctx)
err = tg.CreateGNMIClient(p.ctx)
if err != nil {
l.Err(err).Msg("unable to create client")
p.metrics.errors.WithLabelValues(exporterStr, "cannot create client").Inc()
@@ -206,7 +206,7 @@ retryConnect:
retryDetect:
// We need to detect the model
model, encoding, err := p.detectModelAndEncoding(ctx, tg)
model, encoding, err := p.detectModelAndEncoding(p.ctx, tg)
if err != nil {
l.Err(err).Msg("unable to detect model")
p.metrics.errors.WithLabelValues(exporterStr, "cannot detect model").Inc()
@@ -244,7 +244,7 @@ retryDetect:
for {
l.Debug().Msg("polling")
start := time.Now()
subscribeResp, err := tg.SubscribeOnce(ctx, subscribeReq)
subscribeResp, err := tg.SubscribeOnce(p.ctx, subscribeReq)
p.metrics.times.WithLabelValues(exporterStr).Observe(time.Since(start).Seconds())
if err == nil {
events := subscribeResponsesToEvents(subscribeResp)
@@ -267,7 +267,7 @@ retryDetect:
for {
select {
case state.Ready <- true:
case <-ctx.Done():
case <-p.ctx.Done():
next.Stop()
return
case <-p.refresh:
@@ -281,7 +281,7 @@ retryDetect:
for {
select {
case state.Ready <- true:
case <-ctx.Done():
case <-p.ctx.Done():
return
case <-p.refresh:
break outerWaitRefresh
@@ -303,7 +303,7 @@ retryDetect:
for {
select {
case readyChan <- true:
case <-ctx.Done():
case <-p.ctx.Done():
next.Stop()
return
case <-next.C:

View File

@@ -18,6 +18,7 @@ type Provider struct {
r *reporter.Reporter
config *Configuration
metrics metrics
ctx context.Context
state map[netip.Addr]*exporterState
stateLock sync.Mutex
@@ -30,10 +31,11 @@ var (
)
// New creates a new gNMI provider from configuration
func (configuration Configuration) New(r *reporter.Reporter) (provider.Provider, error) {
func (configuration Configuration) New(ctx context.Context, r *reporter.Reporter) (provider.Provider, error) {
p := Provider{
r: r,
config: &configuration,
ctx: ctx,
state: map[netip.Addr]*exporterState{},
refresh: make(chan bool),
}
@@ -51,7 +53,7 @@ func (p *Provider) Query(ctx context.Context, q provider.Query) (provider.Answer
}
p.state[q.ExporterIP] = state
p.metrics.collectorCount.Inc()
go p.startCollector(ctx, q.ExporterIP, state)
go p.startCollector(q.ExporterIP, state)
}
// Trigger a refresh

View File

@@ -728,7 +728,7 @@ commit now
iface, answer.Interface.Name, answer.Interface.Description, answer.Interface.Speed)
}
r := reporter.NewMock(t)
p, err := configP.New(r)
p, err := configP.New(t.Context(), r)
if err != nil {
t.Fatalf("New() error:\n%+v", err)
}

View File

@@ -67,6 +67,7 @@ type Provider interface {
// Configuration defines an interface to configure a provider.
type Configuration interface {
// New instantiates a new provider from its configuration.
New(r *reporter.Reporter) (Provider, error)
// New instantiates a new provider from its configuration. The provided
// context is to stop any long-running goroutine.
New(context.Context, *reporter.Reporter) (Provider, error)
}

View File

@@ -278,7 +278,7 @@ func TestPoller(t *testing.T) {
config.Ports = helpers.MustNewSubnetMap(map[string]uint16{
"::/0": uint16(port),
})
p, err := config.New(r)
p, err := config.New(t.Context(), r)
if err != nil {
t.Fatalf("New() error:\n%+v", err)
}

View File

@@ -34,7 +34,7 @@ var (
)
// New creates a new SNMP provider from configuration
func (configuration Configuration) New(r *reporter.Reporter) (provider.Provider, error) {
func (configuration Configuration) New(_ context.Context, r *reporter.Reporter) (provider.Provider, error) {
for exporterIP, agentIP := range configuration.Agents {
if exporterIP.Is4() || agentIP.Is4() {
delete(configuration.Agents, exporterIP)

View File

@@ -41,7 +41,7 @@ var (
)
// New creates a new static provider from configuration
func (configuration Configuration) New(r *reporter.Reporter) (provider.Provider, error) {
func (configuration Configuration) New(_ context.Context, r *reporter.Reporter) (provider.Provider, error) {
p := &Provider{
r: r,
exportersMap: map[string][]exporterInfo{},

View File

@@ -97,7 +97,7 @@ func TestStaticProvider(t *testing.T) {
var got []provider.Answer
r := reporter.NewMock(t)
p, _ := config.New(r)
p, _ := config.New(t.Context(), r)
answer, _ := p.Query(context.Background(), provider.Query{
ExporterIP: netip.MustParseAddr("2001:db8:1::10"),

View File

@@ -186,7 +186,7 @@ func TestRemoteExporterSources(t *testing.T) {
},
},
}
p, _ := config.New(r)
p, _ := config.New(t.Context(), r)
// Query when json is not ready yet, we should get a timeout
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)

View File

@@ -80,7 +80,7 @@ func New(r *reporter.Reporter, configuration Configuration, dependencies Depende
// Initialize providers
for _, p := range c.config.Providers {
selectedProvider, err := p.Config.New(r)
selectedProvider, err := p.Config.New(c.t.Context(nil), r)
if err != nil {
return nil, err
}

View File

@@ -230,7 +230,7 @@ func (ep errorProvider) Query(_ context.Context, _ provider.Query) (provider.Ans
type errorProviderConfiguration struct{}
func (epc errorProviderConfiguration) New(_ *reporter.Reporter) (provider.Provider, error) {
func (epc errorProviderConfiguration) New(context.Context, *reporter.Reporter) (provider.Provider, error) {
return errorProvider{}, nil
}

View File

@@ -73,7 +73,7 @@ func (mp mockProvider) Query(_ context.Context, query provider.Query) (provider.
type mockProviderConfiguration struct{}
// New returns a new mock provider.
func (mpc mockProviderConfiguration) New(_ *reporter.Reporter) (provider.Provider, error) {
func (mpc mockProviderConfiguration) New(context.Context, *reporter.Reporter) (provider.Provider, error) {
return mockProvider{}, nil
}
@@ -103,6 +103,6 @@ func (mp emptyProvider) Query(_ context.Context, _ provider.Query) (provider.Ans
type emptyProviderConfiguration struct{}
// New returns a new empty provider.
func (mpc emptyProviderConfiguration) New(_ *reporter.Reporter) (provider.Provider, error) {
func (mpc emptyProviderConfiguration) New(context.Context, *reporter.Reporter) (provider.Provider, error) {
return emptyProvider{}, nil
}