Fixing Kafka binder tests.

* Adding a delay in testDlqWithNativeSerializationEnabledOnDlqProducer to avoid a race condition.
  Awaitility is used to wait for the proper condition in this test.

* In the MicroMeter registry tests, properly wait for the first message to arrive on the outbound
  so that the producer factory gets a chance to add the MicroMeter producer listener completely
  before the test assertions start.

Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1103
This commit is contained in:
Soby Chacko
2021-07-14 15:52:18 -04:00
parent 13474bdafb
commit 80b707e5e9
3 changed files with 59 additions and 10 deletions

View File

@@ -75,6 +75,11 @@
<artifactId>kafka_2.13</artifactId>
<classifier>test</classifier>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@@ -66,9 +66,9 @@ import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Condition;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
@@ -731,7 +731,6 @@ public class KafkaBinderTests extends
@Test
@SuppressWarnings("unchecked")
@Disabled("Failing when run as part of test suite")
public void testDlqWithNativeSerializationEnabledOnDlqProducer() throws Exception {
Binder binder = getBinder();
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
@@ -790,12 +789,10 @@ public class KafkaBinderTests extends
.withPayload("foo").build();
moduleOutputChannel.send(message);
Message<?> receivedMessage = receive(dlqChannel, 5);
assertThat(receivedMessage).isNotNull();
assertThat(receivedMessage.getPayload()).isEqualTo("foo".getBytes());
assertThat(handler.getInvocationCount())
.isEqualTo(consumerProperties.getMaxAttempts());
Awaitility.await().until(() -> handler.getInvocationCount() == consumerProperties.getMaxAttempts());
assertThat(receivedMessage.getHeaders()
.get(KafkaMessageChannelBinder.X_ORIGINAL_TOPIC))
.isEqualTo("foo.bar".getBytes(StandardCharsets.UTF_8));

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2019-2019 the original author or authors.
* Copyright 2019-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,9 +16,13 @@
package org.springframework.cloud.stream.binder.kafka.bootstrap;
import java.util.Map;
import java.util.function.Function;
import io.micrometer.core.instrument.MeterRegistry;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
@@ -27,9 +31,13 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.condition.EmbeddedKafkaCondition;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
@@ -37,18 +45,33 @@ import static org.assertj.core.api.Assertions.assertThatCode;
/**
* @author Soby Chacko
*/
@EmbeddedKafka(count = 1, controlledShutdown = true, partitions = 10)
@EmbeddedKafka(count = 1, controlledShutdown = true, partitions = 10, topics = "outputTopic")
public class KafkaBinderMeterRegistryTest {
private static EmbeddedKafkaBroker embeddedKafka;
private static Consumer<String, String> consumer;
@BeforeAll
public static void setup() {
embeddedKafka = EmbeddedKafkaCondition.getBroker();
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("group", "false",
embeddedKafka);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
consumer = cf.createConsumer();
embeddedKafka.consumeFromEmbeddedTopics(consumer, "outputTopic");
}
@AfterAll
public static void tearDown() {
consumer.close();
}
@Test
public void testMetricsWithSingleBinder() {
public void testMetricsWithSingleBinder() throws Exception {
ConfigurableApplicationContext applicationContext = new SpringApplicationBuilder(SimpleApplication.class)
.web(WebApplicationType.NONE)
.run("--spring.cloud.stream.bindings.uppercase-in-0.destination=inputTopic",
@@ -57,6 +80,19 @@ public class KafkaBinderMeterRegistryTest {
"--spring.cloud.stream.kafka.binder.brokers" + "="
+ embeddedKafka.getBrokersAsString());
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
template.setDefaultTopic("inputTopic");
template.sendDefault("foo");
// Forcing the retrieval of the data on the outbound so that the producer factory has
// a chance to add the micrometer listener properly. Only on the first send, binder's
// internal KafkaTemplate adds the Micrometer listener (using the producer factory).
KafkaTestUtils.getSingleRecord(consumer, "outputTopic");
final MeterRegistry meterRegistry = applicationContext.getBean(MeterRegistry.class);
assertMeterRegistry(meterRegistry);
applicationContext.close();
@@ -80,6 +116,18 @@ public class KafkaBinderMeterRegistryTest {
+ ".spring.cloud.stream.kafka.binder.brokers" + "="
+ embeddedKafka.getBrokersAsString());
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
template.setDefaultTopic("inputTopic");
template.sendDefault("foo");
// Forcing the retrieval of the data on the outbound so that the producer factory has
// a chance to add the micrometer listener properly. Only on the first send, binder's
// internal KafkaTemplate adds the Micrometer listener (using the producer factory).
KafkaTestUtils.getSingleRecord(consumer, "outputTopic");
final MeterRegistry meterRegistry = applicationContext.getBean(MeterRegistry.class);
assertMeterRegistry(meterRegistry);
applicationContext.close();
@@ -97,8 +145,7 @@ public class KafkaBinderMeterRegistryTest {
assertThatCode(() -> meterRegistry.get("kafka.consumer.fetch.manager.fetch.total").meter()).doesNotThrowAnyException();
// assert producer metrics
// TODO: Investigate why Kafka producer metrics are missing.
// assertThatCode(() -> meterRegistry.get("kafka.producer.connection.count").meter()).doesNotThrowAnyException();
assertThatCode(() -> meterRegistry.get("kafka.producer.io.ratio").meter()).doesNotThrowAnyException();
}
@SpringBootApplication