Added example for ProducerListener & fixes

This commit is contained in:
Nandan Bn
2020-07-14 20:31:15 +05:30
parent 568fd5d7aa
commit 155521ea11
8 changed files with 26 additions and 11 deletions

View File

@@ -12,7 +12,7 @@ import org.springframework.stereotype.Component;
@Component
class InitSend {
Logger LOG = LoggerFactory.getLogger(InitSend.class);
private final Logger LOG = LoggerFactory.getLogger(getClass());
@Autowired
private KafkaSenderExample kafkaSenderExample;

View File

@@ -10,7 +10,7 @@ import org.springframework.stereotype.Component;
@KafkaListener(id = "class-level", topics = "reflectoring-1")
class KafkaClassListener {
Logger LOG = LoggerFactory.getLogger(KafkaClassListener.class);
private final Logger LOG = LoggerFactory.getLogger(getClass());
@KafkaHandler
void listen(String message) {

View File

@@ -9,7 +9,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

View File

@@ -14,7 +14,7 @@ import org.springframework.stereotype.Component;
@Component
class KafkaListenersExample {
Logger LOG = LoggerFactory.getLogger(KafkaListenersExample.class);
private final Logger LOG = LoggerFactory.getLogger(KafkaListenersExample.class);
@KafkaListener(topics = "reflectoring-1")
void listener(String message) {

View File

@@ -6,8 +6,11 @@ import java.util.Map;
import java.util.regex.Pattern;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -16,6 +19,7 @@ import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.core.RoutingKafkaTemplate;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.kafka.support.converter.StringJsonMessageConverter;
import org.springframework.kafka.support.serializer.JsonSerializer;
@@ -47,11 +51,19 @@ class KafkaProducerConfig {
KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());
kafkaTemplate.setMessageConverter(new StringJsonMessageConverter());
kafkaTemplate.setDefaultTopic("reflectoring-user");
kafkaTemplate.setProducerListener(new ProducerListener<String, String>() {
@Override
public void onSuccess(ProducerRecord<String, String> producerRecord, RecordMetadata recordMetadata) {
LoggerFactory.getLogger(getClass()).info("Acknowledgement from ProducerListener [message: {}, offset: {}]", producerRecord.value(), recordMetadata.offset());
}
});
return kafkaTemplate;
}
@Bean
public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context) {
// ProducerFactory with Bytes serializer
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
@@ -59,8 +71,14 @@ class KafkaProducerConfig {
DefaultKafkaProducerFactory<Object, Object> bytesPF = new DefaultKafkaProducerFactory<>(props);
context.registerBean(DefaultKafkaProducerFactory.class, "bytesPF", bytesPF);
// ProducerFactory with String serializer
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
DefaultKafkaProducerFactory<Object, Object> stringPF = new DefaultKafkaProducerFactory<>(props);
Map<Pattern, ProducerFactory<Object, Object>> map = new LinkedHashMap<>();
map.put(Pattern.compile(".*-bytes"), bytesPF);
map.put(Pattern.compile("reflectoring-.*"), stringPF);
return new RoutingKafkaTemplate(map);
}

View File

@@ -13,12 +13,11 @@ import org.springframework.util.concurrent.ListenableFutureCallback;
@Component
class KafkaSenderExample {
private static Logger LOG = LoggerFactory.getLogger(KafkaSenderExample.class);
private final Logger LOG = LoggerFactory.getLogger(KafkaSenderExample.class);
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Autowired
private RoutingKafkaTemplate routingKafkaTemplate;

View File

@@ -10,10 +10,10 @@ import org.springframework.stereotype.Component;
@Component
public class KafkaSenderWithMessageConverter {
private static Logger LOG = LoggerFactory.getLogger(KafkaSenderWithMessageConverter.class);
private final Logger LOG = LoggerFactory.getLogger(KafkaSenderWithMessageConverter.class);
@Autowired
private KafkaTemplate<String,?> kafkaTemplate;
private KafkaTemplate<String,Message<?>> kafkaTemplate;
void sendMessageWithConverter(Message<?> user) {
LOG.info("Sending With Message Converter : {}", user);

View File

@@ -4,8 +4,7 @@ class User {
private String name;
public User() {
public User() {
}
public User(String name) {