57 lines
2.6 KiB
Java
57 lines
2.6 KiB
Java
package com.eventsourcing.configuration;
|
|
|
|
import lombok.RequiredArgsConstructor;
|
|
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
|
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
|
|
import org.apache.kafka.common.serialization.StringDeserializer;
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
import org.springframework.context.annotation.Bean;
|
|
import org.springframework.context.annotation.Configuration;
|
|
import org.springframework.kafka.annotation.EnableKafka;
|
|
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
|
|
import org.springframework.kafka.core.ConsumerFactory;
|
|
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
|
import org.springframework.kafka.listener.ContainerProperties;
|
|
import org.springframework.kafka.listener.DefaultErrorHandler;
|
|
import org.springframework.util.backoff.FixedBackOff;
|
|
|
|
import java.util.HashMap;
|
|
import java.util.Map;
|
|
|
|
@Configuration
|
|
@RequiredArgsConstructor
|
|
@EnableKafka
|
|
public class KafkaConsumerConfig {
|
|
|
|
@Value(value = "${kafka.bootstrapServers:localhost:9093}")
|
|
private String bootstrapServers;
|
|
|
|
private final KafkaConfigProperties kafkaConfigProperties;
|
|
|
|
@Bean
|
|
public ConcurrentKafkaListenerContainerFactory<String, byte[]>
|
|
kafkaListenerContainerFactory(ConsumerFactory<String, byte[]> consumerFactory) {
|
|
ConcurrentKafkaListenerContainerFactory<String, byte[]> factory = new ConcurrentKafkaListenerContainerFactory<>();
|
|
factory.setConsumerFactory(consumerFactory);
|
|
factory.setConcurrency(Runtime.getRuntime().availableProcessors());
|
|
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
|
|
factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000L, 2L)));
|
|
return factory;
|
|
}
|
|
|
|
@Bean
|
|
public ConsumerFactory<String, byte[]> consumerFactory() {
|
|
return new DefaultKafkaConsumerFactory<>(consumerProps());
|
|
}
|
|
|
|
private Map<String, Object> consumerProps() {
|
|
final Map<String, Object> consumerProps = new HashMap<>();
|
|
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
|
|
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfigProperties.getOrderMongoProjectionGroupId());
|
|
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
|
|
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
|
|
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaConfigProperties.getEnableAutoCommit());
|
|
return consumerProps;
|
|
}
|
|
}
|