Files
spring-boot-rest/apache-kafka/src/main/java/com/baeldung/kafka/exactlyonce/TransactionalMessageProducer.java
2021-08-15 17:28:59 +05:30

57 lines
2.0 KiB
Java

package com.baeldung.kafka.exactlyonce;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import java.util.Properties;
import java.util.stream.Stream;
import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.TRANSACTIONAL_ID_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
public class TransactionalMessageProducer {
private static final String DATA_MESSAGE_1 = "Put any space separated data here for count";
private static final String DATA_MESSAGE_2 = "Output will contain count of every word in the message";
public static void main(String[] args) {
KafkaProducer<String, String> producer = createKafkaProducer();
producer.initTransactions();
try {
producer.beginTransaction();
Stream.of(DATA_MESSAGE_1, DATA_MESSAGE_2)
.forEach(s -> producer.send(new ProducerRecord<String, String>("input", null, s)));
producer.commitTransaction();
} catch (KafkaException e) {
producer.abortTransaction();
}
}
private static KafkaProducer<String, String> createKafkaProducer() {
Properties props = new Properties();
props.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ENABLE_IDEMPOTENCE_CONFIG, "true");
props.put(TRANSACTIONAL_ID_CONFIG, "prod-0");
props.put(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
return new KafkaProducer(props);
}
}