#15 kafka : producer - callback producer
This commit is contained in:
@@ -13,6 +13,7 @@ public class KafkaConfig {
|
||||
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConfig.BOOTSTRAP_SERVERS);
|
||||
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
|
||||
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
|
||||
// configs.put(ProducerConfig.ACKS_CONFIG, 0);
|
||||
return new KafkaProducer<>(configs);
|
||||
}
|
||||
|
||||
|
||||
16
kafka/kafka-producer/src/main/java/ProducerCallback.java
Normal file
16
kafka/kafka-producer/src/main/java/ProducerCallback.java
Normal file
@@ -0,0 +1,16 @@
|
||||
import org.apache.kafka.clients.producer.Callback;
|
||||
import org.apache.kafka.clients.producer.RecordMetadata;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class ProducerCallback implements Callback {
|
||||
private final static Logger logger = LoggerFactory.getLogger(ProducerCallback.class);
|
||||
|
||||
@Override
|
||||
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
|
||||
if (e != null)
|
||||
logger.error(e.getMessage(), e);
|
||||
else
|
||||
logger.info(recordMetadata.toString());
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,21 @@
|
||||
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||
import org.apache.kafka.common.serialization.StringSerializer;
|
||||
import java.util.Properties;
|
||||
|
||||
public class ProducerWithAsyncCallback {
|
||||
|
||||
public static void main(String[] args) {
|
||||
|
||||
KafkaProducer<String, String> producer = KafkaConfig.setup();
|
||||
|
||||
ProducerRecord<String, String> record =
|
||||
new ProducerRecord<>(KafkaConfig.TOPIC_NAME, "async-callback", "async-callback");
|
||||
|
||||
producer.send(record, new ProducerCallback());
|
||||
|
||||
producer.flush();
|
||||
producer.close();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,30 @@
|
||||
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||
import org.apache.kafka.clients.producer.RecordMetadata;
|
||||
import org.apache.kafka.common.serialization.StringSerializer;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.Properties;
|
||||
|
||||
public class ProducerWithSyncCallback {
|
||||
private final static Logger logger = LoggerFactory.getLogger(ProducerWithSyncCallback.class);
|
||||
|
||||
public static void main(String[] args) {
|
||||
|
||||
KafkaProducer<String, String> producer = KafkaConfig.setup();
|
||||
|
||||
ProducerRecord<String, String> record =
|
||||
new ProducerRecord<>(KafkaConfig.TOPIC_NAME, "sync-callback", "sync-callback");
|
||||
try {
|
||||
RecordMetadata metadata = producer.send(record).get();
|
||||
logger.info(metadata.toString());
|
||||
} catch (Exception e) {
|
||||
logger.error(e.getMessage(),e);
|
||||
} finally {
|
||||
producer.flush();
|
||||
producer.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user