diff --git a/libraries/src/main/java/com/baeldung/kafka/TransactionalMessageProducer.java b/libraries/src/main/java/com/baeldung/kafka/TransactionalMessageProducer.java
new file mode 100644
index 0000000000..15488bbaf4
--- /dev/null
+++ b/libraries/src/main/java/com/baeldung/kafka/TransactionalMessageProducer.java
@@ -0,0 +1,56 @@
+package com.baeldung.kafka;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.KafkaException;
+
+import java.util.Properties;
+import java.util.stream.Stream;
+
+import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.TRANSACTIONAL_ID_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
+
+public class TransactionalMessageProducer {
+
+    private static final String DATA_MESSAGE_1 = "Put any space separated data here for count";
+    private static final String DATA_MESSAGE_2 = "Output will contain count of every word in the message";
+
+    public static void main(String[] args) {
+
+        KafkaProducer producer = createKafkaProducer();
+
+        producer.initTransactions();
+
+        try {
+
+            producer.beginTransaction();
+
+            Stream.of(DATA_MESSAGE_1, DATA_MESSAGE_2).forEach(s -> producer.send(
+                    new ProducerRecord("input", null, s)));
+
+            producer.commitTransaction();
+
+        } catch (KafkaException e) {
+
+            producer.abortTransaction();
+
+        }
+
+    }
+
+    private static KafkaProducer createKafkaProducer() {
+
+        Properties props = new Properties();
+        props.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        props.put(ENABLE_IDEMPOTENCE_CONFIG, "true");
+        props.put(TRANSACTIONAL_ID_CONFIG, "prod-0");
+        props.put(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+        props.put(VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+
+        return new KafkaProducer(props);
+
+    }
+}
diff --git a/libraries/src/main/java/com/baeldung/kafka/TransactionalApp.java b/libraries/src/main/java/com/baeldung/kafka/TransactionalWordCount.java
similarity index 66%
rename from libraries/src/main/java/com/baeldung/kafka/TransactionalApp.java
rename to libraries/src/main/java/com/baeldung/kafka/TransactionalWordCount.java
index 1e95041a0d..0563ba6684 100644
--- a/libraries/src/main/java/com/baeldung/kafka/TransactionalApp.java
+++ b/libraries/src/main/java/com/baeldung/kafka/TransactionalWordCount.java
@@ -14,6 +14,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static java.time.Duration.ofSeconds;
 import static java.util.Collections.singleton;
@@ -21,16 +23,16 @@ import static org.apache.kafka.clients.consumer.ConsumerConfig.*;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
 import static org.apache.kafka.clients.producer.ProducerConfig.*;
 
-public class TransactionalApp {
+public class TransactionalWordCount {
 
-    private static final String CONSUMER_GROUP_ID = "test";
+    private static final String CONSUMER_GROUP_ID = "my-group-id";
 
     private static final String OUTPUT_TOPIC = "output";
     private static final String INPUT_TOPIC = "input";
 
     public static void main(String[] args) {
 
-        KafkaConsumer consumer = initConsumer();
-        KafkaProducer producer = initProducer();
+        KafkaConsumer consumer = createKafkaConsumer();
+        KafkaProducer producer = createKafkaProducer();
 
         producer.initTransactions();
 
@@ -38,12 +40,17 @@ public class TransactionalApp {
 
         while (true) {
 
-            ConsumerRecords records = consumer.poll(ofSeconds(20));
+            ConsumerRecords records = consumer.poll(ofSeconds(60));
+
+            Map wordCountMap = records.records(new TopicPartition(INPUT_TOPIC, 0))
+                    .stream()
+                    .flatMap(record -> Stream.of(record.value().split(" ")))
+                    .map(word -> Tuple.of(word, 1))
+                    .collect(Collectors.toMap(tuple -> tuple.getKey(), t1 -> t1.getValue(), (v1, v2) -> v1 + v2));
 
             producer.beginTransaction();
 
-            for (ConsumerRecord record : records)
-                producer.send(new ProducerRecord(OUTPUT_TOPIC, record));
+            wordCountMap.forEach((key, value) -> producer.send(new ProducerRecord(OUTPUT_TOPIC, key, value.toString())));
 
             Map offsetsToCommit = new HashMap<>();
 
@@ -51,7 +58,7 @@
                 List<ConsumerRecord<String, String>> partitionedRecords = records.records(partition);
                 long offset = partitionedRecords.get(partitionedRecords.size() - 1).offset();
 
-                offsetsToCommit.put(partition, new OffsetAndMetadata(offset));
+                offsetsToCommit.put(partition, new OffsetAndMetadata(offset + 1));
             }
 
             producer.sendOffsetsToTransaction(offsetsToCommit, CONSUMER_GROUP_ID);
@@ -68,11 +75,12 @@ public class TransactionalApp {
 
     }
 
-    private static KafkaConsumer initConsumer() {
+    private static KafkaConsumer createKafkaConsumer() {
         Properties props = new Properties();
         props.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
         props.put(GROUP_ID_CONFIG, CONSUMER_GROUP_ID);
         props.put(ENABLE_AUTO_COMMIT_CONFIG, "false");
+        props.put(ISOLATION_LEVEL_CONFIG, "read_committed");
         props.put(KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
         props.put(VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
 
@@ -81,19 +89,14 @@ public class TransactionalApp {
 
         return consumer;
     }
 
-    private static KafkaProducer initProducer() {
+    private static KafkaProducer createKafkaProducer() {
         Properties props = new Properties();
-        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        props.put(ACKS_CONFIG, "all");
-        props.put(RETRIES_CONFIG, 3);
-        props.put(BATCH_SIZE_CONFIG, 16384);
-        props.put(LINGER_MS_CONFIG, 1);
-        props.put(BUFFER_MEMORY_CONFIG, 33554432);
+        props.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
         props.put(ENABLE_IDEMPOTENCE_CONFIG, "true");
         props.put(TRANSACTIONAL_ID_CONFIG, "prod-1");
-        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
-        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+        props.put(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+        props.put(VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
 
         return new KafkaProducer(props);
 
diff --git a/libraries/src/main/java/com/baeldung/kafka/Tuple.java b/libraries/src/main/java/com/baeldung/kafka/Tuple.java
new file mode 100644
index 0000000000..883de4ba21
--- /dev/null
+++ b/libraries/src/main/java/com/baeldung/kafka/Tuple.java
@@ -0,0 +1,24 @@
+package com.baeldung.kafka;
+
+public class Tuple {
+
+    private String key;
+    private Integer value;
+
+    private Tuple(String key, Integer value) {
+        this.key = key;
+        this.value = value;
+    }
+
+    public static Tuple of(String key, Integer value) {
+        return new Tuple(key, value);
+    }
+
+    public String getKey() {
+        return key;
+    }
+
+    public Integer getValue() {
+        return value;
+    }
+}
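For reference only (not part of the change set above): a minimal, broker-free sketch that replays the aggregation step introduced in TransactionalWordCount on the two sample messages that TransactionalMessageProducer writes to the input topic. The WordCountSketch class name is made up for illustration, and the snippet assumes the Tuple class added above is on the classpath.

package com.baeldung.kafka;

import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class WordCountSketch {

    public static void main(String[] args) {
        // The two space-separated sample values from TransactionalMessageProducer.
        Stream<String> values = Stream.of(
                "Put any space separated data here for count",
                "Output will contain count of every word in the message");

        // Same pipeline shape as in TransactionalWordCount: split each value into words,
        // turn every word into a (word, 1) Tuple, and merge counts for repeated words.
        Map<String, Integer> wordCountMap = values
                .flatMap(value -> Stream.of(value.split(" ")))
                .map(word -> Tuple.of(word, 1))
                .collect(Collectors.toMap(Tuple::getKey, Tuple::getValue, (v1, v2) -> v1 + v2));

        // "count" occurs in both messages, so the merge function yields 2 for it.
        wordCountMap.forEach((word, count) -> System.out.println(word + " -> " + count));
    }
}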