From 80b707e5e9d5f8ae23ec21f93c4d0d9cdb9959ec Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Wed, 14 Jul 2021 15:52:18 -0400 Subject: [PATCH] Fixing Kafka binder tests. * Adding a delay in testDlqWithNativeSerializationEnabledOnDlqProducer to avoid a race condition. Awaitility is used to wait for the proper condition in this test. * In the MicroMeter registry tests, properly wait for the first message to arrive on the outbound so that the producer factory gets a chance to add the MicroMeter producer listener completely before the test assertions start. Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1103 --- spring-cloud-stream-binder-kafka/pom.xml | 5 ++ .../stream/binder/kafka/KafkaBinderTests.java | 7 +-- .../KafkaBinderMeterRegistryTest.java | 57 +++++++++++++++++-- 3 files changed, 59 insertions(+), 10 deletions(-) diff --git a/spring-cloud-stream-binder-kafka/pom.xml b/spring-cloud-stream-binder-kafka/pom.xml index c52bbb28..df027c2a 100644 --- a/spring-cloud-stream-binder-kafka/pom.xml +++ b/spring-cloud-stream-binder-kafka/pom.xml @@ -75,6 +75,11 @@ kafka_2.13 test + + org.awaitility + awaitility + test + diff --git a/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderTests.java b/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderTests.java index 4a5c9619..8613d16f 100644 --- a/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderTests.java +++ b/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderTests.java @@ -66,9 +66,9 @@ import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.assertj.core.api.Assertions; import org.assertj.core.api.Condition; +import org.awaitility.Awaitility; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; @@ -731,7 +731,6 @@ public class KafkaBinderTests extends @Test @SuppressWarnings("unchecked") - @Disabled("Failing when run as part of test suite") public void testDlqWithNativeSerializationEnabledOnDlqProducer() throws Exception { Binder binder = getBinder(); ExtendedProducerProperties producerProperties = createProducerProperties(); @@ -790,12 +789,10 @@ public class KafkaBinderTests extends .withPayload("foo").build(); moduleOutputChannel.send(message); - Message receivedMessage = receive(dlqChannel, 5); assertThat(receivedMessage).isNotNull(); assertThat(receivedMessage.getPayload()).isEqualTo("foo".getBytes()); - assertThat(handler.getInvocationCount()) - .isEqualTo(consumerProperties.getMaxAttempts()); + Awaitility.await().until(() -> handler.getInvocationCount() == consumerProperties.getMaxAttempts()); assertThat(receivedMessage.getHeaders() .get(KafkaMessageChannelBinder.X_ORIGINAL_TOPIC)) .isEqualTo("foo.bar".getBytes(StandardCharsets.UTF_8)); diff --git a/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/bootstrap/KafkaBinderMeterRegistryTest.java b/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/bootstrap/KafkaBinderMeterRegistryTest.java index 1c90f233..b3d2488d 100644 --- a/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/bootstrap/KafkaBinderMeterRegistryTest.java +++ b/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/bootstrap/KafkaBinderMeterRegistryTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2019 the original author or authors. + * Copyright 2019-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,9 +16,13 @@ package org.springframework.cloud.stream.binder.kafka.bootstrap; +import java.util.Map; import java.util.function.Function; import io.micrometer.core.instrument.MeterRegistry; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -27,9 +31,13 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.builder.SpringApplicationBuilder; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.test.EmbeddedKafkaBroker; import org.springframework.kafka.test.condition.EmbeddedKafkaCondition; import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.kafka.test.utils.KafkaTestUtils; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatCode; @@ -37,18 +45,33 @@ import static org.assertj.core.api.Assertions.assertThatCode; /** * @author Soby Chacko */ -@EmbeddedKafka(count = 1, controlledShutdown = true, partitions = 10) +@EmbeddedKafka(count = 1, controlledShutdown = true, partitions = 10, topics = "outputTopic") public class KafkaBinderMeterRegistryTest { private static EmbeddedKafkaBroker embeddedKafka; + private static Consumer consumer; + @BeforeAll public static void setup() { embeddedKafka = EmbeddedKafkaCondition.getBroker(); + + Map consumerProps = KafkaTestUtils.consumerProps("group", "false", + embeddedKafka); + consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); + DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(consumerProps); + consumer = cf.createConsumer(); + embeddedKafka.consumeFromEmbeddedTopics(consumer, "outputTopic"); + } + + @AfterAll + public static void tearDown() { + consumer.close(); } @Test - public void testMetricsWithSingleBinder() { + public void testMetricsWithSingleBinder() throws Exception { ConfigurableApplicationContext applicationContext = new SpringApplicationBuilder(SimpleApplication.class) .web(WebApplicationType.NONE) .run("--spring.cloud.stream.bindings.uppercase-in-0.destination=inputTopic", @@ -57,6 +80,19 @@ public class KafkaBinderMeterRegistryTest { "--spring.cloud.stream.kafka.binder.brokers" + "=" + embeddedKafka.getBrokersAsString()); + + Map senderProps = KafkaTestUtils.producerProps(embeddedKafka); + DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory<>(senderProps); + + KafkaTemplate template = new KafkaTemplate<>(pf, true); + template.setDefaultTopic("inputTopic"); + template.sendDefault("foo"); + + // Forcing the retrieval of the data on the outbound so that the producer factory has + // a chance to add the micrometer listener properly. Only on the first send, binder's + // internal KafkaTemplate adds the Micrometer listener (using the producer factory). + KafkaTestUtils.getSingleRecord(consumer, "outputTopic"); + final MeterRegistry meterRegistry = applicationContext.getBean(MeterRegistry.class); assertMeterRegistry(meterRegistry); applicationContext.close(); @@ -80,6 +116,18 @@ public class KafkaBinderMeterRegistryTest { + ".spring.cloud.stream.kafka.binder.brokers" + "=" + embeddedKafka.getBrokersAsString()); + Map senderProps = KafkaTestUtils.producerProps(embeddedKafka); + DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory<>(senderProps); + + KafkaTemplate template = new KafkaTemplate<>(pf, true); + template.setDefaultTopic("inputTopic"); + template.sendDefault("foo"); + + // Forcing the retrieval of the data on the outbound so that the producer factory has + // a chance to add the micrometer listener properly. Only on the first send, binder's + // internal KafkaTemplate adds the Micrometer listener (using the producer factory). + KafkaTestUtils.getSingleRecord(consumer, "outputTopic"); + final MeterRegistry meterRegistry = applicationContext.getBean(MeterRegistry.class); assertMeterRegistry(meterRegistry); applicationContext.close(); @@ -97,8 +145,7 @@ public class KafkaBinderMeterRegistryTest { assertThatCode(() -> meterRegistry.get("kafka.consumer.fetch.manager.fetch.total").meter()).doesNotThrowAnyException(); // assert producer metrics - // TODO: Investigate why Kafka producer metrics are missing. -// assertThatCode(() -> meterRegistry.get("kafka.producer.connection.count").meter()).doesNotThrowAnyException(); + assertThatCode(() -> meterRegistry.get("kafka.producer.io.ratio").meter()).doesNotThrowAnyException(); } @SpringBootApplication