KafkaStreams binder health check improvements

Allow health checks on KafkaStreams processors that are currently stopped through
actuator bindings endpoint. This is an opt-in feature, enabled through a new binder-level
property - includeStoppedProcessorsForHealthCheck - which defaults to false
so that the current health indicator behavior is preserved.

Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1165
Resolves #1175
This commit is contained in:
Soby Chacko
2021-11-29 20:12:45 -05:00
parent 0be87c3666
commit be474f643a
7 changed files with 116 additions and 6 deletions

View File

@@ -2121,6 +2121,12 @@ Arbitrary consumer properties at the binder level.
producerProperties::
Arbitrary producer properties at the binder level.
includeStoppedProcessorsForHealthCheck::
When a processor's bindings are stopped through the actuator, that processor does not participate in the health check by default.
Set this property to `true` to enable health check for all processors including the ones that are currently stopped through bindings actuator endpoint.
+
Default: false
==== Kafka Streams Producer Properties
The following properties are _only_ available for Kafka Streams producers and must be prefixed with `spring.cloud.stream.kafka.streams.bindings.<binding name>.producer.`

View File

@@ -17,6 +17,7 @@
package org.springframework.cloud.stream.binder.kafka.streams;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.springframework.cloud.stream.binder.AbstractBinder;
@@ -105,6 +106,12 @@ public class GlobalKTableBinder extends
if (!streamsBuilderFactoryBean.isRunning()) {
super.start();
GlobalKTableBinder.this.kafkaStreamsRegistry.registerKafkaStreams(streamsBuilderFactoryBean);
//If we cached the previous KafkaStreams object (from a binding stop on the actuator), remove it.
//See this issue for more details: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1165
final String applicationId = (String) streamsBuilderFactoryBean.getStreamsConfiguration().get(StreamsConfig.APPLICATION_ID_CONFIG);
if (kafkaStreamsBindingInformationCatalogue.getStoppedKafkaStreams().containsKey(applicationId)) {
kafkaStreamsBindingInformationCatalogue.removePreviousKafkaStreamsForApplicationId(applicationId);
}
}
}
@@ -115,6 +122,10 @@ public class GlobalKTableBinder extends
super.stop();
GlobalKTableBinder.this.kafkaStreamsRegistry.unregisterKafkaStreams(kafkaStreams);
KafkaStreamsBinderUtils.closeDlqProducerFactories(kafkaStreamsBindingInformationCatalogue, streamsBuilderFactoryBean);
//Caching the stopped KafkaStreams for health indicator purposes on the underlying processor.
//See this issue for more details: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1165
GlobalKTableBinder.this.kafkaStreamsBindingInformationCatalogue.addPreviousKafkaStreamsForApplicationId(
(String) streamsBuilderFactoryBean.getStreamsConfiguration().get(StreamsConfig.APPLICATION_ID_CONFIG), kafkaStreams);
}
}
};

View File

@@ -23,6 +23,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.StreamPartitioner;
@@ -134,6 +135,12 @@ class KStreamBinder extends
if (!streamsBuilderFactoryBean.isRunning()) {
super.start();
KStreamBinder.this.kafkaStreamsRegistry.registerKafkaStreams(streamsBuilderFactoryBean);
//If we cached the previous KafkaStreams object (from a binding stop on the actuator), remove it.
//See this issue for more details: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1165
final String applicationId = (String) streamsBuilderFactoryBean.getStreamsConfiguration().get(StreamsConfig.APPLICATION_ID_CONFIG);
if (kafkaStreamsBindingInformationCatalogue.getStoppedKafkaStreams().containsKey(applicationId)) {
kafkaStreamsBindingInformationCatalogue.removePreviousKafkaStreamsForApplicationId(applicationId);
}
}
}
@@ -144,6 +151,10 @@ class KStreamBinder extends
super.stop();
KStreamBinder.this.kafkaStreamsRegistry.unregisterKafkaStreams(kafkaStreams);
KafkaStreamsBinderUtils.closeDlqProducerFactories(kafkaStreamsBindingInformationCatalogue, streamsBuilderFactoryBean);
//Caching the stopped KafkaStreams for health indicator purposes on the underlying processor.
//See this issue for more details: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1165
KStreamBinder.this.kafkaStreamsBindingInformationCatalogue.addPreviousKafkaStreamsForApplicationId(
(String) streamsBuilderFactoryBean.getStreamsConfiguration().get(StreamsConfig.APPLICATION_ID_CONFIG), kafkaStreams);
}
}
};
@@ -199,6 +210,12 @@ class KStreamBinder extends
if (!streamsBuilderFactoryBean.isRunning()) {
super.start();
KStreamBinder.this.kafkaStreamsRegistry.registerKafkaStreams(streamsBuilderFactoryBean);
//If we cached the previous KafkaStreams object (from a binding stop on the actuator), remove it.
//See this issue for more details: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1165
final String applicationId = (String) streamsBuilderFactoryBean.getStreamsConfiguration().get(StreamsConfig.APPLICATION_ID_CONFIG);
if (kafkaStreamsBindingInformationCatalogue.getStoppedKafkaStreams().containsKey(applicationId)) {
kafkaStreamsBindingInformationCatalogue.removePreviousKafkaStreamsForApplicationId(applicationId);
}
}
}
@@ -209,6 +226,10 @@ class KStreamBinder extends
super.stop();
KStreamBinder.this.kafkaStreamsRegistry.unregisterKafkaStreams(kafkaStreams);
KafkaStreamsBinderUtils.closeDlqProducerFactories(kafkaStreamsBindingInformationCatalogue, streamsBuilderFactoryBean);
//Caching the stopped KafkaStreams for health indicator purposes on the underlying processor
//See this issue for more details: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1165
KStreamBinder.this.kafkaStreamsBindingInformationCatalogue.addPreviousKafkaStreamsForApplicationId(
(String) streamsBuilderFactoryBean.getStreamsConfiguration().get(StreamsConfig.APPLICATION_ID_CONFIG), kafkaStreams);
}
}
};

