57 lines
2.0 KiB
Java
57 lines
2.0 KiB
Java
package com.baeldung.kafka;
|
|
|
|
import org.apache.kafka.clients.producer.KafkaProducer;
|
|
import org.apache.kafka.clients.producer.ProducerRecord;
|
|
import org.apache.kafka.common.KafkaException;
|
|
|
|
import java.util.Properties;
|
|
import java.util.stream.Stream;
|
|
|
|
import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
|
|
import static org.apache.kafka.clients.producer.ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG;
|
|
import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
|
|
import static org.apache.kafka.clients.producer.ProducerConfig.TRANSACTIONAL_ID_CONFIG;
|
|
import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
|
|
|
|
public class TransactionalMessageProducer {
|
|
|
|
private static final String DATA_MESSAGE_1 = "Put any space separated data here for count";
|
|
private static final String DATA_MESSAGE_2 = "Output will contain count of every word in the message";
|
|
|
|
public static void main(String[] args) {
|
|
|
|
KafkaProducer<String, String> producer = createKafkaProducer();
|
|
|
|
producer.initTransactions();
|
|
|
|
try{
|
|
|
|
producer.beginTransaction();
|
|
|
|
Stream.of(DATA_MESSAGE_1, DATA_MESSAGE_2).forEach(s -> producer.send(
|
|
new ProducerRecord<String, String>("input", null, s)));
|
|
|
|
producer.commitTransaction();
|
|
|
|
}catch (KafkaException e){
|
|
|
|
producer.abortTransaction();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
private static KafkaProducer<String, String> createKafkaProducer() {
|
|
|
|
Properties props = new Properties();
|
|
props.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
|
|
props.put(ENABLE_IDEMPOTENCE_CONFIG, "true");
|
|
props.put(TRANSACTIONAL_ID_CONFIG, "prod-0");
|
|
props.put(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
|
|
props.put(VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
|
|
|
|
return new KafkaProducer(props);
|
|
|
|
}
|
|
}
|