diff --git a/inlet/kafka/root_test.go b/inlet/kafka/root_test.go index 94daf28c..9b0f32aa 100644 --- a/inlet/kafka/root_test.go +++ b/inlet/kafka/root_test.go @@ -159,14 +159,15 @@ func TestLoadBalancingAlgorithm(t *testing.T) { } wg.Wait() - expected := make(map[int32]int, len(messages)) + expected := make(map[int32]int, DefaultMockKafkaNumPartitions) switch algo { case LoadBalanceRandom: - for p, count := range messages { - if count > total/len(messages)*2/10 && count < total/len(messages)*18/10 { - expected[p] = count + for p := range DefaultMockKafkaNumPartitions { + p := int32(p) + if messages[p] > total/DefaultMockKafkaNumPartitions*2/10 { + expected[p] = messages[p] } else { - expected[p] = total / len(messages) + expected[p] = total / DefaultMockKafkaNumPartitions } } case LoadBalanceByExporter: diff --git a/inlet/kafka/tests.go b/inlet/kafka/tests.go index d0119076..d6561d22 100644 --- a/inlet/kafka/tests.go +++ b/inlet/kafka/tests.go @@ -16,6 +16,9 @@ import ( "akvorado/common/reporter" ) +// DefaultMockKafkaNumPartitions is the default number of partitions for the mock Kafka. +const DefaultMockKafkaNumPartitions = 8 + // NewMock creates a new Kafka component with a mocked Kafka. It will // panic if it cannot be started. func NewMock(t *testing.T, r *reporter.Reporter, configuration Configuration) (*Component, *kfake.Cluster) { @@ -24,6 +27,7 @@ func NewMock(t *testing.T, r *reporter.Reporter, configuration Configuration) (* cluster, err := kfake.NewCluster( kfake.NumBrokers(1), kfake.AllowAutoTopicCreation(), + kfake.DefaultNumPartitions(DefaultMockKafkaNumPartitions), kfake.WithLogger(kafka.NewLogger(r)), ) if err != nil {