diff --git a/libraries-data/src/main/resources/kafka-connect/01_Quick_Start/connect-file-sink.properties b/apache-kafka/src/main/resources/kafka-connect/01_Quick_Start/connect-file-sink.properties
similarity index 100%
rename from libraries-data/src/main/resources/kafka-connect/01_Quick_Start/connect-file-sink.properties
rename to apache-kafka/src/main/resources/kafka-connect/01_Quick_Start/connect-file-sink.properties
diff --git a/libraries-data/src/main/resources/kafka-connect/01_Quick_Start/connect-file-source.properties b/apache-kafka/src/main/resources/kafka-connect/01_Quick_Start/connect-file-source.properties
similarity index 100%
rename from libraries-data/src/main/resources/kafka-connect/01_Quick_Start/connect-file-source.properties
rename to apache-kafka/src/main/resources/kafka-connect/01_Quick_Start/connect-file-source.properties
diff --git a/libraries-data/src/main/resources/kafka-connect/01_Quick_Start/connect-standalone.properties b/apache-kafka/src/main/resources/kafka-connect/01_Quick_Start/connect-standalone.properties
similarity index 100%
rename from libraries-data/src/main/resources/kafka-connect/01_Quick_Start/connect-standalone.properties
rename to apache-kafka/src/main/resources/kafka-connect/01_Quick_Start/connect-standalone.properties
diff --git a/libraries-data/src/main/resources/kafka-connect/02_Distributed/connect-distributed.properties b/apache-kafka/src/main/resources/kafka-connect/02_Distributed/connect-distributed.properties
similarity index 100%
rename from libraries-data/src/main/resources/kafka-connect/02_Distributed/connect-distributed.properties
rename to apache-kafka/src/main/resources/kafka-connect/02_Distributed/connect-distributed.properties
diff --git a/libraries-data/src/main/resources/kafka-connect/02_Distributed/connect-file-sink.json b/apache-kafka/src/main/resources/kafka-connect/02_Distributed/connect-file-sink.json
similarity index 100%
rename from libraries-data/src/main/resources/kafka-connect/02_Distributed/connect-file-sink.json
rename to apache-kafka/src/main/resources/kafka-connect/02_Distributed/connect-file-sink.json
diff --git a/libraries-data/src/main/resources/kafka-connect/02_Distributed/connect-file-source.json b/apache-kafka/src/main/resources/kafka-connect/02_Distributed/connect-file-source.json
similarity index 100%
rename from libraries-data/src/main/resources/kafka-connect/02_Distributed/connect-file-source.json
rename to apache-kafka/src/main/resources/kafka-connect/02_Distributed/connect-file-source.json
diff --git a/libraries-data/src/main/resources/kafka-connect/03_Transform/connect-distributed.properties b/apache-kafka/src/main/resources/kafka-connect/03_Transform/connect-distributed.properties
similarity index 100%
rename from libraries-data/src/main/resources/kafka-connect/03_Transform/connect-distributed.properties
rename to apache-kafka/src/main/resources/kafka-connect/03_Transform/connect-distributed.properties
diff --git a/libraries-data/src/main/resources/kafka-connect/03_Transform/connect-file-source-transform.json b/apache-kafka/src/main/resources/kafka-connect/03_Transform/connect-file-source-transform.json
similarity index 100%
rename from libraries-data/src/main/resources/kafka-connect/03_Transform/connect-file-source-transform.json
rename to apache-kafka/src/main/resources/kafka-connect/03_Transform/connect-file-source-transform.json
diff --git a/libraries-data/src/main/resources/kafka-connect/04_Custom/connect-mongodb-sink.json b/apache-kafka/src/main/resources/kafka-connect/04_Custom/connect-mongodb-sink.json
similarity index 100%
rename from libraries-data/src/main/resources/kafka-connect/04_Custom/connect-mongodb-sink.json
rename to apache-kafka/src/main/resources/kafka-connect/04_Custom/connect-mongodb-sink.json
diff --git a/libraries-data/src/main/resources/kafka-connect/04_Custom/connect-mqtt-source.json b/apache-kafka/src/main/resources/kafka-connect/04_Custom/connect-mqtt-source.json
similarity index 100%
rename from libraries-data/src/main/resources/kafka-connect/04_Custom/connect-mqtt-source.json
rename to apache-kafka/src/main/resources/kafka-connect/04_Custom/connect-mqtt-source.json
diff --git a/libraries-data/src/main/resources/kafka-connect/04_Custom/docker-compose.yaml b/apache-kafka/src/main/resources/kafka-connect/04_Custom/docker-compose.yaml
similarity index 100%
rename from libraries-data/src/main/resources/kafka-connect/04_Custom/docker-compose.yaml
rename to apache-kafka/src/main/resources/kafka-connect/04_Custom/docker-compose.yaml
diff --git a/apache-kafka/src/test/java/com/baeldung/kafka/streamsvsconsumer/KafkaStreamsLiveTest.java b/apache-kafka/src/test/java/com/baeldung/kafka/streamsvsconsumer/KafkaStreamsLiveTest.java
new file mode 100644
index 0000000000..88de6101dc
--- /dev/null
+++ b/apache-kafka/src/test/java/com/baeldung/kafka/streamsvsconsumer/KafkaStreamsLiveTest.java
@@ -0,0 +1,279 @@
+package com.baeldung.kafka.streamsvsconsumer;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StoreQueryParameters;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.KGroupedStream;
+import org.apache.kafka.streams.kstream.KGroupedTable;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.WindowStore;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Properties;
+
+import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
+
+public class KafkaStreamsLiveTest {
+    private final String LEFT_TOPIC = "left-stream-topic";
+    private final String RIGHT_TOPIC = "right-stream-topic";
+    private final String LEFT_RIGHT_TOPIC = "left-right-stream-topic";
+
+    private KafkaProducer<String, String> producer = createKafkaProducer();
+    private Properties streamsConfiguration = new Properties();
+
+    static final String TEXT_LINES_TOPIC = "TextLinesTopic";
+
+    private final String TEXT_EXAMPLE_1 = "test test and test";
+    private final String TEXT_EXAMPLE_2 = "test filter filter this sentence";
+
+    @ClassRule
+    public static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3"));
+
+    @Before
+    public void setUp() {
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+    }
+
+    @Test
+    public void shouldTestKafkaTableLatestWord() throws InterruptedException {
+        String inputTopic = "topicTable";
+
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        KTable<String, String> textLinesTable = builder.table(inputTopic,
+            Consumed.with(Serdes.String(), Serdes.String()));
+
+        textLinesTable.toStream().foreach((word, count) -> System.out.println("Latest word: " + word + " -> " + count));
+
+        final Topology topology = builder.build();
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "latest-word-id");
+        KafkaStreams streams = new KafkaStreams(topology, streamsConfiguration);
+
+        streams.cleanUp();
+        streams.start();
+        producer.send(new ProducerRecord<>(inputTopic, "1", TEXT_EXAMPLE_1));
+        producer.send(new ProducerRecord<>(inputTopic, "2", TEXT_EXAMPLE_2));
+
+        Thread.sleep(2000);
+        streams.close();
+    }
+
+    @Test
+    public void shouldTestWordCountKafkaStreams() throws InterruptedException {
+        String wordCountTopic = "wordCountTopic";
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        KStream<String, String> textLines = builder.stream(wordCountTopic,
+            Consumed.with(Serdes.String(), Serdes.String()));
+
+        KTable<String, Long> wordCounts = textLines
+            .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.ROOT)
+                .split("\\W+")))
+            .groupBy((key, word) -> word)
+            .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>> as("counts-store"));
+
+        wordCounts.toStream().foreach((word, count) -> System.out.println("Word: " + word + " -> " + count));
+
+        wordCounts.toStream().to("outputTopic",
+            Produced.with(Serdes.String(), Serdes.Long()));
+
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-stream-table-id");
+        final Topology topology = builder.build();
+        KafkaStreams streams = new KafkaStreams(topology, streamsConfiguration);
+
+        streams.cleanUp();
+        streams.start();
+
+        producer.send(new ProducerRecord<>(wordCountTopic, "1", TEXT_EXAMPLE_1));
+        producer.send(new ProducerRecord<>(wordCountTopic, "2", TEXT_EXAMPLE_2));
+
+        Thread.sleep(2000);
+        streams.close();
+    }
+
+    // Filter, map
+    @Test
+    public void shouldTestStatelessTransformations() throws InterruptedException {
+        String wordCountTopic = "wordCountTopic";
+
+        //when
+        final StreamsBuilder builder = new StreamsBuilder();
+        KStream<String, String> textLines = builder.stream(wordCountTopic,
+            Consumed.with(Serdes.String(), Serdes.String()));
+
+        final KStream<String, String> textLinesUpperCase = textLines
+            .map((key, value) -> KeyValue.pair(value, value.toUpperCase()))
+            .filter((key, value) -> value.contains("FILTER"));
+
+        KTable<String, Long> wordCounts = textLinesUpperCase
+            .flatMapValues(value -> Arrays.asList(value.split("\\W+")))
+            .groupBy((key, word) -> word)
+            .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>> as("counts-store"));
+
+        wordCounts.toStream().foreach((word, count) -> System.out.println("Word: " + word + " -> " + count));
+
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-filter-map-id");
+        final Topology topology = builder.build();
+        KafkaStreams streams = new KafkaStreams(topology, streamsConfiguration);
+
+        streams.cleanUp();
+        streams.start();
+
+        producer.send(new ProducerRecord<>(wordCountTopic, "1", TEXT_EXAMPLE_1));
+        producer.send(new ProducerRecord<>(wordCountTopic, "2", TEXT_EXAMPLE_2));
+
+        Thread.sleep(2000);
+        streams.close();
+    }
+
+    @Test
+    public void shouldTestAggregationStatefulTransformations() throws InterruptedException {
+        String aggregationTopic = "aggregationTopic";
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KStream<byte[], String> input = builder.stream(aggregationTopic,
+            Consumed.with(Serdes.ByteArray(), Serdes.String()));
+        final KTable<String, Long> aggregated = input
+            .groupBy((key, value) -> (value != null && value.length() > 0) ? value.substring(0, 2).toLowerCase() : "",
+                Grouped.with(Serdes.String(), Serdes.String()))
+            .aggregate(() -> 0L, (aggKey, newValue, aggValue) -> aggValue + newValue.length(),
+                Materialized.with(Serdes.String(), Serdes.Long()));
+
+        aggregated.toStream().foreach((word, count) -> System.out.println("Word: " + word + " -> " + count));
+
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "aggregation-id");
+        final Topology topology = builder.build();
+        KafkaStreams streams = new KafkaStreams(topology, streamsConfiguration);
+
+        streams.cleanUp();
+        streams.start();
+
+        producer.send(new ProducerRecord<>(aggregationTopic, "1", "one"));
+        producer.send(new ProducerRecord<>(aggregationTopic, "2", "two"));
+        producer.send(new ProducerRecord<>(aggregationTopic, "3", "three"));
+        producer.send(new ProducerRecord<>(aggregationTopic, "4", "four"));
+        producer.send(new ProducerRecord<>(aggregationTopic, "5", "five"));
+
+        Thread.sleep(5000);
+        streams.close();
+    }
+
+    @Test
+    public void shouldTestWindowingJoinStatefulTransformations() throws InterruptedException {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        KStream<String, String> leftSource = builder.stream(LEFT_TOPIC);
+        KStream<String, String> rightSource = builder.stream(RIGHT_TOPIC);
+
+        KStream<String, String> leftRightSource = leftSource.outerJoin(rightSource,
+                (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue,
+                JoinWindows.of(Duration.ofSeconds(5)))
+            .groupByKey()
+            .reduce((key, lastValue) -> lastValue)
+            .toStream();
+
+        leftRightSource.foreach((key, value) -> System.out.println("(key= " + key + ") -> (" + value + ")"));
+
+        final Topology topology = builder.build();
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "windowing-join-id");
+        KafkaStreams streams = new KafkaStreams(topology, streamsConfiguration);
+
+        streams.cleanUp();
+        streams.start();
+
+        producer.send(new ProducerRecord<>(LEFT_TOPIC, "1", "left"));
+        producer.send(new ProducerRecord<>(RIGHT_TOPIC, "2", "right"));
+
+        Thread.sleep(2000);
+        streams.close();
+    }
+
+    @Test
+    public void shouldTestWordCountWithInteractiveQueries() throws InterruptedException {
+
+        final Serde<String> stringSerde = Serdes.String();
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KStream<String, String> textLines = builder.stream(TEXT_LINES_TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
+
+        final KGroupedStream<String, String> groupedByWord = textLines
+            .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
+            .groupBy((key, word) -> word, Grouped.with(stringSerde, stringSerde));
+
+        groupedByWord.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("WordCountsStore")
+            .withValueSerde(Serdes.Long()));
+
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-interactive-queries");
+
+        final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
+        streams.cleanUp();
+        streams.start();
+
+        producer.send(new ProducerRecord<>(TEXT_LINES_TOPIC, "1", TEXT_EXAMPLE_1));
+        producer.send(new ProducerRecord<>(TEXT_LINES_TOPIC, "2", TEXT_EXAMPLE_2));
+
+        Thread.sleep(2000);
+        ReadOnlyKeyValueStore<String, Long> keyValueStore =
+            streams.store(StoreQueryParameters.fromNameAndType(
+                "WordCountsStore", QueryableStoreTypes.keyValueStore()));
+
+        KeyValueIterator<String, Long> range = keyValueStore.all();
+        while (range.hasNext()) {
+            KeyValue<String, Long> next = range.next();
+            System.out.println("Count for " + next.key + ": " + next.value);
+        }
+
+        streams.close();
+    }
+
+    private static KafkaProducer<String, String> createKafkaProducer() {
+
+        Properties props = new Properties();
+        props.put(BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
+        props.put(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+        props.put(VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+
+        return new KafkaProducer<>(props);
+    }
+}
diff --git a/libraries-data/src/test/java/com/baeldung/kafkastreams/KafkaStreamsLiveTest.java b/libraries-data/src/test/java/com/baeldung/kafkastreams/KafkaStreamsLiveTest.java
deleted file mode 100644
index 32568e9ea5..0000000000
--- a/libraries-data/src/test/java/com/baeldung/kafkastreams/KafkaStreamsLiveTest.java
+++ /dev/null
@@ -1,62 +0,0 @@
-package com.baeldung.kafkastreams;
-
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.test.TestUtils;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.Properties;
-import java.util.regex.Pattern;
-
-public class KafkaStreamsLiveTest {
-    private String bootstrapServers = "localhost:9092";
-
-    @Test
-    @Ignore("it needs to have kafka broker running on local")
-    public void shouldTestKafkaStreams() throws InterruptedException {
-        //given
-        String inputTopic = "inputTopic";
-
-        Properties streamsConfiguration = new Properties();
-        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-live-test");
-        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
-        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
-        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        // Use a temporary directory for storing state, which will be automatically removed after the test.
-        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
-
-        //when
-        KStreamBuilder builder = new KStreamBuilder();
-        KStream<String, String> textLines = builder.stream(inputTopic);
-        Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);
-
-        KTable<String, Long> wordCounts = textLines
-            .flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))
-            .groupBy((key, word) -> word)
-            .count();
-
-        wordCounts.foreach((word, count) -> System.out.println("word: " + word + " -> " + count));
-
-        String outputTopic = "outputTopic";
-        final Serde<String> stringSerde = Serdes.String();
-        final Serde<Long> longSerde = Serdes.Long();
-        wordCounts.to(stringSerde, longSerde, outputTopic);
-
-        KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
-        streams.start();
-
-        //then
-        Thread.sleep(30000);
-        streams.close();
-    }
-}