From bc797d88a07bad6d636a1c38a188d0fb6e308771 Mon Sep 17 00:00:00 2001 From: sampadawagde Date: Sun, 15 Aug 2021 17:28:59 +0530 Subject: [PATCH] JAVA-6390: Move kafka articles from libraries-6 to new module apache-kafka --- .../TransactionalMessageProducer.java | 10 +++++----- .../exactlyonce}/TransactionalWordCount.java | 15 ++++++++------- .../com/baeldung/kafka/exactlyonce}/Tuple.java | 6 +++--- 3 files changed, 16 insertions(+), 15 deletions(-) rename {libraries-6/src/main/java/com/baeldung/kafka => apache-kafka/src/main/java/com/baeldung/kafka/exactlyonce}/TransactionalMessageProducer.java (87%) rename {libraries-6/src/main/java/com/baeldung/kafka => apache-kafka/src/main/java/com/baeldung/kafka/exactlyonce}/TransactionalWordCount.java (90%) rename {libraries-6/src/main/java/com/baeldung/kafka => apache-kafka/src/main/java/com/baeldung/kafka/exactlyonce}/Tuple.java (69%) diff --git a/libraries-6/src/main/java/com/baeldung/kafka/TransactionalMessageProducer.java b/apache-kafka/src/main/java/com/baeldung/kafka/exactlyonce/TransactionalMessageProducer.java similarity index 87% rename from libraries-6/src/main/java/com/baeldung/kafka/TransactionalMessageProducer.java rename to apache-kafka/src/main/java/com/baeldung/kafka/exactlyonce/TransactionalMessageProducer.java index 15488bbaf4..8f2fe6e309 100644 --- a/libraries-6/src/main/java/com/baeldung/kafka/TransactionalMessageProducer.java +++ b/apache-kafka/src/main/java/com/baeldung/kafka/exactlyonce/TransactionalMessageProducer.java @@ -1,4 +1,4 @@ -package com.baeldung.kafka; +package com.baeldung.kafka.exactlyonce; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; @@ -24,16 +24,16 @@ public class TransactionalMessageProducer { producer.initTransactions(); - try{ + try { producer.beginTransaction(); - Stream.of(DATA_MESSAGE_1, DATA_MESSAGE_2).forEach(s -> producer.send( - new ProducerRecord("input", null, s))); + Stream.of(DATA_MESSAGE_1, DATA_MESSAGE_2) + .forEach(s -> producer.send(new ProducerRecord("input", null, s))); producer.commitTransaction(); - }catch (KafkaException e){ + } catch (KafkaException e) { producer.abortTransaction(); diff --git a/libraries-6/src/main/java/com/baeldung/kafka/TransactionalWordCount.java b/apache-kafka/src/main/java/com/baeldung/kafka/exactlyonce/TransactionalWordCount.java similarity index 90% rename from libraries-6/src/main/java/com/baeldung/kafka/TransactionalWordCount.java rename to apache-kafka/src/main/java/com/baeldung/kafka/exactlyonce/TransactionalWordCount.java index 0563ba6684..b9a2cb9f85 100644 --- a/libraries-6/src/main/java/com/baeldung/kafka/TransactionalWordCount.java +++ b/apache-kafka/src/main/java/com/baeldung/kafka/exactlyonce/TransactionalWordCount.java @@ -1,4 +1,4 @@ -package com.baeldung.kafka; +package com.baeldung.kafka.exactlyonce; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -43,10 +43,11 @@ public class TransactionalWordCount { ConsumerRecords records = consumer.poll(ofSeconds(60)); Map wordCountMap = records.records(new TopicPartition(INPUT_TOPIC, 0)) - .stream() - .flatMap(record -> Stream.of(record.value().split(" "))) - .map(word -> Tuple.of(word, 1)) - .collect(Collectors.toMap(tuple -> tuple.getKey(), t1 -> t1.getValue(), (v1, v2) -> v1 + v2)); + .stream() + .flatMap(record -> Stream.of(record.value() + .split(" "))) + .map(word -> Tuple.of(word, 1)) + .collect(Collectors.toMap(tuple -> tuple.getKey(), t1 -> t1.getValue(), (v1, v2) -> v1 + v2)); producer.beginTransaction(); @@ -56,7 +57,8 @@ public class TransactionalWordCount { for (TopicPartition partition : records.partitions()) { List> partitionedRecords = records.records(partition); - long offset = partitionedRecords.get(partitionedRecords.size() - 1).offset(); + long offset = partitionedRecords.get(partitionedRecords.size() - 1) + .offset(); offsetsToCommit.put(partition, new OffsetAndMetadata(offset + 1)); } @@ -72,7 +74,6 @@ public class TransactionalWordCount { } - } private static KafkaConsumer createKafkaConsumer() { diff --git a/libraries-6/src/main/java/com/baeldung/kafka/Tuple.java b/apache-kafka/src/main/java/com/baeldung/kafka/exactlyonce/Tuple.java similarity index 69% rename from libraries-6/src/main/java/com/baeldung/kafka/Tuple.java rename to apache-kafka/src/main/java/com/baeldung/kafka/exactlyonce/Tuple.java index 883de4ba21..ad61e905fd 100644 --- a/libraries-6/src/main/java/com/baeldung/kafka/Tuple.java +++ b/apache-kafka/src/main/java/com/baeldung/kafka/exactlyonce/Tuple.java @@ -1,4 +1,4 @@ -package com.baeldung.kafka; +package com.baeldung.kafka.exactlyonce; public class Tuple { @@ -10,8 +10,8 @@ public class Tuple { this.value = value; } - public static Tuple of(String key, Integer value){ - return new Tuple(key,value); + public static Tuple of(String key, Integer value) { + return new Tuple(key, value); } public String getKey() {