diff --git a/apache-kafka/src/test/java/com/baeldung/kafkastreams/KafkaStreamsLiveTest.java b/apache-kafka/src/test/java/com/baeldung/kafkastreams/KafkaStreamsLiveTest.java
index 0b66dd8fec..3b559b619e 100644
--- a/apache-kafka/src/test/java/com/baeldung/kafkastreams/KafkaStreamsLiveTest.java
+++ b/apache-kafka/src/test/java/com/baeldung/kafkastreams/KafkaStreamsLiveTest.java
@@ -1,5 +1,9 @@
 package com.baeldung.kafkastreams;
 
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.Arrays;
 import java.util.Properties;
 import java.util.regex.Pattern;
@@ -18,59 +22,57 @@
 import org.junit.Test;
 
 public class KafkaStreamsLiveTest {
     private String bootstrapServers = "localhost:9092";
+    private Path stateDirectory;
 
     @Test
     @Ignore("it needs to have kafka broker running on local")
     public void shouldTestKafkaStreams() throws InterruptedException {
-        //given
+        // given
         String inputTopic = "inputTopic";
 
         Properties streamsConfiguration = new Properties();
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-live-test");
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
-        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String()
+            .getClass()
+            .getName());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String()
+            .getClass()
+            .getName());
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        // Use a temporary directory for storing state, which will be automatically removed after the test.
-        // streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
-        /*
-         * final StreamsBuilder builder = new StreamsBuilder();
-         KStream<String, String> textLines = builder.stream(wordCountTopic,
-             Consumed.with(Serdes.String(), Serdes.String()));
+        // Use a temporary directory for storing state, which will be automatically removed after the test.
+        try {
+            this.stateDirectory = Files.createTempDirectory("kafka-streams");
+            streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, this.stateDirectory.toAbsolutePath()
+                .toString());
+        } catch (final IOException e) {
+            throw new UncheckedIOException("Cannot create temporary directory", e);
+        }
 
-         KTable<String, Long> wordCounts = textLines
-             .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.ROOT)
-                 .split("\\W+")))
-             .groupBy((key, word) -> word)
-             .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>> as("counts-store"));
-         */
-        //when
+        // when
         final StreamsBuilder builder = new StreamsBuilder();
         KStream<String, String> textLines = builder.stream(inputTopic);
         Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);
 
-        KTable<String, Long> wordCounts = textLines
-            .flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))
-            .groupBy((key, word) -> word)
-            .count();
+        KTable<String, Long> wordCounts = textLines.flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))
+            .groupBy((key, word) -> word)
+            .count();
 
-        wordCounts.toStream().foreach((word, count) -> System.out.println("word: " + word + " -> " + count));
+        wordCounts.toStream()
+            .foreach((word, count) -> System.out.println("word: " + word + " -> " + count));
 
         String outputTopic = "outputTopic";
-        //final Serde<String> stringSerde = Serdes.String();
-        //final Serde<Long> longSerde = Serdes.Long();
-        //wordCounts.toStream().to(stringSerde, longSerde, outputTopic);
-
-        wordCounts.toStream().to("outputTopic",
-            Produced.with(Serdes.String(), Serdes.Long()));
+
+        wordCounts.toStream()
+            .to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
 
         final Topology topology = builder.build();
         KafkaStreams streams = new KafkaStreams(topology, streamsConfiguration);
         streams.start();
 
-        //then
+        // then
         Thread.sleep(30000);
         streams.close();
     }
diff --git a/libraries-data-3/README.md b/libraries-data-3/README.md
deleted file mode 100644
index fe856436f1..0000000000
--- a/libraries-data-3/README.md
+++ /dev/null
@@ -1,9 +0,0 @@
-## Data Libraries
-
-This module contains articles about libraries for data processing in Java.
-
-### Relevant articles
-
-
-##### Building the project
-You can build the project from the command line using: *mvn clean install*, or in an IDE.
\ No newline at end of file
diff --git a/libraries-data-3/pom.xml b/libraries-data-3/pom.xml
deleted file mode 100644
index 1a95613cb3..0000000000
--- a/libraries-data-3/pom.xml
+++ /dev/null
@@ -1,21 +0,0 @@
-
-
-    4.0.0
-    libraries-data-3
-    libraries-data-3
-
-
-        com.baeldung
-        parent-modules
-        1.0.0-SNAPSHOT
-
-
-
-
-
-
-
-
-
-
\ No newline at end of file
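
Note: the new stateDirectory field keeps a handle on the temporary state directory, presumably so it can be deleted once the test finishes; the removal itself is not shown in this diff. A minimal teardown sketch, assuming a JUnit 4 @After method in the same test class (the method name tearDown and the recursive walk-and-delete approach are illustrative assumptions, not part of this change):

    import java.io.File;
    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.Comparator;
    import java.util.stream.Stream;

    import org.junit.After;

    // Hypothetical cleanup for the temporary Kafka Streams state directory
    // created in shouldTestKafkaStreams().
    @After
    public void tearDown() throws IOException {
        if (stateDirectory != null) {
            try (Stream<Path> paths = Files.walk(stateDirectory)) {
                paths.sorted(Comparator.reverseOrder()) // delete children before their parent directories
                    .map(Path::toFile)
                    .forEach(File::delete);
            }
        }
    }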