GH-1161: InteractiveQueryService improvements

This PR safe guards state store instances in case there are multiple KafkaStreams
instances present that have distinct application IDs but share State Store Names.

Change is backwards compatible: In case no KafkaStreams association of the thread
can be found, all local state stores are queried as before.

In case an associated KafkaStreams Instance is found, but required StateStore is
not found in this instance, a warning is issued but backwards compatibility is
preserved by looking up all state stores.

Store within KafkaStreams instance of thread is preferred over "foreign" store with same name.

Warning is issued if requested store is not found within KafkaStreams instance of thread.

The main benefit here is to get rid of randomly selecting stores across all KafkaStreams instances
in case a store is contained within multiple streams instances with same name.

Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1161
This commit is contained in:
Pommerening, Nico
2021-11-17 22:48:21 +01:00
committed by Soby Chacko
parent 7840decc86
commit aff0dc00ef
2 changed files with 63 additions and 8 deletions

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2018-2020 the original author or authors.
* Copyright 2018-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -31,6 +31,7 @@ import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.QueryableStoreType;
@@ -52,6 +53,7 @@ import org.springframework.util.StringUtils;
* @author Soby Chacko
* @author Renwei Han
* @author Serhii Siryi
* @author Nico Pommerening
* @since 2.1.0
*/
public class InteractiveQueryService {
@@ -92,15 +94,16 @@ public class InteractiveQueryService {
retryTemplate.setBackOffPolicy(backOffPolicy);
retryTemplate.setRetryPolicy(retryPolicy);
KafkaStreams contextSpecificKafkaStreams = getThreadContextSpecificKafkaStreams();
return retryTemplate.execute(context -> {
T store = null;
final Set<KafkaStreams> kafkaStreams = InteractiveQueryService.this.kafkaStreamsRegistry.getKafkaStreams();
final Iterator<KafkaStreams> iterator = kafkaStreams.iterator();
Throwable throwable = null;
while (iterator.hasNext()) {
if (contextSpecificKafkaStreams != null) {
try {
store = iterator.next().store(StoreQueryParameters.fromNameAndType(storeName, storeType));
store = contextSpecificKafkaStreams.store(
StoreQueryParameters.fromNameAndType(
storeName, storeType));
}
catch (InvalidStateStoreException e) {
// pass through..
@@ -110,10 +113,56 @@ public class InteractiveQueryService {
if (store != null) {
return store;
}
throw new IllegalStateException("Error when retrieving state store: " + storeName, throwable);
else if (contextSpecificKafkaStreams != null) {
LOG.warn("Store " + storeName
+ " could not be found in Streams context, falling back to all known Streams instances");
}
final Set<KafkaStreams> kafkaStreams = kafkaStreamsRegistry.getKafkaStreams();
final Iterator<KafkaStreams> iterator = kafkaStreams.iterator();
while (iterator.hasNext()) {
try {
store = iterator.next()
.store(StoreQueryParameters.fromNameAndType(
storeName, storeType));
}
catch (InvalidStateStoreException e) {
// pass through..
throwable = e;
}
}
if (store != null) {
return store;
}
throw new IllegalStateException(
"Error when retrieving state store: " + storeName,
throwable);
});
}
/**
* Retrieves the current {@link KafkaStreams} context if executing Thread is created by a Streams App (contains a matching application id in Thread's name).
*
* @return KafkaStreams instance associated with Thread
*/
private KafkaStreams getThreadContextSpecificKafkaStreams() {
return this.kafkaStreamsRegistry.getKafkaStreams().stream()
.filter(this::filterByThreadName).findAny().orElse(null);
}
/**
* Checks if the supplied {@link KafkaStreams} instance belongs to the calling Thread by matching the Thread's name with the Streams Application Id.
*
* @param streams {@link KafkaStreams} instance to filter
* @return true if Streams Instance is associated with Thread
*/
private boolean filterByThreadName(KafkaStreams streams) {
String applicationId = kafkaStreamsRegistry.streamBuilderFactoryBean(
streams).getStreamsConfiguration()
.getProperty(StreamsConfig.APPLICATION_ID_CONFIG);
// TODO: is there some better way to find out if a Stream App created the Thread?
return Thread.currentThread().getName().contains(applicationId);
}
/**
* Gets the current {@link HostInfo} that the calling kafka streams application is
* running on.

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2017-2020 the original author or authors.
* Copyright 2017-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,6 +18,7 @@ package org.springframework.cloud.stream.binder.kafka.streams;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -28,6 +29,7 @@ import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
@@ -69,6 +71,7 @@ import static org.mockito.internal.verification.VerificationModeFactory.times;
/**
* @author Soby Chacko
* @author Gary Russell
* @author Nico Pommerening
*/
public class KafkaStreamsInteractiveQueryIntegrationTests {
@@ -106,6 +109,9 @@ public class KafkaStreamsInteractiveQueryIntegrationTests {
KafkaStreamsRegistry kafkaStreamsRegistry = new KafkaStreamsRegistry();
kafkaStreamsRegistry.registerKafkaStreams(mock);
Mockito.when(mock.isRunning()).thenReturn(true);
Properties mockProperties = new Properties();
mockProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "fooApp");
Mockito.when(mock.getStreamsConfiguration()).thenReturn(mockProperties);
KafkaStreamsBinderConfigurationProperties binderConfigurationProperties =
new KafkaStreamsBinderConfigurationProperties(new KafkaProperties());
binderConfigurationProperties.getStateStoreRetry().setMaxAttempts(3);