From 1afdac7eb6ea92c3e15c22b9033670bc0211b599 Mon Sep 17 00:00:00 2001
From: Jukka Karvanen <48978068+jukkakarvanen@users.noreply.github.com>
Date: Tue, 9 Apr 2019 14:08:27 +0300
Subject: [PATCH] Kafka Streams unit test demonstration in Spring Cloud Stream

kafka-streams-test-utils added for TopologyTestDriver. Note: kafka.version
now needs to be kept in sync manually.
Reduce unnecessary logging.
Extract constants to be used in the test class.
Changed the unused key type from Object to Bytes to use BytesSerde.
Added a default constructor to support JsonSerde.
Added toString for better test output.
KafkaStreamsWordCountApplication.WordCountProcessorApplication tested using
TopologyTestDriver.
Unified indentation.
Bytes as key type.
Polishing.
---
 .../kafka-streams-word-count/pom.xml          |   6 +
 .../KafkaStreamsWordCountApplication.java     |  28 ++-
 .../src/main/resources/logback.xml            |  12 ++
 ...KafkaStreamsWordCountApplicationTests.java |   7 +-
 .../WordCountProcessorApplicationTests.java   | 162 ++++++++++++++++++
 5 files changed, 207 insertions(+), 8 deletions(-)
 create mode 100644 kafka-streams-samples/kafka-streams-word-count/src/main/resources/logback.xml
 create mode 100644 kafka-streams-samples/kafka-streams-word-count/src/test/java/kafka/streams/word/count/WordCountProcessorApplicationTests.java

diff --git a/kafka-streams-samples/kafka-streams-word-count/pom.xml b/kafka-streams-samples/kafka-streams-word-count/pom.xml
index 60992fd..8899666 100644
--- a/kafka-streams-samples/kafka-streams-word-count/pom.xml
+++ b/kafka-streams-samples/kafka-streams-word-count/pom.xml
@@ -36,6 +36,12 @@
 		<dependency>
 			<groupId>org.springframework.kafka</groupId>
 			<artifactId>spring-kafka-test</artifactId>
 			<scope>test</scope>
 		</dependency>
+		<dependency>
+			<groupId>org.apache.kafka</groupId>
+			<artifactId>kafka-streams-test-utils</artifactId>
+			<version>${kafka.version}</version>
+			<scope>test</scope>
+		</dependency>
 	</dependencies>

diff --git a/kafka-streams-samples/kafka-streams-word-count/src/main/java/kafka/streams/word/count/KafkaStreamsWordCountApplication.java b/kafka-streams-samples/kafka-streams-word-count/src/main/java/kafka/streams/word/count/KafkaStreamsWordCountApplication.java
index 58162d3..9c96c31 100644
--- a/kafka-streams-samples/kafka-streams-word-count/src/main/java/kafka/streams/word/count/KafkaStreamsWordCountApplication.java
+++ b/kafka-streams-samples/kafka-streams-word-count/src/main/java/kafka/streams/word/count/KafkaStreamsWordCountApplication.java
@@ -17,6 +17,7 @@
 package kafka.streams.word.count;
 
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Materialized;
@@ -42,15 +43,19 @@ public class KafkaStreamsWordCountApplication {
 	@EnableBinding(KafkaStreamsProcessor.class)
 	public static class WordCountProcessorApplication {
 
-		@StreamListener("input")
-		@SendTo("output")
-		public KStream<?, WordCount> process(KStream<Object, String> input) {
+		public static final String INPUT_TOPIC = "input";
+		public static final String OUTPUT_TOPIC = "output";
+		public static final int WINDOW_SIZE_MS = 30000;
+
+		@StreamListener(INPUT_TOPIC)
+		@SendTo(OUTPUT_TOPIC)
+		public KStream<Bytes, WordCount> process(KStream<Bytes, String> input) {
 
 			return input
 					.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
 					.map((key, value) -> new KeyValue<>(value, value))
 					.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
-					.windowedBy(TimeWindows.of(30000))
+					.windowedBy(TimeWindows.of(WINDOW_SIZE_MS))
 					.count(Materialized.as("WordCounts-1"))
 					.toStream()
 					.map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))));
@@ -67,6 +72,21 @@ public class KafkaStreamsWordCountApplication {
 
 		private Date end;
 
+		@Override
+		public String toString() {
+			final StringBuffer sb = new StringBuffer("WordCount{");
+			sb.append("word='").append(word).append('\'');
+			sb.append(", count=").append(count);
+			sb.append(", start=").append(start);
+			sb.append(", end=").append(end);
+			sb.append('}');
+			return sb.toString();
+		}
+
+		WordCount() {
+
+		}
+
 		WordCount(String word, long count, Date start, Date end) {
 			this.word = word;
 			this.count = count;

diff --git a/kafka-streams-samples/kafka-streams-word-count/src/main/resources/logback.xml b/kafka-streams-samples/kafka-streams-word-count/src/main/resources/logback.xml
new file mode 100644
index 0000000..870ac9e
--- /dev/null
+++ b/kafka-streams-samples/kafka-streams-word-count/src/main/resources/logback.xml
@@ -0,0 +1,12 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration>
+	<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+		<encoder>
+			<pattern>%d{ISO8601} %5p %t %c{2}:%L - %m%n</pattern>
+		</encoder>
+	</appender>
+
+	<root level="WARN">
+		<appender-ref ref="STDOUT" />
+	</root>
+</configuration>
\ No newline at end of file

diff --git a/kafka-streams-samples/kafka-streams-word-count/src/test/java/kafka/streams/word/count/KafkaStreamsWordCountApplicationTests.java b/kafka-streams-samples/kafka-streams-word-count/src/test/java/kafka/streams/word/count/KafkaStreamsWordCountApplicationTests.java
index 4107988..75e6d6b 100644
--- a/kafka-streams-samples/kafka-streams-word-count/src/test/java/kafka/streams/word/count/KafkaStreamsWordCountApplicationTests.java
+++ b/kafka-streams-samples/kafka-streams-word-count/src/test/java/kafka/streams/word/count/KafkaStreamsWordCountApplicationTests.java
@@ -44,7 +44,6 @@ import static org.assertj.core.api.Assertions.assertThat;
 		"spring.jmx.enabled=false",
 		"spring.cloud.stream.bindings.input.destination=words",
 		"spring.cloud.stream.bindings.output.destination=counts",
-		"spring.cloud.stream.bindings.output.contentType=application/json",
 		"spring.cloud.stream.kafka.streams.default.consumer.application-id=basic-word-count",
 		"spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
 		"spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde",
@@ -65,14 +64,14 @@ public class KafkaStreamsWordCountApplicationTests {
 		DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
 		consumer = cf.createConsumer();
 		embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "counts");
-
-		System.setProperty("spring.cloud.stream.kafka.streams.binder.brokers", embeddedKafka.getBrokersAsString());
+		// Since both binders are present in this app, we resort to the Spring Kafka broker property.
+		System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());
 	}
 
 	@AfterClass
 	public static void tearDown() {
 		consumer.close();
-		System.clearProperty("spring.cloud.stream.kafka.streams.binder.brokers");
+		System.clearProperty("spring.kafka.bootstrap-servers");
 	}
 
 	@Test

diff --git a/kafka-streams-samples/kafka-streams-word-count/src/test/java/kafka/streams/word/count/WordCountProcessorApplicationTests.java b/kafka-streams-samples/kafka-streams-word-count/src/test/java/kafka/streams/word/count/WordCountProcessorApplicationTests.java
new file mode 100644
index 0000000..7bbbd28
--- /dev/null
+++ b/kafka-streams-samples/kafka-streams-word-count/src/test/java/kafka/streams/word/count/WordCountProcessorApplicationTests.java
@@ -0,0 +1,162 @@
+/*
+ * Copyright 2019 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.streams.word.count;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.kafka.support.serializer.JsonSerde;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * TopologyTestDriver based test of the stream processing in {@link KafkaStreamsWordCountApplication}.
+ *
+ * @author Jukka Karvanen / jukinimi.com
+ */
+public class WordCountProcessorApplicationTests {
+
+	private TopologyTestDriver testDriver;
+
+	public static final String INPUT_TOPIC = KafkaStreamsWordCountApplication.WordCountProcessorApplication.INPUT_TOPIC;
+	public static final String OUTPUT_TOPIC = KafkaStreamsWordCountApplication.WordCountProcessorApplication.OUTPUT_TOPIC;
+
+	final Serde<String> stringSerde = Serdes.String();
+	final JsonSerde<KafkaStreamsWordCountApplication.WordCount> countSerde = new JsonSerde<>(KafkaStreamsWordCountApplication.WordCount.class);
+	final Serde<Bytes> nullSerde = Serdes.Bytes(); // Serde for the unused key
+	private ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(
+			stringSerde.serializer(), stringSerde.serializer()); // Key fed in as String, even though it is read back as Bytes
+
+	static Properties getStreamsConfiguration() {
+		final Properties streamsConfiguration = new Properties();
+		// These need to be set even though they do not matter with TopologyTestDriver
+		streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "TopologyTestDriver");
+		streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
+		return streamsConfiguration;
+	}
+
+	/**
+	 * Set up the stream topology:
+	 * add the KStream based on the @StreamListener annotation and
+	 * to(topic) based on the @SendTo annotation.
+	 */
+	@Before
+	public void setup() {
+		final StreamsBuilder builder = new StreamsBuilder();
+		KStream<Bytes, String> input = builder.stream(INPUT_TOPIC, Consumed.with(nullSerde, stringSerde));
+		KafkaStreamsWordCountApplication.WordCountProcessorApplication app = new KafkaStreamsWordCountApplication.WordCountProcessorApplication();
+		KStream<Bytes, KafkaStreamsWordCountApplication.WordCount> output = app.process(input);
+		output.to(OUTPUT_TOPIC, Produced.with(nullSerde, countSerde));
+		testDriver = new TopologyTestDriver(builder.build(), getStreamsConfiguration());
+	}
+
+	@After
+	public void tearDown() {
+		try {
+			testDriver.close();
+		} catch (final RuntimeException e) {
+			// https://issues.apache.org/jira/browse/KAFKA-6647 causes an exception when executed on Windows; ignoring it.
+			// The logged stacktrace cannot be avoided.
+			System.out.println("Ignoring exception, test failing on Windows due to this exception: " + e.getLocalizedMessage());
+		}
+	}
+
+	/**
+	 * Read one record from the output topic.
+	 *
+	 * @return ProducerRecord containing WordCount as value
+	 */
+	private ProducerRecord<Bytes, KafkaStreamsWordCountApplication.WordCount> readOutput() {
+		return testDriver.readOutput(OUTPUT_TOPIC, nullSerde.deserializer(), countSerde.deserializer());
+	}
+
+	/**
+	 * Read the counts from the output topic into a map, ignoring start and end dates.
+	 * If an existing word is incremented, it can appear twice in the output and is replaced in the map.
+	 *
+	 * @return Map of words and counts
+	 */
+	private Map<String, Long> getOutputList() {
+		final Map<String, Long> output = new HashMap<>();
+		ProducerRecord<Bytes, KafkaStreamsWordCountApplication.WordCount> outputRow;
+		while ((outputRow = readOutput()) != null) {
+			output.put(outputRow.value().getWord(), outputRow.value().getCount());
+		}
+		return output;
+	}
+
+	/**
+	 * Simple test validating the count of one word.
+	 */
+	@Test
+	public void testOneWord() {
+		final String nullKey = null;
+		// Feed the word "Hello" to the input topic with no Kafka key; the timestamp is irrelevant in this case
+		testDriver.pipeInput(recordFactory.create(INPUT_TOPIC, nullKey, "Hello", 1L));
+		// Read and validate the output
+		final ProducerRecord<Bytes, KafkaStreamsWordCountApplication.WordCount> output = readOutput();
+		assertThat(output).isNotNull();
+		assertThat(output.value()).isEqualToComparingFieldByField(
+				new KafkaStreamsWordCountApplication.WordCount("hello", 1L, new Date(0), new Date(KafkaStreamsWordCountApplication.WordCountProcessorApplication.WINDOW_SIZE_MS)));
+		// No more output in the topic
+		assertThat(readOutput()).isNull();
+	}
+
+	/**
+	 * Test the word count of a list of sentences.
+	 */
+	@Test
+	public void shouldCountWords() {
+		final List<String> inputLines = Arrays.asList(
+				"Kafka Streams Examples",
+				"Spring Cloud Stream Sample",
+				"Using Kafka Streams Test Utils"
+		);
+		final List<KeyValue<String, String>> inputRecords = inputLines.stream().map(v -> new KeyValue<String, String>(null, v)).collect(Collectors.toList());
+
+		final Map<String, Long> expectedWordCounts = new HashMap<>();
+		expectedWordCounts.put("spring", 1L);
+		expectedWordCounts.put("cloud", 1L);
+		expectedWordCounts.put("examples", 1L);
+		expectedWordCounts.put("sample", 1L);
+		expectedWordCounts.put("streams", 2L);
+		expectedWordCounts.put("stream", 1L);
+		expectedWordCounts.put("test", 1L);
+		expectedWordCounts.put("utils", 1L);
+		expectedWordCounts.put("kafka", 2L);
+		expectedWordCounts.put("using", 1L);
+
+		testDriver.pipeInput(recordFactory.create(INPUT_TOPIC, inputRecords, 1L, 1000L)); // All fed into the same 30s time window
+		final Map<String, Long> actualWordCounts = getOutputList();
+		assertThat(actualWordCounts).containsAllEntriesOf(expectedWordCounts).hasSameSizeAs(expectedWordCounts);
+	}
+}
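
Note on the test-utils API used above: the patch targets the ConsumerRecordFactory/readOutput API, which is what kafka-streams-test-utils provides for Kafka versions before 2.4. From Kafka 2.4 onward the same assertions can be written against the TestInputTopic/TestOutputTopic API, which removes the raw timestamps and byte-level record handling. The following is a minimal sketch under that version assumption; the class name WordCountTopicApiTests is illustrative and not part of this patch.

package kafka.streams.word.count;

import java.time.Instant;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.junit.Test;
import org.springframework.kafka.support.serializer.JsonSerde;

import static org.assertj.core.api.Assertions.assertThat;

// Sketch only: requires kafka-streams-test-utils 2.4+ (hypothetical for this sample's pinned kafka.version)
public class WordCountTopicApiTests {

	@Test
	public void testOneWord() {
		// Wire the same topology as WordCountProcessorApplicationTests.setup()
		final StreamsBuilder builder = new StreamsBuilder();
		final JsonSerde<KafkaStreamsWordCountApplication.WordCount> countSerde =
				new JsonSerde<>(KafkaStreamsWordCountApplication.WordCount.class);
		final KafkaStreamsWordCountApplication.WordCountProcessorApplication app =
				new KafkaStreamsWordCountApplication.WordCountProcessorApplication();
		app.process(builder.stream(
						KafkaStreamsWordCountApplication.WordCountProcessorApplication.INPUT_TOPIC,
						Consumed.with(Serdes.Bytes(), Serdes.String())))
				.to(KafkaStreamsWordCountApplication.WordCountProcessorApplication.OUTPUT_TOPIC,
						Produced.with(Serdes.Bytes(), countSerde));

		final Properties props = new Properties();
		props.put(StreamsConfig.APPLICATION_ID_CONFIG, "TopologyTestDriver");
		props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

		// try-with-resources closes the driver, replacing the manual tearDown()
		try (TopologyTestDriver testDriver = new TopologyTestDriver(builder.build(), props)) {
			final TestInputTopic<String, String> input = testDriver.createInputTopic(
					KafkaStreamsWordCountApplication.WordCountProcessorApplication.INPUT_TOPIC,
					Serdes.String().serializer(), Serdes.String().serializer());
			final TestOutputTopic<Bytes, KafkaStreamsWordCountApplication.WordCount> output =
					testDriver.createOutputTopic(
							KafkaStreamsWordCountApplication.WordCountProcessorApplication.OUTPUT_TOPIC,
							Serdes.Bytes().deserializer(), countSerde.deserializer());

			// Null key; Instant replaces the raw long timestamp used with ConsumerRecordFactory
			input.pipeInput(null, "Hello", Instant.ofEpochMilli(1L));

			assertThat(output.readValue().getWord()).isEqualTo("hello");
			assertThat(output.isEmpty()).isTrue();
		}
	}
}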