diff --git a/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaConsumerConfig.java b/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaConsumerConfig.java index 9353e63ff6..933d2353aa 100644 --- a/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaConsumerConfig.java +++ b/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaConsumerConfig.java @@ -29,39 +29,36 @@ public class KafkaConsumerConfig { props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return new DefaultKafkaConsumerFactory<>(props); } + + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(String groupId) { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory(groupId)); + return factory; + } @Bean public ConcurrentKafkaListenerContainerFactory fooKafkaListenerContainerFactory() { - ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); - factory.setConsumerFactory(consumerFactory("foo")); - return factory; + return kafkaListenerContainerFactory("foo"); } @Bean public ConcurrentKafkaListenerContainerFactory barKafkaListenerContainerFactory() { - ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); - factory.setConsumerFactory(consumerFactory("bar")); - return factory; + return kafkaListenerContainerFactory("bar"); } @Bean public ConcurrentKafkaListenerContainerFactory headersKafkaListenerContainerFactory() { - ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); - factory.setConsumerFactory(consumerFactory("headers")); - return factory; + return kafkaListenerContainerFactory("headers"); } @Bean public ConcurrentKafkaListenerContainerFactory partitionsKafkaListenerContainerFactory() { - ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); - factory.setConsumerFactory(consumerFactory("partitions")); - return factory; + return kafkaListenerContainerFactory("partitions"); } @Bean public ConcurrentKafkaListenerContainerFactory filterKafkaListenerContainerFactory() { - ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); - factory.setConsumerFactory(consumerFactory("filter")); + ConcurrentKafkaListenerContainerFactory factory = kafkaListenerContainerFactory("filter"); factory.setRecordFilterStrategy(record -> record.value() .contains("World")); return factory;