diff --git a/kafka/kafka-producer/src/main/java/KafkaConfig.java b/kafka/kafka-producer/src/main/java/KafkaConfig.java index 4917fc7b..dfa32731 100644 --- a/kafka/kafka-producer/src/main/java/KafkaConfig.java +++ b/kafka/kafka-producer/src/main/java/KafkaConfig.java @@ -13,6 +13,7 @@ public class KafkaConfig { configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConfig.BOOTSTRAP_SERVERS); configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); +// configs.put(ProducerConfig.ACKS_CONFIG, 0); return new KafkaProducer<>(configs); } diff --git a/kafka/kafka-producer/src/main/java/ProducerCallback.java b/kafka/kafka-producer/src/main/java/ProducerCallback.java new file mode 100644 index 00000000..c62f0def --- /dev/null +++ b/kafka/kafka-producer/src/main/java/ProducerCallback.java @@ -0,0 +1,16 @@ +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ProducerCallback implements Callback { + private final static Logger logger = LoggerFactory.getLogger(ProducerCallback.class); + + @Override + public void onCompletion(RecordMetadata recordMetadata, Exception e) { + if (e != null) + logger.error(e.getMessage(), e); + else + logger.info(recordMetadata.toString()); + } +} \ No newline at end of file diff --git a/kafka/kafka-producer/src/main/java/ProducerWithAsyncCallback.java b/kafka/kafka-producer/src/main/java/ProducerWithAsyncCallback.java new file mode 100644 index 00000000..aef94865 --- /dev/null +++ b/kafka/kafka-producer/src/main/java/ProducerWithAsyncCallback.java @@ -0,0 +1,21 @@ +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; +import java.util.Properties; + +public class ProducerWithAsyncCallback { + + public static void main(String[] args) { + + KafkaProducer producer = KafkaConfig.setup(); + + ProducerRecord record = + new ProducerRecord<>(KafkaConfig.TOPIC_NAME, "async-callback", "async-callback"); + + producer.send(record, new ProducerCallback()); + + producer.flush(); + producer.close(); + } +} diff --git a/kafka/kafka-producer/src/main/java/ProducerWithSyncCallback.java b/kafka/kafka-producer/src/main/java/ProducerWithSyncCallback.java new file mode 100644 index 00000000..44b7b3ad --- /dev/null +++ b/kafka/kafka-producer/src/main/java/ProducerWithSyncCallback.java @@ -0,0 +1,30 @@ +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.serialization.StringSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Properties; + +public class ProducerWithSyncCallback { + private final static Logger logger = LoggerFactory.getLogger(ProducerWithSyncCallback.class); + + public static void main(String[] args) { + + KafkaProducer producer = KafkaConfig.setup(); + + ProducerRecord record = + new ProducerRecord<>(KafkaConfig.TOPIC_NAME, "sync-callback", "sync-callback"); + try { + RecordMetadata metadata = producer.send(record).get(); + logger.info(metadata.toString()); + } catch (Exception e) { + logger.error(e.getMessage(),e); + } finally { + producer.flush(); + producer.close(); + } + } +}