Configure the health indicator ConsumerFactory to use binder properties

Fixes #79

Replaced try...finally in KafkaBinderHealthIndicator with try-with-resources

Changed KafkaBinderHealthIndicatorTest for consistency
This commit is contained in:
Barry Commins
2017-01-11 23:28:27 +00:00
committed by Marius Bogoevici
parent 6a31e9c94f
commit 91bdee65ec
3 changed files with 108 additions and 21 deletions

View File

@@ -16,20 +16,16 @@
package org.springframework.cloud.stream.binder.kafka;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.kafka.core.ConsumerFactory;
/**
* Health indicator for Kafka.
@@ -41,24 +37,18 @@ public class KafkaBinderHealthIndicator implements HealthIndicator {
private final KafkaMessageChannelBinder binder;
private final KafkaBinderConfigurationProperties configurationProperties;
private final ConsumerFactory<?, ?> consumerFactory;
public KafkaBinderHealthIndicator(KafkaMessageChannelBinder binder,
KafkaBinderConfigurationProperties configurationProperties) {
ConsumerFactory<?, ?> consumerFactory) {
this.binder = binder;
this.configurationProperties = configurationProperties;
this.consumerFactory = consumerFactory;
}
@Override
public Health health() {
Map<String, String> properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.configurationProperties
.getKafkaConnectionString());
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
KafkaConsumer metadataConsumer = new KafkaConsumer(properties);
try {
try (Consumer<?, ?> metadataConsumer = consumerFactory.createConsumer()) {
Set<String> downMessages = new HashSet<>();
for (String topic : this.binder.getTopicsInUse().keySet()) {
List<PartitionInfo> partitionInfos = metadataConsumer.partitionsFor(topic);
@@ -78,8 +68,5 @@ public class KafkaBinderHealthIndicator implements HealthIndicator {
catch (Exception e) {
return Health.down(e).build();
}
finally {
metadataConsumer.close();
}
}
}

View File

@@ -16,14 +16,17 @@
package org.springframework.cloud.stream.binder.kafka.config;
import javax.annotation.PostConstruct;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.PostConstruct;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.AppInfoParser;
import org.springframework.beans.factory.annotation.Autowired;
@@ -54,6 +57,8 @@ import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.core.type.AnnotatedTypeMetadata;
import org.springframework.integration.codec.Codec;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.LoggingProducerListener;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.util.ObjectUtils;
@@ -114,7 +119,15 @@ public class KafkaBinderConfiguration {
@Bean
KafkaBinderHealthIndicator healthIndicator(KafkaMessageChannelBinder kafkaMessageChannelBinder) {
return new KafkaBinderHealthIndicator(kafkaMessageChannelBinder, this.configurationProperties);
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
if (!ObjectUtils.isEmpty(configurationProperties.getConfiguration())) {
props.putAll(configurationProperties.getConfiguration());
}
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.configurationProperties.getKafkaConnectionString());
ConsumerFactory<?, ?> consumerFactory = new DefaultKafkaConsumerFactory<>(props);
return new KafkaBinderHealthIndicator(kafkaMessageChannelBinder, consumerFactory);
}
@Bean(name = "adminUtilsOperation")

View File

@@ -0,0 +1,87 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.BDDMockito.given;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.Status;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
/**
* @author Barry Commins
*/
public class KafkaBinderHealthIndicatorTest {
private static final String TEST_TOPIC = "test";
private KafkaBinderHealthIndicator indicator;
@Mock
private DefaultKafkaConsumerFactory consumerFactory;
@Mock
private KafkaConsumer consumer;
@Mock
private KafkaMessageChannelBinder binder;
private Map<String, Collection<PartitionInfo>> topicsInUse = new HashMap<>();
@Before
public void setup() {
MockitoAnnotations.initMocks(this);
given(consumerFactory.createConsumer()).willReturn(consumer);
given(binder.getTopicsInUse()).willReturn(topicsInUse);
indicator = new KafkaBinderHealthIndicator(binder, consumerFactory);
}
@Test
public void kafkaBinderIsUp() {
final List<PartitionInfo> partitions = partitions(new Node(0, null, 0));
topicsInUse.put(TEST_TOPIC, partitions);
given(consumer.partitionsFor(TEST_TOPIC)).willReturn(partitions);
Health health = indicator.health();
assertThat(health.getStatus()).isEqualTo(Status.UP);
}
@Test
public void kafkaBinderIsDown() {
final List<PartitionInfo> partitions = partitions(new Node(-1, null, 0));
topicsInUse.put(TEST_TOPIC, partitions);
given(consumer.partitionsFor(TEST_TOPIC)).willReturn(partitions);
Health health = indicator.health();
assertThat(health.getStatus()).isEqualTo(Status.DOWN);
}
private List<PartitionInfo> partitions(Node leader) {
List<PartitionInfo> partitions = new ArrayList<>();
partitions.add(new PartitionInfo(TEST_TOPIC, 0, leader, null, null));
return partitions;
}
}