diff --git a/kafka-streams-samples/kafka-streams-word-count/pom.xml b/kafka-streams-samples/kafka-streams-word-count/pom.xml index 60992fd..8899666 100644 --- a/kafka-streams-samples/kafka-streams-word-count/pom.xml +++ b/kafka-streams-samples/kafka-streams-word-count/pom.xml @@ -36,6 +36,12 @@ spring-kafka-test test + + org.apache.kafka + kafka-streams-test-utils + ${kafka.version} + test + diff --git a/kafka-streams-samples/kafka-streams-word-count/src/main/java/kafka/streams/word/count/KafkaStreamsWordCountApplication.java b/kafka-streams-samples/kafka-streams-word-count/src/main/java/kafka/streams/word/count/KafkaStreamsWordCountApplication.java index 58162d3..9c96c31 100644 --- a/kafka-streams-samples/kafka-streams-word-count/src/main/java/kafka/streams/word/count/KafkaStreamsWordCountApplication.java +++ b/kafka-streams-samples/kafka-streams-word-count/src/main/java/kafka/streams/word/count/KafkaStreamsWordCountApplication.java @@ -17,6 +17,7 @@ package kafka.streams.word.count; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Materialized; @@ -42,15 +43,19 @@ public class KafkaStreamsWordCountApplication { @EnableBinding(KafkaStreamsProcessor.class) public static class WordCountProcessorApplication { - @StreamListener("input") - @SendTo("output") - public KStream process(KStream input) { + public static final String INPUT_TOPIC = "input"; + public static final String OUTPUT_TOPIC = "output"; + public static final int WINDOW_SIZE_MS = 30000; + + @StreamListener(INPUT_TOPIC) + @SendTo(OUTPUT_TOPIC) + public KStream process(KStream input) { return input .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+"))) .map((key, value) -> new KeyValue<>(value, value)) .groupByKey(Serialized.with(Serdes.String(), Serdes.String())) - .windowedBy(TimeWindows.of(30000)) + .windowedBy(TimeWindows.of(WINDOW_SIZE_MS)) .count(Materialized.as("WordCounts-1")) .toStream() .map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end())))); @@ -67,6 +72,21 @@ public class KafkaStreamsWordCountApplication { private Date end; + @Override + public String toString() { + final StringBuffer sb = new StringBuffer("WordCount{"); + sb.append("word='").append(word).append('\''); + sb.append(", count=").append(count); + sb.append(", start=").append(start); + sb.append(", end=").append(end); + sb.append('}'); + return sb.toString(); + } + + WordCount() { + + } + WordCount(String word, long count, Date start, Date end) { this.word = word; this.count = count; diff --git a/kafka-streams-samples/kafka-streams-word-count/src/main/resources/logback.xml b/kafka-streams-samples/kafka-streams-word-count/src/main/resources/logback.xml new file mode 100644 index 0000000..870ac9e --- /dev/null +++ b/kafka-streams-samples/kafka-streams-word-count/src/main/resources/logback.xml @@ -0,0 +1,12 @@ + + + + + %d{ISO8601} %5p %t %c{2}:%L - %m%n + + + + + + + \ No newline at end of file diff --git a/kafka-streams-samples/kafka-streams-word-count/src/test/java/kafka/streams/word/count/KafkaStreamsWordCountApplicationTests.java b/kafka-streams-samples/kafka-streams-word-count/src/test/java/kafka/streams/word/count/KafkaStreamsWordCountApplicationTests.java index 4107988..75e6d6b 100644 --- a/kafka-streams-samples/kafka-streams-word-count/src/test/java/kafka/streams/word/count/KafkaStreamsWordCountApplicationTests.java +++ b/kafka-streams-samples/kafka-streams-word-count/src/test/java/kafka/streams/word/count/KafkaStreamsWordCountApplicationTests.java @@ -44,7 +44,6 @@ import static org.assertj.core.api.Assertions.assertThat; "spring.jmx.enabled=false", "spring.cloud.stream.bindings.input.destination=words", "spring.cloud.stream.bindings.output.destination=counts", - "spring.cloud.stream.bindings.output.contentType=application/json", "spring.cloud.stream.kafka.streams.default.consumer.application-id=basic-word-count", "spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000", "spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde", @@ -65,14 +64,14 @@ public class KafkaStreamsWordCountApplicationTests { DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(consumerProps); consumer = cf.createConsumer(); embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "counts"); - - System.setProperty("spring.cloud.stream.kafka.streams.binder.brokers", embeddedKafka.getBrokersAsString()); + //Since there are both binders present in this app, we resort to the spring kafka broker property. + System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString()); } @AfterClass public static void tearDown() { consumer.close(); - System.clearProperty("spring.cloud.stream.kafka.streams.binder.brokers"); + System.clearProperty("spring.kafka.bootstrap-servers"); } @Test diff --git a/kafka-streams-samples/kafka-streams-word-count/src/test/java/kafka/streams/word/count/WordCountProcessorApplicationTests.java b/kafka-streams-samples/kafka-streams-word-count/src/test/java/kafka/streams/word/count/WordCountProcessorApplicationTests.java new file mode 100644 index 0000000..7bbbd28 --- /dev/null +++ b/kafka-streams-samples/kafka-streams-word-count/src/test/java/kafka/streams/word/count/WordCountProcessorApplicationTests.java @@ -0,0 +1,162 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.streams.word.count; + +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.test.ConsumerRecordFactory; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.springframework.kafka.support.serializer.JsonSerde; + +import java.util.*; +import java.util.stream.Collectors; + +import static org.assertj.core.api.Assertions.assertThat; + + + +/** + * TopologyTestDriver based test about stream processing of {@link KafkaStreamsWordCountApplication} + * + * @author Jukka Karvanen / jukinimi.com + */ + +public class WordCountProcessorApplicationTests { + private TopologyTestDriver testDriver; + public static final String INPUT_TOPIC = KafkaStreamsWordCountApplication.WordCountProcessorApplication.INPUT_TOPIC; + public static final String OUTPUT_TOPIC = KafkaStreamsWordCountApplication.WordCountProcessorApplication.OUTPUT_TOPIC; + + final Serde stringSerde = Serdes.String(); + final JsonSerde countSerde = new JsonSerde<>(KafkaStreamsWordCountApplication.WordCount.class); + final Serde nullSerde = Serdes.Bytes(); //Serde for not used key + private ConsumerRecordFactory recordFactory = new ConsumerRecordFactory<>( + stringSerde.serializer(), stringSerde.serializer()); //Key feed in as string, even read as Bytes + + static Properties getStreamsConfiguration() { + final Properties streamsConfiguration = new Properties(); + // Need to be set even these do not matter with TopologyTestDriver + streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "TopologyTestDriver"); + streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); + return streamsConfiguration; + } + + /** + * Setup Stream topology + * Add KStream based on @StreamListener annotation + * Add to(topic) based @SendTo annotation + */ + @Before + public void setup() { + final StreamsBuilder builder = new StreamsBuilder(); + KStream input = builder.stream(INPUT_TOPIC, Consumed.with(nullSerde, stringSerde)); + KafkaStreamsWordCountApplication.WordCountProcessorApplication app = new KafkaStreamsWordCountApplication.WordCountProcessorApplication(); + KStream output = app.process(input); + output.to(OUTPUT_TOPIC, Produced.with(nullSerde, countSerde)); + testDriver = new TopologyTestDriver(builder.build(), getStreamsConfiguration()); + } + + @After + public void tearDown() { + try { + testDriver.close(); + } catch (final RuntimeException e) { + // https://issues.apache.org/jira/browse/KAFKA-6647 causes exception when executed in Windows, ignoring it + // Logged stacktrace cannot be avoided + System.out.println("Ignoring exception, test failing in Windows due this exception:" + e.getLocalizedMessage()); + } + } + + /** + * Read one Record from output topic. + * + * @return ProducerRecord containing WordCount as value + */ + private ProducerRecord readOutput() { + return testDriver.readOutput(OUTPUT_TOPIC, nullSerde.deserializer(), countSerde.deserializer()); + } + + /** + * Read counts from output to map ignoring start and end dates + * If existing word is incremented, it can appear twice in output and is replaced in map + * + * @return Map of Word and counts + */ + private Map getOutputList() { + final Map output = new HashMap<>(); + ProducerRecord outputRow; + while ((outputRow = readOutput()) != null) { + output.put(outputRow.value().getWord(), outputRow.value().getCount()); + } + return output; + } + + /** + * Simple test validating count of one word + */ + @Test + public void testOneWord() { + final String nullKey = null; + //Feed word "Hello" to inputTopic and no kafka key, timestamp is irrelevant in this case + testDriver.pipeInput(recordFactory.create(INPUT_TOPIC, nullKey, "Hello", 1L)); + //Read and validate output + final ProducerRecord output = readOutput(); + assertThat(output).isNotNull(); + assertThat(output.value()).isEqualToComparingFieldByField(new KafkaStreamsWordCountApplication.WordCount("hello", 1L, new Date(0), new Date(KafkaStreamsWordCountApplication.WordCountProcessorApplication.WINDOW_SIZE_MS))); + //No more output in topic + assertThat(readOutput()).isNull(); + } + + /** + * Test Word count of sentence list. + */ + @Test + public void shouldCountWords() { + final List inputLines = Arrays.asList( + "Kafka Streams Examples", + "Spring Cloud Stream Sample", + "Using Kafka Streams Test Utils" + ); + final List> inputRecords = inputLines.stream().map(v -> new KeyValue(null, v)).collect(Collectors.toList()); + + final Map expectedWordCounts = new HashMap<>(); + expectedWordCounts.put("spring", 1L); + expectedWordCounts.put("cloud", 1L); + expectedWordCounts.put("examples", 1L); + expectedWordCounts.put("sample", 1L); + expectedWordCounts.put("streams", 2L); + expectedWordCounts.put("stream", 1L); + expectedWordCounts.put("test", 1L); + expectedWordCounts.put("utils", 1L); + expectedWordCounts.put("kafka", 2L); + expectedWordCounts.put("using", 1L); + + testDriver.pipeInput(recordFactory.create(INPUT_TOPIC, inputRecords, 1L, 1000L)); //All feed in same 30s time window + final Map actualWordCounts = getOutputList(); + assertThat(actualWordCounts).containsAllEntriesOf(expectedWordCounts).hasSameSizeAs(expectedWordCounts); + } +}