inlet/metadata: clean stop of gNMI subscriptions during tests

We need to drain the chans (mostly the error one that is triggered when
ending the subscription). Also, reset configuration before each test to
avoid keeping state.
This commit is contained in:
Vincent Bernat
2025-07-26 11:50:56 +02:00
parent 42d6645acf
commit 1ef1d09e71

View File

@@ -210,6 +210,24 @@ commit now
t.Fatalf("SendConfig() error:\n%+v", resp.Failed) t.Fatalf("SendConfig() error:\n%+v", resp.Failed)
} }
} }
stopSubscription := func(t *testing.T, name string) {
t.Helper()
tg.StopSubscription(name)
subRspChan, subErrChan := tg.ReadSubscriptions()
endSubscription:
for {
select {
case resp := <-subRspChan:
t.Logf("Subscribe() extra response:\n%+v", resp)
case err := <-subErrChan:
t.Logf("Subscribe() error:\n%+v", err)
default:
break endSubscription
}
}
}
for _, encoding := range []string{"json", "json_ietf"} { for _, encoding := range []string{"json", "json_ietf"} {
// Test a "once" subscription // Test a "once" subscription
t.Run(fmt.Sprintf("subscribe once %s", encoding), func(t *testing.T) { t.Run(fmt.Sprintf("subscribe once %s", encoding), func(t *testing.T) {
@@ -300,6 +318,7 @@ commit now
return return
} }
t.Run(fmt.Sprintf("subscribe changes %s", encoding), func(t *testing.T) { t.Run(fmt.Sprintf("subscribe changes %s", encoding), func(t *testing.T) {
resetConfig(t)
subscribeReq, err := api.NewSubscribeRequest( subscribeReq, err := api.NewSubscribeRequest(
api.Subscription(api.Path("/system/name/host-name"), api.SubscriptionModeON_CHANGE()), api.Subscription(api.Path("/system/name/host-name"), api.SubscriptionModeON_CHANGE()),
api.Subscription(api.Path("/interface/description"), api.SubscriptionModeON_CHANGE()), api.Subscription(api.Path("/interface/description"), api.SubscriptionModeON_CHANGE()),
@@ -312,7 +331,7 @@ commit now
} }
subscriptionName := fmt.Sprintf("changes-%s", encoding) subscriptionName := fmt.Sprintf("changes-%s", encoding)
go tg.Subscribe(ctx, subscribeReq, subscriptionName) go tg.Subscribe(ctx, subscribeReq, subscriptionName)
defer tg.StopSubscription(subscriptionName) defer stopSubscription(t, subscriptionName)
subRspChan, subErrChan := tg.ReadSubscriptions() subRspChan, subErrChan := tg.ReadSubscriptions()
// Wait for first set of answers // Wait for first set of answers
@@ -500,6 +519,7 @@ commit now
return return
} }
t.Run(fmt.Sprintf("subscribe sampling %s", encoding), func(t *testing.T) { t.Run(fmt.Sprintf("subscribe sampling %s", encoding), func(t *testing.T) {
resetConfig(t)
subscribeReq, err := api.NewSubscribeRequest( subscribeReq, err := api.NewSubscribeRequest(
api.Subscription( api.Subscription(
api.Path("/system/name/host-name"), api.Path("/system/name/host-name"),
@@ -517,7 +537,7 @@ commit now
} }
subscriptionName := fmt.Sprintf("sampling-%s", encoding) subscriptionName := fmt.Sprintf("sampling-%s", encoding)
go tg.Subscribe(ctx, subscribeReq, subscriptionName) go tg.Subscribe(ctx, subscribeReq, subscriptionName)
defer tg.StopSubscription(subscriptionName) defer stopSubscription(t, subscriptionName)
subRspChan, subErrChan := tg.ReadSubscriptions() subRspChan, subErrChan := tg.ReadSubscriptions()
// Wait for first set of answers // Wait for first set of answers
@@ -554,11 +574,11 @@ commit now
return got[i].Keys < got[j].Keys return got[i].Keys < got[j].Keys
}) })
expected := []event{ expected := []event{
{"/interface/description", "name=ethernet-1/1", "1st interface new"}, {"/interface/description", "name=ethernet-1/1", "1st interface"},
{"/interface/description", "name=ethernet-1/2", "2nd interface"}, {"/interface/description", "name=ethernet-1/2", "2nd interface"},
{"/interface/description", "name=ethernet-1/3", "3rd interface"}, {"/interface/description", "name=ethernet-1/3", "3rd interface"},
{"/interface/description", "name=lag1", "lag interface"}, {"/interface/description", "name=lag1", "lag interface"},
{"/system/name/host-name", "", "srlinux-new"}, {"/system/name/host-name", "", "srlinux"},
} }
if diff := helpers.Diff(got, expected); diff != "" { if diff := helpers.Diff(got, expected); diff != "" {
t.Fatalf("Subscribe() initial sync (-got, +want):\n%s", diff) t.Fatalf("Subscribe() initial sync (-got, +want):\n%s", diff)
@@ -609,6 +629,7 @@ commit now
} }
t.Run(fmt.Sprintf("subscribe polling %s", encoding), func(t *testing.T) { t.Run(fmt.Sprintf("subscribe polling %s", encoding), func(t *testing.T) {
t.Skip("subscribe with polling does not work on SR Linux") t.Skip("subscribe with polling does not work on SR Linux")
resetConfig(t)
subscribeReq, err := api.NewSubscribeRequest( subscribeReq, err := api.NewSubscribeRequest(
api.Subscription(api.Path("/system/name/host-name")), api.Subscription(api.Path("/system/name/host-name")),
api.Subscription(api.Path("/interface/description")), api.Subscription(api.Path("/interface/description")),
@@ -620,15 +641,15 @@ commit now
} }
subscriptionName := fmt.Sprintf("polling-%s", encoding) subscriptionName := fmt.Sprintf("polling-%s", encoding)
go tg.Subscribe(ctx, subscribeReq, subscriptionName) go tg.Subscribe(ctx, subscribeReq, subscriptionName)
defer tg.StopSubscription(subscriptionName) defer stopSubscription(t, subscriptionName)
subRspChan, subErrChan := tg.ReadSubscriptions() subRspChan, subErrChan := tg.ReadSubscriptions()
expected := []event{ expected := []event{
{"/interface/description", "name=ethernet-1/1", "1st interface new"}, {"/interface/description", "name=ethernet-1/1", "1st interface"},
{"/interface/description", "name=ethernet-1/2", "2nd interface"}, {"/interface/description", "name=ethernet-1/2", "2nd interface"},
{"/interface/description", "name=ethernet-1/3", "3rd interface"}, {"/interface/description", "name=ethernet-1/3", "3rd interface"},
{"/interface/description", "name=lag1", "lag interface"}, {"/interface/description", "name=lag1", "lag interface"},
{"/system/name/host-name", "", "srlinux-new"}, {"/system/name/host-name", "", "srlinux"},
} }
// Get a few batches // Get a few batches