124 lines
5.6 KiB
Java
124 lines
5.6 KiB
Java
package com.baeldung.spring.kafka;
|
|
|
|
import java.util.HashMap;
|
|
import java.util.Map;
|
|
|
|
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
|
import org.apache.kafka.common.serialization.StringDeserializer;
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
import org.springframework.context.annotation.Bean;
|
|
import org.springframework.context.annotation.Configuration;
|
|
import org.springframework.kafka.annotation.EnableKafka;
|
|
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
|
|
import org.springframework.kafka.core.ConsumerFactory;
|
|
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
|
import org.springframework.kafka.support.converter.RecordMessageConverter;
|
|
import org.springframework.kafka.support.converter.StringJsonMessageConverter;
|
|
import org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper;
|
|
import org.springframework.kafka.support.mapping.Jackson2JavaTypeMapper;
|
|
import org.springframework.kafka.support.serializer.JsonDeserializer;
|
|
|
|
@EnableKafka
|
|
@Configuration
|
|
public class KafkaConsumerConfig {
|
|
|
|
@Value(value = "${kafka.bootstrapAddress}")
|
|
private String bootstrapAddress;
|
|
|
|
public ConsumerFactory<String, String> consumerFactory(String groupId) {
|
|
Map<String, Object> props = new HashMap<>();
|
|
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
|
|
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
|
|
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
|
|
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
|
|
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "20971520");
|
|
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "20971520");
|
|
return new DefaultKafkaConsumerFactory<>(props);
|
|
}
|
|
|
|
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(String groupId) {
|
|
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
|
|
factory.setConsumerFactory(consumerFactory(groupId));
|
|
return factory;
|
|
}
|
|
|
|
@Bean
|
|
public ConcurrentKafkaListenerContainerFactory<String, String> fooKafkaListenerContainerFactory() {
|
|
return kafkaListenerContainerFactory("foo");
|
|
}
|
|
|
|
@Bean
|
|
public ConcurrentKafkaListenerContainerFactory<String, String> barKafkaListenerContainerFactory() {
|
|
return kafkaListenerContainerFactory("bar");
|
|
}
|
|
|
|
@Bean
|
|
public ConcurrentKafkaListenerContainerFactory<String, String> headersKafkaListenerContainerFactory() {
|
|
return kafkaListenerContainerFactory("headers");
|
|
}
|
|
|
|
@Bean
|
|
public ConcurrentKafkaListenerContainerFactory<String, String> partitionsKafkaListenerContainerFactory() {
|
|
return kafkaListenerContainerFactory("partitions");
|
|
}
|
|
|
|
@Bean
|
|
public ConcurrentKafkaListenerContainerFactory<String, String> longMessageKafkaListenerContainerFactory() {
|
|
return kafkaListenerContainerFactory("longMessage");
|
|
}
|
|
|
|
@Bean
|
|
public ConcurrentKafkaListenerContainerFactory<String, String> filterKafkaListenerContainerFactory() {
|
|
ConcurrentKafkaListenerContainerFactory<String, String> factory = kafkaListenerContainerFactory("filter");
|
|
factory.setRecordFilterStrategy(record -> record.value()
|
|
.contains("World"));
|
|
return factory;
|
|
}
|
|
|
|
public ConsumerFactory<String, Greeting> greetingConsumerFactory() {
|
|
Map<String, Object> props = new HashMap<>();
|
|
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
|
|
props.put(ConsumerConfig.GROUP_ID_CONFIG, "greeting");
|
|
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(Greeting.class));
|
|
}
|
|
|
|
@Bean
|
|
public ConcurrentKafkaListenerContainerFactory<String, Greeting> greetingKafkaListenerContainerFactory() {
|
|
ConcurrentKafkaListenerContainerFactory<String, Greeting> factory = new ConcurrentKafkaListenerContainerFactory<>();
|
|
factory.setConsumerFactory(greetingConsumerFactory());
|
|
return factory;
|
|
}
|
|
|
|
@Bean
|
|
public RecordMessageConverter multiTypeConverter() {
|
|
StringJsonMessageConverter converter = new StringJsonMessageConverter();
|
|
DefaultJackson2JavaTypeMapper typeMapper = new DefaultJackson2JavaTypeMapper();
|
|
typeMapper.setTypePrecedence(Jackson2JavaTypeMapper.TypePrecedence.TYPE_ID);
|
|
typeMapper.addTrustedPackages("com.baeldung.spring.kafka");
|
|
Map<String, Class<?>> mappings = new HashMap<>();
|
|
mappings.put("greeting", Greeting.class);
|
|
mappings.put("farewell", Farewell.class);
|
|
typeMapper.setIdClassMapping(mappings);
|
|
converter.setTypeMapper(typeMapper);
|
|
return converter;
|
|
}
|
|
|
|
@Bean
|
|
public ConsumerFactory<String, Object> multiTypeConsumerFactory() {
|
|
HashMap<String, Object> props = new HashMap<>();
|
|
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
|
|
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
|
|
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
|
|
return new DefaultKafkaConsumerFactory<>(props);
|
|
}
|
|
|
|
@Bean
|
|
public ConcurrentKafkaListenerContainerFactory<String, Object> multiTypeKafkaListenerContainerFactory() {
|
|
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
|
|
factory.setConsumerFactory(multiTypeConsumerFactory());
|
|
factory.setMessageConverter(multiTypeConverter());
|
|
return factory;
|
|
}
|
|
|
|
}
|