diff --git a/spring-boot/spring-boot-kafka/src/main/java/io/reflectoring/kafka/InitSend.java b/spring-boot/spring-boot-kafka/src/main/java/io/reflectoring/kafka/InitSend.java index a561421..3a8a657 100644 --- a/spring-boot/spring-boot-kafka/src/main/java/io/reflectoring/kafka/InitSend.java +++ b/spring-boot/spring-boot-kafka/src/main/java/io/reflectoring/kafka/InitSend.java @@ -12,7 +12,7 @@ import org.springframework.stereotype.Component; @Component class InitSend { - Logger LOG = LoggerFactory.getLogger(InitSend.class); + private final Logger LOG = LoggerFactory.getLogger(getClass()); @Autowired private KafkaSenderExample kafkaSenderExample; diff --git a/spring-boot/spring-boot-kafka/src/main/java/io/reflectoring/kafka/KafkaClassListener.java b/spring-boot/spring-boot-kafka/src/main/java/io/reflectoring/kafka/KafkaClassListener.java index 73001b2..609197a 100644 --- a/spring-boot/spring-boot-kafka/src/main/java/io/reflectoring/kafka/KafkaClassListener.java +++ b/spring-boot/spring-boot-kafka/src/main/java/io/reflectoring/kafka/KafkaClassListener.java @@ -10,7 +10,7 @@ import org.springframework.stereotype.Component; @KafkaListener(id = "class-level", topics = "reflectoring-1") class KafkaClassListener { - Logger LOG = LoggerFactory.getLogger(KafkaClassListener.class); + private final Logger LOG = LoggerFactory.getLogger(getClass()); @KafkaHandler void listen(String message) { diff --git a/spring-boot/spring-boot-kafka/src/main/java/io/reflectoring/kafka/KafkaConsumerConfig.java b/spring-boot/spring-boot-kafka/src/main/java/io/reflectoring/kafka/KafkaConsumerConfig.java index b4c8cee..c880677 100644 --- a/spring-boot/spring-boot-kafka/src/main/java/io/reflectoring/kafka/KafkaConsumerConfig.java +++ b/spring-boot/spring-boot-kafka/src/main/java/io/reflectoring/kafka/KafkaConsumerConfig.java @@ -9,7 +9,6 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; diff --git a/spring-boot/spring-boot-kafka/src/main/java/io/reflectoring/kafka/KafkaListenersExample.java b/spring-boot/spring-boot-kafka/src/main/java/io/reflectoring/kafka/KafkaListenersExample.java index 755373b..aa8e782 100644 --- a/spring-boot/spring-boot-kafka/src/main/java/io/reflectoring/kafka/KafkaListenersExample.java +++ b/spring-boot/spring-boot-kafka/src/main/java/io/reflectoring/kafka/KafkaListenersExample.java @@ -14,7 +14,7 @@ import org.springframework.stereotype.Component; @Component class KafkaListenersExample { - Logger LOG = LoggerFactory.getLogger(KafkaListenersExample.class); + private final Logger LOG = LoggerFactory.getLogger(KafkaListenersExample.class); @KafkaListener(topics = "reflectoring-1") void listener(String message) { diff --git a/spring-boot/spring-boot-kafka/src/main/java/io/reflectoring/kafka/KafkaProducerConfig.java b/spring-boot/spring-boot-kafka/src/main/java/io/reflectoring/kafka/KafkaProducerConfig.java index 0947103..dffef19 100644 --- a/spring-boot/spring-boot-kafka/src/main/java/io/reflectoring/kafka/KafkaProducerConfig.java +++ b/spring-boot/spring-boot-kafka/src/main/java/io/reflectoring/kafka/KafkaProducerConfig.java @@ -6,8 +6,11 @@ import java.util.Map; import java.util.regex.Pattern; import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.StringSerializer; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -16,6 +19,7 @@ import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.core.RoutingKafkaTemplate; +import org.springframework.kafka.support.ProducerListener; import org.springframework.kafka.support.converter.StringJsonMessageConverter; import org.springframework.kafka.support.serializer.JsonSerializer; @@ -47,11 +51,19 @@ class KafkaProducerConfig { KafkaTemplate kafkaTemplate = new KafkaTemplate<>(producerFactory()); kafkaTemplate.setMessageConverter(new StringJsonMessageConverter()); kafkaTemplate.setDefaultTopic("reflectoring-user"); + kafkaTemplate.setProducerListener(new ProducerListener() { + @Override + public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) { + LoggerFactory.getLogger(getClass()).info("Acknowledgement from ProducerListener [message: {}, offset: {}]", producerRecord.value(), recordMetadata.offset()); + } + }); return kafkaTemplate; } - + @Bean public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context) { + + // ProducerFactory with Bytes serializer Map props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); @@ -59,8 +71,14 @@ class KafkaProducerConfig { DefaultKafkaProducerFactory bytesPF = new DefaultKafkaProducerFactory<>(props); context.registerBean(DefaultKafkaProducerFactory.class, "bytesPF", bytesPF); + // ProducerFactory with String serializer + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + DefaultKafkaProducerFactory stringPF = new DefaultKafkaProducerFactory<>(props); + + Map> map = new LinkedHashMap<>(); map.put(Pattern.compile(".*-bytes"), bytesPF); + map.put(Pattern.compile("reflectoring-.*"), stringPF); return new RoutingKafkaTemplate(map); } diff --git a/spring-boot/spring-boot-kafka/src/main/java/io/reflectoring/kafka/KafkaSenderExample.java b/spring-boot/spring-boot-kafka/src/main/java/io/reflectoring/kafka/KafkaSenderExample.java index 3a94013..91ab371 100644 --- a/spring-boot/spring-boot-kafka/src/main/java/io/reflectoring/kafka/KafkaSenderExample.java +++ b/spring-boot/spring-boot-kafka/src/main/java/io/reflectoring/kafka/KafkaSenderExample.java @@ -13,12 +13,11 @@ import org.springframework.util.concurrent.ListenableFutureCallback; @Component class KafkaSenderExample { - private static Logger LOG = LoggerFactory.getLogger(KafkaSenderExample.class); + private final Logger LOG = LoggerFactory.getLogger(KafkaSenderExample.class); @Autowired private KafkaTemplate kafkaTemplate; - @Autowired private RoutingKafkaTemplate routingKafkaTemplate; diff --git a/spring-boot/spring-boot-kafka/src/main/java/io/reflectoring/kafka/KafkaSenderWithMessageConverter.java b/spring-boot/spring-boot-kafka/src/main/java/io/reflectoring/kafka/KafkaSenderWithMessageConverter.java index b36d8fc..abfef4e 100644 --- a/spring-boot/spring-boot-kafka/src/main/java/io/reflectoring/kafka/KafkaSenderWithMessageConverter.java +++ b/spring-boot/spring-boot-kafka/src/main/java/io/reflectoring/kafka/KafkaSenderWithMessageConverter.java @@ -10,10 +10,10 @@ import org.springframework.stereotype.Component; @Component public class KafkaSenderWithMessageConverter { - private static Logger LOG = LoggerFactory.getLogger(KafkaSenderWithMessageConverter.class); + private final Logger LOG = LoggerFactory.getLogger(KafkaSenderWithMessageConverter.class); @Autowired - private KafkaTemplate kafkaTemplate; + private KafkaTemplate> kafkaTemplate; void sendMessageWithConverter(Message user) { LOG.info("Sending With Message Converter : {}", user); diff --git a/spring-boot/spring-boot-kafka/src/main/java/io/reflectoring/kafka/User.java b/spring-boot/spring-boot-kafka/src/main/java/io/reflectoring/kafka/User.java index ded96ee..793518a 100644 --- a/spring-boot/spring-boot-kafka/src/main/java/io/reflectoring/kafka/User.java +++ b/spring-boot/spring-boot-kafka/src/main/java/io/reflectoring/kafka/User.java @@ -4,8 +4,7 @@ class User { private String name; - public User() { - + public User() { } public User(String name) {