From 6eed115cc902fd6271fe9d2f56a147991bc3fec4 Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Fri, 27 Aug 2021 20:03:56 -0400 Subject: [PATCH] Kafka Streams binder tests cleanup --- ...kaStreamsBinderWordCountFunctionTests.java | 84 ++++++++-- ... => KafkaStreamsBinderTombstoneTests.java} | 74 +++------ ...utboundValueNullSkippedConversionTest.java | 146 ------------------ 3 files changed, 90 insertions(+), 214 deletions(-) rename spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/integration/{KafkaStreamsBinderWordCountIntegrationTests.java => KafkaStreamsBinderTombstoneTests.java} (70%) delete mode 100644 spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/integration/OutboundValueNullSkippedConversionTest.java diff --git a/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/function/KafkaStreamsBinderWordCountFunctionTests.java b/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/function/KafkaStreamsBinderWordCountFunctionTests.java index f2eb680c..349beff6 100644 --- a/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/function/KafkaStreamsBinderWordCountFunctionTests.java +++ b/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/function/KafkaStreamsBinderWordCountFunctionTests.java @@ -16,6 +16,7 @@ package org.springframework.cloud.stream.binder.kafka.streams.function; +import java.time.Duration; import java.util.Arrays; import java.util.Collection; import java.util.Date; @@ -74,7 +75,7 @@ public class KafkaStreamsBinderWordCountFunctionTests { @ClassRule public static EmbeddedKafkaRule embeddedKafkaRule = new EmbeddedKafkaRule(1, true, - "counts", "counts-1", "counts-2"); + "counts", "counts-1", "counts-2", "counts-5", "counts-6"); private static EmbeddedKafkaBroker embeddedKafka = embeddedKafkaRule.getEmbeddedKafka(); @@ -90,7 +91,7 @@ public class KafkaStreamsBinderWordCountFunctionTests { consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(consumerProps); consumer = cf.createConsumer(); - embeddedKafka.consumeFromEmbeddedTopics(consumer, "counts", "counts-1", "counts-2"); + embeddedKafka.consumeFromEmbeddedTopics(consumer, "counts", "counts-1", "counts-2", "counts-5", "counts-6"); } @AfterClass @@ -177,22 +178,23 @@ public class KafkaStreamsBinderWordCountFunctionTests { } @Test - public void testKstreamWordCountFunctionWithGeneratedApplicationId() throws Exception { + public void testKstreamWordCountWithApplicationIdSpecifiedAtDefaultConsumer() { SpringApplication app = new SpringApplication(WordCountProcessorApplication.class); app.setWebApplicationType(WebApplicationType.NONE); - try (ConfigurableApplicationContext context = app.run( - "--server.port=0", + try (ConfigurableApplicationContext context = app.run("--server.port=0", "--spring.jmx.enabled=false", - "--spring.cloud.stream.bindings.process-in-0.destination=words-1", - "--spring.cloud.stream.bindings.process-out-0.destination=counts-1", + "--spring.cloud.stream.bindings.process-in-0.destination=words-5", + "--spring.cloud.stream.bindings.process-out-0.destination=counts-5", + "--spring.cloud.stream.kafka.streams.default.consumer.application-id=testKstreamWordCountWithApplicationIdSpecifiedAtDefaultConsumer", "--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000", - "--spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde" + - "=org.apache.kafka.common.serialization.Serdes$StringSerde", - "--spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde" + - "=org.apache.kafka.common.serialization.Serdes$StringSerde", - "--spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString())) { - receiveAndValidate("words-1", "counts-1"); + "--spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde" + + "=org.apache.kafka.common.serialization.Serdes$StringSerde", + "--spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde" + + "=org.apache.kafka.common.serialization.Serdes$StringSerde", + "--spring.cloud.stream.kafka.binder.brokers=" + + embeddedKafka.getBrokersAsString())) { + receiveAndValidate("words-5", "counts-5"); } } @@ -204,6 +206,7 @@ public class KafkaStreamsBinderWordCountFunctionTests { try (ConfigurableApplicationContext context = app.run( "--server.port=0", "--spring.jmx.enabled=false", + "--spring.cloud.stream.kafka.streams.binder.application-id=testKstreamWordCountFunctionWithCustomProducerStreamPartitioner", "--spring.cloud.stream.bindings.process-in-0.destination=words-2", "--spring.cloud.stream.bindings.process-out-0.destination=counts-2", "--spring.cloud.stream.bindings.process-out-0.producer.partitionCount=2", @@ -280,6 +283,45 @@ public class KafkaStreamsBinderWordCountFunctionTests { } } + // The following test verifies the fixes made for this issue: + // https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/774 + @Test + public void testOutboundNullValueIsHandledGracefully() + throws Exception { + SpringApplication app = new SpringApplication(OutboundNullApplication.class); + app.setWebApplicationType(WebApplicationType.NONE); + + try (ConfigurableApplicationContext context = app.run("--server.port=0", + "--spring.jmx.enabled=false", + "--spring.cloud.stream.bindings.process-in-0.destination=words-6", + "--spring.cloud.stream.bindings.process-out-0.destination=counts-6", + "--spring.cloud.stream.bindings.process-out-0.producer.useNativeEncoding=false", + "--spring.cloud.stream.kafka.streams.default.consumer.application-id=testOutboundNullValueIsHandledGracefully", + "--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000", + "--spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde" + + "=org.apache.kafka.common.serialization.Serdes$StringSerde", + "--spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde" + + "=org.apache.kafka.common.serialization.Serdes$StringSerde", + "--spring.cloud.stream.kafka.binder.brokers=" + + embeddedKafka.getBrokersAsString())) { + + Map senderProps = KafkaTestUtils.producerProps(embeddedKafka); + DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory<>( + senderProps); + try { + KafkaTemplate template = new KafkaTemplate<>(pf, true); + template.setDefaultTopic("words-6"); + template.sendDefault("foobar"); + ConsumerRecord cr = KafkaTestUtils.getSingleRecord(consumer, + "counts-6"); + assertThat(cr.value() == null).isTrue(); + } + finally { + pf.destroy(); + } + } + } + private void receiveAndValidate(String in, String out) { Map senderProps = KafkaTestUtils.producerProps(embeddedKafka); DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory<>(senderProps); @@ -387,4 +429,20 @@ public class KafkaStreamsBinderWordCountFunctionTests { return (t, k, v, n) -> k.equals("foo") ? 0 : 1; } } + + @EnableAutoConfiguration + static class OutboundNullApplication { + + @Bean + public Function, KStream> process() { + return input -> input + .flatMapValues( + value -> Arrays.asList(value.toLowerCase().split("\\W+"))) + .map((key, value) -> new KeyValue<>(value, value)) + .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) + .windowedBy(TimeWindows.of(Duration.ofSeconds(5))).count(Materialized.as("foobar-WordCounts")) + .toStream() + .map((key, value) -> new KeyValue<>(null, null)); + } + } } diff --git a/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/integration/KafkaStreamsBinderWordCountIntegrationTests.java b/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/integration/KafkaStreamsBinderTombstoneTests.java similarity index 70% rename from spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/integration/KafkaStreamsBinderWordCountIntegrationTests.java rename to spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/integration/KafkaStreamsBinderTombstoneTests.java index 29d3bcd5..9a9be626 100644 --- a/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/integration/KafkaStreamsBinderWordCountIntegrationTests.java +++ b/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/integration/KafkaStreamsBinderTombstoneTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2018 the original author or authors. + * Copyright 2017-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,6 +21,7 @@ import java.util.Arrays; import java.util.Date; import java.util.Map; import java.util.Properties; +import java.util.function.Function; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -42,10 +43,6 @@ import org.junit.Test; import org.springframework.boot.SpringApplication; import org.springframework.boot.WebApplicationType; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; -import org.springframework.cloud.stream.annotation.EnableBinding; -import org.springframework.cloud.stream.annotation.Input; -import org.springframework.cloud.stream.annotation.StreamListener; -import org.springframework.cloud.stream.binder.kafka.streams.annotations.KafkaStreamsProcessor; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.integration.test.util.TestUtils; @@ -57,7 +54,6 @@ import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.test.EmbeddedKafkaBroker; import org.springframework.kafka.test.rule.EmbeddedKafkaRule; import org.springframework.kafka.test.utils.KafkaTestUtils; -import org.springframework.messaging.handler.annotation.SendTo; import static org.assertj.core.api.Assertions.assertThat; @@ -66,11 +62,11 @@ import static org.assertj.core.api.Assertions.assertThat; * @author Soby Chacko * @author Gary Russell */ -public class KafkaStreamsBinderWordCountIntegrationTests { +public class KafkaStreamsBinderTombstoneTests { @ClassRule public static EmbeddedKafkaRule embeddedKafkaRule = new EmbeddedKafkaRule(1, true, - "counts", "counts-1"); + "counts-1"); private static EmbeddedKafkaBroker embeddedKafka = embeddedKafkaRule .getEmbeddedKafka(); @@ -85,7 +81,7 @@ public class KafkaStreamsBinderWordCountIntegrationTests { DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>( consumerProps); consumer = cf.createConsumer(); - embeddedKafka.consumeFromEmbeddedTopics(consumer, "counts", "counts-1"); + embeddedKafka.consumeFromEmbeddedTopics(consumer, "counts-1"); } @AfterClass @@ -93,31 +89,6 @@ public class KafkaStreamsBinderWordCountIntegrationTests { consumer.close(); } - @Test - public void testKstreamWordCountWithApplicationIdSpecifiedAtDefaultConsumer() - throws Exception { - SpringApplication app = new SpringApplication( - WordCountProcessorApplication.class); - app.setWebApplicationType(WebApplicationType.NONE); - - try (ConfigurableApplicationContext context = app.run("--server.port=0", - "--spring.jmx.enabled=false", - "--spring.cloud.stream.bindings.input.destination=words", - "--spring.cloud.stream.bindings.output.destination=counts", - "--spring.cloud.stream.kafka.streams.default.consumer.application-id=testKstreamWordCountWithApplicationIdSpecifiedAtDefaultConsumer", - "--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000", - "--spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde" - + "=org.apache.kafka.common.serialization.Serdes$StringSerde", - "--spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde" - + "=org.apache.kafka.common.serialization.Serdes$StringSerde", - "--spring.cloud.stream.kafka.streams.timeWindow.length=5000", - "--spring.cloud.stream.kafka.streams.timeWindow.advanceBy=0", - "--spring.cloud.stream.kafka.binder.brokers=" - + embeddedKafka.getBrokersAsString())) { - receiveAndValidate("words", "counts"); - } - } - @Test public void testSendToTombstone() throws Exception { @@ -127,24 +98,22 @@ public class KafkaStreamsBinderWordCountIntegrationTests { try (ConfigurableApplicationContext context = app.run("--server.port=0", "--spring.jmx.enabled=false", - "--spring.cloud.stream.bindings.input.destination=words-1", - "--spring.cloud.stream.bindings.output.destination=counts-1", - "--spring.cloud.stream.kafka.streams.bindings.input.consumer.application-id=testKstreamWordCountWithInputBindingLevelApplicationId", + "--spring.cloud.stream.bindings.process-in-0.destination=words-1", + "--spring.cloud.stream.bindings.process-out-0.destination=counts-1", + "--spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.application-id=testKstreamWordCountWithInputBindingLevelApplicationId", "--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000", "--spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde" + "=org.apache.kafka.common.serialization.Serdes$StringSerde", "--spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde" + "=org.apache.kafka.common.serialization.Serdes$StringSerde", - "--spring.cloud.stream.kafka.streams.bindings.output.producer.valueSerde=org.springframework.kafka.support.serializer.JsonSerde", - "--spring.cloud.stream.kafka.streams.timeWindow.length=5000", - "--spring.cloud.stream.kafka.streams.timeWindow.advanceBy=0", - "--spring.cloud.stream.bindings.input.consumer.concurrency=2", + "--spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.valueSerde=org.springframework.kafka.support.serializer.JsonSerde", + "--spring.cloud.stream.bindings.process-in-0.consumer.concurrency=2", "--spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString())) { receiveAndValidate("words-1", "counts-1"); // Assertions on StreamBuilderFactoryBean StreamsBuilderFactoryBean streamsBuilderFactoryBean = context - .getBean("&stream-builder-WordCountProcessorApplication-process", StreamsBuilderFactoryBean.class); + .getBean("&stream-builder-process", StreamsBuilderFactoryBean.class); KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams(); assertThat(kafkaStreams).isNotNull(); // Ensure that concurrency settings are mapped to number of stream task @@ -200,26 +169,21 @@ public class KafkaStreamsBinderWordCountIntegrationTests { } } - @EnableBinding(KafkaStreamsProcessor.class) @EnableAutoConfiguration static class WordCountProcessorApplication { - @StreamListener - @SendTo("output") - public KStream process( - @Input("input") KStream input) { + @Bean + public Function, KStream> process() { - return input - .flatMapValues( - value -> Arrays.asList(value.toLowerCase().split("\\W+"))) + return input -> input + .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+"))) .map((key, value) -> new KeyValue<>(value, value)) .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) - .windowedBy(TimeWindows.of(Duration.ofSeconds(5))).count(Materialized.as("foo-WordCounts")) + .windowedBy(TimeWindows.of(5000)) + .count(Materialized.as("foo-WordCounts")) .toStream() - .map((key, value) -> new KeyValue<>(null, - new WordCount(key.key(), value, - new Date(key.window().start()), - new Date(key.window().end())))); + .map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, + new Date(key.window().start()), new Date(key.window().end())))); } @Bean diff --git a/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/integration/OutboundValueNullSkippedConversionTest.java b/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/integration/OutboundValueNullSkippedConversionTest.java deleted file mode 100644 index 3fc9b106..00000000 --- a/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/integration/OutboundValueNullSkippedConversionTest.java +++ /dev/null @@ -1,146 +0,0 @@ -/* - * Copyright 2019-2019 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.stream.binder.kafka.streams.integration; - -import java.time.Duration; -import java.util.Arrays; -import java.util.Map; - -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.Materialized; -import org.apache.kafka.streams.kstream.Serialized; -import org.apache.kafka.streams.kstream.TimeWindows; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.Test; - -import org.springframework.boot.SpringApplication; -import org.springframework.boot.WebApplicationType; -import org.springframework.boot.autoconfigure.EnableAutoConfiguration; -import org.springframework.cloud.stream.annotation.EnableBinding; -import org.springframework.cloud.stream.annotation.Input; -import org.springframework.cloud.stream.annotation.StreamListener; -import org.springframework.cloud.stream.binder.kafka.streams.annotations.KafkaStreamsProcessor; -import org.springframework.context.ConfigurableApplicationContext; -import org.springframework.kafka.core.DefaultKafkaConsumerFactory; -import org.springframework.kafka.core.DefaultKafkaProducerFactory; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.kafka.test.EmbeddedKafkaBroker; -import org.springframework.kafka.test.rule.EmbeddedKafkaRule; -import org.springframework.kafka.test.utils.KafkaTestUtils; -import org.springframework.messaging.handler.annotation.SendTo; - -import static org.assertj.core.api.Assertions.assertThat; - -/** - * @author Soby Chacko - */ -public class OutboundValueNullSkippedConversionTest { - - @ClassRule - public static EmbeddedKafkaRule embeddedKafkaRule = new EmbeddedKafkaRule(1, true, - "counts"); - - private static EmbeddedKafkaBroker embeddedKafka = embeddedKafkaRule - .getEmbeddedKafka(); - - private static Consumer consumer; - - @BeforeClass - public static void setUp() { - Map consumerProps = KafkaTestUtils.consumerProps("group", "false", - embeddedKafka); - consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>( - consumerProps); - consumer = cf.createConsumer(); - embeddedKafka.consumeFromEmbeddedTopics(consumer, "counts"); - } - - @AfterClass - public static void tearDown() { - consumer.close(); - } - - // The following test verifies the fixes made for this issue: - // https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/774 - @Test - public void testOutboundNullValueIsHandledGracefully() - throws Exception { - SpringApplication app = new SpringApplication( - OutboundNullApplication.class); - app.setWebApplicationType(WebApplicationType.NONE); - - try (ConfigurableApplicationContext context = app.run("--server.port=0", - "--spring.jmx.enabled=false", - "--spring.cloud.stream.bindings.input.destination=words", - "--spring.cloud.stream.bindings.output.destination=counts", - "--spring.cloud.stream.bindings.output.producer.useNativeEncoding=false", - "--spring.cloud.stream.kafka.streams.default.consumer.application-id=testOutboundNullValueIsHandledGracefully", - "--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000", - "--spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde" - + "=org.apache.kafka.common.serialization.Serdes$StringSerde", - "--spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde" - + "=org.apache.kafka.common.serialization.Serdes$StringSerde", - "--spring.cloud.stream.kafka.streams.timeWindow.length=5000", - "--spring.cloud.stream.kafka.streams.timeWindow.advanceBy=0", - "--spring.cloud.stream.kafka.binder.brokers=" - + embeddedKafka.getBrokersAsString())) { - - Map senderProps = KafkaTestUtils.producerProps(embeddedKafka); - DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory<>( - senderProps); - try { - KafkaTemplate template = new KafkaTemplate<>(pf, true); - template.setDefaultTopic("words"); - template.sendDefault("foobar"); - ConsumerRecord cr = KafkaTestUtils.getSingleRecord(consumer, - "counts"); - assertThat(cr.value() == null).isTrue(); - } - finally { - pf.destroy(); - } - } - } - - @EnableBinding(KafkaStreamsProcessor.class) - @EnableAutoConfiguration - static class OutboundNullApplication { - - @StreamListener - @SendTo("output") - public KStream process( - @Input("input") KStream input) { - - return input - .flatMapValues( - value -> Arrays.asList(value.toLowerCase().split("\\W+"))) - .map((key, value) -> new KeyValue<>(value, value)) - .groupByKey(Serialized.with(Serdes.String(), Serdes.String())) - .windowedBy(TimeWindows.of(Duration.ofSeconds(5))).count(Materialized.as("foo-WordCounts")) - .toStream() - .map((key, value) -> new KeyValue<>(null, null)); - } - } -}