Kafka Streams unit test demonstration in Spring Cloud Stream

Added kafka-streams-test-utils for TopologyTestDriver. Note that kafka.version now needs to be kept in sync manually.

Reduce unnecessary logging

Extracted constants to be used in the test class
Changed the unused key type from Object to Bytes in order to use BytesSerde
Added a default constructor to support JsonSerde
Added toString() for better test output

Added a TopologyTestDriver-based test for KafkaStreamsWordCountApplication.WordCountProcessorApplication

Unified indentation

Bytes as key type

Polishing
Jukka Karvanen authored 2019-04-09 14:08:27 +03:00; committed by Soby Chacko
parent ce0e8e1e18, commit 1afdac7eb6
5 changed files with 207 additions and 8 deletions

pom.xml

@@ -36,6 +36,12 @@
 			<artifactId>spring-kafka-test</artifactId>
 			<scope>test</scope>
 		</dependency>
+		<dependency>
+			<groupId>org.apache.kafka</groupId>
+			<artifactId>kafka-streams-test-utils</artifactId>
+			<version>${kafka.version}</version>
+			<scope>test</scope>
+		</dependency>
 	</dependencies>
 	<build>
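The ${kafka.version} property is what keeps kafka-streams-test-utils aligned with the kafka-streams client, since this test artifact is not covered by the sample's managed dependencies. A minimal sketch of pinning it explicitly if the versions ever drift; the 2.0.1 value is hypothetical and not taken from this commit:

	<properties>
		<!-- Hypothetical override: kafka.version is the property name that
		     Spring Boot's dependency management uses for the Kafka client line. -->
		<kafka.version>2.0.1</kafka.version>
	</properties>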

KafkaStreamsWordCountApplication.java

@@ -17,6 +17,7 @@
 package kafka.streams.word.count;

 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Materialized;
@@ -42,15 +43,19 @@ public class KafkaStreamsWordCountApplication {
 	@EnableBinding(KafkaStreamsProcessor.class)
 	public static class WordCountProcessorApplication {
-		@StreamListener("input")
-		@SendTo("output")
-		public KStream<?, WordCount> process(KStream<Object, String> input) {
+		public static final String INPUT_TOPIC = "input";
+		public static final String OUTPUT_TOPIC = "output";
+		public static final int WINDOW_SIZE_MS = 30000;
+
+		@StreamListener(INPUT_TOPIC)
+		@SendTo(OUTPUT_TOPIC)
+		public KStream<Bytes, WordCount> process(KStream<Bytes, String> input) {
 			return input
 					.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
 					.map((key, value) -> new KeyValue<>(value, value))
 					.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
-					.windowedBy(TimeWindows.of(30000))
+					.windowedBy(TimeWindows.of(WINDOW_SIZE_MS))
 					.count(Materialized.as("WordCounts-1"))
 					.toStream()
 					.map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))));
@@ -67,6 +72,21 @@ public class KafkaStreamsWordCountApplication {
 		private Date end;

+		@Override
+		public String toString() {
+			final StringBuffer sb = new StringBuffer("WordCount{");
+			sb.append("word='").append(word).append('\'');
+			sb.append(", count=").append(count);
+			sb.append(", start=").append(start);
+			sb.append(", end=").append(end);
+			sb.append('}');
+			return sb.toString();
+		}
+
+		WordCount() {
+		}
+
 		WordCount(String word, long count, Date start, Date end) {
 			this.word = word;
 			this.count = count;
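The added no-arg constructor is what lets JsonSerde deserialize WordCount: Jackson instantiates the target type reflectively before populating its fields. A minimal round-trip sketch; WordCountSerdeDemo is a hypothetical class placed in the same package so it can reach the package-private constructors:

	package kafka.streams.word.count;

	import java.util.Date;
	import org.springframework.kafka.support.serializer.JsonSerde;

	class WordCountSerdeDemo {
		public static void main(String[] args) {
			JsonSerde<KafkaStreamsWordCountApplication.WordCount> serde =
					new JsonSerde<>(KafkaStreamsWordCountApplication.WordCount.class);
			KafkaStreamsWordCountApplication.WordCount original =
					new KafkaStreamsWordCountApplication.WordCount("hello", 1L, new Date(0), new Date(30000));
			byte[] json = serde.serializer().serialize("counts", original);
			// Jackson needs the no-arg constructor here; without it this call
			// fails with an InvalidDefinitionException ("no Creators").
			KafkaStreamsWordCountApplication.WordCount copy = serde.deserializer().deserialize("counts", json);
			System.out.println(copy); // readable output thanks to the added toString()
			serde.close();
		}
	}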

logback.xml (new file)

@@ -0,0 +1,12 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
	<appender name="stdout" class="ch.qos.logback.core.ConsoleAppender">
		<encoder>
			<pattern>%d{ISO8601} %5p %t %c{2}:%L - %m%n</pattern>
		</encoder>
	</appender>
	<root level="INFO">
		<appender-ref ref="stdout"/>
	</root>
	<logger name="org.apache.kafka.streams.processor.internals" level="WARN"/>
</configuration>
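The single logger element above quiets the chattiest Kafka Streams internals during test runs. Further categories can be silenced the same way with additional logger elements inside the configuration element; these two names are illustrative of commonly noisy packages and are not part of the commit:

	<logger name="org.apache.kafka.clients" level="WARN"/>
	<logger name="kafka.server" level="WARN"/>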

KafkaStreamsWordCountApplicationTests.java

@@ -44,7 +44,6 @@ import static org.assertj.core.api.Assertions.assertThat;
"spring.jmx.enabled=false",
"spring.cloud.stream.bindings.input.destination=words",
"spring.cloud.stream.bindings.output.destination=counts",
"spring.cloud.stream.bindings.output.contentType=application/json",
"spring.cloud.stream.kafka.streams.default.consumer.application-id=basic-word-count",
"spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
"spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde",
@@ -65,14 +64,14 @@ public class KafkaStreamsWordCountApplicationTests {
 		DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
 		consumer = cf.createConsumer();
 		embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "counts");
-		System.setProperty("spring.cloud.stream.kafka.streams.binder.brokers", embeddedKafka.getBrokersAsString());
+		// Since there are both binders present in this app, we resort to the Spring Kafka broker property.
+		System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());
 	}

 	@AfterClass
 	public static void tearDown() {
 		consumer.close();
-		System.clearProperty("spring.cloud.stream.kafka.streams.binder.brokers");
+		System.clearProperty("spring.kafka.bootstrap-servers");
 	}

 	@Test

WordCountProcessorApplicationTests.java (new file)

@@ -0,0 +1,162 @@
/*
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.streams.word.count;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.springframework.kafka.support.serializer.JsonSerde;
import java.util.*;
import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
/**
 * TopologyTestDriver-based test of the stream processing in {@link KafkaStreamsWordCountApplication}.
 *
 * @author Jukka Karvanen / jukinimi.com
 */
public class WordCountProcessorApplicationTests {
private TopologyTestDriver testDriver;
public static final String INPUT_TOPIC = KafkaStreamsWordCountApplication.WordCountProcessorApplication.INPUT_TOPIC;
public static final String OUTPUT_TOPIC = KafkaStreamsWordCountApplication.WordCountProcessorApplication.OUTPUT_TOPIC;
final Serde<String> stringSerde = Serdes.String();
final JsonSerde<KafkaStreamsWordCountApplication.WordCount> countSerde = new JsonSerde<>(KafkaStreamsWordCountApplication.WordCount.class);
final Serde<Bytes> nullSerde = Serdes.Bytes(); // Serde for the unused key
private ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(
		stringSerde.serializer(), stringSerde.serializer()); // Keys are fed in as String, even though they are read back as Bytes
static Properties getStreamsConfiguration() {
final Properties streamsConfiguration = new Properties();
// These need to be set even though they do not matter with TopologyTestDriver
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "TopologyTestDriver");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
return streamsConfiguration;
}
/**
 * Set up the stream topology:
 * add the KStream based on the @StreamListener annotation
 * and the to(topic) call based on the @SendTo annotation.
 */
@Before
public void setup() {
final StreamsBuilder builder = new StreamsBuilder();
KStream<Bytes, String> input = builder.stream(INPUT_TOPIC, Consumed.with(nullSerde, stringSerde));
KafkaStreamsWordCountApplication.WordCountProcessorApplication app = new KafkaStreamsWordCountApplication.WordCountProcessorApplication();
KStream<Bytes, KafkaStreamsWordCountApplication.WordCount> output = app.process(input);
output.to(OUTPUT_TOPIC, Produced.with(nullSerde, countSerde));
testDriver = new TopologyTestDriver(builder.build(), getStreamsConfiguration());
}
@After
public void tearDown() {
try {
testDriver.close();
} catch (final RuntimeException e) {
// https://issues.apache.org/jira/browse/KAFKA-6647 causes an exception on Windows; ignore it.
// The logged stack trace cannot be avoided.
System.out.println("Ignoring exception; the test fails on Windows due to this exception: " + e.getLocalizedMessage());
}
}
/**
 * Read one record from the output topic.
 *
 * @return ProducerRecord containing a WordCount as its value
 */
private ProducerRecord<Bytes, KafkaStreamsWordCountApplication.WordCount> readOutput() {
return testDriver.readOutput(OUTPUT_TOPIC, nullSerde.deserializer(), countSerde.deserializer());
}
/**
 * Read the counts from the output topic into a map, ignoring start and end dates.
 * If an existing word's count is incremented, the word can appear twice in the output;
 * the later value replaces the earlier one in the map.
 *
 * @return map of words to counts
 */
private Map<String, Long> getOutputList() {
final Map<String, Long> output = new HashMap<>();
ProducerRecord<Bytes, KafkaStreamsWordCountApplication.WordCount> outputRow;
while ((outputRow = readOutput()) != null) {
output.put(outputRow.value().getWord(), outputRow.value().getCount());
}
return output;
}
/**
* Simple test validating count of one word
*/
@Test
public void testOneWord() {
final String nullKey = null;
// Feed the word "Hello" to the input topic with no Kafka key; the timestamp is irrelevant in this case
testDriver.pipeInput(recordFactory.create(INPUT_TOPIC, nullKey, "Hello", 1L));
//Read and validate output
final ProducerRecord<Bytes, KafkaStreamsWordCountApplication.WordCount> output = readOutput();
assertThat(output).isNotNull();
assertThat(output.value()).isEqualToComparingFieldByField(new KafkaStreamsWordCountApplication.WordCount("hello", 1L, new Date(0), new Date(KafkaStreamsWordCountApplication.WordCountProcessorApplication.WINDOW_SIZE_MS)));
//No more output in topic
assertThat(readOutput()).isNull();
}
/**
 * Test word counts over a list of sentences.
 */
@Test
public void shouldCountWords() {
final List<String> inputLines = Arrays.asList(
"Kafka Streams Examples",
"Spring Cloud Stream Sample",
"Using Kafka Streams Test Utils"
);
final List<KeyValue<String, String>> inputRecords = inputLines.stream().map(v -> new KeyValue<String, String>(null, v)).collect(Collectors.toList());
final Map<String, Long> expectedWordCounts = new HashMap<>();
expectedWordCounts.put("spring", 1L);
expectedWordCounts.put("cloud", 1L);
expectedWordCounts.put("examples", 1L);
expectedWordCounts.put("sample", 1L);
expectedWordCounts.put("streams", 2L);
expectedWordCounts.put("stream", 1L);
expectedWordCounts.put("test", 1L);
expectedWordCounts.put("utils", 1L);
expectedWordCounts.put("kafka", 2L);
expectedWordCounts.put("using", 1L);
testDriver.pipeInput(recordFactory.create(INPUT_TOPIC, inputRecords, 1L, 1000L)); // All records fall in the same 30s time window
final Map<String, Long> actualWordCounts = getOutputList();
assertThat(actualWordCounts).containsAllEntriesOf(expectedWordCounts).hasSameSizeAs(expectedWordCounts);
}
}
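Since the TopologyTestDriver test exercises the topology without starting a broker, it runs in a fraction of the time of the embedded-Kafka integration test. Assuming the sample builds with Maven, it can be run on its own with standard Surefire test selection:

	mvn test -Dtest=WordCountProcessorApplicationTests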