common/kafka: also logs output of kfake cluster

This commit is contained in:
Vincent Bernat
2025-09-23 07:06:58 +02:00
parent 369844d2a5
commit 801f3f1676
5 changed files with 37 additions and 11 deletions

View File

@@ -25,12 +25,14 @@ import (
)
func TestFakeKafka(t *testing.T) {
r := reporter.NewMock(t)
topicName := fmt.Sprintf("test-topic2-%d", rand.Int())
expectedTopicName := fmt.Sprintf("%s-v%d", topicName, pb.Version)
cluster, err := kfake.NewCluster(
kfake.NumBrokers(1),
kfake.SeedTopics(16, expectedTopicName),
kfake.WithLogger(kafka.NewLogger(r)),
)
if err != nil {
t.Fatalf("NewCluster() error: %v", err)
@@ -68,7 +70,6 @@ func TestFakeKafka(t *testing.T) {
configuration.Brokers = cluster.ListenAddrs()
configuration.FetchMaxWaitTime = 100 * time.Millisecond
configuration.ConsumerGroup = fmt.Sprintf("outlet-%d", rand.Int())
r := reporter.NewMock(t)
c, err := New(r, configuration, Dependencies{Daemon: daemon.NewMock(t)})
if err != nil {
t.Fatalf("New() error:\n%+v", err)
@@ -127,12 +128,14 @@ func TestFakeKafka(t *testing.T) {
}
func TestStartSeveralWorkers(t *testing.T) {
r := reporter.NewMock(t)
topicName := fmt.Sprintf("test-topic2-%d", rand.Int())
expectedTopicName := fmt.Sprintf("%s-v%d", topicName, pb.Version)
cluster, err := kfake.NewCluster(
kfake.NumBrokers(1),
kfake.SeedTopics(16, expectedTopicName),
kfake.WithLogger(kafka.NewLogger(r)),
)
if err != nil {
t.Fatalf("NewCluster() error: %v", err)
@@ -146,7 +149,6 @@ func TestStartSeveralWorkers(t *testing.T) {
configuration.FetchMaxWaitTime = 100 * time.Millisecond
configuration.ConsumerGroup = fmt.Sprintf("outlet-%d", rand.Int())
configuration.MinWorkers = 5
r := reporter.NewMock(t)
c, err := New(r, configuration, Dependencies{Daemon: daemon.NewMock(t)})
if err != nil {
t.Fatalf("New() error:\n%+v", err)
@@ -194,12 +196,14 @@ func TestStartSeveralWorkers(t *testing.T) {
}
func TestWorkerScaling(t *testing.T) {
r := reporter.NewMock(t)
topicName := fmt.Sprintf("test-topic2-%d", rand.Int())
expectedTopicName := fmt.Sprintf("%s-v%d", topicName, pb.Version)
cluster, err := kfake.NewCluster(
kfake.NumBrokers(1),
kfake.SeedTopics(16, expectedTopicName),
kfake.WithLogger(kafka.NewLogger(r)),
)
if err != nil {
t.Fatalf("NewCluster() error: %v", err)
@@ -227,7 +231,6 @@ func TestWorkerScaling(t *testing.T) {
configuration.ConsumerGroup = fmt.Sprintf("outlet-%d", rand.Int())
configuration.WorkerIncreaseRateLimit = 20 * time.Millisecond
configuration.WorkerDecreaseRateLimit = 20 * time.Millisecond
r := reporter.NewMock(t)
c, err := New(r, configuration, Dependencies{Daemon: daemon.NewMock(t)})
if err != nil {
t.Fatalf("New() error:\n%+v", err)
@@ -354,12 +357,14 @@ func TestWorkerScaling(t *testing.T) {
}
func TestKafkaLagMetric(t *testing.T) {
r := reporter.NewMock(t)
topicName := fmt.Sprintf("test-topic2-%d", rand.Int())
expectedTopicName := fmt.Sprintf("%s-v%d", topicName, pb.Version)
cluster, err := kfake.NewCluster(
kfake.NumBrokers(1),
kfake.SeedTopics(16, expectedTopicName),
kfake.WithLogger(kafka.NewLogger(r)),
)
if err != nil {
t.Fatalf("NewCluster() error: %v", err)
@@ -396,7 +401,6 @@ func TestKafkaLagMetric(t *testing.T) {
configuration.Brokers = cluster.ListenAddrs()
configuration.FetchMaxWaitTime = 10 * time.Millisecond
configuration.ConsumerGroup = fmt.Sprintf("outlet-%d", rand.Int())
r := reporter.NewMock(t)
c, err := New(r, configuration, Dependencies{Daemon: daemon.NewMock(t)})
if err != nil {
t.Fatalf("New() error:\n%+v", err)