View File

@@ -17,6 +17,7 @@
package org.springframework.cloud.stream.binder.kafka.streams;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KTable;
import org.springframework.cloud.stream.binder.AbstractBinder;
@@ -106,6 +107,12 @@ class KTableBinder extends
if (!streamsBuilderFactoryBean.isRunning()) {
super.start();
KTableBinder.this.kafkaStreamsRegistry.registerKafkaStreams(streamsBuilderFactoryBean);
//If we cached the previous KafkaStreams object (from a binding stop on the actuator), remove it.
//See this issue for more details: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1165
final String applicationId = (String) streamsBuilderFactoryBean.getStreamsConfiguration().get(StreamsConfig.APPLICATION_ID_CONFIG);
if (kafkaStreamsBindingInformationCatalogue.getStoppedKafkaStreams().containsKey(applicationId)) {
kafkaStreamsBindingInformationCatalogue.removePreviousKafkaStreamsForApplicationId(applicationId);
}
}
}
@@ -116,6 +123,10 @@ class KTableBinder extends
super.stop();
KTableBinder.this.kafkaStreamsRegistry.unregisterKafkaStreams(kafkaStreams);
KafkaStreamsBinderUtils.closeDlqProducerFactories(kafkaStreamsBindingInformationCatalogue, streamsBuilderFactoryBean);
//Caching the stopped KafkaStreams for health indicator purposes on the underlying processor.
//See this issue for more details: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1165
KTableBinder.this.kafkaStreamsBindingInformationCatalogue.addPreviousKafkaStreamsForApplicationId(
(String) streamsBuilderFactoryBean.getStreamsConfiguration().get(StreamsConfig.APPLICATION_ID_CONFIG), kafkaStreams);
}
}
};

View File

