Fix failing tests in Kafka Streams binder

Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1109
Author: Soby Chacko
Date: 2021-07-28 18:41:22 -04:00
parent 001882de4e
commit 912c47e3ac
4 changed files with 60 additions and 11 deletions

View File

@@ -52,6 +52,7 @@ import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStr
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.kafka.core.CleanupConfig;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
@@ -222,6 +223,11 @@ public class KafkaStreamsInteractiveQueryIntegrationTests {
return new Foo(interactiveQueryService);
}
@Bean
public CleanupConfig cleanupConfig() {
return new CleanupConfig(false, true);
}
static class Foo {
InteractiveQueryService interactiveQueryService;

View File

@@ -20,6 +20,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
@@ -41,8 +42,8 @@ import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.springframework.boot.SpringApplication;
@@ -50,6 +51,7 @@ import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.CleanupConfig;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
@@ -60,7 +62,6 @@ import org.springframework.util.Assert;
import static org.assertj.core.api.Assertions.assertThat;
@Ignore("Investigate why these tests are failing")
public class StreamToTableJoinFunctionTests {
@ClassRule
@@ -441,9 +442,14 @@ public class StreamToTableJoinFunctionTests {
.map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(),
regionWithClicks.getClicks()))
.groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
.reduce(Long::sum)
.reduce(Long::sum, Materialized.as("CountClicks-" + UUID.randomUUID()))
.toStream()));
}
@Bean
public CleanupConfig cleanupConfig() {
return new CleanupConfig(false, true);
}
}
@EnableAutoConfiguration
@@ -458,9 +464,14 @@ public class StreamToTableJoinFunctionTests {
.map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(),
regionWithClicks.getClicks()))
.groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
.reduce(Long::sum)
.reduce(Long::sum, Materialized.as("CountClicks-" + UUID.randomUUID()))
.toStream());
}
@Bean
public CleanupConfig cleanupConfig() {
return new CleanupConfig(false, true);
}
}
@EnableAutoConfiguration

View File

@@ -25,12 +25,12 @@ import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.kstream.KStream;
import org.assertj.core.util.Lists;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.springframework.boot.SpringApplication;
@@ -46,6 +46,9 @@ import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsBinderHealthIndicator;
import org.springframework.cloud.stream.binder.kafka.streams.annotations.KafkaStreamsProcessor;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.KafkaStreamsCustomizer;
import org.springframework.kafka.config.StreamsBuilderFactoryBeanConfigurer;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
@@ -100,7 +103,6 @@ public class KafkaStreamsBinderHealthIndicatorTests {
}
@Test
@Ignore
public void healthIndicatorDownTest() throws Exception {
try (ConfigurableApplicationContext context = singleStream("ApplicationHealthTest-xyzabc")) {
receive(context,
@@ -121,7 +123,6 @@ public class KafkaStreamsBinderHealthIndicatorTests {
}
@Test
@Ignore
public void healthIndicatorDownMultipleKStreamsTest() throws Exception {
try (ConfigurableApplicationContext context = multipleStream()) {
receive(context,
@@ -181,7 +182,7 @@ public class KafkaStreamsBinderHealthIndicatorTests {
embeddedKafka.consumeFromEmbeddedTopics(consumer, topics);
KafkaTestUtils.getRecords(consumer, 1000);
TimeUnit.SECONDS.sleep(2);
TimeUnit.SECONDS.sleep(5);
checkHealth(context, expected);
}
finally {
@@ -256,6 +257,19 @@ public class KafkaStreamsBinderHealthIndicatorTests {
});
}
@Bean
public StreamsBuilderFactoryBeanConfigurer customizer() {
return factoryBean -> {
factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
@Override
public void customize(KafkaStreams kafkaStreams) {
kafkaStreams.setUncaughtExceptionHandler(exception ->
StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT);
}
});
};
}
}
@EnableBinding({ KafkaStreamsProcessor.class, KafkaStreamsProcessorX.class })
@@ -284,6 +298,19 @@ public class KafkaStreamsBinderHealthIndicatorTests {
});
}
@Bean
public StreamsBuilderFactoryBeanConfigurer customizer() {
return factoryBean -> {
factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
@Override
public void customize(KafkaStreams kafkaStreams) {
kafkaStreams.setUncaughtExceptionHandler(exception ->
StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT);
}
});
};
}
}
public interface KafkaStreamsProcessorX {

View File

@@ -32,7 +32,6 @@ import org.apache.kafka.streams.kstream.Serialized;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.springframework.boot.SpringApplication;
@@ -43,6 +42,8 @@ import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.kafka.streams.annotations.KafkaStreamsProcessor;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.CleanupConfig;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
@@ -92,7 +93,6 @@ public class KafkaStreamsBinderMultipleInputTopicsTest {
}
@Test
@Ignore("Investigate why this test fails")
public void testKstreamWordCountWithStringInputAndPojoOuput() throws Exception {
SpringApplication app = new SpringApplication(
WordCountProcessorApplication.class);
@@ -165,10 +165,15 @@ public class KafkaStreamsBinderMultipleInputTopicsTest {
value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.map((key, value) -> new KeyValue<>(value, value))
.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
.count(Materialized.as("WordCounts")).toStream()
.count(Materialized.as("WordCounts-tKWCWSIAP0")).toStream()
.map((key, value) -> new KeyValue<>(null, new WordCount(key, value)));
}
@Bean
public CleanupConfig cleanupConfig() {
return new CleanupConfig(false, true);
}
}
static class WordCount {