Closing adminClient prematurely

Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/803
This commit is contained in:
Oleksii Mukas
2019-11-20 15:30:35 +01:00
committed by Soby Chacko
parent 78ff4f1a70
commit 6ac9c0ed23
2 changed files with 46 additions and 57 deletions

View File

@@ -16,13 +16,6 @@
package org.springframework.cloud.stream.binder.kafka.streams;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.admin.AdminClient;
@@ -31,7 +24,6 @@ import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TaskMetadata;
import org.apache.kafka.streams.processor.ThreadMetadata;
import org.springframework.boot.actuate.health.AbstractHealthIndicator;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.Status;
@@ -40,6 +32,12 @@ import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProv
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* Health indicator for Kafka Streams.
*
@@ -59,8 +57,6 @@ public class KafkaStreamsBinderHealthIndicator extends AbstractHealthIndicator {
private static final ThreadLocal<Status> healthStatusThreadLocal = new ThreadLocal<>();
private AdminClient adminClient;
KafkaStreamsBinderHealthIndicator(KafkaStreamsRegistry kafkaStreamsRegistry,
KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties,
KafkaProperties kafkaProperties,
@@ -77,57 +73,38 @@ public class KafkaStreamsBinderHealthIndicator extends AbstractHealthIndicator {
@Override
protected void doHealthCheck(Health.Builder builder) throws Exception {
try {
initAdminClient();
synchronized (this.adminClient) {
final Status status = healthStatusThreadLocal.get();
//If one of the kafka streams binders (kstream, ktable, globalktable) was down before on the same request,
//retrieve that from the thead local storage where it was saved before. This is done in order to avoid
//the duration of the total health check since in the case of Kafka Streams each binder tries to do
//its own health check and since we already know that this is DOWN, simply pass that information along.
if (status == Status.DOWN) {
builder.withDetail("No topic information available", "Kafka broker is not reachable");
builder.status(Status.DOWN);
}
else {
final ListTopicsResult listTopicsResult = this.adminClient.listTopics();
listTopicsResult.listings().get(this.configurationProperties.getHealthTimeout(), TimeUnit.SECONDS);
try (AdminClient adminClient = AdminClient.create(this.adminClientProperties)) {
final Status status = healthStatusThreadLocal.get();
//If one of the kafka streams binders (kstream, ktable, globalktable) was down before on the same request,
//retrieve that from the thead local storage where it was saved before. This is done in order to avoid
//the duration of the total health check since in the case of Kafka Streams each binder tries to do
//its own health check and since we already know that this is DOWN, simply pass that information along.
if (status == Status.DOWN) {
builder.withDetail("No topic information available", "Kafka broker is not reachable");
builder.status(Status.DOWN);
} else {
final ListTopicsResult listTopicsResult = adminClient.listTopics();
listTopicsResult.listings().get(this.configurationProperties.getHealthTimeout(), TimeUnit.SECONDS);
if (this.kafkaStreamsBindingInformationCatalogue.getStreamsBuilderFactoryBeans().isEmpty()) {
builder.withDetail("No Kafka Streams bindings have been established", "Kafka Streams binder did not detect any processors");
builder.status(Status.UNKNOWN);
}
else {
boolean up = true;
for (KafkaStreams kStream : kafkaStreamsRegistry.getKafkaStreams()) {
up &= kStream.state().isRunning();
builder.withDetails(buildDetails(kStream));
}
builder.status(up ? Status.UP : Status.DOWN);
if (this.kafkaStreamsBindingInformationCatalogue.getStreamsBuilderFactoryBeans().isEmpty()) {
builder.withDetail("No Kafka Streams bindings have been established", "Kafka Streams binder did not detect any processors");
builder.status(Status.UNKNOWN);
} else {
boolean up = true;
for (KafkaStreams kStream : kafkaStreamsRegistry.getKafkaStreams()) {
up &= kStream.state().isRunning();
builder.withDetails(buildDetails(kStream));
}
builder.status(up ? Status.UP : Status.DOWN);
}
}
}
catch (Exception e) {
} catch (Exception e) {
builder.withDetail("No topic information available", "Kafka broker is not reachable");
builder.status(Status.DOWN);
builder.withException(e);
//Store binder down status into a thread local storage.
healthStatusThreadLocal.set(Status.DOWN);
}
finally {
// Close admin client immediately.
if (adminClient != null) {
adminClient.close(Duration.ofSeconds(0));
}
}
}
private synchronized AdminClient initAdminClient() {
if (this.adminClient == null) {
this.adminClient = AdminClient.create(this.adminClientProperties);
}
return this.adminClient;
}
private Map<String, Object> buildDetails(KafkaStreams kafkaStreams) {

View File

@@ -16,11 +16,6 @@
package org.springframework.cloud.stream.binder.kafka.streams.integration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -31,7 +26,6 @@ import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.actuate.health.CompositeHealthContributor;
@@ -56,6 +50,11 @@ import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.assertj.core.api.Assertions.assertThat;
/**
@@ -85,6 +84,19 @@ public class KafkaStreamsBinderHealthIndicatorTests {
}
}
@Test
public void healthIndicatorUpMultipleCallsTest() throws Exception {
try (ConfigurableApplicationContext context = singleStream("ApplicationHealthTest-xyz")) {
int callsToPerform = 5;
for (int i = 0; i < callsToPerform; i++) {
receive(context,
Lists.newArrayList(new ProducerRecord<>("in", "{\"id\":\"123\"}"),
new ProducerRecord<>("in", "{\"id\":\"123\"}")),
Status.UP, "out");
}
}
}
@Test
public void healthIndicatorDownTest() throws Exception {
try (ConfigurableApplicationContext context = singleStream("ApplicationHealthTest-xyzabc")) {