Address checkstyle warnings

Addressing checkstyle warnings in spring-cloud-stream-binder-kafka.

Resolves #487
Soby Chacko
2018-11-05 20:12:23 -05:00
parent 3a396c6d37
commit 0b50a6ce2f
10 changed files with 100 additions and 62 deletions
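Most of the hunks below apply a handful of recurring checkstyle fixes rather than functional changes: JLS modifier order and narrowest visibility for constants, catch parameters renamed from e to ex, fields qualified with this., parentheses around single lambda parameters, a space after commas in generic type arguments, and javadoc added to public constants and nested classes. A minimal, self-contained sketch of the post-fix style (the class and members are invented for illustration; the corresponding checkstyle modules — ModifierOrder, CatchParameterName, RequireThis, WhitespaceAfter — are an assumption about the ruleset in use):

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical example showing the post-fix style; not code from the binder.
    public class CheckstyleStyleExamples {

        // 'private static final', not 'public final static': JLS modifier order,
        // and constants kept private unless they are part of the public API.
        private static final String PREFIX = "spring.kafka";

        private final Map<String, String> cache = new HashMap<>();

        String lookup(String key) {
            try {
                // Fields qualified with 'this.'; single lambda parameters
                // wrapped in parentheses.
                return this.cache.computeIfAbsent(key, (k) -> PREFIX + "." + k);
            }
            catch (RuntimeException ex) { // catch parameter named 'ex', not 'e'
                throw new IllegalStateException("lookup failed", ex);
            }
        }
    }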


@@ -35,19 +35,19 @@ import org.springframework.core.env.MapPropertySource;
  */
 public class KafkaBinderEnvironmentPostProcessor implements EnvironmentPostProcessor {
-    public final static String SPRING_KAFKA = "spring.kafka";
+    private static final String SPRING_KAFKA = "spring.kafka";
-    public final static String SPRING_KAFKA_PRODUCER = SPRING_KAFKA + ".producer";
+    private static final String SPRING_KAFKA_PRODUCER = SPRING_KAFKA + ".producer";
-    public final static String SPRING_KAFKA_CONSUMER = SPRING_KAFKA + ".consumer";
+    private static final String SPRING_KAFKA_CONSUMER = SPRING_KAFKA + ".consumer";
-    public final static String SPRING_KAFKA_PRODUCER_KEY_SERIALIZER = SPRING_KAFKA_PRODUCER + "." + "keySerializer";
+    private static final String SPRING_KAFKA_PRODUCER_KEY_SERIALIZER = SPRING_KAFKA_PRODUCER + "." + "keySerializer";
-    public final static String SPRING_KAFKA_PRODUCER_VALUE_SERIALIZER = SPRING_KAFKA_PRODUCER + "." + "valueSerializer";
+    private static final String SPRING_KAFKA_PRODUCER_VALUE_SERIALIZER = SPRING_KAFKA_PRODUCER + "." + "valueSerializer";
-    public final static String SPRING_KAFKA_CONSUMER_KEY_DESERIALIZER = SPRING_KAFKA_CONSUMER + "." + "keyDeserializer";
+    private static final String SPRING_KAFKA_CONSUMER_KEY_DESERIALIZER = SPRING_KAFKA_CONSUMER + "." + "keyDeserializer";
-    public final static String SPRING_KAFKA_CONSUMER_VALUE_DESERIALIZER = SPRING_KAFKA_CONSUMER + "." + "valueDeserializer";
+    private static final String SPRING_KAFKA_CONSUMER_VALUE_DESERIALIZER = SPRING_KAFKA_CONSUMER + "." + "valueDeserializer";
     private static final String KAFKA_BINDER_DEFAULT_PROPERTIES = "kafkaBinderDefaultProperties";
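Only the constants' visibility and modifier order change here. For context, the class these constants live in is a standard Boot EnvironmentPostProcessor that contributes Kafka defaults under these keys; a generic sketch of that mechanism (the property value shown is an invented placeholder, not the binder's actual default):

    import java.util.HashMap;
    import java.util.Map;

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.env.EnvironmentPostProcessor;
    import org.springframework.core.env.ConfigurableEnvironment;
    import org.springframework.core.env.MapPropertySource;

    public class KafkaDefaultsPostProcessor implements EnvironmentPostProcessor {

        @Override
        public void postProcessEnvironment(ConfigurableEnvironment environment, SpringApplication application) {
            Map<String, Object> defaults = new HashMap<>();
            // Placeholder default; the real class derives its values differently.
            defaults.put("spring.kafka.consumer.keyDeserializer",
                    "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            // addLast so user-provided configuration still takes precedence; the
            // source name matches KAFKA_BINDER_DEFAULT_PROPERTIES above.
            environment.getPropertySources()
                    .addLast(new MapPropertySource("kafkaBinderDefaultProperties", defaults));
        }
    }

Such a post-processor is registered in META-INF/spring.factories under the org.springframework.boot.env.EnvironmentPostProcessor key.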


@@ -75,21 +75,21 @@ public class KafkaBinderHealthIndicator implements HealthIndicator {
     ExecutorService exec = Executors.newSingleThreadExecutor();
     Future<Health> future = exec.submit(() -> {
         try {
-            if (metadataConsumer == null) {
-                synchronized(KafkaBinderHealthIndicator.this) {
-                    if (metadataConsumer == null) {
-                        metadataConsumer = consumerFactory.createConsumer();
+            if (this.metadataConsumer == null) {
+                synchronized (KafkaBinderHealthIndicator.this) {
+                    if (this.metadataConsumer == null) {
+                        this.metadataConsumer = this.consumerFactory.createConsumer();
                     }
                 }
             }
-            synchronized (metadataConsumer) {
+            synchronized (this.metadataConsumer) {
                 Set<String> downMessages = new HashSet<>();
                 final Map<String, KafkaMessageChannelBinder.TopicInformation> topicsInUse =
                         KafkaBinderHealthIndicator.this.binder.getTopicsInUse();
                 for (String topic : topicsInUse.keySet()) {
                     KafkaMessageChannelBinder.TopicInformation topicInformation = topicsInUse.get(topic);
                     if (!topicInformation.isTopicPattern()) {
-                        List<PartitionInfo> partitionInfos = metadataConsumer.partitionsFor(topic);
+                        List<PartitionInfo> partitionInfos = this.metadataConsumer.partitionsFor(topic);
                         for (PartitionInfo partitionInfo : partitionInfos) {
                             if (topicInformation.getPartitionInfos()
                                     .contains(partitionInfo) && partitionInfo.leader().id() == -1) {
@@ -108,23 +108,23 @@ public class KafkaBinderHealthIndicator implements HealthIndicator {
                 }
             }
         }
-        catch (Exception e) {
-            return Health.down(e).build();
+        catch (Exception ex) {
+            return Health.down(ex).build();
         }
     });
     try {
         return future.get(this.timeout, TimeUnit.SECONDS);
     }
-    catch (InterruptedException e) {
+    catch (InterruptedException ex) {
         Thread.currentThread().interrupt();
         return Health.down()
                 .withDetail("Interrupted while waiting for partition information in", this.timeout + " seconds")
                 .build();
     }
-    catch (ExecutionException e) {
-        return Health.down(e).build();
+    catch (ExecutionException ex) {
+        return Health.down(ex).build();
    }
-    catch (TimeoutException e) {
+    catch (TimeoutException ex) {
        return Health.down()
                .withDetail("Failed to retrieve partition information in", this.timeout + " seconds")
                .build();
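Beyond the this. qualifiers, the first hunk above is a check/lock/re-check lazy initialization, after which every use of the consumer is wrapped in synchronized — necessary because the Kafka consumer is not thread-safe. A condensed sketch of that shape (a plain Object stands in for the Kafka Consumer):

    public class LazyResource {

        private Object resource;

        void use() {
            // First check avoids taking the lock on the hot path; the second
            // check ensures only one thread creates the resource.
            if (this.resource == null) {
                synchronized (this) {
                    if (this.resource == null) {
                        this.resource = new Object(); // expensive creation in real code
                    }
                }
            }
            // The real code also synchronizes on the resource itself for each
            // use, since a KafkaConsumer must not be shared across threads.
            synchronized (this.resource) {
                // ... partitionsFor(topic), etc.
            }
        }
    }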


@@ -28,10 +28,9 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import io.micrometer.core.instrument.MeterRegistry;
 import io.micrometer.core.instrument.Gauge;
 import io.micrometer.core.instrument.MeterRegistry;
 import io.micrometer.core.instrument.binder.MeterBinder;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.kafka.clients.consumer.Consumer;
@@ -64,7 +63,7 @@ public class KafkaBinderMetrics implements MeterBinder, ApplicationListener<Bind
     private static final int DEFAULT_TIMEOUT = 60;
-    private final static Log LOG = LogFactory.getLog(KafkaBinderMetrics.class);
+    private static final Log LOG = LogFactory.getLog(KafkaBinderMetrics.class);
     static final String METRIC_NAME = "spring.cloud.stream.binder.kafka.offset";
@@ -79,7 +78,7 @@ public class KafkaBinderMetrics implements MeterBinder, ApplicationListener<Bind
     private Map<String, Consumer<?, ?>> metadataConsumers;
     private int timeout = DEFAULT_TIMEOUT;
     public KafkaBinderMetrics(KafkaMessageChannelBinder binder,
             KafkaBinderConfigurationProperties binderConfigurationProperties,
             ConsumerFactory<?, ?> defaultConsumerFactory, @Nullable MeterRegistry meterRegistry) {
@@ -114,7 +113,7 @@ public class KafkaBinderMetrics implements MeterBinder, ApplicationListener<Bind
     String group = topicInfo.getValue().getConsumerGroup();
     Gauge.builder(METRIC_NAME, this,
-            o -> computeUnconsumedMessages(topic, group))
+            (o) -> computeUnconsumedMessages(topic, group))
             .tag("group", group)
             .tag("topic", topic)
             .description("Unconsumed messages for a particular group and topic")
@@ -128,9 +127,9 @@ public class KafkaBinderMetrics implements MeterBinder, ApplicationListener<Bind
     long lag = 0;
     try {
-        Consumer<?, ?> metadataConsumer = metadataConsumers.computeIfAbsent(
-                group,
-                g -> createConsumerFactory().createConsumer(g, "monitoring"));
+        Consumer<?, ?> metadataConsumer = this.metadataConsumers.computeIfAbsent(
+                group,
+                (g) -> createConsumerFactory().createConsumer(g, "monitoring"));
         synchronized (metadataConsumer) {
             List<PartitionInfo> partitionInfos = metadataConsumer.partitionsFor(topic);
             List<TopicPartition> topicPartitions = new LinkedList<>();
@@ -149,19 +148,19 @@ public class KafkaBinderMetrics implements MeterBinder, ApplicationListener<Bind
             }
         }
     }
-    catch (Exception e) {
-        LOG.debug("Cannot generate metric for topic: " + topic, e);
+    catch (Exception ex) {
+        LOG.debug("Cannot generate metric for topic: " + topic, ex);
     }
     return lag;
 });
 try {
     return future.get(this.timeout, TimeUnit.SECONDS);
 }
-catch (InterruptedException e) {
+catch (InterruptedException ex) {
     Thread.currentThread().interrupt();
     return 0L;
 }
-catch (ExecutionException | TimeoutException e) {
+catch (ExecutionException | TimeoutException ex) {
     return 0L;
 }
 finally {
@@ -171,7 +170,7 @@ public class KafkaBinderMetrics implements MeterBinder, ApplicationListener<Bind
 private ConsumerFactory<?, ?> createConsumerFactory() {
     if (this.defaultConsumerFactory == null) {
-    synchronized (this) {
+        synchronized (this) {
         if (this.defaultConsumerFactory == null) {
             Map<String, Object> props = new HashMap<>();
             props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
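The Gauge registration touched above uses Micrometer's builder API; because the lambda is re-evaluated on every read, the gauge always reports current lag rather than a snapshot. A self-contained sketch with the metric name and tag keys from the hunk (the group/topic values and the lag computation are stand-ins):

    import io.micrometer.core.instrument.Gauge;
    import io.micrometer.core.instrument.simple.SimpleMeterRegistry;

    public class OffsetGaugeSketch {

        public static void main(String[] args) {
            SimpleMeterRegistry registry = new SimpleMeterRegistry();
            OffsetGaugeSketch source = new OffsetGaugeSketch();
            Gauge.builder("spring.cloud.stream.binder.kafka.offset", source,
                    (o) -> o.computeUnconsumedMessages("my-topic", "my-group"))
                    .tag("group", "my-group")
                    .tag("topic", "my-topic")
                    .description("Unconsumed messages for a particular group and topic")
                    .register(registry);
            // Each read of the gauge invokes the lambda again.
            System.out.println(registry.get("spring.cloud.stream.binder.kafka.offset").gauge().value());
        }

        long computeUnconsumedMessages(String topic, String group) {
            return 42L; // stand-in for the consumer-lag computation
        }
    }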


@@ -134,20 +134,44 @@ public class KafkaMessageChannelBinder extends
         AbstractMessageChannelBinder<ExtendedConsumerProperties<KafkaConsumerProperties>, ExtendedProducerProperties<KafkaProducerProperties>, KafkaTopicProvisioner>
         implements ExtendedPropertiesBinder<MessageChannel, KafkaConsumerProperties, KafkaProducerProperties> {
+    /**
+     * Kafka header for x-exception-fqcn.
+     */
     public static final String X_EXCEPTION_FQCN = "x-exception-fqcn";
+    /**
+     * Kafka header for x-exception-stacktrace.
+     */
     public static final String X_EXCEPTION_STACKTRACE = "x-exception-stacktrace";
+    /**
+     * Kafka header for x-exception-message.
+     */
     public static final String X_EXCEPTION_MESSAGE = "x-exception-message";
+    /**
+     * Kafka header for x-original-topic.
+     */
     public static final String X_ORIGINAL_TOPIC = "x-original-topic";
+    /**
+     * Kafka header for x-original-partition.
+     */
     public static final String X_ORIGINAL_PARTITION = "x-original-partition";
+    /**
+     * Kafka header for x-original-offset.
+     */
     public static final String X_ORIGINAL_OFFSET = "x-original-offset";
+    /**
+     * Kafka header for x-original-timestamp.
+     */
     public static final String X_ORIGINAL_TIMESTAMP = "x-original-timestamp";
+    /**
+     * Kafka header for x-original-timestamp-type.
+     */
     public static final String X_ORIGINAL_TIMESTAMP_TYPE = "x-original-timestamp-type";
     private static final ThreadLocal<String> bindingNameHolder = new ThreadLocal<>();
@@ -275,7 +299,7 @@ public class KafkaMessageChannelBinder extends
             + partitions.size() + " for the topic. The larger number will be used instead.");
     }
     List<ChannelInterceptor> interceptors = ((ChannelInterceptorAware) channel).getChannelInterceptors();
-    interceptors.forEach(interceptor -> {
+    interceptors.forEach((interceptor) -> {
         if (interceptor instanceof PartitioningInterceptor) {
             ((PartitioningInterceptor) interceptor).setPartitionCount(partitions.size());
         }
@@ -675,8 +699,8 @@ public class KafkaMessageChannelBinder extends
                 extendedConsumerProperties.getExtension().getConverterBeanName(),
                 MessagingMessageConverter.class);
     }
-    catch (NoSuchBeanDefinitionException e) {
-        throw new IllegalStateException("Converter bean not present in application context", e);
+    catch (NoSuchBeanDefinitionException ex) {
+        throw new IllegalStateException("Converter bean not present in application context", ex);
     }
 }
 messageConverter.setHeaderMapper(getHeaderMapper(extendedConsumerProperties));
@@ -737,16 +761,16 @@ public class KafkaMessageChannelBinder extends
 KafkaConsumerProperties kafkaConsumerProperties = properties.getExtension();
 if (kafkaConsumerProperties.isEnableDlq()) {
     KafkaProducerProperties dlqProducerProperties = kafkaConsumerProperties.getDlqProducerProperties();
-    ProducerFactory<?,?> producerFactory = this.transactionManager != null
+    ProducerFactory<?, ?> producerFactory = this.transactionManager != null
             ? this.transactionManager.getProducerFactory()
             : getProducerFactory(null,
                     new ExtendedProducerProperties<>(dlqProducerProperties));
-    final KafkaTemplate<?,?> kafkaTemplate = new KafkaTemplate<>(producerFactory);
+    final KafkaTemplate<?, ?> kafkaTemplate = new KafkaTemplate<>(producerFactory);
     @SuppressWarnings("rawtypes")
-    DlqSender<?,?> dlqSender = new DlqSender(kafkaTemplate);
+    DlqSender<?, ?> dlqSender = new DlqSender(kafkaTemplate);
-    return message -> {
+    return (message) -> {
         final ConsumerRecord<Object, Object> record = message.getHeaders()
                 .get(KafkaHeaders.RAW_DATA, ConsumerRecord.class);
@@ -818,8 +842,8 @@ public class KafkaMessageChannelBinder extends
             recordToSend.set(new ConsumerRecord<Object, Object>(record.topic(), record.partition(),
                     record.offset(), record.key(), payload));
         }
-        catch (Exception e) {
-            throw new RuntimeException(e);
+        catch (Exception ex) {
+            throw new RuntimeException(ex);
         }
     }
 }
@@ -838,7 +862,7 @@ public class KafkaMessageChannelBinder extends
     return getErrorMessageHandler(destination, group, properties);
 }
 final MessageHandler superHandler = super.getErrorMessageHandler(destination, group, properties);
-return message -> {
+return (message) -> {
     ConsumerRecord<?, ?> record = (ConsumerRecord<?, ?>) message.getHeaders().get(KafkaHeaders.RAW_DATA);
     if (!(message instanceof ErrorMessage)) {
         logger.error("Expected an ErrorMessage, not a " + message.getClass().toString() + " for: "
@@ -885,7 +909,7 @@ public class KafkaMessageChannelBinder extends
 props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, anonymous ? "latest" : "earliest");
 props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
-Map<String, Object> mergedConfig = configurationProperties.mergedConsumerConfiguration();
+Map<String, Object> mergedConfig = this.configurationProperties.mergedConsumerConfiguration();
 if (!ObjectUtils.isEmpty(mergedConfig)) {
     props.putAll(mergedConfig);
 }
@@ -965,9 +989,9 @@ public class KafkaMessageChannelBinder extends
 try {
     super.onInit();
 }
-catch (Exception e) {
-    this.logger.error("Initialization errors: ", e);
-    throw new RuntimeException(e);
+catch (Exception ex) {
+    this.logger.error("Initialization errors: ", ex);
+    throw new RuntimeException(ex);
 }
@@ -986,6 +1010,9 @@ public class KafkaMessageChannelBinder extends
 }
+/**
+ * Inner class to capture topic details.
+ */
 static class TopicInformation {
     private final String consumerGroup;
@@ -1001,26 +1028,32 @@ public class KafkaMessageChannelBinder extends
     }
     String getConsumerGroup() {
-        return consumerGroup;
+        return this.consumerGroup;
     }
     boolean isConsumerTopic() {
-        return consumerGroup != null;
+        return this.consumerGroup != null;
     }
     boolean isTopicPattern() {
-        return isTopicPattern;
+        return this.isTopicPattern;
     }
     Collection<PartitionInfo> getPartitionInfos() {
-        return partitionInfos;
+        return this.partitionInfos;
     }
 }
-private final class DlqSender<K,V> {
+/**
+ * Helper class to send to DLQ.
+ *
+ * @param <K> generic type for key
+ * @param <V> generic type for value
+ */
+private final class DlqSender<K, V> {
-    private final KafkaTemplate<K,V> kafkaTemplate;
+    private final KafkaTemplate<K, V> kafkaTemplate;
     DlqSender(KafkaTemplate<K, V> kafkaTemplate) {
         this.kafkaTemplate = kafkaTemplate;
@@ -1028,9 +1061,9 @@ public class KafkaMessageChannelBinder extends
     @SuppressWarnings("unchecked")
     void sendToDlq(ConsumerRecord<?, ?> consumerRecord, Headers headers, String dlqName) {
-        K key = (K)consumerRecord.key();
-        V value = (V)consumerRecord.value();
-        ProducerRecord<K,V> producerRecord = new ProducerRecord<>(dlqName, consumerRecord.partition(),
+        K key = (K) consumerRecord.key();
+        V value = (V) consumerRecord.value();
+        ProducerRecord<K, V> producerRecord = new ProducerRecord<>(dlqName, consumerRecord.partition(),
                 key, value, headers);
         StringBuilder sb = new StringBuilder().append(" a message with key='")
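The DlqSender changes above are javadoc and whitespace only; the send path itself republishes the failed record to the DLQ topic on the original partition, carrying the x-original-*/x-exception-* headers declared at the top of the class. A hedged sketch of that record construction (topic name, partition, and values are invented):

    import java.nio.charset.StandardCharsets;

    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.header.internals.RecordHeaders;

    public class DlqRecordSketch {

        public static void main(String[] args) {
            RecordHeaders headers = new RecordHeaders();
            // Header names taken from the constants declared in the binder.
            headers.add("x-original-topic", "orders".getBytes(StandardCharsets.UTF_8));
            headers.add("x-exception-message", "deserialization failed".getBytes(StandardCharsets.UTF_8));
            // Same five-argument constructor as the hunk above:
            // topic, partition, key, value, headers.
            ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(
                    "orders.dlq", 0,
                    "key-1".getBytes(StandardCharsets.UTF_8),
                    "payload".getBytes(StandardCharsets.UTF_8),
                    headers);
            System.out.println(producerRecord);
        }
    }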


@@ -25,9 +25,9 @@ import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 /**
  * Configuration for extended binding metadata.
+ *
  * @author Oleg Zhurakousky
- *
  */
 @Configuration


@@ -48,6 +48,8 @@ import org.springframework.kafka.support.ProducerListener;
 import org.springframework.lang.Nullable;
 /**
  * Kafka binder configuration class.
+ *
  * @author David Turanski
  * @author Marius Bogoevici
 * @author Soby Chacko
@@ -95,7 +97,7 @@ public class KafkaBinderConfiguration {
     KafkaMessageChannelBinder kafkaMessageChannelBinder = new KafkaMessageChannelBinder(
             configurationProperties, provisioningProvider, listenerContainerCustomizer,
             rebalanceListener.getIfUnique());
-    kafkaMessageChannelBinder.setProducerListener(producerListener);
+    kafkaMessageChannelBinder.setProducerListener(this.producerListener);
     kafkaMessageChannelBinder.setExtendedBindingProperties(this.kafkaExtendedBindingProperties);
     return kafkaMessageChannelBinder;
 }
@@ -146,6 +148,9 @@ public class KafkaBinderConfiguration {
 }
+/**
+ * Properties configuration for Jaas.
+ */
 @SuppressWarnings("unused")
 public static class JaasConfigurationProperties {


@@ -34,19 +34,19 @@ import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
 import org.springframework.util.ObjectUtils;
 /**
  * Configuration class for Kafka binder health indicator beans.
+ *
  * @author Oleg Zhurakousky
- *
 */
 @Configuration
-@ConditionalOnClass(name="org.springframework.boot.actuate.health.HealthIndicator")
+@ConditionalOnClass(name = "org.springframework.boot.actuate.health.HealthIndicator")
 @ConditionalOnEnabledHealthIndicator("binders")
 class KafkaBinderHealthIndicatorConfiguration {
     @Bean
     KafkaBinderHealthIndicator kafkaBinderHealthIndicator(KafkaMessageChannelBinder kafkaMessageChannelBinder,
-        KafkaBinderConfigurationProperties configurationProperties) {
+            KafkaBinderConfigurationProperties configurationProperties) {
         Map<String, Object> props = new HashMap<>();
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
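The substantive fix here is only the spacing in the annotation attribute, but the attribute itself is worth noting: @ConditionalOnClass with the string name form (rather than a Class literal) lets this configuration be parsed even when the actuator classes are absent from the classpath. A minimal sketch of the same pattern (bean and class names invented):

    import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    @ConditionalOnClass(name = "org.springframework.boot.actuate.health.HealthIndicator")
    class HealthSupportConfiguration {

        // Only registered when the actuator HealthIndicator class is present.
        @Bean
        String healthSupportMarker() {
            return "health-support-enabled";
        }
    }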


@@ -26,7 +26,6 @@ import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration;
 import org.springframework.cloud.stream.binder.kafka.properties.KafkaAdminProperties;
 import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
-import org.springframework.cloud.stream.config.BinderFactoryConfiguration;
 import org.springframework.cloud.stream.config.BindingServiceConfiguration;
 import org.springframework.integration.config.EnableIntegration;
 import org.springframework.test.context.TestPropertySource;
@@ -41,7 +40,6 @@ import static org.assertj.core.api.Assertions.assertThat;
  */
 @RunWith(SpringRunner.class)
 @SpringBootTest(classes = {KafkaBinderConfiguration.class,
-        BinderFactoryConfiguration.class,
         BindingServiceConfiguration.class })
 @TestPropertySource(properties = {
         "spring.cloud.stream.kafka.bindings.input.consumer.admin.replication-factor=2",


@@ -61,6 +61,7 @@ public class KafkaBinderAutoConfigurationPropertiesTest {
     private KafkaBinderHealthIndicator kafkaBinderHealthIndicator;
     @Test
+    @SuppressWarnings("unchecked")
     public void testKafkaBinderConfigurationWithKafkaProperties() throws Exception {
         assertNotNull(this.kafkaMessageChannelBinder);
         ExtendedProducerProperties<KafkaProducerProperties> producerProperties = new ExtendedProducerProperties<>(
@@ -107,6 +108,7 @@ public class KafkaBinderAutoConfigurationPropertiesTest {
     }
     @Test
+    @SuppressWarnings("unchecked")
     public void testKafkaHealthIndicatorProperties() {
         assertNotNull(this.kafkaBinderHealthIndicator);
         Field consumerFactoryField = ReflectionUtils.findField(KafkaBinderHealthIndicator.class, "consumerFactory",


@@ -58,6 +58,7 @@ public class KafkaBinderConfigurationPropertiesTest {
     private KafkaMessageChannelBinder kafkaMessageChannelBinder;
     @Test
+    @SuppressWarnings("unchecked")
     public void testKafkaBinderConfigurationProperties() throws Exception {
         assertNotNull(this.kafkaMessageChannelBinder);
         KafkaProducerProperties kafkaProducerProperties = new KafkaProducerProperties();