@@ -19,6 +19,7 @@ package org.springframework.cloud.stream.binder.kafka.streams;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -31,8 +32,8 @@ import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TaskMetadata;
import org.apache.kafka.streams.processor.ThreadMetadata;
import org.apache.kafka.streams.TaskMetadata;
import org.apache.kafka.streams.ThreadMetadata;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.boot.actuate.health.AbstractHealthIndicator;
@@ -118,7 +119,12 @@ public class KafkaStreamsBinderHealthIndicator extends AbstractHealthIndicator i
}
else {
boolean up = true;
for (KafkaStreams kStream : kafkaStreamsRegistry.getKafkaStreams()) {
final Set<KafkaStreams> kafkaStreams = kafkaStreamsRegistry.getKafkaStreams();
Set<KafkaStreams> allKafkaStreams = new HashSet<>(kafkaStreams);
if (this.configurationProperties.isIncludeStoppedProcessorsForHealthCheck()) {
allKafkaStreams.addAll(kafkaStreamsBindingInformationCatalogue.getStoppedKafkaStreams().values());
}
for (KafkaStreams kStream : allKafkaStreams) {
if (isKafkaStreams25) {
up &= kStream.state().isRunningOrRebalancing();
}
@@ -156,7 +162,8 @@ public class KafkaStreamsBinderHealthIndicator extends AbstractHealthIndicator i
}
if (isRunningResult) {
for (ThreadMetadata metadata : kafkaStreams.localThreadsMetadata()) {
final Set<ThreadMetadata> threadMetadata = kafkaStreams.metadataForLocalThreads();
for (ThreadMetadata metadata : threadMetadata) {
perAppdIdDetails.put("threadName", metadata.threadName());
perAppdIdDetails.put("threadState", metadata.threadState());
perAppdIdDetails.put("adminClientId", metadata.adminClientId());
@@ -172,8 +179,19 @@ public class KafkaStreamsBinderHealthIndicator extends AbstractHealthIndicator i
}
else {
final StreamsBuilderFactoryBean streamsBuilderFactoryBean = this.kafkaStreamsRegistry.streamBuilderFactoryBean(kafkaStreams);
final String applicationId = (String) streamsBuilderFactoryBean.getStreamsConfiguration().get(StreamsConfig.APPLICATION_ID_CONFIG);
details.put(applicationId, String.format("The processor with application.id %s is down", applicationId));
String applicationId = null;
if (streamsBuilderFactoryBean != null) {
applicationId = (String) streamsBuilderFactoryBean.getStreamsConfiguration().get(StreamsConfig.APPLICATION_ID_CONFIG);
}
else {
final Map<String, KafkaStreams> stoppedKafkaStreamsPerBinding = kafkaStreamsBindingInformationCatalogue.getStoppedKafkaStreams();
for (String appId : stoppedKafkaStreamsPerBinding.keySet()) {
if (stoppedKafkaStreamsPerBinding.get(appId).equals(kafkaStreams)) {
applicationId = appId;
}
}
}
details.put(applicationId, String.format("The processor with application.id %s is down. Current state: %s", applicationId, kafkaStreams.state()));
}
return details;
}

View File

@@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.Collections;
import java.util.stream.Collectors;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
@@ -62,6 +63,8 @@ public class KafkaStreamsBindingInformationCatalogue {
private final Map<Object, String> bindingNamesPerTarget = new HashMap<>();
private final Map<String, KafkaStreams> previousKafkaStreamsPerApplicationId = new HashMap<>();
private final Map<StreamsBuilderFactoryBean, List<ProducerFactory<byte[], byte[]>>> dlqProducerFactories = new HashMap<>();
/**
@@ -213,4 +216,35 @@ public class KafkaStreamsBindingInformationCatalogue {
}
producerFactories.add(producerFactory);
}
/**
 * Cache the previous {@link KafkaStreams} object under its application.id when the
 * corresponding binding is stopped through the actuator, so that the health indicator
 * can still report on the stopped processor when the opt-in property is enabled.
 * See https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1165
 *
 * @param applicationId the processor's application.id, used as the cache key
 * @param kafkaStreams the stopped {@link KafkaStreams} object to cache
 */
public void addPreviousKafkaStreamsForApplicationId(String applicationId, KafkaStreams kafkaStreams) {
this.previousKafkaStreamsPerApplicationId.put(applicationId, kafkaStreams);
}
/**
 * Remove the previously cached {@link KafkaStreams} object for the given application.id,
 * e.g. when the binding is started again and a fresh KafkaStreams instance is registered.
 * See https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1165
 *
 * @param applicationId the processor's application.id whose cached entry is removed
 */
public void removePreviousKafkaStreamsForApplicationId(String applicationId) {
this.previousKafkaStreamsPerApplicationId.remove(applicationId);
}
/**
 * All KafkaStreams objects that were stopped through the actuator binding stop,
 * keyed by application.id.
 * Returned as an unmodifiable live view so that callers cannot mutate the internal
 * cache; use {@code addPreviousKafkaStreamsForApplicationId} and
 * {@code removePreviousKafkaStreamsForApplicationId} to modify it.
 * See https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1165
 *
 * @return read-only map of stopped KafkaStreams objects per application.id
 */
public Map<String, KafkaStreams> getStoppedKafkaStreams() {
return Collections.unmodifiableMap(this.previousKafkaStreamsPerApplicationId);
}
}

View File

@@ -74,6 +74,7 @@ public class KafkaStreamsBinderConfigurationProperties
*/
private DeserializationExceptionHandler deserializationExceptionHandler;
private boolean includeStoppedProcessorsForHealthCheck;
public Map<String, Functions> getFunctions() {
return functions;
@@ -127,6 +128,14 @@ public class KafkaStreamsBinderConfigurationProperties
this.deserializationExceptionHandler = deserializationExceptionHandler;
}
/**
 * Whether processors whose bindings are stopped through the actuator bindings endpoint
 * still participate in the binder health check. Defaults to {@code false}.
 *
 * @return {@code true} if stopped processors are included in the health check
 */
public boolean isIncludeStoppedProcessorsForHealthCheck() {
return includeStoppedProcessorsForHealthCheck;
}
/**
 * Enable or disable health checks for processors that are currently stopped
 * through the actuator bindings endpoint.
 *
 * @param includeStoppedProcessorsForHealthCheck {@code true} to include stopped processors
 */
public void setIncludeStoppedProcessorsForHealthCheck(boolean includeStoppedProcessorsForHealthCheck) {
this.includeStoppedProcessorsForHealthCheck = includeStoppedProcessorsForHealthCheck;
}
public static class StateStoreRetry {
private int maxAttempts = 1;