Files
akvorado/outlet/kafka/root_test.go
Vincent Bernat 001fc71bb4
Some checks failed
CI / 🤖 Check dependabot status (push) Has been cancelled
CI / 🐧 Test on Linux (${{ github.ref_type == 'tag' }}, misc) (push) Has been cancelled
CI / 🐧 Test on Linux (coverage) (push) Has been cancelled
CI / 🐧 Test on Linux (regular) (push) Has been cancelled
CI / ❄️ Build on Nix (push) Has been cancelled
CI / 🍏 Build and test on macOS (push) Has been cancelled
CI / 🧪 End-to-end testing (push) Has been cancelled
CI / 🔍 Upload code coverage (push) Has been cancelled
CI / 🔬 Test only Go (push) Has been cancelled
CI / 🔬 Test only JS (${{ needs.dependabot.outputs.package-ecosystem }}, 20) (push) Has been cancelled
CI / 🔬 Test only JS (${{ needs.dependabot.outputs.package-ecosystem }}, 22) (push) Has been cancelled
CI / 🔬 Test only JS (${{ needs.dependabot.outputs.package-ecosystem }}, 24) (push) Has been cancelled
CI / ⚖️ Check licenses (push) Has been cancelled
CI / 🐋 Build Docker images (push) Has been cancelled
CI / 🐋 Tag Docker images (push) Has been cancelled
CI / 🚀 Publish release (push) Has been cancelled
outlet/kafka: fix race in tests
This was introduced in the fix introduced in 7f5950f89c.
2025-11-12 23:53:17 +01:00

55 lines
1.2 KiB
Go

// SPDX-FileCopyrightText: 2022 Free Mobile
// SPDX-License-Identifier: AGPL-3.0-only
package kafka
import (
"context"
"sync/atomic"
"testing"
"time"
"akvorado/common/helpers"
)
func TestMock(t *testing.T) {
c, incoming := NewMock(t, DefaultConfiguration())
got := []string{}
expected := []string{"hello1", "hello2", "hello3"}
gotAll := make(chan bool)
var shutdownCalled atomic.Bool
callback := func(_ context.Context, message []byte) error {
got = append(got, string(message))
if len(got) == len(expected) {
close(gotAll)
}
return nil
}
c.StartWorkers(
func(int, chan<- ScaleRequest) (ReceiveFunc, ShutdownFunc) {
return callback, func() { shutdownCalled.Store(true) }
},
)
// Produce messages and wait for them
for _, msg := range expected {
incoming <- []byte(msg)
}
select {
case <-time.After(time.Second):
t.Fatal("Too long to get messages")
case <-gotAll:
}
if diff := helpers.Diff(got, expected); diff != "" {
t.Errorf("Didn't received the expected messages (-got, +want):\n%s", diff)
}
c.Stop()
time.Sleep(10 * time.Millisecond)
if !shutdownCalled.Load() {
t.Error("Stop() should have triggered shutdown function")
}
}